mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-16 02:23:49 +08:00
Fix backwards scan on Hypercore TAM
Fix a bug when backwards scanning a ColumnarScan and add a test case.
This commit is contained in:
parent
8cff1c22a1
commit
cde81858e8
@ -383,18 +383,19 @@ arrow_slot_try_getnext(TupleTableSlot *slot, ScanDirection direction)
|
||||
Assert(direction == ForwardScanDirection || direction == BackwardScanDirection);
|
||||
|
||||
/* If empty or not containing a compressed tuple, there is nothing to do */
|
||||
if (unlikely(TTS_EMPTY(slot)) || aslot->tuple_index == InvalidTupleIndex)
|
||||
if (unlikely(TTS_EMPTY(slot)) || aslot->tuple_index == InvalidTupleIndex ||
|
||||
arrow_slot_is_consumed(slot))
|
||||
return false;
|
||||
|
||||
if (direction == ForwardScanDirection)
|
||||
if (likely(direction == ForwardScanDirection))
|
||||
{
|
||||
if (aslot->tuple_index < aslot->total_row_count)
|
||||
if (!arrow_slot_is_last(slot))
|
||||
{
|
||||
ExecStoreNextArrowTuple(slot);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
else if (aslot->tuple_index > 1)
|
||||
else if (!arrow_slot_is_first(slot))
|
||||
{
|
||||
Assert(direction == BackwardScanDirection);
|
||||
ExecStorePreviousArrowTuple(slot);
|
||||
|
@ -873,7 +873,7 @@ hypercore_getnextslot_noncompressed(HypercoreScanDesc scan, ScanDirection direct
|
||||
return result;
|
||||
}
|
||||
|
||||
static bool
|
||||
static inline bool
|
||||
should_read_new_compressed_slot(TupleTableSlot *slot, ScanDirection direction)
|
||||
{
|
||||
/* Scans are never invoked with NoMovementScanDirection */
|
||||
@ -881,18 +881,18 @@ should_read_new_compressed_slot(TupleTableSlot *slot, ScanDirection direction)
|
||||
|
||||
/* A slot can be empty if just started the scan or (or moved back to the
|
||||
* start due to backward scan) */
|
||||
if (TTS_EMPTY(slot))
|
||||
if (TTS_EMPTY(slot) || arrow_slot_is_consumed(slot))
|
||||
return true;
|
||||
|
||||
if (direction == ForwardScanDirection)
|
||||
if (likely(direction == ForwardScanDirection))
|
||||
{
|
||||
if (arrow_slot_is_last(slot) || arrow_slot_is_consumed(slot))
|
||||
if (arrow_slot_is_last(slot))
|
||||
return true;
|
||||
}
|
||||
else if (direction == BackwardScanDirection)
|
||||
{
|
||||
/* Check if backward scan reached the start or the slot values */
|
||||
if (arrow_slot_row_index(slot) <= 1)
|
||||
if (arrow_slot_is_first(slot))
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -914,7 +914,7 @@ hypercore_getnextslot_compressed(HypercoreScanDesc scan, ScanDirection direction
|
||||
{
|
||||
ExecClearTuple(slot);
|
||||
|
||||
if (direction == ForwardScanDirection)
|
||||
if (likely(direction == ForwardScanDirection))
|
||||
{
|
||||
scan->hs_scan_state = HYPERCORE_SCAN_NON_COMPRESSED;
|
||||
return hypercore_getnextslot(&scan->rs_base, direction, slot);
|
||||
|
@ -5,6 +5,7 @@
|
||||
*/
|
||||
#include <postgres.h>
|
||||
#include "nodes/decompress_chunk/vector_quals.h"
|
||||
#include <access/sdir.h>
|
||||
#include <utils/memutils.h>
|
||||
|
||||
#include "arrow_tts.h"
|
||||
@ -88,12 +89,13 @@ uint16
|
||||
ExecVectorQual(VectorQualState *vqstate, ExprContext *econtext)
|
||||
{
|
||||
TupleTableSlot *slot = econtext->ecxt_scantuple;
|
||||
const uint16 rowindex = arrow_slot_row_index(slot);
|
||||
ScanDirection direction = econtext->ecxt_estate->es_direction;
|
||||
|
||||
/* Compute the vector quals over both compressed and non-compressed
|
||||
* tuples. In case a non-compressed tuple is filtered, return SomeRowsPass
|
||||
* although only one row will pass. */
|
||||
if (rowindex <= 1)
|
||||
if ((direction == ForwardScanDirection && arrow_slot_is_first(slot)) ||
|
||||
(direction == BackwardScanDirection && arrow_slot_is_last(slot)))
|
||||
{
|
||||
vector_qual_state_reset(vqstate);
|
||||
VectorQualSummary vector_qual_summary = vqstate->vectorized_quals_constified != NIL ?
|
||||
@ -114,6 +116,8 @@ ExecVectorQual(VectorQualState *vqstate, ExprContext *econtext)
|
||||
case SomeRowsPass:
|
||||
break;
|
||||
}
|
||||
|
||||
arrow_slot_set_qual_result(slot, vqstate->vector_qual_result);
|
||||
}
|
||||
|
||||
/* Fast path when all rows have passed (i.e., no rows filtered). No need
|
||||
@ -123,16 +127,27 @@ ExecVectorQual(VectorQualState *vqstate, ExprContext *econtext)
|
||||
|
||||
const uint16 nrows = arrow_slot_total_row_count(slot);
|
||||
const uint16 off = arrow_slot_arrow_offset(slot);
|
||||
const uint64 *qual_result = arrow_slot_get_qual_result(slot);
|
||||
uint16 nfiltered = 0;
|
||||
|
||||
for (uint16 i = off; i < nrows; i++)
|
||||
if (direction == ForwardScanDirection)
|
||||
{
|
||||
if (arrow_row_is_valid(vqstate->vector_qual_result, i))
|
||||
break;
|
||||
nfiltered++;
|
||||
for (uint16 i = off; i < nrows; i++)
|
||||
{
|
||||
if (arrow_row_is_valid(qual_result, i))
|
||||
break;
|
||||
nfiltered++;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
for (uint16 i = off; i > 0; i--)
|
||||
{
|
||||
if (arrow_row_is_valid(qual_result, i))
|
||||
break;
|
||||
nfiltered++;
|
||||
}
|
||||
}
|
||||
|
||||
arrow_slot_set_qual_result(slot, vqstate->vector_qual_result);
|
||||
|
||||
return nfiltered;
|
||||
}
|
||||
|
@ -51,7 +51,7 @@ typedef struct ColumnarScanState
|
||||
{
|
||||
CustomScanState css;
|
||||
VectorQualState vqstate;
|
||||
ExprState *segmentby_qual;
|
||||
ExprState *segmentby_exprstate;
|
||||
ScanKey scankeys;
|
||||
int nscankeys;
|
||||
List *scankey_quals;
|
||||
@ -487,13 +487,13 @@ ExecSegmentbyQual(ExprState *qual, ExprContext *econtext)
|
||||
TupleTableSlot *slot = econtext->ecxt_scantuple;
|
||||
ScanDirection direction = econtext->ecxt_estate->es_direction;
|
||||
|
||||
Assert(direction == ForwardScanDirection || direction == BackwardScanDirection);
|
||||
Assert(TTS_IS_ARROWTUPLE(slot));
|
||||
|
||||
if (qual == NULL || !should_check_segmentby_qual(direction, slot))
|
||||
return true;
|
||||
|
||||
Assert(!arrow_slot_is_consumed(slot));
|
||||
|
||||
return ExecQual(qual, econtext);
|
||||
}
|
||||
|
||||
@ -540,7 +540,7 @@ columnar_scan_exec(CustomScanState *state)
|
||||
* If no quals to check, do the fast path and just return the raw scan
|
||||
* tuple or a projected one.
|
||||
*/
|
||||
if (!qual && !has_vecquals && !cstate->segmentby_qual)
|
||||
if (!qual && !has_vecquals && !cstate->segmentby_exprstate)
|
||||
{
|
||||
bool gottuple = getnextslot(scandesc, direction, slot);
|
||||
|
||||
@ -594,7 +594,7 @@ columnar_scan_exec(CustomScanState *state)
|
||||
* since segmentby quals don't need decompression and can filter all
|
||||
* values in an arrow slot in one go.
|
||||
*/
|
||||
if (!ExecSegmentbyQual(cstate->segmentby_qual, econtext))
|
||||
if (!ExecSegmentbyQual(cstate->segmentby_exprstate, econtext))
|
||||
{
|
||||
/* The slot didn't pass filters so read the next slot */
|
||||
const uint16 nrows = arrow_slot_total_row_count(slot);
|
||||
@ -613,7 +613,14 @@ columnar_scan_exec(CustomScanState *state)
|
||||
TS_DEBUG_LOG("vectorized filtering of %u rows", nfiltered);
|
||||
|
||||
/* Skip ahead with the amount filtered */
|
||||
ExecIncrArrowTuple(slot, nfiltered);
|
||||
if (direction == ForwardScanDirection)
|
||||
ExecIncrArrowTuple(slot, nfiltered);
|
||||
else
|
||||
{
|
||||
Assert(direction == BackwardScanDirection);
|
||||
ExecDecrArrowTuple(slot, nfiltered);
|
||||
}
|
||||
|
||||
InstrCountFiltered1(state, nfiltered);
|
||||
|
||||
if (nfiltered == total_nrows && total_nrows > 1)
|
||||
@ -787,7 +794,7 @@ columnar_scan_begin(CustomScanState *state, EState *estate, int eflags)
|
||||
if (cstate->css.ss.ps.ps_ProjInfo)
|
||||
create_simple_projection_state_if_possible(cstate);
|
||||
|
||||
cstate->segmentby_qual = ExecInitQual(cstate->segmentby_quals, (PlanState *) state);
|
||||
cstate->segmentby_exprstate = ExecInitQual(cstate->segmentby_quals, (PlanState *) state);
|
||||
|
||||
/*
|
||||
* After having initialized ExprState for processing regular and segmentby
|
||||
|
@ -1159,3 +1159,50 @@ where time <= '2022-06-02' and device = '1'::oid;
|
||||
1381.63850675326
|
||||
(1 row)
|
||||
|
||||
--
|
||||
-- Test backwards scan with segmentby and vector quals
|
||||
--
|
||||
select count(*)-4 as myoffset from readings
|
||||
where time <= '2022-06-02' and device in (1, 2)
|
||||
\gset
|
||||
-- Get the last four values to compare with cursor fetch backward from
|
||||
-- the end
|
||||
select * from readings
|
||||
where time <= '2022-06-02' and device in (1, 2)
|
||||
offset :myoffset;
|
||||
time | location | device | temp | humidity
|
||||
------------------------------+----------+--------+------------------+------------------
|
||||
Wed Jun 01 19:10:00 2022 PDT | 3 | 2 | 25.6339083021175 | 85.7531443688847
|
||||
Wed Jun 01 20:30:00 2022 PDT | 2 | 2 | 6.02098537951642 | 56.3153986241908
|
||||
Wed Jun 01 23:15:00 2022 PDT | 2 | 2 | 35.7529756426646 | 86.0243391529811
|
||||
Wed Jun 01 23:40:00 2022 PDT | 3 | 2 | 3.94232546784524 | 55.454709690509
|
||||
(4 rows)
|
||||
|
||||
begin;
|
||||
declare cur1 scroll cursor for
|
||||
select * from readings
|
||||
where time <= '2022-06-02' and device in (1, 2);
|
||||
move last cur1;
|
||||
-- move one step beyond last
|
||||
fetch forward 1 from cur1;
|
||||
time | location | device | temp | humidity
|
||||
------+----------+--------+------+----------
|
||||
(0 rows)
|
||||
|
||||
-- fetch the last 4 values with two fetches
|
||||
fetch backward 2 from cur1;
|
||||
time | location | device | temp | humidity
|
||||
------------------------------+----------+--------+------------------+------------------
|
||||
Wed Jun 01 23:40:00 2022 PDT | 3 | 2 | 3.94232546784524 | 55.454709690509
|
||||
Wed Jun 01 23:15:00 2022 PDT | 2 | 2 | 35.7529756426646 | 86.0243391529811
|
||||
(2 rows)
|
||||
|
||||
fetch backward 2 from cur1;
|
||||
time | location | device | temp | humidity
|
||||
------------------------------+----------+--------+------------------+------------------
|
||||
Wed Jun 01 20:30:00 2022 PDT | 2 | 2 | 6.02098537951642 | 56.3153986241908
|
||||
Wed Jun 01 19:10:00 2022 PDT | 3 | 2 | 25.6339083021175 | 85.7531443688847
|
||||
(2 rows)
|
||||
|
||||
close cur1;
|
||||
commit;
|
||||
|
@ -449,3 +449,29 @@ where time <= '2022-06-02' and device = '1'::oid;
|
||||
|
||||
select sum(humidity) from readings
|
||||
where time <= '2022-06-02' and device = '1'::oid;
|
||||
|
||||
--
|
||||
-- Test backwards scan with segmentby and vector quals
|
||||
--
|
||||
select count(*)-4 as myoffset from readings
|
||||
where time <= '2022-06-02' and device in (1, 2)
|
||||
\gset
|
||||
|
||||
-- Get the last four values to compare with cursor fetch backward from
|
||||
-- the end
|
||||
select * from readings
|
||||
where time <= '2022-06-02' and device in (1, 2)
|
||||
offset :myoffset;
|
||||
|
||||
begin;
|
||||
declare cur1 scroll cursor for
|
||||
select * from readings
|
||||
where time <= '2022-06-02' and device in (1, 2);
|
||||
move last cur1;
|
||||
-- move one step beyond last
|
||||
fetch forward 1 from cur1;
|
||||
-- fetch the last 4 values with two fetches
|
||||
fetch backward 2 from cur1;
|
||||
fetch backward 2 from cur1;
|
||||
close cur1;
|
||||
commit;
|
||||
|
Loading…
x
Reference in New Issue
Block a user