diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f4ab5a39..3d5599e9f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,11 +15,13 @@ accidentally triggering the load of a previous DB version.** * #4486 Adding boolean column with default value doesn't work on compressed table * #4555 Handle properly default privileges on Continuous Aggregates * #4575 Fix use of `get_partition_hash` and `get_partition_for_key` inside an IMMUTABLE function +* #4416 Handle TRUNCATE TABLE on chunks **Thanks** @janko for reporting @AlmiS for reporting error on `get_partition_hash` executed inside an IMMUTABLE function @michaelkitson for reporting permission errors using default privileges on Continuous Aggregates +@jayadevanm for reporting error of TRUNCATE TABLE on compressed chunk ## 2.7.2 (2022-07-26) diff --git a/src/process_utility.c b/src/process_utility.c index ace88ff38..b3489c03b 100644 --- a/src/process_utility.c +++ b/src/process_utility.c @@ -1001,7 +1001,7 @@ process_truncate(ProcessUtilityArgs *args) List *hypertables = NIL; List *relations = NIL; bool list_changed = false; - MemoryContext parsetreectx = GetMemoryChunkContext(args->parsetree); + MemoryContext oldctx, parsetreectx = GetMemoryChunkContext(args->parsetree); /* For all hypertables, we drop the now empty chunks. We also propagate the * TRUNCATE call to the compressed version of the hypertable, if it exists. @@ -1037,7 +1037,6 @@ process_truncate(ProcessUtilityArgs *args) if (cagg) { Hypertable *mat_ht, *raw_ht; - MemoryContext oldctx; if (!relation_should_recurse(rv)) ereport(ERROR, @@ -1071,13 +1070,14 @@ process_truncate(ProcessUtilityArgs *args) break; } case RELKIND_RELATION: + /* TRUNCATE for foreign tables not implemented yet. This will raise an error. */ + case RELKIND_FOREIGN_TABLE: { Hypertable *ht = ts_hypertable_cache_get_entry(hcache, relid, CACHE_FLAG_MISSING_OK); + Chunk *chunk; - if (!ht) - list_append = true; - else + if (ht) { ContinuousAggHypertableStatus agg_status; @@ -1114,6 +1114,38 @@ process_truncate(ProcessUtilityArgs *args) */ list_changed = true; } + else if ((chunk = ts_chunk_get_by_relid(relid, false)) != NULL) + { /* this is a chunk */ + ht = ts_hypertable_cache_get_entry(hcache, + chunk->hypertable_relid, + CACHE_FLAG_NONE); + + Assert(ht != NULL); + + /* If the hypertable has continuous aggregates, then invalidate + * the truncated region. */ + if (ts_continuous_agg_hypertable_status(ht->fd.id) == HypertableIsRawTable) + ts_continuous_agg_invalidate_chunk(ht, chunk); + /* Truncate the compressed chunk too. */ + if (chunk->fd.compressed_chunk_id != INVALID_CHUNK_ID) + { + Chunk *compressed_chunk = + ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, false); + if (compressed_chunk != NULL) + { + /* Create list item into the same context of the list. */ + oldctx = MemoryContextSwitchTo(parsetreectx); + rv = makeRangeVar(NameStr(compressed_chunk->fd.schema_name), + NameStr(compressed_chunk->fd.table_name), + -1); + MemoryContextSwitchTo(oldctx); + list_changed = true; + } + } + list_append = true; + } + else + list_append = true; break; } } @@ -1234,14 +1266,7 @@ process_drop_chunk(ProcessUtilityArgs *args, DropStmt *stmt) /* If the hypertable has continuous aggregates, then invalidate * the dropped region. */ if (ts_continuous_agg_hypertable_status(ht->fd.id) == HypertableIsRawTable) - { - int64 start = ts_chunk_primary_dimension_start(chunk); - int64 end = ts_chunk_primary_dimension_end(chunk); - - Assert(hyperspace_get_open_dimension(ht->space, 0)->fd.id == - chunk->cube->slices[0]->fd.dimension_id); - ts_cm_functions->continuous_agg_invalidate_raw_ht(ht, start, end); - } + ts_continuous_agg_invalidate_chunk(ht, chunk); } } diff --git a/src/ts_catalog/continuous_agg.c b/src/ts_catalog/continuous_agg.c index 9a9a476ce..7c079d88c 100644 --- a/src/ts_catalog/continuous_agg.c +++ b/src/ts_catalog/continuous_agg.c @@ -31,6 +31,7 @@ #include "bgw/job.h" #include "ts_catalog/continuous_agg.h" #include "cross_module_fn.h" +#include "hypercube.h" #include "hypertable.h" #include "hypertable_cache.h" #include "scan_iterator.h" @@ -1351,6 +1352,17 @@ ts_continuous_agg_find_integer_now_func_by_materialization_id(int32 mat_htid) return par_dim; } +TSDLLEXPORT void +ts_continuous_agg_invalidate_chunk(Hypertable *ht, Chunk *chunk) +{ + int64 start = ts_chunk_primary_dimension_start(chunk); + int64 end = ts_chunk_primary_dimension_end(chunk); + + Assert(hyperspace_get_open_dimension(ht->space, 0)->fd.id == + chunk->cube->slices[0]->fd.dimension_id); + ts_cm_functions->continuous_agg_invalidate_raw_ht(ht, start, end); +} + typedef struct Watermark { int32 hyper_id; diff --git a/src/ts_catalog/continuous_agg.h b/src/ts_catalog/continuous_agg.h index 327e470f3..4ac047968 100644 --- a/src/ts_catalog/continuous_agg.h +++ b/src/ts_catalog/continuous_agg.h @@ -174,6 +174,8 @@ extern TSDLLEXPORT const Dimension * ts_continuous_agg_find_integer_now_func_by_materialization_id(int32 mat_htid); extern ContinuousAgg *ts_continuous_agg_find_userview_name(const char *schema, const char *name); +extern TSDLLEXPORT void ts_continuous_agg_invalidate_chunk(Hypertable *ht, Chunk *chunk); + extern TSDLLEXPORT bool ts_continuous_agg_bucket_width_variable(const ContinuousAgg *agg); extern TSDLLEXPORT int64 ts_continuous_agg_bucket_width(const ContinuousAgg *agg); diff --git a/test/expected/chunk_utils.out b/test/expected/chunk_utils.out index 2a4928d2d..0ed8625ef 100644 --- a/test/expected/chunk_utils.out +++ b/test/expected/chunk_utils.out @@ -283,6 +283,20 @@ SELECT * FROM _timescaledb_catalog.dimension_slice ORDER BY id; 24 | 3 | 6 | 7 (24 rows) +-- Test that truncating chunks works +SELECT count(*) FROM _timescaledb_internal._hyper_2_7_chunk; + count +------- + 1 +(1 row) + +TRUNCATE TABLE _timescaledb_internal._hyper_2_7_chunk; +SELECT count(*) FROM _timescaledb_internal._hyper_2_7_chunk; + count +------- + 0 +(1 row) + -- Drop one chunk "manually" and verify that dimension slices and -- constraints are cleaned up. Each chunk has two constraints and two -- dimension slices. Both constraints should be deleted, but only one diff --git a/test/sql/chunk_utils.sql b/test/sql/chunk_utils.sql index 598544f9e..465dc5552 100644 --- a/test/sql/chunk_utils.sql +++ b/test/sql/chunk_utils.sql @@ -117,6 +117,11 @@ FULL OUTER JOIN _timescaledb_catalog.dimension_slice ds ON (ds.id = cc.dimension ORDER BY c.id; SELECT * FROM _timescaledb_catalog.dimension_slice ORDER BY id; +-- Test that truncating chunks works +SELECT count(*) FROM _timescaledb_internal._hyper_2_7_chunk; +TRUNCATE TABLE _timescaledb_internal._hyper_2_7_chunk; +SELECT count(*) FROM _timescaledb_internal._hyper_2_7_chunk; + -- Drop one chunk "manually" and verify that dimension slices and -- constraints are cleaned up. Each chunk has two constraints and two -- dimension slices. Both constraints should be deleted, but only one diff --git a/tsl/test/expected/cagg_invalidation.out b/tsl/test/expected/cagg_invalidation.out index a5f0e365e..f64c6406d 100644 --- a/tsl/test/expected/cagg_invalidation.out +++ b/tsl/test/expected/cagg_invalidation.out @@ -577,6 +577,47 @@ SELECT * FROM hyper_invals; 2 | 20 | 20 (1 row) +-- Pick the first chunk of conditions to TRUNCATE +SELECT show_chunks AS chunk_to_truncate +FROM show_chunks('conditions') +ORDER BY 1 +LIMIT 1 \gset +-- Show the data before truncating one of the chunks +SELECT * FROM :chunk_to_truncate +ORDER BY 1; + time | device | temp +------+--------+------ + 1 | 4 | 23.7 + 1 | 0 | 16 + 2 | 2 | 23.5 + 2 | 1 | 25 + 3 | 2 | 23.5 + 3 | 0 | 20 + 4 | 2 | 10 + 5 | 2 | 26 + 6 | 1 | 13 + 7 | 3 | 35 + 8 | 1 | 37 + 9 | 3 | 7 +(12 rows) + +-- Truncate one chunk +\if :IS_DISTRIBUTED +-- There is no TRUNCATE implementation for FOREIGN tables yet +\set ON_ERROR_STOP 0 +\endif +TRUNCATE TABLE :chunk_to_truncate; +\if :IS_DISTRIBUTED +\set ON_ERROR_STOP 1 +\endif +-- Should see new invalidation entries for conditions for the non-distributed case +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- + 1 | 0 | 10 + 2 | 20 | 20 +(2 rows) + -- TRUNCATE the hypertable to invalidate all its continuous aggregates TRUNCATE conditions; -- Now empty @@ -590,8 +631,9 @@ SELECT * FROM hyper_invals; hyper_id | start | end ----------+----------------------+--------------------- 1 | -9223372036854775808 | 9223372036854775807 + 1 | 0 | 10 2 | 20 | 20 -(2 rows) +(3 rows) -- Aggregates still hold data SELECT * FROM cond_10 @@ -673,7 +715,7 @@ ORDER BY 1,2; -- TRUNCATE ONLY \set ON_ERROR_STOP 0 TRUNCATE ONLY cond_20; -psql:include/cagg_invalidation_common.sql:385: ERROR: cannot truncate only a continuous aggregate +psql:include/cagg_invalidation_common.sql:408: ERROR: cannot truncate only a continuous aggregate \set ON_ERROR_STOP 1 TRUNCATE cond_20; -- Should now be empty @@ -746,7 +788,7 @@ WHERE user_view_name = 'cond_1' \gset \else \set ON_ERROR_STOP 0 SELECT _timescaledb_internal.invalidation_cagg_log_add_entry(:cond_1_id, 1, 0); -psql:include/cagg_invalidation_common.sql:433: ERROR: cannot invalidate cagg, end time should be greater than start time +psql:include/cagg_invalidation_common.sql:456: ERROR: cannot invalidate cagg, end time should be greater than start time \set ON_ERROR_STOP 1 \endif -- Test invalidations with bucket size 1 @@ -923,7 +965,7 @@ CREATE table threshold_test (time int, value int); SELECT create_distributed_hypertable('threshold_test', 'time', chunk_time_interval => 4, replication_factor => 2); \else SELECT create_hypertable('threshold_test', 'time', chunk_time_interval => 4); -psql:include/cagg_invalidation_common.sql:544: NOTICE: adding not-null constraint to column "time" +psql:include/cagg_invalidation_common.sql:567: NOTICE: adding not-null constraint to column "time" create_hypertable ----------------------------- (7,public,threshold_test,t) @@ -959,14 +1001,14 @@ ORDER BY 1,2; \else \set ON_ERROR_STOP 0 SELECT _timescaledb_internal.invalidation_hyper_log_add_entry(:thresh_hyper_id, 1, 0); -psql:include/cagg_invalidation_common.sql:569: ERROR: cannot invalidate hypertable, end time should be greater than start time +psql:include/cagg_invalidation_common.sql:592: ERROR: cannot invalidate hypertable, end time should be greater than start time \set ON_ERROR_STOP 1 \endif -- Test that threshold is initilized to min value when there's no data -- and we specify an infinite end. Note that the min value may differ -- depending on time type. CALL refresh_continuous_aggregate('thresh_2', 0, NULL); -psql:include/cagg_invalidation_common.sql:576: NOTICE: continuous aggregate "thresh_2" is already up-to-date +psql:include/cagg_invalidation_common.sql:599: NOTICE: continuous aggregate "thresh_2" is already up-to-date SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold WHERE hypertable_id = :thresh_hyper_id ORDER BY 1,2; @@ -992,13 +1034,13 @@ ORDER BY 1,2; -- Refresh where both the start and end of the window is above the -- max data value CALL refresh_continuous_aggregate('thresh_2', 14, NULL); -psql:include/cagg_invalidation_common.sql:596: NOTICE: continuous aggregate "thresh_2" is already up-to-date +psql:include/cagg_invalidation_common.sql:619: NOTICE: continuous aggregate "thresh_2" is already up-to-date SELECT watermark AS thresh_hyper_id_watermark FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold WHERE hypertable_id = :thresh_hyper_id \gset -- Refresh where we start from the current watermark to infinity CALL refresh_continuous_aggregate('thresh_2', :thresh_hyper_id_watermark, NULL); -psql:include/cagg_invalidation_common.sql:603: NOTICE: continuous aggregate "thresh_2" is already up-to-date +psql:include/cagg_invalidation_common.sql:626: NOTICE: continuous aggregate "thresh_2" is already up-to-date -- Now refresh with max end of the window to test that the -- invalidation threshold is capped at the last bucket of data CALL refresh_continuous_aggregate('thresh_2', 0, NULL); @@ -1200,7 +1242,7 @@ INSERT INTO conditions VALUES(3, 1, 1.0); INSERT INTO conditions VALUES(4, 1, 1.0); INSERT INTO conditions VALUES(6, 1, 1.0); CALL refresh_continuous_aggregate('cond_1', 10, NULL); -psql:include/cagg_invalidation_common.sql:725: NOTICE: continuous aggregate "cond_1" is already up-to-date +psql:include/cagg_invalidation_common.sql:748: NOTICE: continuous aggregate "cond_1" is already up-to-date SELECT * FROM cagg_invals WHERE cagg_id = :cond_1_id; cagg_id | start | end @@ -1226,7 +1268,7 @@ INSERT INTO conditions VALUES (40, 1, 1.0); -- Refresh to process invalidations, but outside the range of -- invalidations we inserted so that we don't clear them. CALL refresh_continuous_aggregate('cond_10', 50, 60); -psql:include/cagg_invalidation_common.sql:746: NOTICE: continuous aggregate "cond_10" is already up-to-date +psql:include/cagg_invalidation_common.sql:769: NOTICE: continuous aggregate "cond_10" is already up-to-date SELECT mat_hypertable_id AS cond_10_id FROM _timescaledb_catalog.continuous_agg WHERE user_view_name = 'cond_10' \gset @@ -1266,16 +1308,16 @@ CALL refresh_continuous_aggregate('cond_10', 0, 200); SET timescaledb.materializations_per_refresh_window='foo'; INSERT INTO conditions VALUES (140, 1, 1.0); CALL refresh_continuous_aggregate('cond_10', 0, 200); -psql:include/cagg_invalidation_common.sql:785: WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" +psql:include/cagg_invalidation_common.sql:808: WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" DETAIL: Expected an integer but current value is "foo". SET timescaledb.materializations_per_refresh_window='2bar'; INSERT INTO conditions VALUES (140, 1, 1.0); CALL refresh_continuous_aggregate('cond_10', 0, 200); -psql:include/cagg_invalidation_common.sql:788: WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" +psql:include/cagg_invalidation_common.sql:811: WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" DETAIL: Expected an integer but current value is "2bar". SET timescaledb.materializations_per_refresh_window='-'; INSERT INTO conditions VALUES (140, 1, 1.0); CALL refresh_continuous_aggregate('cond_10', 0, 200); -psql:include/cagg_invalidation_common.sql:792: WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" +psql:include/cagg_invalidation_common.sql:815: WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" DETAIL: Expected an integer but current value is "-". \set VERBOSITY terse diff --git a/tsl/test/expected/cagg_invalidation_dist_ht.out b/tsl/test/expected/cagg_invalidation_dist_ht-12.out similarity index 96% rename from tsl/test/expected/cagg_invalidation_dist_ht.out rename to tsl/test/expected/cagg_invalidation_dist_ht-12.out index 041739982..e15700ddf 100644 --- a/tsl/test/expected/cagg_invalidation_dist_ht.out +++ b/tsl/test/expected/cagg_invalidation_dist_ht-12.out @@ -635,6 +635,48 @@ SELECT * FROM hyper_invals; 2 | 30 | 80 (2 rows) +-- Pick the first chunk of conditions to TRUNCATE +SELECT show_chunks AS chunk_to_truncate +FROM show_chunks('conditions') +ORDER BY 1 +LIMIT 1 \gset +-- Show the data before truncating one of the chunks +SELECT * FROM :chunk_to_truncate +ORDER BY 1; + time | device | temp +------+--------+------ + 1 | 4 | 23.7 + 1 | 0 | 16 + 2 | 2 | 23.5 + 2 | 1 | 25 + 3 | 2 | 23.5 + 3 | 0 | 20 + 4 | 2 | 10 + 5 | 2 | 26 + 6 | 1 | 13 + 7 | 3 | 35 + 8 | 1 | 37 + 9 | 3 | 7 +(12 rows) + +-- Truncate one chunk +\if :IS_DISTRIBUTED +-- There is no TRUNCATE implementation for FOREIGN tables yet +\set ON_ERROR_STOP 0 +\endif +TRUNCATE TABLE :chunk_to_truncate; +psql:include/cagg_invalidation_common.sql:352: ERROR: "_dist_hyper_1_1_chunk" is not a table +\if :IS_DISTRIBUTED +\set ON_ERROR_STOP 1 +\endif +-- Should see new invalidation entries for conditions for the non-distributed case +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- + 2 | 20 | 20 + 2 | 30 | 80 +(2 rows) + -- TRUNCATE the hypertable to invalidate all its continuous aggregates TRUNCATE conditions; -- Now empty @@ -732,7 +774,7 @@ ORDER BY 1,2; -- TRUNCATE ONLY \set ON_ERROR_STOP 0 TRUNCATE ONLY cond_20; -psql:include/cagg_invalidation_common.sql:385: ERROR: cannot truncate only a continuous aggregate +psql:include/cagg_invalidation_common.sql:408: ERROR: cannot truncate only a continuous aggregate \set ON_ERROR_STOP 1 TRUNCATE cond_20; -- Should now be empty @@ -980,7 +1022,7 @@ ORDER BY 1,2; CREATE table threshold_test (time int, value int); \if :IS_DISTRIBUTED SELECT create_distributed_hypertable('threshold_test', 'time', chunk_time_interval => 4, replication_factor => 2); -psql:include/cagg_invalidation_common.sql:542: NOTICE: adding not-null constraint to column "time" +psql:include/cagg_invalidation_common.sql:565: NOTICE: adding not-null constraint to column "time" create_distributed_hypertable ------------------------------- (7,public,threshold_test,t) @@ -1024,7 +1066,7 @@ SELECT _timescaledb_internal.invalidation_hyper_log_add_entry(:thresh_hyper_id, -- and we specify an infinite end. Note that the min value may differ -- depending on time type. CALL refresh_continuous_aggregate('thresh_2', 0, NULL); -psql:include/cagg_invalidation_common.sql:576: NOTICE: continuous aggregate "thresh_2" is already up-to-date +psql:include/cagg_invalidation_common.sql:599: NOTICE: continuous aggregate "thresh_2" is already up-to-date SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold WHERE hypertable_id = :thresh_hyper_id ORDER BY 1,2; @@ -1050,13 +1092,13 @@ ORDER BY 1,2; -- Refresh where both the start and end of the window is above the -- max data value CALL refresh_continuous_aggregate('thresh_2', 14, NULL); -psql:include/cagg_invalidation_common.sql:596: NOTICE: continuous aggregate "thresh_2" is already up-to-date +psql:include/cagg_invalidation_common.sql:619: NOTICE: continuous aggregate "thresh_2" is already up-to-date SELECT watermark AS thresh_hyper_id_watermark FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold WHERE hypertable_id = :thresh_hyper_id \gset -- Refresh where we start from the current watermark to infinity CALL refresh_continuous_aggregate('thresh_2', :thresh_hyper_id_watermark, NULL); -psql:include/cagg_invalidation_common.sql:603: NOTICE: continuous aggregate "thresh_2" is already up-to-date +psql:include/cagg_invalidation_common.sql:626: NOTICE: continuous aggregate "thresh_2" is already up-to-date -- Now refresh with max end of the window to test that the -- invalidation threshold is capped at the last bucket of data CALL refresh_continuous_aggregate('thresh_2', 0, NULL); @@ -1258,7 +1300,7 @@ INSERT INTO conditions VALUES(3, 1, 1.0); INSERT INTO conditions VALUES(4, 1, 1.0); INSERT INTO conditions VALUES(6, 1, 1.0); CALL refresh_continuous_aggregate('cond_1', 10, NULL); -psql:include/cagg_invalidation_common.sql:725: NOTICE: continuous aggregate "cond_1" is already up-to-date +psql:include/cagg_invalidation_common.sql:748: NOTICE: continuous aggregate "cond_1" is already up-to-date SELECT * FROM cagg_invals WHERE cagg_id = :cond_1_id; cagg_id | start | end @@ -1284,7 +1326,7 @@ INSERT INTO conditions VALUES (40, 1, 1.0); -- Refresh to process invalidations, but outside the range of -- invalidations we inserted so that we don't clear them. CALL refresh_continuous_aggregate('cond_10', 50, 60); -psql:include/cagg_invalidation_common.sql:746: NOTICE: continuous aggregate "cond_10" is already up-to-date +psql:include/cagg_invalidation_common.sql:769: NOTICE: continuous aggregate "cond_10" is already up-to-date SELECT mat_hypertable_id AS cond_10_id FROM _timescaledb_catalog.continuous_agg WHERE user_view_name = 'cond_10' \gset @@ -1327,17 +1369,17 @@ CALL refresh_continuous_aggregate('cond_10', 0, 200); SET timescaledb.materializations_per_refresh_window='foo'; INSERT INTO conditions VALUES (140, 1, 1.0); CALL refresh_continuous_aggregate('cond_10', 0, 200); -psql:include/cagg_invalidation_common.sql:785: WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" +psql:include/cagg_invalidation_common.sql:808: WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" DETAIL: Expected an integer but current value is "foo". SET timescaledb.materializations_per_refresh_window='2bar'; INSERT INTO conditions VALUES (140, 1, 1.0); CALL refresh_continuous_aggregate('cond_10', 0, 200); -psql:include/cagg_invalidation_common.sql:788: WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" +psql:include/cagg_invalidation_common.sql:811: WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" DETAIL: Expected an integer but current value is "2bar". SET timescaledb.materializations_per_refresh_window='-'; INSERT INTO conditions VALUES (140, 1, 1.0); CALL refresh_continuous_aggregate('cond_10', 0, 200); -psql:include/cagg_invalidation_common.sql:792: WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" +psql:include/cagg_invalidation_common.sql:815: WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" DETAIL: Expected an integer but current value is "-". \set VERBOSITY terse -- cleanup diff --git a/tsl/test/expected/cagg_invalidation_dist_ht-13.out b/tsl/test/expected/cagg_invalidation_dist_ht-13.out new file mode 100644 index 000000000..e15700ddf --- /dev/null +++ b/tsl/test/expected/cagg_invalidation_dist_ht-13.out @@ -0,0 +1,1389 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +------------------------------------ +-- Set up a distributed environment +------------------------------------ +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER +\set DATA_NODE_1 :TEST_DBNAME _1 +\set DATA_NODE_2 :TEST_DBNAME _2 +\set DATA_NODE_3 :TEST_DBNAME _3 +\ir include/remote_exec.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +CREATE SCHEMA IF NOT EXISTS test; +psql:include/remote_exec.sql:5: NOTICE: schema "test" already exists, skipping +GRANT USAGE ON SCHEMA test TO PUBLIC; +CREATE OR REPLACE FUNCTION test.remote_exec(srv_name name[], command text) +RETURNS VOID +AS :TSL_MODULE_PATHNAME, 'ts_remote_exec' +LANGUAGE C; +CREATE OR REPLACE FUNCTION test.remote_exec_get_result_strings(srv_name name[], command text) +RETURNS TABLE("table_record" CSTRING[]) +AS :TSL_MODULE_PATHNAME, 'ts_remote_exec_get_result_strings' +LANGUAGE C; +SELECT (add_data_node (name, host => 'localhost', DATABASE => name)).* +FROM (VALUES (:'DATA_NODE_1'), (:'DATA_NODE_2'), (:'DATA_NODE_3')) v (name); + node_name | host | port | database | node_created | database_created | extension_created +--------------------------------+-----------+-------+--------------------------------+--------------+------------------+------------------- + db_cagg_invalidation_dist_ht_1 | localhost | 55432 | db_cagg_invalidation_dist_ht_1 | t | t | t + db_cagg_invalidation_dist_ht_2 | localhost | 55432 | db_cagg_invalidation_dist_ht_2 | t | t | t + db_cagg_invalidation_dist_ht_3 | localhost | 55432 | db_cagg_invalidation_dist_ht_3 | t | t | t +(3 rows) + +GRANT USAGE ON FOREIGN SERVER :DATA_NODE_1, :DATA_NODE_2, :DATA_NODE_3 TO PUBLIC; +\set IS_DISTRIBUTED TRUE +\ir include/cagg_invalidation_common.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +-- Disable background workers since we are testing manual refresh +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER +SELECT _timescaledb_internal.stop_background_workers(); + stop_background_workers +------------------------- + t +(1 row) + +SET ROLE :ROLE_DEFAULT_PERM_USER; +SET datestyle TO 'ISO, YMD'; +SET timezone TO 'UTC'; +CREATE TABLE conditions (time bigint NOT NULL, device int, temp float); +\if :IS_DISTRIBUTED +SELECT create_distributed_hypertable('conditions', 'time', chunk_time_interval => 10, replication_factor => 2); + create_distributed_hypertable +------------------------------- + (1,public,conditions,t) +(1 row) + +\else +SELECT create_hypertable('conditions', 'time', chunk_time_interval => 10); +\endif +CREATE TABLE measurements (time int NOT NULL, device int, temp float); +\if :IS_DISTRIBUTED +SELECT create_distributed_hypertable('measurements', 'time', chunk_time_interval => 10, replication_factor => 2); + create_distributed_hypertable +------------------------------- + (2,public,measurements,t) +(1 row) + +\else +SELECT create_hypertable('measurements', 'time', chunk_time_interval => 10); +\endif +CREATE OR REPLACE FUNCTION bigint_now() +RETURNS bigint LANGUAGE SQL STABLE AS +$$ + SELECT coalesce(max(time), 0) + FROM conditions +$$; +\if :IS_DISTRIBUTED +CALL distributed_exec($DIST$ +CREATE OR REPLACE FUNCTION bigint_now() +RETURNS bigint LANGUAGE SQL STABLE AS +$$ + SELECT coalesce(max(time), 0) + FROM conditions +$$; +$DIST$); +\endif +CREATE OR REPLACE FUNCTION int_now() +RETURNS int LANGUAGE SQL STABLE AS +$$ + SELECT coalesce(max(time), 0) + FROM measurements +$$; +\if :IS_DISTRIBUTED +CALL distributed_exec($DIST$ +CREATE OR REPLACE FUNCTION int_now() +RETURNS int LANGUAGE SQL STABLE AS +$$ + SELECT coalesce(max(time), 0) + FROM measurements +$$; +$DIST$); +\endif +SELECT set_integer_now_func('conditions', 'bigint_now'); + set_integer_now_func +---------------------- + +(1 row) + +SELECT set_integer_now_func('measurements', 'int_now'); + set_integer_now_func +---------------------- + +(1 row) + +INSERT INTO conditions +SELECT t, ceil(abs(timestamp_hash(to_timestamp(t)::timestamp))%4)::int, + abs(timestamp_hash(to_timestamp(t)::timestamp))%40 +FROM generate_series(1, 100, 1) t; +CREATE TABLE temp AS +SELECT * FROM conditions; +INSERT INTO measurements +SELECT * FROM temp; +-- Show the most recent data +SELECT * FROM conditions +ORDER BY time DESC, device +LIMIT 10; + time | device | temp +------+--------+------ + 100 | 0 | 8 + 99 | 1 | 5 + 98 | 2 | 26 + 97 | 2 | 10 + 96 | 2 | 34 + 95 | 2 | 30 + 94 | 3 | 31 + 93 | 0 | 4 + 92 | 0 | 32 + 91 | 3 | 15 +(10 rows) + +-- Create two continuous aggregates on the same hypertable to test +-- that invalidations are handled correctly across both of them. +CREATE MATERIALIZED VIEW cond_10 +WITH (timescaledb.continuous, + timescaledb.materialized_only=true) +AS +SELECT time_bucket(BIGINT '10', time) AS bucket, device, avg(temp) AS avg_temp +FROM conditions +GROUP BY 1,2 WITH NO DATA; +CREATE MATERIALIZED VIEW cond_20 +WITH (timescaledb.continuous, + timescaledb.materialized_only=true) +AS +SELECT time_bucket(BIGINT '20', time) AS bucket, device, avg(temp) AS avg_temp +FROM conditions +GROUP BY 1,2 WITH NO DATA; +CREATE MATERIALIZED VIEW measure_10 +WITH (timescaledb.continuous, + timescaledb.materialized_only=true) +AS +SELECT time_bucket(10, time) AS bucket, device, avg(temp) AS avg_temp +FROM measurements +GROUP BY 1,2 WITH NO DATA; +-- There should be three continuous aggregates, two on one hypertable +-- and one on the other: +SELECT mat_hypertable_id, raw_hypertable_id, user_view_name +FROM _timescaledb_catalog.continuous_agg; + mat_hypertable_id | raw_hypertable_id | user_view_name +-------------------+-------------------+---------------- + 3 | 1 | cond_10 + 4 | 1 | cond_20 + 5 | 2 | measure_10 +(3 rows) + +-- The continuous aggregates should be empty +SELECT * FROM cond_10 +ORDER BY 1 DESC, 2; + bucket | device | avg_temp +--------+--------+---------- +(0 rows) + +SELECT * FROM cond_20 +ORDER BY 1 DESC, 2; + bucket | device | avg_temp +--------+--------+---------- +(0 rows) + +SELECT * FROM measure_10 +ORDER BY 1 DESC, 2; + bucket | device | avg_temp +--------+--------+---------- +(0 rows) + +\if :IS_DISTRIBUTED +CREATE OR REPLACE FUNCTION get_hyper_invals() RETURNS TABLE( + "hyper_id" INT, + "start" BIGINT, + "end" BIGINT +) +LANGUAGE SQL VOLATILE AS +$$ +SELECT DISTINCT table_record[1]::TEXT::INT, table_record[2]::TEXT::BIGINT, table_record[3]::TEXT::BIGINT FROM test.remote_exec_get_result_strings(NULL, $DIST$ + SELECT hypertable_id, + lowest_modified_value, + greatest_modified_value + FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log +$DIST$) +ORDER BY 1,2,3 +$$; +CREATE OR REPLACE FUNCTION get_cagg_invals() RETURNS TABLE( + "cagg_id" INT, + "start" BIGINT, + "end" BIGINT +) +LANGUAGE SQL VOLATILE AS +$$ +SELECT DISTINCT table_record[1]::TEXT::INT, table_record[2]::TEXT::BIGINT, table_record[3]::TEXT::BIGINT FROM test.remote_exec_get_result_strings(NULL, $DIST$ + SELECT materialization_id AS cagg_id, + lowest_modified_value AS start, + greatest_modified_value AS end + FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log +$DIST$) +ORDER BY 1,2,3 +$$; +\else +CREATE OR REPLACE FUNCTION get_hyper_invals() RETURNS TABLE ( + "hyper_id" INT, + "start" BIGINT, + "end" BIGINT +) +LANGUAGE SQL VOLATILE AS +$$ +SELECT hypertable_id, + lowest_modified_value, + greatest_modified_value + FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log + ORDER BY 1,2,3 +$$; +CREATE OR REPLACE FUNCTION get_cagg_invals() RETURNS TABLE ( + "cagg_id" INT, + "start" BIGINT, + "end" BIGINT +) +LANGUAGE SQL VOLATILE AS +$$ +SELECT materialization_id, + lowest_modified_value, + greatest_modified_value + FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log + ORDER BY 1,2,3 +$$; +\endif +CREATE VIEW hyper_invals AS SELECT * FROM get_hyper_invals(); +CREATE VIEW cagg_invals AS SELECT * FROM get_cagg_invals(); +-- Must refresh to move the invalidation threshold, or no +-- invalidations will be generated. Initially, there is no threshold +-- set: +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- +(0 rows) + +-- There should be only "infinite" invalidations in the cagg +-- invalidation log: +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | 9223372036854775807 + 4 | -9223372036854775808 | 9223372036854775807 + 5 | -9223372036854775808 | 9223372036854775807 +(3 rows) + +-- Now refresh up to 50 without the first bucket, and the threshold should be updated accordingly: +CALL refresh_continuous_aggregate('cond_10', 1, 50); +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 1 | 50 +(1 row) + +-- Invalidations should be cleared inside the refresh window: +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | 9 + 3 | 50 | 9223372036854775807 + 4 | -9223372036854775808 | 9223372036854775807 + 5 | -9223372036854775808 | 9223372036854775807 +(4 rows) + +-- Refresh up to 50 from the beginning +CALL refresh_continuous_aggregate('cond_10', 0, 50); +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 50 | 9223372036854775807 + 4 | -9223372036854775808 | 9223372036854775807 + 5 | -9223372036854775808 | 9223372036854775807 +(4 rows) + +-- Refreshing below the threshold does not move it: +CALL refresh_continuous_aggregate('cond_10', 20, 49); +psql:include/cagg_invalidation_common.sql:207: NOTICE: continuous aggregate "cond_10" is already up-to-date +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 1 | 50 +(1 row) + +-- Nothing changes with invalidations either since the region was +-- already refreshed and no new invalidations have been generated: +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 50 | 9223372036854775807 + 4 | -9223372036854775808 | 9223372036854775807 + 5 | -9223372036854775808 | 9223372036854775807 +(4 rows) + +-- Refreshing measure_10 moves the threshold only for the other hypertable: +CALL refresh_continuous_aggregate('measure_10', 0, 30); +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 1 | 50 + 2 | 30 +(2 rows) + +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 50 | 9223372036854775807 + 4 | -9223372036854775808 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(5 rows) + +-- Refresh on the second continuous aggregate, cond_20, on the first +-- hypertable moves the same threshold as when refreshing cond_10: +CALL refresh_continuous_aggregate('cond_20', 60, 100); +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 1 | 100 + 2 | 30 +(2 rows) + +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 50 | 9223372036854775807 + 4 | -9223372036854775808 | 59 + 4 | 100 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(6 rows) + +-- There should be no hypertable invalidations initially: +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- +(0 rows) + +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 50 | 9223372036854775807 + 4 | -9223372036854775808 | 59 + 4 | 100 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(6 rows) + +-- Create invalidations across different ranges. Some of these should +-- be deleted and others cut in different ways when a refresh is +-- run. Note that the refresh window is inclusive in the start of the +-- window but exclusive at the end. +-- Entries that should be left unmodified: +INSERT INTO conditions VALUES (10, 4, 23.7); +INSERT INTO conditions VALUES (10, 5, 23.8), (19, 3, 23.6); +INSERT INTO conditions VALUES (60, 3, 23.7), (70, 4, 23.7); +-- Should see some invaliations in the hypertable invalidation log: +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- + 1 | 10 | 10 + 1 | 10 | 19 + 1 | 60 | 60 + 1 | 60 | 70 + 1 | 70 | 70 +(5 rows) + +-- Generate some invalidations for the other hypertable +INSERT INTO measurements VALUES (20, 4, 23.7); +INSERT INTO measurements VALUES (30, 5, 23.8), (80, 3, 23.6); +-- Should now see invalidations for both hypertables +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- + 1 | 10 | 10 + 1 | 10 | 19 + 1 | 60 | 60 + 1 | 60 | 70 + 1 | 70 | 70 + 2 | 20 | 20 + 2 | 30 | 80 +(7 rows) + +-- First refresh a window where we don't have any invalidations. This +-- allows us to see only the copying of the invalidations to the per +-- cagg log without additional processing. +CALL refresh_continuous_aggregate('cond_10', 20, 60); +-- Invalidation threshold remains at 100: +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 1 | 100 + 2 | 30 +(2 rows) + +-- Invalidations should be moved from the hypertable invalidation log +-- to the continuous aggregate log, but only for the hypertable that +-- the refreshed aggregate belongs to: +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- + 2 | 20 | 20 + 2 | 30 | 80 +(2 rows) + +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 10 | 19 + 3 | 60 | 9223372036854775807 + 4 | -9223372036854775808 | 59 + 4 | 0 | 19 + 4 | 60 | 79 + 4 | 100 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(9 rows) + +-- Now add more invalidations to test a refresh that overlaps with them. +-- Entries that should be deleted: +INSERT INTO conditions VALUES (30, 1, 23.4), (59, 1, 23.4); +INSERT INTO conditions VALUES (20, 1, 23.4), (30, 1, 23.4); +-- Entries that should be cut to the right, leaving an invalidation to +-- the left of the refresh window: +INSERT INTO conditions VALUES (1, 4, 23.7), (25, 1, 23.4); +INSERT INTO conditions VALUES (19, 4, 23.7), (59, 1, 23.4); +-- Entries that should be cut to the left and right, leaving two +-- invalidation entries on each side of the refresh window: +INSERT INTO conditions VALUES (2, 2, 23.5), (60, 1, 23.4); +INSERT INTO conditions VALUES (3, 2, 23.5), (80, 1, 23.4); +-- Entries that should be cut to the left, leaving an invalidation to +-- the right of the refresh window: +INSERT INTO conditions VALUES (60, 3, 23.6), (90, 3, 23.6); +INSERT INTO conditions VALUES (20, 5, 23.8), (100, 3, 23.6); +-- New invalidations in the hypertable invalidation log: +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- + 1 | 1 | 1 + 1 | 1 | 25 + 1 | 2 | 60 + 1 | 3 | 3 + 1 | 3 | 80 + 1 | 19 | 19 + 1 | 19 | 59 + 1 | 20 | 20 + 1 | 20 | 30 + 1 | 20 | 100 + 1 | 25 | 25 + 1 | 30 | 30 + 1 | 30 | 59 + 1 | 59 | 59 + 1 | 60 | 90 + 1 | 80 | 80 + 1 | 100 | 100 + 2 | 20 | 20 + 2 | 30 | 80 +(19 rows) + +-- But nothing has yet changed in the cagg invalidation log: +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 10 | 19 + 3 | 60 | 9223372036854775807 + 4 | -9223372036854775808 | 59 + 4 | 0 | 19 + 4 | 60 | 79 + 4 | 100 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(9 rows) + +-- Refresh to process invalidations for daily temperature: +CALL refresh_continuous_aggregate('cond_10', 20, 60); +-- Invalidations should be moved from the hypertable invalidation log +-- to the continuous aggregate log. +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- + 2 | 20 | 20 + 2 | 30 | 80 +(2 rows) + +-- Only the cond_10 cagg should have its entries cut: +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | -9223372036854775808 | 19 + 3 | 10 | 19 + 3 | 60 | 9223372036854775807 + 4 | -9223372036854775808 | 59 + 4 | 0 | 19 + 4 | 0 | 99 + 4 | 0 | 119 + 4 | 60 | 79 + 4 | 100 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(12 rows) + +-- Refresh also cond_20: +CALL refresh_continuous_aggregate('cond_20', 20, 60); +-- The cond_20 cagg should also have its entries cut: +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | -9223372036854775808 | 19 + 3 | 10 | 19 + 3 | 60 | 9223372036854775807 + 4 | -9223372036854775808 | 19 + 4 | 60 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(8 rows) + +-- Refresh cond_10 to completely remove an invalidation: +CALL refresh_continuous_aggregate('cond_10', 0, 20); +-- The 1-19 invalidation should be deleted: +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 60 | 9223372036854775807 + 4 | -9223372036854775808 | 19 + 4 | 60 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(6 rows) + +-- Clear everything between 0 and 100 to make way for new +-- invalidations +CALL refresh_continuous_aggregate('cond_10', 0, 100); +-- Test refreshing with non-overlapping invalidations +INSERT INTO conditions VALUES (20, 1, 23.4), (25, 1, 23.4); +INSERT INTO conditions VALUES (30, 1, 23.4), (46, 1, 23.4); +CALL refresh_continuous_aggregate('cond_10', 1, 40); +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 40 | 49 + 3 | 100 | 9223372036854775807 + 4 | -9223372036854775808 | 19 + 4 | 20 | 39 + 4 | 20 | 59 + 4 | 60 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(9 rows) + +-- Refresh whithout cutting (in area where there are no +-- invalidations). Merging of overlapping entries should still happen: +INSERT INTO conditions VALUES (15, 1, 23.4), (42, 1, 23.4); +CALL refresh_continuous_aggregate('cond_10', 90, 100); +psql:include/cagg_invalidation_common.sql:327: NOTICE: continuous aggregate "cond_10" is already up-to-date +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 10 | 49 + 3 | 100 | 9223372036854775807 + 4 | -9223372036854775808 | 19 + 4 | 0 | 59 + 4 | 20 | 39 + 4 | 20 | 59 + 4 | 60 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(10 rows) + +-- Test max refresh window +CALL refresh_continuous_aggregate('cond_10', NULL, NULL); +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+---------------------- + 3 | -9223372036854775808 | -9223372036854775801 + 3 | 110 | 9223372036854775807 + 4 | -9223372036854775808 | 19 + 4 | 0 | 59 + 4 | 20 | 39 + 4 | 20 | 59 + 4 | 60 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(9 rows) + +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- + 2 | 20 | 20 + 2 | 30 | 80 +(2 rows) + +-- Pick the first chunk of conditions to TRUNCATE +SELECT show_chunks AS chunk_to_truncate +FROM show_chunks('conditions') +ORDER BY 1 +LIMIT 1 \gset +-- Show the data before truncating one of the chunks +SELECT * FROM :chunk_to_truncate +ORDER BY 1; + time | device | temp +------+--------+------ + 1 | 4 | 23.7 + 1 | 0 | 16 + 2 | 2 | 23.5 + 2 | 1 | 25 + 3 | 2 | 23.5 + 3 | 0 | 20 + 4 | 2 | 10 + 5 | 2 | 26 + 6 | 1 | 13 + 7 | 3 | 35 + 8 | 1 | 37 + 9 | 3 | 7 +(12 rows) + +-- Truncate one chunk +\if :IS_DISTRIBUTED +-- There is no TRUNCATE implementation for FOREIGN tables yet +\set ON_ERROR_STOP 0 +\endif +TRUNCATE TABLE :chunk_to_truncate; +psql:include/cagg_invalidation_common.sql:352: ERROR: "_dist_hyper_1_1_chunk" is not a table +\if :IS_DISTRIBUTED +\set ON_ERROR_STOP 1 +\endif +-- Should see new invalidation entries for conditions for the non-distributed case +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- + 2 | 20 | 20 + 2 | 30 | 80 +(2 rows) + +-- TRUNCATE the hypertable to invalidate all its continuous aggregates +TRUNCATE conditions; +-- Now empty +SELECT * FROM conditions; + time | device | temp +------+--------+------ +(0 rows) + +-- Should see an infinite invalidation entry for conditions +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+----------------------+--------------------- + 1 | -9223372036854775808 | 9223372036854775807 + 2 | 20 | 20 + 2 | 30 | 80 +(3 rows) + +-- Aggregates still hold data +SELECT * FROM cond_10 +ORDER BY 1,2 +LIMIT 5; + bucket | device | avg_temp +--------+--------+---------- + 0 | 0 | 18 + 0 | 1 | 25 + 0 | 2 | 20.75 + 0 | 3 | 21 + 0 | 4 | 23.7 +(5 rows) + +SELECT * FROM cond_20 +ORDER BY 1,2 +LIMIT 5; + bucket | device | avg_temp +--------+--------+------------------ + 20 | 0 | 18.2857142857143 + 20 | 1 | 23.5142857142857 + 20 | 2 | 26 + 20 | 3 | 23 + 20 | 5 | 23.8 +(5 rows) + +CALL refresh_continuous_aggregate('cond_10', NULL, NULL); +CALL refresh_continuous_aggregate('cond_20', NULL, NULL); +-- Both should now be empty after refresh +SELECT * FROM cond_10 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- +(0 rows) + +SELECT * FROM cond_20 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- +(0 rows) + +-- Insert new data again and refresh +INSERT INTO conditions VALUES + (1, 1, 23.4), (4, 3, 14.3), (5, 1, 13.6), + (6, 2, 17.9), (12, 1, 18.3), (19, 3, 28.2), + (10, 3, 22.3), (11, 2, 34.9), (15, 2, 45.6), + (21, 1, 15.3), (22, 2, 12.3), (29, 3, 16.3); +CALL refresh_continuous_aggregate('cond_10', NULL, NULL); +CALL refresh_continuous_aggregate('cond_20', NULL, NULL); +-- Should now hold data again +SELECT * FROM cond_10 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 18.5 + 0 | 2 | 17.9 + 0 | 3 | 14.3 + 10 | 1 | 18.3 + 10 | 2 | 40.25 + 10 | 3 | 25.25 + 20 | 1 | 15.3 + 20 | 2 | 12.3 + 20 | 3 | 16.3 +(9 rows) + +SELECT * FROM cond_20 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+------------------ + 0 | 1 | 18.4333333333333 + 0 | 2 | 32.8 + 0 | 3 | 21.6 + 20 | 1 | 15.3 + 20 | 2 | 12.3 + 20 | 3 | 16.3 +(6 rows) + +-- Truncate one of the aggregates, but first test that we block +-- TRUNCATE ONLY +\set ON_ERROR_STOP 0 +TRUNCATE ONLY cond_20; +psql:include/cagg_invalidation_common.sql:408: ERROR: cannot truncate only a continuous aggregate +\set ON_ERROR_STOP 1 +TRUNCATE cond_20; +-- Should now be empty +SELECT * FROM cond_20 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- +(0 rows) + +-- Other aggregate is not affected +SELECT * FROM cond_10 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 18.5 + 0 | 2 | 17.9 + 0 | 3 | 14.3 + 10 | 1 | 18.3 + 10 | 2 | 40.25 + 10 | 3 | 25.25 + 20 | 1 | 15.3 + 20 | 2 | 12.3 + 20 | 3 | 16.3 +(9 rows) + +-- Refresh again to bring data back +CALL refresh_continuous_aggregate('cond_20', NULL, NULL); +-- The aggregate should be populated again +SELECT * FROM cond_20 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+------------------ + 0 | 1 | 18.4333333333333 + 0 | 2 | 32.8 + 0 | 3 | 21.6 + 20 | 1 | 15.3 + 20 | 2 | 12.3 + 20 | 3 | 16.3 +(6 rows) + +------------------------------------------------------- +-- Test corner cases against a minimal bucket aggregate +------------------------------------------------------- +-- First, clear the table and aggregate +TRUNCATE conditions; +SELECT * FROM conditions; + time | device | temp +------+--------+------ +(0 rows) + +CALL refresh_continuous_aggregate('cond_10', NULL, NULL); +SELECT * FROM cond_10 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- +(0 rows) + +CREATE MATERIALIZED VIEW cond_1 +WITH (timescaledb.continuous, + timescaledb.materialized_only=true) +AS +SELECT time_bucket(BIGINT '1', time) AS bucket, device, avg(temp) AS avg_temp +FROM conditions +GROUP BY 1,2 WITH NO DATA; +SELECT mat_hypertable_id AS cond_1_id +FROM _timescaledb_catalog.continuous_agg +WHERE user_view_name = 'cond_1' \gset +-- Test manual invalidation error +\if :IS_DISTRIBUTED +\else +\set ON_ERROR_STOP 0 +SELECT _timescaledb_internal.invalidation_cagg_log_add_entry(:cond_1_id, 1, 0); +\set ON_ERROR_STOP 1 +\endif +-- Test invalidations with bucket size 1 +INSERT INTO conditions VALUES (0, 1, 1.0); +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- + 1 | 0 | 0 + 2 | 20 | 20 + 2 | 30 | 80 +(3 rows) + +-- Refreshing around the bucket should not update the aggregate +CALL refresh_continuous_aggregate('cond_1', -1, 0); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- +(0 rows) + +CALL refresh_continuous_aggregate('cond_1', 1, 2); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- +(0 rows) + +-- Refresh only the invalidated bucket +CALL refresh_continuous_aggregate('cond_1', 0, 1); +SELECT * FROM cagg_invals +WHERE cagg_id = :cond_1_id; + cagg_id | start | end +---------+----------------------+--------------------- + 6 | -9223372036854775808 | -2 + 6 | 2 | 9223372036854775807 +(2 rows) + +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 1 +(1 row) + +-- Refresh 1 extra bucket on the left +INSERT INTO conditions VALUES (0, 1, 2.0); +CALL refresh_continuous_aggregate('cond_1', -1, 1); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 1.5 +(1 row) + +-- Refresh 1 extra bucket on the right +INSERT INTO conditions VALUES (0, 1, 3.0); +CALL refresh_continuous_aggregate('cond_1', 0, 2); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 2 +(1 row) + +-- Refresh 1 extra bucket on each side +INSERT INTO conditions VALUES (0, 1, 4.0); +CALL refresh_continuous_aggregate('cond_1', -1, 2); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 2.5 +(1 row) + +-- Clear to reset aggregate +TRUNCATE conditions; +CALL refresh_continuous_aggregate('cond_1', NULL, NULL); +-- Test invalidation of size 2 +INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0); +-- Refresh one bucket at a time +CALL refresh_continuous_aggregate('cond_1', 0, 1); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 1 +(1 row) + +CALL refresh_continuous_aggregate('cond_1', 1, 2); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 1 + 1 | 1 | 2 +(2 rows) + +-- Repeat the same thing but refresh the whole invalidation at once +TRUNCATE conditions; +CALL refresh_continuous_aggregate('cond_1', NULL, NULL); +INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0); +CALL refresh_continuous_aggregate('cond_1', 0, 2); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 1 + 1 | 1 | 2 +(2 rows) + +-- Test invalidation of size 3 +TRUNCATE conditions; +CALL refresh_continuous_aggregate('cond_1', NULL, NULL); +INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0), (2, 1, 3.0); +-- Invalidation extends beyond the refresh window on both ends +CALL refresh_continuous_aggregate('cond_1', 1, 2); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 1 | 1 | 2 +(1 row) + +-- Should leave one invalidation on each side of the refresh window +SELECT * FROM cagg_invals +WHERE cagg_id = :cond_1_id; + cagg_id | start | end +---------+-------+--------------------- + 6 | 0 | 0 + 6 | 2 | 2 + 6 | 110 | 9223372036854775807 +(3 rows) + +-- Refresh the two remaining invalidations +CALL refresh_continuous_aggregate('cond_1', 0, 1); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 1 + 1 | 1 | 2 +(2 rows) + +CALL refresh_continuous_aggregate('cond_1', 2, 3); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 1 + 1 | 1 | 2 + 2 | 1 | 3 +(3 rows) + +-- Clear and repeat but instead refresh the whole range in one go. The +-- result should be the same as the three partial refreshes. Use +-- DELETE instead of TRUNCATE to clear this time. +DELETE FROM conditions; +CALL refresh_continuous_aggregate('cond_1', NULL, NULL); +INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0), (2, 1, 3.0); +CALL refresh_continuous_aggregate('cond_1', 0, 3); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 1 + 1 | 1 | 2 + 2 | 1 | 3 +(3 rows) + +---------------------------------------------- +-- Test that invalidation threshold is capped +---------------------------------------------- +CREATE table threshold_test (time int, value int); +\if :IS_DISTRIBUTED +SELECT create_distributed_hypertable('threshold_test', 'time', chunk_time_interval => 4, replication_factor => 2); +psql:include/cagg_invalidation_common.sql:565: NOTICE: adding not-null constraint to column "time" + create_distributed_hypertable +------------------------------- + (7,public,threshold_test,t) +(1 row) + +\else +SELECT create_hypertable('threshold_test', 'time', chunk_time_interval => 4); +\endif +SELECT set_integer_now_func('threshold_test', 'int_now'); + set_integer_now_func +---------------------- + +(1 row) + +CREATE MATERIALIZED VIEW thresh_2 +WITH (timescaledb.continuous, + timescaledb.materialized_only=true) +AS +SELECT time_bucket(2, time) AS bucket, max(value) AS max +FROM threshold_test +GROUP BY 1 WITH NO DATA; +SELECT raw_hypertable_id AS thresh_hyper_id, mat_hypertable_id AS thresh_cagg_id +FROM _timescaledb_catalog.continuous_agg +WHERE user_view_name = 'thresh_2' \gset +-- There's no invalidation threshold initially +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- +(0 rows) + +-- Test manual invalidation error +\if :IS_DISTRIBUTED +\else +\set ON_ERROR_STOP 0 +SELECT _timescaledb_internal.invalidation_hyper_log_add_entry(:thresh_hyper_id, 1, 0); +\set ON_ERROR_STOP 1 +\endif +-- Test that threshold is initilized to min value when there's no data +-- and we specify an infinite end. Note that the min value may differ +-- depending on time type. +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); +psql:include/cagg_invalidation_common.sql:599: NOTICE: continuous aggregate "thresh_2" is already up-to-date +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+------------- + 7 | -2147483648 +(1 row) + +INSERT INTO threshold_test +SELECT v, v FROM generate_series(1, 10) v; +CALL refresh_continuous_aggregate('thresh_2', 0, 5); +-- Threshold should move to end of the last refreshed bucket, which is +-- the last bucket fully included in the window, i.e., the window +-- shrinks to end of previous bucket. +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 7 | 4 +(1 row) + +-- Refresh where both the start and end of the window is above the +-- max data value +CALL refresh_continuous_aggregate('thresh_2', 14, NULL); +psql:include/cagg_invalidation_common.sql:619: NOTICE: continuous aggregate "thresh_2" is already up-to-date +SELECT watermark AS thresh_hyper_id_watermark +FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id \gset +-- Refresh where we start from the current watermark to infinity +CALL refresh_continuous_aggregate('thresh_2', :thresh_hyper_id_watermark, NULL); +psql:include/cagg_invalidation_common.sql:626: NOTICE: continuous aggregate "thresh_2" is already up-to-date +-- Now refresh with max end of the window to test that the +-- invalidation threshold is capped at the last bucket of data +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 7 | 12 +(1 row) + +-- Should not have processed invalidations beyond the invalidation +-- threshold. +SELECT * FROM cagg_invals +WHERE cagg_id = :thresh_cagg_id; + cagg_id | start | end +---------+----------------------+--------------------- + 8 | -9223372036854775808 | -1 + 8 | 12 | 9223372036854775807 +(2 rows) + +-- Check that things are properly materialized +SELECT * FROM thresh_2 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 3 + 4 | 5 + 6 | 7 + 8 | 9 + 10 | 10 +(6 rows) + +-- Delete the last data +SELECT show_chunks AS chunk_to_drop +FROM show_chunks('threshold_test') +ORDER BY 1 DESC +LIMIT 1 \gset +DELETE FROM threshold_test +WHERE time > 6; +-- The last data in the hypertable is gone +SELECT time_bucket(2, time) AS bucket, max(value) AS max +FROM threshold_test +GROUP BY 1 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 3 + 4 | 5 + 6 | 6 +(4 rows) + +-- The aggregate still holds data +SELECT * FROM thresh_2 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 3 + 4 | 5 + 6 | 7 + 8 | 9 + 10 | 10 +(6 rows) + +-- Refresh the aggregate to bring it up-to-date +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); +-- Data also gone from the aggregate +SELECT * FROM thresh_2 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 3 + 4 | 5 + 6 | 6 +(4 rows) + +-- The invalidation threshold remains the same +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 7 | 12 +(1 row) + +-- Insert new data beyond the invalidation threshold to move it +-- forward +INSERT INTO threshold_test +SELECT v, v FROM generate_series(7, 15) v; +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); +-- Aggregate now updated to reflect newly aggregated data +SELECT * FROM thresh_2 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 3 + 4 | 5 + 6 | 7 + 8 | 9 + 10 | 11 + 12 | 13 + 14 | 15 +(8 rows) + +-- The invalidation threshold should have moved forward to the end of +-- the new data +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 7 | 16 +(1 row) + +-- The aggregate remains invalid beyond the invalidation threshold +SELECT * FROM cagg_invals +WHERE cagg_id = :thresh_cagg_id; + cagg_id | start | end +---------+----------------------+--------------------- + 8 | -9223372036854775808 | -1 + 8 | 16 | 9223372036854775807 +(2 rows) + +---------------------------------------------------------------------- +-- Test that dropping a chunk invalidates the dropped region. First +-- create another chunk so that we have two chunks. One of the chunks +-- will be dropped. +--------------------------------------------------------------------- +INSERT INTO conditions VALUES (10, 1, 10.0); +-- Chunks currently associated with the hypertable +SELECT show_chunks AS chunk_to_drop +FROM show_chunks('conditions'); + chunk_to_drop +---------------------------------------------- + _timescaledb_internal._dist_hyper_1_34_chunk + _timescaledb_internal._dist_hyper_1_40_chunk +(2 rows) + +-- Pick the first one to drop +SELECT show_chunks AS chunk_to_drop +FROM show_chunks('conditions') +ORDER BY 1 +LIMIT 1 \gset +-- Show the data before dropping one of the chunks +SELECT * FROM conditions +ORDER BY 1,2; + time | device | temp +------+--------+------ + 0 | 1 | 1 + 1 | 1 | 2 + 2 | 1 | 3 + 10 | 1 | 10 +(4 rows) + +-- Drop one chunk +\if :IS_DISTRIBUTED +CALL distributed_exec(format('DROP TABLE IF EXISTS %s', :'chunk_to_drop')); +DROP FOREIGN TABLE :chunk_to_drop; +\else +DROP TABLE :chunk_to_drop; +\endif +-- The chunk's data no longer exists in the hypertable +SELECT * FROM conditions +ORDER BY 1,2; + time | device | temp +------+--------+------ + 10 | 1 | 10 +(1 row) + +-- Aggregate still remains in continuous aggregate, however +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 1 + 1 | 1 | 2 + 2 | 1 | 3 +(3 rows) + +-- Refresh the continuous aggregate to make the dropped data be +-- reflected in the aggregate +CALL refresh_continuous_aggregate('cond_1', NULL, NULL); +-- Aggregate now up-to-date with the source hypertable +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 10 | 1 | 10 +(1 row) + +-- Test that adjacent invalidations are merged +INSERT INTO conditions VALUES(1, 1, 1.0), (2, 1, 2.0); +INSERT INTO conditions VALUES(3, 1, 1.0); +INSERT INTO conditions VALUES(4, 1, 1.0); +INSERT INTO conditions VALUES(6, 1, 1.0); +CALL refresh_continuous_aggregate('cond_1', 10, NULL); +psql:include/cagg_invalidation_common.sql:748: NOTICE: continuous aggregate "cond_1" is already up-to-date +SELECT * FROM cagg_invals +WHERE cagg_id = :cond_1_id; + cagg_id | start | end +---------+-------+--------------------- + 6 | 1 | 4 + 6 | 6 | 6 + 6 | 110 | 9223372036854775807 +(3 rows) + +--------------------------------------------------------------------- +-- Test that single timestamp invalidations are expanded to buckets, +-- and adjacent buckets merged. This merging cannot cross Data-Node +-- chunk boundaries for the distributed hypertable case. +--------------------------------------------------------------------- +-- First clear invalidations in a range: +CALL refresh_continuous_aggregate('cond_10', -20, 60); +-- The following three should be merged to one range 0-29 +INSERT INTO conditions VALUES (5, 1, 1.0); +INSERT INTO conditions VALUES (15, 1, 1.0); +INSERT INTO conditions VALUES (25, 1, 1.0); +-- The last one should not merge with the others +INSERT INTO conditions VALUES (40, 1, 1.0); +-- Refresh to process invalidations, but outside the range of +-- invalidations we inserted so that we don't clear them. +CALL refresh_continuous_aggregate('cond_10', 50, 60); +psql:include/cagg_invalidation_common.sql:769: NOTICE: continuous aggregate "cond_10" is already up-to-date +SELECT mat_hypertable_id AS cond_10_id +FROM _timescaledb_catalog.continuous_agg +WHERE user_view_name = 'cond_10' \gset +SELECT * FROM cagg_invals +WHERE cagg_id = :cond_10_id; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -21 + 3 | 0 | 9 + 3 | 0 | 19 + 3 | 10 | 29 + 3 | 20 | 29 + 3 | 40 | 49 + 3 | 60 | 9223372036854775807 +(7 rows) + +-- should trigger two individual refreshes +CALL refresh_continuous_aggregate('cond_10', 0, 200); +-- Allow at most 5 individual invalidations per refreshe +SET timescaledb.materializations_per_refresh_window=5; +-- Insert into every second bucket +INSERT INTO conditions VALUES (20, 1, 1.0); +INSERT INTO conditions VALUES (40, 1, 1.0); +INSERT INTO conditions VALUES (60, 1, 1.0); +INSERT INTO conditions VALUES (80, 1, 1.0); +INSERT INTO conditions VALUES (100, 1, 1.0); +INSERT INTO conditions VALUES (120, 1, 1.0); +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +\set VERBOSITY default +-- Test acceptable values for materializations per refresh +SET timescaledb.materializations_per_refresh_window=' 5 '; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +-- Large value will be treated as LONG_MAX +SET timescaledb.materializations_per_refresh_window=342239897234023842394249234766923492347; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +-- Test bad values for materializations per refresh +SET timescaledb.materializations_per_refresh_window='foo'; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +psql:include/cagg_invalidation_common.sql:808: WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" +DETAIL: Expected an integer but current value is "foo". +SET timescaledb.materializations_per_refresh_window='2bar'; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +psql:include/cagg_invalidation_common.sql:811: WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" +DETAIL: Expected an integer but current value is "2bar". +SET timescaledb.materializations_per_refresh_window='-'; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +psql:include/cagg_invalidation_common.sql:815: WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" +DETAIL: Expected an integer but current value is "-". +\set VERBOSITY terse +-- cleanup +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER; +DROP DATABASE :DATA_NODE_1; +DROP DATABASE :DATA_NODE_2; +DROP DATABASE :DATA_NODE_3; diff --git a/tsl/test/expected/cagg_invalidation_dist_ht-14.out b/tsl/test/expected/cagg_invalidation_dist_ht-14.out new file mode 100644 index 000000000..7be17390e --- /dev/null +++ b/tsl/test/expected/cagg_invalidation_dist_ht-14.out @@ -0,0 +1,1389 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +------------------------------------ +-- Set up a distributed environment +------------------------------------ +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER +\set DATA_NODE_1 :TEST_DBNAME _1 +\set DATA_NODE_2 :TEST_DBNAME _2 +\set DATA_NODE_3 :TEST_DBNAME _3 +\ir include/remote_exec.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +CREATE SCHEMA IF NOT EXISTS test; +psql:include/remote_exec.sql:5: NOTICE: schema "test" already exists, skipping +GRANT USAGE ON SCHEMA test TO PUBLIC; +CREATE OR REPLACE FUNCTION test.remote_exec(srv_name name[], command text) +RETURNS VOID +AS :TSL_MODULE_PATHNAME, 'ts_remote_exec' +LANGUAGE C; +CREATE OR REPLACE FUNCTION test.remote_exec_get_result_strings(srv_name name[], command text) +RETURNS TABLE("table_record" CSTRING[]) +AS :TSL_MODULE_PATHNAME, 'ts_remote_exec_get_result_strings' +LANGUAGE C; +SELECT (add_data_node (name, host => 'localhost', DATABASE => name)).* +FROM (VALUES (:'DATA_NODE_1'), (:'DATA_NODE_2'), (:'DATA_NODE_3')) v (name); + node_name | host | port | database | node_created | database_created | extension_created +--------------------------------+-----------+-------+--------------------------------+--------------+------------------+------------------- + db_cagg_invalidation_dist_ht_1 | localhost | 55432 | db_cagg_invalidation_dist_ht_1 | t | t | t + db_cagg_invalidation_dist_ht_2 | localhost | 55432 | db_cagg_invalidation_dist_ht_2 | t | t | t + db_cagg_invalidation_dist_ht_3 | localhost | 55432 | db_cagg_invalidation_dist_ht_3 | t | t | t +(3 rows) + +GRANT USAGE ON FOREIGN SERVER :DATA_NODE_1, :DATA_NODE_2, :DATA_NODE_3 TO PUBLIC; +\set IS_DISTRIBUTED TRUE +\ir include/cagg_invalidation_common.sql +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +-- Disable background workers since we are testing manual refresh +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER +SELECT _timescaledb_internal.stop_background_workers(); + stop_background_workers +------------------------- + t +(1 row) + +SET ROLE :ROLE_DEFAULT_PERM_USER; +SET datestyle TO 'ISO, YMD'; +SET timezone TO 'UTC'; +CREATE TABLE conditions (time bigint NOT NULL, device int, temp float); +\if :IS_DISTRIBUTED +SELECT create_distributed_hypertable('conditions', 'time', chunk_time_interval => 10, replication_factor => 2); + create_distributed_hypertable +------------------------------- + (1,public,conditions,t) +(1 row) + +\else +SELECT create_hypertable('conditions', 'time', chunk_time_interval => 10); +\endif +CREATE TABLE measurements (time int NOT NULL, device int, temp float); +\if :IS_DISTRIBUTED +SELECT create_distributed_hypertable('measurements', 'time', chunk_time_interval => 10, replication_factor => 2); + create_distributed_hypertable +------------------------------- + (2,public,measurements,t) +(1 row) + +\else +SELECT create_hypertable('measurements', 'time', chunk_time_interval => 10); +\endif +CREATE OR REPLACE FUNCTION bigint_now() +RETURNS bigint LANGUAGE SQL STABLE AS +$$ + SELECT coalesce(max(time), 0) + FROM conditions +$$; +\if :IS_DISTRIBUTED +CALL distributed_exec($DIST$ +CREATE OR REPLACE FUNCTION bigint_now() +RETURNS bigint LANGUAGE SQL STABLE AS +$$ + SELECT coalesce(max(time), 0) + FROM conditions +$$; +$DIST$); +\endif +CREATE OR REPLACE FUNCTION int_now() +RETURNS int LANGUAGE SQL STABLE AS +$$ + SELECT coalesce(max(time), 0) + FROM measurements +$$; +\if :IS_DISTRIBUTED +CALL distributed_exec($DIST$ +CREATE OR REPLACE FUNCTION int_now() +RETURNS int LANGUAGE SQL STABLE AS +$$ + SELECT coalesce(max(time), 0) + FROM measurements +$$; +$DIST$); +\endif +SELECT set_integer_now_func('conditions', 'bigint_now'); + set_integer_now_func +---------------------- + +(1 row) + +SELECT set_integer_now_func('measurements', 'int_now'); + set_integer_now_func +---------------------- + +(1 row) + +INSERT INTO conditions +SELECT t, ceil(abs(timestamp_hash(to_timestamp(t)::timestamp))%4)::int, + abs(timestamp_hash(to_timestamp(t)::timestamp))%40 +FROM generate_series(1, 100, 1) t; +CREATE TABLE temp AS +SELECT * FROM conditions; +INSERT INTO measurements +SELECT * FROM temp; +-- Show the most recent data +SELECT * FROM conditions +ORDER BY time DESC, device +LIMIT 10; + time | device | temp +------+--------+------ + 100 | 0 | 8 + 99 | 1 | 5 + 98 | 2 | 26 + 97 | 2 | 10 + 96 | 2 | 34 + 95 | 2 | 30 + 94 | 3 | 31 + 93 | 0 | 4 + 92 | 0 | 32 + 91 | 3 | 15 +(10 rows) + +-- Create two continuous aggregates on the same hypertable to test +-- that invalidations are handled correctly across both of them. +CREATE MATERIALIZED VIEW cond_10 +WITH (timescaledb.continuous, + timescaledb.materialized_only=true) +AS +SELECT time_bucket(BIGINT '10', time) AS bucket, device, avg(temp) AS avg_temp +FROM conditions +GROUP BY 1,2 WITH NO DATA; +CREATE MATERIALIZED VIEW cond_20 +WITH (timescaledb.continuous, + timescaledb.materialized_only=true) +AS +SELECT time_bucket(BIGINT '20', time) AS bucket, device, avg(temp) AS avg_temp +FROM conditions +GROUP BY 1,2 WITH NO DATA; +CREATE MATERIALIZED VIEW measure_10 +WITH (timescaledb.continuous, + timescaledb.materialized_only=true) +AS +SELECT time_bucket(10, time) AS bucket, device, avg(temp) AS avg_temp +FROM measurements +GROUP BY 1,2 WITH NO DATA; +-- There should be three continuous aggregates, two on one hypertable +-- and one on the other: +SELECT mat_hypertable_id, raw_hypertable_id, user_view_name +FROM _timescaledb_catalog.continuous_agg; + mat_hypertable_id | raw_hypertable_id | user_view_name +-------------------+-------------------+---------------- + 3 | 1 | cond_10 + 4 | 1 | cond_20 + 5 | 2 | measure_10 +(3 rows) + +-- The continuous aggregates should be empty +SELECT * FROM cond_10 +ORDER BY 1 DESC, 2; + bucket | device | avg_temp +--------+--------+---------- +(0 rows) + +SELECT * FROM cond_20 +ORDER BY 1 DESC, 2; + bucket | device | avg_temp +--------+--------+---------- +(0 rows) + +SELECT * FROM measure_10 +ORDER BY 1 DESC, 2; + bucket | device | avg_temp +--------+--------+---------- +(0 rows) + +\if :IS_DISTRIBUTED +CREATE OR REPLACE FUNCTION get_hyper_invals() RETURNS TABLE( + "hyper_id" INT, + "start" BIGINT, + "end" BIGINT +) +LANGUAGE SQL VOLATILE AS +$$ +SELECT DISTINCT table_record[1]::TEXT::INT, table_record[2]::TEXT::BIGINT, table_record[3]::TEXT::BIGINT FROM test.remote_exec_get_result_strings(NULL, $DIST$ + SELECT hypertable_id, + lowest_modified_value, + greatest_modified_value + FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log +$DIST$) +ORDER BY 1,2,3 +$$; +CREATE OR REPLACE FUNCTION get_cagg_invals() RETURNS TABLE( + "cagg_id" INT, + "start" BIGINT, + "end" BIGINT +) +LANGUAGE SQL VOLATILE AS +$$ +SELECT DISTINCT table_record[1]::TEXT::INT, table_record[2]::TEXT::BIGINT, table_record[3]::TEXT::BIGINT FROM test.remote_exec_get_result_strings(NULL, $DIST$ + SELECT materialization_id AS cagg_id, + lowest_modified_value AS start, + greatest_modified_value AS end + FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log +$DIST$) +ORDER BY 1,2,3 +$$; +\else +CREATE OR REPLACE FUNCTION get_hyper_invals() RETURNS TABLE ( + "hyper_id" INT, + "start" BIGINT, + "end" BIGINT +) +LANGUAGE SQL VOLATILE AS +$$ +SELECT hypertable_id, + lowest_modified_value, + greatest_modified_value + FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log + ORDER BY 1,2,3 +$$; +CREATE OR REPLACE FUNCTION get_cagg_invals() RETURNS TABLE ( + "cagg_id" INT, + "start" BIGINT, + "end" BIGINT +) +LANGUAGE SQL VOLATILE AS +$$ +SELECT materialization_id, + lowest_modified_value, + greatest_modified_value + FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log + ORDER BY 1,2,3 +$$; +\endif +CREATE VIEW hyper_invals AS SELECT * FROM get_hyper_invals(); +CREATE VIEW cagg_invals AS SELECT * FROM get_cagg_invals(); +-- Must refresh to move the invalidation threshold, or no +-- invalidations will be generated. Initially, there is no threshold +-- set: +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- +(0 rows) + +-- There should be only "infinite" invalidations in the cagg +-- invalidation log: +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | 9223372036854775807 + 4 | -9223372036854775808 | 9223372036854775807 + 5 | -9223372036854775808 | 9223372036854775807 +(3 rows) + +-- Now refresh up to 50 without the first bucket, and the threshold should be updated accordingly: +CALL refresh_continuous_aggregate('cond_10', 1, 50); +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 1 | 50 +(1 row) + +-- Invalidations should be cleared inside the refresh window: +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | 9 + 3 | 50 | 9223372036854775807 + 4 | -9223372036854775808 | 9223372036854775807 + 5 | -9223372036854775808 | 9223372036854775807 +(4 rows) + +-- Refresh up to 50 from the beginning +CALL refresh_continuous_aggregate('cond_10', 0, 50); +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 50 | 9223372036854775807 + 4 | -9223372036854775808 | 9223372036854775807 + 5 | -9223372036854775808 | 9223372036854775807 +(4 rows) + +-- Refreshing below the threshold does not move it: +CALL refresh_continuous_aggregate('cond_10', 20, 49); +psql:include/cagg_invalidation_common.sql:207: NOTICE: continuous aggregate "cond_10" is already up-to-date +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 1 | 50 +(1 row) + +-- Nothing changes with invalidations either since the region was +-- already refreshed and no new invalidations have been generated: +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 50 | 9223372036854775807 + 4 | -9223372036854775808 | 9223372036854775807 + 5 | -9223372036854775808 | 9223372036854775807 +(4 rows) + +-- Refreshing measure_10 moves the threshold only for the other hypertable: +CALL refresh_continuous_aggregate('measure_10', 0, 30); +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 1 | 50 + 2 | 30 +(2 rows) + +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 50 | 9223372036854775807 + 4 | -9223372036854775808 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(5 rows) + +-- Refresh on the second continuous aggregate, cond_20, on the first +-- hypertable moves the same threshold as when refreshing cond_10: +CALL refresh_continuous_aggregate('cond_20', 60, 100); +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 1 | 100 + 2 | 30 +(2 rows) + +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 50 | 9223372036854775807 + 4 | -9223372036854775808 | 59 + 4 | 100 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(6 rows) + +-- There should be no hypertable invalidations initially: +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- +(0 rows) + +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 50 | 9223372036854775807 + 4 | -9223372036854775808 | 59 + 4 | 100 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(6 rows) + +-- Create invalidations across different ranges. Some of these should +-- be deleted and others cut in different ways when a refresh is +-- run. Note that the refresh window is inclusive in the start of the +-- window but exclusive at the end. +-- Entries that should be left unmodified: +INSERT INTO conditions VALUES (10, 4, 23.7); +INSERT INTO conditions VALUES (10, 5, 23.8), (19, 3, 23.6); +INSERT INTO conditions VALUES (60, 3, 23.7), (70, 4, 23.7); +-- Should see some invaliations in the hypertable invalidation log: +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- + 1 | 10 | 10 + 1 | 10 | 19 + 1 | 60 | 60 + 1 | 60 | 70 + 1 | 70 | 70 +(5 rows) + +-- Generate some invalidations for the other hypertable +INSERT INTO measurements VALUES (20, 4, 23.7); +INSERT INTO measurements VALUES (30, 5, 23.8), (80, 3, 23.6); +-- Should now see invalidations for both hypertables +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- + 1 | 10 | 10 + 1 | 10 | 19 + 1 | 60 | 60 + 1 | 60 | 70 + 1 | 70 | 70 + 2 | 20 | 20 + 2 | 30 | 80 +(7 rows) + +-- First refresh a window where we don't have any invalidations. This +-- allows us to see only the copying of the invalidations to the per +-- cagg log without additional processing. +CALL refresh_continuous_aggregate('cond_10', 20, 60); +-- Invalidation threshold remains at 100: +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 1 | 100 + 2 | 30 +(2 rows) + +-- Invalidations should be moved from the hypertable invalidation log +-- to the continuous aggregate log, but only for the hypertable that +-- the refreshed aggregate belongs to: +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- + 2 | 20 | 20 + 2 | 30 | 80 +(2 rows) + +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 10 | 19 + 3 | 60 | 9223372036854775807 + 4 | -9223372036854775808 | 59 + 4 | 0 | 19 + 4 | 60 | 79 + 4 | 100 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(9 rows) + +-- Now add more invalidations to test a refresh that overlaps with them. +-- Entries that should be deleted: +INSERT INTO conditions VALUES (30, 1, 23.4), (59, 1, 23.4); +INSERT INTO conditions VALUES (20, 1, 23.4), (30, 1, 23.4); +-- Entries that should be cut to the right, leaving an invalidation to +-- the left of the refresh window: +INSERT INTO conditions VALUES (1, 4, 23.7), (25, 1, 23.4); +INSERT INTO conditions VALUES (19, 4, 23.7), (59, 1, 23.4); +-- Entries that should be cut to the left and right, leaving two +-- invalidation entries on each side of the refresh window: +INSERT INTO conditions VALUES (2, 2, 23.5), (60, 1, 23.4); +INSERT INTO conditions VALUES (3, 2, 23.5), (80, 1, 23.4); +-- Entries that should be cut to the left, leaving an invalidation to +-- the right of the refresh window: +INSERT INTO conditions VALUES (60, 3, 23.6), (90, 3, 23.6); +INSERT INTO conditions VALUES (20, 5, 23.8), (100, 3, 23.6); +-- New invalidations in the hypertable invalidation log: +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- + 1 | 1 | 1 + 1 | 1 | 25 + 1 | 2 | 60 + 1 | 3 | 3 + 1 | 3 | 80 + 1 | 19 | 19 + 1 | 19 | 59 + 1 | 20 | 20 + 1 | 20 | 30 + 1 | 20 | 100 + 1 | 25 | 25 + 1 | 30 | 30 + 1 | 30 | 59 + 1 | 59 | 59 + 1 | 60 | 90 + 1 | 80 | 80 + 1 | 100 | 100 + 2 | 20 | 20 + 2 | 30 | 80 +(19 rows) + +-- But nothing has yet changed in the cagg invalidation log: +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 10 | 19 + 3 | 60 | 9223372036854775807 + 4 | -9223372036854775808 | 59 + 4 | 0 | 19 + 4 | 60 | 79 + 4 | 100 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(9 rows) + +-- Refresh to process invalidations for daily temperature: +CALL refresh_continuous_aggregate('cond_10', 20, 60); +-- Invalidations should be moved from the hypertable invalidation log +-- to the continuous aggregate log. +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- + 2 | 20 | 20 + 2 | 30 | 80 +(2 rows) + +-- Only the cond_10 cagg should have its entries cut: +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | -9223372036854775808 | 19 + 3 | 10 | 19 + 3 | 60 | 9223372036854775807 + 4 | -9223372036854775808 | 59 + 4 | 0 | 19 + 4 | 0 | 99 + 4 | 0 | 119 + 4 | 60 | 79 + 4 | 100 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(12 rows) + +-- Refresh also cond_20: +CALL refresh_continuous_aggregate('cond_20', 20, 60); +-- The cond_20 cagg should also have its entries cut: +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | -9223372036854775808 | 19 + 3 | 10 | 19 + 3 | 60 | 9223372036854775807 + 4 | -9223372036854775808 | 19 + 4 | 60 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(8 rows) + +-- Refresh cond_10 to completely remove an invalidation: +CALL refresh_continuous_aggregate('cond_10', 0, 20); +-- The 1-19 invalidation should be deleted: +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 60 | 9223372036854775807 + 4 | -9223372036854775808 | 19 + 4 | 60 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(6 rows) + +-- Clear everything between 0 and 100 to make way for new +-- invalidations +CALL refresh_continuous_aggregate('cond_10', 0, 100); +-- Test refreshing with non-overlapping invalidations +INSERT INTO conditions VALUES (20, 1, 23.4), (25, 1, 23.4); +INSERT INTO conditions VALUES (30, 1, 23.4), (46, 1, 23.4); +CALL refresh_continuous_aggregate('cond_10', 1, 40); +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 40 | 49 + 3 | 100 | 9223372036854775807 + 4 | -9223372036854775808 | 19 + 4 | 20 | 39 + 4 | 20 | 59 + 4 | 60 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(9 rows) + +-- Refresh whithout cutting (in area where there are no +-- invalidations). Merging of overlapping entries should still happen: +INSERT INTO conditions VALUES (15, 1, 23.4), (42, 1, 23.4); +CALL refresh_continuous_aggregate('cond_10', 90, 100); +psql:include/cagg_invalidation_common.sql:327: NOTICE: continuous aggregate "cond_10" is already up-to-date +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -1 + 3 | 10 | 49 + 3 | 100 | 9223372036854775807 + 4 | -9223372036854775808 | 19 + 4 | 0 | 59 + 4 | 20 | 39 + 4 | 20 | 59 + 4 | 60 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(10 rows) + +-- Test max refresh window +CALL refresh_continuous_aggregate('cond_10', NULL, NULL); +SELECT * FROM cagg_invals; + cagg_id | start | end +---------+----------------------+---------------------- + 3 | -9223372036854775808 | -9223372036854775801 + 3 | 110 | 9223372036854775807 + 4 | -9223372036854775808 | 19 + 4 | 0 | 59 + 4 | 20 | 39 + 4 | 20 | 59 + 4 | 60 | 9223372036854775807 + 5 | -9223372036854775808 | -1 + 5 | 30 | 9223372036854775807 +(9 rows) + +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- + 2 | 20 | 20 + 2 | 30 | 80 +(2 rows) + +-- Pick the first chunk of conditions to TRUNCATE +SELECT show_chunks AS chunk_to_truncate +FROM show_chunks('conditions') +ORDER BY 1 +LIMIT 1 \gset +-- Show the data before truncating one of the chunks +SELECT * FROM :chunk_to_truncate +ORDER BY 1; + time | device | temp +------+--------+------ + 1 | 4 | 23.7 + 1 | 0 | 16 + 2 | 2 | 23.5 + 2 | 1 | 25 + 3 | 2 | 23.5 + 3 | 0 | 20 + 4 | 2 | 10 + 5 | 2 | 26 + 6 | 1 | 13 + 7 | 3 | 35 + 8 | 1 | 37 + 9 | 3 | 7 +(12 rows) + +-- Truncate one chunk +\if :IS_DISTRIBUTED +-- There is no TRUNCATE implementation for FOREIGN tables yet +\set ON_ERROR_STOP 0 +\endif +TRUNCATE TABLE :chunk_to_truncate; +psql:include/cagg_invalidation_common.sql:352: ERROR: cannot truncate foreign table "_dist_hyper_1_1_chunk" +\if :IS_DISTRIBUTED +\set ON_ERROR_STOP 1 +\endif +-- Should see new invalidation entries for conditions for the non-distributed case +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- + 2 | 20 | 20 + 2 | 30 | 80 +(2 rows) + +-- TRUNCATE the hypertable to invalidate all its continuous aggregates +TRUNCATE conditions; +-- Now empty +SELECT * FROM conditions; + time | device | temp +------+--------+------ +(0 rows) + +-- Should see an infinite invalidation entry for conditions +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+----------------------+--------------------- + 1 | -9223372036854775808 | 9223372036854775807 + 2 | 20 | 20 + 2 | 30 | 80 +(3 rows) + +-- Aggregates still hold data +SELECT * FROM cond_10 +ORDER BY 1,2 +LIMIT 5; + bucket | device | avg_temp +--------+--------+---------- + 0 | 0 | 18 + 0 | 1 | 25 + 0 | 2 | 20.75 + 0 | 3 | 21 + 0 | 4 | 23.7 +(5 rows) + +SELECT * FROM cond_20 +ORDER BY 1,2 +LIMIT 5; + bucket | device | avg_temp +--------+--------+------------------ + 20 | 0 | 18.2857142857143 + 20 | 1 | 23.5142857142857 + 20 | 2 | 26 + 20 | 3 | 23 + 20 | 5 | 23.8 +(5 rows) + +CALL refresh_continuous_aggregate('cond_10', NULL, NULL); +CALL refresh_continuous_aggregate('cond_20', NULL, NULL); +-- Both should now be empty after refresh +SELECT * FROM cond_10 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- +(0 rows) + +SELECT * FROM cond_20 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- +(0 rows) + +-- Insert new data again and refresh +INSERT INTO conditions VALUES + (1, 1, 23.4), (4, 3, 14.3), (5, 1, 13.6), + (6, 2, 17.9), (12, 1, 18.3), (19, 3, 28.2), + (10, 3, 22.3), (11, 2, 34.9), (15, 2, 45.6), + (21, 1, 15.3), (22, 2, 12.3), (29, 3, 16.3); +CALL refresh_continuous_aggregate('cond_10', NULL, NULL); +CALL refresh_continuous_aggregate('cond_20', NULL, NULL); +-- Should now hold data again +SELECT * FROM cond_10 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 18.5 + 0 | 2 | 17.9 + 0 | 3 | 14.3 + 10 | 1 | 18.3 + 10 | 2 | 40.25 + 10 | 3 | 25.25 + 20 | 1 | 15.3 + 20 | 2 | 12.3 + 20 | 3 | 16.3 +(9 rows) + +SELECT * FROM cond_20 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+------------------ + 0 | 1 | 18.4333333333333 + 0 | 2 | 32.8 + 0 | 3 | 21.6 + 20 | 1 | 15.3 + 20 | 2 | 12.3 + 20 | 3 | 16.3 +(6 rows) + +-- Truncate one of the aggregates, but first test that we block +-- TRUNCATE ONLY +\set ON_ERROR_STOP 0 +TRUNCATE ONLY cond_20; +psql:include/cagg_invalidation_common.sql:408: ERROR: cannot truncate only a continuous aggregate +\set ON_ERROR_STOP 1 +TRUNCATE cond_20; +-- Should now be empty +SELECT * FROM cond_20 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- +(0 rows) + +-- Other aggregate is not affected +SELECT * FROM cond_10 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 18.5 + 0 | 2 | 17.9 + 0 | 3 | 14.3 + 10 | 1 | 18.3 + 10 | 2 | 40.25 + 10 | 3 | 25.25 + 20 | 1 | 15.3 + 20 | 2 | 12.3 + 20 | 3 | 16.3 +(9 rows) + +-- Refresh again to bring data back +CALL refresh_continuous_aggregate('cond_20', NULL, NULL); +-- The aggregate should be populated again +SELECT * FROM cond_20 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+------------------ + 0 | 1 | 18.4333333333333 + 0 | 2 | 32.8 + 0 | 3 | 21.6 + 20 | 1 | 15.3 + 20 | 2 | 12.3 + 20 | 3 | 16.3 +(6 rows) + +------------------------------------------------------- +-- Test corner cases against a minimal bucket aggregate +------------------------------------------------------- +-- First, clear the table and aggregate +TRUNCATE conditions; +SELECT * FROM conditions; + time | device | temp +------+--------+------ +(0 rows) + +CALL refresh_continuous_aggregate('cond_10', NULL, NULL); +SELECT * FROM cond_10 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- +(0 rows) + +CREATE MATERIALIZED VIEW cond_1 +WITH (timescaledb.continuous, + timescaledb.materialized_only=true) +AS +SELECT time_bucket(BIGINT '1', time) AS bucket, device, avg(temp) AS avg_temp +FROM conditions +GROUP BY 1,2 WITH NO DATA; +SELECT mat_hypertable_id AS cond_1_id +FROM _timescaledb_catalog.continuous_agg +WHERE user_view_name = 'cond_1' \gset +-- Test manual invalidation error +\if :IS_DISTRIBUTED +\else +\set ON_ERROR_STOP 0 +SELECT _timescaledb_internal.invalidation_cagg_log_add_entry(:cond_1_id, 1, 0); +\set ON_ERROR_STOP 1 +\endif +-- Test invalidations with bucket size 1 +INSERT INTO conditions VALUES (0, 1, 1.0); +SELECT * FROM hyper_invals; + hyper_id | start | end +----------+-------+----- + 1 | 0 | 0 + 2 | 20 | 20 + 2 | 30 | 80 +(3 rows) + +-- Refreshing around the bucket should not update the aggregate +CALL refresh_continuous_aggregate('cond_1', -1, 0); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- +(0 rows) + +CALL refresh_continuous_aggregate('cond_1', 1, 2); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- +(0 rows) + +-- Refresh only the invalidated bucket +CALL refresh_continuous_aggregate('cond_1', 0, 1); +SELECT * FROM cagg_invals +WHERE cagg_id = :cond_1_id; + cagg_id | start | end +---------+----------------------+--------------------- + 6 | -9223372036854775808 | -2 + 6 | 2 | 9223372036854775807 +(2 rows) + +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 1 +(1 row) + +-- Refresh 1 extra bucket on the left +INSERT INTO conditions VALUES (0, 1, 2.0); +CALL refresh_continuous_aggregate('cond_1', -1, 1); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 1.5 +(1 row) + +-- Refresh 1 extra bucket on the right +INSERT INTO conditions VALUES (0, 1, 3.0); +CALL refresh_continuous_aggregate('cond_1', 0, 2); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 2 +(1 row) + +-- Refresh 1 extra bucket on each side +INSERT INTO conditions VALUES (0, 1, 4.0); +CALL refresh_continuous_aggregate('cond_1', -1, 2); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 2.5 +(1 row) + +-- Clear to reset aggregate +TRUNCATE conditions; +CALL refresh_continuous_aggregate('cond_1', NULL, NULL); +-- Test invalidation of size 2 +INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0); +-- Refresh one bucket at a time +CALL refresh_continuous_aggregate('cond_1', 0, 1); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 1 +(1 row) + +CALL refresh_continuous_aggregate('cond_1', 1, 2); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 1 + 1 | 1 | 2 +(2 rows) + +-- Repeat the same thing but refresh the whole invalidation at once +TRUNCATE conditions; +CALL refresh_continuous_aggregate('cond_1', NULL, NULL); +INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0); +CALL refresh_continuous_aggregate('cond_1', 0, 2); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 1 + 1 | 1 | 2 +(2 rows) + +-- Test invalidation of size 3 +TRUNCATE conditions; +CALL refresh_continuous_aggregate('cond_1', NULL, NULL); +INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0), (2, 1, 3.0); +-- Invalidation extends beyond the refresh window on both ends +CALL refresh_continuous_aggregate('cond_1', 1, 2); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 1 | 1 | 2 +(1 row) + +-- Should leave one invalidation on each side of the refresh window +SELECT * FROM cagg_invals +WHERE cagg_id = :cond_1_id; + cagg_id | start | end +---------+-------+--------------------- + 6 | 0 | 0 + 6 | 2 | 2 + 6 | 110 | 9223372036854775807 +(3 rows) + +-- Refresh the two remaining invalidations +CALL refresh_continuous_aggregate('cond_1', 0, 1); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 1 + 1 | 1 | 2 +(2 rows) + +CALL refresh_continuous_aggregate('cond_1', 2, 3); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 1 + 1 | 1 | 2 + 2 | 1 | 3 +(3 rows) + +-- Clear and repeat but instead refresh the whole range in one go. The +-- result should be the same as the three partial refreshes. Use +-- DELETE instead of TRUNCATE to clear this time. +DELETE FROM conditions; +CALL refresh_continuous_aggregate('cond_1', NULL, NULL); +INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0), (2, 1, 3.0); +CALL refresh_continuous_aggregate('cond_1', 0, 3); +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 1 + 1 | 1 | 2 + 2 | 1 | 3 +(3 rows) + +---------------------------------------------- +-- Test that invalidation threshold is capped +---------------------------------------------- +CREATE table threshold_test (time int, value int); +\if :IS_DISTRIBUTED +SELECT create_distributed_hypertable('threshold_test', 'time', chunk_time_interval => 4, replication_factor => 2); +psql:include/cagg_invalidation_common.sql:565: NOTICE: adding not-null constraint to column "time" + create_distributed_hypertable +------------------------------- + (7,public,threshold_test,t) +(1 row) + +\else +SELECT create_hypertable('threshold_test', 'time', chunk_time_interval => 4); +\endif +SELECT set_integer_now_func('threshold_test', 'int_now'); + set_integer_now_func +---------------------- + +(1 row) + +CREATE MATERIALIZED VIEW thresh_2 +WITH (timescaledb.continuous, + timescaledb.materialized_only=true) +AS +SELECT time_bucket(2, time) AS bucket, max(value) AS max +FROM threshold_test +GROUP BY 1 WITH NO DATA; +SELECT raw_hypertable_id AS thresh_hyper_id, mat_hypertable_id AS thresh_cagg_id +FROM _timescaledb_catalog.continuous_agg +WHERE user_view_name = 'thresh_2' \gset +-- There's no invalidation threshold initially +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- +(0 rows) + +-- Test manual invalidation error +\if :IS_DISTRIBUTED +\else +\set ON_ERROR_STOP 0 +SELECT _timescaledb_internal.invalidation_hyper_log_add_entry(:thresh_hyper_id, 1, 0); +\set ON_ERROR_STOP 1 +\endif +-- Test that threshold is initilized to min value when there's no data +-- and we specify an infinite end. Note that the min value may differ +-- depending on time type. +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); +psql:include/cagg_invalidation_common.sql:599: NOTICE: continuous aggregate "thresh_2" is already up-to-date +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+------------- + 7 | -2147483648 +(1 row) + +INSERT INTO threshold_test +SELECT v, v FROM generate_series(1, 10) v; +CALL refresh_continuous_aggregate('thresh_2', 0, 5); +-- Threshold should move to end of the last refreshed bucket, which is +-- the last bucket fully included in the window, i.e., the window +-- shrinks to end of previous bucket. +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 7 | 4 +(1 row) + +-- Refresh where both the start and end of the window is above the +-- max data value +CALL refresh_continuous_aggregate('thresh_2', 14, NULL); +psql:include/cagg_invalidation_common.sql:619: NOTICE: continuous aggregate "thresh_2" is already up-to-date +SELECT watermark AS thresh_hyper_id_watermark +FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id \gset +-- Refresh where we start from the current watermark to infinity +CALL refresh_continuous_aggregate('thresh_2', :thresh_hyper_id_watermark, NULL); +psql:include/cagg_invalidation_common.sql:626: NOTICE: continuous aggregate "thresh_2" is already up-to-date +-- Now refresh with max end of the window to test that the +-- invalidation threshold is capped at the last bucket of data +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 7 | 12 +(1 row) + +-- Should not have processed invalidations beyond the invalidation +-- threshold. +SELECT * FROM cagg_invals +WHERE cagg_id = :thresh_cagg_id; + cagg_id | start | end +---------+----------------------+--------------------- + 8 | -9223372036854775808 | -1 + 8 | 12 | 9223372036854775807 +(2 rows) + +-- Check that things are properly materialized +SELECT * FROM thresh_2 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 3 + 4 | 5 + 6 | 7 + 8 | 9 + 10 | 10 +(6 rows) + +-- Delete the last data +SELECT show_chunks AS chunk_to_drop +FROM show_chunks('threshold_test') +ORDER BY 1 DESC +LIMIT 1 \gset +DELETE FROM threshold_test +WHERE time > 6; +-- The last data in the hypertable is gone +SELECT time_bucket(2, time) AS bucket, max(value) AS max +FROM threshold_test +GROUP BY 1 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 3 + 4 | 5 + 6 | 6 +(4 rows) + +-- The aggregate still holds data +SELECT * FROM thresh_2 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 3 + 4 | 5 + 6 | 7 + 8 | 9 + 10 | 10 +(6 rows) + +-- Refresh the aggregate to bring it up-to-date +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); +-- Data also gone from the aggregate +SELECT * FROM thresh_2 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 3 + 4 | 5 + 6 | 6 +(4 rows) + +-- The invalidation threshold remains the same +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 7 | 12 +(1 row) + +-- Insert new data beyond the invalidation threshold to move it +-- forward +INSERT INTO threshold_test +SELECT v, v FROM generate_series(7, 15) v; +CALL refresh_continuous_aggregate('thresh_2', 0, NULL); +-- Aggregate now updated to reflect newly aggregated data +SELECT * FROM thresh_2 +ORDER BY 1; + bucket | max +--------+----- + 0 | 1 + 2 | 3 + 4 | 5 + 6 | 7 + 8 | 9 + 10 | 11 + 12 | 13 + 14 | 15 +(8 rows) + +-- The invalidation threshold should have moved forward to the end of +-- the new data +SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold +WHERE hypertable_id = :thresh_hyper_id +ORDER BY 1,2; + hypertable_id | watermark +---------------+----------- + 7 | 16 +(1 row) + +-- The aggregate remains invalid beyond the invalidation threshold +SELECT * FROM cagg_invals +WHERE cagg_id = :thresh_cagg_id; + cagg_id | start | end +---------+----------------------+--------------------- + 8 | -9223372036854775808 | -1 + 8 | 16 | 9223372036854775807 +(2 rows) + +---------------------------------------------------------------------- +-- Test that dropping a chunk invalidates the dropped region. First +-- create another chunk so that we have two chunks. One of the chunks +-- will be dropped. +--------------------------------------------------------------------- +INSERT INTO conditions VALUES (10, 1, 10.0); +-- Chunks currently associated with the hypertable +SELECT show_chunks AS chunk_to_drop +FROM show_chunks('conditions'); + chunk_to_drop +---------------------------------------------- + _timescaledb_internal._dist_hyper_1_34_chunk + _timescaledb_internal._dist_hyper_1_40_chunk +(2 rows) + +-- Pick the first one to drop +SELECT show_chunks AS chunk_to_drop +FROM show_chunks('conditions') +ORDER BY 1 +LIMIT 1 \gset +-- Show the data before dropping one of the chunks +SELECT * FROM conditions +ORDER BY 1,2; + time | device | temp +------+--------+------ + 0 | 1 | 1 + 1 | 1 | 2 + 2 | 1 | 3 + 10 | 1 | 10 +(4 rows) + +-- Drop one chunk +\if :IS_DISTRIBUTED +CALL distributed_exec(format('DROP TABLE IF EXISTS %s', :'chunk_to_drop')); +DROP FOREIGN TABLE :chunk_to_drop; +\else +DROP TABLE :chunk_to_drop; +\endif +-- The chunk's data no longer exists in the hypertable +SELECT * FROM conditions +ORDER BY 1,2; + time | device | temp +------+--------+------ + 10 | 1 | 10 +(1 row) + +-- Aggregate still remains in continuous aggregate, however +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 0 | 1 | 1 + 1 | 1 | 2 + 2 | 1 | 3 +(3 rows) + +-- Refresh the continuous aggregate to make the dropped data be +-- reflected in the aggregate +CALL refresh_continuous_aggregate('cond_1', NULL, NULL); +-- Aggregate now up-to-date with the source hypertable +SELECT * FROM cond_1 +ORDER BY 1,2; + bucket | device | avg_temp +--------+--------+---------- + 10 | 1 | 10 +(1 row) + +-- Test that adjacent invalidations are merged +INSERT INTO conditions VALUES(1, 1, 1.0), (2, 1, 2.0); +INSERT INTO conditions VALUES(3, 1, 1.0); +INSERT INTO conditions VALUES(4, 1, 1.0); +INSERT INTO conditions VALUES(6, 1, 1.0); +CALL refresh_continuous_aggregate('cond_1', 10, NULL); +psql:include/cagg_invalidation_common.sql:748: NOTICE: continuous aggregate "cond_1" is already up-to-date +SELECT * FROM cagg_invals +WHERE cagg_id = :cond_1_id; + cagg_id | start | end +---------+-------+--------------------- + 6 | 1 | 4 + 6 | 6 | 6 + 6 | 110 | 9223372036854775807 +(3 rows) + +--------------------------------------------------------------------- +-- Test that single timestamp invalidations are expanded to buckets, +-- and adjacent buckets merged. This merging cannot cross Data-Node +-- chunk boundaries for the distributed hypertable case. +--------------------------------------------------------------------- +-- First clear invalidations in a range: +CALL refresh_continuous_aggregate('cond_10', -20, 60); +-- The following three should be merged to one range 0-29 +INSERT INTO conditions VALUES (5, 1, 1.0); +INSERT INTO conditions VALUES (15, 1, 1.0); +INSERT INTO conditions VALUES (25, 1, 1.0); +-- The last one should not merge with the others +INSERT INTO conditions VALUES (40, 1, 1.0); +-- Refresh to process invalidations, but outside the range of +-- invalidations we inserted so that we don't clear them. +CALL refresh_continuous_aggregate('cond_10', 50, 60); +psql:include/cagg_invalidation_common.sql:769: NOTICE: continuous aggregate "cond_10" is already up-to-date +SELECT mat_hypertable_id AS cond_10_id +FROM _timescaledb_catalog.continuous_agg +WHERE user_view_name = 'cond_10' \gset +SELECT * FROM cagg_invals +WHERE cagg_id = :cond_10_id; + cagg_id | start | end +---------+----------------------+--------------------- + 3 | -9223372036854775808 | -21 + 3 | 0 | 9 + 3 | 0 | 19 + 3 | 10 | 29 + 3 | 20 | 29 + 3 | 40 | 49 + 3 | 60 | 9223372036854775807 +(7 rows) + +-- should trigger two individual refreshes +CALL refresh_continuous_aggregate('cond_10', 0, 200); +-- Allow at most 5 individual invalidations per refreshe +SET timescaledb.materializations_per_refresh_window=5; +-- Insert into every second bucket +INSERT INTO conditions VALUES (20, 1, 1.0); +INSERT INTO conditions VALUES (40, 1, 1.0); +INSERT INTO conditions VALUES (60, 1, 1.0); +INSERT INTO conditions VALUES (80, 1, 1.0); +INSERT INTO conditions VALUES (100, 1, 1.0); +INSERT INTO conditions VALUES (120, 1, 1.0); +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +\set VERBOSITY default +-- Test acceptable values for materializations per refresh +SET timescaledb.materializations_per_refresh_window=' 5 '; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +-- Large value will be treated as LONG_MAX +SET timescaledb.materializations_per_refresh_window=342239897234023842394249234766923492347; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +-- Test bad values for materializations per refresh +SET timescaledb.materializations_per_refresh_window='foo'; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +psql:include/cagg_invalidation_common.sql:808: WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" +DETAIL: Expected an integer but current value is "foo". +SET timescaledb.materializations_per_refresh_window='2bar'; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +psql:include/cagg_invalidation_common.sql:811: WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" +DETAIL: Expected an integer but current value is "2bar". +SET timescaledb.materializations_per_refresh_window='-'; +INSERT INTO conditions VALUES (140, 1, 1.0); +CALL refresh_continuous_aggregate('cond_10', 0, 200); +psql:include/cagg_invalidation_common.sql:815: WARNING: invalid value for session variable "timescaledb.materializations_per_refresh_window" +DETAIL: Expected an integer but current value is "-". +\set VERBOSITY terse +-- cleanup +\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER; +DROP DATABASE :DATA_NODE_1; +DROP DATABASE :DATA_NODE_2; +DROP DATABASE :DATA_NODE_3; diff --git a/tsl/test/expected/chunk_utils_compression.out b/tsl/test/expected/chunk_utils_compression.out index 535782c27..4f79e9607 100644 --- a/tsl/test/expected/chunk_utils_compression.out +++ b/tsl/test/expected/chunk_utils_compression.out @@ -106,6 +106,25 @@ SELECT show_chunks('public.table_to_compress', newer_than=>'1 day'::interval); ------------- (0 rows) +-- truncate one compressed chunk +SELECT chunk_schema || '.' || chunk_name as "CHNAME" +FROM timescaledb_information.chunks +WHERE hypertable_name = 'table_to_compress' and hypertable_schema = 'public' +ORDER BY chunk_name LIMIT 1 +\gset +SELECT count(*) FROM :CHNAME; + count +------- + 1 +(1 row) + +TRUNCATE TABLE :CHNAME; +SELECT count(*) FROM :CHNAME; + count +------- + 0 +(1 row) + -- drop all hypertables' old chunks SELECT drop_chunks(table_name::regclass, older_than=>'1 day'::interval) FROM _timescaledb_catalog.hypertable diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt index a3e386f43..4393352c2 100644 --- a/tsl/test/sql/CMakeLists.txt +++ b/tsl/test/sql/CMakeLists.txt @@ -53,7 +53,6 @@ if(CMAKE_BUILD_TYPE MATCHES Debug) cagg_ddl_dist_ht.sql cagg_drop_chunks.sql cagg_dump.sql - cagg_invalidation_dist_ht.sql cagg_multi.sql continuous_aggs.sql continuous_aggs_deprecated.sql @@ -129,8 +128,14 @@ set(TEST_TEMPLATES transparent_decompression_ordered_index.sql.in) if(CMAKE_BUILD_TYPE MATCHES Debug) - list(APPEND TEST_TEMPLATES cagg_query.sql.in dist_hypertable.sql.in - remote_copy.sql.in dist_grant.sql.in) + list( + APPEND + TEST_TEMPLATES + cagg_query.sql.in + dist_hypertable.sql.in + remote_copy.sql.in + dist_grant.sql.in + cagg_invalidation_dist_ht.sql.in) endif(CMAKE_BUILD_TYPE MATCHES Debug) # Check if PostgreSQL was compiled with JIT support diff --git a/tsl/test/sql/cagg_invalidation_dist_ht.sql b/tsl/test/sql/cagg_invalidation_dist_ht.sql.in similarity index 100% rename from tsl/test/sql/cagg_invalidation_dist_ht.sql rename to tsl/test/sql/cagg_invalidation_dist_ht.sql.in diff --git a/tsl/test/sql/chunk_utils_compression.sql b/tsl/test/sql/chunk_utils_compression.sql index 66619123f..d258039de 100644 --- a/tsl/test/sql/chunk_utils_compression.sql +++ b/tsl/test/sql/chunk_utils_compression.sql @@ -37,6 +37,15 @@ SELECT show_chunks('public.uncompressed_table'); SELECT show_chunks('public.table_to_compress'); SELECT show_chunks('public.table_to_compress', older_than=>'1 day'::interval); SELECT show_chunks('public.table_to_compress', newer_than=>'1 day'::interval); +-- truncate one compressed chunk +SELECT chunk_schema || '.' || chunk_name as "CHNAME" +FROM timescaledb_information.chunks +WHERE hypertable_name = 'table_to_compress' and hypertable_schema = 'public' +ORDER BY chunk_name LIMIT 1 +\gset +SELECT count(*) FROM :CHNAME; +TRUNCATE TABLE :CHNAME; +SELECT count(*) FROM :CHNAME; -- drop all hypertables' old chunks SELECT drop_chunks(table_name::regclass, older_than=>'1 day'::interval) FROM _timescaledb_catalog.hypertable diff --git a/tsl/test/sql/include/cagg_invalidation_common.sql b/tsl/test/sql/include/cagg_invalidation_common.sql index e90595faa..77fe40e95 100644 --- a/tsl/test/sql/include/cagg_invalidation_common.sql +++ b/tsl/test/sql/include/cagg_invalidation_common.sql @@ -334,6 +334,29 @@ CALL refresh_continuous_aggregate('cond_10', NULL, NULL); SELECT * FROM cagg_invals; SELECT * FROM hyper_invals; +-- Pick the first chunk of conditions to TRUNCATE +SELECT show_chunks AS chunk_to_truncate +FROM show_chunks('conditions') +ORDER BY 1 +LIMIT 1 \gset + +-- Show the data before truncating one of the chunks +SELECT * FROM :chunk_to_truncate +ORDER BY 1; + +-- Truncate one chunk +\if :IS_DISTRIBUTED +-- There is no TRUNCATE implementation for FOREIGN tables yet +\set ON_ERROR_STOP 0 +\endif +TRUNCATE TABLE :chunk_to_truncate; +\if :IS_DISTRIBUTED +\set ON_ERROR_STOP 1 +\endif + +-- Should see new invalidation entries for conditions for the non-distributed case +SELECT * FROM hyper_invals; + -- TRUNCATE the hypertable to invalidate all its continuous aggregates TRUNCATE conditions;