Fix copy of continuous agg invalidation entries

When we copy the invalidation logs for individual continuous
aggregates, the lowest-value was globally overwritten. Fix this
so that the change is specific to a continuous aggregate.
This bug could result in missing invalidations.
This commit is contained in:
gayyappan 2020-03-09 14:54:54 -04:00 committed by gayyappan
parent bfab289121
commit d1700711cf
3 changed files with 70 additions and 5 deletions

View File

@ -811,6 +811,7 @@ insert_materialization_invalidation_logs(List *caggs, List *invalidations,
foreach (lc2, invalidations)
{
Invalidation *entry = (Invalidation *) lfirst(lc2);
int64 lowest_modified_value = entry->lowest_modified_value;
int64 minimum_invalidation_time =
ts_continuous_aggs_get_minimum_invalidation_time(entry->modification_time,
ignore_invalidation_older_than);
@ -818,7 +819,7 @@ insert_materialization_invalidation_logs(List *caggs, List *invalidations,
continue;
if (entry->lowest_modified_value < minimum_invalidation_time)
entry->lowest_modified_value = minimum_invalidation_time;
lowest_modified_value = minimum_invalidation_time;
values[AttrNumberGetAttrOffset(
Anum_continuous_aggs_materialization_invalidation_log_materialization_id)] =
@ -828,7 +829,7 @@ insert_materialization_invalidation_logs(List *caggs, List *invalidations,
Int64GetDatum(entry->modification_time);
values[AttrNumberGetAttrOffset(
Anum_continuous_aggs_materialization_invalidation_log_lowest_modified_value)] =
Int64GetDatum(entry->lowest_modified_value);
Int64GetDatum(lowest_modified_value);
values[AttrNumberGetAttrOffset(
Anum_continuous_aggs_materialization_invalidation_log_greatest_modified_value)] =
Int64GetDatum(entry->greatest_modified_value);

View File

@ -379,6 +379,10 @@ select * from _timescaledb_catalog.continuous_aggs_materialization_invalidation_
6 | 18 | 18 | 18
(2 rows)
DROP VIEW cagg_1 cascade;
NOTICE: drop cascades to table _timescaledb_internal._hyper_5_11_chunk
DROP VIEW cagg_2 cascade;
NOTICE: drop cascades to table _timescaledb_internal._hyper_6_12_chunk
--test the ignore_invalidation_older_than setting
CREATE TABLE continuous_agg_test_ignore_invalidation_older_than(timeval integer, col1 integer, col2 integer);
select create_hypertable('continuous_agg_test_ignore_invalidation_older_than', 'timeval', chunk_time_interval=> 2);
@ -453,8 +457,19 @@ SELECT * FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log;
--move the time up (40), but invalidation logic should apply to old time (12)
INSERT INTO continuous_agg_test_ignore_invalidation_older_than VALUES (32,4,2),(36,5,5),(40,3,9);
SELECT * FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log order by 1;
materialization_id | modification_time | lowest_modified_value | greatest_modified_value
--------------------+-------------------+-----------------------+-------------------------
(0 rows)
refresh materialized view cagg_iia1;
LOG: materializing continuous aggregate public.cagg_iia1: processing invalidations, new range up to 42
SELECT * FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log order by 1;
materialization_id | modification_time | lowest_modified_value | greatest_modified_value
--------------------+-------------------+-----------------------+-------------------------
9 | 12 | 5 | 12
(1 row)
--should see change to the 12, 10 bucket but not the 4 or 1 bucket
select * from cagg_iia1 order by 1;
timed | cnt
@ -493,12 +508,26 @@ select * from cagg_iia3 order by 1;
40 | 9
(5 rows)
--tes UPDATES
--test UPDATES
UPDATE continuous_agg_test_ignore_invalidation_older_than set col1=NULL, col2=200 where timeval=32;
UPDATE continuous_agg_test_ignore_invalidation_older_than set col1=NULL, col2=120 where timeval=36;
SELECT * FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log;
hypertable_id | modification_time | lowest_modified_value | greatest_modified_value
---------------+-------------------+-----------------------+-------------------------
7 | 40 | 32 | 32
7 | 40 | 36 | 36
(2 rows)
refresh materialized view cagg_iia1;
LOG: new materialization range not found for public.continuous_agg_test_ignore_invalidation_older_than (time column timeval): not enough new data past completion threshold of 42 as of 40
LOG: materializing continuous aggregate public.cagg_iia1: processing invalidations, no new range
SELECT * FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log order by 1;
materialization_id | modification_time | lowest_modified_value | greatest_modified_value
--------------------+-------------------+-----------------------+-------------------------
9 | 40 | 32 | 32
9 | 40 | 36 | 36
(2 rows)
--should see change only for the 36 bucket not 32
select * from cagg_iia1 order by 1;
timed | cnt
@ -543,9 +572,23 @@ select * from cagg_iia3 order by 1;
--test DELETE
DELETE FROM continuous_agg_test_ignore_invalidation_older_than WHERE timeval = 32;
DELETE FROM continuous_agg_test_ignore_invalidation_older_than WHERE timeval = 36;
SELECT * FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log;
hypertable_id | modification_time | lowest_modified_value | greatest_modified_value
---------------+-------------------+-----------------------+-------------------------
7 | 40 | 32 | 32
7 | 40 | 36 | 36
(2 rows)
refresh materialized view cagg_iia1;
LOG: new materialization range not found for public.continuous_agg_test_ignore_invalidation_older_than (time column timeval): not enough new data past completion threshold of 42 as of 40
LOG: materializing continuous aggregate public.cagg_iia1: processing invalidations, no new range
SELECT * FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log order by 1;
materialization_id | modification_time | lowest_modified_value | greatest_modified_value
--------------------+-------------------+-----------------------+-------------------------
9 | 40 | 32 | 32
9 | 40 | 36 | 36
(2 rows)
--should see change only for the 36 bucket not 32
select * from cagg_iia1 order by 1;
timed | cnt
@ -589,9 +632,20 @@ ALTER VIEW cagg_iia3 set (timescaledb.ignore_invalidation_older_than = 100);
INSERT INTO continuous_agg_test_ignore_invalidation_older_than VALUES
(10, -3, 20);
--sees the change now
SELECT * FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log;
hypertable_id | modification_time | lowest_modified_value | greatest_modified_value
---------------+-------------------+-----------------------+-------------------------
7 | 40 | 10 | 10
(1 row)
refresh materialized view cagg_iia3;
LOG: new materialization range not found for public.continuous_agg_test_ignore_invalidation_older_than (time column timeval): not enough new data past completion threshold of 42 as of 40
LOG: materializing continuous aggregate public.cagg_iia3: processing invalidations, no new range
SELECT * FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log order by 1;
materialization_id | modification_time | lowest_modified_value | greatest_modified_value
--------------------+-------------------+-----------------------+-------------------------
(0 rows)
select * from cagg_iia3 order by 1;
timed | maxval
-------+--------

View File

@ -144,7 +144,8 @@ refresh materialized view cagg_1;
select * from cagg_1 where timed = 18 ;
--copied over for cagg_2 to process later?
select * from _timescaledb_catalog.continuous_aggs_materialization_invalidation_log order by 1;
DROP VIEW cagg_1 cascade;
DROP VIEW cagg_2 cascade;
--test the ignore_invalidation_older_than setting
CREATE TABLE continuous_agg_test_ignore_invalidation_older_than(timeval integer, col1 integer, col2 integer);
@ -193,7 +194,9 @@ SELECT * FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log;
--move the time up (40), but invalidation logic should apply to old time (12)
INSERT INTO continuous_agg_test_ignore_invalidation_older_than VALUES (32,4,2),(36,5,5),(40,3,9);
SELECT * FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log order by 1;
refresh materialized view cagg_iia1;
SELECT * FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log order by 1;
--should see change to the 12, 10 bucket but not the 4 or 1 bucket
select * from cagg_iia1 order by 1;
--should see change to the 12, 10 and 4 bucket but not the 1 bucket
@ -203,11 +206,13 @@ select * from cagg_iia2 order by 1;
refresh materialized view cagg_iia3;
select * from cagg_iia3 order by 1;
--tes UPDATES
--test UPDATES
UPDATE continuous_agg_test_ignore_invalidation_older_than set col1=NULL, col2=200 where timeval=32;
UPDATE continuous_agg_test_ignore_invalidation_older_than set col1=NULL, col2=120 where timeval=36;
SELECT * FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log;
refresh materialized view cagg_iia1;
SELECT * FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log order by 1;
--should see change only for the 36 bucket not 32
select * from cagg_iia1 order by 1;
--should see change to the 36 and 32
@ -221,7 +226,9 @@ select * from cagg_iia3 order by 1;
DELETE FROM continuous_agg_test_ignore_invalidation_older_than WHERE timeval = 32;
DELETE FROM continuous_agg_test_ignore_invalidation_older_than WHERE timeval = 36;
SELECT * FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log;
refresh materialized view cagg_iia1;
SELECT * FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log order by 1;
--should see change only for the 36 bucket not 32
select * from cagg_iia1 order by 1;
--should see change to the 36 and 32
@ -236,5 +243,8 @@ ALTER VIEW cagg_iia3 set (timescaledb.ignore_invalidation_older_than = 100);
INSERT INTO continuous_agg_test_ignore_invalidation_older_than VALUES
(10, -3, 20);
--sees the change now
SELECT * FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log;
refresh materialized view cagg_iia3;
SELECT * FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log order by 1;
select * from cagg_iia3 order by 1;