Fix performance regressions in the copy code

In 8375b9aa536a619a5ac2644e0dae3c25880a4ead, a patch was added to handle
chunks being closed during an ongoing copy operation. However, that patch
introduced a performance regression: all MultiInsertBuffers were deleted
after they were flushed. This PR fixes the regression by letting the most
commonly used MultiInsertBuffers survive flushing.
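
Condensed from the new TSCopyMultiInsertInfoFlush() in the diff below, the
retained-buffer logic looks roughly like this: flush every buffer, sort the
buffers by how many tuples they hold, and evict only the least used ones
beyond MAX_PARTITION_BUFFERS, never the buffer of the chunk the current
tuple is routed to.

    List *buffer_list = NIL; /* filled from miinfo->multiInsertBuffers */
    ListCell *lc;
    bool found;
    int buffers_to_delete = Max(hash_get_num_entries(miinfo->multiInsertBuffers) -
                                MAX_PARTITION_BUFFERS, 0);

    /* Sorting is only needed if some buffers will be evicted */
    if (buffers_to_delete > 0)
        buffer_list = list_sort_compat(buffer_list, TSCmpBuffersByUsage);

    foreach (lc, buffer_list)
    {
        TSCopyMultiInsertBuffer *buffer = (TSCopyMultiInsertBuffer *) lfirst(lc);
        int32 chunk_id = TSCopyMultiInsertBufferFlush(miinfo, buffer);

        /* Keep the hot buffers; evict only the least used ones, and never
         * the buffer of the chunk the current tuple targets (cur_cis) */
        if (buffers_to_delete > 0 &&
            (cur_cis == NULL || chunk_id != cur_cis->chunk_id))
        {
            TSCopyMultiInsertBufferCleanup(miinfo, buffer);
            hash_search(miinfo->multiInsertBuffers, &chunk_id, HASH_REMOVE, &found);
            buffers_to_delete--;
        }
    }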

Commit 51259b31c4c62b87228b059af0bbf28caa143eb3 changed the way the
per-tuple context is used; since that commit, more objects are stored in
this context. On PG < 14, the size of this context was used to estimate
the tuple size. The extra objects in the context led to wrong (very
large) size estimates and to a flush after almost every tuple read.
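
For reference, the removed PG < 14 byte accounting (see the hunk in
TSCopyMultiInsertInfoStore() below) measured the whole per-tuple memory
context, roughly:

    MemoryContextCounters context_counter;
    MemoryContext tuple_context = GetPerTupleMemoryContext(miinfo->estate);

    /* Before this fix, the size of the whole per-tuple context was taken as
     * the tuple size. With additional objects kept in this context, the
     * estimate became far too large and MAX_BUFFERED_BYTES was reached
     * after almost every tuple. */
    tuple_context->methods->stats(tuple_context, false, 0, &context_counter);
    miinfo->bufferedBytes += context_counter.totalspace - context_counter.freespace;

With this patch, PG < 14 flushes based only on the number of buffered
tuples, while PG >= 14 keeps using cstate->line_buf.len as the tuple size.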

The cache synchronization introduced in
296601b1d7aba7f23aea3d47c617e2d6df81de3e is reverted. With the current
implementation, up to `MAX_PARTITION_BUFFERS` buffers survive the flush.
If `timescaledb.max_open_chunks_per_insert` were lower than
`MAX_PARTITION_BUFFERS`, a buffer flush would be performed after each
tuple read.
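
The reverted check in TSCopyMultiInsertInfoIsFull() (removed in the hunk
below) was essentially:

    /* Don't accumulate more inserts than the chunk insert state cache can
     * fit, to avoid thrashing */
    if (hash_get_num_entries(miinfo->multiInsertBuffers) >=
        ts_guc_max_open_chunks_per_insert)
        return true;

Since up to MAX_PARTITION_BUFFERS (32) buffers now survive each flush, a
timescaledb.max_open_chunks_per_insert setting below 32 would keep this
condition true and force a flush after every tuple read.
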
Jan Nidzwetzki 2022-10-17 11:57:28 +02:00 committed by Jan Nidzwetzki
parent 40a6c4cf87
commit e555eea9db
4 changed files with 141 additions and 82 deletions

@@ -25,6 +25,7 @@ argument or resolve the type ambiguity by casting to the intended type.
* #4745 Fix FK constraint violation error while insert into hypertable which references partitioned table
* #4756 Improve compression job IO performance
* #4807 Fix segmentation fault during INSERT into compressed hypertable.
* #4840 Fix performance regressions in the copy code
**Thanks**
* @jvanns for reporting hypertable FK reference to vanilla PostgreSQL partitioned table doesn't seem to work

@@ -85,6 +85,9 @@
*/
#define MAX_BUFFERED_BYTES 65535
/* Trim the list of buffers back down to this number after flushing */
#define MAX_PARTITION_BUFFERS 32
/* Stores multi-insert data related to a single relation in CopyFrom. */
typedef struct TSCopyMultiInsertBuffer
{
@@ -151,10 +154,8 @@ on_chunk_insert_state_changed(ChunkInsertState *state, void *data)
{
BulkInsertState bistate = data;
/* Different chunk so must release BulkInsertState */
if (bistate->current_buf != InvalidBuffer)
ReleaseBuffer(bistate->current_buf);
bistate->current_buf = InvalidBuffer;
/* Chunk changed, so release the buffer held in BulkInsertState */
ReleaseBulkInsertStatePin(bistate);
}
static CopyChunkState *
@@ -266,16 +267,6 @@ TSCopyMultiInsertInfoIsFull(TSCopyMultiInsertInfo *miinfo)
miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
return true;
if (hash_get_num_entries(miinfo->multiInsertBuffers) >= ts_guc_max_open_chunks_per_insert)
{
/*
* Flushing each multi-insert buffer will require looking up the
* corresponding chunk insert state in the cache, so don't accumulate
* more inserts than the cache can fit, to avoid thrashing.
*/
return true;
}
return false;
}
@@ -291,7 +282,7 @@ TSCopyMultiInsertInfoIsEmpty(TSCopyMultiInsertInfo *miinfo)
/*
* Write the tuples stored in 'buffer' out to the table.
*/
static inline void
static inline int
TSCopyMultiInsertBufferFlush(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuffer *buffer)
{
MemoryContext oldcontext;
@@ -318,11 +309,15 @@ TSCopyMultiInsertBufferFlush(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuf
* table is closed and pointers (e.g., result_relation_info) point to invalid
* addresses. Re-reading the chunk insert state ensures that the table is
* open and the pointers are valid.
*
* No callback on changed chunk is needed; the bulk insert state buffer is
* freed in TSCopyMultiInsertBufferCleanup().
*/
ChunkInsertState *cis = ts_chunk_dispatch_get_chunk_insert_state(miinfo->ccstate->dispatch,
buffer->point,
on_chunk_insert_state_changed,
buffer->bistate);
ChunkInsertState *cis =
ts_chunk_dispatch_get_chunk_insert_state(miinfo->ccstate->dispatch,
buffer->point,
NULL /* on chunk changed function */,
NULL /* payload for on chunk changed function */);
ResultRelInfo *resultRelInfo = cis->result_relation_info;
@@ -412,6 +407,14 @@ TSCopyMultiInsertBufferFlush(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuf
/* Mark that all slots are free */
buffer->nused = 0;
/* Chunk could be closed on a subsequent call of ts_chunk_dispatch_get_chunk_insert_state
* (e.g., due to timescaledb.max_open_chunks_per_insert). So, ensure the bulk insert is
* finished after the flush is complete.
*/
ResultRelInfo *result_relation_info = cis->result_relation_info;
Assert(result_relation_info != NULL);
table_finish_bulk_insert(result_relation_info->ri_RelationDesc, miinfo->ti_options);
/* Reset cur_lineno and line_buf_valid to what they were */
#if PG14_GE
if (cstate != NULL)
@@ -420,6 +423,8 @@ TSCopyMultiInsertBufferFlush(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuf
cstate->cur_lineno = save_cur_lineno;
}
#endif
return cis->chunk_id;
}
/*
@@ -431,37 +436,119 @@ static inline void
TSCopyMultiInsertBufferCleanup(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuffer *buffer)
{
int i;
ResultRelInfo *result_relation_info;
/* Ensure buffer was flushed */
Assert(buffer->nused == 0);
ChunkInsertState *cis = ts_chunk_dispatch_get_chunk_insert_state(miinfo->ccstate->dispatch,
buffer->point,
on_chunk_insert_state_changed,
buffer->bistate);
result_relation_info = cis->result_relation_info;
Assert(result_relation_info != NULL);
FreeBulkInsertState(buffer->bistate);
/* Since we only create slots on demand, just drop the non-null ones. */
for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
ExecDropSingleTupleTableSlot(buffer->slots[i]);
table_finish_bulk_insert(result_relation_info->ri_RelationDesc, miinfo->ti_options);
pfree(buffer->point);
pfree(buffer);
}
#if PG13_LT
/* list_sort comparator to sort TSCopyMultiInsertBuffer by usage */
static int
TSCmpBuffersByUsage(const void *a, const void *b)
{
int b1 = ((const TSCopyMultiInsertBuffer *) lfirst(*(ListCell **) a))->nused;
int b2 = ((const TSCopyMultiInsertBuffer *) lfirst(*(ListCell **) b))->nused;
Assert(b1 >= 0);
Assert(b2 >= 0);
return (b1 > b2) ? 1 : (b1 == b2) ? 0 : -1;
}
#else
/* list_sort comparator to sort TSCopyMultiInsertBuffer by usage */
static int
TSCmpBuffersByUsage(const ListCell *a, const ListCell *b)
{
int b1 = ((const TSCopyMultiInsertBuffer *) lfirst(a))->nused;
int b2 = ((const TSCopyMultiInsertBuffer *) lfirst(b))->nused;
Assert(b1 >= 0);
Assert(b2 >= 0);
return (b1 > b2) ? 1 : (b1 == b2) ? 0 : -1;
}
#endif
/*
* Write out all stored tuples of all buffers to the chunks. Also,
* cleanup the allocated buffers and free memory.
* Flush all buffers by writing the tuples to the chunks. In addition, trim the
* number of multi-insert buffers down to MAX_PARTITION_BUFFERS by deleting the
* least used buffers (the buffers that store the fewest tuples).
*/
static inline void
TSCopyMultiInsertInfoFlush(TSCopyMultiInsertInfo *miinfo, ChunkInsertState *cur_cis)
{
HASH_SEQ_STATUS status;
MultiInsertBufferEntry *entry;
int current_multi_insert_buffers;
int buffers_to_delete;
bool found;
int32 flushed_chunk_id;
List *buffer_list = NIL;
ListCell *lc;
current_multi_insert_buffers = hash_get_num_entries(miinfo->multiInsertBuffers);
/* Create a list of buffers that can be sorted by usage */
hash_seq_init(&status, miinfo->multiInsertBuffers);
for (entry = hash_seq_search(&status); entry != NULL; entry = hash_seq_search(&status))
{
buffer_list = lappend(buffer_list, entry->buffer);
}
buffers_to_delete = Max(current_multi_insert_buffers - MAX_PARTITION_BUFFERS, 0);
/* Sorting is only needed if we want to remove the least used buffers */
if (buffers_to_delete > 0)
buffer_list = list_sort_compat(buffer_list, TSCmpBuffersByUsage);
/* Flush buffers and delete them if needed */
foreach (lc, buffer_list)
{
TSCopyMultiInsertBuffer *buffer = (TSCopyMultiInsertBuffer *) lfirst(lc);
flushed_chunk_id = TSCopyMultiInsertBufferFlush(miinfo, buffer);
if (buffers_to_delete > 0)
{
/*
* Reduce the number of active multi-insert buffers. However, the currently
* used buffer should not be deleted because it might be reused for the next
* insert.
*/
if (cur_cis == NULL || flushed_chunk_id != cur_cis->chunk_id)
{
TSCopyMultiInsertBufferCleanup(miinfo, buffer);
hash_search(miinfo->multiInsertBuffers, &flushed_chunk_id, HASH_REMOVE, &found);
Assert(found);
buffers_to_delete--;
}
}
}
list_free(buffer_list);
/* All buffers have been flushed */
miinfo->bufferedTuples = 0;
miinfo->bufferedBytes = 0;
}
/*
* All existing buffers are flushed and the multi-insert states
* are freed. So, delete the old hash map and create a new one for further
* inserts.
*/
static inline void
TSCopyMultiInsertInfoFlushAndCleanup(TSCopyMultiInsertInfo *miinfo)
{
TSCopyMultiInsertInfoFlush(miinfo, NULL);
HASH_SEQ_STATUS status;
MultiInsertBufferEntry *entry;
@@ -470,24 +557,10 @@ TSCopyMultiInsertInfoFlushAndCleanup(TSCopyMultiInsertInfo *miinfo)
for (entry = hash_seq_search(&status); entry != NULL; entry = hash_seq_search(&status))
{
TSCopyMultiInsertBuffer *buffer = entry->buffer;
TSCopyMultiInsertBufferFlush(miinfo, buffer);
/*
* Cleanup the buffer and finish the bulk insert. The chunk could
* be closed (e.g., due to timescaledb.max_open_chunks_per_insert)
* and the bulk insert must be finished first.
*/
TSCopyMultiInsertBufferCleanup(miinfo, buffer);
}
/* All existing buffers are flushed and the multi-insert states
* are freed. So, delete old hash map and create a new one for further
* inserts.
*/
hash_destroy(miinfo->multiInsertBuffers);
miinfo->multiInsertBuffers = TSCopyCreateNewInsertBufferHashMap();
miinfo->bufferedTuples = 0;
miinfo->bufferedBytes = 0;
}
/*
@@ -543,37 +616,17 @@ TSCopyMultiInsertInfoStore(TSCopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
/* Update how many tuples are stored and their size */
miinfo->bufferedTuples++;
/*
* Note: There is no reliable way to determine the in-memory size of a virtual
* tuple. So, we perform flushing in PG < 14 only based on the number of buffered
* tuples and not based on the size.
*/
#if PG14_GE
if (cstate != NULL)
{
int tuplen = cstate->line_buf.len;
miinfo->bufferedBytes += tuplen;
}
else
{
#endif
/*
* Determine the size of the tuple by calculating the size of the per-tuple
* memory context. The per-tuple memory context is deleted in the copyfrom
* function per processed tuple. So, at this time only the tuple is stored
* in this context.
*/
MemoryContextCounters context_counter;
MemoryContext tuple_context = GetPerTupleMemoryContext(miinfo->estate);
#if PG14_GE
tuple_context->methods->stats(tuple_context, NULL, 0, &context_counter, false);
#else
tuple_context->methods->stats(tuple_context, false, 0, &context_counter);
#endif
Size used_memory = context_counter.totalspace - context_counter.freespace;
Assert(used_memory > 0);
miinfo->bufferedBytes += used_memory;
#if PG14_GE
}
#endif
}
@@ -919,7 +972,7 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte
* batching, so rows are visible to triggers etc.
*/
if (insertMethod == CIM_MULTI_CONDITIONAL)
TSCopyMultiInsertInfoFlushAndCleanup(&multiInsertInfo);
TSCopyMultiInsertInfoFlush(&multiInsertInfo, cis);
currentTupleInsertMethod = CIM_SINGLE;
}
@@ -1108,7 +1161,7 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte
multiInsertInfo.bufferedBytes,
multiInsertInfo.bufferedTuples)));
TSCopyMultiInsertInfoFlushAndCleanup(&multiInsertInfo);
TSCopyMultiInsertInfoFlush(&multiInsertInfo, cis);
}
}
}
@@ -1135,12 +1188,7 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte
/* Flush any remaining buffered tuples */
if (insertMethod != CIM_SINGLE)
{
if (!TSCopyMultiInsertInfoIsEmpty(&multiInsertInfo))
TSCopyMultiInsertInfoFlushAndCleanup(&multiInsertInfo);
hash_destroy(multiInsertInfo.multiInsertBuffers);
}
TSCopyMultiInsertInfoFlushAndCleanup(&multiInsertInfo);
/* Done, clean up */
if (errcallback.previous)

@@ -50,12 +50,16 @@ select count(*) from portal_memory_log;
5
(1 row)
--- Check the memory usage of the PortalContext. Ensure that the copy commands do
--- not allocate memory in this context and the context does not grow. Allow 10%
--- change of memory usage to account for some randomness.
select bytes as bytes_begin from portal_memory_log order by id asc limit 1 \gset
select bytes as bytes_end from portal_memory_log order by id desc limit 1 \gset
-- We'll only compare the biggest runs, because the smaller ones have variance
-- due to new chunks being created and other unknown reasons. Allow 10% change of
-- memory usage to account for some randomness.
select * from portal_memory_log where (
select (max(bytes) - min(bytes)) / max(bytes)::float > 0.1
from portal_memory_log where id >= 3
select abs(:bytes_begin - :bytes_end) / :bytes_begin::float > 0.1
);
id | bytes
----+-------

@@ -50,10 +50,16 @@ set timescaledb.max_cached_chunks_per_hypertable = 3;
select count(*) from portal_memory_log;
--- Check the memory usage of the PortalContext. Ensure that the copy commands do
--- not allocate memory in this context and the context does not grow. Allow 10%
--- change of memory usage to account for some randomness.
select bytes as bytes_begin from portal_memory_log order by id asc limit 1 \gset
select bytes as bytes_end from portal_memory_log order by id desc limit 1 \gset
-- We'll only compare the biggest runs, because the smaller ones have variance
-- due to new chunks being created and other unknown reasons. Allow 10% change of
-- memory usage to account for some randomness.
select * from portal_memory_log where (
select (max(bytes) - min(bytes)) / max(bytes)::float > 0.1
from portal_memory_log where id >= 3
select abs(:bytes_begin - :bytes_end) / :bytes_begin::float > 0.1
);