diff --git a/tsl/src/continuous_aggs/materialize.c b/tsl/src/continuous_aggs/materialize.c index e711ff6e7..689d8ebeb 100644 --- a/tsl/src/continuous_aggs/materialize.c +++ b/tsl/src/continuous_aggs/materialize.c @@ -62,7 +62,6 @@ static void lock_invalidation_threshold_hypertable_row(int32 raw_hypertable_id); static void drain_invalidation_log(int32 raw_hypertable_id, List **invalidations_out); static void insert_materialization_invalidation_logs(List *caggs, List *invalidations, Relation rel); -static void invalidation_threshold_set(int32 raw_hypertable_id, int64 invalidation_threshold); static Datum internal_to_time_value_or_infinite(int64 internal, Oid time_type, bool *is_infinite_out); static InternalTimeRange materialization_invalidation_log_get_range(int32 materialization_id, @@ -259,8 +258,8 @@ continuous_agg_materialize(int32 materialization_id, ContinuousAggMatOptions *op { LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD), AccessExclusiveLock); - invalidation_threshold_set(cagg_data.raw_hypertable_id, - materialization_invalidation_threshold); + continuous_agg_invalidation_threshold_set(cagg_data.raw_hypertable_id, + materialization_invalidation_threshold); } table_close(materialization_invalidation_log_table_relation, NoLock); @@ -1136,23 +1135,30 @@ continuous_agg_execute_materialization(int64 bucket_width, int32 hypertable_id, return; } +typedef struct InvalidationThresholdData +{ + int64 threshold; + bool was_updated; +} InvalidationThresholdData; + static ScanTupleResult scan_update_invalidation_threshold(TupleInfo *ti, void *data) { - int64 new_threshold = *(int64 *) data; + InvalidationThresholdData *invthresh = data; bool should_free; HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free); Form_continuous_aggs_invalidation_threshold form = (Form_continuous_aggs_invalidation_threshold) GETSTRUCT(tuple); - if (new_threshold > form->watermark) + if (invthresh->threshold > form->watermark) { 
HeapTuple new_tuple = heap_copytuple(tuple); form = (Form_continuous_aggs_invalidation_threshold) GETSTRUCT(new_tuple); - form->watermark = new_threshold; + form->watermark = invthresh->threshold; ts_catalog_update(ti->scanrel, new_tuple); heap_freetuple(new_tuple); + invthresh->was_updated = true; } else { @@ -1161,7 +1167,7 @@ scan_update_invalidation_threshold(TupleInfo *ti, void *data) " " INT64_FORMAT, form->hypertable_id, form->watermark, - new_threshold); + invthresh->threshold); } if (should_free) @@ -1174,10 +1180,14 @@ scan_update_invalidation_threshold(TupleInfo *ti, void *data) *refresh_lag etc. We update the raw hypertable's invalidation threshold * only if this new value is greater than the existsing one. */ -static void -invalidation_threshold_set(int32 raw_hypertable_id, int64 invalidation_threshold) +bool +continuous_agg_invalidation_threshold_set(int32 raw_hypertable_id, int64 invalidation_threshold) { - bool updated_threshold; + bool threshold_found; + InvalidationThresholdData data = { + .threshold = invalidation_threshold, + .was_updated = false, + }; ScanKeyData scankey[1]; ScanKeyInit(&scankey[0], @@ -1195,7 +1205,7 @@ invalidation_threshold_set(int32 raw_hypertable_id, int64 invalidation_threshold * cause us to lose the updates. The AccessExclusiveLock ensures no one else can possibly be * reading the threshold. 
*/ - updated_threshold = + threshold_found = ts_catalog_scan_one(CONTINUOUS_AGGS_INVALIDATION_THRESHOLD /*=table*/, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_PKEY /*=indexid*/, scankey /*=scankey*/, @@ -1203,9 +1213,9 @@ invalidation_threshold_set(int32 raw_hypertable_id, int64 invalidation_threshold scan_update_invalidation_threshold /*=tuple_found*/, AccessExclusiveLock /*=lockmode*/, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_TABLE_NAME /*=table_name*/, - &invalidation_threshold /*=data*/); + &data /*=data*/); - if (!updated_threshold) + if (!threshold_found) { Catalog *catalog = ts_catalog_get(); /* NOTE: this function deliberately takes a stronger lock than RowExclusive, see the comment @@ -1225,7 +1235,10 @@ invalidation_threshold_set(int32 raw_hypertable_id, int64 invalidation_threshold ts_catalog_insert_values(rel, desc, values, nulls); table_close(rel, NoLock); + data.was_updated = true; } + + return data.was_updated; } static ScanTupleResult diff --git a/tsl/src/continuous_aggs/materialize.h b/tsl/src/continuous_aggs/materialize.h index 4507f577d..cb8a52b6a 100644 --- a/tsl/src/continuous_aggs/materialize.h +++ b/tsl/src/continuous_aggs/materialize.h @@ -55,5 +55,7 @@ void continuous_agg_update_materialization(SchemaAndName partial_view, InternalTimeRange new_materialization_range, InternalTimeRange invalidation_range, int64 bucket_width); +bool continuous_agg_invalidation_threshold_set(int32 raw_hypertable_id, + int64 invalidation_threshold); #endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_MATERIALIZE_H */ diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c index 77f0ca539..3d4de0199 100644 --- a/tsl/src/continuous_aggs/refresh.c +++ b/tsl/src/continuous_aggs/refresh.c @@ -6,6 +6,8 @@ #include #include #include +#include +#include #include #include #include @@ -312,6 +314,8 @@ get_time_value_from_arg(Datum arg, Oid argtype, Oid cagg_timetype) return ts_time_value_to_internal(arg, argtype); } +#define REFRESH_FUNCTION_NAME 
"refresh_continuous_aggregate()" + /* * Refresh a continuous aggregate across the given window. */ @@ -319,6 +323,7 @@ Datum continuous_agg_refresh(PG_FUNCTION_ARGS) { Oid cagg_relid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); + Catalog *catalog = ts_catalog_get(); ContinuousAgg *cagg; Hypertable *cagg_ht; Dimension *time_dim; @@ -328,7 +333,16 @@ continuous_agg_refresh(PG_FUNCTION_ARGS) .end = PG_INT64_MAX, }; - PreventCommandIfReadOnly("refresh_continuous_aggregate()"); + PreventCommandIfReadOnly(REFRESH_FUNCTION_NAME); + + /* Prevent running refresh if we're in a transaction block since a refresh + * can run two transactions and might take a long time to release locks if + * there's a lot to materialize. Strictly, it is optional to prohibit + * transaction blocks since there will be only one transaction if the + * invalidation threshold needs no update. However, materialization might + * still take a long time and it is probably best for conistency to always + * prevent transaction blocks. */ + PreventInTransactionBlock(true, REFRESH_FUNCTION_NAME); if (!OidIsValid(cagg_relid)) ereport(ERROR, @@ -372,6 +386,19 @@ continuous_agg_refresh(PG_FUNCTION_ARGS) errmsg("invalid refresh window"), errhint("The start of the window must be before the end."))); + LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD), + AccessExclusiveLock); + + if (continuous_agg_invalidation_threshold_set(cagg->data.raw_hypertable_id, refresh_window.end)) + { + /* Start a new transaction if the threshold was set. Note that this + * invalidates previous memory allocations (and locks). 
*/ + PopActiveSnapshot(); + CommitTransactionCommand(); + StartTransactionCommand(); + cagg = ts_continuous_agg_find_by_relid(cagg_relid); + } + continuous_agg_invalidation_process(cagg, &refresh_window); continuous_agg_refresh_with_window(cagg, &refresh_window); diff --git a/tsl/test/expected/continuous_aggs_invalidation.out b/tsl/test/expected/continuous_aggs_invalidation.out index 1edd59a23..66e8ee8cc 100644 --- a/tsl/test/expected/continuous_aggs_invalidation.out +++ b/tsl/test/expected/continuous_aggs_invalidation.out @@ -130,10 +130,54 @@ ORDER BY day DESC, device; -----+--------+---------- (0 rows) --- Must refresh with "legacy" functionality to move the invalidation --- threshold, or no invalidations will be generated. -REFRESH MATERIALIZED VIEW cond_10; -REFRESH MATERIALIZED VIEW measure_10; +-- Must refresh to move the invalidation threshold, or no +-- invalidations will be generated. Initially, there is no threshold +-- set: +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- +(0 rows) + +-- Now refresh up to 50, and the threshold should be updated accordingly: +CALL refresh_continuous_aggregate('cond_10', 1, 50); +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 1 | 50 +(1 row) + +-- Refreshing below the threshold does not move it: +CALL refresh_continuous_aggregate('cond_10', 20, 49); +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 1 | 50 +(1 row) + +-- Refreshing measure_10 moves the threshold only for the other hypertable: +CALL refresh_continuous_aggregate('measure_10', 1, 30); +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 1 | 50 + 2 | 30 +(2 rows) + +-- 
Refresh on the second continuous aggregate, cond_20, on the first +-- hypertable moves the same threshold as when refreshing cond_10: +CALL refresh_continuous_aggregate('cond_20', 60, 100); +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 1 | 100 + 2 | 30 +(2 rows) + -- There should be no invalidations initially: SELECT hypertable_id AS hyper_id, lowest_modified_value AS start, @@ -189,13 +233,21 @@ SELECT hypertable_id AS hyper_id, 1 | 10 | 19 1 | 60 | 70 2 | 20 | 20 - 2 | 30 | 80 -(5 rows) +(4 rows) -- First refresh a window where we don't have any invalidations. This -- allows us to see only the copying of the invalidations to the per -- cagg log without additional processing. CALL refresh_continuous_aggregate('cond_10', 20, 60); +-- Invalidation threshold remains at 100: +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 1 | 100 + 2 | 30 +(2 rows) + -- Invalidations should be moved from the hypertable invalidation log -- to the continuous aggregate log, but only for the hypertable that -- the refreshed aggregate belongs to: @@ -207,8 +259,7 @@ SELECT hypertable_id AS hyper_id, hyper_id | start | end ----------+-------+----- 2 | 20 | 20 - 2 | 30 | 80 -(2 rows) +(1 row) SELECT materialization_id AS cagg_id, lowest_modified_value AS start, @@ -258,8 +309,7 @@ SELECT hypertable_id AS hyper_id, 1 | 30 | 59 1 | 60 | 90 2 | 20 | 20 - 2 | 30 | 80 -(10 rows) +(9 rows) -- But nothing has yet changed in the cagg invalidation log: SELECT materialization_id AS cagg_id, @@ -289,8 +339,7 @@ SELECT hypertable_id AS hyper_id, hyper_id | start | end ----------+-------+----- 2 | 20 | 20 - 2 | 30 | 80 -(2 rows) +(1 row) -- Only the cond_10 cagg should have its entries cut: SELECT materialization_id AS cagg_id, diff --git 
a/tsl/test/isolation/expected/continuous_aggs_concurrent_refresh.out b/tsl/test/isolation/expected/continuous_aggs_concurrent_refresh.out index be38ba830..83d7b160b 100644 --- a/tsl/test/isolation/expected/continuous_aggs_concurrent_refresh.out +++ b/tsl/test/isolation/expected/continuous_aggs_concurrent_refresh.out @@ -1,194 +1,393 @@ -Parsed test spec with 5 sessions +Parsed test spec with 8 sessions -starting permutation: R1_refresh R1_commit S1_select R2_commit R3_commit R4_commit +starting permutation: R1_refresh S1_select R3_refresh S1_select L2_read_unlock_threshold_table L3_unlock_cagg_table L1_unlock_threshold_table step R1_refresh: - SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-02'); - -refresh_continuous_aggregate - - -step R1_commit: - COMMIT; + CALL refresh_continuous_aggregate('cond_10', 35, 62); step S1_select: - SELECT day, avg_temp - FROM daily_temp + SELECT bucket, avg_temp + FROM cond_10 ORDER BY 1; - SELECT * FROM cagg_bucket_count('daily_temp'); + SELECT * FROM cagg_bucket_count('cond_10'); + SELECT h.table_name AS hypertable, it.watermark AS threshold + FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, + _timescaledb_catalog.hypertable h + WHERE it.hypertable_id = h.id; -day avg_temp +bucket avg_temp -Thu Apr 30 17:00:00 2020 PDT18.4411764705882 -Fri May 01 17:00:00 2020 PDT18.8541666666667 +30 18.3 +40 15.1 +50 26.9 +60 18.9 cagg_bucket_count -2 -step R2_commit: - COMMIT; +4 +hypertable threshold -step R3_commit: - COMMIT; +conditions 62 +step R3_refresh: + CALL refresh_continuous_aggregate('cond_10', 71, 97); -step R4_commit: - COMMIT; +step S1_select: + SELECT bucket, avg_temp + FROM cond_10 + ORDER BY 1; + + SELECT * FROM cagg_bucket_count('cond_10'); + SELECT h.table_name AS hypertable, it.watermark AS threshold + FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, + _timescaledb_catalog.hypertable h + WHERE it.hypertable_id = h.id; + +bucket avg_temp + +30 18.3 +40 15.1 +50 
26.9 +60 18.9 +70 24.6 +80 23.6 +90 21.3 +cagg_bucket_count + +7 +hypertable threshold + +conditions 97 +step L2_read_unlock_threshold_table: + ROLLBACK; + +step L3_unlock_cagg_table: + ROLLBACK; + +step L1_unlock_threshold_table: + ROLLBACK; -starting permutation: R1_refresh R2_refresh R1_commit R2_commit S1_select R3_commit R4_commit -step R1_refresh: - SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-02'); +starting permutation: L2_read_lock_threshold_table R3_refresh L2_read_unlock_threshold_table S1_select L3_unlock_cagg_table L1_unlock_threshold_table +step L2_read_lock_threshold_table: + LOCK _timescaledb_catalog.continuous_aggs_invalidation_threshold + IN ACCESS SHARE MODE; -refresh_continuous_aggregate - - -step R2_refresh: - SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-02'); +step R3_refresh: + CALL refresh_continuous_aggregate('cond_10', 71, 97); -step R1_commit: - COMMIT; - -step R2_refresh: <... completed> -refresh_continuous_aggregate - - -step R2_commit: - COMMIT; +step L2_read_unlock_threshold_table: + ROLLBACK; +step R3_refresh: <... 
completed> step S1_select: - SELECT day, avg_temp - FROM daily_temp + SELECT bucket, avg_temp + FROM cond_10 ORDER BY 1; - SELECT * FROM cagg_bucket_count('daily_temp'); + SELECT * FROM cagg_bucket_count('cond_10'); + SELECT h.table_name AS hypertable, it.watermark AS threshold + FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, + _timescaledb_catalog.hypertable h + WHERE it.hypertable_id = h.id; -day avg_temp +bucket avg_temp -Thu Apr 30 17:00:00 2020 PDT18.4411764705882 -Fri May 01 17:00:00 2020 PDT18.8541666666667 +70 24.6 +80 23.6 +90 21.3 cagg_bucket_count -2 -step R3_commit: - COMMIT; +3 +hypertable threshold -step R4_commit: - COMMIT; +conditions 97 +step L3_unlock_cagg_table: + ROLLBACK; + +step L1_unlock_threshold_table: + ROLLBACK; -starting permutation: R1_refresh R2_refresh R2_commit R1_commit S1_select R3_commit R4_commit +starting permutation: R1_refresh L2_read_lock_threshold_table R3_refresh L2_read_unlock_threshold_table S1_select L3_unlock_cagg_table L1_unlock_threshold_table step R1_refresh: - SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-02'); + CALL refresh_continuous_aggregate('cond_10', 35, 62); -refresh_continuous_aggregate +step L2_read_lock_threshold_table: + LOCK _timescaledb_catalog.continuous_aggs_invalidation_threshold + IN ACCESS SHARE MODE; - -step R2_refresh: - SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-02'); +step R3_refresh: + CALL refresh_continuous_aggregate('cond_10', 71, 97); -step R2_refresh: <... completed> -ERROR: canceling statement due to lock timeout -step R2_commit: - COMMIT; - -step R1_commit: - COMMIT; +step L2_read_unlock_threshold_table: + ROLLBACK; +step R3_refresh: <... 
completed> step S1_select: - SELECT day, avg_temp - FROM daily_temp + SELECT bucket, avg_temp + FROM cond_10 ORDER BY 1; - SELECT * FROM cagg_bucket_count('daily_temp'); + SELECT * FROM cagg_bucket_count('cond_10'); + SELECT h.table_name AS hypertable, it.watermark AS threshold + FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, + _timescaledb_catalog.hypertable h + WHERE it.hypertable_id = h.id; -day avg_temp +bucket avg_temp -Thu Apr 30 17:00:00 2020 PDT18.4411764705882 -Fri May 01 17:00:00 2020 PDT18.8541666666667 +30 18.3 +40 15.1 +50 26.9 +60 18.9 +70 24.6 +80 23.6 +90 21.3 cagg_bucket_count -2 -step R3_commit: - COMMIT; +7 +hypertable threshold -step R4_commit: - COMMIT; +conditions 97 +step L3_unlock_cagg_table: + ROLLBACK; + +step L1_unlock_threshold_table: + ROLLBACK; -starting permutation: R1_refresh R3_refresh R3_commit R1_commit S1_select R2_commit R4_commit +starting permutation: R3_refresh L2_read_lock_threshold_table R1_refresh L2_read_unlock_threshold_table S1_select L3_unlock_cagg_table L1_unlock_threshold_table +step R3_refresh: + CALL refresh_continuous_aggregate('cond_10', 71, 97); + +step L2_read_lock_threshold_table: + LOCK _timescaledb_catalog.continuous_aggs_invalidation_threshold + IN ACCESS SHARE MODE; + step R1_refresh: - SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-02'); + CALL refresh_continuous_aggregate('cond_10', 35, 62); + +step L2_read_unlock_threshold_table: + ROLLBACK; -refresh_continuous_aggregate +step R1_refresh: <... 
completed> +step S1_select: + SELECT bucket, avg_temp + FROM cond_10 + ORDER BY 1; + + SELECT * FROM cagg_bucket_count('cond_10'); + SELECT h.table_name AS hypertable, it.watermark AS threshold + FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, + _timescaledb_catalog.hypertable h + WHERE it.hypertable_id = h.id; + +bucket avg_temp + +30 18.3 +40 15.1 +50 26.9 +60 18.9 +70 24.6 +80 23.6 +90 21.3 +cagg_bucket_count + +7 +hypertable threshold + +conditions 97 +step L3_unlock_cagg_table: + ROLLBACK; + +step L1_unlock_threshold_table: + ROLLBACK; + + +starting permutation: L3_lock_cagg_table R1_refresh L3_unlock_cagg_table S1_select L1_unlock_threshold_table L2_read_unlock_threshold_table +step L3_lock_cagg_table: + SELECT lock_cagg('cond_10'); + +lock_cagg + + +step R1_refresh: + CALL refresh_continuous_aggregate('cond_10', 35, 62); + +step L3_unlock_cagg_table: + ROLLBACK; + +step R1_refresh: <... completed> +step S1_select: + SELECT bucket, avg_temp + FROM cond_10 + ORDER BY 1; + + SELECT * FROM cagg_bucket_count('cond_10'); + SELECT h.table_name AS hypertable, it.watermark AS threshold + FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, + _timescaledb_catalog.hypertable h + WHERE it.hypertable_id = h.id; + +bucket avg_temp + +30 18.3 +40 15.1 +50 26.9 +60 18.9 +cagg_bucket_count + +4 +hypertable threshold + +conditions 62 +step L1_unlock_threshold_table: + ROLLBACK; + +step L2_read_unlock_threshold_table: + ROLLBACK; + + +starting permutation: L3_lock_cagg_table R1_refresh R2_refresh L3_unlock_cagg_table S1_select L1_unlock_threshold_table L2_read_unlock_threshold_table +step L3_lock_cagg_table: + SELECT lock_cagg('cond_10'); + +lock_cagg + + +step R1_refresh: + CALL refresh_continuous_aggregate('cond_10', 35, 62); + +step R2_refresh: + CALL refresh_continuous_aggregate('cond_10', 35, 62); + +step L3_unlock_cagg_table: + ROLLBACK; + +step R1_refresh: <... completed> +step R2_refresh: <... 
completed> +step S1_select: + SELECT bucket, avg_temp + FROM cond_10 + ORDER BY 1; + + SELECT * FROM cagg_bucket_count('cond_10'); + SELECT h.table_name AS hypertable, it.watermark AS threshold + FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, + _timescaledb_catalog.hypertable h + WHERE it.hypertable_id = h.id; + +bucket avg_temp + +30 18.3 +40 15.1 +50 26.9 +60 18.9 +cagg_bucket_count + +4 +hypertable threshold + +conditions 62 +step L1_unlock_threshold_table: + ROLLBACK; + +step L2_read_unlock_threshold_table: + ROLLBACK; + + +starting permutation: L3_lock_cagg_table R1_refresh R3_refresh L3_unlock_cagg_table S1_select L1_unlock_threshold_table L2_read_unlock_threshold_table +step L3_lock_cagg_table: + SELECT lock_cagg('cond_10'); + +lock_cagg + + +step R1_refresh: + CALL refresh_continuous_aggregate('cond_10', 35, 62); + +step R3_refresh: + CALL refresh_continuous_aggregate('cond_10', 71, 97); + +step L3_unlock_cagg_table: + ROLLBACK; + +step R1_refresh: <... completed> +step R3_refresh: <... 
completed> +step S1_select: + SELECT bucket, avg_temp + FROM cond_10 + ORDER BY 1; + + SELECT * FROM cagg_bucket_count('cond_10'); + SELECT h.table_name AS hypertable, it.watermark AS threshold + FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, + _timescaledb_catalog.hypertable h + WHERE it.hypertable_id = h.id; + +bucket avg_temp + +30 18.3 +40 15.1 +50 26.9 +60 18.9 +70 24.6 +80 23.6 +90 21.3 +cagg_bucket_count + +7 +hypertable threshold + +conditions 97 +step L1_unlock_threshold_table: + ROLLBACK; + +step L2_read_unlock_threshold_table: + ROLLBACK; + + +starting permutation: L3_lock_cagg_table R3_refresh R4_refresh L3_unlock_cagg_table S1_select L1_unlock_threshold_table L2_read_unlock_threshold_table +step L3_lock_cagg_table: + SELECT lock_cagg('cond_10'); + +lock_cagg step R3_refresh: - SELECT refresh_continuous_aggregate('daily_temp', '2020-05-08', '2020-05-10'); + CALL refresh_continuous_aggregate('cond_10', 71, 97); -step R3_refresh: <... completed> -ERROR: canceling statement due to lock timeout -step R3_commit: - COMMIT; - -step R1_commit: - COMMIT; - -step S1_select: - SELECT day, avg_temp - FROM daily_temp - ORDER BY 1; - - SELECT * FROM cagg_bucket_count('daily_temp'); - -day avg_temp - -Thu Apr 30 17:00:00 2020 PDT18.4411764705882 -Fri May 01 17:00:00 2020 PDT18.8541666666667 -cagg_bucket_count - -2 -step R2_commit: - COMMIT; - -step R4_commit: - COMMIT; - - -starting permutation: R1_refresh R4_refresh R4_commit R1_commit S1_select R2_commit R3_commit -step R1_refresh: - SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-02'); - -refresh_continuous_aggregate - - step R4_refresh: - SELECT refresh_continuous_aggregate('weekly_temp', '2020-05-01', '2020-05-10'); + CALL refresh_continuous_aggregate('cond_20', 39, 84); -refresh_continuous_aggregate - - -step R4_commit: - COMMIT; - -step R1_commit: - COMMIT; +step L3_unlock_cagg_table: + ROLLBACK; +step R3_refresh: <... 
completed> step S1_select: - SELECT day, avg_temp - FROM daily_temp + SELECT bucket, avg_temp + FROM cond_10 ORDER BY 1; - SELECT * FROM cagg_bucket_count('daily_temp'); + SELECT * FROM cagg_bucket_count('cond_10'); + SELECT h.table_name AS hypertable, it.watermark AS threshold + FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, + _timescaledb_catalog.hypertable h + WHERE it.hypertable_id = h.id; -day avg_temp +bucket avg_temp -Thu Apr 30 17:00:00 2020 PDT18.4411764705882 -Fri May 01 17:00:00 2020 PDT18.8541666666667 +70 24.6 +80 23.6 +90 21.3 cagg_bucket_count -2 -step R2_commit: - COMMIT; +3 +hypertable threshold -step R3_commit: - COMMIT; +conditions 97 +step L1_unlock_threshold_table: + ROLLBACK; + +step L2_read_unlock_threshold_table: + ROLLBACK; diff --git a/tsl/test/isolation/specs/continuous_aggs_concurrent_refresh.spec b/tsl/test/isolation/specs/continuous_aggs_concurrent_refresh.spec index f251e92ed..1d861153f 100644 --- a/tsl/test/isolation/specs/continuous_aggs_concurrent_refresh.spec +++ b/tsl/test/isolation/specs/continuous_aggs_concurrent_refresh.spec @@ -7,35 +7,44 @@ # # We define a function 'cagg_bucket_count' to get the number of # buckets in a continuous aggregate. We use it to verify that there -# aren't any duplicate buckets inserted after concurrent -# refreshes. Duplicate buckets are possible since there is no unique -# constraint on the GROUP BY keys in the materialized hypertable. +# aren't any duplicate buckets/rows inserted into the materialization +# hypertable after concurrent refreshes. Duplicate buckets are +# possible since there is no unique constraint on the GROUP BY keys in +# the materialized hypertable. 
# setup { SELECT _timescaledb_internal.stop_background_workers(); - CREATE TABLE conditions(time timestamptz, temp float); - SELECT create_hypertable('conditions', 'time'); + CREATE TABLE conditions(time int, temp float); + SELECT create_hypertable('conditions', 'time', chunk_time_interval => 20); INSERT INTO conditions - SELECT t, abs(timestamp_hash(t::timestamp))%40 - FROM generate_series('2020-05-01', '2020-05-10', '10 minutes'::interval) t; - CREATE VIEW daily_temp + SELECT t, abs(timestamp_hash(to_timestamp(t)::timestamp))%40 + FROM generate_series(1, 100, 1) t; + CREATE OR REPLACE FUNCTION cond_now() + RETURNS int LANGUAGE SQL STABLE AS + $$ + SELECT coalesce(max(time), 0) + FROM conditions + $$; + SELECT set_integer_now_func('conditions', 'cond_now'); + CREATE VIEW cond_10 WITH (timescaledb.continuous, timescaledb.materialized_only=true) AS - SELECT time_bucket('1 day', time) AS day, avg(temp) AS avg_temp + SELECT time_bucket(10, time) AS bucket, avg(temp) AS avg_temp FROM conditions GROUP BY 1; - CREATE VIEW weekly_temp + CREATE VIEW cond_20 WITH (timescaledb.continuous, timescaledb.materialized_only=true) AS - SELECT time_bucket('1 week', time) AS day, avg(temp) AS avg_temp + SELECT time_bucket(20, time) AS bucket, avg(temp) AS avg_temp FROM conditions GROUP BY 1; - CREATE OR REPLACE FUNCTION cagg_bucket_count(cagg regclass) RETURNS int - AS $$ + CREATE OR REPLACE FUNCTION cagg_bucket_count(cagg regclass) + RETURNS int AS + $$ DECLARE cagg_schema name; cagg_name name; @@ -65,111 +74,202 @@ setup RETURN result; END $$ LANGUAGE plpgsql; + CREATE OR REPLACE FUNCTION lock_cagg(cagg name) RETURNS void AS $$ + DECLARE + mattable text; + BEGIN + SELECT format('%I.%I', user_view_schema, user_view_name) + FROM _timescaledb_catalog.continuous_agg + INTO mattable; + EXECUTE format('LOCK table %s IN EXCLUSIVE MODE', mattable); + END; $$ LANGUAGE plpgsql; } teardown { DROP TABLE conditions CASCADE; } -# Session to refresh the daily_temp continuous aggregate +# Session 
to refresh the cond_10 continuous aggregate session "R1" setup { - BEGIN; - SET LOCAL lock_timeout = '500ms'; - SET LOCAL deadlock_timeout = '500ms'; + SET SESSION lock_timeout = '500ms'; + SET SESSION deadlock_timeout = '500ms'; } step "R1_refresh" { - SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-02'); -} -step "R1_commit" -{ - COMMIT; + CALL refresh_continuous_aggregate('cond_10', 35, 62); } + # Refresh that overlaps with R1 session "R2" setup { - BEGIN; - SET LOCAL lock_timeout = '500ms'; - SET LOCAL deadlock_timeout = '500ms'; + SET SESSION lock_timeout = '500ms'; + SET SESSION deadlock_timeout = '500ms'; } step "R2_refresh" { - SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-02'); -} -step "R2_commit" -{ - COMMIT; + CALL refresh_continuous_aggregate('cond_10', 35, 62); } -# Refresh on same aggregate (daily_temp) that doesn't overlap with R1 and R2 + +# Refresh on same aggregate (cond_10) that doesn't overlap with R1 and R2 session "R3" setup { - BEGIN; - SET LOCAL lock_timeout = '500ms'; - SET LOCAL deadlock_timeout = '500ms'; + SET SESSION lock_timeout = '500ms'; + SET SESSION deadlock_timeout = '500ms'; } step "R3_refresh" { - SELECT refresh_continuous_aggregate('daily_temp', '2020-05-08', '2020-05-10'); -} -step "R3_commit" -{ - COMMIT; + CALL refresh_continuous_aggregate('cond_10', 71, 97); } -# Overlapping refresh on another continuous aggregate (weekly_temp) +# Overlapping refresh on another continuous aggregate (cond_20) session "R4" setup { - BEGIN; - SET LOCAL lock_timeout = '500ms'; - SET LOCAL deadlock_timeout = '500ms'; + SET SESSION lock_timeout = '500ms'; + SET SESSION deadlock_timeout = '500ms'; } step "R4_refresh" { - SELECT refresh_continuous_aggregate('weekly_temp', '2020-05-01', '2020-05-10'); -} -step "R4_commit" -{ - COMMIT; + CALL refresh_continuous_aggregate('cond_20', 39, 84); } -# Session to query +# Define a number of lock sessions to simulate concurrent refreshes +# by selectively 
grabbing the locks we use to handle concurrency. + +# The "L1" session exclusively locks the invalidation threshold +# table. This simulates an ongoing update of the invalidation +# threshold, which has not yet finished. +session "L1" +setup +{ + BEGIN; + SET SESSION lock_timeout = '500ms'; + SET SESSION deadlock_timeout = '500ms'; +} +step "L1_lock_threshold_table" +{ + LOCK _timescaledb_catalog.continuous_aggs_invalidation_threshold + IN ACCESS EXCLUSIVE MODE; +} +step "L1_unlock_threshold_table" +{ + ROLLBACK; +} + +# The "L2" session takes an access share lock on the invalidation +# threshold table. This simulates a reader, which has not yet finished +# (e.g., an insert into the hypertable, or a refresh that has not yet +# grabbed the exclusive lock). +session "L2" +setup +{ + BEGIN; + SET SESSION lock_timeout = '500ms'; + SET SESSION deadlock_timeout = '500ms'; +} +step "L2_read_lock_threshold_table" +{ + LOCK _timescaledb_catalog.continuous_aggs_invalidation_threshold + IN ACCESS SHARE MODE; +} +step "L2_read_unlock_threshold_table" +{ + ROLLBACK; +} + +# The "L3" session locks the cagg table. This simulates an ongoing +# refresh that has not yet completed and released the lock on the cagg +# materialization table. +# +session "L3" +setup +{ + BEGIN; + SET SESSION lock_timeout = '500ms'; + SET SESSION deadlock_timeout = '500ms'; +} +step "L3_lock_cagg_table" +{ + SELECT lock_cagg('cond_10'); +} +step "L3_unlock_cagg_table" +{ + ROLLBACK; +} + + +# Session to view the contents of a cagg after materialization. It +# also prints the bucket count (number of rows in the materialization +# hypertable) and the invalidation threshold. The bucket count should +# match the number of rows in the query if there are no duplicate +# buckets/rows. 
session "S1" setup { - SET LOCAL lock_timeout = '500ms'; - SET LOCAL deadlock_timeout = '100ms'; + SET SESSION lock_timeout = '500ms'; + SET SESSION deadlock_timeout = '500ms'; } step "S1_select" { - SELECT day, avg_temp - FROM daily_temp + SELECT bucket, avg_temp + FROM cond_10 ORDER BY 1; - SELECT * FROM cagg_bucket_count('daily_temp'); + SELECT * FROM cagg_bucket_count('cond_10'); + SELECT h.table_name AS hypertable, it.watermark AS threshold + FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it, + _timescaledb_catalog.hypertable h + WHERE it.hypertable_id = h.id; } -# Run single transaction refresh to get some reference output. -# The result of a query on the aggregate should always look like this example. -permutation "R1_refresh" "R1_commit" "S1_select" "R2_commit" "R3_commit" "R4_commit" +#################################################################### +# +# Tests for concurrent updates to the invalidation threshold (first +# transaction of a refresh). +# +#################################################################### -# Interleave two refreshes that are overlapping. Since we serialize -# refreshes, R2 should block until R1 commits. -permutation "R1_refresh" "R2_refresh" "R1_commit" "R2_commit" "S1_select" "R3_commit" "R4_commit" +# Run single transaction refresh to get some reference output. The +# result of a query on the aggregate should always look like this +# example (when refreshed with the same window). +permutation "R1_refresh" "S1_select" "R3_refresh" "S1_select" "L2_read_unlock_threshold_table" "L3_unlock_cagg_table" "L1_unlock_threshold_table" -# R2 starts after R1 but commits before. This should not work (lock timeout). 
-permutation "R1_refresh" "R2_refresh" "R2_commit" "R1_commit" "S1_select" "R3_commit" "R4_commit" +# A threshold reader (insert) should block a refresh if the threshold +# does not exist yet (insert of new threshold) +permutation "L2_read_lock_threshold_table" "R3_refresh" "L2_read_unlock_threshold_table" "S1_select" "L3_unlock_cagg_table" "L1_unlock_threshold_table" -# R1 and R3 don't have overlapping refresh windows, but we serialize -# anyway, so not yet supported. -permutation "R1_refresh" "R3_refresh" "R3_commit" "R1_commit" "S1_select" "R2_commit" "R4_commit" +# A threshold reader (insert) should block a refresh if the threshold +# needs an update +permutation "R1_refresh" "L2_read_lock_threshold_table" "R3_refresh" "L2_read_unlock_threshold_table" "S1_select" "L3_unlock_cagg_table" "L1_unlock_threshold_table" -# Concurrent refreshing across two different aggregates on the same -# hypertable should be OK: -permutation "R1_refresh" "R4_refresh" "R4_commit" "R1_commit" "S1_select" "R2_commit" "R3_commit" +# A threshold reader (insert) blocks a refresh even if the threshold +# doesn't need an update (could be improved) +permutation "R3_refresh" "L2_read_lock_threshold_table" "R1_refresh" "L2_read_unlock_threshold_table" "S1_select" "L3_unlock_cagg_table" "L1_unlock_threshold_table" + +################################################################## +# +# Tests for concurrent refreshes of continuous aggregates (second +# transaction of a refresh). +# +################################################################## + +# Interleave two refreshes that are overlapping (one simulated). 
Since +# we serialize refreshes, R1 should block until the lock is released +permutation "L3_lock_cagg_table" "R1_refresh" "L3_unlock_cagg_table" "S1_select" "L1_unlock_threshold_table" "L2_read_unlock_threshold_table" + +# R1 and R2 queued to refresh, both should serialize +permutation "L3_lock_cagg_table" "R1_refresh" "R2_refresh" "L3_unlock_cagg_table" "S1_select" "L1_unlock_threshold_table" "L2_read_unlock_threshold_table" + +# R1 and R3 don't have overlapping refresh windows, but should serialize +# anyway. This could potentially be optimized in the future. +permutation "L3_lock_cagg_table" "R1_refresh" "R3_refresh" "L3_unlock_cagg_table" "S1_select" "L1_unlock_threshold_table" "L2_read_unlock_threshold_table" + +# Concurrent refreshing across two different aggregates on the same +# hypertable does not block +permutation "L3_lock_cagg_table" "R3_refresh" "R4_refresh" "L3_unlock_cagg_table" "S1_select" "L1_unlock_threshold_table" "L2_read_unlock_threshold_table" diff --git a/tsl/test/sql/continuous_aggs_invalidation.sql b/tsl/test/sql/continuous_aggs_invalidation.sql index dfa71c010..b47a3eb13 100644 --- a/tsl/test/sql/continuous_aggs_invalidation.sql +++ b/tsl/test/sql/continuous_aggs_invalidation.sql @@ -86,10 +86,32 @@ ORDER BY day DESC, device; SELECT * FROM measure_10 ORDER BY day DESC, device; --- Must refresh with "legacy" functionality to move the invalidation --- threshold, or no invalidations will be generated. -REFRESH MATERIALIZED VIEW cond_10; -REFRESH MATERIALIZED VIEW measure_10; +-- Must refresh to move the invalidation threshold, or no +-- invalidations will be generated. 
Initially, there is no threshold +-- set: +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + +-- Now refresh up to 50, and the threshold should be updated accordingly: +CALL refresh_continuous_aggregate('cond_10', 1, 50); +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + +-- Refreshing below the threshold does not move it: +CALL refresh_continuous_aggregate('cond_10', 20, 49); +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + +-- Refreshing measure_10 moves the threshold only for the other hypertable: +CALL refresh_continuous_aggregate('measure_10', 1, 30); +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + +-- Refresh on the second continuous aggregate, cond_20, on the first +-- hypertable moves the same threshold as when refreshing cond_10: +CALL refresh_continuous_aggregate('cond_20', 60, 100); +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; -- There should be no invalidations initially: SELECT hypertable_id AS hyper_id, @@ -136,6 +158,9 @@ SELECT hypertable_id AS hyper_id, -- allows us to see only the copying of the invalidations to the per -- cagg log without additional processing. CALL refresh_continuous_aggregate('cond_10', 20, 60); +-- Invalidation threshold remains at 100: +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; -- Invalidations should be moved from the hypertable invalidation log -- to the continuous aggregate log, but only for the hypertable that