Reduce memory usage for distributed analyze

Use a per-tuple memory context when receiving chunk statistics from
data nodes, and fetch the results in single-row mode. Otherwise,
memory usage is proportional to the number of chunks and columns.
Erik Nordström 2023-05-03 14:06:25 +02:00 committed by Erik Nordström
parent 96d2acea30
commit abb6762450
4 changed files with 84 additions and 30 deletions
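
For context, the approach described in the commit message is PostgreSQL's usual per-tuple memory context idiom: create a dedicated context, reset it before processing each row so per-row allocations are freed immediately, and delete it when done. A minimal, hypothetical sketch (not the committed code; process_row() stands in for the per-row work):

#include "postgres.h"
#include "utils/memutils.h"

/* Hypothetical per-row work that palloc()s memory in the current context. */
extern void process_row(int row);

static void
process_rows_bounded(int nrows)
{
	/* Dedicated context for per-row allocations */
	MemoryContext per_tuple_mcxt =
		AllocSetContextCreate(CurrentMemoryContext, "per-tuple", ALLOCSET_DEFAULT_SIZES);
	MemoryContext old_mcxt = MemoryContextSwitchTo(per_tuple_mcxt);
	int i;

	for (i = 0; i < nrows; i++)
	{
		/* Free everything allocated for the previous row, so memory
		 * stays bounded instead of growing with the row count. */
		MemoryContextReset(per_tuple_mcxt);
		process_row(i);
	}

	MemoryContextSwitchTo(old_mcxt);
	MemoryContextDelete(per_tuple_mcxt);
}

This mirrors what stats_process_context_init()/stats_process_context_finish() and the MemoryContextReset() call in the fetch loop do in the diff below.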

View File

@@ -33,6 +33,7 @@ accidentally triggering the load of a previous DB version.**
* #5613 Quote username identifier appropriately
* #5525 Fix tablespace for compressed hypertable and corresponding toast
* #5642 Fix ALTER TABLE SET with normal tables
* #5666 Reduce memory usage for distributed analyze
**Thanks**
* @kovetskiy and @DZDomi for reporting performance regression in Realtime Continuous Aggregates

View File

@@ -25,6 +25,7 @@
#include <utils/builtins.h>
#include <utils/jsonb.h>
#include <utils/lsyscache.h>
#include <utils/palloc.h>
#include <utils/syscache.h>
#include "compat/compat.h"
@@ -36,6 +37,7 @@
#include "hypercube.h"
#include "hypertable_cache.h"
#include "utils.h"
#include "deparse.h"
#include "remote/async.h"
#include "remote/dist_txn.h"
@@ -1092,17 +1094,18 @@ typedef struct ChunkAttKey
typedef struct StatsProcessContext
{
HTAB *htab;
MemoryContext per_tuple_mcxt;
} StatsProcessContext;
static void
stats_process_context_init(StatsProcessContext *ctx, long nstats)
{
HASHCTL ctl;
MemSet(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(ChunkAttKey);
ctl.entrysize = sizeof(ChunkAttKey);
ctx->per_tuple_mcxt =
AllocSetContextCreate(CurrentMemoryContext, "Distributed ANALYZE", ALLOCSET_DEFAULT_SIZES);
ctl.hcxt = CurrentMemoryContext;
ctx->htab =
hash_create("StatsProcessContext", nstats, &ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
@@ -1133,6 +1136,7 @@ static void
stats_process_context_finish(StatsProcessContext *ctx)
{
hash_destroy(ctx->htab);
MemoryContextDelete(ctx->per_tuple_mcxt);
}
static void
@@ -1222,6 +1226,7 @@ chunk_process_remote_colstats_row(StatsProcessContext *ctx, TupleFactory *tf, Tu
}
op_oids[i] = convert_strings_to_op_id(strings);
d = values[AttrNumberGetAttrOffset(Anum_chunk_colstats_slot1_numbers) + i];
if (DatumGetPointer(d) != NULL)
@@ -1247,6 +1252,7 @@ chunk_process_remote_colstats_row(StatsProcessContext *ctx, TupleFactory *tf, Tu
Assert(!isnull);
++vt_idx;
}
valtype_oids[i] = convert_strings_to_type_id(strings);
}
}
@@ -1355,12 +1361,12 @@ fetch_remote_chunk_stats(Hypertable *ht, FunctionCallInfo fcinfo, bool col_stats
{
StatsProcessContext statsctx;
List *data_nodes;
DistCmdResult *cmdres;
TupleDesc tupdesc;
TupleFactory *tf;
Size i;
long num_rows;
long num_stats;
MemoryContext old_mcxt;
const char *sqlcmd;
AsyncRequestSet *rs;
ListCell *lc;
Assert(hypertable_is_distributed(ht));
@@ -1370,45 +1376,81 @@ fetch_remote_chunk_stats(Hypertable *ht, FunctionCallInfo fcinfo, bool col_stats
errmsg("function returning record called in context "
"that cannot accept type record")));
sqlcmd = deparse_func_call(fcinfo);
rs = async_request_set_create();
data_nodes = ts_hypertable_get_data_node_name_list(ht);
cmdres = ts_dist_cmd_invoke_func_call_on_data_nodes(fcinfo, data_nodes);
/* Expect TEXT response format since dist command API currently defaults
* to requesting TEXT */
foreach (lc, data_nodes)
{
const char *node_name = lfirst(lc);
AsyncRequest *req;
TSConnection *connection =
data_node_get_connection(node_name, REMOTE_TXN_NO_PREP_STMT, true);
req = async_request_send(connection, sqlcmd);
/* Set single-row mode in order to reduce memory usage in case result
* set is big */
async_request_set_single_row_mode(req);
async_request_attach_user_data(req, (char *) node_name);
async_request_set_add(rs, req);
}
tf = tuplefactory_create_for_tupdesc(tupdesc, true);
num_rows = ts_dist_cmd_total_row_count(cmdres);
/* Estimate the number of non-duplicate stats to use for initial size of
/* Assume 500 non-duplicate stats to use for initial size of
* StatsProcessContext. Use slightly bigger than strictly necessary to
* avoid a resize. */
num_stats = (5 * num_rows) / (ht->fd.replication_factor * 4);
stats_process_context_init(&statsctx, 500);
old_mcxt = MemoryContextSwitchTo(statsctx.per_tuple_mcxt);
stats_process_context_init(&statsctx, num_stats);
for (i = 0; /* exit when res == NULL below */; i++)
for (;;)
{
AsyncResponseResult *ar;
PGresult *res;
const char *node_name;
int row;
int ntuples;
res = ts_dist_cmd_get_result_by_index(cmdres, i, &node_name);
MemoryContextReset(statsctx.per_tuple_mcxt);
ar = async_request_set_wait_any_result(rs);
if (NULL == res)
if (NULL == ar)
break;
if (col_stats)
for (row = 0; row < PQntuples(res); row++)
chunk_process_remote_colstats_row(&statsctx, tf, tupdesc, res, row, node_name);
else
for (row = 0; row < PQntuples(res); row++)
chunk_process_remote_relstats_row(tf, tupdesc, res, row, node_name);
res = async_response_result_get_pg_result(ar);
Assert(res != NULL);
/* The result should be PGRES_SINGLE_TUPLE when tuples are returned
* and PGRES_TUPLES_OK when there are no new tuples to return. */
if (PQresultStatus(res) != PGRES_SINGLE_TUPLE && PQresultStatus(res) != PGRES_TUPLES_OK)
{
TSConnectionError err;
remote_connection_get_result_error(res, &err);
async_response_result_close(ar);
remote_connection_error_elog(&err, ERROR);
}
/* Should be in single-row mode, so either one row or none when
* done. */
ntuples = PQntuples(res);
Assert(ntuples == 1 || ntuples == 0);
if (ntuples == 1)
{
const char *node_name = async_response_result_get_user_data(ar);
Assert(node_name != NULL);
if (col_stats)
chunk_process_remote_colstats_row(&statsctx, tf, tupdesc, res, 0, node_name);
else
chunk_process_remote_relstats_row(tf, tupdesc, res, 0, node_name);
}
/* Early cleanup of PGresult protects against ballooning memory usage
* when there are a lot of rows */
ts_dist_cmd_clear_result_by_index(cmdres, i);
async_response_result_close(ar);
}
MemoryContextSwitchTo(old_mcxt);
stats_process_context_finish(&statsctx);
ts_dist_cmd_close_response(cmdres);
tuplefactory_destroy(tf);
}
static void *
@@ -1468,7 +1510,6 @@ chunk_api_fetch_next_colstats_tuple(FuncCallContext *funcctx)
{
ColStatContext *ctx = funcctx->user_fctx;
HeapTuple tuple = NULL;
MemoryContext oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
while (tuple == NULL && ctx->chunk_oids != NIL)
{
@@ -1487,12 +1528,13 @@ chunk_api_fetch_next_colstats_tuple(FuncCallContext *funcctx)
if (tuple == NULL)
{
MemoryContext oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
ctx->chunk_oids = list_delete_first(ctx->chunk_oids);
ctx->col_id = 1;
MemoryContextSwitchTo(oldcontext);
}
}
MemoryContextSwitchTo(oldcontext);
return tuple;
}
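
For reference (not part of the commit), the single-row mode that the new fetch loop relies on is a libpq feature: after PQsetSingleRowMode(), each PGresult carries at most one row with status PGRES_SINGLE_TUPLE, and a final zero-row PGRES_TUPLES_OK result marks the end of the set, so each result can be freed immediately. A bare-libpq sketch of the same loop shape, without TimescaleDB's async request API:

#include <stdio.h>
#include <libpq-fe.h>

static void
fetch_in_single_row_mode(PGconn *conn, const char *sql)
{
	PGresult *res;

	/* Single-row mode must be requested right after sending the query
	 * and before the first PQgetResult(). */
	if (!PQsendQuery(conn, sql) || !PQsetSingleRowMode(conn))
	{
		fprintf(stderr, "could not send query: %s", PQerrorMessage(conn));
		return;
	}

	/* PQgetResult() returns NULL once the whole result set is consumed */
	while ((res = PQgetResult(conn)) != NULL)
	{
		ExecStatusType status = PQresultStatus(res);

		if (status == PGRES_SINGLE_TUPLE)
			printf("row: %s\n", PQgetvalue(res, 0, 0)); /* exactly one row */
		else if (status != PGRES_TUPLES_OK)
			fprintf(stderr, "query failed: %s", PQresultErrorMessage(res));

		/* Free each small result right away; memory stays bounded
		 * even for very large result sets. */
		PQclear(res);
	}
}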

View File

@@ -221,6 +221,15 @@ tuplefactory_create_for_scan(ScanState *ss, List *retrieved_attrs)
return tuplefactory_create(NULL, ss, retrieved_attrs);
}
void
tuplefactory_destroy(TupleFactory *tf)
{
if (tf->temp_mctx)
MemoryContextDelete(tf->temp_mctx);
pfree(tf);
}
bool
tuplefactory_is_binary(TupleFactory *tf)
{
@@ -252,8 +261,9 @@ tuplefactory_make_virtual_tuple(TupleFactory *tf, PGresult *res, int row, int fo
ItemPointer ctid = NULL;
ListCell *lc;
int j;
int PG_USED_FOR_ASSERTS_ONLY ntuples = PQntuples(res);
Assert(row < PQntuples(res));
Assert(row < ntuples);
/* Install error callback */
if (tf->errcallback.callback != NULL)

View File

@@ -21,6 +21,7 @@ typedef struct TupleDescData *TupleDesc;
extern TupleFactory *tuplefactory_create_for_tupdesc(TupleDesc tupdesc, bool force_text);
extern TupleFactory *tuplefactory_create_for_rel(Relation rel, List *retrieved_attrs);
extern TupleFactory *tuplefactory_create_for_scan(ScanState *ss, List *retrieved_attrs);
extern void tuplefactory_destroy(TupleFactory *tf);
extern HeapTuple tuplefactory_make_tuple(TupleFactory *tf, PGresult *res, int row, int format);
extern ItemPointer tuplefactory_make_virtual_tuple(TupleFactory *tf, PGresult *res, int row,
int format, Datum *values, bool *nulls);