diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c index 312f8b080..b2c56dfc3 100644 --- a/tsl/src/compression/compression.c +++ b/tsl/src/compression/compression.c @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -2245,8 +2246,9 @@ compressed_insert_key_columns(Relation relation) */ static ScanKeyData * build_index_scankeys_using_slot(Oid hypertable_relid, Relation in_rel, Relation out_rel, - CompressionSettings *settings, TupleTableSlot *slot, - Relation *result_index_rel, int *num_scan_keys) + Bitmapset *key_columns, TupleTableSlot *slot, + Relation *result_index_rel, Bitmapset **index_columns, + int *num_scan_keys) { List *index_oids; ListCell *lc; @@ -2259,7 +2261,6 @@ build_index_scankeys_using_slot(Oid hypertable_relid, Relation in_rel, Relation { Relation index_rel = index_open(lfirst_oid(lc), AccessShareLock); IndexInfo *index_info = BuildIndexInfo(index_rel); - bool matches = false; /* Can't use partial or expression indexes */ if (index_info->ii_Predicate != NIL || index_info->ii_Expressions != NIL) @@ -2294,26 +2295,16 @@ build_index_scankeys_using_slot(Oid hypertable_relid, Relation in_rel, Relation for (int i = 0; i < index_rel->rd_index->indnkeyatts; i++) { AttrNumber attnum = AttrOffsetGetAttrNumber(i); - char *attname = get_attname(RelationGetRelid(index_rel), attnum, false); + const NameData *attname = attnumAttName(index_rel, attnum); - /* If we are at the last attribute, check its the sequence number attribute. - * This means we found all other attributes on the hypertable and this could be our - * index. - */ - if (index_rel->rd_index->indnatts - 1 == i) - { - if (strcmp(attname, COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME) == 0) - matches = true; - break; - } - - if (!ts_array_is_member(settings->fd.segmentby, attname)) + /* Make sure we find columns in key columns in order to select the right index */ + if (!bms_is_member(get_attnum(out_rel->rd_id, NameStr(*attname)), key_columns)) { break; } bool isnull; - AttrNumber ht_attno = get_attnum(hypertable_relid, attname); + AttrNumber ht_attno = get_attnum(hypertable_relid, NameStr(*attname)); Datum value = slot_getattr(slot, ht_attno, &isnull); if (isnull) @@ -2369,7 +2360,7 @@ build_index_scankeys_using_slot(Oid hypertable_relid, Relation in_rel, Relation value); } - if (matches) + if (*num_scan_keys > 0) { *result_index_rel = index_rel; break; @@ -2672,6 +2663,16 @@ decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot, write_logical_replication_msg_decompression_start(); result = delete_compressed_tuple(&decompressor, snapshot, compressed_tuple); + /* skip reporting error if isolation level is < Repeatable Read + * since somebody decompressed the data concurrently, we need to take + * that data into account as well when in Read Committed level + */ + if (result == TM_Deleted && !IsolationUsesXactSnapshot()) + { + write_logical_replication_msg_decompression_end(); + stats.batches_decompressed++; + continue; + } if (result != TM_Ok) { table_endscan(scan); @@ -2732,6 +2733,7 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot) Assert(settings); Bitmapset *key_columns = compressed_insert_key_columns(out_rel); + Bitmapset *index_columns = NULL; Bitmapset *null_columns = NULL; struct decompress_batches_stats stats; @@ -2740,37 +2742,26 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot) ScanKeyData *index_scankeys = build_index_scankeys_using_slot(cis->hypertable_relid, in_rel, out_rel, - settings, + key_columns, slot, &index_rel, + &index_columns, &num_index_scankeys); if (index_rel) { /* - * Prepare the heap scan keys if any - * This assumes that columns in segmentby are - * handled by the index scan keys and potentially - * might need to be handled by going through - * index scan keys instead + * Prepare the heap scan keys for all + * key columns not found in the index */ - Bitmapset *filtered_key_columns = NULL; - int i = -1; - while ((i = bms_next_member(key_columns, i)) > 0) - { - AttrNumber attno = i + FirstLowInvalidHeapAttributeNumber; - char *attname = get_attname(out_rel->rd_id, attno, false); - if (!ts_array_is_member(settings->fd.segmentby, attname)) - { - filtered_key_columns = bms_add_member(filtered_key_columns, i); - } - } + key_columns = bms_difference(key_columns, index_columns); + int num_heap_scankeys; ScanKeyData *heap_scankeys = build_heap_scankeys(cis->hypertable_relid, in_rel, out_rel, settings, - filtered_key_columns, + key_columns, &null_columns, slot, &num_heap_scankeys); diff --git a/tsl/test/isolation/expected/compression_conflicts_iso.out b/tsl/test/isolation/expected/compression_conflicts_iso.out index 722046f26..7c6ea66c9 100644 --- a/tsl/test/isolation/expected/compression_conflicts_iso.out +++ b/tsl/test/isolation/expected/compression_conflicts_iso.out @@ -2121,8 +2121,7 @@ time|device|location|value 7| 1| 100| 20 8| 1| 100| 20 9| 1| 100| 20 - 1| 1| 200| 100 -(11 rows) +(10 rows) step SChunkStat: SELECT status from _timescaledb_catalog.chunk WHERE id = ( select min(ch.id) FROM _timescaledb_catalog.hypertable ht, _timescaledb_catalog.chunk ch WHERE ch.hypertable_id = ht.id AND ht.table_name like 'ts_device_table');