diff --git a/.unreleased/bugfix_5774 b/.unreleased/bugfix_5774
new file mode 100644
index 000000000..84e382800
--- /dev/null
+++ b/.unreleased/bugfix_5774
@@ -0,0 +1 @@
+Fixes: #5774 Fixed two bugs in decompression sorted merge code
diff --git a/tsl/src/nodes/decompress_chunk/exec.c b/tsl/src/nodes/decompress_chunk/exec.c
index 25edd7ce9..969f01721 100644
--- a/tsl/src/nodes/decompress_chunk/exec.c
+++ b/tsl/src/nodes/decompress_chunk/exec.c
@@ -152,8 +152,9 @@ batch_states_create(DecompressChunkState *chunk_state, int nbatches)
 		decompress_initialize_batch_state(chunk_state, batch_state);
 	}
 
-	chunk_state->unused_batch_states =
-		bms_add_range(chunk_state->unused_batch_states, 0, nbatches - 1);
+	chunk_state->unused_batch_states = bms_add_range(NULL, 0, nbatches - 1);
+
+	Assert(bms_num_members(chunk_state->unused_batch_states) == chunk_state->n_batch_states);
 }
 
 /*
@@ -181,6 +182,10 @@ batch_states_enlarge(DecompressChunkState *chunk_state, int nbatches)
 	chunk_state->unused_batch_states =
 		bms_add_range(chunk_state->unused_batch_states, chunk_state->n_batch_states, nbatches - 1);
 
+	Assert(bms_num_members(chunk_state->unused_batch_states) ==
+		   nbatches - chunk_state->n_batch_states);
+
+	/* Update the number of available batch states */
 	chunk_state->n_batch_states = nbatches;
 }
 
@@ -522,8 +527,14 @@ decompress_initialize_batch(DecompressChunkState *chunk_state, DecompressBatchSt
 	/* Batch states can be re-used skip tuple slot creation in that case */
 	if (batch_state->compressed_slot == NULL)
 	{
+		/* Create a non-refcounted copy of the tuple descriptor */
+		if (chunk_state->compressed_slot_tdesc == NULL)
+			chunk_state->compressed_slot_tdesc =
+				CreateTupleDescCopyConstr(subslot->tts_tupleDescriptor);
+		Assert(chunk_state->compressed_slot_tdesc->tdrefcount == -1);
+
 		batch_state->compressed_slot =
-			MakeSingleTupleTableSlot(subslot->tts_tupleDescriptor, subslot->tts_ops);
+			MakeSingleTupleTableSlot(chunk_state->compressed_slot_tdesc, subslot->tts_ops);
 	}
 	else
 	{
@@ -540,8 +551,15 @@ decompress_initialize_batch(DecompressChunkState *chunk_state, DecompressBatchSt
 	{
 		/* Get a reference the the output TupleTableSlot */
 		TupleTableSlot *slot = chunk_state->csstate.ss.ss_ScanTupleSlot;
+
+		/* Create a non-refcounted copy of the tuple descriptor */
+		if (chunk_state->decompressed_slot_scan_tdesc == NULL)
+			chunk_state->decompressed_slot_scan_tdesc =
+				CreateTupleDescCopyConstr(slot->tts_tupleDescriptor);
+		Assert(chunk_state->decompressed_slot_scan_tdesc->tdrefcount == -1);
+
 		batch_state->decompressed_slot_scan =
-			MakeSingleTupleTableSlot(slot->tts_tupleDescriptor, slot->tts_ops);
+			MakeSingleTupleTableSlot(chunk_state->decompressed_slot_scan_tdesc, slot->tts_ops);
 	}
 	else
 	{
@@ -558,8 +576,16 @@ decompress_initialize_batch(DecompressChunkState *chunk_state, DecompressBatchSt
 	if (chunk_state->csstate.ss.ps.ps_ProjInfo != NULL)
 	{
 		TupleTableSlot *slot = chunk_state->csstate.ss.ps.ps_ProjInfo->pi_state.resultslot;
+
+		/* Create a non-refcounted copy of the tuple descriptor */
+		if (chunk_state->decompressed_slot_projected_tdesc == NULL)
+			chunk_state->decompressed_slot_projected_tdesc =
+				CreateTupleDescCopyConstr(slot->tts_tupleDescriptor);
+		Assert(chunk_state->decompressed_slot_projected_tdesc->tdrefcount == -1);
+
 		batch_state->decompressed_slot_projected =
-			MakeSingleTupleTableSlot(slot->tts_tupleDescriptor, slot->tts_ops);
+			MakeSingleTupleTableSlot(chunk_state->decompressed_slot_projected_tdesc,
+									 slot->tts_ops);
 	}
 	else
 	{
@@ -822,13 +848,18 @@ decompress_chunk_rescan(CustomScanState *node)
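
[Editor's note: the slot-creation change in the exec.c hunks above boils down to the following pattern. This is a minimal standalone sketch, not the patch itself; cached_tdesc and make_batch_slot are illustrative names, while CreateTupleDescCopyConstr and MakeSingleTupleTableSlot are the actual PostgreSQL APIs the patch uses.]

#include "postgres.h"
#include "access/tupdesc.h"
#include "executor/tuptable.h"

/*
 * Reuse pattern: make one non-refcounted copy of the template slot's tuple
 * descriptor and hand that same copy to every batch slot. Because
 * CreateTupleDescCopyConstr() returns a descriptor with tdrefcount == -1,
 * MakeSingleTupleTableSlot() does not have to register a descriptor
 * reference with the current ResourceOwner for each slot it creates.
 */
static TupleDesc cached_tdesc = NULL;	/* lives in DecompressChunkState in the patch */

static TupleTableSlot *
make_batch_slot(TupleTableSlot *template_slot)
{
	if (cached_tdesc == NULL)
		cached_tdesc = CreateTupleDescCopyConstr(template_slot->tts_tupleDescriptor);

	/* A plain copy is never refcounted, so pinning it is a no-op. */
	Assert(cached_tdesc->tdrefcount == -1);

	return MakeSingleTupleTableSlot(cached_tdesc, template_slot->tts_ops);
}

[Sharing one non-refcounted copy keeps slot creation free of per-slot ResourceOwner bookkeeping, instead of recording one descriptor reference for every batch state.]
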
 	DecompressChunkState *chunk_state = (DecompressChunkState *) node;
 
 	if (chunk_state->merge_heap != NULL)
+	{
 		decompress_sorted_merge_free(chunk_state);
+		Assert(chunk_state->merge_heap == NULL);
+	}
 
 	for (int i = 0; i < chunk_state->n_batch_states; i++)
 	{
 		decompress_set_batch_state_to_unused(chunk_state, i);
 	}
 
+	Assert(bms_num_members(chunk_state->unused_batch_states) == chunk_state->n_batch_states);
+
 	ExecReScan(linitial(node->custom_ps));
 }
 
@@ -842,6 +873,7 @@ decompress_chunk_end(CustomScanState *node)
 	if (chunk_state->merge_heap != NULL)
 	{
 		decompress_sorted_merge_free(chunk_state);
+		Assert(chunk_state->merge_heap == NULL);
 	}
 
 	for (i = 0; i < chunk_state->n_batch_states; i++)
diff --git a/tsl/src/nodes/decompress_chunk/exec.h b/tsl/src/nodes/decompress_chunk/exec.h
index 67976451f..7ea0d7ea3 100644
--- a/tsl/src/nodes/decompress_chunk/exec.h
+++ b/tsl/src/nodes/decompress_chunk/exec.h
@@ -110,6 +110,18 @@ typedef struct DecompressChunkState
 	SortSupportData *sortkeys; /* Sort keys for binary heap compare function */
 	bool using_bulk_decompression; /* For EXPLAIN ANALYZE. */
+
+	/*
+	 * Make non-refcounted copies of these tuple descriptors for reuse across
+	 * all batch states, to avoid spending CPU in ResourceOwner when creating
+	 * a large number of tuple table slots. Each new slot pins its tuple
+	 * descriptor using PinTupleDesc, and for a refcounted descriptor this
+	 * adds a new reference to the ResourceOwner, which is not very efficient
+	 * for a large number of references.
+	 */
+	TupleDesc decompressed_slot_projected_tdesc;
+	TupleDesc decompressed_slot_scan_tdesc;
+	TupleDesc compressed_slot_tdesc;
 } DecompressChunkState;
 
 extern Node *decompress_chunk_state_create(CustomScan *cscan);
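
[Editor's note: the ResourceOwner cost described in the struct comment above comes from descriptor pinning inside slot creation. The sketch below roughly paraphrases PostgreSQL's PinTupleDesc (access/tupdesc.h) and IncrTupleDescRefCount (src/backend/access/common/tupdesc.c); it is not a verbatim copy, and pin_tupdesc_sketch is an illustrative name.]

#include "postgres.h"
#include "access/tupdesc.h"
#include "utils/resowner_private.h"

/*
 * For a refcounted descriptor (tdrefcount >= 0), every pin must record a
 * reference in the current ResourceOwner; for a non-refcounted one
 * (tdrefcount == -1), pinning does nothing. This is why the patch shares one
 * non-refcounted copy instead of letting every slot pin the scan slot's
 * refcounted descriptor.
 */
static inline void
pin_tupdesc_sketch(TupleDesc tupdesc)
{
	if (tupdesc->tdrefcount >= 0)
	{
		/* refcounted: ResourceOwner bookkeeping on every pin */
		ResourceOwnerEnlargeTupleDescs(CurrentResourceOwner);
		tupdesc->tdrefcount++;
		ResourceOwnerRememberTupleDesc(CurrentResourceOwner, tupdesc);
	}
	/* non-refcounted (tdrefcount == -1): nothing to do */
}

[With tdrefcount == -1 the branch is skipped entirely, which is exactly what the three cached descriptor copies in DecompressChunkState guarantee.]
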
diff --git a/tsl/test/expected/compression_sorted_merge-12.out b/tsl/test/expected/compression_sorted_merge-12.out
index 648624f5d..876c0d1dc 100644
--- a/tsl/test/expected/compression_sorted_merge-12.out
+++ b/tsl/test/expected/compression_sorted_merge-12.out
@@ -1274,6 +1274,27 @@ CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST');
 CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST LIMIT 100');
 CALL order_test('SELECT * FROM test1 ORDER BY time DESC');
 CALL order_test('SELECT * FROM test1 ORDER BY time ASC NULLS LAST');
+------
+-- Test window functions
+------
+CREATE TABLE insert_test(id INT);
+INSERT INTO insert_test SELECT time_bucket_gapfill(1,time,1,5) FROM (VALUES (1),(2)) v(time) GROUP BY 1 ORDER BY 1;
+SELECT * FROM insert_test AS ref_0
+WHERE EXISTS (
+    SELECT
+        sum(ref_0.id) OVER (PARTITION BY ref_0.id ORDER BY ref_0.id, ref_0.id, sample_0.time)
+    FROM
+        sensor_data AS sample_0
+    WHERE (1 > sample_0.temperature)
+);
+ id 
+----
+  1
+  2
+  3
+  4
+(4 rows)
+
 ------
 -- Test enabling and disabling the optimization based on costs
 ------
diff --git a/tsl/test/expected/compression_sorted_merge-13.out b/tsl/test/expected/compression_sorted_merge-13.out
index 648624f5d..876c0d1dc 100644
--- a/tsl/test/expected/compression_sorted_merge-13.out
+++ b/tsl/test/expected/compression_sorted_merge-13.out
@@ -1274,6 +1274,27 @@ CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST');
 CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST LIMIT 100');
 CALL order_test('SELECT * FROM test1 ORDER BY time DESC');
 CALL order_test('SELECT * FROM test1 ORDER BY time ASC NULLS LAST');
+------
+-- Test window functions
+------
+CREATE TABLE insert_test(id INT);
+INSERT INTO insert_test SELECT time_bucket_gapfill(1,time,1,5) FROM (VALUES (1),(2)) v(time) GROUP BY 1 ORDER BY 1;
+SELECT * FROM insert_test AS ref_0
+WHERE EXISTS (
+    SELECT
+        sum(ref_0.id) OVER (PARTITION BY ref_0.id ORDER BY ref_0.id, ref_0.id, sample_0.time)
+    FROM
+        sensor_data AS sample_0
+    WHERE (1 > sample_0.temperature)
+);
+ id 
+----
+  1
+  2
+  3
+  4
+(4 rows)
+
 ------
 -- Test enabling and disabling the optimization based on costs
 ------
diff --git a/tsl/test/expected/compression_sorted_merge-14.out b/tsl/test/expected/compression_sorted_merge-14.out
index 648624f5d..876c0d1dc 100644
--- a/tsl/test/expected/compression_sorted_merge-14.out
+++ b/tsl/test/expected/compression_sorted_merge-14.out
@@ -1274,6 +1274,27 @@ CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST');
 CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST LIMIT 100');
 CALL order_test('SELECT * FROM test1 ORDER BY time DESC');
 CALL order_test('SELECT * FROM test1 ORDER BY time ASC NULLS LAST');
+------
+-- Test window functions
+------
+CREATE TABLE insert_test(id INT);
+INSERT INTO insert_test SELECT time_bucket_gapfill(1,time,1,5) FROM (VALUES (1),(2)) v(time) GROUP BY 1 ORDER BY 1;
+SELECT * FROM insert_test AS ref_0
+WHERE EXISTS (
+    SELECT
+        sum(ref_0.id) OVER (PARTITION BY ref_0.id ORDER BY ref_0.id, ref_0.id, sample_0.time)
+    FROM
+        sensor_data AS sample_0
+    WHERE (1 > sample_0.temperature)
+);
+ id 
+----
+  1
+  2
+  3
+  4
+(4 rows)
+
 ------
 -- Test enabling and disabling the optimization based on costs
 ------
diff --git a/tsl/test/expected/compression_sorted_merge-15.out b/tsl/test/expected/compression_sorted_merge-15.out
index d5d79bd7a..a74b2e187 100644
--- a/tsl/test/expected/compression_sorted_merge-15.out
+++ b/tsl/test/expected/compression_sorted_merge-15.out
@@ -1274,6 +1274,27 @@ CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST');
 CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST LIMIT 100');
 CALL order_test('SELECT * FROM test1 ORDER BY time DESC');
 CALL order_test('SELECT * FROM test1 ORDER BY time ASC NULLS LAST');
+------
+-- Test window functions
+------
+CREATE TABLE insert_test(id INT);
+INSERT INTO insert_test SELECT time_bucket_gapfill(1,time,1,5) FROM (VALUES (1),(2)) v(time) GROUP BY 1 ORDER BY 1;
+SELECT * FROM insert_test AS ref_0
+WHERE EXISTS (
+    SELECT
+        sum(ref_0.id) OVER (PARTITION BY ref_0.id ORDER BY ref_0.id, ref_0.id, sample_0.time)
+    FROM
+        sensor_data AS sample_0
+    WHERE (1 > sample_0.temperature)
+);
+ id 
+----
+  1
+  2
+  3
+  4
+(4 rows)
+
 ------
 -- Test enabling and disabling the optimization based on costs
 ------
diff --git a/tsl/test/sql/compression_sorted_merge.sql.in b/tsl/test/sql/compression_sorted_merge.sql.in
index c659770b4..29da38ae6 100644
--- a/tsl/test/sql/compression_sorted_merge.sql.in
+++ b/tsl/test/sql/compression_sorted_merge.sql.in
@@ -407,6 +407,21 @@ CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST LIMIT 100');
 CALL order_test('SELECT * FROM test1 ORDER BY time DESC');
 CALL order_test('SELECT * FROM test1 ORDER BY time ASC NULLS LAST');
 
+------
+-- Test window functions
+------
+CREATE TABLE insert_test(id INT);
+INSERT INTO insert_test SELECT time_bucket_gapfill(1,time,1,5) FROM (VALUES (1),(2)) v(time) GROUP BY 1 ORDER BY 1;
+
+SELECT * FROM insert_test AS ref_0
+WHERE EXISTS (
+    SELECT
+        sum(ref_0.id) OVER (PARTITION BY ref_0.id ORDER BY ref_0.id, ref_0.id, sample_0.time)
+    FROM
+        sensor_data AS sample_0
+    WHERE (1 > sample_0.temperature)
+);
+
 ------
 -- Test enabling and disabling the optimization based on costs
 ------
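
[Editor's note on why this regression test stresses the fixed code paths: the EXISTS subquery is correlated on ref_0.id from the outer query, so the plan below it, including the DecompressChunk node performing the sorted merge over sensor_data, is presumably rescanned for every row of insert_test. Each rescan frees the merge heap and returns all batch states to the unused set, which are exactly the invariants the new Asserts in decompress_chunk_rescan() verify.]
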