Use table scan API in compression code

Refactor the compression code to use only the table scan API when
scanning relations, instead of a mix of the table and heap scan
APIs. The table scan API is higher level and is the recommended
interface, since it works for any type of relation and uses table
slots directly, which means that in some cases a full heap tuple need
not be materialized.
Erik Nordström 2023-12-08 11:25:55 +01:00 committed by Erik Nordström
parent 0f60f88621
commit e8b81c2ebe
2 changed files with 131 additions and 121 deletions
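
The change is mechanical: every scan loop that fetched HeapTuples with heap_getnext() now fetches into a TupleTableSlot with table_scan_getnextslot(). The sketch below illustrates the pattern in isolation; it is not code from this commit, and both the relation argument and the process_slot() callback are hypothetical stand-ins for the per-row work (tuplesort inserts, deletes, decompression).

#include <postgres.h>
#include <access/tableam.h>
#include <executor/tuptable.h>
#include <utils/snapmgr.h>

/* Hypothetical stand-in for the per-row work done by the real code,
 * e.g. tuplesort_puttupleslot() or simple_table_tuple_delete(). */
static void
process_slot(TupleTableSlot *slot)
{
	(void) slot;
}

/*
 * Old pattern (heap scan API), tied to heap storage:
 *
 *   HeapTuple tuple;
 *   TableScanDesc scan = table_beginscan(rel, snapshot, 0, NULL);
 *   while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
 *       ... ExecStoreHeapTuple(tuple, slot, false) ...
 *
 * New pattern (table scan API), works with any table access method:
 */
static void
scan_with_table_am(Relation rel)
{
	/* The slot comes from the relation's table AM, so no full heap
	 * tuple has to be materialized just to read the row. */
	TupleTableSlot *slot = table_slot_create(rel, NULL);
	Snapshot snapshot = GetLatestSnapshot();
	TableScanDesc scan = table_beginscan(rel, snapshot, 0, NULL);

	while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
	{
		slot_getallattrs(slot); /* deform attributes only when needed */
		process_slot(slot);
	}

	table_endscan(scan);
	ExecDropSingleTupleTableSlot(slot);
}

Where a materialized tuple is still required (e.g. for heap_deform_tuple()), the commit fetches it with ExecFetchSlotHeapTuple() and frees the copy only when should_free indicates one was made.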

@@ -8,6 +8,7 @@
  * compress and decompress chunks
  */
 #include <postgres.h>
+#include <access/tableam.h>
 #include <access/xact.h>
 #include <catalog/dependency.h>
 #include <commands/tablecmds.h>
@@ -36,6 +37,7 @@
 #include "hypercube.h"
 #include "hypertable.h"
 #include "hypertable_cache.h"
+#include "ts_catalog/catalog.h"
 #include "ts_catalog/continuous_agg.h"
 #include "ts_catalog/hypertable_compression.h"
 #include "ts_catalog/compression_chunk_size.h"
@@ -934,6 +936,7 @@ bool
 tsl_recompress_chunk_wrapper(Chunk *uncompressed_chunk)
 {
+	Oid uncompressed_chunk_relid = uncompressed_chunk->table_id;
 
 	if (ts_chunk_is_unordered(uncompressed_chunk))
 	{
 		if (!decompress_chunk_impl(uncompressed_chunk->hypertable_relid,
@@ -1129,28 +1132,23 @@ fetch_unmatched_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tuples
 										   Relation uncompressed_chunk_rel,
 										   bool *unmatched_rows_exist)
 {
-	TableScanDesc heapScan;
-	HeapTuple uncompressed_tuple;
-	TupleDesc uncompressed_rel_tupdesc = RelationGetDescr(uncompressed_chunk_rel);
+	TableScanDesc scan;
+	TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);
+	Snapshot snapshot = GetLatestSnapshot();
 
-	heapScan = table_beginscan(uncompressed_chunk_rel, GetLatestSnapshot(), 0, NULL);
-
-	TupleTableSlot *heap_tuple_slot =
-		MakeTupleTableSlot(uncompressed_rel_tupdesc, &TTSOpsHeapTuple);
+	scan = table_beginscan(uncompressed_chunk_rel, snapshot, 0, NULL);
 
-	while ((uncompressed_tuple = heap_getnext(heapScan, ForwardScanDirection)) != NULL)
+	while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
 	{
 		if (!(*unmatched_rows_exist))
 			*unmatched_rows_exist = true;
-		ExecStoreHeapTuple(uncompressed_tuple, heap_tuple_slot, false);
-		slot_getallattrs(heap_tuple_slot);
-		tuplesort_puttupleslot(segment_tuplesortstate, heap_tuple_slot);
-
-		simple_heap_delete(uncompressed_chunk_rel, &uncompressed_tuple->t_self);
+		slot_getallattrs(slot);
+		tuplesort_puttupleslot(segment_tuplesortstate, slot);
+		simple_table_tuple_delete(uncompressed_chunk_rel, &slot->tts_tid, snapshot);
 	}
 
-	ExecDropSingleTupleTableSlot(heap_tuple_slot);
-	table_endscan(heapScan);
+	ExecDropSingleTupleTableSlot(slot);
+	table_endscan(scan);
 }
 
 static bool
@@ -1159,9 +1157,8 @@ fetch_matching_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tupleso
 										  Relation uncompressed_chunk_rel,
 										  CompressedSegmentInfo **current_segment)
 {
-	TableScanDesc heapScan;
-	HeapTuple uncompressed_tuple;
-	TupleDesc uncompressed_rel_tupdesc = RelationGetDescr(uncompressed_chunk_rel);
+	TableScanDesc scan;
+	Snapshot snapshot;
 	int index = 0;
 	int nsegbycols_nonnull = 0;
 	Bitmapset *null_segbycols = NULL;
@@ -1202,21 +1199,18 @@ fetch_matching_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tupleso
 		index++;
 	}
 
-	heapScan =
-		table_beginscan(uncompressed_chunk_rel, GetLatestSnapshot(), nsegbycols_nonnull, scankey);
-	TupleTableSlot *heap_tuple_slot =
-		MakeTupleTableSlot(uncompressed_rel_tupdesc, &TTSOpsHeapTuple);
+	snapshot = GetLatestSnapshot();
+	scan = table_beginscan(uncompressed_chunk_rel, snapshot, nsegbycols_nonnull, scankey);
+	TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL);
 
-	while ((uncompressed_tuple = heap_getnext(heapScan, ForwardScanDirection)) != NULL)
+	while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
 	{
 		bool valid = true;
 		/* check for NULL values in this segment manually */
 		for (int attno = bms_next_member(null_segbycols, -1); attno >= 0;
 			 attno = bms_next_member(null_segbycols, attno))
 		{
-			if (!heap_attisnull(uncompressed_tuple,
-								attno,
-								RelationGetDescr(uncompressed_chunk_rel)))
+			if (!slot_attisnull(slot, attno))
 			{
 				valid = false;
 				break;
@@ -1225,16 +1219,15 @@ fetch_matching_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tupleso
 		if (valid)
 		{
 			matching_exist = true;
-			ExecStoreHeapTuple(uncompressed_tuple, heap_tuple_slot, false);
-			slot_getallattrs(heap_tuple_slot);
-			tuplesort_puttupleslot(segment_tuplesortstate, heap_tuple_slot);
-			/* simple_heap_delete since we don't expect concurrent updates, have exclusive lock on
-			 * the relation */
-			simple_heap_delete(uncompressed_chunk_rel, &uncompressed_tuple->t_self);
+			slot_getallattrs(slot);
+			tuplesort_puttupleslot(segment_tuplesortstate, slot);
+			/* simple_table_tuple_delete since we don't expect concurrent
+			 * updates, have exclusive lock on the relation */
+			simple_table_tuple_delete(uncompressed_chunk_rel, &slot->tts_tid, snapshot);
 		}
 	}
 
-	ExecDropSingleTupleTableSlot(heap_tuple_slot);
-	table_endscan(heapScan);
+	ExecDropSingleTupleTableSlot(slot);
+	table_endscan(scan);
 	if (null_segbycols != NULL)
 		pfree(null_segbycols);

@@ -3,11 +3,8 @@
  * Please see the included NOTICE for copyright information and
  * LICENSE-TIMESCALE for a copy of the license.
  */
-#include <math.h>
-
 #include <postgres.h>
-#include <access/heapam.h>
 #include <access/tableam.h>
 #include <access/htup_details.h>
 #include <access/multixact.h>
 #include <access/valid.h>
@@ -45,6 +42,7 @@
 #include <utils/tuplesort.h>
 #include <utils/typcache.h>
 #include <replication/message.h>
 
+#include <math.h>
 
 #include "compat/compat.h"
@@ -64,6 +62,7 @@
 #include "nodes/hypertable_modify.h"
 #include "indexing.h"
 #include "segment_meta.h"
 
+#include "ts_catalog/catalog.h"
 #include "ts_catalog/compression_chunk_size.h"
 #include "ts_catalog/hypertable_compression.h"
@@ -522,9 +521,8 @@ compress_chunk_sort_relation(Relation in_rel, int n_keys, const ColumnCompressio
 {
 	TupleDesc tupDesc = RelationGetDescr(in_rel);
 	Tuplesortstate *tuplesortstate;
-	HeapTuple tuple;
-	TableScanDesc heapScan;
-	TupleTableSlot *heap_tuple_slot = MakeTupleTableSlot(tupDesc, &TTSOpsHeapTuple);
+	TableScanDesc scan;
+	TupleTableSlot *slot;
 	AttrNumber *sort_keys = palloc(sizeof(*sort_keys) * n_keys);
 	Oid *sort_operators = palloc(sizeof(*sort_operators) * n_keys);
 	Oid *sort_collations = palloc(sizeof(*sort_collations) * n_keys);
@@ -549,23 +547,22 @@ compress_chunk_sort_relation(Relation in_rel, int n_keys, const ColumnCompressio
 												   NULL,
 												   false /*=randomAccess*/);
 
-	heapScan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL);
-
-	for (tuple = heap_getnext(heapScan, ForwardScanDirection); tuple != NULL;
-		 tuple = heap_getnext(heapScan, ForwardScanDirection))
+	scan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL);
+	slot = table_slot_create(in_rel, NULL);
+
+	while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
 	{
-		if (HeapTupleIsValid(tuple))
+		if (!TTS_EMPTY(slot))
 		{
 			/* This may not be the most efficient way to do things.
 			 * Since we use begin_heap() the tuplestore expects tupleslots,
 			 * so ISTM that the options are this or maybe putdatum().
 			 */
-			ExecStoreHeapTuple(tuple, heap_tuple_slot, false);
-			tuplesort_puttupleslot(tuplesortstate, heap_tuple_slot);
+			tuplesort_puttupleslot(tuplesortstate, slot);
 		}
 	}
 
-	table_endscan(heapScan);
+	table_endscan(scan);
 
 	/* Perform an analyze on the chunk to get up-to-date stats before compressing.
 	 * We do it at this point because we've just read out the entire chunk into
@@ -573,7 +570,7 @@ compress_chunk_sort_relation(Relation in_rel, int n_keys, const ColumnCompressio
 	 */
 	run_analyze_on_chunk(in_rel->rd_id);
 
-	ExecDropSingleTupleTableSlot(heap_tuple_slot);
+	ExecDropSingleTupleTableSlot(slot);
 
 	tuplesort_performsort(tuplesortstate);
 
@@ -727,32 +724,29 @@ static int32
 table_scan_sequence_number(Relation table_rel, int16 seq_num_column_num, ScanKeyData *scankey,
 						   int num_scankeys)
 {
-	int32 curr_seq_num = 0, max_seq_num = 0;
-	bool is_null;
-	HeapTuple compressed_tuple;
-	Datum seq_num;
-	TupleDesc in_desc = RelationGetDescr(table_rel);
+	int32 max_seq_num = 0;
+	TupleTableSlot *slot;
+	TableScanDesc scan;
 
-	TableScanDesc heap_scan =
-		table_beginscan(table_rel, GetLatestSnapshot(), num_scankeys, scankey);
-	for (compressed_tuple = heap_getnext(heap_scan, ForwardScanDirection); compressed_tuple != NULL;
-		 compressed_tuple = heap_getnext(heap_scan, ForwardScanDirection))
+	slot = table_slot_create(table_rel, NULL);
+	scan = table_beginscan(table_rel, GetLatestSnapshot(), num_scankeys, scankey);
+	while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
 	{
-		Assert(HeapTupleIsValid(compressed_tuple));
-
-		seq_num = heap_getattr(compressed_tuple, seq_num_column_num, in_desc, &is_null);
+		bool is_null;
+		Datum seq_num = slot_getattr(slot, seq_num_column_num, &is_null);
 
 		if (!is_null)
 		{
-			curr_seq_num = DatumGetInt32(seq_num);
+			int32 curr_seq_num = DatumGetInt32(seq_num);
 			if (max_seq_num < curr_seq_num)
 			{
 				max_seq_num = curr_seq_num;
 			}
 		}
 	}
 
-	table_endscan(heap_scan);
+	table_endscan(scan);
+	ExecDropSingleTupleTableSlot(slot);
 
 	return max_seq_num;
 }
@@ -1447,24 +1441,27 @@ decompress_chunk(Oid in_table, Oid out_table)
 	Relation in_rel = table_open(in_table, ExclusiveLock);
 
 	RowDecompressor decompressor = build_decompressor(in_rel, out_rel);
+	TupleTableSlot *slot = table_slot_create(in_rel, NULL);
+	TableScanDesc scan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL);
 
-	HeapTuple compressed_tuple;
-	TableScanDesc heapScan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL);
-
-	for (compressed_tuple = heap_getnext(heapScan, ForwardScanDirection); compressed_tuple != NULL;
-		 compressed_tuple = heap_getnext(heapScan, ForwardScanDirection))
+	while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
 	{
-		Assert(HeapTupleIsValid(compressed_tuple));
-
-		heap_deform_tuple(compressed_tuple,
+		bool should_free;
+		HeapTuple tuple = ExecFetchSlotHeapTuple(slot, false, &should_free);
+
+		heap_deform_tuple(tuple,
 						  decompressor.in_desc,
 						  decompressor.compressed_datums,
 						  decompressor.compressed_is_nulls);
 
+		if (should_free)
+			heap_freetuple(tuple);
+
 		row_decompressor_decompress_row_to_table(&decompressor);
 	}
 
-	table_endscan(heapScan);
+	table_endscan(scan);
+	ExecDropSingleTupleTableSlot(slot);
 	FreeBulkInsertState(decompressor.bistate);
 	MemoryContextDelete(decompressor.per_compressed_row_ctx);
 	ts_catalog_close_indexes(decompressor.indexstate);
@@ -2167,13 +2164,12 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo
 	 * the index on the uncompressed chunks in order to do speculative insertion
 	 * which is always built from all tuples (even in higher levels of isolation).
 	 */
-	TableScanDesc heapScan = table_beginscan(in_rel, GetLatestSnapshot(), num_scankeys, scankeys);
+	TupleTableSlot *compressed_slot = table_slot_create(in_rel, NULL);
+	Snapshot snapshot = GetLatestSnapshot();
+	TableScanDesc scan = table_beginscan(in_rel, snapshot, num_scankeys, scankeys);
 
-	for (HeapTuple compressed_tuple = heap_getnext(heapScan, ForwardScanDirection);
-		 compressed_tuple != NULL;
-		 compressed_tuple = heap_getnext(heapScan, ForwardScanDirection))
+	while (table_scan_getnextslot(scan, ForwardScanDirection, compressed_slot))
 	{
-		Assert(HeapTupleIsValid(compressed_tuple));
 		bool valid = true;
 
 		/*
@@ -2183,7 +2179,7 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo
 		for (int attno = bms_next_member(null_columns, -1); attno >= 0;
 			 attno = bms_next_member(null_columns, attno))
 		{
-			if (!heap_attisnull(compressed_tuple, attno, decompressor.in_desc))
+			if (!slot_attisnull(compressed_slot, attno))
 			{
 				valid = false;
 				break;
@@ -2196,11 +2192,16 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo
 		if (!valid)
 			continue;
 
-		heap_deform_tuple(compressed_tuple,
+		bool should_free;
+		HeapTuple tuple = ExecFetchSlotHeapTuple(compressed_slot, false, &should_free);
+		heap_deform_tuple(tuple,
 						  decompressor.in_desc,
 						  decompressor.compressed_datums,
 						  decompressor.compressed_is_nulls);
+		if (should_free)
+			heap_freetuple(tuple);
 
 		write_logical_replication_msg_decompression_start();
 		row_decompressor_decompress_row_to_table(&decompressor);
 		write_logical_replication_msg_decompression_end();
@@ -2208,9 +2209,9 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo
 		TM_FailureData tmfd;
 		TM_Result result pg_attribute_unused();
 		result = table_tuple_delete(in_rel,
-									&compressed_tuple->t_self,
+									&compressed_slot->tts_tid,
 									decompressor.mycid,
-									GetTransactionSnapshot(),
+									snapshot,
 									InvalidSnapshot,
 									true,
 									&tmfd,
@@ -2220,8 +2221,8 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo
 		cis->cds->tuples_decompressed += decompressor.tuples_decompressed;
 	}
 
-	table_endscan(heapScan);
-
+	table_endscan(scan);
+	ExecDropSingleTupleTableSlot(compressed_slot);
 	ts_catalog_close_indexes(decompressor.indexstate);
 	FreeExecutorState(decompressor.estate);
 	FreeBulkInsertState(decompressor.bistate);
@@ -3057,22 +3058,6 @@ build_update_delete_scankeys(RowDecompressor *decompressor, List *filters, int *
 	return scankeys;
 }
 
-static TM_Result
-delete_compressed_tuple(RowDecompressor *decompressor, HeapTuple compressed_tuple)
-{
-	TM_FailureData tmfd;
-	TM_Result result;
-	result = table_tuple_delete(decompressor->in_rel,
-								&compressed_tuple->t_self,
-								decompressor->mycid,
-								GetTransactionSnapshot(),
-								InvalidSnapshot,
-								true,
-								&tmfd,
-								false);
-	return result;
-}
-
 static void
 report_error(TM_Result result)
 {
@@ -3129,13 +3114,12 @@ static bool
 decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys,
 				   Bitmapset *null_columns, List *is_nulls, bool *chunk_status_changed)
 {
-	TM_Result result;
-	HeapTuple compressed_tuple;
 	Snapshot snapshot = GetTransactionSnapshot();
-	TableScanDesc heapScan =
-		table_beginscan(decompressor->in_rel, snapshot, num_scankeys, scankeys);
+
+	TupleTableSlot *slot = table_slot_create(decompressor->in_rel, NULL);
+	TableScanDesc scan = table_beginscan(decompressor->in_rel, snapshot, num_scankeys, scankeys);
 
-	while ((compressed_tuple = heap_getnext(heapScan, ForwardScanDirection)) != NULL)
+	while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
 	{
 		bool skip_tuple = false;
 		int attrno = bms_next_member(null_columns, -1);
@@ -3145,7 +3129,7 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num
 		for (; attrno >= 0; attrno = bms_next_member(null_columns, attrno))
 		{
 			is_null_condition = list_nth_int(is_nulls, pos);
-			seg_col_is_null = heap_attisnull(compressed_tuple, attrno, decompressor->in_desc);
+			seg_col_is_null = slot_attisnull(slot, attrno);
 			if ((seg_col_is_null && !is_null_condition) || (!seg_col_is_null && is_null_condition))
 			{
 				/*
@@ -3160,15 +3144,32 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num
 		}
 		if (skip_tuple)
 			continue;
 
+		TM_FailureData tmfd;
+		TM_Result result;
+		bool should_free;
+		HeapTuple compressed_tuple = ExecFetchSlotHeapTuple(slot, false, &should_free);
+
 		heap_deform_tuple(compressed_tuple,
 						  decompressor->in_desc,
 						  decompressor->compressed_datums,
 						  decompressor->compressed_is_nulls);
-		result = delete_compressed_tuple(decompressor, compressed_tuple);
+
+		if (should_free)
+			heap_freetuple(compressed_tuple);
+
+		result = table_tuple_delete(decompressor->in_rel,
+									&slot->tts_tid,
+									decompressor->mycid,
+									snapshot,
+									InvalidSnapshot,
+									true,
+									&tmfd,
+									false);
+
 		if (result != TM_Ok)
 		{
-			table_endscan(heapScan);
+			table_endscan(scan);
 			report_error(result);
 		}
 		row_decompressor_decompress_row_to_table(decompressor);
@@ -3176,7 +3177,9 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num
 	}
 	if (scankeys)
 		pfree(scankeys);
 
-	table_endscan(heapScan);
+	table_endscan(scan);
+	ExecDropSingleTupleTableSlot(slot);
+
 	return true;
 }
@@ -3230,24 +3233,24 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel
 							   ScanKeyData *index_scankeys, int num_index_scankeys,
 							   ScanKeyData *scankeys, int num_scankeys, bool *chunk_status_changed)
 {
-	HeapTuple compressed_tuple;
-	Snapshot snapshot;
+	Snapshot snapshot = GetTransactionSnapshot();
 	int num_segmentby_filtered_rows = 0;
 	int num_orderby_filtered_rows = 0;
 
-	snapshot = GetTransactionSnapshot();
 	IndexScanDesc scan =
 		index_beginscan(decompressor->in_rel, index_rel, snapshot, num_index_scankeys, 0);
 	TupleTableSlot *slot = table_slot_create(decompressor->in_rel, NULL);
 	index_rescan(scan, index_scankeys, num_index_scankeys, NULL, 0);
 	while (index_getnext_slot(scan, ForwardScanDirection, slot))
 	{
 		TM_Result result;
-		/* Deconstruct the tuple */
-		slot_getallattrs(slot);
-		compressed_tuple =
-			heap_form_tuple(slot->tts_tupleDescriptor, slot->tts_values, slot->tts_isnull);
-		compressed_tuple->t_self = slot->tts_tid;
+		TM_FailureData tmfd;
+		bool should_free;
+		HeapTuple compressed_tuple;
+
+		compressed_tuple = ExecFetchSlotHeapTuple(slot, false, &should_free);
+
 		num_segmentby_filtered_rows++;
+
 		if (num_scankeys)
 		{
@@ -3268,27 +3271,41 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel
 			if (!valid)
 			{
 				num_orderby_filtered_rows++;
+
+				if (should_free)
+					heap_freetuple(compressed_tuple);
+
 				continue;
 			}
 		}
 
 		heap_deform_tuple(compressed_tuple,
 						  decompressor->in_desc,
 						  decompressor->compressed_datums,
 						  decompressor->compressed_is_nulls);
-		result = delete_compressed_tuple(decompressor, compressed_tuple);
+
+		if (should_free)
+			heap_freetuple(compressed_tuple);
+
+		result = table_tuple_delete(decompressor->in_rel,
+									&slot->tts_tid,
+									decompressor->mycid,
+									snapshot,
+									InvalidSnapshot,
+									true,
+									&tmfd,
+									false);
+
 		/* skip reporting error if isolation level is < Repeatable Read */
 		if (result == TM_Deleted && !IsolationUsesXactSnapshot())
 			continue;
 
 		if (result != TM_Ok)
 		{
-			heap_freetuple(compressed_tuple);
 			index_endscan(scan);
 			index_close(index_rel, AccessShareLock);
 			report_error(result);
 		}
 		row_decompressor_decompress_row_to_table(decompressor);
-		heap_freetuple(compressed_tuple);
 		*chunk_status_changed = true;
 	}