mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-18 03:23:37 +08:00
Fix column filtering for DML decompression
With index scans during INSERT decompression caused by unique constraints, we need to make sure we are using key columns from constraints while building scan keys. This makes the code more flexible for any kind of index found on the chunk by not relying on compression settings.
This commit is contained in:
parent
64af307eb9
commit
2908b15899
@ -26,6 +26,7 @@
|
||||
#include <nodes/print.h>
|
||||
#include <optimizer/optimizer.h>
|
||||
#include <parser/parse_coerce.h>
|
||||
#include <parser/parse_relation.h>
|
||||
#include <parser/parsetree.h>
|
||||
#include <storage/lmgr.h>
|
||||
#include <storage/predicate.h>
|
||||
@ -2245,8 +2246,9 @@ compressed_insert_key_columns(Relation relation)
|
||||
*/
|
||||
static ScanKeyData *
|
||||
build_index_scankeys_using_slot(Oid hypertable_relid, Relation in_rel, Relation out_rel,
|
||||
CompressionSettings *settings, TupleTableSlot *slot,
|
||||
Relation *result_index_rel, int *num_scan_keys)
|
||||
Bitmapset *key_columns, TupleTableSlot *slot,
|
||||
Relation *result_index_rel, Bitmapset **index_columns,
|
||||
int *num_scan_keys)
|
||||
{
|
||||
List *index_oids;
|
||||
ListCell *lc;
|
||||
@ -2259,7 +2261,6 @@ build_index_scankeys_using_slot(Oid hypertable_relid, Relation in_rel, Relation
|
||||
{
|
||||
Relation index_rel = index_open(lfirst_oid(lc), AccessShareLock);
|
||||
IndexInfo *index_info = BuildIndexInfo(index_rel);
|
||||
bool matches = false;
|
||||
|
||||
/* Can't use partial or expression indexes */
|
||||
if (index_info->ii_Predicate != NIL || index_info->ii_Expressions != NIL)
|
||||
@ -2294,26 +2295,16 @@ build_index_scankeys_using_slot(Oid hypertable_relid, Relation in_rel, Relation
|
||||
for (int i = 0; i < index_rel->rd_index->indnkeyatts; i++)
|
||||
{
|
||||
AttrNumber attnum = AttrOffsetGetAttrNumber(i);
|
||||
char *attname = get_attname(RelationGetRelid(index_rel), attnum, false);
|
||||
const NameData *attname = attnumAttName(index_rel, attnum);
|
||||
|
||||
/* If we are at the last attribute, check its the sequence number attribute.
|
||||
* This means we found all other attributes on the hypertable and this could be our
|
||||
* index.
|
||||
*/
|
||||
if (index_rel->rd_index->indnatts - 1 == i)
|
||||
{
|
||||
if (strcmp(attname, COMPRESSION_COLUMN_METADATA_SEQUENCE_NUM_NAME) == 0)
|
||||
matches = true;
|
||||
break;
|
||||
}
|
||||
|
||||
if (!ts_array_is_member(settings->fd.segmentby, attname))
|
||||
/* Make sure we find columns in key columns in order to select the right index */
|
||||
if (!bms_is_member(get_attnum(out_rel->rd_id, NameStr(*attname)), key_columns))
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
bool isnull;
|
||||
AttrNumber ht_attno = get_attnum(hypertable_relid, attname);
|
||||
AttrNumber ht_attno = get_attnum(hypertable_relid, NameStr(*attname));
|
||||
Datum value = slot_getattr(slot, ht_attno, &isnull);
|
||||
|
||||
if (isnull)
|
||||
@ -2369,7 +2360,7 @@ build_index_scankeys_using_slot(Oid hypertable_relid, Relation in_rel, Relation
|
||||
value);
|
||||
}
|
||||
|
||||
if (matches)
|
||||
if (*num_scan_keys > 0)
|
||||
{
|
||||
*result_index_rel = index_rel;
|
||||
break;
|
||||
@ -2672,6 +2663,16 @@ decompress_batches_seqscan(Relation in_rel, Relation out_rel, Snapshot snapshot,
|
||||
|
||||
write_logical_replication_msg_decompression_start();
|
||||
result = delete_compressed_tuple(&decompressor, snapshot, compressed_tuple);
|
||||
/* skip reporting error if isolation level is < Repeatable Read
|
||||
* since somebody decompressed the data concurrently, we need to take
|
||||
* that data into account as well when in Read Committed level
|
||||
*/
|
||||
if (result == TM_Deleted && !IsolationUsesXactSnapshot())
|
||||
{
|
||||
write_logical_replication_msg_decompression_end();
|
||||
stats.batches_decompressed++;
|
||||
continue;
|
||||
}
|
||||
if (result != TM_Ok)
|
||||
{
|
||||
table_endscan(scan);
|
||||
@ -2732,6 +2733,7 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
|
||||
Assert(settings);
|
||||
|
||||
Bitmapset *key_columns = compressed_insert_key_columns(out_rel);
|
||||
Bitmapset *index_columns = NULL;
|
||||
Bitmapset *null_columns = NULL;
|
||||
struct decompress_batches_stats stats;
|
||||
|
||||
@ -2740,37 +2742,26 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
|
||||
ScanKeyData *index_scankeys = build_index_scankeys_using_slot(cis->hypertable_relid,
|
||||
in_rel,
|
||||
out_rel,
|
||||
settings,
|
||||
key_columns,
|
||||
slot,
|
||||
&index_rel,
|
||||
&index_columns,
|
||||
&num_index_scankeys);
|
||||
|
||||
if (index_rel)
|
||||
{
|
||||
/*
|
||||
* Prepare the heap scan keys if any
|
||||
* This assumes that columns in segmentby are
|
||||
* handled by the index scan keys and potentially
|
||||
* might need to be handled by going through
|
||||
* index scan keys instead
|
||||
* Prepare the heap scan keys for all
|
||||
* key columns not found in the index
|
||||
*/
|
||||
Bitmapset *filtered_key_columns = NULL;
|
||||
int i = -1;
|
||||
while ((i = bms_next_member(key_columns, i)) > 0)
|
||||
{
|
||||
AttrNumber attno = i + FirstLowInvalidHeapAttributeNumber;
|
||||
char *attname = get_attname(out_rel->rd_id, attno, false);
|
||||
if (!ts_array_is_member(settings->fd.segmentby, attname))
|
||||
{
|
||||
filtered_key_columns = bms_add_member(filtered_key_columns, i);
|
||||
}
|
||||
}
|
||||
key_columns = bms_difference(key_columns, index_columns);
|
||||
|
||||
int num_heap_scankeys;
|
||||
ScanKeyData *heap_scankeys = build_heap_scankeys(cis->hypertable_relid,
|
||||
in_rel,
|
||||
out_rel,
|
||||
settings,
|
||||
filtered_key_columns,
|
||||
key_columns,
|
||||
&null_columns,
|
||||
slot,
|
||||
&num_heap_scankeys);
|
||||
|
@ -2121,8 +2121,7 @@ time|device|location|value
|
||||
7| 1| 100| 20
|
||||
8| 1| 100| 20
|
||||
9| 1| 100| 20
|
||||
1| 1| 200| 100
|
||||
(11 rows)
|
||||
(10 rows)
|
||||
|
||||
step SChunkStat: SELECT status from _timescaledb_catalog.chunk
|
||||
WHERE id = ( select min(ch.id) FROM _timescaledb_catalog.hypertable ht, _timescaledb_catalog.chunk ch WHERE ch.hypertable_id = ht.id AND ht.table_name like 'ts_device_table');
|
||||
|
Loading…
x
Reference in New Issue
Block a user