From abb6762450fb90aae5536641fef85cfd3c75b510 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Erik=20Nordstr=C3=B6m?=
Date: Wed, 3 May 2023 14:06:25 +0200
Subject: [PATCH] Reduce memory usage for distributed analyze

Use a per-tuple memory context when receiving chunk statistics from
data nodes. Otherwise, memory usage is proportional to the number of
chunks and columns.
---
 CHANGELOG.md                  |   1 +
 tsl/src/chunk_api.c           | 100 ++++++++++++++++++++++++----------
 tsl/src/remote/tuplefactory.c |  12 +++-
 tsl/src/remote/tuplefactory.h |   1 +
 4 files changed, 84 insertions(+), 30 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 45b4b0381..7d05c79a4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -33,6 +33,7 @@ accidentally triggering the load of a previous DB version.**
 * #5613 Quote username identifier appropriately
 * #5525 Fix tablespace for compressed hypertable and corresponding toast
 * #5642 Fix ALTER TABLE SET with normal tables
+* #5666 Reduce memory usage for distributed analyze
 
 **Thanks**
 * @kovetskiy and @DZDomi for reporting peformance regression in Realtime Continuous Aggregates

diff --git a/tsl/src/chunk_api.c b/tsl/src/chunk_api.c
index 01144c356..8493f447f 100644
--- a/tsl/src/chunk_api.c
+++ b/tsl/src/chunk_api.c
@@ -25,6 +25,7 @@
 #include
 #include
 #include
+#include
 #include
 
 #include "compat/compat.h"
@@ -36,6 +37,7 @@
 #include "hypercube.h"
 #include "hypertable_cache.h"
 #include "utils.h"
+#include "deparse.h"
 
 #include "remote/async.h"
 #include "remote/dist_txn.h"
@@ -1092,17 +1094,18 @@ typedef struct ChunkAttKey
 typedef struct StatsProcessContext
 {
 	HTAB *htab;
+	MemoryContext per_tuple_mcxt;
 } StatsProcessContext;
 
 static void
 stats_process_context_init(StatsProcessContext *ctx, long nstats)
 {
 	HASHCTL ctl;
-
 	MemSet(&ctl, 0, sizeof(ctl));
 	ctl.keysize = sizeof(ChunkAttKey);
 	ctl.entrysize = sizeof(ChunkAttKey);
-
+	ctx->per_tuple_mcxt =
+		AllocSetContextCreate(CurrentMemoryContext, "Distributed ANALYZE", ALLOCSET_DEFAULT_SIZES);
 	ctl.hcxt = CurrentMemoryContext;
 	ctx->htab = hash_create("StatsProcessContext", nstats, &ctl,
 							HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
@@ -1133,6 +1136,7 @@ static void
 stats_process_context_finish(StatsProcessContext *ctx)
 {
 	hash_destroy(ctx->htab);
+	MemoryContextDelete(ctx->per_tuple_mcxt);
 }
 
 static void
@@ -1222,6 +1226,7 @@ chunk_process_remote_colstats_row(StatsProcessContext *ctx, TupleFactory *tf, Tu
 		}
 
 		op_oids[i] = convert_strings_to_op_id(strings);
+
 		d = values[AttrNumberGetAttrOffset(Anum_chunk_colstats_slot1_numbers) + i];
 
 		if (DatumGetPointer(d) != NULL)
@@ -1247,6 +1252,7 @@ chunk_process_remote_colstats_row(StatsProcessContext *ctx, TupleFactory *tf, Tu
 				Assert(!isnull);
 				++vt_idx;
 			}
+
 			valtype_oids[i] = convert_strings_to_type_id(strings);
 		}
 	}
@@ -1355,12 +1361,12 @@ fetch_remote_chunk_stats(Hypertable *ht, FunctionCallInfo fcinfo, bool col_stats
 {
 	StatsProcessContext statsctx;
 	List *data_nodes;
-	DistCmdResult *cmdres;
 	TupleDesc tupdesc;
 	TupleFactory *tf;
-	Size i;
-	long num_rows;
-	long num_stats;
+	MemoryContext old_mcxt;
+	const char *sqlcmd;
+	AsyncRequestSet *rs;
+	ListCell *lc;
 
 	Assert(hypertable_is_distributed(ht));
 
@@ -1370,45 +1376,81 @@ fetch_remote_chunk_stats(Hypertable *ht, FunctionCallInfo fcinfo, bool col_stats
 				 errmsg("function returning record called in context "
 						"that cannot accept type record")));
 
+	sqlcmd = deparse_func_call(fcinfo);
+	rs = async_request_set_create();
 	data_nodes = ts_hypertable_get_data_node_name_list(ht);
-	cmdres = ts_dist_cmd_invoke_func_call_on_data_nodes(fcinfo, data_nodes);
 
-	/* Expect TEXT response format since dist command API currently defaults
-	 * to requesting TEXT */
+	foreach (lc, data_nodes)
+	{
+		const char *node_name = lfirst(lc);
+		AsyncRequest *req;
+		TSConnection *connection =
+			data_node_get_connection(node_name, REMOTE_TXN_NO_PREP_STMT, true);
+
+		req = async_request_send(connection, sqlcmd);
+
+		/* Set single-row mode in order to reduce memory usage in case the
+		 * result set is big */
+		async_request_set_single_row_mode(req);
+		async_request_attach_user_data(req, (char *) node_name);
+		async_request_set_add(rs, req);
+	}
+
 	tf = tuplefactory_create_for_tupdesc(tupdesc, true);
-	num_rows = ts_dist_cmd_total_row_count(cmdres);
 
-	/* Estimate the number of non-duplicate stats to use for initial size of
+	/* Assume 500 non-duplicate stats to use for the initial size of
 	 * StatsProcessContext. Use slightly bigger than strictly necessary to
 	 * avoid a resize. */
-	num_stats = (5 * num_rows) / (ht->fd.replication_factor * 4);
+	stats_process_context_init(&statsctx, 500);
+	old_mcxt = MemoryContextSwitchTo(statsctx.per_tuple_mcxt);
 
-	stats_process_context_init(&statsctx, num_stats);
-
-	for (i = 0; /* exit when res == NULL below */; i++)
+	for (;;)
 	{
+		AsyncResponseResult *ar;
 		PGresult *res;
-		const char *node_name;
-		int row;
+		int ntuples;
 
-		res = ts_dist_cmd_get_result_by_index(cmdres, i, &node_name);
+		MemoryContextReset(statsctx.per_tuple_mcxt);
+		ar = async_request_set_wait_any_result(rs);
 
-		if (NULL == res)
+		if (NULL == ar)
 			break;
 
-		if (col_stats)
-			for (row = 0; row < PQntuples(res); row++)
-				chunk_process_remote_colstats_row(&statsctx, tf, tupdesc, res, row, node_name);
-		else
-			for (row = 0; row < PQntuples(res); row++)
-				chunk_process_remote_relstats_row(tf, tupdesc, res, row, node_name);
+		res = async_response_result_get_pg_result(ar);
+		Assert(res != NULL);
+
+		/* The result should be PGRES_SINGLE_TUPLE when tuples are returned
+		 * and PGRES_TUPLES_OK when there are no new tuples to return. */
+		if (PQresultStatus(res) != PGRES_SINGLE_TUPLE && PQresultStatus(res) != PGRES_TUPLES_OK)
+		{
+			TSConnectionError err;
+
+			remote_connection_get_result_error(res, &err);
+			async_response_result_close(ar);
+			remote_connection_error_elog(&err, ERROR);
+		}
+
+		/* Should be in single-row mode, so either one row or none when
+		 * done. */
+		ntuples = PQntuples(res);
+		Assert(ntuples == 1 || ntuples == 0);
+
+		if (ntuples == 1)
+		{
+			const char *node_name = async_response_result_get_user_data(ar);
+
+			Assert(node_name != NULL);
+
+			if (col_stats)
+				chunk_process_remote_colstats_row(&statsctx, tf, tupdesc, res, 0, node_name);
+			else
+				chunk_process_remote_relstats_row(tf, tupdesc, res, 0, node_name);
+		}
 
 		/* Early cleanup of PGresult protects against ballooning memory usage
 		 * when there are a lot of rows */
-		ts_dist_cmd_clear_result_by_index(cmdres, i);
+		async_response_result_close(ar);
 	}
 
+	MemoryContextSwitchTo(old_mcxt);
 	stats_process_context_finish(&statsctx);
-	ts_dist_cmd_close_response(cmdres);
+	tuplefactory_destroy(tf);
 }
 
 static void *
@@ -1468,7 +1510,6 @@ chunk_api_fetch_next_colstats_tuple(FuncCallContext *funcctx)
 {
 	ColStatContext *ctx = funcctx->user_fctx;
 	HeapTuple tuple = NULL;
-	MemoryContext oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
 
 	while (tuple == NULL && ctx->chunk_oids != NIL)
 	{
@@ -1487,12 +1528,13 @@ chunk_api_fetch_next_colstats_tuple(FuncCallContext *funcctx)
 
 		if (tuple == NULL)
 		{
+			MemoryContext oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
 			ctx->chunk_oids = list_delete_first(ctx->chunk_oids);
 			ctx->col_id = 1;
+			MemoryContextSwitchTo(oldcontext);
 		}
 	}
 
-	MemoryContextSwitchTo(oldcontext);
 
 	return tuple;
 }

diff --git a/tsl/src/remote/tuplefactory.c b/tsl/src/remote/tuplefactory.c
index ce2fead0d..c4be1781d 100644
--- a/tsl/src/remote/tuplefactory.c
+++ b/tsl/src/remote/tuplefactory.c
@@ -221,6 +221,15 @@ tuplefactory_create_for_scan(ScanState *ss, List *retrieved_attrs)
 	return tuplefactory_create(NULL, ss, retrieved_attrs);
 }
 
+void
+tuplefactory_destroy(TupleFactory *tf)
+{
+	if (tf->temp_mctx)
+		MemoryContextDelete(tf->temp_mctx);
+
+	pfree(tf);
+}
+
 bool
 tuplefactory_is_binary(TupleFactory *tf)
 {
@@ -252,8 +261,9 @@ tuplefactory_make_virtual_tuple(TupleFactory *tf, PGresult *res, int row, int fo
 	ItemPointer ctid = NULL;
 	ListCell *lc;
 	int j;
+	int PG_USED_FOR_ASSERTS_ONLY ntuples = PQntuples(res);
 
-	Assert(row < PQntuples(res));
+	Assert(row < ntuples);
 
 	/* Install error callback */
 	if (tf->errcallback.callback != NULL)

diff --git a/tsl/src/remote/tuplefactory.h b/tsl/src/remote/tuplefactory.h
index 4b13bfb52..78eff452d 100644
--- a/tsl/src/remote/tuplefactory.h
+++ b/tsl/src/remote/tuplefactory.h
@@ -21,6 +21,7 @@ typedef struct TupleDescData *TupleDesc;
 extern TupleFactory *tuplefactory_create_for_tupdesc(TupleDesc tupdesc, bool force_text);
 extern TupleFactory *tuplefactory_create_for_rel(Relation rel, List *retrieved_attrs);
 extern TupleFactory *tuplefactory_create_for_scan(ScanState *ss, List *retrieved_attrs);
+extern void tuplefactory_destroy(TupleFactory *tf);
 extern HeapTuple tuplefactory_make_tuple(TupleFactory *tf, PGresult *res, int row, int format);
 extern ItemPointer tuplefactory_make_virtual_tuple(TupleFactory *tf, PGresult *res, int row,
 												   int format, Datum *values, bool *nulls);
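
Illustration (not part of the patch): the fix combines libpq's single-row
mode with a short-lived memory context that is reset before each row, so
allocations made while decoding one row never accumulate across rows. A
minimal standalone sketch of that pattern follows, using only libpq and
PostgreSQL memory-context APIs rather than TimescaleDB's async layer;
process_row() is a hypothetical per-row consumer and error paths are
simplified.

#include "postgres.h"

#include <libpq-fe.h>

#include "utils/memutils.h"

/* Hypothetical per-row consumer; anything it palloc()s lands in the
 * per-row context installed below. */
extern void process_row(PGresult *res);

static void
drain_in_single_row_mode(PGconn *conn, const char *sql)
{
	MemoryContext per_row_cxt;
	MemoryContext old_cxt;

	/* Short-lived context: reset once per row, deleted at the end */
	per_row_cxt = AllocSetContextCreate(CurrentMemoryContext,
										"per-row processing",
										ALLOCSET_DEFAULT_SIZES);

	if (!PQsendQuery(conn, sql))
		elog(ERROR, "%s", PQerrorMessage(conn));

	/* Must be requested after PQsendQuery() and before the first
	 * PQgetResult(); libpq then hands back one PGRES_SINGLE_TUPLE result
	 * per row instead of buffering the whole result set. */
	if (!PQsetSingleRowMode(conn))
		elog(ERROR, "could not enable single-row mode");

	old_cxt = MemoryContextSwitchTo(per_row_cxt);

	for (;;)
	{
		PGresult *res;

		/* Drop everything allocated while processing the previous row */
		MemoryContextReset(per_row_cxt);

		res = PQgetResult(conn);

		if (res == NULL)
			break; /* result set fully drained */

		if (PQresultStatus(res) == PGRES_SINGLE_TUPLE)
			process_row(res);
		else if (PQresultStatus(res) != PGRES_TUPLES_OK)
			elog(ERROR, "%s", PQresultErrorMessage(res));

		/* Free the libpq-side copy of the row eagerly, mirroring the
		 * early async_response_result_close() in the patch */
		PQclear(res);
	}

	MemoryContextSwitchTo(old_cxt);
	MemoryContextDelete(per_row_cxt);
}

The reset-then-delete split is the point of the pattern: the per-row
MemoryContextReset() keeps memory use independent of the number of rows,
while the single MemoryContextDelete() at the end releases whatever the
last row left behind.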