From 2ecb53e7bb3a38b94d0d4f8510b990c028dd7879 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Nordstr=C3=B6m?= Date: Tue, 1 Dec 2020 14:28:34 +0100 Subject: [PATCH] Improve memory handling for remote COPY This change improves memory usage in the `COPY` code used for distributed hypertables. The following issues have been addressed: * `PGresult` objects were not cleared, leading to memory leaks. * The caching of chunk connections didn't work since the lookup compared ephemeral chunk pointers instead of chunk IDs. The effect was that cached chunk connection state was reallocated every time instead of being reused. This likely also caused worse performance. To address these issues, the following changes are made: * All `PGresult` objects are now cleared with `PQclear`. * Lookup for chunk connections now compares chunk IDs instead of chunk pointers. * The per-tuple memory context is moved to the outer processing loop to ensure that everything in the loop is allocated on the per-tuple memory context, which is also reset at every iteration of the loop. * The use of memory contexts is also simplified to have only one memory context for state that should survive across resets of the per-tuple memory context. 
Fixes #2677 --- src/copy.c | 6 +- src/cross_module_fn.c | 7 +- src/cross_module_fn.h | 3 +- tsl/src/remote/dist_copy.c | 268 +++++++++++++++++++----------- tsl/src/remote/dist_copy.h | 3 +- tsl/test/expected/remote_copy.out | 10 +- 6 files changed, 182 insertions(+), 115 deletions(-) diff --git a/src/copy.c b/src/copy.c index aeab5d39c..2e63dc769 100644 --- a/src/copy.c +++ b/src/copy.c @@ -441,8 +441,6 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, void (*call /* Close any trigger target relations */ ExecCleanUpTriggerState(estate); - copy_chunk_state_destroy(ccstate); - /* * If we skipped writing WAL, then we need to sync the heap (but not * indexes since those use WAL anyway) @@ -663,10 +661,11 @@ timescaledb_DoCopy(const CopyStmt *stmt, const char *queryString, uint64 *proces ccstate->where_clause = where_clause; if (hypertable_is_distributed(ht)) - ts_cm_functions->distributed_copy(stmt, processed, ccstate, attnums); + *processed = ts_cm_functions->distributed_copy(stmt, ccstate, attnums); else *processed = copyfrom(ccstate, pstate->p_rtable, ht, CopyFromErrorCallback, cstate); + copy_chunk_state_destroy(ccstate); EndCopyFrom(cstate); free_parsestate(pstate); table_close(rel, NoLock); @@ -732,6 +731,7 @@ timescaledb_move_from_table_to_chunks(Hypertable *ht, LOCKMODE lockmode) scandesc = table_beginscan(rel, snapshot, 0, NULL); ccstate = copy_chunk_state_create(ht, rel, next_copy_from_table_to_chunks, NULL, scandesc); copyfrom(ccstate, pstate->p_rtable, ht, copy_table_to_chunk_error_callback, scandesc); + copy_chunk_state_destroy(ccstate); heap_endscan(scandesc); UnregisterSnapshot(snapshot); table_close(rel, lockmode); diff --git a/src/cross_module_fn.c b/src/cross_module_fn.c index 445a2f042..2fb315e93 100644 --- a/src/cross_module_fn.c +++ b/src/cross_module_fn.c @@ -236,11 +236,12 @@ data_node_dispatch_path_create_default(PlannerInfo *root, ModifyTablePath *mtpat return NULL; } -static void -distributed_copy_default(const CopyStmt 
*stmt, uint64 *processed, CopyChunkState *ccstate, - List *attnums) +static uint64 +distributed_copy_default(const CopyStmt *stmt, CopyChunkState *ccstate, List *attnums) { error_no_default_fn_community(); + + return 0; } static bool diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index 5660f3d77..f26fff24b 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -139,8 +139,7 @@ typedef struct CrossModuleFunctions void (*create_chunk_on_data_nodes)(Chunk *chunk, Hypertable *ht); Path *(*data_node_dispatch_path_create)(PlannerInfo *root, ModifyTablePath *mtpath, Index hypertable_rti, int subpath_index); - void (*distributed_copy)(const CopyStmt *stmt, uint64 *processed, CopyChunkState *ccstate, - List *attnums); + uint64 (*distributed_copy)(const CopyStmt *stmt, CopyChunkState *ccstate, List *attnums); bool (*set_distributed_id)(Datum id); void (*set_distributed_peer_id)(Datum id); bool (*is_frontend_session)(void); diff --git a/tsl/src/remote/dist_copy.c b/tsl/src/remote/dist_copy.c index bcb5042b0..d5eb137b7 100644 --- a/tsl/src/remote/dist_copy.c +++ b/tsl/src/remote/dist_copy.c @@ -38,7 +38,7 @@ */ typedef struct ChunkConnectionList { - Chunk *chunk; + int32 chunk_id; List *connections; } ChunkConnectionList; @@ -63,7 +63,6 @@ typedef struct CopyConnectionState List *cached_connections; List *connections_in_use; bool using_binary; - MemoryContext mctx; const char *outgoing_copy_cmd; } CopyConnectionState; @@ -71,12 +70,10 @@ typedef struct CopyConnectionState */ typedef struct TextCopyContext { - MemoryContext orig_context; int ndimensions; CopyDimensionInfo *dimensions; char delimiter; char *null_string; - MemoryContext tuple_context; char **fields; int nfields; } TextCopyContext; @@ -85,9 +82,8 @@ typedef struct TextCopyContext */ typedef struct BinaryCopyContext { - MemoryContext orig_context; + ExprContext *econtext; FmgrInfo *out_functions; - EState *estate; Datum *values; bool *nulls; } BinaryCopyContext; @@ -98,8 +94,10 @@ typedef 
struct RemoteCopyContext { /* Operation data */ CopyConnectionState connection_state; + CopyChunkState *ccstate; void *data_context; /* TextCopyContext or BinaryCopyContext */ bool binary_operation; + MemoryContext mctx; /* MemoryContext that holds the RemoteCopyContext */ /* Data for the current read row */ StringInfo row_data; @@ -112,7 +110,7 @@ typedef struct RemoteCopyContext static CopyDimensionInfo * generate_copy_dimensions(Dimension *dims, int ndimensions, List *attnums, Hypertable *ht) { - CopyDimensionInfo *result = palloc(ndimensions * sizeof(CopyDimensionInfo)); + CopyDimensionInfo *result = palloc0(ndimensions * sizeof(CopyDimensionInfo)); int idx; for (idx = 0; idx < ndimensions; ++idx) @@ -254,6 +252,7 @@ static void start_remote_copy_on_new_connection(CopyConnectionState *state, TSConnection *connection) { PGconn *pg_conn = remote_connection_get_pg_conn(connection); + if (PQisnonblocking(pg_conn)) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -261,12 +260,28 @@ start_remote_copy_on_new_connection(CopyConnectionState *state, TSConnection *co if (!list_member_ptr(state->connections_in_use, connection)) { - PGresult *res = PQexec(pg_conn, state->outgoing_copy_cmd); + PGresult *volatile res = NULL; - if (PQresultStatus(res) != PGRES_COPY_IN) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("unable to start remote COPY on data node (%d)", PQresultStatus(res)))); + PG_TRY(); + { + res = PQexec(pg_conn, state->outgoing_copy_cmd); + + if (PQresultStatus(res) != PGRES_COPY_IN) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("unable to start remote COPY on data node"), + errdetail("Remote command error: %s", PQresultErrorMessage(res)))); + + PQclear(res); + } + PG_CATCH(); + { + if (NULL != res) + PQclear(res); + + PG_RE_THROW(); + } + PG_END_TRY(); if (state->using_binary) send_binary_copy_header(pg_conn); @@ -276,15 +291,15 @@ start_remote_copy_on_new_connection(CopyConnectionState *state, TSConnection *co } 
static ChunkConnectionList * -create_connection_list_for_chunk(CopyConnectionState *state, Chunk *chunk) +create_connection_list_for_chunk(CopyConnectionState *state, const Chunk *chunk) { ChunkConnectionList *chunk_connections; ListCell *lc; - MemoryContext oldcontext = MemoryContextSwitchTo(state->mctx); - chunk_connections = palloc(sizeof(ChunkConnectionList)); - chunk_connections->chunk = ts_chunk_copy(chunk); + chunk_connections = palloc0(sizeof(ChunkConnectionList)); + chunk_connections->chunk_id = chunk->fd.id; chunk_connections->connections = NIL; + foreach (lc, chunk->data_nodes) { ChunkDataNode *cdn = lfirst(lc); @@ -295,7 +310,7 @@ create_connection_list_for_chunk(CopyConnectionState *state, Chunk *chunk) chunk_connections->connections = lappend(chunk_connections->connections, connection); } state->cached_connections = lappend(state->cached_connections, chunk_connections); - MemoryContextSwitchTo(oldcontext); + return chunk_connections; } @@ -306,54 +321,122 @@ send_end_binary_copy_data(PGconn *connection) return PQputCopyData(connection, (char *) &buf, sizeof(buf)); } +static char * +get_error_field_copy(PGresult *res, int fieldcode) +{ + const char *msg = PQresultErrorField(res, fieldcode); + + if (NULL == msg) + return NULL; + return pchomp(msg); +} + +static void +clear_results(const List *results, bool report_error) +{ + ListCell *lc; + const char *primary = NULL; + const char *detail = NULL; + const char *hint = NULL; + + if (NIL == results) + return; + + foreach (lc, results) + { + PGresult *res = lfirst(lc); + + if (PQresultStatus(res) != PGRES_COMMAND_OK && NULL == primary && report_error) + { + primary = get_error_field_copy(res, PG_DIAG_MESSAGE_PRIMARY); + detail = get_error_field_copy(res, PG_DIAG_MESSAGE_DETAIL); + hint = get_error_field_copy(res, PG_DIAG_MESSAGE_HINT); + } + + PQclear(res); + } + + if (NULL != primary) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_EXCEPTION), + errmsg("%s", primary), + detail == NULL ? 
0 : errdetail("%s", detail), + hint == NULL ? 0 : errhint("%s", hint))); +} + static void finish_outstanding_copies(CopyConnectionState *state) { ListCell *lc; - List *results = NIL; + List *volatile results = NIL; + PGresult *volatile res = NULL; - foreach (lc, state->connections_in_use) + PG_TRY(); { - TSConnection *conn = lfirst(lc); - PGconn *pg_conn = remote_connection_get_pg_conn(conn); - PGresult PG_USED_FOR_ASSERTS_ONLY *res; + foreach (lc, state->connections_in_use) + { + TSConnection *conn = lfirst(lc); + PGconn *pg_conn = remote_connection_get_pg_conn(conn); - if (state->using_binary) - if (send_end_binary_copy_data(pg_conn) != 1) + if (state->using_binary) + if (send_end_binary_copy_data(pg_conn) != 1) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_EXCEPTION), + errmsg("%s", PQerrorMessage(pg_conn)))); + + if (PQputCopyEnd(pg_conn, NULL) == -1) ereport(ERROR, (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQerrorMessage(pg_conn)))); - if (PQputCopyEnd(pg_conn, NULL) == -1) - ereport(ERROR, - (errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQerrorMessage(pg_conn)))); + res = PQgetResult(pg_conn); + Assert(res != NULL); + results = lappend(results, res); - results = lappend(results, PQgetResult(pg_conn)); - /* Need to get result a second time to move the connection out of copy mode */ - res = PQgetResult(pg_conn); - if (res != NULL) - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("COPY command resulted in unexpected state"))); + /* Need to get result a second time to move the connection out of copy mode */ + res = PQgetResult(pg_conn); + + if (res != NULL) + ereport(ERROR, + (errcode(ERRCODE_INTERNAL_ERROR), + errmsg("COPY command resulted in unexpected state"))); + } } + PG_CATCH(); + { + if (res != NULL) + PQclear(res); - foreach (lc, results) - if (PQresultStatus(lfirst(lc)) != PGRES_COMMAND_OK) - ereport(ERROR, - (errcode(ERRCODE_INTERNAL_ERROR), - errmsg("error during copy completion: %s", PQresultErrorMessage(lfirst(lc))))); 
+ clear_results(results, false); + PG_RE_THROW(); + } + PG_END_TRY(); + + Assert(res == NULL); + clear_results(results, true); } static List * -get_connections_for_chunk(CopyConnectionState *state, Chunk *chunk) +get_connections_for_chunk(RemoteCopyContext *context, const Chunk *chunk) { + CopyConnectionState *state = &context->connection_state; + MemoryContext oldmctx; ListCell *lc; + List *conns; foreach (lc, state->cached_connections) - if (((ChunkConnectionList *) lfirst(lc))->chunk == chunk) - return ((ChunkConnectionList *) lfirst(lc))->connections; + { + ChunkConnectionList *chunkconns = lfirst(lc); - return create_connection_list_for_chunk(state, chunk)->connections; + if (chunkconns->chunk_id == chunk->fd.id) + return chunkconns->connections; + } + + oldmctx = MemoryContextSwitchTo(context->mctx); + conns = create_connection_list_for_chunk(state, chunk)->connections; + MemoryContextSwitchTo(oldmctx); + + return conns; } static bool @@ -566,16 +649,12 @@ validate_options(List *copy_options, char *delimiter, char **null_string) static TextCopyContext * generate_text_copy_context(const CopyStmt *stmt, Hypertable *ht, List *attnums) { - TextCopyContext *ctx = palloc(sizeof(TextCopyContext)); + TextCopyContext *ctx = palloc0(sizeof(TextCopyContext)); ctx->ndimensions = ht->space->num_dimensions; validate_options(stmt->options, &ctx->delimiter, &ctx->null_string); ctx->dimensions = generate_copy_dimensions(ht->space->dimensions, ctx->ndimensions, attnums, ht); - ctx->tuple_context = - AllocSetContextCreate(CurrentMemoryContext, "COPY", ALLOCSET_DEFAULT_SIZES); - - ctx->orig_context = MemoryContextSwitchTo(ctx->tuple_context); return ctx; } @@ -590,7 +669,7 @@ get_copy_conversion_functions(Hypertable *ht, List *copy_attnums, FmgrInfo **fun Relation rel = relation_open(ht->main_table_relid, AccessShareLock); TupleDesc tupDesc = RelationGetDescr(rel); - *functions = palloc(tupDesc->natts * sizeof(FmgrInfo)); + *functions = palloc0(tupDesc->natts * 
sizeof(FmgrInfo)); foreach (lc, copy_attnums) { int offset = AttrNumberGetAttrOffset(lfirst_int(lc)); @@ -607,36 +686,47 @@ get_copy_conversion_functions(Hypertable *ht, List *copy_attnums, FmgrInfo **fun } static BinaryCopyContext * -generate_binary_copy_context(const CopyStmt *stmt, Hypertable *ht, List *attnums) +generate_binary_copy_context(const CopyStmt *stmt, ExprContext *econtext, Hypertable *ht, + List *attnums) { - BinaryCopyContext *ctx = palloc(sizeof(BinaryCopyContext)); + BinaryCopyContext *ctx = palloc0(sizeof(BinaryCopyContext)); int columns = get_copy_conversion_functions(ht, attnums, &ctx->out_functions); - ctx->estate = CreateExecutorState(); - ctx->values = palloc(columns * sizeof(Datum)); - ctx->nulls = palloc(columns * sizeof(bool)); - ctx->orig_context = MemoryContextSwitchTo(GetPerTupleMemoryContext(ctx->estate)); + ctx->econtext = econtext; + ctx->values = palloc0(columns * sizeof(Datum)); + ctx->nulls = palloc0(columns * sizeof(bool)); + return ctx; } static RemoteCopyContext * -begin_remote_copy_operation(const CopyStmt *stmt, Hypertable *ht, List *attnums) +begin_remote_copy_operation(const CopyStmt *stmt, CopyChunkState *ccstate, List *attnums) { - RemoteCopyContext *context = palloc(sizeof(RemoteCopyContext)); bool binary_copy = copy_should_send_binary(); + MemoryContext mctx = + AllocSetContextCreate(CurrentMemoryContext, "Remote COPY", ALLOCSET_DEFAULT_SIZES); + Hypertable *ht = ccstate->dispatch->hypertable; + ExprContext *econtext = GetPerTupleExprContext(ccstate->estate); + RemoteCopyContext *context; + MemoryContext oldmctx; + oldmctx = MemoryContextSwitchTo(mctx); + context = palloc0(sizeof(RemoteCopyContext)); + context->ccstate = ccstate; + context->mctx = mctx; context->binary_operation = binary_copy; context->connection_state.connections_in_use = NIL; context->connection_state.cached_connections = NIL; - context->connection_state.mctx = CurrentMemoryContext; context->connection_state.using_binary = binary_copy; 
context->connection_state.outgoing_copy_cmd = deparse_copy_cmd(stmt, ht, binary_copy); if (binary_copy) - context->data_context = generate_binary_copy_context(stmt, ht, attnums); + context->data_context = generate_binary_copy_context(stmt, econtext, ht, attnums); else context->data_context = generate_text_copy_context(stmt, ht, attnums); + MemoryContextSwitchTo(oldmctx); + return context; } @@ -646,8 +736,6 @@ parse_next_text_row(CopyState cstate, List *attnums, TextCopyContext *ctx) StringInfo row_data; int i; - MemoryContextReset(ctx->tuple_context); - if (!NextCopyFromRawFields(cstate, &ctx->fields, &ctx->nfields)) return NULL; @@ -708,13 +796,11 @@ generate_binary_copy_data(Datum *values, bool *nulls, List *attnums, FmgrInfo *o static StringInfo parse_next_binary_row(CopyState cstate, List *attnums, BinaryCopyContext *ctx) { - ResetPerTupleExprContext(ctx->estate); - #if PG12_GE - if (!NextCopyFrom(cstate, GetPerTupleExprContext(ctx->estate), ctx->values, ctx->nulls)) + if (!NextCopyFrom(cstate, ctx->econtext, ctx->values, ctx->nulls)) return NULL; #else - if (!NextCopyFrom(cstate, GetPerTupleExprContext(ctx->estate), ctx->values, ctx->nulls, NULL)) + if (!NextCopyFrom(cstate, ctx->econtext, ctx->values, ctx->nulls, NULL)) return NULL; #endif @@ -780,10 +866,10 @@ reset_copy_connection_state(CopyConnectionState *state) state->connections_in_use = NIL; } -static Chunk * +static const Chunk * get_target_chunk(Hypertable *ht, Point *p, CopyConnectionState *state) { - Chunk *chunk = ts_hypertable_find_chunk_if_exists(ht, p); + const Chunk *chunk = ts_hypertable_find_chunk_if_exists(ht, p); if (chunk == NULL) { @@ -818,10 +904,11 @@ send_copy_data(StringInfo row_data, List *connections) } static void -process_and_send_copy_data(RemoteCopyContext *context, Hypertable *ht) +process_and_send_copy_data(RemoteCopyContext *context) { + Hypertable *ht = context->ccstate->dispatch->hypertable; + const Chunk *chunk; Point *point; - Chunk *chunk; List *connections; if 
(context->binary_operation) @@ -830,54 +917,38 @@ process_and_send_copy_data(RemoteCopyContext *context, Hypertable *ht) point = get_current_point_for_text_copy(ht, context->data_context); chunk = get_target_chunk(ht, point, &context->connection_state); - connections = get_connections_for_chunk(&context->connection_state, chunk); + connections = get_connections_for_chunk(context, chunk); send_copy_data(context->row_data, connections); } -static void -cleanup_text_copy_context(TextCopyContext *ctx) -{ - MemoryContextSwitchTo(ctx->orig_context); - MemoryContextDelete(ctx->tuple_context); -} - -static void -cleanup_binary_copy_context(BinaryCopyContext *ctx) -{ - MemoryContextSwitchTo(ctx->orig_context); - FreeExecutorState(ctx->estate); -} - static void end_copy_operation(RemoteCopyContext *context) { finish_outstanding_copies(&context->connection_state); - if (context->binary_operation) - cleanup_binary_copy_context(context->data_context); - else - cleanup_text_copy_context(context->data_context); + MemoryContextDelete(context->mctx); } -void -remote_distributed_copy(const CopyStmt *stmt, uint64 *processed, CopyChunkState *ccstate, - List *attnums) +uint64 +remote_distributed_copy(const CopyStmt *stmt, CopyChunkState *ccstate, List *attnums) { - Hypertable *ht = ccstate->dispatch->hypertable; - RemoteCopyContext *context = begin_remote_copy_operation(stmt, ht, attnums); - - *processed = 0; + MemoryContext oldmctx = CurrentMemoryContext; + RemoteCopyContext *context = begin_remote_copy_operation(stmt, ccstate, attnums); + uint64 processed = 0; PG_TRY(); { while (true) { + ResetPerTupleExprContext(ccstate->estate); + MemoryContextSwitchTo(GetPerTupleMemoryContext(ccstate->estate)); + CHECK_FOR_INTERRUPTS(); if (!read_next_copy_row(context, ccstate->cstate, attnums)) break; - process_and_send_copy_data(context, ht); - ++*processed; + process_and_send_copy_data(context); + ++processed; } } PG_CATCH(); @@ -890,4 +961,7 @@ remote_distributed_copy(const CopyStmt *stmt, 
uint64 *processed, CopyChunkState PG_END_TRY(); end_copy_operation(context); + MemoryContextSwitchTo(oldmctx); + + return processed; } diff --git a/tsl/src/remote/dist_copy.h b/tsl/src/remote/dist_copy.h index dde2af6c4..2aff7d7fb 100644 --- a/tsl/src/remote/dist_copy.h +++ b/tsl/src/remote/dist_copy.h @@ -12,7 +12,6 @@ typedef struct Hypertable Hypertable; typedef struct CopyChunkState CopyChunkState; -extern void remote_distributed_copy(const CopyStmt *stmt, uint64 *processed, - CopyChunkState *ccstate, List *attnums); +extern uint64 remote_distributed_copy(const CopyStmt *stmt, CopyChunkState *ccstate, List *attnums); #endif /* TIMESCALEDB_TSL_REMOTE_DIST_COPY_H */ diff --git a/tsl/test/expected/remote_copy.out b/tsl/test/expected/remote_copy.out index 064b2a509..ea4027152 100644 --- a/tsl/test/expected/remote_copy.out +++ b/tsl/test/expected/remote_copy.out @@ -66,15 +66,9 @@ ERROR: unable to use default value for partitioning column "pH" -- Missing required column, these generate a WARNING with a transaction id in them (too flimsy to output) SET client_min_messages TO ERROR; COPY "+ri(k33_')" (thyme, flavor, "pH") FROM STDIN WITH DELIMITER ','; -ERROR: error during copy completion: ERROR: null value in column "))_" violates not-null constraint -DETAIL: Failing row contains (5, null, blue, 2, null). -CONTEXT: COPY +ri(k33_'), line 1: "5,blue,2.0" - +ERROR: null value in column "))_" violates not-null constraint COPY "+ri(k33_')" FROM STDIN WITH DELIMITER ','; -ERROR: error during copy completion: ERROR: null value in column "))_" violates not-null constraint -DETAIL: Failing row contains (5, null, blue, 1, blah). -CONTEXT: COPY +ri(k33_'), line 1: "5,\N,blue,1,blah" - +ERROR: null value in column "))_" violates not-null constraint SET client_min_messages TO INFO; -- Invalid data after new chunk creation, data and chunks should be rolled back COPY "+ri(k33_')" FROM STDIN WITH DELIMITER ',';