Fix memory leak with INSERT into compressed hypertable
We used to allocate some temporary data in the ExecutorContext, which lives for the whole duration of the query, so those allocations piled up as rows were inserted.
commit 1b65297ff7
parent 7e4ebd131f
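For context, the essence of the fix: the per-row compression call used to run in the executor's per-query memory context, so its scratch allocations lived until the end of the statement. Switching to the per-tuple memory context (which is reset after every row) around the call releases them promptly. A minimal sketch of that pattern, assuming only the stock PostgreSQL executor API; the wrapper name and callback signature below are illustrative, not the actual TimescaleDB code:

#include "postgres.h"
#include "executor/executor.h"
#include "utils/memutils.h"

/*
 * Hypothetical helper: run a row-processing callback that allocates
 * temporary data, but do it in the per-tuple memory context so the
 * allocations are freed when that context is reset for the next row,
 * instead of accumulating in the per-query (executor) context.
 */
static TupleTableSlot *
call_in_per_tuple_context(EState *estate,
						  TupleTableSlot *(*callback)(void *arg, TupleTableSlot *slot),
						  void *arg, TupleTableSlot *slot)
{
	MemoryContext old = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
	TupleTableSlot *result = callback(arg, slot);

	MemoryContextSwitchTo(old);
	return result;
}

The two hunks below apply exactly this switch: once around compress_row_exec() in chunk dispatch, and once around the chunk-lookup code in the insert-state teardown.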
@@ -159,7 +159,9 @@ chunk_dispatch_exec(CustomScanState *node)
 #endif
 		Assert(ts_cm_functions->compress_row_exec != NULL);
 		TupleTableSlot *orig_slot = slot;
+		old = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
 		slot = ts_cm_functions->compress_row_exec(cis->compress_info->compress_state, slot);
+		MemoryContextSwitchTo(old);
 		/* If we have cagg defined on the hypertable, we have to execute
 		 * the function that records invalidations directly as AFTER ROW
 		 * triggers do not work with compressed chunks.
@@ -815,6 +815,11 @@ ts_chunk_insert_state_destroy(ChunkInsertState *state)
 	destroy_on_conflict_state(state);
 	ExecCloseIndices(state->result_relation_info);

+	/*
+	 * The chunk search functions may leak memory, so switch to a temporary
+	 * memory context.
+	 */
+	MemoryContext old_context = MemoryContextSwitchTo(GetPerTupleMemoryContext(state->estate));
 	if (state->compress_info)
 	{
 		ResultRelInfo *orig_chunk_rri = state->compress_info->orig_result_relation_info;
@@ -838,6 +843,7 @@ ts_chunk_insert_state_destroy(ChunkInsertState *state)
 		if (ts_chunk_is_compressed(chunk) && (!ts_chunk_is_unordered(chunk)))
 			ts_chunk_set_unordered(chunk);
 	}
+	MemoryContextSwitchTo(old_context);

 	table_close(state->rel, NoLock);
 	if (state->slot)
@@ -15,17 +15,17 @@ NOTICE: adding not-null constraint to column "date"
 (1 row)

 -- This is where we log the memory usage.
-create table portal_memory_log(id serial, bytes int);
+create table portal_memory_log(id serial, bytes bigint);
 -- Returns the amount of memory currently allocated in a given
 -- memory context. Only works for PortalContext, and doesn't work for PG 12.
-create or replace function ts_debug_allocated_bytes(text) returns int
+create or replace function ts_debug_allocated_bytes(text) returns bigint
 as :MODULE_PATHNAME, 'ts_debug_allocated_bytes'
 language c strict volatile;
 -- Log current memory usage into the log table.
 create function log_memory() returns trigger as $$
 begin
 insert into portal_memory_log
-values (default, (select ts_debug_allocated_bytes('PortalContext')));
+values (default, ts_debug_allocated_bytes('PortalContext'));
 return new;
 end;
 $$ language plpgsql;
@@ -13,11 +13,11 @@ create table uk_price_paid(price integer, "date" date, postcode1 text, postcode2
 select create_hypertable('uk_price_paid', 'date', chunk_time_interval => interval '90 day');

 -- This is where we log the memory usage.
-create table portal_memory_log(id serial, bytes int);
+create table portal_memory_log(id serial, bytes bigint);

 -- Returns the amount of memory currently allocated in a given
 -- memory context. Only works for PortalContext, and doesn't work for PG 12.
-create or replace function ts_debug_allocated_bytes(text) returns int
+create or replace function ts_debug_allocated_bytes(text) returns bigint
 as :MODULE_PATHNAME, 'ts_debug_allocated_bytes'
 language c strict volatile;

@@ -25,7 +25,7 @@ create or replace function ts_debug_allocated_bytes(text) returns int
 create function log_memory() returns trigger as $$
 begin
 insert into portal_memory_log
-values (default, (select ts_debug_allocated_bytes('PortalContext')));
+values (default, ts_debug_allocated_bytes('PortalContext'));
 return new;
 end;
 $$ language plpgsql;
@@ -243,15 +243,19 @@ TS_FUNCTION_INFO_V1(ts_debug_allocated_bytes);
 Datum
 ts_debug_allocated_bytes(PG_FUNCTION_ARGS)
 {
+	MemoryContext context = NULL;
 	char *context_name = text_to_cstring(PG_GETARG_TEXT_PP(0));
 	if (strcmp(context_name, "PortalContext") == 0)
 	{
-#if PG13_GE
-		PG_RETURN_UINT64(MemoryContextMemAllocated(PortalContext, /* recurse = */ true));
-#else
-		/* Don't have this function on PG 12. */
-		PG_RETURN_UINT64(1);
-#endif
+		context = PortalContext;
 	}
+	else if (strcmp(context_name, "CacheMemoryContext") == 0)
+	{
+		context = CacheMemoryContext;
+	}
+	else if (strcmp(context_name, "TopMemoryContext") == 0)
+	{
+		context = TopMemoryContext;
+	}
 	else
 	{
@@ -261,4 +265,12 @@ ts_debug_allocated_bytes(PG_FUNCTION_ARGS)
 				 context_name)));
 		PG_RETURN_NULL();
 	}

+#if !PG13_GE
+	/* Don't have this function on PG 12. */
+	(void) context;
+	PG_RETURN_UINT64(1);
+#else
+	PG_RETURN_UINT64(MemoryContextMemAllocated(context, /* recurse = */ true));
+#endif
 }
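The refactored debug function above ultimately relies on MemoryContextMemAllocated(), which exists only on PostgreSQL 13 and later; on PG 12 it falls back to a constant, which is why the test comments say it doesn't work there. As a rough, hedged illustration of what such a measurement gives you (the helper below is hypothetical and not part of the patch):

#include "postgres.h"
#include "utils/memutils.h"

/*
 * Illustrative helper: how many bytes did an operation add to a context?
 * MemoryContextMemAllocated() reports the memory allocated for the context
 * and, with recurse = true, all of its children (PostgreSQL 13+ only).
 */
static int64
bytes_allocated_by(MemoryContext ctx, void (*operation)(void))
{
	Size before = MemoryContextMemAllocated(ctx, /* recurse = */ true);

	operation();
	return (int64) (MemoryContextMemAllocated(ctx, /* recurse = */ true) - before);
}

The new regression test below does the same thing from SQL: it logs the PortalContext allocation after each statement and checks that it does not grow with the insert size.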
@@ -1965,6 +1965,8 @@ compress_singlerow(CompressSingleRowState *cr, TupleTableSlot *in_slot)
 				out_isnull[column->min_metadata_attr_offset] = true;
 				out_isnull[column->max_metadata_attr_offset] = true;
 			}
+
+			segment_meta_min_max_builder_reset(column->min_max_metadata_builder);
 		}
 	}
 	/* if there is no compressor, this must be a segmenter */
tsl/test/expected/insert_memory_usage.out (new file, 97 lines)
@@ -0,0 +1,97 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
-- Test that transaction memory usage with COPY doesn't grow.
-- We need memory usage in PortalContext after the completion of the query, so
-- we'll have to log it from a trigger that runs after the query is completed.
\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER;
create table uk_price_paid(price integer, "date" date, postcode1 text, postcode2 text, type smallint, is_new bool, duration smallint, addr1 text, addr2 text, street text, locality text, town text, district text, country text, category smallint);
-- Aim for about 100 partitions; the data is from 1995 to 2022.
select create_hypertable('uk_price_paid', 'date', chunk_time_interval => interval '90 day');
NOTICE: adding not-null constraint to column "date"
     create_hypertable
----------------------------
 (1,public,uk_price_paid,t)
(1 row)

-- This is where we log the memory usage.
create table portal_memory_log(id serial, bytes bigint);
-- Returns the amount of memory currently allocated in a given
-- memory context. Only works for PortalContext, and doesn't work for PG 12.
create or replace function ts_debug_allocated_bytes(text) returns bigint
as :MODULE_PATHNAME, 'ts_debug_allocated_bytes'
language c strict volatile;
-- Log current memory usage into the log table.
create function log_memory() returns trigger as $$
begin
	insert into portal_memory_log
		values (default, ts_debug_allocated_bytes('PortalContext'));
	return new;
end;
$$ language plpgsql;
-- Add a trigger that runs after completion of each INSERT/COPY and logs the
-- current memory usage.
create trigger check_update after insert on uk_price_paid
for each statement execute function log_memory();
-- Memory leaks often happen on cache invalidation, so make sure they are
-- invalidated often and independently (at co-prime periods).
set timescaledb.max_open_chunks_per_insert = 2;
set timescaledb.max_cached_chunks_per_hypertable = 3;
-- Try increasingly larger data sets by concatenating the same file multiple
-- times. First, INSERT into an uncompressed table.
create table uk_price_paid_one(like uk_price_paid);
\copy uk_price_paid_one from program 'bash -c "cat <(zcat < data/prices-10k-random-1.tsv.gz)"';
truncate uk_price_paid;
insert into uk_price_paid select * from uk_price_paid_one;
truncate portal_memory_log;
alter sequence portal_memory_log_id_seq restart with 1;
-- Don't use joins here because they might materialize a subquery which will
-- lead to weird memory usage changes.
insert into uk_price_paid select * from uk_price_paid_one;
insert into uk_price_paid select * from uk_price_paid_one union all select * from uk_price_paid_one;
insert into uk_price_paid select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one;
insert into uk_price_paid select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one;
insert into uk_price_paid select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one;
-- Use linear regression to check that the memory usage doesn't significantly
-- increase as the insert size grows.
-- Note that there's going to be some linear planning and parsing overhead for
-- bigger queries, unlike the COPY. It can constitute a sizable percentage of
-- the total memory usage, so we'll check the absolute value as well.
\set parsing_overhead_bytes 50000
select * from portal_memory_log where (
    select regr_slope(bytes, id - 1) / regr_intercept(bytes, id - 1)::float > 0.05
        and regr_slope(bytes, id - 1) > :parsing_overhead_bytes
    from portal_memory_log
);
 id | bytes
----+-------
(0 rows)

-- INSERT into a compressed table.
truncate uk_price_paid;
insert into uk_price_paid select * from uk_price_paid_one;
alter table uk_price_paid set (timescaledb.compress, timescaledb.compress_orderby = 'date');
select count(compress_chunk(chunk)) from show_chunks('uk_price_paid') chunk;
 count
-------
   111
(1 row)

truncate portal_memory_log;
alter sequence portal_memory_log_id_seq restart with 1;
-- Don't use joins here because they might materialize a subquery which will
-- lead to weird memory usage changes.
insert into uk_price_paid select * from uk_price_paid_one;
insert into uk_price_paid select * from uk_price_paid_one union all select * from uk_price_paid_one;
insert into uk_price_paid select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one;
insert into uk_price_paid select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one;
insert into uk_price_paid select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one;
select * from portal_memory_log where (
    select regr_slope(bytes, id - 1) / regr_intercept(bytes, id - 1)::float > 0.05
        and regr_slope(bytes, id - 1) > :parsing_overhead_bytes
    from portal_memory_log
);
 id | bytes
----+-------
(0 rows)

@@ -85,6 +85,7 @@ if(CMAKE_BUILD_TYPE MATCHES Debug)
     dist_util.sql
     dist_triggers.sql
     dist_backup.sql
+    insert_memory_usage.sql
     read_only.sql
     remote_connection_cache.sql
     remote_connection.sql
tsl/test/sql/insert_memory_usage.sql (new file, 96 lines)
@@ -0,0 +1,96 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.

-- Test that transaction memory usage with COPY doesn't grow.
-- We need memory usage in PortalContext after the completion of the query, so
-- we'll have to log it from a trigger that runs after the query is completed.

\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER;

create table uk_price_paid(price integer, "date" date, postcode1 text, postcode2 text, type smallint, is_new bool, duration smallint, addr1 text, addr2 text, street text, locality text, town text, district text, country text, category smallint);
-- Aim for about 100 partitions; the data is from 1995 to 2022.
select create_hypertable('uk_price_paid', 'date', chunk_time_interval => interval '90 day');

-- This is where we log the memory usage.
create table portal_memory_log(id serial, bytes bigint);

-- Returns the amount of memory currently allocated in a given
-- memory context. Only works for PortalContext, and doesn't work for PG 12.
create or replace function ts_debug_allocated_bytes(text) returns bigint
as :MODULE_PATHNAME, 'ts_debug_allocated_bytes'
language c strict volatile;

-- Log current memory usage into the log table.
create function log_memory() returns trigger as $$
begin
	insert into portal_memory_log
		values (default, ts_debug_allocated_bytes('PortalContext'));
	return new;
end;
$$ language plpgsql;

-- Add a trigger that runs after completion of each INSERT/COPY and logs the
-- current memory usage.
create trigger check_update after insert on uk_price_paid
for each statement execute function log_memory();

-- Memory leaks often happen on cache invalidation, so make sure they are
-- invalidated often and independently (at co-prime periods).
set timescaledb.max_open_chunks_per_insert = 2;
set timescaledb.max_cached_chunks_per_hypertable = 3;

-- Try increasingly larger data sets by concatenating the same file multiple
-- times. First, INSERT into an uncompressed table.
create table uk_price_paid_one(like uk_price_paid);
\copy uk_price_paid_one from program 'bash -c "cat <(zcat < data/prices-10k-random-1.tsv.gz)"';

truncate uk_price_paid;
insert into uk_price_paid select * from uk_price_paid_one;

truncate portal_memory_log;
alter sequence portal_memory_log_id_seq restart with 1;

-- Don't use joins here because they might materialize a subquery which will
-- lead to weird memory usage changes.
insert into uk_price_paid select * from uk_price_paid_one;
insert into uk_price_paid select * from uk_price_paid_one union all select * from uk_price_paid_one;
insert into uk_price_paid select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one;
insert into uk_price_paid select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one;
insert into uk_price_paid select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one;

-- Use linear regression to check that the memory usage doesn't significantly
-- increase as the insert size grows.
-- Note that there's going to be some linear planning and parsing overhead for
-- bigger queries, unlike the COPY. It can constitute a sizable percentage of
-- the total memory usage, so we'll check the absolute value as well.
\set parsing_overhead_bytes 50000
select * from portal_memory_log where (
    select regr_slope(bytes, id - 1) / regr_intercept(bytes, id - 1)::float > 0.05
        and regr_slope(bytes, id - 1) > :parsing_overhead_bytes
    from portal_memory_log
);


-- INSERT into a compressed table.
truncate uk_price_paid;
insert into uk_price_paid select * from uk_price_paid_one;
alter table uk_price_paid set (timescaledb.compress, timescaledb.compress_orderby = 'date');
select count(compress_chunk(chunk)) from show_chunks('uk_price_paid') chunk;

truncate portal_memory_log;
alter sequence portal_memory_log_id_seq restart with 1;

-- Don't use joins here because they might materialize a subquery which will
-- lead to weird memory usage changes.
insert into uk_price_paid select * from uk_price_paid_one;
insert into uk_price_paid select * from uk_price_paid_one union all select * from uk_price_paid_one;
insert into uk_price_paid select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one;
insert into uk_price_paid select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one;
insert into uk_price_paid select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one union all select * from uk_price_paid_one;

select * from portal_memory_log where (
    select regr_slope(bytes, id - 1) / regr_intercept(bytes, id - 1)::float > 0.05
        and regr_slope(bytes, id - 1) > :parsing_overhead_bytes
    from portal_memory_log
);