From 9c7ae3e8a983ff1a19645c3d2dc0508ae8c69550 Mon Sep 17 00:00:00 2001 From: Jan Nidzwetzki Date: Mon, 12 Jun 2023 15:52:06 +0200 Subject: [PATCH] Fixed two bugs in decompression sorted merge code SQLSmith found two bugs in the compression sorted merge code. * The unused_batch_states are not initialized properly. Therefore, non-existing unused batch states can be part of the BMS. This patch fixes the initialization. * For performance reasons, We reuse the same TupleDesc across all TupleTableSlots. PostgreSQL sometimes uses TupleDesc data structures with active reference counting. The way we use the TupleDesc structures collides with the reference counting of PostgreSQL. This patch introduces a private TupleDesc copy without reference counting. --- .unreleased/bugfix_5774 | 1 + tsl/src/nodes/decompress_chunk/exec.c | 42 ++++++++++++++++--- tsl/src/nodes/decompress_chunk/exec.h | 12 ++++++ .../expected/compression_sorted_merge-12.out | 21 ++++++++++ .../expected/compression_sorted_merge-13.out | 21 ++++++++++ .../expected/compression_sorted_merge-14.out | 21 ++++++++++ .../expected/compression_sorted_merge-15.out | 21 ++++++++++ tsl/test/sql/compression_sorted_merge.sql.in | 15 +++++++ 8 files changed, 149 insertions(+), 5 deletions(-) create mode 100644 .unreleased/bugfix_5774 diff --git a/.unreleased/bugfix_5774 b/.unreleased/bugfix_5774 new file mode 100644 index 000000000..84e382800 --- /dev/null +++ b/.unreleased/bugfix_5774 @@ -0,0 +1 @@ +Fixes: #5774 Fixed two bugs in decompression sorted merge code diff --git a/tsl/src/nodes/decompress_chunk/exec.c b/tsl/src/nodes/decompress_chunk/exec.c index 25edd7ce9..969f01721 100644 --- a/tsl/src/nodes/decompress_chunk/exec.c +++ b/tsl/src/nodes/decompress_chunk/exec.c @@ -152,8 +152,9 @@ batch_states_create(DecompressChunkState *chunk_state, int nbatches) decompress_initialize_batch_state(chunk_state, batch_state); } - chunk_state->unused_batch_states = - bms_add_range(chunk_state->unused_batch_states, 0, nbatches - 1); + chunk_state->unused_batch_states = bms_add_range(NULL, 0, nbatches - 1); + + Assert(bms_num_members(chunk_state->unused_batch_states) == chunk_state->n_batch_states); } /* @@ -181,6 +182,10 @@ batch_states_enlarge(DecompressChunkState *chunk_state, int nbatches) chunk_state->unused_batch_states = bms_add_range(chunk_state->unused_batch_states, chunk_state->n_batch_states, nbatches - 1); + Assert(bms_num_members(chunk_state->unused_batch_states) == + nbatches - chunk_state->n_batch_states); + + /* Update number of available batch states */ chunk_state->n_batch_states = nbatches; } @@ -522,8 +527,14 @@ decompress_initialize_batch(DecompressChunkState *chunk_state, DecompressBatchSt /* Batch states can be re-used skip tuple slot creation in that case */ if (batch_state->compressed_slot == NULL) { + /* Create a non ref-counted copy of the tuple descriptor */ + if (chunk_state->compressed_slot_tdesc == NULL) + chunk_state->compressed_slot_tdesc = + CreateTupleDescCopyConstr(subslot->tts_tupleDescriptor); + Assert(chunk_state->compressed_slot_tdesc->tdrefcount == -1); + batch_state->compressed_slot = - MakeSingleTupleTableSlot(subslot->tts_tupleDescriptor, subslot->tts_ops); + MakeSingleTupleTableSlot(chunk_state->compressed_slot_tdesc, subslot->tts_ops); } else { @@ -540,8 +551,15 @@ decompress_initialize_batch(DecompressChunkState *chunk_state, DecompressBatchSt { /* Get a reference the the output TupleTableSlot */ TupleTableSlot *slot = chunk_state->csstate.ss.ss_ScanTupleSlot; + + /* Create a non ref-counted copy of the tuple descriptor */ + if (chunk_state->decompressed_slot_scan_tdesc == NULL) + chunk_state->decompressed_slot_scan_tdesc = + CreateTupleDescCopyConstr(slot->tts_tupleDescriptor); + Assert(chunk_state->decompressed_slot_scan_tdesc->tdrefcount == -1); + batch_state->decompressed_slot_scan = - MakeSingleTupleTableSlot(slot->tts_tupleDescriptor, slot->tts_ops); + MakeSingleTupleTableSlot(chunk_state->decompressed_slot_scan_tdesc, slot->tts_ops); } else { @@ -558,8 +576,16 @@ decompress_initialize_batch(DecompressChunkState *chunk_state, DecompressBatchSt if (chunk_state->csstate.ss.ps.ps_ProjInfo != NULL) { TupleTableSlot *slot = chunk_state->csstate.ss.ps.ps_ProjInfo->pi_state.resultslot; + + /* Create a non ref-counted copy of the tuple descriptor */ + if (chunk_state->decompressed_slot_projected_tdesc == NULL) + chunk_state->decompressed_slot_projected_tdesc = + CreateTupleDescCopyConstr(slot->tts_tupleDescriptor); + Assert(chunk_state->decompressed_slot_projected_tdesc->tdrefcount == -1); + batch_state->decompressed_slot_projected = - MakeSingleTupleTableSlot(slot->tts_tupleDescriptor, slot->tts_ops); + MakeSingleTupleTableSlot(chunk_state->decompressed_slot_projected_tdesc, + slot->tts_ops); } else { @@ -822,13 +848,18 @@ decompress_chunk_rescan(CustomScanState *node) DecompressChunkState *chunk_state = (DecompressChunkState *) node; if (chunk_state->merge_heap != NULL) + { decompress_sorted_merge_free(chunk_state); + Assert(chunk_state->merge_heap == NULL); + } for (int i = 0; i < chunk_state->n_batch_states; i++) { decompress_set_batch_state_to_unused(chunk_state, i); } + Assert(bms_num_members(chunk_state->unused_batch_states) == chunk_state->n_batch_states); + ExecReScan(linitial(node->custom_ps)); } @@ -842,6 +873,7 @@ decompress_chunk_end(CustomScanState *node) if (chunk_state->merge_heap != NULL) { decompress_sorted_merge_free(chunk_state); + Assert(chunk_state->merge_heap == NULL); } for (i = 0; i < chunk_state->n_batch_states; i++) diff --git a/tsl/src/nodes/decompress_chunk/exec.h b/tsl/src/nodes/decompress_chunk/exec.h index 67976451f..7ea0d7ea3 100644 --- a/tsl/src/nodes/decompress_chunk/exec.h +++ b/tsl/src/nodes/decompress_chunk/exec.h @@ -110,6 +110,18 @@ typedef struct DecompressChunkState SortSupportData *sortkeys; /* Sort keys for binary heap compare function */ bool using_bulk_decompression; /* For EXPLAIN ANALYZE. */ + + /* + * Make non-refcounted copies of the tupdesc for reuse across all batch states + * and avoid spending CPU in ResourceOwner when creating a big number of table + * slots. This happens because each new slot pins its tuple descriptor using + * PinTupleDesc, and for reference-counting tuples this involves adding a new + * reference to ResourceOwner, which is not very efficient for a large number of + * references. + */ + TupleDesc decompressed_slot_projected_tdesc; + TupleDesc decompressed_slot_scan_tdesc; + TupleDesc compressed_slot_tdesc; } DecompressChunkState; extern Node *decompress_chunk_state_create(CustomScan *cscan); diff --git a/tsl/test/expected/compression_sorted_merge-12.out b/tsl/test/expected/compression_sorted_merge-12.out index 648624f5d..876c0d1dc 100644 --- a/tsl/test/expected/compression_sorted_merge-12.out +++ b/tsl/test/expected/compression_sorted_merge-12.out @@ -1274,6 +1274,27 @@ CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST'); CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST LIMIT 100'); CALL order_test('SELECT * FROM test1 ORDER BY time DESC'); CALL order_test('SELECT * FROM test1 ORDER BY time ASC NULLS LAST'); +------ +-- Test window functions +------ +CREATE TABLE insert_test(id INT); +INSERT INTO insert_test SELECT time_bucket_gapfill(1,time,1,5) FROM (VALUES (1),(2)) v(time) GROUP BY 1 ORDER BY 1; +SELECT * FROM insert_test AS ref_0 +WHERE EXISTS ( + SELECT + sum(ref_0.id) OVER (partition by ref_0.id ORDER BY ref_0.id,ref_0.id,sample_0.time) + FROM + sensor_data AS sample_0 + WHERE (1 > sample_0.temperature) +); + id +---- + 1 + 2 + 3 + 4 +(4 rows) + ------ -- Test enabling and disabling the optimization based on costs ------ diff --git a/tsl/test/expected/compression_sorted_merge-13.out b/tsl/test/expected/compression_sorted_merge-13.out index 648624f5d..876c0d1dc 100644 --- a/tsl/test/expected/compression_sorted_merge-13.out +++ b/tsl/test/expected/compression_sorted_merge-13.out @@ -1274,6 +1274,27 @@ CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST'); CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST LIMIT 100'); CALL order_test('SELECT * FROM test1 ORDER BY time DESC'); CALL order_test('SELECT * FROM test1 ORDER BY time ASC NULLS LAST'); +------ +-- Test window functions +------ +CREATE TABLE insert_test(id INT); +INSERT INTO insert_test SELECT time_bucket_gapfill(1,time,1,5) FROM (VALUES (1),(2)) v(time) GROUP BY 1 ORDER BY 1; +SELECT * FROM insert_test AS ref_0 +WHERE EXISTS ( + SELECT + sum(ref_0.id) OVER (partition by ref_0.id ORDER BY ref_0.id,ref_0.id,sample_0.time) + FROM + sensor_data AS sample_0 + WHERE (1 > sample_0.temperature) +); + id +---- + 1 + 2 + 3 + 4 +(4 rows) + ------ -- Test enabling and disabling the optimization based on costs ------ diff --git a/tsl/test/expected/compression_sorted_merge-14.out b/tsl/test/expected/compression_sorted_merge-14.out index 648624f5d..876c0d1dc 100644 --- a/tsl/test/expected/compression_sorted_merge-14.out +++ b/tsl/test/expected/compression_sorted_merge-14.out @@ -1274,6 +1274,27 @@ CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST'); CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST LIMIT 100'); CALL order_test('SELECT * FROM test1 ORDER BY time DESC'); CALL order_test('SELECT * FROM test1 ORDER BY time ASC NULLS LAST'); +------ +-- Test window functions +------ +CREATE TABLE insert_test(id INT); +INSERT INTO insert_test SELECT time_bucket_gapfill(1,time,1,5) FROM (VALUES (1),(2)) v(time) GROUP BY 1 ORDER BY 1; +SELECT * FROM insert_test AS ref_0 +WHERE EXISTS ( + SELECT + sum(ref_0.id) OVER (partition by ref_0.id ORDER BY ref_0.id,ref_0.id,sample_0.time) + FROM + sensor_data AS sample_0 + WHERE (1 > sample_0.temperature) +); + id +---- + 1 + 2 + 3 + 4 +(4 rows) + ------ -- Test enabling and disabling the optimization based on costs ------ diff --git a/tsl/test/expected/compression_sorted_merge-15.out b/tsl/test/expected/compression_sorted_merge-15.out index d5d79bd7a..a74b2e187 100644 --- a/tsl/test/expected/compression_sorted_merge-15.out +++ b/tsl/test/expected/compression_sorted_merge-15.out @@ -1274,6 +1274,27 @@ CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST'); CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST LIMIT 100'); CALL order_test('SELECT * FROM test1 ORDER BY time DESC'); CALL order_test('SELECT * FROM test1 ORDER BY time ASC NULLS LAST'); +------ +-- Test window functions +------ +CREATE TABLE insert_test(id INT); +INSERT INTO insert_test SELECT time_bucket_gapfill(1,time,1,5) FROM (VALUES (1),(2)) v(time) GROUP BY 1 ORDER BY 1; +SELECT * FROM insert_test AS ref_0 +WHERE EXISTS ( + SELECT + sum(ref_0.id) OVER (partition by ref_0.id ORDER BY ref_0.id,ref_0.id,sample_0.time) + FROM + sensor_data AS sample_0 + WHERE (1 > sample_0.temperature) +); + id +---- + 1 + 2 + 3 + 4 +(4 rows) + ------ -- Test enabling and disabling the optimization based on costs ------ diff --git a/tsl/test/sql/compression_sorted_merge.sql.in b/tsl/test/sql/compression_sorted_merge.sql.in index c659770b4..29da38ae6 100644 --- a/tsl/test/sql/compression_sorted_merge.sql.in +++ b/tsl/test/sql/compression_sorted_merge.sql.in @@ -407,6 +407,21 @@ CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST LIMIT 1 CALL order_test('SELECT * FROM test1 ORDER BY time DESC'); CALL order_test('SELECT * FROM test1 ORDER BY time ASC NULLS LAST'); +------ +-- Test window functions +------ +CREATE TABLE insert_test(id INT); +INSERT INTO insert_test SELECT time_bucket_gapfill(1,time,1,5) FROM (VALUES (1),(2)) v(time) GROUP BY 1 ORDER BY 1; + +SELECT * FROM insert_test AS ref_0 +WHERE EXISTS ( + SELECT + sum(ref_0.id) OVER (partition by ref_0.id ORDER BY ref_0.id,ref_0.id,sample_0.time) + FROM + sensor_data AS sample_0 + WHERE (1 > sample_0.temperature) +); + ------ -- Test enabling and disabling the optimization based on costs ------