1
0
mirror of https://github.com/timescale/timescaledb.git synced 2025-05-21 21:21:22 +08:00

Fixed two bugs in decompression sorted merge code

SQLSmith found two bugs in the compression sorted merge code.

* The unused_batch_states are not initialized properly. Therefore,
  non-existing unused batch states can be part of the BMS. This patch
  fixes the initialization.

* For performance reasons, we reuse the same TupleDesc across all
  TupleTableSlots. PostgreSQL sometimes uses TupleDesc data structures
  with active reference counting. The way we use the TupleDesc
  structures collides with the reference counting of PostgreSQL. This
  patch introduces a private TupleDesc copy without reference counting.
This commit is contained in:
Jan Nidzwetzki 2023-06-12 15:52:06 +02:00 committed by Jan Nidzwetzki
parent 4dce87a1c4
commit 9c7ae3e8a9
8 changed files with 149 additions and 5 deletions

1
.unreleased/bugfix_5774 Normal file

@ -0,0 +1 @@
Fixes: #5774 Fixed two bugs in decompression sorted merge code

@ -152,8 +152,9 @@ batch_states_create(DecompressChunkState *chunk_state, int nbatches)
decompress_initialize_batch_state(chunk_state, batch_state);
}
chunk_state->unused_batch_states =
bms_add_range(chunk_state->unused_batch_states, 0, nbatches - 1);
chunk_state->unused_batch_states = bms_add_range(NULL, 0, nbatches - 1);
Assert(bms_num_members(chunk_state->unused_batch_states) == chunk_state->n_batch_states);
}
/*
@ -181,6 +182,10 @@ batch_states_enlarge(DecompressChunkState *chunk_state, int nbatches)
chunk_state->unused_batch_states =
bms_add_range(chunk_state->unused_batch_states, chunk_state->n_batch_states, nbatches - 1);
Assert(bms_num_members(chunk_state->unused_batch_states) ==
nbatches - chunk_state->n_batch_states);
/* Update number of available batch states */
chunk_state->n_batch_states = nbatches;
}
@ -522,8 +527,14 @@ decompress_initialize_batch(DecompressChunkState *chunk_state, DecompressBatchSt
/* Batch states can be re-used; skip tuple slot creation in that case */
if (batch_state->compressed_slot == NULL)
{
/* Create a non ref-counted copy of the tuple descriptor */
if (chunk_state->compressed_slot_tdesc == NULL)
chunk_state->compressed_slot_tdesc =
CreateTupleDescCopyConstr(subslot->tts_tupleDescriptor);
Assert(chunk_state->compressed_slot_tdesc->tdrefcount == -1);
batch_state->compressed_slot =
MakeSingleTupleTableSlot(subslot->tts_tupleDescriptor, subslot->tts_ops);
MakeSingleTupleTableSlot(chunk_state->compressed_slot_tdesc, subslot->tts_ops);
}
else
{
@ -540,8 +551,15 @@ decompress_initialize_batch(DecompressChunkState *chunk_state, DecompressBatchSt
{
/* Get a reference to the output TupleTableSlot */
TupleTableSlot *slot = chunk_state->csstate.ss.ss_ScanTupleSlot;
/* Create a non ref-counted copy of the tuple descriptor */
if (chunk_state->decompressed_slot_scan_tdesc == NULL)
chunk_state->decompressed_slot_scan_tdesc =
CreateTupleDescCopyConstr(slot->tts_tupleDescriptor);
Assert(chunk_state->decompressed_slot_scan_tdesc->tdrefcount == -1);
batch_state->decompressed_slot_scan =
MakeSingleTupleTableSlot(slot->tts_tupleDescriptor, slot->tts_ops);
MakeSingleTupleTableSlot(chunk_state->decompressed_slot_scan_tdesc, slot->tts_ops);
}
else
{
@ -558,8 +576,16 @@ decompress_initialize_batch(DecompressChunkState *chunk_state, DecompressBatchSt
if (chunk_state->csstate.ss.ps.ps_ProjInfo != NULL)
{
TupleTableSlot *slot = chunk_state->csstate.ss.ps.ps_ProjInfo->pi_state.resultslot;
/* Create a non ref-counted copy of the tuple descriptor */
if (chunk_state->decompressed_slot_projected_tdesc == NULL)
chunk_state->decompressed_slot_projected_tdesc =
CreateTupleDescCopyConstr(slot->tts_tupleDescriptor);
Assert(chunk_state->decompressed_slot_projected_tdesc->tdrefcount == -1);
batch_state->decompressed_slot_projected =
MakeSingleTupleTableSlot(slot->tts_tupleDescriptor, slot->tts_ops);
MakeSingleTupleTableSlot(chunk_state->decompressed_slot_projected_tdesc,
slot->tts_ops);
}
else
{
@ -822,13 +848,18 @@ decompress_chunk_rescan(CustomScanState *node)
DecompressChunkState *chunk_state = (DecompressChunkState *) node;
if (chunk_state->merge_heap != NULL)
{
decompress_sorted_merge_free(chunk_state);
Assert(chunk_state->merge_heap == NULL);
}
for (int i = 0; i < chunk_state->n_batch_states; i++)
{
decompress_set_batch_state_to_unused(chunk_state, i);
}
Assert(bms_num_members(chunk_state->unused_batch_states) == chunk_state->n_batch_states);
ExecReScan(linitial(node->custom_ps));
}
@ -842,6 +873,7 @@ decompress_chunk_end(CustomScanState *node)
if (chunk_state->merge_heap != NULL)
{
decompress_sorted_merge_free(chunk_state);
Assert(chunk_state->merge_heap == NULL);
}
for (i = 0; i < chunk_state->n_batch_states; i++)

@ -110,6 +110,18 @@ typedef struct DecompressChunkState
SortSupportData *sortkeys; /* Sort keys for binary heap compare function */
bool using_bulk_decompression; /* For EXPLAIN ANALYZE. */
/*
* Make non-refcounted copies of the tupdesc for reuse across all batch states
* and avoid spending CPU in ResourceOwner when creating a big number of table
* slots. This happens because each new slot pins its tuple descriptor using
* PinTupleDesc, and for reference-counting tuples this involves adding a new
* reference to ResourceOwner, which is not very efficient for a large number of
* references.
*/
TupleDesc decompressed_slot_projected_tdesc;
TupleDesc decompressed_slot_scan_tdesc;
TupleDesc compressed_slot_tdesc;
} DecompressChunkState;
extern Node *decompress_chunk_state_create(CustomScan *cscan);

@ -1274,6 +1274,27 @@ CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST');
CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST LIMIT 100');
CALL order_test('SELECT * FROM test1 ORDER BY time DESC');
CALL order_test('SELECT * FROM test1 ORDER BY time ASC NULLS LAST');
------
-- Test window functions
------
CREATE TABLE insert_test(id INT);
INSERT INTO insert_test SELECT time_bucket_gapfill(1,time,1,5) FROM (VALUES (1),(2)) v(time) GROUP BY 1 ORDER BY 1;
SELECT * FROM insert_test AS ref_0
WHERE EXISTS (
SELECT
sum(ref_0.id) OVER (partition by ref_0.id ORDER BY ref_0.id,ref_0.id,sample_0.time)
FROM
sensor_data AS sample_0
WHERE (1 > sample_0.temperature)
);
id
----
1
2
3
4
(4 rows)
------
-- Test enabling and disabling the optimization based on costs
------

@ -1274,6 +1274,27 @@ CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST');
CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST LIMIT 100');
CALL order_test('SELECT * FROM test1 ORDER BY time DESC');
CALL order_test('SELECT * FROM test1 ORDER BY time ASC NULLS LAST');
------
-- Test window functions
------
CREATE TABLE insert_test(id INT);
INSERT INTO insert_test SELECT time_bucket_gapfill(1,time,1,5) FROM (VALUES (1),(2)) v(time) GROUP BY 1 ORDER BY 1;
SELECT * FROM insert_test AS ref_0
WHERE EXISTS (
SELECT
sum(ref_0.id) OVER (partition by ref_0.id ORDER BY ref_0.id,ref_0.id,sample_0.time)
FROM
sensor_data AS sample_0
WHERE (1 > sample_0.temperature)
);
id
----
1
2
3
4
(4 rows)
------
-- Test enabling and disabling the optimization based on costs
------

@ -1274,6 +1274,27 @@ CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST');
CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST LIMIT 100');
CALL order_test('SELECT * FROM test1 ORDER BY time DESC');
CALL order_test('SELECT * FROM test1 ORDER BY time ASC NULLS LAST');
------
-- Test window functions
------
CREATE TABLE insert_test(id INT);
INSERT INTO insert_test SELECT time_bucket_gapfill(1,time,1,5) FROM (VALUES (1),(2)) v(time) GROUP BY 1 ORDER BY 1;
SELECT * FROM insert_test AS ref_0
WHERE EXISTS (
SELECT
sum(ref_0.id) OVER (partition by ref_0.id ORDER BY ref_0.id,ref_0.id,sample_0.time)
FROM
sensor_data AS sample_0
WHERE (1 > sample_0.temperature)
);
id
----
1
2
3
4
(4 rows)
------
-- Test enabling and disabling the optimization based on costs
------

@ -1274,6 +1274,27 @@ CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST');
CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST LIMIT 100');
CALL order_test('SELECT * FROM test1 ORDER BY time DESC');
CALL order_test('SELECT * FROM test1 ORDER BY time ASC NULLS LAST');
------
-- Test window functions
------
CREATE TABLE insert_test(id INT);
INSERT INTO insert_test SELECT time_bucket_gapfill(1,time,1,5) FROM (VALUES (1),(2)) v(time) GROUP BY 1 ORDER BY 1;
SELECT * FROM insert_test AS ref_0
WHERE EXISTS (
SELECT
sum(ref_0.id) OVER (partition by ref_0.id ORDER BY ref_0.id,ref_0.id,sample_0.time)
FROM
sensor_data AS sample_0
WHERE (1 > sample_0.temperature)
);
id
----
1
2
3
4
(4 rows)
------
-- Test enabling and disabling the optimization based on costs
------

@ -407,6 +407,21 @@ CALL order_test('SELECT * FROM sensor_data ORDER BY time ASC NULLS FIRST LIMIT 1
CALL order_test('SELECT * FROM test1 ORDER BY time DESC');
CALL order_test('SELECT * FROM test1 ORDER BY time ASC NULLS LAST');
------
-- Test window functions
------
CREATE TABLE insert_test(id INT);
INSERT INTO insert_test SELECT time_bucket_gapfill(1,time,1,5) FROM (VALUES (1),(2)) v(time) GROUP BY 1 ORDER BY 1;
SELECT * FROM insert_test AS ref_0
WHERE EXISTS (
SELECT
sum(ref_0.id) OVER (partition by ref_0.id ORDER BY ref_0.id,ref_0.id,sample_0.time)
FROM
sensor_data AS sample_0
WHERE (1 > sample_0.temperature)
);
------
-- Test enabling and disabling the optimization based on costs
------