From 9c6433e6edf3ba7fa4ea9207220da3e0db8e87e8 Mon Sep 17 00:00:00 2001
From: Markos Fountoulakis <markos@timescale.com>
Date: Tue, 16 Aug 2022 19:10:07 +0300
Subject: [PATCH] Handle TRUNCATE TABLE on chunks

Make truncating an uncompressed chunk also drop its data for the case
where the data reside in a corresponding compressed chunk.

Generate invalidations for Continuous Aggregates after TRUNCATE, so
as to have consistent refresh operations on the materialization
hypertable.

Fixes #4362
---
 CHANGELOG.md                                  |    2 +
 src/process_utility.c                         |   51 +-
 src/ts_catalog/continuous_agg.c               |   12 +
 src/ts_catalog/continuous_agg.h               |    2 +
 test/expected/chunk_utils.out                 |   14 +
 test/sql/chunk_utils.sql                      |    5 +
 tsl/test/expected/cagg_invalidation.out       |   68 +-
 ...t.out => cagg_invalidation_dist_ht-12.out} |   62 +-
 .../expected/cagg_invalidation_dist_ht-13.out | 1389 +++++++++++++++++
 .../expected/cagg_invalidation_dist_ht-14.out | 1389 +++++++++++++++++
 tsl/test/expected/chunk_utils_compression.out |   19 +
 tsl/test/sql/CMakeLists.txt                   |   11 +-
 ...t.sql => cagg_invalidation_dist_ht.sql.in} |    0
 tsl/test/sql/chunk_utils_compression.sql      |    9 +
 .../sql/include/cagg_invalidation_common.sql  |   23 +
 15 files changed, 3017 insertions(+), 39 deletions(-)
 rename tsl/test/expected/{cagg_invalidation_dist_ht.out => cagg_invalidation_dist_ht-12.out} (96%)
 create mode 100644 tsl/test/expected/cagg_invalidation_dist_ht-13.out
 create mode 100644 tsl/test/expected/cagg_invalidation_dist_ht-14.out
 rename tsl/test/sql/{cagg_invalidation_dist_ht.sql => cagg_invalidation_dist_ht.sql.in} (100%)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4f4ab5a39..3d5599e9f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,11 +15,13 @@ accidentally triggering the load of a previous DB version.**
 * #4486 Adding boolean column with default value doesn't work on compressed table
 * #4555 Handle properly default privileges on Continuous Aggregates
 * #4575 Fix use of `get_partition_hash` and `get_partition_for_key` inside an IMMUTABLE function
+* #4416 Handle TRUNCATE TABLE on chunks
 
 **Thanks**
 @janko for reporting
 @AlmiS for reporting error on `get_partition_hash` executed inside an IMMUTABLE function
 @michaelkitson for reporting permission errors using default privileges on Continuous Aggregates
+@jayadevanm for reporting error of TRUNCATE TABLE on compressed chunk
 
 ## 2.7.2 (2022-07-26)
 
diff --git a/src/process_utility.c b/src/process_utility.c
index ace88ff38..b3489c03b 100644
--- a/src/process_utility.c
+++ b/src/process_utility.c
@@ -1001,7 +1001,7 @@ process_truncate(ProcessUtilityArgs *args)
 	List *hypertables = NIL;
 	List *relations = NIL;
 	bool list_changed = false;
-	MemoryContext parsetreectx = GetMemoryChunkContext(args->parsetree);
+	MemoryContext oldctx, parsetreectx = GetMemoryChunkContext(args->parsetree);
 
 	/* For all hypertables, we drop the now empty chunks. We also propagate the
 	 * TRUNCATE call to the compressed version of the hypertable, if it exists.
@@ -1037,7 +1037,6 @@ process_truncate(ProcessUtilityArgs *args)
 					if (cagg)
 					{
 						Hypertable *mat_ht, *raw_ht;
-						MemoryContext oldctx;
 
 						if (!relation_should_recurse(rv))
 							ereport(ERROR,
@@ -1071,13 +1070,14 @@ process_truncate(ProcessUtilityArgs *args)
 					break;
 				}
 				case RELKIND_RELATION:
+				/* TRUNCATE for foreign tables not implemented yet. This will raise an error. */
+				case RELKIND_FOREIGN_TABLE:
 				{
 					Hypertable *ht =
 						ts_hypertable_cache_get_entry(hcache, relid, CACHE_FLAG_MISSING_OK);
+					Chunk *chunk;
 
-					if (!ht)
-						list_append = true;
-					else
+					if (ht)
 					{
 						ContinuousAggHypertableStatus agg_status;
 
@@ -1114,6 +1114,38 @@ process_truncate(ProcessUtilityArgs *args)
 							 */
 							list_changed = true;
 					}
+					else if ((chunk = ts_chunk_get_by_relid(relid, false)) != NULL)
+					{ /* this is a chunk */
+						ht = ts_hypertable_cache_get_entry(hcache,
+														   chunk->hypertable_relid,
+														   CACHE_FLAG_NONE);
+
+						Assert(ht != NULL);
+
+						/* If the hypertable has continuous aggregates, then invalidate
+						 * the truncated region. */
+						if (ts_continuous_agg_hypertable_status(ht->fd.id) == HypertableIsRawTable)
+							ts_continuous_agg_invalidate_chunk(ht, chunk);
+						/* Truncate the compressed chunk too. */
+						if (chunk->fd.compressed_chunk_id != INVALID_CHUNK_ID)
+						{
+							Chunk *compressed_chunk =
+								ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, false);
+							if (compressed_chunk != NULL)
+							{
+								/* Create list item into the same context of the list. */
+								oldctx = MemoryContextSwitchTo(parsetreectx);
+								rv = makeRangeVar(NameStr(compressed_chunk->fd.schema_name),
+												  NameStr(compressed_chunk->fd.table_name),
+												  -1);
+								MemoryContextSwitchTo(oldctx);
+								list_changed = true;
+							}
+						}
+						list_append = true;
+					}
+					else
+						list_append = true;
 					break;
 				}
 			}
@@ -1234,14 +1266,7 @@ process_drop_chunk(ProcessUtilityArgs *args, DropStmt *stmt)
 			/* If the hypertable has continuous aggregates, then invalidate
 			 * the dropped region. */
 			if (ts_continuous_agg_hypertable_status(ht->fd.id) == HypertableIsRawTable)
-			{
-				int64 start = ts_chunk_primary_dimension_start(chunk);
-				int64 end = ts_chunk_primary_dimension_end(chunk);
-
-				Assert(hyperspace_get_open_dimension(ht->space, 0)->fd.id ==
-					   chunk->cube->slices[0]->fd.dimension_id);
-				ts_cm_functions->continuous_agg_invalidate_raw_ht(ht, start, end);
-			}
+				ts_continuous_agg_invalidate_chunk(ht, chunk);
 		}
 	}
 
diff --git a/src/ts_catalog/continuous_agg.c b/src/ts_catalog/continuous_agg.c
index 9a9a476ce..7c079d88c 100644
--- a/src/ts_catalog/continuous_agg.c
+++ b/src/ts_catalog/continuous_agg.c
@@ -31,6 +31,7 @@
 #include "bgw/job.h"
 #include "ts_catalog/continuous_agg.h"
 #include "cross_module_fn.h"
+#include "hypercube.h"
 #include "hypertable.h"
 #include "hypertable_cache.h"
 #include "scan_iterator.h"
@@ -1351,6 +1352,17 @@ ts_continuous_agg_find_integer_now_func_by_materialization_id(int32 mat_htid)
 	return par_dim;
 }
 
+TSDLLEXPORT void
+ts_continuous_agg_invalidate_chunk(Hypertable *ht, Chunk *chunk)
+{
+	int64 start = ts_chunk_primary_dimension_start(chunk);
+	int64 end = ts_chunk_primary_dimension_end(chunk);
+
+	Assert(hyperspace_get_open_dimension(ht->space, 0)->fd.id ==
+		   chunk->cube->slices[0]->fd.dimension_id);
+	ts_cm_functions->continuous_agg_invalidate_raw_ht(ht, start, end);
+}
+
 typedef struct Watermark
 {
 	int32 hyper_id;
diff --git a/src/ts_catalog/continuous_agg.h b/src/ts_catalog/continuous_agg.h
index 327e470f3..4ac047968 100644
--- a/src/ts_catalog/continuous_agg.h
+++ b/src/ts_catalog/continuous_agg.h
@@ -174,6 +174,8 @@ extern TSDLLEXPORT const Dimension *
 ts_continuous_agg_find_integer_now_func_by_materialization_id(int32 mat_htid);
 extern ContinuousAgg *ts_continuous_agg_find_userview_name(const char *schema, const char *name);
 
+extern TSDLLEXPORT void ts_continuous_agg_invalidate_chunk(Hypertable *ht, Chunk *chunk);
+
 extern TSDLLEXPORT bool ts_continuous_agg_bucket_width_variable(const ContinuousAgg *agg);
 extern TSDLLEXPORT int64 ts_continuous_agg_bucket_width(const ContinuousAgg *agg);
 
diff --git a/test/expected/chunk_utils.out b/test/expected/chunk_utils.out
index 2a4928d2d..0ed8625ef 100644
--- a/test/expected/chunk_utils.out
+++ b/test/expected/chunk_utils.out
@@ -283,6 +283,20 @@ SELECT * FROM _timescaledb_catalog.dimension_slice ORDER BY id;
  24 |            3 |                    6 |                   7
 (24 rows)
 
+-- Test that truncating chunks works
+SELECT count(*) FROM _timescaledb_internal._hyper_2_7_chunk;
+ count 
+-------
+     1
+(1 row)
+
+TRUNCATE TABLE _timescaledb_internal._hyper_2_7_chunk;
+SELECT count(*) FROM _timescaledb_internal._hyper_2_7_chunk;
+ count 
+-------
+     0
+(1 row)
+
 -- Drop one chunk "manually" and verify that dimension slices and
 -- constraints are cleaned up. Each chunk has two constraints and two
 -- dimension slices. Both constraints should be deleted, but only one
diff --git a/test/sql/chunk_utils.sql b/test/sql/chunk_utils.sql
index 598544f9e..465dc5552 100644
--- a/test/sql/chunk_utils.sql
+++ b/test/sql/chunk_utils.sql
@@ -117,6 +117,11 @@ FULL OUTER JOIN _timescaledb_catalog.dimension_slice ds ON (ds.id = cc.dimension
 ORDER BY c.id;
 SELECT * FROM _timescaledb_catalog.dimension_slice ORDER BY id;
 
+-- Test that truncating chunks works
+SELECT count(*) FROM _timescaledb_internal._hyper_2_7_chunk;
+TRUNCATE TABLE _timescaledb_internal._hyper_2_7_chunk;
+SELECT count(*) FROM _timescaledb_internal._hyper_2_7_chunk;
+
 -- Drop one chunk "manually" and verify that dimension slices and
 -- constraints are cleaned up. Each chunk has two constraints and two
 -- dimension slices. Both constraints should be deleted, but only one
diff --git a/tsl/test/expected/cagg_invalidation.out b/tsl/test/expected/cagg_invalidation.out
index a5f0e365e..f64c6406d 100644
--- a/tsl/test/expected/cagg_invalidation.out
+++ b/tsl/test/expected/cagg_invalidation.out
@@ -577,6 +577,47 @@ SELECT * FROM hyper_invals;
         2 |    20 |  20
 (1 row)
 
+-- Pick the first chunk of conditions to TRUNCATE
+SELECT show_chunks AS chunk_to_truncate
+FROM show_chunks('conditions')
+ORDER BY 1
+LIMIT 1 \gset
+-- Show the data before truncating one of the chunks
+SELECT * FROM :chunk_to_truncate
+ORDER BY 1;
+ time | device | temp 
+------+--------+------
+    1 |      4 | 23.7
+    1 |      0 |   16
+    2 |      2 | 23.5
+    2 |      1 |   25
+    3 |      2 | 23.5
+    3 |      0 |   20
+    4 |      2 |   10
+    5 |      2 |   26
+    6 |      1 |   13
+    7 |      3 |   35
+    8 |      1 |   37
+    9 |      3 |    7
+(12 rows)
+
+-- Truncate one chunk
+\if :IS_DISTRIBUTED
+-- There is no TRUNCATE implementation for FOREIGN tables yet
+\set ON_ERROR_STOP 0
+\endif
+TRUNCATE TABLE :chunk_to_truncate;
+\if :IS_DISTRIBUTED
+\set ON_ERROR_STOP 1
+\endif
+-- Should see new invalidation entries for conditions for the non-distributed case
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+        1 |     0 |  10
+        2 |    20 |  20
+(2 rows)
+
 -- TRUNCATE the hypertable to invalidate all its continuous aggregates
 TRUNCATE conditions;
 -- Now empty
@@ -590,8 +631,9 @@ SELECT * FROM hyper_invals;
  hyper_id |        start         |         end         
 ----------+----------------------+---------------------
         1 | -9223372036854775808 | 9223372036854775807
+        1 |                    0 |                  10
         2 |                   20 |                  20
-(2 rows)
+(3 rows)
 
 -- Aggregates still hold data
 SELECT * FROM cond_10
@@ -673,7 +715,7 @@ ORDER BY 1,2;
 -- TRUNCATE ONLY
 \set ON_ERROR_STOP 0
 TRUNCATE ONLY cond_20;
-psql:include/cagg_invalidation_common.sql:385: ERROR:  cannot truncate only a continuous aggregate
+psql:include/cagg_invalidation_common.sql:408: ERROR:  cannot truncate only a continuous aggregate
 \set ON_ERROR_STOP 1
 TRUNCATE cond_20;
 -- Should now be empty
@@ -746,7 +788,7 @@ WHERE user_view_name = 'cond_1' \gset
 \else
 \set ON_ERROR_STOP 0
 SELECT _timescaledb_internal.invalidation_cagg_log_add_entry(:cond_1_id, 1, 0);
-psql:include/cagg_invalidation_common.sql:433: ERROR:  cannot invalidate cagg, end time should be greater than start time
+psql:include/cagg_invalidation_common.sql:456: ERROR:  cannot invalidate cagg, end time should be greater than start time
 \set ON_ERROR_STOP 1
 \endif
 -- Test invalidations with bucket size 1
@@ -923,7 +965,7 @@ CREATE table threshold_test (time int, value int);
 SELECT create_distributed_hypertable('threshold_test', 'time', chunk_time_interval => 4, replication_factor => 2);
 \else
 SELECT create_hypertable('threshold_test', 'time', chunk_time_interval => 4);
-psql:include/cagg_invalidation_common.sql:544: NOTICE:  adding not-null constraint to column "time"
+psql:include/cagg_invalidation_common.sql:567: NOTICE:  adding not-null constraint to column "time"
       create_hypertable      
 -----------------------------
  (7,public,threshold_test,t)
@@ -959,14 +1001,14 @@ ORDER BY 1,2;
 \else
 \set ON_ERROR_STOP 0
 SELECT _timescaledb_internal.invalidation_hyper_log_add_entry(:thresh_hyper_id, 1, 0);
-psql:include/cagg_invalidation_common.sql:569: ERROR:  cannot invalidate hypertable, end time should be greater than start time
+psql:include/cagg_invalidation_common.sql:592: ERROR:  cannot invalidate hypertable, end time should be greater than start time
 \set ON_ERROR_STOP 1
 \endif
 -- Test that threshold is initilized to min value when there's no data
 -- and we specify an infinite end. Note that the min value may differ
 -- depending on time type.
 CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
-psql:include/cagg_invalidation_common.sql:576: NOTICE:  continuous aggregate "thresh_2" is already up-to-date
+psql:include/cagg_invalidation_common.sql:599: NOTICE:  continuous aggregate "thresh_2" is already up-to-date
 SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
 WHERE hypertable_id = :thresh_hyper_id
 ORDER BY 1,2;
@@ -992,13 +1034,13 @@ ORDER BY 1,2;
 -- Refresh where both the start and end of the window is above the
 -- max data value
 CALL refresh_continuous_aggregate('thresh_2', 14, NULL);
-psql:include/cagg_invalidation_common.sql:596: NOTICE:  continuous aggregate "thresh_2" is already up-to-date
+psql:include/cagg_invalidation_common.sql:619: NOTICE:  continuous aggregate "thresh_2" is already up-to-date
 SELECT watermark AS thresh_hyper_id_watermark
 FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
 WHERE hypertable_id = :thresh_hyper_id \gset
 -- Refresh where we start from the current watermark to infinity
 CALL refresh_continuous_aggregate('thresh_2', :thresh_hyper_id_watermark, NULL);
-psql:include/cagg_invalidation_common.sql:603: NOTICE:  continuous aggregate "thresh_2" is already up-to-date
+psql:include/cagg_invalidation_common.sql:626: NOTICE:  continuous aggregate "thresh_2" is already up-to-date
 -- Now refresh with max end of the window to test that the
 -- invalidation threshold is capped at the last bucket of data
 CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
@@ -1200,7 +1242,7 @@ INSERT INTO conditions VALUES(3, 1, 1.0);
 INSERT INTO conditions VALUES(4, 1, 1.0);
 INSERT INTO conditions VALUES(6, 1, 1.0);
 CALL refresh_continuous_aggregate('cond_1', 10, NULL);
-psql:include/cagg_invalidation_common.sql:725: NOTICE:  continuous aggregate "cond_1" is already up-to-date
+psql:include/cagg_invalidation_common.sql:748: NOTICE:  continuous aggregate "cond_1" is already up-to-date
 SELECT * FROM cagg_invals
 WHERE cagg_id = :cond_1_id;
  cagg_id | start |         end         
@@ -1226,7 +1268,7 @@ INSERT INTO conditions VALUES (40, 1, 1.0);
 -- Refresh to process invalidations, but outside the range of
 -- invalidations we inserted so that we don't clear them.
 CALL refresh_continuous_aggregate('cond_10', 50, 60);
-psql:include/cagg_invalidation_common.sql:746: NOTICE:  continuous aggregate "cond_10" is already up-to-date
+psql:include/cagg_invalidation_common.sql:769: NOTICE:  continuous aggregate "cond_10" is already up-to-date
 SELECT mat_hypertable_id AS cond_10_id
 FROM _timescaledb_catalog.continuous_agg
 WHERE user_view_name = 'cond_10' \gset
@@ -1266,16 +1308,16 @@ CALL refresh_continuous_aggregate('cond_10', 0, 200);
 SET timescaledb.materializations_per_refresh_window='foo';
 INSERT INTO conditions VALUES (140, 1, 1.0);
 CALL refresh_continuous_aggregate('cond_10', 0, 200);
-psql:include/cagg_invalidation_common.sql:785: WARNING:  invalid value for session variable "timescaledb.materializations_per_refresh_window"
+psql:include/cagg_invalidation_common.sql:808: WARNING:  invalid value for session variable "timescaledb.materializations_per_refresh_window"
 DETAIL:  Expected an integer but current value is "foo".
 SET timescaledb.materializations_per_refresh_window='2bar';
 INSERT INTO conditions VALUES (140, 1, 1.0);
 CALL refresh_continuous_aggregate('cond_10', 0, 200);
-psql:include/cagg_invalidation_common.sql:788: WARNING:  invalid value for session variable "timescaledb.materializations_per_refresh_window"
+psql:include/cagg_invalidation_common.sql:811: WARNING:  invalid value for session variable "timescaledb.materializations_per_refresh_window"
 DETAIL:  Expected an integer but current value is "2bar".
 SET timescaledb.materializations_per_refresh_window='-';
 INSERT INTO conditions VALUES (140, 1, 1.0);
 CALL refresh_continuous_aggregate('cond_10', 0, 200);
-psql:include/cagg_invalidation_common.sql:792: WARNING:  invalid value for session variable "timescaledb.materializations_per_refresh_window"
+psql:include/cagg_invalidation_common.sql:815: WARNING:  invalid value for session variable "timescaledb.materializations_per_refresh_window"
 DETAIL:  Expected an integer but current value is "-".
 \set VERBOSITY terse
diff --git a/tsl/test/expected/cagg_invalidation_dist_ht.out b/tsl/test/expected/cagg_invalidation_dist_ht-12.out
similarity index 96%
rename from tsl/test/expected/cagg_invalidation_dist_ht.out
rename to tsl/test/expected/cagg_invalidation_dist_ht-12.out
index 041739982..e15700ddf 100644
--- a/tsl/test/expected/cagg_invalidation_dist_ht.out
+++ b/tsl/test/expected/cagg_invalidation_dist_ht-12.out
@@ -635,6 +635,48 @@ SELECT * FROM hyper_invals;
         2 |    30 |  80
 (2 rows)
 
+-- Pick the first chunk of conditions to TRUNCATE
+SELECT show_chunks AS chunk_to_truncate
+FROM show_chunks('conditions')
+ORDER BY 1
+LIMIT 1 \gset
+-- Show the data before truncating one of the chunks
+SELECT * FROM :chunk_to_truncate
+ORDER BY 1;
+ time | device | temp 
+------+--------+------
+    1 |      4 | 23.7
+    1 |      0 |   16
+    2 |      2 | 23.5
+    2 |      1 |   25
+    3 |      2 | 23.5
+    3 |      0 |   20
+    4 |      2 |   10
+    5 |      2 |   26
+    6 |      1 |   13
+    7 |      3 |   35
+    8 |      1 |   37
+    9 |      3 |    7
+(12 rows)
+
+-- Truncate one chunk
+\if :IS_DISTRIBUTED
+-- There is no TRUNCATE implementation for FOREIGN tables yet
+\set ON_ERROR_STOP 0
+\endif
+TRUNCATE TABLE :chunk_to_truncate;
+psql:include/cagg_invalidation_common.sql:352: ERROR:  "_dist_hyper_1_1_chunk" is not a table
+\if :IS_DISTRIBUTED
+\set ON_ERROR_STOP 1
+\endif
+-- Should see new invalidation entries for conditions for the non-distributed case
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+        2 |    20 |  20
+        2 |    30 |  80
+(2 rows)
+
 -- TRUNCATE the hypertable to invalidate all its continuous aggregates
 TRUNCATE conditions;
 -- Now empty
@@ -732,7 +774,7 @@ ORDER BY 1,2;
 -- TRUNCATE ONLY
 \set ON_ERROR_STOP 0
 TRUNCATE ONLY cond_20;
-psql:include/cagg_invalidation_common.sql:385: ERROR:  cannot truncate only a continuous aggregate
+psql:include/cagg_invalidation_common.sql:408: ERROR:  cannot truncate only a continuous aggregate
 \set ON_ERROR_STOP 1
 TRUNCATE cond_20;
 -- Should now be empty
@@ -980,7 +1022,7 @@ ORDER BY 1,2;
 CREATE table threshold_test (time int, value int);
 \if :IS_DISTRIBUTED
 SELECT create_distributed_hypertable('threshold_test', 'time', chunk_time_interval => 4, replication_factor => 2);
-psql:include/cagg_invalidation_common.sql:542: NOTICE:  adding not-null constraint to column "time"
+psql:include/cagg_invalidation_common.sql:565: NOTICE:  adding not-null constraint to column "time"
  create_distributed_hypertable 
 -------------------------------
  (7,public,threshold_test,t)
@@ -1024,7 +1066,7 @@ SELECT _timescaledb_internal.invalidation_hyper_log_add_entry(:thresh_hyper_id,
 -- and we specify an infinite end. Note that the min value may differ
 -- depending on time type.
 CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
-psql:include/cagg_invalidation_common.sql:576: NOTICE:  continuous aggregate "thresh_2" is already up-to-date
+psql:include/cagg_invalidation_common.sql:599: NOTICE:  continuous aggregate "thresh_2" is already up-to-date
 SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
 WHERE hypertable_id = :thresh_hyper_id
 ORDER BY 1,2;
@@ -1050,13 +1092,13 @@ ORDER BY 1,2;
 -- Refresh where both the start and end of the window is above the
 -- max data value
 CALL refresh_continuous_aggregate('thresh_2', 14, NULL);
-psql:include/cagg_invalidation_common.sql:596: NOTICE:  continuous aggregate "thresh_2" is already up-to-date
+psql:include/cagg_invalidation_common.sql:619: NOTICE:  continuous aggregate "thresh_2" is already up-to-date
 SELECT watermark AS thresh_hyper_id_watermark
 FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
 WHERE hypertable_id = :thresh_hyper_id \gset
 -- Refresh where we start from the current watermark to infinity
 CALL refresh_continuous_aggregate('thresh_2', :thresh_hyper_id_watermark, NULL);
-psql:include/cagg_invalidation_common.sql:603: NOTICE:  continuous aggregate "thresh_2" is already up-to-date
+psql:include/cagg_invalidation_common.sql:626: NOTICE:  continuous aggregate "thresh_2" is already up-to-date
 -- Now refresh with max end of the window to test that the
 -- invalidation threshold is capped at the last bucket of data
 CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
@@ -1258,7 +1300,7 @@ INSERT INTO conditions VALUES(3, 1, 1.0);
 INSERT INTO conditions VALUES(4, 1, 1.0);
 INSERT INTO conditions VALUES(6, 1, 1.0);
 CALL refresh_continuous_aggregate('cond_1', 10, NULL);
-psql:include/cagg_invalidation_common.sql:725: NOTICE:  continuous aggregate "cond_1" is already up-to-date
+psql:include/cagg_invalidation_common.sql:748: NOTICE:  continuous aggregate "cond_1" is already up-to-date
 SELECT * FROM cagg_invals
 WHERE cagg_id = :cond_1_id;
  cagg_id | start |         end         
@@ -1284,7 +1326,7 @@ INSERT INTO conditions VALUES (40, 1, 1.0);
 -- Refresh to process invalidations, but outside the range of
 -- invalidations we inserted so that we don't clear them.
 CALL refresh_continuous_aggregate('cond_10', 50, 60);
-psql:include/cagg_invalidation_common.sql:746: NOTICE:  continuous aggregate "cond_10" is already up-to-date
+psql:include/cagg_invalidation_common.sql:769: NOTICE:  continuous aggregate "cond_10" is already up-to-date
 SELECT mat_hypertable_id AS cond_10_id
 FROM _timescaledb_catalog.continuous_agg
 WHERE user_view_name = 'cond_10' \gset
@@ -1327,17 +1369,17 @@ CALL refresh_continuous_aggregate('cond_10', 0, 200);
 SET timescaledb.materializations_per_refresh_window='foo';
 INSERT INTO conditions VALUES (140, 1, 1.0);
 CALL refresh_continuous_aggregate('cond_10', 0, 200);
-psql:include/cagg_invalidation_common.sql:785: WARNING:  invalid value for session variable "timescaledb.materializations_per_refresh_window"
+psql:include/cagg_invalidation_common.sql:808: WARNING:  invalid value for session variable "timescaledb.materializations_per_refresh_window"
 DETAIL:  Expected an integer but current value is "foo".
 SET timescaledb.materializations_per_refresh_window='2bar';
 INSERT INTO conditions VALUES (140, 1, 1.0);
 CALL refresh_continuous_aggregate('cond_10', 0, 200);
-psql:include/cagg_invalidation_common.sql:788: WARNING:  invalid value for session variable "timescaledb.materializations_per_refresh_window"
+psql:include/cagg_invalidation_common.sql:811: WARNING:  invalid value for session variable "timescaledb.materializations_per_refresh_window"
 DETAIL:  Expected an integer but current value is "2bar".
 SET timescaledb.materializations_per_refresh_window='-';
 INSERT INTO conditions VALUES (140, 1, 1.0);
 CALL refresh_continuous_aggregate('cond_10', 0, 200);
-psql:include/cagg_invalidation_common.sql:792: WARNING:  invalid value for session variable "timescaledb.materializations_per_refresh_window"
+psql:include/cagg_invalidation_common.sql:815: WARNING:  invalid value for session variable "timescaledb.materializations_per_refresh_window"
 DETAIL:  Expected an integer but current value is "-".
 \set VERBOSITY terse
 -- cleanup
diff --git a/tsl/test/expected/cagg_invalidation_dist_ht-13.out b/tsl/test/expected/cagg_invalidation_dist_ht-13.out
new file mode 100644
index 000000000..e15700ddf
--- /dev/null
+++ b/tsl/test/expected/cagg_invalidation_dist_ht-13.out
@@ -0,0 +1,1389 @@
+-- This file and its contents are licensed under the Timescale License.
+-- Please see the included NOTICE for copyright information and
+-- LICENSE-TIMESCALE for a copy of the license.
+------------------------------------
+-- Set up a distributed environment
+------------------------------------
+\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER
+\set DATA_NODE_1 :TEST_DBNAME _1
+\set DATA_NODE_2 :TEST_DBNAME _2
+\set DATA_NODE_3 :TEST_DBNAME _3
+\ir include/remote_exec.sql
+-- This file and its contents are licensed under the Timescale License.
+-- Please see the included NOTICE for copyright information and
+-- LICENSE-TIMESCALE for a copy of the license.
+CREATE SCHEMA IF NOT EXISTS test;
+psql:include/remote_exec.sql:5: NOTICE:  schema "test" already exists, skipping
+GRANT USAGE ON SCHEMA test TO PUBLIC;
+CREATE OR REPLACE FUNCTION test.remote_exec(srv_name name[], command text)
+RETURNS VOID
+AS :TSL_MODULE_PATHNAME, 'ts_remote_exec'
+LANGUAGE C;
+CREATE OR REPLACE FUNCTION test.remote_exec_get_result_strings(srv_name name[], command text)
+RETURNS TABLE("table_record" CSTRING[])
+AS :TSL_MODULE_PATHNAME, 'ts_remote_exec_get_result_strings'
+LANGUAGE C;
+SELECT (add_data_node (name, host => 'localhost', DATABASE => name)).*
+FROM (VALUES (:'DATA_NODE_1'), (:'DATA_NODE_2'), (:'DATA_NODE_3')) v (name);
+           node_name            |   host    | port  |            database            | node_created | database_created | extension_created 
+--------------------------------+-----------+-------+--------------------------------+--------------+------------------+-------------------
+ db_cagg_invalidation_dist_ht_1 | localhost | 55432 | db_cagg_invalidation_dist_ht_1 | t            | t                | t
+ db_cagg_invalidation_dist_ht_2 | localhost | 55432 | db_cagg_invalidation_dist_ht_2 | t            | t                | t
+ db_cagg_invalidation_dist_ht_3 | localhost | 55432 | db_cagg_invalidation_dist_ht_3 | t            | t                | t
+(3 rows)
+
+GRANT USAGE ON FOREIGN SERVER :DATA_NODE_1, :DATA_NODE_2, :DATA_NODE_3 TO PUBLIC;
+\set IS_DISTRIBUTED TRUE
+\ir include/cagg_invalidation_common.sql
+-- This file and its contents are licensed under the Timescale License.
+-- Please see the included NOTICE for copyright information and
+-- LICENSE-TIMESCALE for a copy of the license.
+-- Disable background workers since we are testing manual refresh
+\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER
+SELECT _timescaledb_internal.stop_background_workers();
+ stop_background_workers 
+-------------------------
+ t
+(1 row)
+
+SET ROLE :ROLE_DEFAULT_PERM_USER;
+SET datestyle TO 'ISO, YMD';
+SET timezone TO 'UTC';
+CREATE TABLE conditions (time bigint NOT NULL, device int, temp float);
+\if :IS_DISTRIBUTED
+SELECT create_distributed_hypertable('conditions', 'time', chunk_time_interval => 10, replication_factor => 2);
+ create_distributed_hypertable 
+-------------------------------
+ (1,public,conditions,t)
+(1 row)
+
+\else
+SELECT create_hypertable('conditions', 'time', chunk_time_interval => 10);
+\endif
+CREATE TABLE measurements (time int NOT NULL, device int, temp float);
+\if :IS_DISTRIBUTED
+SELECT create_distributed_hypertable('measurements', 'time', chunk_time_interval => 10, replication_factor => 2);
+ create_distributed_hypertable 
+-------------------------------
+ (2,public,measurements,t)
+(1 row)
+
+\else
+SELECT create_hypertable('measurements', 'time', chunk_time_interval => 10);
+\endif
+CREATE OR REPLACE FUNCTION bigint_now()
+RETURNS bigint LANGUAGE SQL STABLE AS
+$$
+    SELECT coalesce(max(time), 0)
+    FROM conditions
+$$;
+\if :IS_DISTRIBUTED
+CALL distributed_exec($DIST$
+CREATE OR REPLACE FUNCTION bigint_now()
+RETURNS bigint LANGUAGE SQL STABLE AS
+$$
+    SELECT coalesce(max(time), 0)
+    FROM conditions
+$$;
+$DIST$);
+\endif
+CREATE OR REPLACE FUNCTION int_now()
+RETURNS int LANGUAGE SQL STABLE AS
+$$
+    SELECT coalesce(max(time), 0)
+    FROM measurements
+$$;
+\if :IS_DISTRIBUTED
+CALL distributed_exec($DIST$
+CREATE OR REPLACE FUNCTION int_now()
+RETURNS int LANGUAGE SQL STABLE AS
+$$
+    SELECT coalesce(max(time), 0)
+    FROM measurements
+$$;
+$DIST$);
+\endif
+SELECT set_integer_now_func('conditions', 'bigint_now');
+ set_integer_now_func 
+----------------------
+ 
+(1 row)
+
+SELECT set_integer_now_func('measurements', 'int_now');
+ set_integer_now_func 
+----------------------
+ 
+(1 row)
+
+INSERT INTO conditions
+SELECT t, ceil(abs(timestamp_hash(to_timestamp(t)::timestamp))%4)::int,
+       abs(timestamp_hash(to_timestamp(t)::timestamp))%40
+FROM generate_series(1, 100, 1) t;
+CREATE TABLE temp AS
+SELECT * FROM conditions;
+INSERT INTO measurements
+SELECT * FROM temp;
+-- Show the most recent data
+SELECT * FROM conditions
+ORDER BY time DESC, device
+LIMIT 10;
+ time | device | temp 
+------+--------+------
+  100 |      0 |    8
+   99 |      1 |    5
+   98 |      2 |   26
+   97 |      2 |   10
+   96 |      2 |   34
+   95 |      2 |   30
+   94 |      3 |   31
+   93 |      0 |    4
+   92 |      0 |   32
+   91 |      3 |   15
+(10 rows)
+
+-- Create two continuous aggregates on the same hypertable to test
+-- that invalidations are handled correctly across both of them.
+CREATE MATERIALIZED VIEW cond_10
+WITH (timescaledb.continuous,
+      timescaledb.materialized_only=true)
+AS
+SELECT time_bucket(BIGINT '10', time) AS bucket, device, avg(temp) AS avg_temp
+FROM conditions
+GROUP BY 1,2 WITH NO DATA;
+CREATE MATERIALIZED VIEW cond_20
+WITH (timescaledb.continuous,
+      timescaledb.materialized_only=true)
+AS
+SELECT time_bucket(BIGINT '20', time) AS bucket, device, avg(temp) AS avg_temp
+FROM conditions
+GROUP BY 1,2 WITH NO DATA;
+CREATE MATERIALIZED VIEW measure_10
+WITH (timescaledb.continuous,
+      timescaledb.materialized_only=true)
+AS
+SELECT time_bucket(10, time) AS bucket, device, avg(temp) AS avg_temp
+FROM measurements
+GROUP BY 1,2 WITH NO DATA;
+-- There should be three continuous aggregates, two on one hypertable
+-- and one on the other:
+SELECT mat_hypertable_id, raw_hypertable_id, user_view_name
+FROM _timescaledb_catalog.continuous_agg;
+ mat_hypertable_id | raw_hypertable_id | user_view_name 
+-------------------+-------------------+----------------
+                 3 |                 1 | cond_10
+                 4 |                 1 | cond_20
+                 5 |                 2 | measure_10
+(3 rows)
+
+-- The continuous aggregates should be empty
+SELECT * FROM cond_10
+ORDER BY 1 DESC, 2;
+ bucket | device | avg_temp 
+--------+--------+----------
+(0 rows)
+
+SELECT * FROM cond_20
+ORDER BY 1 DESC, 2;
+ bucket | device | avg_temp 
+--------+--------+----------
+(0 rows)
+
+SELECT * FROM measure_10
+ORDER BY 1 DESC, 2;
+ bucket | device | avg_temp 
+--------+--------+----------
+(0 rows)
+
+\if :IS_DISTRIBUTED
+CREATE OR REPLACE FUNCTION get_hyper_invals() RETURNS TABLE(
+      "hyper_id" INT,
+      "start" BIGINT,
+      "end" BIGINT
+)
+LANGUAGE SQL VOLATILE AS
+$$
+SELECT DISTINCT table_record[1]::TEXT::INT, table_record[2]::TEXT::BIGINT, table_record[3]::TEXT::BIGINT FROM test.remote_exec_get_result_strings(NULL, $DIST$
+      SELECT hypertable_id,
+            lowest_modified_value,
+            greatest_modified_value
+            FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log
+$DIST$)
+ORDER BY 1,2,3
+$$;
+CREATE OR REPLACE FUNCTION get_cagg_invals() RETURNS TABLE(
+      "cagg_id" INT,
+      "start" BIGINT,
+      "end" BIGINT
+)
+LANGUAGE SQL VOLATILE AS
+$$
+SELECT DISTINCT table_record[1]::TEXT::INT, table_record[2]::TEXT::BIGINT, table_record[3]::TEXT::BIGINT FROM test.remote_exec_get_result_strings(NULL, $DIST$
+      SELECT materialization_id AS cagg_id,
+            lowest_modified_value AS start,
+            greatest_modified_value AS end
+            FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
+$DIST$)
+ORDER BY 1,2,3
+$$;
+\else
+CREATE OR REPLACE FUNCTION get_hyper_invals() RETURNS  TABLE (
+      "hyper_id" INT,
+      "start" BIGINT,
+      "end" BIGINT
+)
+LANGUAGE SQL VOLATILE AS
+$$
+SELECT hypertable_id,
+       lowest_modified_value,
+       greatest_modified_value
+       FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log
+       ORDER BY 1,2,3
+$$;
+CREATE OR REPLACE FUNCTION get_cagg_invals() RETURNS TABLE (
+      "cagg_id" INT,
+      "start" BIGINT,
+      "end" BIGINT
+)
+LANGUAGE SQL VOLATILE AS
+$$
+SELECT materialization_id,
+       lowest_modified_value,
+       greatest_modified_value
+       FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
+       ORDER BY 1,2,3
+$$;
+\endif
+CREATE VIEW hyper_invals AS SELECT * FROM get_hyper_invals();
+CREATE VIEW cagg_invals AS SELECT * FROM get_cagg_invals();
+-- Must refresh to move the invalidation threshold, or no
+-- invalidations will be generated. Initially, there is no threshold
+-- set:
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+(0 rows)
+
+-- There should be only "infinite" invalidations in the cagg
+-- invalidation log:
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 | 9223372036854775807
+       4 | -9223372036854775808 | 9223372036854775807
+       5 | -9223372036854775808 | 9223372036854775807
+(3 rows)
+
+-- Now refresh up to 50 without the first bucket, and the threshold should be updated accordingly:
+CALL refresh_continuous_aggregate('cond_10', 1, 50);
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+             1 |        50
+(1 row)
+
+-- Invalidations should be cleared inside the refresh window:
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                   9
+       3 |                   50 | 9223372036854775807
+       4 | -9223372036854775808 | 9223372036854775807
+       5 | -9223372036854775808 | 9223372036854775807
+(4 rows)
+
+-- Refresh up to 50 from the beginning
+CALL refresh_continuous_aggregate('cond_10', 0, 50);
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   50 | 9223372036854775807
+       4 | -9223372036854775808 | 9223372036854775807
+       5 | -9223372036854775808 | 9223372036854775807
+(4 rows)
+
+-- Refreshing below the threshold does not move it:
+CALL refresh_continuous_aggregate('cond_10', 20, 49);
+psql:include/cagg_invalidation_common.sql:207: NOTICE:  continuous aggregate "cond_10" is already up-to-date
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+             1 |        50
+(1 row)
+
+-- Nothing changes with invalidations either since the region was
+-- already refreshed and no new invalidations have been generated:
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   50 | 9223372036854775807
+       4 | -9223372036854775808 | 9223372036854775807
+       5 | -9223372036854775808 | 9223372036854775807
+(4 rows)
+
+-- Refreshing measure_10 moves the threshold only for the other hypertable:
+CALL refresh_continuous_aggregate('measure_10', 0, 30);
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+             1 |        50
+             2 |        30
+(2 rows)
+
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   50 | 9223372036854775807
+       4 | -9223372036854775808 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(5 rows)
+
+-- Refresh on the second continuous aggregate, cond_20, on the first
+-- hypertable moves the same threshold as when refreshing cond_10:
+CALL refresh_continuous_aggregate('cond_20', 60, 100);
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+             1 |       100
+             2 |        30
+(2 rows)
+
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   50 | 9223372036854775807
+       4 | -9223372036854775808 |                  59
+       4 |                  100 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(6 rows)
+
+-- There should be no hypertable invalidations initially:
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+(0 rows)
+
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   50 | 9223372036854775807
+       4 | -9223372036854775808 |                  59
+       4 |                  100 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(6 rows)
+
+-- Create invalidations across different ranges. Some of these should
+-- be deleted and others cut in different ways when a refresh is
+-- run. Note that the refresh window is inclusive in the start of the
+-- window but exclusive at the end.
+-- Entries that should be left unmodified:
+INSERT INTO conditions VALUES (10, 4, 23.7);
+INSERT INTO conditions VALUES (10, 5, 23.8), (19, 3, 23.6);
+INSERT INTO conditions VALUES (60, 3, 23.7), (70, 4, 23.7);
+-- Should see some invalidations in the hypertable invalidation log:
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+        1 |    10 |  10
+        1 |    10 |  19
+        1 |    60 |  60
+        1 |    60 |  70
+        1 |    70 |  70
+(5 rows)
+
+-- Generate some invalidations for the other hypertable
+INSERT INTO measurements VALUES (20, 4, 23.7);
+INSERT INTO measurements VALUES (30, 5, 23.8), (80, 3, 23.6);
+-- Should now see invalidations for both hypertables
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+        1 |    10 |  10
+        1 |    10 |  19
+        1 |    60 |  60
+        1 |    60 |  70
+        1 |    70 |  70
+        2 |    20 |  20
+        2 |    30 |  80
+(7 rows)
+
+-- First refresh a window where we don't have any invalidations. This
+-- allows us to see only the copying of the invalidations to the per
+-- cagg log without additional processing.
+CALL refresh_continuous_aggregate('cond_10', 20, 60);
+-- Invalidation threshold remains at 100:
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+             1 |       100
+             2 |        30
+(2 rows)
+
+-- Invalidations should be moved from the hypertable invalidation log
+-- to the continuous aggregate log, but only for the hypertable that
+-- the refreshed aggregate belongs to:
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+        2 |    20 |  20
+        2 |    30 |  80
+(2 rows)
+
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   10 |                  19
+       3 |                   60 | 9223372036854775807
+       4 | -9223372036854775808 |                  59
+       4 |                    0 |                  19
+       4 |                   60 |                  79
+       4 |                  100 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(9 rows)
+
+-- Now add more invalidations to test a refresh that overlaps with them.
+-- Entries that should be deleted:
+INSERT INTO conditions VALUES (30, 1, 23.4), (59, 1, 23.4);
+INSERT INTO conditions VALUES (20, 1, 23.4), (30, 1, 23.4);
+-- Entries that should be cut to the right, leaving an invalidation to
+-- the left of the refresh window:
+INSERT INTO conditions VALUES (1, 4, 23.7), (25, 1, 23.4);
+INSERT INTO conditions VALUES (19, 4, 23.7), (59, 1, 23.4);
+-- Entries that should be cut to the left and right, leaving two
+-- invalidation entries on each side of the refresh window:
+INSERT INTO conditions VALUES (2, 2, 23.5), (60, 1, 23.4);
+INSERT INTO conditions VALUES (3, 2, 23.5), (80, 1, 23.4);
+-- Entries that should be cut to the left, leaving an invalidation to
+-- the right of the refresh window:
+INSERT INTO conditions VALUES (60, 3, 23.6), (90, 3, 23.6);
+INSERT INTO conditions VALUES (20, 5, 23.8), (100, 3, 23.6);
+-- New invalidations in the hypertable invalidation log:
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+        1 |     1 |   1
+        1 |     1 |  25
+        1 |     2 |  60
+        1 |     3 |   3
+        1 |     3 |  80
+        1 |    19 |  19
+        1 |    19 |  59
+        1 |    20 |  20
+        1 |    20 |  30
+        1 |    20 | 100
+        1 |    25 |  25
+        1 |    30 |  30
+        1 |    30 |  59
+        1 |    59 |  59
+        1 |    60 |  90
+        1 |    80 |  80
+        1 |   100 | 100
+        2 |    20 |  20
+        2 |    30 |  80
+(19 rows)
+
+-- But nothing has yet changed in the cagg invalidation log:
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   10 |                  19
+       3 |                   60 | 9223372036854775807
+       4 | -9223372036854775808 |                  59
+       4 |                    0 |                  19
+       4 |                   60 |                  79
+       4 |                  100 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(9 rows)
+
+-- Refresh to process invalidations for daily temperature:
+CALL refresh_continuous_aggregate('cond_10', 20, 60);
+-- Invalidations should be moved from the hypertable invalidation log
+-- to the continuous aggregate log.
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+        2 |    20 |  20
+        2 |    30 |  80
+(2 rows)
+
+-- Only the cond_10 cagg should have its entries cut:
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 | -9223372036854775808 |                  19
+       3 |                   10 |                  19
+       3 |                   60 | 9223372036854775807
+       4 | -9223372036854775808 |                  59
+       4 |                    0 |                  19
+       4 |                    0 |                  99
+       4 |                    0 |                 119
+       4 |                   60 |                  79
+       4 |                  100 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(12 rows)
+
+-- Refresh also cond_20:
+CALL refresh_continuous_aggregate('cond_20', 20, 60);
+-- The cond_20 cagg should also have its entries cut:
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 | -9223372036854775808 |                  19
+       3 |                   10 |                  19
+       3 |                   60 | 9223372036854775807
+       4 | -9223372036854775808 |                  19
+       4 |                   60 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(8 rows)
+
+-- Refresh cond_10 to completely remove an invalidation:
+CALL refresh_continuous_aggregate('cond_10', 0, 20);
+-- The 1-19 invalidation should be deleted:
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   60 | 9223372036854775807
+       4 | -9223372036854775808 |                  19
+       4 |                   60 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(6 rows)
+
+-- Clear everything between 0 and 100 to make way for new
+-- invalidations
+CALL refresh_continuous_aggregate('cond_10', 0, 100);
+-- Test refreshing with non-overlapping invalidations
+INSERT INTO conditions VALUES (20, 1, 23.4), (25, 1, 23.4);
+INSERT INTO conditions VALUES (30, 1, 23.4), (46, 1, 23.4);
+CALL refresh_continuous_aggregate('cond_10', 1, 40);
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   40 |                  49
+       3 |                  100 | 9223372036854775807
+       4 | -9223372036854775808 |                  19
+       4 |                   20 |                  39
+       4 |                   20 |                  59
+       4 |                   60 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(9 rows)
+
+-- Refresh without cutting (in area where there are no
+-- invalidations). Merging of overlapping entries should still happen:
+INSERT INTO conditions VALUES (15, 1, 23.4), (42, 1, 23.4);
+CALL refresh_continuous_aggregate('cond_10', 90, 100);
+psql:include/cagg_invalidation_common.sql:327: NOTICE:  continuous aggregate "cond_10" is already up-to-date
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   10 |                  49
+       3 |                  100 | 9223372036854775807
+       4 | -9223372036854775808 |                  19
+       4 |                    0 |                  59
+       4 |                   20 |                  39
+       4 |                   20 |                  59
+       4 |                   60 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(10 rows)
+
+-- Test max refresh window
+CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end          
+---------+----------------------+----------------------
+       3 | -9223372036854775808 | -9223372036854775801
+       3 |                  110 |  9223372036854775807
+       4 | -9223372036854775808 |                   19
+       4 |                    0 |                   59
+       4 |                   20 |                   39
+       4 |                   20 |                   59
+       4 |                   60 |  9223372036854775807
+       5 | -9223372036854775808 |                   -1
+       5 |                   30 |  9223372036854775807
+(9 rows)
+
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+        2 |    20 |  20
+        2 |    30 |  80
+(2 rows)
+
+-- Pick the first chunk of conditions to TRUNCATE
+SELECT show_chunks AS chunk_to_truncate
+FROM show_chunks('conditions')
+ORDER BY 1
+LIMIT 1 \gset
+-- Show the data before truncating one of the chunks
+SELECT * FROM :chunk_to_truncate
+ORDER BY 1;
+ time | device | temp 
+------+--------+------
+    1 |      4 | 23.7
+    1 |      0 |   16
+    2 |      2 | 23.5
+    2 |      1 |   25
+    3 |      2 | 23.5
+    3 |      0 |   20
+    4 |      2 |   10
+    5 |      2 |   26
+    6 |      1 |   13
+    7 |      3 |   35
+    8 |      1 |   37
+    9 |      3 |    7
+(12 rows)
+
+-- Truncate one chunk
+\if :IS_DISTRIBUTED
+-- There is no TRUNCATE implementation for FOREIGN tables yet
+\set ON_ERROR_STOP 0
+\endif
+TRUNCATE TABLE :chunk_to_truncate;
+psql:include/cagg_invalidation_common.sql:352: ERROR:  "_dist_hyper_1_1_chunk" is not a table
+\if :IS_DISTRIBUTED
+\set ON_ERROR_STOP 1
+\endif
+-- Should see new invalidation entries for conditions for the non-distributed case
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+        2 |    20 |  20
+        2 |    30 |  80
+(2 rows)
+
+-- TRUNCATE the hypertable to invalidate all its continuous aggregates
+TRUNCATE conditions;
+-- Now empty
+SELECT * FROM conditions;
+ time | device | temp 
+------+--------+------
+(0 rows)
+
+-- Should see an infinite invalidation entry for conditions
+SELECT * FROM hyper_invals;
+ hyper_id |        start         |         end         
+----------+----------------------+---------------------
+        1 | -9223372036854775808 | 9223372036854775807
+        2 |                   20 |                  20
+        2 |                   30 |                  80
+(3 rows)
+
+-- Aggregates still hold data
+SELECT * FROM cond_10
+ORDER BY 1,2
+LIMIT 5;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      0 |       18
+      0 |      1 |       25
+      0 |      2 |    20.75
+      0 |      3 |       21
+      0 |      4 |     23.7
+(5 rows)
+
+SELECT * FROM cond_20
+ORDER BY 1,2
+LIMIT 5;
+ bucket | device |     avg_temp     
+--------+--------+------------------
+     20 |      0 | 18.2857142857143
+     20 |      1 | 23.5142857142857
+     20 |      2 |               26
+     20 |      3 |               23
+     20 |      5 |             23.8
+(5 rows)
+
+CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
+CALL refresh_continuous_aggregate('cond_20', NULL, NULL);
+-- Both should now be empty after refresh
+SELECT * FROM cond_10
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+(0 rows)
+
+SELECT * FROM cond_20
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+(0 rows)
+
+-- Insert new data again and refresh
+INSERT INTO conditions VALUES
+       (1, 1, 23.4), (4, 3, 14.3), (5, 1, 13.6),
+       (6, 2, 17.9), (12, 1, 18.3), (19, 3, 28.2),
+       (10, 3, 22.3), (11, 2, 34.9), (15, 2, 45.6),
+       (21, 1, 15.3), (22, 2, 12.3), (29, 3, 16.3);
+CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
+CALL refresh_continuous_aggregate('cond_20', NULL, NULL);
+-- Should now hold data again
+SELECT * FROM cond_10
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |     18.5
+      0 |      2 |     17.9
+      0 |      3 |     14.3
+     10 |      1 |     18.3
+     10 |      2 |    40.25
+     10 |      3 |    25.25
+     20 |      1 |     15.3
+     20 |      2 |     12.3
+     20 |      3 |     16.3
+(9 rows)
+
+SELECT * FROM cond_20
+ORDER BY 1,2;
+ bucket | device |     avg_temp     
+--------+--------+------------------
+      0 |      1 | 18.4333333333333
+      0 |      2 |             32.8
+      0 |      3 |             21.6
+     20 |      1 |             15.3
+     20 |      2 |             12.3
+     20 |      3 |             16.3
+(6 rows)
+
+-- Truncate one of the aggregates, but first test that we block
+-- TRUNCATE ONLY
+\set ON_ERROR_STOP 0
+TRUNCATE ONLY cond_20;
+psql:include/cagg_invalidation_common.sql:408: ERROR:  cannot truncate only a continuous aggregate
+\set ON_ERROR_STOP 1
+TRUNCATE cond_20;
+-- Should now be empty
+SELECT * FROM cond_20
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+(0 rows)
+
+-- Other aggregate is not affected
+SELECT * FROM cond_10
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |     18.5
+      0 |      2 |     17.9
+      0 |      3 |     14.3
+     10 |      1 |     18.3
+     10 |      2 |    40.25
+     10 |      3 |    25.25
+     20 |      1 |     15.3
+     20 |      2 |     12.3
+     20 |      3 |     16.3
+(9 rows)
+
+-- Refresh again to bring data back
+CALL refresh_continuous_aggregate('cond_20', NULL, NULL);
+-- The aggregate should be populated again
+SELECT * FROM cond_20
+ORDER BY 1,2;
+ bucket | device |     avg_temp     
+--------+--------+------------------
+      0 |      1 | 18.4333333333333
+      0 |      2 |             32.8
+      0 |      3 |             21.6
+     20 |      1 |             15.3
+     20 |      2 |             12.3
+     20 |      3 |             16.3
+(6 rows)
+
+-------------------------------------------------------
+-- Test corner cases against a minimal bucket aggregate
+-------------------------------------------------------
+-- First, clear the table and aggregate
+TRUNCATE conditions;
+SELECT * FROM conditions;
+ time | device | temp 
+------+--------+------
+(0 rows)
+
+CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
+SELECT * FROM cond_10
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+(0 rows)
+
+CREATE MATERIALIZED VIEW cond_1
+WITH (timescaledb.continuous,
+      timescaledb.materialized_only=true)
+AS
+SELECT time_bucket(BIGINT '1', time) AS bucket, device, avg(temp) AS avg_temp
+FROM conditions
+GROUP BY 1,2 WITH NO DATA;
+SELECT mat_hypertable_id AS cond_1_id
+FROM _timescaledb_catalog.continuous_agg
+WHERE user_view_name = 'cond_1' \gset
+-- Test manual invalidation error
+\if :IS_DISTRIBUTED
+\else
+\set ON_ERROR_STOP 0
+SELECT _timescaledb_internal.invalidation_cagg_log_add_entry(:cond_1_id, 1, 0);
+\set ON_ERROR_STOP 1
+\endif
+-- Test invalidations with bucket size 1
+INSERT INTO conditions VALUES (0, 1, 1.0);
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+        1 |     0 |   0
+        2 |    20 |  20
+        2 |    30 |  80
+(3 rows)
+
+-- Refreshing around the bucket should not update the aggregate
+CALL refresh_continuous_aggregate('cond_1', -1, 0);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+(0 rows)
+
+CALL refresh_continuous_aggregate('cond_1', 1, 2);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+(0 rows)
+
+-- Refresh only the invalidated bucket
+CALL refresh_continuous_aggregate('cond_1', 0, 1);
+SELECT * FROM cagg_invals
+WHERE cagg_id = :cond_1_id;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       6 | -9223372036854775808 |                  -2
+       6 |                    2 | 9223372036854775807
+(2 rows)
+
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |        1
+(1 row)
+
+-- Refresh 1 extra bucket on the left
+INSERT INTO conditions VALUES (0, 1, 2.0);
+CALL refresh_continuous_aggregate('cond_1', -1, 1);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |      1.5
+(1 row)
+
+-- Refresh 1 extra bucket on the right
+INSERT INTO conditions VALUES (0, 1, 3.0);
+CALL refresh_continuous_aggregate('cond_1', 0, 2);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |        2
+(1 row)
+
+-- Refresh 1 extra bucket on each side
+INSERT INTO conditions VALUES (0, 1, 4.0);
+CALL refresh_continuous_aggregate('cond_1', -1, 2);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |      2.5
+(1 row)
+
+-- Clear to reset aggregate
+TRUNCATE conditions;
+CALL refresh_continuous_aggregate('cond_1', NULL, NULL);
+-- Test invalidation of size 2
+INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0);
+-- Refresh one bucket at a time
+CALL refresh_continuous_aggregate('cond_1', 0, 1);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |        1
+(1 row)
+
+CALL refresh_continuous_aggregate('cond_1', 1, 2);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |        1
+      1 |      1 |        2
+(2 rows)
+
+-- Repeat the same thing but refresh the whole invalidation at once
+TRUNCATE conditions;
+CALL refresh_continuous_aggregate('cond_1', NULL, NULL);
+INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0);
+CALL refresh_continuous_aggregate('cond_1', 0, 2);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |        1
+      1 |      1 |        2
+(2 rows)
+
+-- Test invalidation of size 3
+TRUNCATE conditions;
+CALL refresh_continuous_aggregate('cond_1', NULL, NULL);
+INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0), (2, 1, 3.0);
+-- Invalidation extends beyond the refresh window on both ends
+CALL refresh_continuous_aggregate('cond_1', 1, 2);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      1 |      1 |        2
+(1 row)
+
+-- Should leave one invalidation on each side of the refresh window
+SELECT * FROM cagg_invals
+WHERE cagg_id = :cond_1_id;
+ cagg_id | start |         end         
+---------+-------+---------------------
+       6 |     0 |                   0
+       6 |     2 |                   2
+       6 |   110 | 9223372036854775807
+(3 rows)
+
+-- Refresh the two remaining invalidations
+CALL refresh_continuous_aggregate('cond_1', 0, 1);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |        1
+      1 |      1 |        2
+(2 rows)
+
+CALL refresh_continuous_aggregate('cond_1', 2, 3);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |        1
+      1 |      1 |        2
+      2 |      1 |        3
+(3 rows)
+
+-- Clear and repeat but instead refresh the whole range in one go. The
+-- result should be the same as the three partial refreshes. Use
+-- DELETE instead of TRUNCATE to clear this time.
+DELETE FROM conditions;
+CALL refresh_continuous_aggregate('cond_1', NULL, NULL);
+INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0), (2, 1, 3.0);
+CALL refresh_continuous_aggregate('cond_1', 0, 3);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |        1
+      1 |      1 |        2
+      2 |      1 |        3
+(3 rows)
+
+----------------------------------------------
+-- Test that invalidation threshold is capped
+----------------------------------------------
+CREATE table threshold_test (time int, value int);
+\if :IS_DISTRIBUTED
+SELECT create_distributed_hypertable('threshold_test', 'time', chunk_time_interval => 4, replication_factor => 2);
+psql:include/cagg_invalidation_common.sql:565: NOTICE:  adding not-null constraint to column "time"
+ create_distributed_hypertable 
+-------------------------------
+ (7,public,threshold_test,t)
+(1 row)
+
+\else
+SELECT create_hypertable('threshold_test', 'time', chunk_time_interval => 4);
+\endif
+SELECT set_integer_now_func('threshold_test', 'int_now');
+ set_integer_now_func 
+----------------------
+ 
+(1 row)
+
+CREATE MATERIALIZED VIEW thresh_2
+WITH (timescaledb.continuous,
+      timescaledb.materialized_only=true)
+AS
+SELECT time_bucket(2, time) AS bucket, max(value) AS max
+FROM threshold_test
+GROUP BY 1 WITH NO DATA;
+SELECT raw_hypertable_id AS thresh_hyper_id, mat_hypertable_id AS thresh_cagg_id
+FROM _timescaledb_catalog.continuous_agg
+WHERE user_view_name = 'thresh_2' \gset
+-- There's no invalidation threshold initially
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+WHERE hypertable_id = :thresh_hyper_id
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+(0 rows)
+
+-- Test manual invalidation error
+\if :IS_DISTRIBUTED
+\else
+\set ON_ERROR_STOP 0
+SELECT _timescaledb_internal.invalidation_hyper_log_add_entry(:thresh_hyper_id, 1, 0);
+\set ON_ERROR_STOP 1
+\endif
+-- Test that threshold is initialized to min value when there's no data
+-- and we specify an infinite end. Note that the min value may differ
+-- depending on time type.
+CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
+psql:include/cagg_invalidation_common.sql:599: NOTICE:  continuous aggregate "thresh_2" is already up-to-date
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+WHERE hypertable_id = :thresh_hyper_id
+ORDER BY 1,2;
+ hypertable_id |  watermark  
+---------------+-------------
+             7 | -2147483648
+(1 row)
+
+INSERT INTO threshold_test
+SELECT v, v FROM generate_series(1, 10) v;
+CALL refresh_continuous_aggregate('thresh_2', 0, 5);
+-- Threshold should move to end of the last refreshed bucket, which is
+-- the last bucket fully included in the window, i.e., the window
+-- shrinks to end of previous bucket.
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+WHERE hypertable_id = :thresh_hyper_id
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+             7 |         4
+(1 row)
+
+-- Refresh where both the start and end of the window is above the
+-- max data value
+CALL refresh_continuous_aggregate('thresh_2', 14, NULL);
+psql:include/cagg_invalidation_common.sql:619: NOTICE:  continuous aggregate "thresh_2" is already up-to-date
+SELECT watermark AS thresh_hyper_id_watermark
+FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+WHERE hypertable_id = :thresh_hyper_id \gset
+-- Refresh where we start from the current watermark to infinity
+CALL refresh_continuous_aggregate('thresh_2', :thresh_hyper_id_watermark, NULL);
+psql:include/cagg_invalidation_common.sql:626: NOTICE:  continuous aggregate "thresh_2" is already up-to-date
+-- Now refresh with max end of the window to test that the
+-- invalidation threshold is capped at the last bucket of data
+CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+WHERE hypertable_id = :thresh_hyper_id
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+             7 |        12
+(1 row)
+
+-- Should not have processed invalidations beyond the invalidation
+-- threshold.
+SELECT * FROM cagg_invals
+WHERE cagg_id = :thresh_cagg_id;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       8 | -9223372036854775808 |                  -1
+       8 |                   12 | 9223372036854775807
+(2 rows)
+
+-- Check that things are properly materialized
+SELECT * FROM thresh_2
+ORDER BY 1;
+ bucket | max 
+--------+-----
+      0 |   1
+      2 |   3
+      4 |   5
+      6 |   7
+      8 |   9
+     10 |  10
+(6 rows)
+
+-- Delete the last data
+SELECT show_chunks AS chunk_to_drop
+FROM show_chunks('threshold_test')
+ORDER BY 1 DESC
+LIMIT 1 \gset
+DELETE FROM threshold_test
+WHERE time > 6;
+-- The last data in the hypertable is gone
+SELECT time_bucket(2, time) AS bucket, max(value) AS max
+FROM threshold_test
+GROUP BY 1
+ORDER BY 1;
+ bucket | max 
+--------+-----
+      0 |   1
+      2 |   3
+      4 |   5
+      6 |   6
+(4 rows)
+
+-- The aggregate still holds data
+SELECT * FROM thresh_2
+ORDER BY 1;
+ bucket | max 
+--------+-----
+      0 |   1
+      2 |   3
+      4 |   5
+      6 |   7
+      8 |   9
+     10 |  10
+(6 rows)
+
+-- Refresh the aggregate to bring it up-to-date
+CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
+-- Data also gone from the aggregate
+SELECT * FROM thresh_2
+ORDER BY 1;
+ bucket | max 
+--------+-----
+      0 |   1
+      2 |   3
+      4 |   5
+      6 |   6
+(4 rows)
+
+-- The invalidation threshold remains the same
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+WHERE hypertable_id = :thresh_hyper_id
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+             7 |        12
+(1 row)
+
+-- Insert new data beyond the invalidation threshold to move it
+-- forward
+INSERT INTO threshold_test
+SELECT v, v FROM generate_series(7, 15) v;
+CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
+-- Aggregate now updated to reflect newly aggregated data
+SELECT * FROM thresh_2
+ORDER BY 1;
+ bucket | max 
+--------+-----
+      0 |   1
+      2 |   3
+      4 |   5
+      6 |   7
+      8 |   9
+     10 |  11
+     12 |  13
+     14 |  15
+(8 rows)
+
+-- The invalidation threshold should have moved forward to the end of
+-- the new data
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+WHERE hypertable_id = :thresh_hyper_id
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+             7 |        16
+(1 row)
+
+-- The aggregate remains invalid beyond the invalidation threshold
+SELECT * FROM cagg_invals
+WHERE cagg_id = :thresh_cagg_id;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       8 | -9223372036854775808 |                  -1
+       8 |                   16 | 9223372036854775807
+(2 rows)
+
+----------------------------------------------------------------------
+-- Test that dropping a chunk invalidates the dropped region. First
+-- create another chunk so that we have two chunks. One of the chunks
+-- will be dropped.
+---------------------------------------------------------------------
+INSERT INTO conditions VALUES (10, 1, 10.0);
+-- Chunks currently associated with the hypertable
+SELECT show_chunks AS chunk_to_drop
+FROM show_chunks('conditions');
+                chunk_to_drop                 
+----------------------------------------------
+ _timescaledb_internal._dist_hyper_1_34_chunk
+ _timescaledb_internal._dist_hyper_1_40_chunk
+(2 rows)
+
+-- Pick the first one to drop
+SELECT show_chunks AS chunk_to_drop
+FROM show_chunks('conditions')
+ORDER BY 1
+LIMIT 1 \gset
+-- Show the data before dropping one of the chunks
+SELECT * FROM conditions
+ORDER BY 1,2;
+ time | device | temp 
+------+--------+------
+    0 |      1 |    1
+    1 |      1 |    2
+    2 |      1 |    3
+   10 |      1 |   10
+(4 rows)
+
+-- Drop one chunk
+\if :IS_DISTRIBUTED
+CALL distributed_exec(format('DROP TABLE IF EXISTS %s', :'chunk_to_drop'));
+DROP FOREIGN TABLE :chunk_to_drop;
+\else
+DROP TABLE :chunk_to_drop;
+\endif
+-- The chunk's data no longer exists in the hypertable
+SELECT * FROM conditions
+ORDER BY 1,2;
+ time | device | temp 
+------+--------+------
+   10 |      1 |   10
+(1 row)
+
+-- Aggregate still remains in continuous aggregate, however
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |        1
+      1 |      1 |        2
+      2 |      1 |        3
+(3 rows)
+
+-- Refresh the continuous aggregate to make the dropped data be
+-- reflected in the aggregate
+CALL refresh_continuous_aggregate('cond_1', NULL, NULL);
+-- Aggregate now up-to-date with the source hypertable
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+     10 |      1 |       10
+(1 row)
+
+-- Test that adjacent invalidations are merged
+INSERT INTO conditions VALUES(1, 1, 1.0), (2, 1, 2.0);
+INSERT INTO conditions VALUES(3, 1, 1.0);
+INSERT INTO conditions VALUES(4, 1, 1.0);
+INSERT INTO conditions VALUES(6, 1, 1.0);
+CALL refresh_continuous_aggregate('cond_1', 10, NULL);
+psql:include/cagg_invalidation_common.sql:748: NOTICE:  continuous aggregate "cond_1" is already up-to-date
+SELECT * FROM cagg_invals
+WHERE cagg_id = :cond_1_id;
+ cagg_id | start |         end         
+---------+-------+---------------------
+       6 |     1 |                   4
+       6 |     6 |                   6
+       6 |   110 | 9223372036854775807
+(3 rows)
+
+---------------------------------------------------------------------
+-- Test that single timestamp invalidations are expanded to buckets,
+-- and adjacent buckets merged. This merging cannot cross Data-Node
+-- chunk boundaries for the distributed hypertable case.
+---------------------------------------------------------------------
+-- First clear invalidations in a range:
+CALL refresh_continuous_aggregate('cond_10', -20, 60);
+-- The following three should be merged to one range 0-29
+INSERT INTO conditions VALUES (5, 1, 1.0);
+INSERT INTO conditions VALUES (15, 1, 1.0);
+INSERT INTO conditions VALUES (25, 1, 1.0);
+-- The last one should not merge with the others
+INSERT INTO conditions VALUES (40, 1, 1.0);
+-- Refresh to process invalidations, but outside the range of
+-- invalidations we inserted so that we don't clear them.
+CALL refresh_continuous_aggregate('cond_10', 50, 60);
+psql:include/cagg_invalidation_common.sql:769: NOTICE:  continuous aggregate "cond_10" is already up-to-date
+SELECT mat_hypertable_id AS cond_10_id
+FROM _timescaledb_catalog.continuous_agg
+WHERE user_view_name = 'cond_10' \gset
+SELECT * FROM cagg_invals
+WHERE cagg_id = :cond_10_id;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                 -21
+       3 |                    0 |                   9
+       3 |                    0 |                  19
+       3 |                   10 |                  29
+       3 |                   20 |                  29
+       3 |                   40 |                  49
+       3 |                   60 | 9223372036854775807
+(7 rows)
+
+-- should trigger two individual refreshes
+CALL refresh_continuous_aggregate('cond_10', 0, 200);
+-- Allow at most 5 individual invalidations per refreshe
+SET timescaledb.materializations_per_refresh_window=5;
+-- Insert into every second bucket
+INSERT INTO conditions VALUES (20, 1, 1.0);
+INSERT INTO conditions VALUES (40, 1, 1.0);
+INSERT INTO conditions VALUES (60, 1, 1.0);
+INSERT INTO conditions VALUES (80, 1, 1.0);
+INSERT INTO conditions VALUES (100, 1, 1.0);
+INSERT INTO conditions VALUES (120, 1, 1.0);
+INSERT INTO conditions VALUES (140, 1, 1.0);
+CALL refresh_continuous_aggregate('cond_10', 0, 200);
+\set VERBOSITY default
+-- Test acceptable values for materializations per refresh
+SET timescaledb.materializations_per_refresh_window=' 5 ';
+INSERT INTO conditions VALUES (140, 1, 1.0);
+CALL refresh_continuous_aggregate('cond_10', 0, 200);
+-- Large value will be treated as LONG_MAX
+SET timescaledb.materializations_per_refresh_window=342239897234023842394249234766923492347;
+INSERT INTO conditions VALUES (140, 1, 1.0);
+CALL refresh_continuous_aggregate('cond_10', 0, 200);
+-- Test bad values for materializations per refresh
+SET timescaledb.materializations_per_refresh_window='foo';
+INSERT INTO conditions VALUES (140, 1, 1.0);
+CALL refresh_continuous_aggregate('cond_10', 0, 200);
+psql:include/cagg_invalidation_common.sql:808: WARNING:  invalid value for session variable "timescaledb.materializations_per_refresh_window"
+DETAIL:  Expected an integer but current value is "foo".
+SET timescaledb.materializations_per_refresh_window='2bar';
+INSERT INTO conditions VALUES (140, 1, 1.0);
+CALL refresh_continuous_aggregate('cond_10', 0, 200);
+psql:include/cagg_invalidation_common.sql:811: WARNING:  invalid value for session variable "timescaledb.materializations_per_refresh_window"
+DETAIL:  Expected an integer but current value is "2bar".
+SET timescaledb.materializations_per_refresh_window='-';
+INSERT INTO conditions VALUES (140, 1, 1.0);
+CALL refresh_continuous_aggregate('cond_10', 0, 200);
+psql:include/cagg_invalidation_common.sql:815: WARNING:  invalid value for session variable "timescaledb.materializations_per_refresh_window"
+DETAIL:  Expected an integer but current value is "-".
+\set VERBOSITY terse
+-- cleanup
+\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER;
+DROP DATABASE :DATA_NODE_1;
+DROP DATABASE :DATA_NODE_2;
+DROP DATABASE :DATA_NODE_3;
diff --git a/tsl/test/expected/cagg_invalidation_dist_ht-14.out b/tsl/test/expected/cagg_invalidation_dist_ht-14.out
new file mode 100644
index 000000000..7be17390e
--- /dev/null
+++ b/tsl/test/expected/cagg_invalidation_dist_ht-14.out
@@ -0,0 +1,1389 @@
+-- This file and its contents are licensed under the Timescale License.
+-- Please see the included NOTICE for copyright information and
+-- LICENSE-TIMESCALE for a copy of the license.
+------------------------------------
+-- Set up a distributed environment
+------------------------------------
+\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER
+\set DATA_NODE_1 :TEST_DBNAME _1
+\set DATA_NODE_2 :TEST_DBNAME _2
+\set DATA_NODE_3 :TEST_DBNAME _3
+\ir include/remote_exec.sql
+-- This file and its contents are licensed under the Timescale License.
+-- Please see the included NOTICE for copyright information and
+-- LICENSE-TIMESCALE for a copy of the license.
+CREATE SCHEMA IF NOT EXISTS test;
+psql:include/remote_exec.sql:5: NOTICE:  schema "test" already exists, skipping
+GRANT USAGE ON SCHEMA test TO PUBLIC;
+CREATE OR REPLACE FUNCTION test.remote_exec(srv_name name[], command text)
+RETURNS VOID
+AS :TSL_MODULE_PATHNAME, 'ts_remote_exec'
+LANGUAGE C;
+CREATE OR REPLACE FUNCTION test.remote_exec_get_result_strings(srv_name name[], command text)
+RETURNS TABLE("table_record" CSTRING[])
+AS :TSL_MODULE_PATHNAME, 'ts_remote_exec_get_result_strings'
+LANGUAGE C;
+SELECT (add_data_node (name, host => 'localhost', DATABASE => name)).*
+FROM (VALUES (:'DATA_NODE_1'), (:'DATA_NODE_2'), (:'DATA_NODE_3')) v (name);
+           node_name            |   host    | port  |            database            | node_created | database_created | extension_created 
+--------------------------------+-----------+-------+--------------------------------+--------------+------------------+-------------------
+ db_cagg_invalidation_dist_ht_1 | localhost | 55432 | db_cagg_invalidation_dist_ht_1 | t            | t                | t
+ db_cagg_invalidation_dist_ht_2 | localhost | 55432 | db_cagg_invalidation_dist_ht_2 | t            | t                | t
+ db_cagg_invalidation_dist_ht_3 | localhost | 55432 | db_cagg_invalidation_dist_ht_3 | t            | t                | t
+(3 rows)
+
+GRANT USAGE ON FOREIGN SERVER :DATA_NODE_1, :DATA_NODE_2, :DATA_NODE_3 TO PUBLIC;
+\set IS_DISTRIBUTED TRUE
+\ir include/cagg_invalidation_common.sql
+-- This file and its contents are licensed under the Timescale License.
+-- Please see the included NOTICE for copyright information and
+-- LICENSE-TIMESCALE for a copy of the license.
+-- Disable background workers since we are testing manual refresh
+\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER
+SELECT _timescaledb_internal.stop_background_workers();
+ stop_background_workers 
+-------------------------
+ t
+(1 row)
+
+SET ROLE :ROLE_DEFAULT_PERM_USER;
+SET datestyle TO 'ISO, YMD';
+SET timezone TO 'UTC';
+CREATE TABLE conditions (time bigint NOT NULL, device int, temp float);
+\if :IS_DISTRIBUTED
+SELECT create_distributed_hypertable('conditions', 'time', chunk_time_interval => 10, replication_factor => 2);
+ create_distributed_hypertable 
+-------------------------------
+ (1,public,conditions,t)
+(1 row)
+
+\else
+SELECT create_hypertable('conditions', 'time', chunk_time_interval => 10);
+\endif
+CREATE TABLE measurements (time int NOT NULL, device int, temp float);
+\if :IS_DISTRIBUTED
+SELECT create_distributed_hypertable('measurements', 'time', chunk_time_interval => 10, replication_factor => 2);
+ create_distributed_hypertable 
+-------------------------------
+ (2,public,measurements,t)
+(1 row)
+
+\else
+SELECT create_hypertable('measurements', 'time', chunk_time_interval => 10);
+\endif
+CREATE OR REPLACE FUNCTION bigint_now()
+RETURNS bigint LANGUAGE SQL STABLE AS
+$$
+    SELECT coalesce(max(time), 0)
+    FROM conditions
+$$;
+\if :IS_DISTRIBUTED
+CALL distributed_exec($DIST$
+CREATE OR REPLACE FUNCTION bigint_now()
+RETURNS bigint LANGUAGE SQL STABLE AS
+$$
+    SELECT coalesce(max(time), 0)
+    FROM conditions
+$$;
+$DIST$);
+\endif
+CREATE OR REPLACE FUNCTION int_now()
+RETURNS int LANGUAGE SQL STABLE AS
+$$
+    SELECT coalesce(max(time), 0)
+    FROM measurements
+$$;
+\if :IS_DISTRIBUTED
+CALL distributed_exec($DIST$
+CREATE OR REPLACE FUNCTION int_now()
+RETURNS int LANGUAGE SQL STABLE AS
+$$
+    SELECT coalesce(max(time), 0)
+    FROM measurements
+$$;
+$DIST$);
+\endif
+SELECT set_integer_now_func('conditions', 'bigint_now');
+ set_integer_now_func 
+----------------------
+ 
+(1 row)
+
+SELECT set_integer_now_func('measurements', 'int_now');
+ set_integer_now_func 
+----------------------
+ 
+(1 row)
+
+INSERT INTO conditions
+SELECT t, ceil(abs(timestamp_hash(to_timestamp(t)::timestamp))%4)::int,
+       abs(timestamp_hash(to_timestamp(t)::timestamp))%40
+FROM generate_series(1, 100, 1) t;
+CREATE TABLE temp AS
+SELECT * FROM conditions;
+INSERT INTO measurements
+SELECT * FROM temp;
+-- Show the most recent data
+SELECT * FROM conditions
+ORDER BY time DESC, device
+LIMIT 10;
+ time | device | temp 
+------+--------+------
+  100 |      0 |    8
+   99 |      1 |    5
+   98 |      2 |   26
+   97 |      2 |   10
+   96 |      2 |   34
+   95 |      2 |   30
+   94 |      3 |   31
+   93 |      0 |    4
+   92 |      0 |   32
+   91 |      3 |   15
+(10 rows)
+
+-- Create two continuous aggregates on the same hypertable to test
+-- that invalidations are handled correctly across both of them.
+CREATE MATERIALIZED VIEW cond_10
+WITH (timescaledb.continuous,
+      timescaledb.materialized_only=true)
+AS
+SELECT time_bucket(BIGINT '10', time) AS bucket, device, avg(temp) AS avg_temp
+FROM conditions
+GROUP BY 1,2 WITH NO DATA;
+CREATE MATERIALIZED VIEW cond_20
+WITH (timescaledb.continuous,
+      timescaledb.materialized_only=true)
+AS
+SELECT time_bucket(BIGINT '20', time) AS bucket, device, avg(temp) AS avg_temp
+FROM conditions
+GROUP BY 1,2 WITH NO DATA;
+CREATE MATERIALIZED VIEW measure_10
+WITH (timescaledb.continuous,
+      timescaledb.materialized_only=true)
+AS
+SELECT time_bucket(10, time) AS bucket, device, avg(temp) AS avg_temp
+FROM measurements
+GROUP BY 1,2 WITH NO DATA;
+-- There should be three continuous aggregates, two on one hypertable
+-- and one on the other:
+SELECT mat_hypertable_id, raw_hypertable_id, user_view_name
+FROM _timescaledb_catalog.continuous_agg;
+ mat_hypertable_id | raw_hypertable_id | user_view_name 
+-------------------+-------------------+----------------
+                 3 |                 1 | cond_10
+                 4 |                 1 | cond_20
+                 5 |                 2 | measure_10
+(3 rows)
+
+-- The continuous aggregates should be empty
+SELECT * FROM cond_10
+ORDER BY 1 DESC, 2;
+ bucket | device | avg_temp 
+--------+--------+----------
+(0 rows)
+
+SELECT * FROM cond_20
+ORDER BY 1 DESC, 2;
+ bucket | device | avg_temp 
+--------+--------+----------
+(0 rows)
+
+SELECT * FROM measure_10
+ORDER BY 1 DESC, 2;
+ bucket | device | avg_temp 
+--------+--------+----------
+(0 rows)
+
+\if :IS_DISTRIBUTED
+CREATE OR REPLACE FUNCTION get_hyper_invals() RETURNS TABLE(
+      "hyper_id" INT,
+      "start" BIGINT,
+      "end" BIGINT
+)
+LANGUAGE SQL VOLATILE AS
+$$
+SELECT DISTINCT table_record[1]::TEXT::INT, table_record[2]::TEXT::BIGINT, table_record[3]::TEXT::BIGINT FROM test.remote_exec_get_result_strings(NULL, $DIST$
+      SELECT hypertable_id,
+            lowest_modified_value,
+            greatest_modified_value
+            FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log
+$DIST$)
+ORDER BY 1,2,3
+$$;
+CREATE OR REPLACE FUNCTION get_cagg_invals() RETURNS TABLE(
+      "cagg_id" INT,
+      "start" BIGINT,
+      "end" BIGINT
+)
+LANGUAGE SQL VOLATILE AS
+$$
+SELECT DISTINCT table_record[1]::TEXT::INT, table_record[2]::TEXT::BIGINT, table_record[3]::TEXT::BIGINT FROM test.remote_exec_get_result_strings(NULL, $DIST$
+      SELECT materialization_id AS cagg_id,
+            lowest_modified_value AS start,
+            greatest_modified_value AS end
+            FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
+$DIST$)
+ORDER BY 1,2,3
+$$;
+\else
+CREATE OR REPLACE FUNCTION get_hyper_invals() RETURNS  TABLE (
+      "hyper_id" INT,
+      "start" BIGINT,
+      "end" BIGINT
+)
+LANGUAGE SQL VOLATILE AS
+$$
+SELECT hypertable_id,
+       lowest_modified_value,
+       greatest_modified_value
+       FROM _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log
+       ORDER BY 1,2,3
+$$;
+CREATE OR REPLACE FUNCTION get_cagg_invals() RETURNS TABLE (
+      "cagg_id" INT,
+      "start" BIGINT,
+      "end" BIGINT
+)
+LANGUAGE SQL VOLATILE AS
+$$
+SELECT materialization_id,
+       lowest_modified_value,
+       greatest_modified_value
+       FROM _timescaledb_catalog.continuous_aggs_materialization_invalidation_log
+       ORDER BY 1,2,3
+$$;
+\endif
+CREATE VIEW hyper_invals AS SELECT * FROM get_hyper_invals();
+CREATE VIEW cagg_invals AS SELECT * FROM get_cagg_invals();
+-- Must refresh to move the invalidation threshold, or no
+-- invalidations will be generated. Initially, there is no threshold
+-- set:
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+(0 rows)
+
+-- There should be only "infinite" invalidations in the cagg
+-- invalidation log:
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 | 9223372036854775807
+       4 | -9223372036854775808 | 9223372036854775807
+       5 | -9223372036854775808 | 9223372036854775807
+(3 rows)
+
+-- Now refresh up to 50 without the first bucket, and the threshold should be updated accordingly:
+CALL refresh_continuous_aggregate('cond_10', 1, 50);
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+             1 |        50
+(1 row)
+
+-- Invalidations should be cleared inside the refresh window:
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                   9
+       3 |                   50 | 9223372036854775807
+       4 | -9223372036854775808 | 9223372036854775807
+       5 | -9223372036854775808 | 9223372036854775807
+(4 rows)
+
+-- Refresh up to 50 from the beginning
+CALL refresh_continuous_aggregate('cond_10', 0, 50);
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   50 | 9223372036854775807
+       4 | -9223372036854775808 | 9223372036854775807
+       5 | -9223372036854775808 | 9223372036854775807
+(4 rows)
+
+-- Refreshing below the threshold does not move it:
+CALL refresh_continuous_aggregate('cond_10', 20, 49);
+psql:include/cagg_invalidation_common.sql:207: NOTICE:  continuous aggregate "cond_10" is already up-to-date
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+             1 |        50
+(1 row)
+
+-- Nothing changes with invalidations either since the region was
+-- already refreshed and no new invalidations have been generated:
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   50 | 9223372036854775807
+       4 | -9223372036854775808 | 9223372036854775807
+       5 | -9223372036854775808 | 9223372036854775807
+(4 rows)
+
+-- Refreshing measure_10 moves the threshold only for the other hypertable:
+CALL refresh_continuous_aggregate('measure_10', 0, 30);
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+             1 |        50
+             2 |        30
+(2 rows)
+
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   50 | 9223372036854775807
+       4 | -9223372036854775808 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(5 rows)
+
+-- Refresh on the second continuous aggregate, cond_20, on the first
+-- hypertable moves the same threshold as when refreshing cond_10:
+CALL refresh_continuous_aggregate('cond_20', 60, 100);
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+             1 |       100
+             2 |        30
+(2 rows)
+
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   50 | 9223372036854775807
+       4 | -9223372036854775808 |                  59
+       4 |                  100 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(6 rows)
+
+-- There should be no hypertable invalidations initially:
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+(0 rows)
+
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   50 | 9223372036854775807
+       4 | -9223372036854775808 |                  59
+       4 |                  100 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(6 rows)
+
+-- Create invalidations across different ranges. Some of these should
+-- be deleted and others cut in different ways when a refresh is
+-- run. Note that the refresh window is inclusive in the start of the
+-- window but exclusive at the end.
+-- Entries that should be left unmodified:
+INSERT INTO conditions VALUES (10, 4, 23.7);
+INSERT INTO conditions VALUES (10, 5, 23.8), (19, 3, 23.6);
+INSERT INTO conditions VALUES (60, 3, 23.7), (70, 4, 23.7);
+-- Should see some invaliations in the hypertable invalidation log:
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+        1 |    10 |  10
+        1 |    10 |  19
+        1 |    60 |  60
+        1 |    60 |  70
+        1 |    70 |  70
+(5 rows)
+
+-- Generate some invalidations for the other hypertable
+INSERT INTO measurements VALUES (20, 4, 23.7);
+INSERT INTO measurements VALUES (30, 5, 23.8), (80, 3, 23.6);
+-- Should now see invalidations for both hypertables
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+        1 |    10 |  10
+        1 |    10 |  19
+        1 |    60 |  60
+        1 |    60 |  70
+        1 |    70 |  70
+        2 |    20 |  20
+        2 |    30 |  80
+(7 rows)
+
+-- First refresh a window where we don't have any invalidations. This
+-- allows us to see only the copying of the invalidations to the per
+-- cagg log without additional processing.
+CALL refresh_continuous_aggregate('cond_10', 20, 60);
+-- Invalidation threshold remains at 100:
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+             1 |       100
+             2 |        30
+(2 rows)
+
+-- Invalidations should be moved from the hypertable invalidation log
+-- to the continuous aggregate log, but only for the hypertable that
+-- the refreshed aggregate belongs to:
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+        2 |    20 |  20
+        2 |    30 |  80
+(2 rows)
+
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   10 |                  19
+       3 |                   60 | 9223372036854775807
+       4 | -9223372036854775808 |                  59
+       4 |                    0 |                  19
+       4 |                   60 |                  79
+       4 |                  100 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(9 rows)
+
+-- Now add more invalidations to test a refresh that overlaps with them.
+-- Entries that should be deleted:
+INSERT INTO conditions VALUES (30, 1, 23.4), (59, 1, 23.4);
+INSERT INTO conditions VALUES (20, 1, 23.4), (30, 1, 23.4);
+-- Entries that should be cut to the right, leaving an invalidation to
+-- the left of the refresh window:
+INSERT INTO conditions VALUES (1, 4, 23.7), (25, 1, 23.4);
+INSERT INTO conditions VALUES (19, 4, 23.7), (59, 1, 23.4);
+-- Entries that should be cut to the left and right, leaving two
+-- invalidation entries on each side of the refresh window:
+INSERT INTO conditions VALUES (2, 2, 23.5), (60, 1, 23.4);
+INSERT INTO conditions VALUES (3, 2, 23.5), (80, 1, 23.4);
+-- Entries that should be cut to the left, leaving an invalidation to
+-- the right of the refresh window:
+INSERT INTO conditions VALUES (60, 3, 23.6), (90, 3, 23.6);
+INSERT INTO conditions VALUES (20, 5, 23.8), (100, 3, 23.6);
+-- New invalidations in the hypertable invalidation log:
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+        1 |     1 |   1
+        1 |     1 |  25
+        1 |     2 |  60
+        1 |     3 |   3
+        1 |     3 |  80
+        1 |    19 |  19
+        1 |    19 |  59
+        1 |    20 |  20
+        1 |    20 |  30
+        1 |    20 | 100
+        1 |    25 |  25
+        1 |    30 |  30
+        1 |    30 |  59
+        1 |    59 |  59
+        1 |    60 |  90
+        1 |    80 |  80
+        1 |   100 | 100
+        2 |    20 |  20
+        2 |    30 |  80
+(19 rows)
+
+-- But nothing has yet changed in the cagg invalidation log:
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   10 |                  19
+       3 |                   60 | 9223372036854775807
+       4 | -9223372036854775808 |                  59
+       4 |                    0 |                  19
+       4 |                   60 |                  79
+       4 |                  100 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(9 rows)
+
+-- Refresh to process invalidations for daily temperature:
+CALL refresh_continuous_aggregate('cond_10', 20, 60);
+-- Invalidations should be moved from the hypertable invalidation log
+-- to the continuous aggregate log.
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+        2 |    20 |  20
+        2 |    30 |  80
+(2 rows)
+
+-- Only the cond_10 cagg should have its entries cut:
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 | -9223372036854775808 |                  19
+       3 |                   10 |                  19
+       3 |                   60 | 9223372036854775807
+       4 | -9223372036854775808 |                  59
+       4 |                    0 |                  19
+       4 |                    0 |                  99
+       4 |                    0 |                 119
+       4 |                   60 |                  79
+       4 |                  100 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(12 rows)
+
+-- Refresh also cond_20:
+CALL refresh_continuous_aggregate('cond_20', 20, 60);
+-- The cond_20 cagg should also have its entries cut:
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 | -9223372036854775808 |                  19
+       3 |                   10 |                  19
+       3 |                   60 | 9223372036854775807
+       4 | -9223372036854775808 |                  19
+       4 |                   60 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(8 rows)
+
+-- Refresh cond_10 to completely remove an invalidation:
+CALL refresh_continuous_aggregate('cond_10', 0, 20);
+-- The 1-19 invalidation should be deleted:
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   60 | 9223372036854775807
+       4 | -9223372036854775808 |                  19
+       4 |                   60 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(6 rows)
+
+-- Clear everything between 0 and 100 to make way for new
+-- invalidations
+CALL refresh_continuous_aggregate('cond_10', 0, 100);
+-- Test refreshing with non-overlapping invalidations
+INSERT INTO conditions VALUES (20, 1, 23.4), (25, 1, 23.4);
+INSERT INTO conditions VALUES (30, 1, 23.4), (46, 1, 23.4);
+CALL refresh_continuous_aggregate('cond_10', 1, 40);
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   40 |                  49
+       3 |                  100 | 9223372036854775807
+       4 | -9223372036854775808 |                  19
+       4 |                   20 |                  39
+       4 |                   20 |                  59
+       4 |                   60 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(9 rows)
+
+-- Refresh whithout cutting (in area where there are no
+-- invalidations). Merging of overlapping entries should still happen:
+INSERT INTO conditions VALUES (15, 1, 23.4), (42, 1, 23.4);
+CALL refresh_continuous_aggregate('cond_10', 90, 100);
+psql:include/cagg_invalidation_common.sql:327: NOTICE:  continuous aggregate "cond_10" is already up-to-date
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                  -1
+       3 |                   10 |                  49
+       3 |                  100 | 9223372036854775807
+       4 | -9223372036854775808 |                  19
+       4 |                    0 |                  59
+       4 |                   20 |                  39
+       4 |                   20 |                  59
+       4 |                   60 | 9223372036854775807
+       5 | -9223372036854775808 |                  -1
+       5 |                   30 | 9223372036854775807
+(10 rows)
+
+-- Test max refresh window
+CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
+SELECT * FROM cagg_invals;
+ cagg_id |        start         |         end          
+---------+----------------------+----------------------
+       3 | -9223372036854775808 | -9223372036854775801
+       3 |                  110 |  9223372036854775807
+       4 | -9223372036854775808 |                   19
+       4 |                    0 |                   59
+       4 |                   20 |                   39
+       4 |                   20 |                   59
+       4 |                   60 |  9223372036854775807
+       5 | -9223372036854775808 |                   -1
+       5 |                   30 |  9223372036854775807
+(9 rows)
+
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+        2 |    20 |  20
+        2 |    30 |  80
+(2 rows)
+
+-- Pick the first chunk of conditions to TRUNCATE
+SELECT show_chunks AS chunk_to_truncate
+FROM show_chunks('conditions')
+ORDER BY 1
+LIMIT 1 \gset
+-- Show the data before truncating one of the chunks
+SELECT * FROM :chunk_to_truncate
+ORDER BY 1;
+ time | device | temp 
+------+--------+------
+    1 |      4 | 23.7
+    1 |      0 |   16
+    2 |      2 | 23.5
+    2 |      1 |   25
+    3 |      2 | 23.5
+    3 |      0 |   20
+    4 |      2 |   10
+    5 |      2 |   26
+    6 |      1 |   13
+    7 |      3 |   35
+    8 |      1 |   37
+    9 |      3 |    7
+(12 rows)
+
+-- Truncate one chunk
+\if :IS_DISTRIBUTED
+-- There is no TRUNCATE implementation for FOREIGN tables yet
+\set ON_ERROR_STOP 0
+\endif
+TRUNCATE TABLE :chunk_to_truncate;
+psql:include/cagg_invalidation_common.sql:352: ERROR:  cannot truncate foreign table "_dist_hyper_1_1_chunk"
+\if :IS_DISTRIBUTED
+\set ON_ERROR_STOP 1
+\endif
+-- Should see new invalidation entries for conditions for the non-distributed case
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+        2 |    20 |  20
+        2 |    30 |  80
+(2 rows)
+
+-- TRUNCATE the hypertable to invalidate all its continuous aggregates
+TRUNCATE conditions;
+-- Now empty
+SELECT * FROM conditions;
+ time | device | temp 
+------+--------+------
+(0 rows)
+
+-- Should see an infinite invalidation entry for conditions
+SELECT * FROM hyper_invals;
+ hyper_id |        start         |         end         
+----------+----------------------+---------------------
+        1 | -9223372036854775808 | 9223372036854775807
+        2 |                   20 |                  20
+        2 |                   30 |                  80
+(3 rows)
+
+-- Aggregates still hold data
+SELECT * FROM cond_10
+ORDER BY 1,2
+LIMIT 5;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      0 |       18
+      0 |      1 |       25
+      0 |      2 |    20.75
+      0 |      3 |       21
+      0 |      4 |     23.7
+(5 rows)
+
+SELECT * FROM cond_20
+ORDER BY 1,2
+LIMIT 5;
+ bucket | device |     avg_temp     
+--------+--------+------------------
+     20 |      0 | 18.2857142857143
+     20 |      1 | 23.5142857142857
+     20 |      2 |               26
+     20 |      3 |               23
+     20 |      5 |             23.8
+(5 rows)
+
+CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
+CALL refresh_continuous_aggregate('cond_20', NULL, NULL);
+-- Both should now be empty after refresh
+SELECT * FROM cond_10
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+(0 rows)
+
+SELECT * FROM cond_20
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+(0 rows)
+
+-- Insert new data again and refresh
+INSERT INTO conditions VALUES
+       (1, 1, 23.4), (4, 3, 14.3), (5, 1, 13.6),
+       (6, 2, 17.9), (12, 1, 18.3), (19, 3, 28.2),
+       (10, 3, 22.3), (11, 2, 34.9), (15, 2, 45.6),
+       (21, 1, 15.3), (22, 2, 12.3), (29, 3, 16.3);
+CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
+CALL refresh_continuous_aggregate('cond_20', NULL, NULL);
+-- Should now hold data again
+SELECT * FROM cond_10
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |     18.5
+      0 |      2 |     17.9
+      0 |      3 |     14.3
+     10 |      1 |     18.3
+     10 |      2 |    40.25
+     10 |      3 |    25.25
+     20 |      1 |     15.3
+     20 |      2 |     12.3
+     20 |      3 |     16.3
+(9 rows)
+
+SELECT * FROM cond_20
+ORDER BY 1,2;
+ bucket | device |     avg_temp     
+--------+--------+------------------
+      0 |      1 | 18.4333333333333
+      0 |      2 |             32.8
+      0 |      3 |             21.6
+     20 |      1 |             15.3
+     20 |      2 |             12.3
+     20 |      3 |             16.3
+(6 rows)
+
+-- Truncate one of the aggregates, but first test that we block
+-- TRUNCATE ONLY
+\set ON_ERROR_STOP 0
+TRUNCATE ONLY cond_20;
+psql:include/cagg_invalidation_common.sql:408: ERROR:  cannot truncate only a continuous aggregate
+\set ON_ERROR_STOP 1
+TRUNCATE cond_20;
+-- Should now be empty
+SELECT * FROM cond_20
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+(0 rows)
+
+-- Other aggregate is not affected
+SELECT * FROM cond_10
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |     18.5
+      0 |      2 |     17.9
+      0 |      3 |     14.3
+     10 |      1 |     18.3
+     10 |      2 |    40.25
+     10 |      3 |    25.25
+     20 |      1 |     15.3
+     20 |      2 |     12.3
+     20 |      3 |     16.3
+(9 rows)
+
+-- Refresh again to bring data back
+CALL refresh_continuous_aggregate('cond_20', NULL, NULL);
+-- The aggregate should be populated again
+SELECT * FROM cond_20
+ORDER BY 1,2;
+ bucket | device |     avg_temp     
+--------+--------+------------------
+      0 |      1 | 18.4333333333333
+      0 |      2 |             32.8
+      0 |      3 |             21.6
+     20 |      1 |             15.3
+     20 |      2 |             12.3
+     20 |      3 |             16.3
+(6 rows)
+
+-------------------------------------------------------
+-- Test corner cases against a minimal bucket aggregate
+-------------------------------------------------------
+-- First, clear the table and aggregate
+TRUNCATE conditions;
+SELECT * FROM conditions;
+ time | device | temp 
+------+--------+------
+(0 rows)
+
+CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
+SELECT * FROM cond_10
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+(0 rows)
+
+CREATE MATERIALIZED VIEW cond_1
+WITH (timescaledb.continuous,
+      timescaledb.materialized_only=true)
+AS
+SELECT time_bucket(BIGINT '1', time) AS bucket, device, avg(temp) AS avg_temp
+FROM conditions
+GROUP BY 1,2 WITH NO DATA;
+SELECT mat_hypertable_id AS cond_1_id
+FROM _timescaledb_catalog.continuous_agg
+WHERE user_view_name = 'cond_1' \gset
+-- Test manual invalidation error
+\if :IS_DISTRIBUTED
+\else
+\set ON_ERROR_STOP 0
+SELECT _timescaledb_internal.invalidation_cagg_log_add_entry(:cond_1_id, 1, 0);
+\set ON_ERROR_STOP 1
+\endif
+-- Test invalidations with bucket size 1
+INSERT INTO conditions VALUES (0, 1, 1.0);
+SELECT * FROM hyper_invals;
+ hyper_id | start | end 
+----------+-------+-----
+        1 |     0 |   0
+        2 |    20 |  20
+        2 |    30 |  80
+(3 rows)
+
+-- Refreshing around the bucket should not update the aggregate
+CALL refresh_continuous_aggregate('cond_1', -1, 0);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+(0 rows)
+
+CALL refresh_continuous_aggregate('cond_1', 1, 2);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+(0 rows)
+
+-- Refresh only the invalidated bucket
+CALL refresh_continuous_aggregate('cond_1', 0, 1);
+SELECT * FROM cagg_invals
+WHERE cagg_id = :cond_1_id;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       6 | -9223372036854775808 |                  -2
+       6 |                    2 | 9223372036854775807
+(2 rows)
+
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |        1
+(1 row)
+
+-- Refresh 1 extra bucket on the left
+INSERT INTO conditions VALUES (0, 1, 2.0);
+CALL refresh_continuous_aggregate('cond_1', -1, 1);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |      1.5
+(1 row)
+
+-- Refresh 1 extra bucket on the right
+INSERT INTO conditions VALUES (0, 1, 3.0);
+CALL refresh_continuous_aggregate('cond_1', 0, 2);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |        2
+(1 row)
+
+-- Refresh 1 extra bucket on each side
+INSERT INTO conditions VALUES (0, 1, 4.0);
+CALL refresh_continuous_aggregate('cond_1', -1, 2);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |      2.5
+(1 row)
+
+-- Clear to reset aggregate
+TRUNCATE conditions;
+CALL refresh_continuous_aggregate('cond_1', NULL, NULL);
+-- Test invalidation of size 2
+INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0);
+-- Refresh one bucket at a time
+CALL refresh_continuous_aggregate('cond_1', 0, 1);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |        1
+(1 row)
+
+CALL refresh_continuous_aggregate('cond_1', 1, 2);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |        1
+      1 |      1 |        2
+(2 rows)
+
+-- Repeat the same thing but refresh the whole invalidation at once
+TRUNCATE conditions;
+CALL refresh_continuous_aggregate('cond_1', NULL, NULL);
+INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0);
+CALL refresh_continuous_aggregate('cond_1', 0, 2);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |        1
+      1 |      1 |        2
+(2 rows)
+
+-- Test invalidation of size 3
+TRUNCATE conditions;
+CALL refresh_continuous_aggregate('cond_1', NULL, NULL);
+INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0), (2, 1, 3.0);
+-- Invalidation extends beyond the refresh window on both ends
+CALL refresh_continuous_aggregate('cond_1', 1, 2);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      1 |      1 |        2
+(1 row)
+
+-- Should leave one invalidation on each side of the refresh window
+SELECT * FROM cagg_invals
+WHERE cagg_id = :cond_1_id;
+ cagg_id | start |         end         
+---------+-------+---------------------
+       6 |     0 |                   0
+       6 |     2 |                   2
+       6 |   110 | 9223372036854775807
+(3 rows)
+
+-- Refresh the two remaining invalidations
+CALL refresh_continuous_aggregate('cond_1', 0, 1);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |        1
+      1 |      1 |        2
+(2 rows)
+
+CALL refresh_continuous_aggregate('cond_1', 2, 3);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |        1
+      1 |      1 |        2
+      2 |      1 |        3
+(3 rows)
+
+-- Clear and repeat but instead refresh the whole range in one go. The
+-- result should be the same as the three partial refreshes. Use
+-- DELETE instead of TRUNCATE to clear this time.
+DELETE FROM conditions;
+CALL refresh_continuous_aggregate('cond_1', NULL, NULL);
+INSERT INTO conditions VALUES (0, 1, 1.0), (1, 1, 2.0), (2, 1, 3.0);
+CALL refresh_continuous_aggregate('cond_1', 0, 3);
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |        1
+      1 |      1 |        2
+      2 |      1 |        3
+(3 rows)
+
+----------------------------------------------
+-- Test that invalidation threshold is capped
+----------------------------------------------
+CREATE table threshold_test (time int, value int);
+\if :IS_DISTRIBUTED
+SELECT create_distributed_hypertable('threshold_test', 'time', chunk_time_interval => 4, replication_factor => 2);
+psql:include/cagg_invalidation_common.sql:565: NOTICE:  adding not-null constraint to column "time"
+ create_distributed_hypertable 
+-------------------------------
+ (7,public,threshold_test,t)
+(1 row)
+
+\else
+SELECT create_hypertable('threshold_test', 'time', chunk_time_interval => 4);
+\endif
+SELECT set_integer_now_func('threshold_test', 'int_now');
+ set_integer_now_func 
+----------------------
+ 
+(1 row)
+
+CREATE MATERIALIZED VIEW thresh_2
+WITH (timescaledb.continuous,
+      timescaledb.materialized_only=true)
+AS
+SELECT time_bucket(2, time) AS bucket, max(value) AS max
+FROM threshold_test
+GROUP BY 1 WITH NO DATA;
+SELECT raw_hypertable_id AS thresh_hyper_id, mat_hypertable_id AS thresh_cagg_id
+FROM _timescaledb_catalog.continuous_agg
+WHERE user_view_name = 'thresh_2' \gset
+-- There's no invalidation threshold initially
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+WHERE hypertable_id = :thresh_hyper_id
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+(0 rows)
+
+-- Test manual invalidation error
+\if :IS_DISTRIBUTED
+\else
+\set ON_ERROR_STOP 0
+SELECT _timescaledb_internal.invalidation_hyper_log_add_entry(:thresh_hyper_id, 1, 0);
+\set ON_ERROR_STOP 1
+\endif
+-- Test that threshold is initilized to min value when there's no data
+-- and we specify an infinite end. Note that the min value may differ
+-- depending on time type.
+CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
+psql:include/cagg_invalidation_common.sql:599: NOTICE:  continuous aggregate "thresh_2" is already up-to-date
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+WHERE hypertable_id = :thresh_hyper_id
+ORDER BY 1,2;
+ hypertable_id |  watermark  
+---------------+-------------
+             7 | -2147483648
+(1 row)
+
+INSERT INTO threshold_test
+SELECT v, v FROM generate_series(1, 10) v;
+CALL refresh_continuous_aggregate('thresh_2', 0, 5);
+-- Threshold should move to end of the last refreshed bucket, which is
+-- the last bucket fully included in the window, i.e., the window
+-- shrinks to end of previous bucket.
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+WHERE hypertable_id = :thresh_hyper_id
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+             7 |         4
+(1 row)
+
+-- Refresh where both the start and end of the window is above the
+-- max data value
+CALL refresh_continuous_aggregate('thresh_2', 14, NULL);
+psql:include/cagg_invalidation_common.sql:619: NOTICE:  continuous aggregate "thresh_2" is already up-to-date
+SELECT watermark AS thresh_hyper_id_watermark
+FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+WHERE hypertable_id = :thresh_hyper_id \gset
+-- Refresh where we start from the current watermark to infinity
+CALL refresh_continuous_aggregate('thresh_2', :thresh_hyper_id_watermark, NULL);
+psql:include/cagg_invalidation_common.sql:626: NOTICE:  continuous aggregate "thresh_2" is already up-to-date
+-- Now refresh with max end of the window to test that the
+-- invalidation threshold is capped at the last bucket of data
+CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+WHERE hypertable_id = :thresh_hyper_id
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+             7 |        12
+(1 row)
+
+-- Should not have processed invalidations beyond the invalidation
+-- threshold.
+SELECT * FROM cagg_invals
+WHERE cagg_id = :thresh_cagg_id;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       8 | -9223372036854775808 |                  -1
+       8 |                   12 | 9223372036854775807
+(2 rows)
+
+-- Check that things are properly materialized
+SELECT * FROM thresh_2
+ORDER BY 1;
+ bucket | max 
+--------+-----
+      0 |   1
+      2 |   3
+      4 |   5
+      6 |   7
+      8 |   9
+     10 |  10
+(6 rows)
+
+-- Delete the last data
+SELECT show_chunks AS chunk_to_drop
+FROM show_chunks('threshold_test')
+ORDER BY 1 DESC
+LIMIT 1 \gset
+DELETE FROM threshold_test
+WHERE time > 6;
+-- The last data in the hypertable is gone
+SELECT time_bucket(2, time) AS bucket, max(value) AS max
+FROM threshold_test
+GROUP BY 1
+ORDER BY 1;
+ bucket | max 
+--------+-----
+      0 |   1
+      2 |   3
+      4 |   5
+      6 |   6
+(4 rows)
+
+-- The aggregate still holds data
+SELECT * FROM thresh_2
+ORDER BY 1;
+ bucket | max 
+--------+-----
+      0 |   1
+      2 |   3
+      4 |   5
+      6 |   7
+      8 |   9
+     10 |  10
+(6 rows)
+
+-- Refresh the aggregate to bring it up-to-date
+CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
+-- Data also gone from the aggregate
+SELECT * FROM thresh_2
+ORDER BY 1;
+ bucket | max 
+--------+-----
+      0 |   1
+      2 |   3
+      4 |   5
+      6 |   6
+(4 rows)
+
+-- The invalidation threshold remains the same
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+WHERE hypertable_id = :thresh_hyper_id
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+             7 |        12
+(1 row)
+
+-- Insert new data beyond the invalidation threshold to move it
+-- forward
+INSERT INTO threshold_test
+SELECT v, v FROM generate_series(7, 15) v;
+CALL refresh_continuous_aggregate('thresh_2', 0, NULL);
+-- Aggregate now updated to reflect newly aggregated data
+SELECT * FROM thresh_2
+ORDER BY 1;
+ bucket | max 
+--------+-----
+      0 |   1
+      2 |   3
+      4 |   5
+      6 |   7
+      8 |   9
+     10 |  11
+     12 |  13
+     14 |  15
+(8 rows)
+
+-- The invalidation threshold should have moved forward to the end of
+-- the new data
+SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
+WHERE hypertable_id = :thresh_hyper_id
+ORDER BY 1,2;
+ hypertable_id | watermark 
+---------------+-----------
+             7 |        16
+(1 row)
+
+-- The aggregate remains invalid beyond the invalidation threshold
+SELECT * FROM cagg_invals
+WHERE cagg_id = :thresh_cagg_id;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       8 | -9223372036854775808 |                  -1
+       8 |                   16 | 9223372036854775807
+(2 rows)
+
+----------------------------------------------------------------------
+-- Test that dropping a chunk invalidates the dropped region. First
+-- create another chunk so that we have two chunks. One of the chunks
+-- will be dropped.
+---------------------------------------------------------------------
+INSERT INTO conditions VALUES (10, 1, 10.0);
+-- Chunks currently associated with the hypertable
+SELECT show_chunks AS chunk_to_drop
+FROM show_chunks('conditions');
+                chunk_to_drop                 
+----------------------------------------------
+ _timescaledb_internal._dist_hyper_1_34_chunk
+ _timescaledb_internal._dist_hyper_1_40_chunk
+(2 rows)
+
+-- Pick the first one to drop
+SELECT show_chunks AS chunk_to_drop
+FROM show_chunks('conditions')
+ORDER BY 1
+LIMIT 1 \gset
+-- Show the data before dropping one of the chunks
+SELECT * FROM conditions
+ORDER BY 1,2;
+ time | device | temp 
+------+--------+------
+    0 |      1 |    1
+    1 |      1 |    2
+    2 |      1 |    3
+   10 |      1 |   10
+(4 rows)
+
+-- Drop one chunk
+\if :IS_DISTRIBUTED
+CALL distributed_exec(format('DROP TABLE IF EXISTS %s', :'chunk_to_drop'));
+DROP FOREIGN TABLE :chunk_to_drop;
+\else
+DROP TABLE :chunk_to_drop;
+\endif
+-- The chunk's data no longer exists in the hypertable
+SELECT * FROM conditions
+ORDER BY 1,2;
+ time | device | temp 
+------+--------+------
+   10 |      1 |   10
+(1 row)
+
+-- Aggregate still remains in continuous aggregate, however
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+      0 |      1 |        1
+      1 |      1 |        2
+      2 |      1 |        3
+(3 rows)
+
+-- Refresh the continuous aggregate to make the dropped data be
+-- reflected in the aggregate
+CALL refresh_continuous_aggregate('cond_1', NULL, NULL);
+-- Aggregate now up-to-date with the source hypertable
+SELECT * FROM cond_1
+ORDER BY 1,2;
+ bucket | device | avg_temp 
+--------+--------+----------
+     10 |      1 |       10
+(1 row)
+
+-- Test that adjacent invalidations are merged
+INSERT INTO conditions VALUES(1, 1, 1.0), (2, 1, 2.0);
+INSERT INTO conditions VALUES(3, 1, 1.0);
+INSERT INTO conditions VALUES(4, 1, 1.0);
+INSERT INTO conditions VALUES(6, 1, 1.0);
+CALL refresh_continuous_aggregate('cond_1', 10, NULL);
+psql:include/cagg_invalidation_common.sql:748: NOTICE:  continuous aggregate "cond_1" is already up-to-date
+SELECT * FROM cagg_invals
+WHERE cagg_id = :cond_1_id;
+ cagg_id | start |         end         
+---------+-------+---------------------
+       6 |     1 |                   4
+       6 |     6 |                   6
+       6 |   110 | 9223372036854775807
+(3 rows)
+
+---------------------------------------------------------------------
+-- Test that single timestamp invalidations are expanded to buckets,
+-- and adjacent buckets merged. This merging cannot cross Data-Node
+-- chunk boundaries for the distributed hypertable case.
+---------------------------------------------------------------------
+-- First clear invalidations in a range:
+CALL refresh_continuous_aggregate('cond_10', -20, 60);
+-- The following three should be merged to one range 0-29
+INSERT INTO conditions VALUES (5, 1, 1.0);
+INSERT INTO conditions VALUES (15, 1, 1.0);
+INSERT INTO conditions VALUES (25, 1, 1.0);
+-- The last one should not merge with the others
+INSERT INTO conditions VALUES (40, 1, 1.0);
+-- Refresh to process invalidations, but outside the range of
+-- invalidations we inserted so that we don't clear them.
+CALL refresh_continuous_aggregate('cond_10', 50, 60);
+psql:include/cagg_invalidation_common.sql:769: NOTICE:  continuous aggregate "cond_10" is already up-to-date
+SELECT mat_hypertable_id AS cond_10_id
+FROM _timescaledb_catalog.continuous_agg
+WHERE user_view_name = 'cond_10' \gset
+SELECT * FROM cagg_invals
+WHERE cagg_id = :cond_10_id;
+ cagg_id |        start         |         end         
+---------+----------------------+---------------------
+       3 | -9223372036854775808 |                 -21
+       3 |                    0 |                   9
+       3 |                    0 |                  19
+       3 |                   10 |                  29
+       3 |                   20 |                  29
+       3 |                   40 |                  49
+       3 |                   60 | 9223372036854775807
+(7 rows)
+
+-- should trigger two individual refreshes
+CALL refresh_continuous_aggregate('cond_10', 0, 200);
+-- Allow at most 5 individual invalidations per refreshe
+SET timescaledb.materializations_per_refresh_window=5;
+-- Insert into every second bucket
+INSERT INTO conditions VALUES (20, 1, 1.0);
+INSERT INTO conditions VALUES (40, 1, 1.0);
+INSERT INTO conditions VALUES (60, 1, 1.0);
+INSERT INTO conditions VALUES (80, 1, 1.0);
+INSERT INTO conditions VALUES (100, 1, 1.0);
+INSERT INTO conditions VALUES (120, 1, 1.0);
+INSERT INTO conditions VALUES (140, 1, 1.0);
+CALL refresh_continuous_aggregate('cond_10', 0, 200);
+\set VERBOSITY default
+-- Test acceptable values for materializations per refresh
+SET timescaledb.materializations_per_refresh_window=' 5 ';
+INSERT INTO conditions VALUES (140, 1, 1.0);
+CALL refresh_continuous_aggregate('cond_10', 0, 200);
+-- Large value will be treated as LONG_MAX
+SET timescaledb.materializations_per_refresh_window=342239897234023842394249234766923492347;
+INSERT INTO conditions VALUES (140, 1, 1.0);
+CALL refresh_continuous_aggregate('cond_10', 0, 200);
+-- Test bad values for materializations per refresh
+SET timescaledb.materializations_per_refresh_window='foo';
+INSERT INTO conditions VALUES (140, 1, 1.0);
+CALL refresh_continuous_aggregate('cond_10', 0, 200);
+psql:include/cagg_invalidation_common.sql:808: WARNING:  invalid value for session variable "timescaledb.materializations_per_refresh_window"
+DETAIL:  Expected an integer but current value is "foo".
+SET timescaledb.materializations_per_refresh_window='2bar';
+INSERT INTO conditions VALUES (140, 1, 1.0);
+CALL refresh_continuous_aggregate('cond_10', 0, 200);
+psql:include/cagg_invalidation_common.sql:811: WARNING:  invalid value for session variable "timescaledb.materializations_per_refresh_window"
+DETAIL:  Expected an integer but current value is "2bar".
+SET timescaledb.materializations_per_refresh_window='-';
+INSERT INTO conditions VALUES (140, 1, 1.0);
+CALL refresh_continuous_aggregate('cond_10', 0, 200);
+psql:include/cagg_invalidation_common.sql:815: WARNING:  invalid value for session variable "timescaledb.materializations_per_refresh_window"
+DETAIL:  Expected an integer but current value is "-".
+\set VERBOSITY terse
+-- cleanup
+\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER;
+DROP DATABASE :DATA_NODE_1;
+DROP DATABASE :DATA_NODE_2;
+DROP DATABASE :DATA_NODE_3;
diff --git a/tsl/test/expected/chunk_utils_compression.out b/tsl/test/expected/chunk_utils_compression.out
index 535782c27..4f79e9607 100644
--- a/tsl/test/expected/chunk_utils_compression.out
+++ b/tsl/test/expected/chunk_utils_compression.out
@@ -106,6 +106,25 @@ SELECT show_chunks('public.table_to_compress', newer_than=>'1 day'::interval);
 -------------
 (0 rows)
 
+-- truncate one compressed chunk
+SELECT chunk_schema || '.' || chunk_name as "CHNAME"
+FROM timescaledb_information.chunks
+WHERE hypertable_name = 'table_to_compress' and hypertable_schema = 'public'
+ORDER BY chunk_name LIMIT 1
+\gset
+SELECT count(*) FROM :CHNAME;
+ count 
+-------
+     1
+(1 row)
+
+TRUNCATE TABLE :CHNAME;
+SELECT count(*) FROM :CHNAME;
+ count 
+-------
+     0
+(1 row)
+
 -- drop all hypertables' old chunks
 SELECT drop_chunks(table_name::regclass, older_than=>'1 day'::interval)
   FROM _timescaledb_catalog.hypertable
diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt
index a3e386f43..4393352c2 100644
--- a/tsl/test/sql/CMakeLists.txt
+++ b/tsl/test/sql/CMakeLists.txt
@@ -53,7 +53,6 @@ if(CMAKE_BUILD_TYPE MATCHES Debug)
     cagg_ddl_dist_ht.sql
     cagg_drop_chunks.sql
     cagg_dump.sql
-    cagg_invalidation_dist_ht.sql
     cagg_multi.sql
     continuous_aggs.sql
     continuous_aggs_deprecated.sql
@@ -129,8 +128,14 @@ set(TEST_TEMPLATES
     transparent_decompression_ordered_index.sql.in)
 
 if(CMAKE_BUILD_TYPE MATCHES Debug)
-  list(APPEND TEST_TEMPLATES cagg_query.sql.in dist_hypertable.sql.in
-       remote_copy.sql.in dist_grant.sql.in)
+  list(
+    APPEND
+    TEST_TEMPLATES
+    cagg_query.sql.in
+    dist_hypertable.sql.in
+    remote_copy.sql.in
+    dist_grant.sql.in
+    cagg_invalidation_dist_ht.sql.in)
 endif(CMAKE_BUILD_TYPE MATCHES Debug)
 
 # Check if PostgreSQL was compiled with JIT support
diff --git a/tsl/test/sql/cagg_invalidation_dist_ht.sql b/tsl/test/sql/cagg_invalidation_dist_ht.sql.in
similarity index 100%
rename from tsl/test/sql/cagg_invalidation_dist_ht.sql
rename to tsl/test/sql/cagg_invalidation_dist_ht.sql.in
diff --git a/tsl/test/sql/chunk_utils_compression.sql b/tsl/test/sql/chunk_utils_compression.sql
index 66619123f..d258039de 100644
--- a/tsl/test/sql/chunk_utils_compression.sql
+++ b/tsl/test/sql/chunk_utils_compression.sql
@@ -37,6 +37,15 @@ SELECT show_chunks('public.uncompressed_table');
 SELECT show_chunks('public.table_to_compress');
 SELECT show_chunks('public.table_to_compress', older_than=>'1 day'::interval);
 SELECT show_chunks('public.table_to_compress', newer_than=>'1 day'::interval);
+-- truncate one compressed chunk
+SELECT chunk_schema || '.' || chunk_name as "CHNAME"
+FROM timescaledb_information.chunks
+WHERE hypertable_name = 'table_to_compress' and hypertable_schema = 'public'
+ORDER BY chunk_name LIMIT 1
+\gset
+SELECT count(*) FROM :CHNAME;
+TRUNCATE TABLE :CHNAME;
+SELECT count(*) FROM :CHNAME;
 -- drop all hypertables' old chunks
 SELECT drop_chunks(table_name::regclass, older_than=>'1 day'::interval)
   FROM _timescaledb_catalog.hypertable
diff --git a/tsl/test/sql/include/cagg_invalidation_common.sql b/tsl/test/sql/include/cagg_invalidation_common.sql
index e90595faa..77fe40e95 100644
--- a/tsl/test/sql/include/cagg_invalidation_common.sql
+++ b/tsl/test/sql/include/cagg_invalidation_common.sql
@@ -334,6 +334,29 @@ CALL refresh_continuous_aggregate('cond_10', NULL, NULL);
 SELECT * FROM cagg_invals;
 SELECT * FROM hyper_invals;
 
+-- Pick the first chunk of conditions to TRUNCATE
+SELECT show_chunks AS chunk_to_truncate
+FROM show_chunks('conditions')
+ORDER BY 1
+LIMIT 1 \gset
+
+-- Show the data before truncating one of the chunks
+SELECT * FROM :chunk_to_truncate
+ORDER BY 1;
+
+-- Truncate one chunk
+\if :IS_DISTRIBUTED
+-- There is no TRUNCATE implementation for FOREIGN tables yet
+\set ON_ERROR_STOP 0
+\endif
+TRUNCATE TABLE :chunk_to_truncate;
+\if :IS_DISTRIBUTED
+\set ON_ERROR_STOP 1
+\endif
+
+-- Should see new invalidation entries for conditions for the non-distributed case
+SELECT * FROM hyper_invals;
+
 -- TRUNCATE the hypertable to invalidate all its continuous aggregates
 TRUNCATE conditions;