diff --git a/tsl/src/compression/api.c b/tsl/src/compression/api.c index ddb35ddc9..1e4338547 100644 --- a/tsl/src/compression/api.c +++ b/tsl/src/compression/api.c @@ -8,6 +8,7 @@ * compress and decompress chunks */ #include +#include #include #include #include @@ -36,6 +37,7 @@ #include "hypercube.h" #include "hypertable.h" #include "hypertable_cache.h" +#include "ts_catalog/catalog.h" #include "ts_catalog/continuous_agg.h" #include "ts_catalog/hypertable_compression.h" #include "ts_catalog/compression_chunk_size.h" @@ -934,6 +936,7 @@ bool tsl_recompress_chunk_wrapper(Chunk *uncompressed_chunk) { Oid uncompressed_chunk_relid = uncompressed_chunk->table_id; + if (ts_chunk_is_unordered(uncompressed_chunk)) { if (!decompress_chunk_impl(uncompressed_chunk->hypertable_relid, @@ -1129,28 +1132,23 @@ fetch_unmatched_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tuples Relation uncompressed_chunk_rel, bool *unmatched_rows_exist) { - TableScanDesc heapScan; - HeapTuple uncompressed_tuple; - TupleDesc uncompressed_rel_tupdesc = RelationGetDescr(uncompressed_chunk_rel); + TableScanDesc scan; + TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL); + Snapshot snapshot = GetLatestSnapshot(); - heapScan = table_beginscan(uncompressed_chunk_rel, GetLatestSnapshot(), 0, NULL); - TupleTableSlot *heap_tuple_slot = - MakeTupleTableSlot(uncompressed_rel_tupdesc, &TTSOpsHeapTuple); + scan = table_beginscan(uncompressed_chunk_rel, snapshot, 0, NULL); - while ((uncompressed_tuple = heap_getnext(heapScan, ForwardScanDirection)) != NULL) + while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) { if (!(*unmatched_rows_exist)) *unmatched_rows_exist = true; - ExecStoreHeapTuple(uncompressed_tuple, heap_tuple_slot, false); - slot_getallattrs(heap_tuple_slot); - - tuplesort_puttupleslot(segment_tuplesortstate, heap_tuple_slot); - - simple_heap_delete(uncompressed_chunk_rel, &uncompressed_tuple->t_self); + slot_getallattrs(slot); + tuplesort_puttupleslot(segment_tuplesortstate, slot); + simple_table_tuple_delete(uncompressed_chunk_rel, &slot->tts_tid, snapshot); } - ExecDropSingleTupleTableSlot(heap_tuple_slot); - table_endscan(heapScan); + ExecDropSingleTupleTableSlot(slot); + table_endscan(scan); } static bool @@ -1159,9 +1157,8 @@ fetch_matching_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tupleso Relation uncompressed_chunk_rel, CompressedSegmentInfo **current_segment) { - TableScanDesc heapScan; - HeapTuple uncompressed_tuple; - TupleDesc uncompressed_rel_tupdesc = RelationGetDescr(uncompressed_chunk_rel); + TableScanDesc scan; + Snapshot snapshot; int index = 0; int nsegbycols_nonnull = 0; Bitmapset *null_segbycols = NULL; @@ -1202,21 +1199,18 @@ fetch_matching_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tupleso index++; } - heapScan = - table_beginscan(uncompressed_chunk_rel, GetLatestSnapshot(), nsegbycols_nonnull, scankey); - TupleTableSlot *heap_tuple_slot = - MakeTupleTableSlot(uncompressed_rel_tupdesc, &TTSOpsHeapTuple); + snapshot = GetLatestSnapshot(); + scan = table_beginscan(uncompressed_chunk_rel, snapshot, nsegbycols_nonnull, scankey); + TupleTableSlot *slot = table_slot_create(uncompressed_chunk_rel, NULL); - while ((uncompressed_tuple = heap_getnext(heapScan, ForwardScanDirection)) != NULL) + while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) { bool valid = true; /* check for NULL values in this segment manually */ for (int attno = bms_next_member(null_segbycols, -1); attno >= 0; attno = bms_next_member(null_segbycols, attno)) { - if (!heap_attisnull(uncompressed_tuple, - attno, - RelationGetDescr(uncompressed_chunk_rel))) + if (!slot_attisnull(slot, attno)) { valid = false; break; @@ -1225,16 +1219,15 @@ fetch_matching_uncompressed_chunk_into_tuplesort(Tuplesortstate *segment_tupleso if (valid) { matching_exist = true; - ExecStoreHeapTuple(uncompressed_tuple, heap_tuple_slot, false); - slot_getallattrs(heap_tuple_slot); - tuplesort_puttupleslot(segment_tuplesortstate, heap_tuple_slot); - /* simple_heap_delete since we don't expect concurrent updates, have exclusive lock on - * the relation */ - simple_heap_delete(uncompressed_chunk_rel, &uncompressed_tuple->t_self); + slot_getallattrs(slot); + tuplesort_puttupleslot(segment_tuplesortstate, slot); + /* simple_table_tuple_delete since we don't expect concurrent + * updates, have exclusive lock on the relation */ + simple_table_tuple_delete(uncompressed_chunk_rel, &slot->tts_tid, snapshot); } } - ExecDropSingleTupleTableSlot(heap_tuple_slot); - table_endscan(heapScan); + ExecDropSingleTupleTableSlot(slot); + table_endscan(scan); if (null_segbycols != NULL) pfree(null_segbycols); diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c index a2ab16cff..fdd7f4c05 100644 --- a/tsl/src/compression/compression.c +++ b/tsl/src/compression/compression.c @@ -3,11 +3,8 @@ * Please see the included NOTICE for copyright information and * LICENSE-TIMESCALE for a copy of the license. */ - -#include - #include -#include +#include #include #include #include @@ -45,6 +42,7 @@ #include #include #include +#include #include "compat/compat.h" @@ -64,6 +62,7 @@ #include "nodes/hypertable_modify.h" #include "indexing.h" #include "segment_meta.h" +#include "ts_catalog/catalog.h" #include "ts_catalog/compression_chunk_size.h" #include "ts_catalog/hypertable_compression.h" @@ -522,9 +521,8 @@ compress_chunk_sort_relation(Relation in_rel, int n_keys, const ColumnCompressio { TupleDesc tupDesc = RelationGetDescr(in_rel); Tuplesortstate *tuplesortstate; - HeapTuple tuple; - TableScanDesc heapScan; - TupleTableSlot *heap_tuple_slot = MakeTupleTableSlot(tupDesc, &TTSOpsHeapTuple); + TableScanDesc scan; + TupleTableSlot *slot; AttrNumber *sort_keys = palloc(sizeof(*sort_keys) * n_keys); Oid *sort_operators = palloc(sizeof(*sort_operators) * n_keys); Oid *sort_collations = palloc(sizeof(*sort_collations) * n_keys); @@ -549,23 +547,22 @@ compress_chunk_sort_relation(Relation in_rel, int n_keys, const ColumnCompressio NULL, false /*=randomAccess*/); - heapScan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL); - for (tuple = heap_getnext(heapScan, ForwardScanDirection); tuple != NULL; - tuple = heap_getnext(heapScan, ForwardScanDirection)) + scan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL); + slot = table_slot_create(in_rel, NULL); + + while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) { - if (HeapTupleIsValid(tuple)) + if (!TTS_EMPTY(slot)) { /* This may not be the most efficient way to do things. * Since we use begin_heap() the tuplestore expects tupleslots, * so ISTM that the options are this or maybe putdatum(). */ - ExecStoreHeapTuple(tuple, heap_tuple_slot, false); - - tuplesort_puttupleslot(tuplesortstate, heap_tuple_slot); + tuplesort_puttupleslot(tuplesortstate, slot); } } - table_endscan(heapScan); + table_endscan(scan); /* Perform an analyze on the chunk to get up-to-date stats before compressing. * We do it at this point because we've just read out the entire chunk into @@ -573,7 +570,7 @@ compress_chunk_sort_relation(Relation in_rel, int n_keys, const ColumnCompressio */ run_analyze_on_chunk(in_rel->rd_id); - ExecDropSingleTupleTableSlot(heap_tuple_slot); + ExecDropSingleTupleTableSlot(slot); tuplesort_performsort(tuplesortstate); @@ -727,32 +724,29 @@ static int32 table_scan_sequence_number(Relation table_rel, int16 seq_num_column_num, ScanKeyData *scankey, int num_scankeys) { - int32 curr_seq_num = 0, max_seq_num = 0; - bool is_null; - HeapTuple compressed_tuple; - Datum seq_num; - TupleDesc in_desc = RelationGetDescr(table_rel); + int32 max_seq_num = 0; + TupleTableSlot *slot; + TableScanDesc scan; - TableScanDesc heap_scan = - table_beginscan(table_rel, GetLatestSnapshot(), num_scankeys, scankey); + slot = table_slot_create(table_rel, NULL); + scan = table_beginscan(table_rel, GetLatestSnapshot(), num_scankeys, scankey); - for (compressed_tuple = heap_getnext(heap_scan, ForwardScanDirection); compressed_tuple != NULL; - compressed_tuple = heap_getnext(heap_scan, ForwardScanDirection)) + while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) { - Assert(HeapTupleIsValid(compressed_tuple)); + bool is_null; + Datum seq_num = slot_getattr(slot, seq_num_column_num, &is_null); - seq_num = heap_getattr(compressed_tuple, seq_num_column_num, in_desc, &is_null); if (!is_null) { - curr_seq_num = DatumGetInt32(seq_num); + int32 curr_seq_num = DatumGetInt32(seq_num); + if (max_seq_num < curr_seq_num) - { max_seq_num = curr_seq_num; - } } } - table_endscan(heap_scan); + table_endscan(scan); + ExecDropSingleTupleTableSlot(slot); return max_seq_num; } @@ -1447,24 +1441,27 @@ decompress_chunk(Oid in_table, Oid out_table) Relation in_rel = table_open(in_table, ExclusiveLock); RowDecompressor decompressor = build_decompressor(in_rel, out_rel); + TupleTableSlot *slot = table_slot_create(in_rel, NULL); + TableScanDesc scan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL); - HeapTuple compressed_tuple; - TableScanDesc heapScan = table_beginscan(in_rel, GetLatestSnapshot(), 0, (ScanKey) NULL); - - for (compressed_tuple = heap_getnext(heapScan, ForwardScanDirection); compressed_tuple != NULL; - compressed_tuple = heap_getnext(heapScan, ForwardScanDirection)) + while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) { - Assert(HeapTupleIsValid(compressed_tuple)); - heap_deform_tuple(compressed_tuple, + bool should_free; + HeapTuple tuple = ExecFetchSlotHeapTuple(slot, false, &should_free); + + heap_deform_tuple(tuple, decompressor.in_desc, decompressor.compressed_datums, decompressor.compressed_is_nulls); + if (should_free) + heap_freetuple(tuple); + row_decompressor_decompress_row_to_table(&decompressor); } - table_endscan(heapScan); - + table_endscan(scan); + ExecDropSingleTupleTableSlot(slot); FreeBulkInsertState(decompressor.bistate); MemoryContextDelete(decompressor.per_compressed_row_ctx); ts_catalog_close_indexes(decompressor.indexstate); @@ -2167,13 +2164,12 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo * the index on the uncompressed chunks in order to do speculative insertion * which is always built from all tuples (even in higher levels of isolation). */ - TableScanDesc heapScan = table_beginscan(in_rel, GetLatestSnapshot(), num_scankeys, scankeys); + TupleTableSlot *compressed_slot = table_slot_create(in_rel, NULL); + Snapshot snapshot = GetLatestSnapshot(); + TableScanDesc scan = table_beginscan(in_rel, snapshot, num_scankeys, scankeys); - for (HeapTuple compressed_tuple = heap_getnext(heapScan, ForwardScanDirection); - compressed_tuple != NULL; - compressed_tuple = heap_getnext(heapScan, ForwardScanDirection)) + while (table_scan_getnextslot(scan, ForwardScanDirection, compressed_slot)) { - Assert(HeapTupleIsValid(compressed_tuple)); bool valid = true; /* @@ -2183,7 +2179,7 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo for (int attno = bms_next_member(null_columns, -1); attno >= 0; attno = bms_next_member(null_columns, attno)) { - if (!heap_attisnull(compressed_tuple, attno, decompressor.in_desc)) + if (!slot_attisnull(compressed_slot, attno)) { valid = false; break; @@ -2196,11 +2192,16 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo if (!valid) continue; - heap_deform_tuple(compressed_tuple, + bool should_free; + HeapTuple tuple = ExecFetchSlotHeapTuple(compressed_slot, false, &should_free); + heap_deform_tuple(tuple, decompressor.in_desc, decompressor.compressed_datums, decompressor.compressed_is_nulls); + if (should_free) + heap_freetuple(tuple); + write_logical_replication_msg_decompression_start(); row_decompressor_decompress_row_to_table(&decompressor); write_logical_replication_msg_decompression_end(); @@ -2208,9 +2209,9 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo TM_FailureData tmfd; TM_Result result pg_attribute_unused(); result = table_tuple_delete(in_rel, - &compressed_tuple->t_self, + &compressed_slot->tts_tid, decompressor.mycid, - GetTransactionSnapshot(), + snapshot, InvalidSnapshot, true, &tmfd, @@ -2220,8 +2221,8 @@ decompress_batches_for_insert(ChunkInsertState *cis, Chunk *chunk, TupleTableSlo cis->cds->tuples_decompressed += decompressor.tuples_decompressed; } - table_endscan(heapScan); - + table_endscan(scan); + ExecDropSingleTupleTableSlot(compressed_slot); ts_catalog_close_indexes(decompressor.indexstate); FreeExecutorState(decompressor.estate); FreeBulkInsertState(decompressor.bistate); @@ -3057,22 +3058,6 @@ build_update_delete_scankeys(RowDecompressor *decompressor, List *filters, int * return scankeys; } -static TM_Result -delete_compressed_tuple(RowDecompressor *decompressor, HeapTuple compressed_tuple) -{ - TM_FailureData tmfd; - TM_Result result; - result = table_tuple_delete(decompressor->in_rel, - &compressed_tuple->t_self, - decompressor->mycid, - GetTransactionSnapshot(), - InvalidSnapshot, - true, - &tmfd, - false); - return result; -} - static void report_error(TM_Result result) { @@ -3129,13 +3114,12 @@ static bool decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys, Bitmapset *null_columns, List *is_nulls, bool *chunk_status_changed) { - TM_Result result; - HeapTuple compressed_tuple; Snapshot snapshot = GetTransactionSnapshot(); - TableScanDesc heapScan = - table_beginscan(decompressor->in_rel, snapshot, num_scankeys, scankeys); - while ((compressed_tuple = heap_getnext(heapScan, ForwardScanDirection)) != NULL) + TupleTableSlot *slot = table_slot_create(decompressor->in_rel, NULL); + TableScanDesc scan = table_beginscan(decompressor->in_rel, snapshot, num_scankeys, scankeys); + + while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) { bool skip_tuple = false; int attrno = bms_next_member(null_columns, -1); @@ -3145,7 +3129,7 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num for (; attrno >= 0; attrno = bms_next_member(null_columns, attrno)) { is_null_condition = list_nth_int(is_nulls, pos); - seg_col_is_null = heap_attisnull(compressed_tuple, attrno, decompressor->in_desc); + seg_col_is_null = slot_attisnull(slot, attrno); if ((seg_col_is_null && !is_null_condition) || (!seg_col_is_null && is_null_condition)) { /* @@ -3160,15 +3144,32 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num } if (skip_tuple) continue; + + TM_FailureData tmfd; + TM_Result result; + bool should_free; + HeapTuple compressed_tuple = ExecFetchSlotHeapTuple(slot, false, &should_free); + heap_deform_tuple(compressed_tuple, decompressor->in_desc, decompressor->compressed_datums, decompressor->compressed_is_nulls); - result = delete_compressed_tuple(decompressor, compressed_tuple); + if (should_free) + heap_freetuple(compressed_tuple); + + result = table_tuple_delete(decompressor->in_rel, + &slot->tts_tid, + decompressor->mycid, + snapshot, + InvalidSnapshot, + true, + &tmfd, + false); + if (result != TM_Ok) { - table_endscan(heapScan); + table_endscan(scan); report_error(result); } row_decompressor_decompress_row_to_table(decompressor); @@ -3176,7 +3177,9 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num } if (scankeys) pfree(scankeys); - table_endscan(heapScan); + table_endscan(scan); + ExecDropSingleTupleTableSlot(slot); + return true; } @@ -3230,24 +3233,24 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel ScanKeyData *index_scankeys, int num_index_scankeys, ScanKeyData *scankeys, int num_scankeys, bool *chunk_status_changed) { - HeapTuple compressed_tuple; - Snapshot snapshot; + Snapshot snapshot = GetTransactionSnapshot(); int num_segmentby_filtered_rows = 0; int num_orderby_filtered_rows = 0; - snapshot = GetTransactionSnapshot(); IndexScanDesc scan = index_beginscan(decompressor->in_rel, index_rel, snapshot, num_index_scankeys, 0); TupleTableSlot *slot = table_slot_create(decompressor->in_rel, NULL); index_rescan(scan, index_scankeys, num_index_scankeys, NULL, 0); + while (index_getnext_slot(scan, ForwardScanDirection, slot)) { TM_Result result; - /* Deconstruct the tuple */ - slot_getallattrs(slot); - compressed_tuple = - heap_form_tuple(slot->tts_tupleDescriptor, slot->tts_values, slot->tts_isnull); - compressed_tuple->t_self = slot->tts_tid; + TM_FailureData tmfd; + bool should_free; + HeapTuple compressed_tuple; + + compressed_tuple = ExecFetchSlotHeapTuple(slot, false, &should_free); + num_segmentby_filtered_rows++; if (num_scankeys) { @@ -3268,27 +3271,41 @@ decompress_batches_using_index(RowDecompressor *decompressor, Relation index_rel if (!valid) { num_orderby_filtered_rows++; + + if (should_free) + heap_freetuple(compressed_tuple); continue; } } + heap_deform_tuple(compressed_tuple, decompressor->in_desc, decompressor->compressed_datums, decompressor->compressed_is_nulls); - result = delete_compressed_tuple(decompressor, compressed_tuple); + if (should_free) + heap_freetuple(compressed_tuple); + + result = table_tuple_delete(decompressor->in_rel, + &slot->tts_tid, + decompressor->mycid, + snapshot, + InvalidSnapshot, + true, + &tmfd, + false); + /* skip reporting error if isolation level is < Repeatable Read */ if (result == TM_Deleted && !IsolationUsesXactSnapshot()) continue; + if (result != TM_Ok) { - heap_freetuple(compressed_tuple); index_endscan(scan); index_close(index_rel, AccessShareLock); report_error(result); } row_decompressor_decompress_row_to_table(decompressor); - heap_freetuple(compressed_tuple); *chunk_status_changed = true; }