diff --git a/CHANGELOG.md b/CHANGELOG.md
index 78264de3a..c9e6d9c0e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -25,6 +25,7 @@ argument or resolve the type ambiguity by casting to the intended type.
 * #4745 Fix FK constraint violation error while insert into hypertable which references partitioned table
 * #4756 Improve compression job IO performance
 * #4807 Fix segmentation fault during INSERT into compressed hypertable.
+* #4840 Fix performance regressions in the copy code
 
 **Thanks**
 * @jvanns for reporting hypertable FK reference to vanilla PostgreSQL partitioned table doesn't seem to work
diff --git a/src/copy.c b/src/copy.c
index 8e7f27321..3f67a5a27 100644
--- a/src/copy.c
+++ b/src/copy.c
@@ -85,6 +85,9 @@
  */
 #define MAX_BUFFERED_BYTES 65535
 
+/* Trim the list of buffers back down to this number after flushing */
+#define MAX_PARTITION_BUFFERS 32
+
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct TSCopyMultiInsertBuffer
 {
@@ -151,10 +154,8 @@ on_chunk_insert_state_changed(ChunkInsertState *state, void *data)
 {
 	BulkInsertState bistate = data;
 
-	/* Different chunk so must release BulkInsertState */
-	if (bistate->current_buf != InvalidBuffer)
-		ReleaseBuffer(bistate->current_buf);
-	bistate->current_buf = InvalidBuffer;
+	/* Chunk changed, so release the buffer held in BulkInsertState */
+	ReleaseBulkInsertStatePin(bistate);
 }
 
 static CopyChunkState *
@@ -266,16 +267,6 @@ TSCopyMultiInsertInfoIsFull(TSCopyMultiInsertInfo *miinfo)
 		miinfo->bufferedBytes >= MAX_BUFFERED_BYTES)
 		return true;
 
-	if (hash_get_num_entries(miinfo->multiInsertBuffers) >= ts_guc_max_open_chunks_per_insert)
-	{
-		/*
-		 * Flushing each multi-insert buffer will require looking up the
-		 * corresponding chunk insert state in the cache, so don't accumulate
-		 * more inserts than the cache can fit, to avoid thrashing.
-		 */
-		return true;
-	}
-
 	return false;
 }
 
@@ -291,7 +282,7 @@ TSCopyMultiInsertInfoIsEmpty(TSCopyMultiInsertInfo *miinfo)
 /*
  * Write the tuples stored in 'buffer' out to the table.
  */
-static inline void
+static inline int
 TSCopyMultiInsertBufferFlush(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuffer *buffer)
 {
 	MemoryContext oldcontext;
@@ -318,11 +309,15 @@ TSCopyMultiInsertBufferFlush(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuf
 	 * table is closed and pointers (e.g., result_relation_info point) to invalid
 	 * addresses. Re-reading the chunk insert state ensures that the table is
 	 * open and the pointers are valid.
+	 *
+	 * No chunk-changed callback is needed; the bulk insert state buffer is
+	 * freed in TSCopyMultiInsertBufferCleanup().
 	 */
-	ChunkInsertState *cis = ts_chunk_dispatch_get_chunk_insert_state(miinfo->ccstate->dispatch,
-																	 buffer->point,
-																	 on_chunk_insert_state_changed,
-																	 buffer->bistate);
+	ChunkInsertState *cis =
+		ts_chunk_dispatch_get_chunk_insert_state(miinfo->ccstate->dispatch,
+												 buffer->point,
+												 NULL /* on chunk changed function */,
+												 NULL /* payload for on chunk changed function */);
 
 	ResultRelInfo *resultRelInfo = cis->result_relation_info;
 
@@ -412,6 +407,14 @@ TSCopyMultiInsertBufferFlush(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuf
 	/* Mark that all slots are free */
 	buffer->nused = 0;
 
+	/* The chunk could be closed by a subsequent call to ts_chunk_dispatch_get_chunk_insert_state
+	 * (e.g., due to timescaledb.max_open_chunks_per_insert). So, ensure the bulk insert is
+	 * finished after the flush is complete.
+	 */
+	ResultRelInfo *result_relation_info = cis->result_relation_info;
+	Assert(result_relation_info != NULL);
+	table_finish_bulk_insert(result_relation_info->ri_RelationDesc, miinfo->ti_options);
+
 	/* Reset cur_lineno and line_buf_valid to what they were */
 #if PG14_GE
 	if (cstate != NULL)
@@ -420,6 +423,8 @@ TSCopyMultiInsertBufferFlush(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuf
 		cstate->cur_lineno = save_cur_lineno;
 	}
 #endif
+
+	return cis->chunk_id;
 }
 
 /*
@@ -431,37 +436,119 @@ static inline void
 TSCopyMultiInsertBufferCleanup(TSCopyMultiInsertInfo *miinfo, TSCopyMultiInsertBuffer *buffer)
 {
 	int i;
-	ResultRelInfo *result_relation_info;
 
 	/* Ensure buffer was flushed */
 	Assert(buffer->nused == 0);
 
-	ChunkInsertState *cis = ts_chunk_dispatch_get_chunk_insert_state(miinfo->ccstate->dispatch,
-																	 buffer->point,
-																	 on_chunk_insert_state_changed,
-																	 buffer->bistate);
-	result_relation_info = cis->result_relation_info;
-	Assert(result_relation_info != NULL);
-
 	FreeBulkInsertState(buffer->bistate);
 
 	/* Since we only create slots on demand, just drop the non-null ones. */
 	for (i = 0; i < MAX_BUFFERED_TUPLES && buffer->slots[i] != NULL; i++)
 		ExecDropSingleTupleTableSlot(buffer->slots[i]);
 
-	table_finish_bulk_insert(result_relation_info->ri_RelationDesc, miinfo->ti_options);
-
 	pfree(buffer->point);
 	pfree(buffer);
 }
 
+#if PG13_LT
+/* list_sort_compat comparator (PG < 13) to sort TSCopyMultiInsertBuffer by usage */
+static int
+TSCmpBuffersByUsage(const void *a, const void *b)
+{
+	int b1 = ((const TSCopyMultiInsertBuffer *) lfirst(*(ListCell **) a))->nused;
+	int b2 = ((const TSCopyMultiInsertBuffer *) lfirst(*(ListCell **) b))->nused;
+
+	Assert(b1 >= 0);
+	Assert(b2 >= 0);
+
+	return (b1 > b2) ? 1 : (b1 == b2) ? 0 : -1;
+}
+#else
+/* list_sort comparator to sort TSCopyMultiInsertBuffer by usage */
+static int
+TSCmpBuffersByUsage(const ListCell *a, const ListCell *b)
+{
+	int b1 = ((const TSCopyMultiInsertBuffer *) lfirst(a))->nused;
+	int b2 = ((const TSCopyMultiInsertBuffer *) lfirst(b))->nused;
+
+	Assert(b1 >= 0);
+	Assert(b2 >= 0);
+
+	return (b1 > b2) ? 1 : (b1 == b2) ? 0 : -1;
+}
+#endif
+
 /*
- * Write out all stored tuples of all buffers to the chunks. Also,
- * cleanup the allocated buffers and free memory.
+ * Flush all buffers by writing the tuples to the chunks. In addition, trim the
+ * number of multi-insert buffers down to MAX_PARTITION_BUFFERS by deleting the
+ * least-used buffers (the buffers that hold the fewest tuples).
+ */
+static inline void
+TSCopyMultiInsertInfoFlush(TSCopyMultiInsertInfo *miinfo, ChunkInsertState *cur_cis)
+{
+	HASH_SEQ_STATUS status;
+	MultiInsertBufferEntry *entry;
+	int current_multi_insert_buffers;
+	int buffers_to_delete;
+	bool found;
+	int32 flushed_chunk_id;
+	List *buffer_list = NIL;
+	ListCell *lc;
+
+	current_multi_insert_buffers = hash_get_num_entries(miinfo->multiInsertBuffers);
+
+	/* Create a list of buffers that can be sorted by usage */
+	hash_seq_init(&status, miinfo->multiInsertBuffers);
+	for (entry = hash_seq_search(&status); entry != NULL; entry = hash_seq_search(&status))
+	{
+		buffer_list = lappend(buffer_list, entry->buffer);
+	}
+
+	buffers_to_delete = Max(current_multi_insert_buffers - MAX_PARTITION_BUFFERS, 0);
+
+	/* Sorting is only needed if we want to remove the least-used buffers */
+	if (buffers_to_delete > 0)
+		buffer_list = list_sort_compat(buffer_list, TSCmpBuffersByUsage);
+
+	/* Flush buffers and delete them if needed */
+	foreach (lc, buffer_list)
+	{
+		TSCopyMultiInsertBuffer *buffer = (TSCopyMultiInsertBuffer *) lfirst(lc);
+		flushed_chunk_id = TSCopyMultiInsertBufferFlush(miinfo, buffer);
+
+		if (buffers_to_delete > 0)
+		{
+			/*
+			 * Reduce active multi-insert buffers. However, the currently used buffer
+			 * should not be deleted because it might be reused for the next insert.
+			 */
+			if (cur_cis == NULL || flushed_chunk_id != cur_cis->chunk_id)
+			{
+				TSCopyMultiInsertBufferCleanup(miinfo, buffer);
+				hash_search(miinfo->multiInsertBuffers, &flushed_chunk_id, HASH_REMOVE, &found);
+				Assert(found);
+				buffers_to_delete--;
+			}
+		}
+	}
+
+	list_free(buffer_list);
+
+	/* All buffers have been flushed */
+	miinfo->bufferedTuples = 0;
+	miinfo->bufferedBytes = 0;
+}
+
+/*
+ * Flush all remaining buffers, free the multi-insert states, and destroy
+ * the hash map. This is called once at the end of the copy operation; no
+ * further buffered inserts take place afterwards.
+ */
 static inline void
 TSCopyMultiInsertInfoFlushAndCleanup(TSCopyMultiInsertInfo *miinfo)
 {
+	TSCopyMultiInsertInfoFlush(miinfo, NULL);
+
 	HASH_SEQ_STATUS status;
 	MultiInsertBufferEntry *entry;
@@ -470,24 +557,10 @@ TSCopyMultiInsertInfoFlushAndCleanup(TSCopyMultiInsertInfo *miinfo)
 
 	hash_seq_init(&status, miinfo->multiInsertBuffers);
 	for (entry = hash_seq_search(&status); entry != NULL; entry = hash_seq_search(&status))
 	{
 		TSCopyMultiInsertBuffer *buffer = entry->buffer;
-		TSCopyMultiInsertBufferFlush(miinfo, buffer);
-
-		/*
-		 * Cleanup the buffer and finish the bulk insert. The chunk could
-		 * be closed (e.g., due to timescaledb.max_open_chunks_per_insert)
-		 * and the bulk insert must be finished first.
-		 */
 		TSCopyMultiInsertBufferCleanup(miinfo, buffer);
 	}
 
-	/* All existing buffers are flushed and the multi-insert states
-	 * are freed. So, delete old hash map and create a new one for further
-	 * inserts.
-	 */
 	hash_destroy(miinfo->multiInsertBuffers);
-	miinfo->multiInsertBuffers = TSCopyCreateNewInsertBufferHashMap();
-	miinfo->bufferedTuples = 0;
-	miinfo->bufferedBytes = 0;
 }
 
 /*
@@ -543,37 +616,17 @@ TSCopyMultiInsertInfoStore(TSCopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 
 	/* Update how many tuples are stored and their size */
 	miinfo->bufferedTuples++;
 
+	/*
+	 * Note: There is no reliable way to determine the in-memory size of a virtual
+	 * tuple. So, in PG < 14 we flush based only on the number of buffered tuples
+	 * and not on their size.
+	 */
 #if PG14_GE
 	if (cstate != NULL)
 	{
 		int tuplen = cstate->line_buf.len;
 		miinfo->bufferedBytes += tuplen;
 	}
-	else
-	{
-#endif
-
-	/*
-	 * Determine the size of the tuple by calculating the size of the per-tuple
-	 * memory context. The per-tuple memory context is deleted in the copyfrom
-	 * function per processed tuple. So, at this time only the tuple is stored
-	 * in this context.
-	 */
-	MemoryContextCounters context_counter;
-	MemoryContext tuple_context = GetPerTupleMemoryContext(miinfo->estate);
-
-#if PG14_GE
-	tuple_context->methods->stats(tuple_context, NULL, 0, &context_counter, false);
-#else
-	tuple_context->methods->stats(tuple_context, false, 0, &context_counter);
-#endif
-
-	Size used_memory = context_counter.totalspace - context_counter.freespace;
-	Assert(used_memory > 0);
-	miinfo->bufferedBytes += used_memory;
-
-#if PG14_GE
-	}
 #endif
 }
 
@@ -919,7 +972,7 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte
 			 * batching, so rows are visible to triggers etc.
 			 */
 			if (insertMethod == CIM_MULTI_CONDITIONAL)
-				TSCopyMultiInsertInfoFlushAndCleanup(&multiInsertInfo);
+				TSCopyMultiInsertInfoFlush(&multiInsertInfo, cis);
 
 			currentTupleInsertMethod = CIM_SINGLE;
 		}
@@ -1108,7 +1161,7 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte
 								 multiInsertInfo.bufferedBytes,
 								 multiInsertInfo.bufferedTuples)));
 
-				TSCopyMultiInsertInfoFlushAndCleanup(&multiInsertInfo);
+				TSCopyMultiInsertInfoFlush(&multiInsertInfo, cis);
 			}
 		}
 	}
@@ -1135,12 +1188,7 @@ copyfrom(CopyChunkState *ccstate, List *range_table, Hypertable *ht, MemoryConte
 
 	/* Flush any remaining buffered tuples */
 	if (insertMethod != CIM_SINGLE)
-	{
-		if (!TSCopyMultiInsertInfoIsEmpty(&multiInsertInfo))
-			TSCopyMultiInsertInfoFlushAndCleanup(&multiInsertInfo);
-
-		hash_destroy(multiInsertInfo.multiInsertBuffers);
-	}
+		TSCopyMultiInsertInfoFlushAndCleanup(&multiInsertInfo);
 
 	/* Done, clean up */
 	if (errcallback.previous)
diff --git a/test/expected/copy_memory_usage.out b/test/expected/copy_memory_usage.out
index 6bd5ca450..1bc62fe9c 100644
--- a/test/expected/copy_memory_usage.out
+++ b/test/expected/copy_memory_usage.out
@@ -50,12 +50,16 @@ select count(*) from portal_memory_log;
      5
(1 row)
 
+-- Check the memory usage of the PortalContext. Ensure that the copy commands do
+-- not allocate memory in this context and that the context does not grow. Allow a
+-- 10% change in memory usage to account for some randomness.
+select bytes as bytes_begin from portal_memory_log order by id asc limit 1 \gset
+select bytes as bytes_end from portal_memory_log order by id desc limit 1 \gset
 -- We'll only compare the biggest runs, because the smaller ones have variance
 -- due to new chunks being created and other unknown reasons. Allow 10% change of
 -- memory usage to account for some randomness.
 select * from portal_memory_log where (
-    select (max(bytes) - min(bytes)) / max(bytes)::float > 0.1
-    from portal_memory_log where id >= 3
+    select abs(:bytes_begin - :bytes_end) / :bytes_begin::float > 0.1
 );
  id | bytes 
 ----+-------
diff --git a/test/sql/copy_memory_usage.sql b/test/sql/copy_memory_usage.sql
index b0173fdeb..a314dec2b 100644
--- a/test/sql/copy_memory_usage.sql
+++ b/test/sql/copy_memory_usage.sql
@@ -50,10 +50,16 @@ set timescaledb.max_cached_chunks_per_hypertable = 3;
 
 select count(*) from portal_memory_log;
 
+-- Check the memory usage of the PortalContext. Ensure that the copy commands do
+-- not allocate memory in this context and that the context does not grow. Allow a
+-- 10% change in memory usage to account for some randomness.
+select bytes as bytes_begin from portal_memory_log order by id asc limit 1 \gset
+select bytes as bytes_end from portal_memory_log order by id desc limit 1 \gset
+
 -- We'll only compare the biggest runs, because the smaller ones have variance
 -- due to new chunks being created and other unknown reasons. Allow 10% change of
 -- memory usage to account for some randomness.
 select * from portal_memory_log where (
-    select (max(bytes) - min(bytes)) / max(bytes)::float > 0.1
-    from portal_memory_log where id >= 3
+    select abs(:bytes_begin - :bytes_end) / :bytes_begin::float > 0.1
 );
+
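Editor's note: to make the buffer-trimming strategy in TSCopyMultiInsertInfoFlush() concrete, below is a minimal standalone C sketch. This is not TimescaleDB code and all names in it are illustrative: the real patch flushes each buffer first, sorts PostgreSQL List cells with list_sort/list_qsort via TSCmpBuffersByUsage, protects the buffer of the chunk currently being written (cur_cis) from deletion, and removes entries from a hash table keyed by chunk id; the sketch models only the core idea of sorting buffers by usage and dropping the least-used ones until at most MAX_PARTITION_BUFFERS remain.

/*
 * buffer_trim_sketch.c -- simplified model of the trimming strategy.
 * Compile with: cc -o buffer_trim_sketch buffer_trim_sketch.c
 */
#include <stdio.h>
#include <stdlib.h>

#define MAX_PARTITION_BUFFERS 32 /* keep at most this many buffers */
#define NBUFFERS 100			 /* pretend 100 chunks were touched */

typedef struct Buffer
{
	int chunk_id;
	int nused; /* tuples this buffer held before the flush */
} Buffer;

/* qsort comparator: ascending by usage, so the least-used buffers come first */
static int
cmp_by_usage(const void *a, const void *b)
{
	int b1 = ((const Buffer *) a)->nused;
	int b2 = ((const Buffer *) b)->nused;

	return (b1 > b2) ? 1 : (b1 == b2) ? 0 : -1;
}

int
main(void)
{
	Buffer buffers[NBUFFERS];
	int buffers_to_delete;
	int i;

	/* Fake workload: buffer i held (i % 17) tuples */
	for (i = 0; i < NBUFFERS; i++)
	{
		buffers[i].chunk_id = i;
		buffers[i].nused = i % 17;
	}

	/* ...all buffers would be flushed to their chunks at this point... */

	buffers_to_delete = NBUFFERS - MAX_PARTITION_BUFFERS;
	if (buffers_to_delete > 0)
	{
		/* Sort so that the least-used buffers come first... */
		qsort(buffers, NBUFFERS, sizeof(Buffer), cmp_by_usage);

		/* ...and drop them until MAX_PARTITION_BUFFERS remain */
		for (i = 0; i < buffers_to_delete; i++)
			printf("dropping buffer for chunk %d (nused = %d)\n",
				   buffers[i].chunk_id, buffers[i].nused);
	}

	return 0;
}

The design point the sketch illustrates: unlike the removed ts_guc_max_open_chunks_per_insert check, which forced a full flush-and-teardown whenever too many chunks were touched, the patch keeps the hottest buffers alive across flushes and only evicts the cold ones, which is where the recovered COPY performance comes from.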