From 183d309b2cb25230170486b238b0b1b3d588a70f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Fabr=C3=ADzio=20de=20Royes=20Mello?=
 <fabriziomello@gmail.com>
Date: Fri, 26 Apr 2024 11:38:28 -0300
Subject: [PATCH] Update the watermark when truncating a CAgg

In #5261 we cached the Continuous Aggregate watermark value in a
metadata table to improve performance by avoiding computing the
watermark at planning time.

Manual DML operations on a CAgg are not recommended; the user should
use the `refresh_continuous_aggregate` procedure instead. However, we
already handle `TRUNCATE` over CAggs by generating the necessary
invalidation logs, so it makes sense to also update the watermark.
---
 .unreleased/feature_6865          |  1 +
 src/process_utility.c             | 19 +++++++++++-
 tsl/test/expected/cagg_ddl-13.out | 49 +++++++++++++++++++++++++++++++
 tsl/test/expected/cagg_ddl-14.out | 49 +++++++++++++++++++++++++++++++
 tsl/test/expected/cagg_ddl-15.out | 49 +++++++++++++++++++++++++++++++
 tsl/test/expected/cagg_ddl-16.out | 49 +++++++++++++++++++++++++++++++
 tsl/test/sql/cagg_ddl.sql.in      | 32 ++++++++++++++++++++
 7 files changed, 247 insertions(+), 1 deletion(-)
 create mode 100644 .unreleased/feature_6865

diff --git a/.unreleased/feature_6865 b/.unreleased/feature_6865
new file mode 100644
index 000000000..8584a15f9
--- /dev/null
+++ b/.unreleased/feature_6865
@@ -0,0 +1 @@
+Implements: #6865 Update the watermark when truncating a CAgg
diff --git a/src/process_utility.c b/src/process_utility.c
index b61d04a9d..5851c1ab3 100644
--- a/src/process_utility.c
+++ b/src/process_utility.c
@@ -69,6 +69,7 @@
 #include "ts_catalog/catalog.h"
 #include "ts_catalog/compression_settings.h"
 #include "ts_catalog/continuous_agg.h"
+#include "ts_catalog/continuous_aggs_watermark.h"
 #include "tss_callbacks.h"
 #include "utils.h"
 #include "with_clause_parser.h"
@@ -889,7 +890,7 @@ process_truncate(ProcessUtilityArgs *args)
 	TruncateStmt *stmt = (TruncateStmt *) args->parsetree;
 	Cache *hcache = ts_hypertable_cache_pin();
 	ListCell *cell;
-	List *hypertables = NIL;
+	List *hypertables = NIL, *mat_hypertables = NIL;
 	List *relations = NIL;
 	bool list_changed = false;
 	MemoryContext oldctx, parsetreectx = GetMemoryChunkContext(args->parsetree);
@@ -966,6 +967,9 @@ process_truncate(ProcessUtilityArgs *args)
 
 						/* mark list as changed because we'll add the materialization hypertable */
 						list_changed = true;
+
+						/* list of materialization hypertables to reset the watermark */
+						mat_hypertables = lappend(mat_hypertables, mat_ht);
 					}
 
 					list_append = true;
@@ -1103,6 +1107,19 @@ process_truncate(ProcessUtilityArgs *args)
 		}
 	}
 
+	/* For all materialization hypertables, reset the watermark */
+	foreach (cell, mat_hypertables)
+	{
+		Hypertable *mat_ht = lfirst(cell);
+
+		Assert(mat_ht != NULL);
+
+		/* Force update the watermark */
+		bool isnull;
+		int64 watermark = ts_hypertable_get_open_dim_max_value(mat_ht, 0, &isnull);
+		ts_cagg_watermark_update(mat_ht, watermark, isnull, true);
+	}
+
 	ts_cache_release(hcache);
 
 	return DDL_DONE;
diff --git a/tsl/test/expected/cagg_ddl-13.out b/tsl/test/expected/cagg_ddl-13.out
index 037ef716f..a3862ac53 100644
--- a/tsl/test/expected/cagg_ddl-13.out
+++ b/tsl/test/expected/cagg_ddl-13.out
@@ -2025,3 +2025,52 @@ SELECT * FROM conditions_daily ORDER BY bucket, avg;
  NYC      | Thu Nov 01 17:00:00 2018 PDT |  15
 (6 rows)
 
+-- Test TRUNCATE over a Realtime CAgg
+DROP MATERIALIZED VIEW conditions_daily;
+NOTICE:  drop cascades to 2 other objects
+CREATE MATERIALIZED VIEW conditions_daily
+WITH (timescaledb.continuous, timescaledb.materialized_only=false) AS
+SELECT location,
+       time_bucket(INTERVAL '1 day', time) AS bucket,
+       AVG(temperature)
+  FROM conditions
+GROUP BY location, bucket
+WITH NO DATA;
+SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg WHERE user_view_name = 'conditions_daily' \gset
+-- Check the current watermark for an empty CAgg
+SELECT _timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(:mat_hypertable_id)) AS watermak_empty_cagg;
+       watermak_empty_cagg       
+---------------------------------
+ Sun Nov 23 16:00:00 4714 PST BC
+(1 row)
+
+-- Refresh the CAGG
+CALL refresh_continuous_aggregate('conditions_daily', NULL, NULL);
+-- Check the watermark after the refresh and before truncate the CAgg
+SELECT _timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(:mat_hypertable_id)) AS watermak_before;
+       watermak_before        
+------------------------------
+ Fri Nov 02 17:00:00 2018 PDT
+(1 row)
+
+-- Truncate the given CAgg, it should reset the watermark to the empty state
+TRUNCATE conditions_daily;
+-- Watermark should be reseted
+SELECT _timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(:mat_hypertable_id)) AS watermak_after;
+         watermak_after          
+---------------------------------
+ Sun Nov 23 16:00:00 4714 PST BC
+(1 row)
+
+-- Should return ROWS because the watermark was reseted by the TRUNCATE
+SELECT * FROM conditions_daily ORDER BY bucket, avg;
+ location |            bucket            | avg 
+----------+------------------------------+-----
+ SFO      | Sun Dec 31 16:00:00 2017 PST |  55
+ SFO      | Mon Jan 01 16:00:00 2018 PST |  65
+ NYC      | Mon Jan 01 16:00:00 2018 PST |  65
+ por      | Mon Jan 01 16:00:00 2018 PST | 100
+ NYC      | Wed Oct 31 17:00:00 2018 PDT |  65
+ NYC      | Thu Nov 01 17:00:00 2018 PDT |  15
+(6 rows)
+
diff --git a/tsl/test/expected/cagg_ddl-14.out b/tsl/test/expected/cagg_ddl-14.out
index 037ef716f..a3862ac53 100644
--- a/tsl/test/expected/cagg_ddl-14.out
+++ b/tsl/test/expected/cagg_ddl-14.out
@@ -2025,3 +2025,52 @@ SELECT * FROM conditions_daily ORDER BY bucket, avg;
  NYC      | Thu Nov 01 17:00:00 2018 PDT |  15
 (6 rows)
 
+-- Test TRUNCATE over a Realtime CAgg
+DROP MATERIALIZED VIEW conditions_daily;
+NOTICE:  drop cascades to 2 other objects
+CREATE MATERIALIZED VIEW conditions_daily
+WITH (timescaledb.continuous, timescaledb.materialized_only=false) AS
+SELECT location,
+       time_bucket(INTERVAL '1 day', time) AS bucket,
+       AVG(temperature)
+  FROM conditions
+GROUP BY location, bucket
+WITH NO DATA;
+SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg WHERE user_view_name = 'conditions_daily' \gset
+-- Check the current watermark for an empty CAgg
+SELECT _timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(:mat_hypertable_id)) AS watermak_empty_cagg;
+       watermak_empty_cagg       
+---------------------------------
+ Sun Nov 23 16:00:00 4714 PST BC
+(1 row)
+
+-- Refresh the CAGG
+CALL refresh_continuous_aggregate('conditions_daily', NULL, NULL);
+-- Check the watermark after the refresh and before truncate the CAgg
+SELECT _timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(:mat_hypertable_id)) AS watermak_before;
+       watermak_before        
+------------------------------
+ Fri Nov 02 17:00:00 2018 PDT
+(1 row)
+
+-- Truncate the given CAgg, it should reset the watermark to the empty state
+TRUNCATE conditions_daily;
+-- Watermark should be reseted
+SELECT _timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(:mat_hypertable_id)) AS watermak_after;
+         watermak_after          
+---------------------------------
+ Sun Nov 23 16:00:00 4714 PST BC
+(1 row)
+
+-- Should return ROWS because the watermark was reseted by the TRUNCATE
+SELECT * FROM conditions_daily ORDER BY bucket, avg;
+ location |            bucket            | avg 
+----------+------------------------------+-----
+ SFO      | Sun Dec 31 16:00:00 2017 PST |  55
+ SFO      | Mon Jan 01 16:00:00 2018 PST |  65
+ NYC      | Mon Jan 01 16:00:00 2018 PST |  65
+ por      | Mon Jan 01 16:00:00 2018 PST | 100
+ NYC      | Wed Oct 31 17:00:00 2018 PDT |  65
+ NYC      | Thu Nov 01 17:00:00 2018 PDT |  15
+(6 rows)
+
diff --git a/tsl/test/expected/cagg_ddl-15.out b/tsl/test/expected/cagg_ddl-15.out
index 037ef716f..a3862ac53 100644
--- a/tsl/test/expected/cagg_ddl-15.out
+++ b/tsl/test/expected/cagg_ddl-15.out
@@ -2025,3 +2025,52 @@ SELECT * FROM conditions_daily ORDER BY bucket, avg;
  NYC      | Thu Nov 01 17:00:00 2018 PDT |  15
 (6 rows)
 
+-- Test TRUNCATE over a Realtime CAgg
+DROP MATERIALIZED VIEW conditions_daily;
+NOTICE:  drop cascades to 2 other objects
+CREATE MATERIALIZED VIEW conditions_daily
+WITH (timescaledb.continuous, timescaledb.materialized_only=false) AS
+SELECT location,
+       time_bucket(INTERVAL '1 day', time) AS bucket,
+       AVG(temperature)
+  FROM conditions
+GROUP BY location, bucket
+WITH NO DATA;
+SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg WHERE user_view_name = 'conditions_daily' \gset
+-- Check the current watermark for an empty CAgg
+SELECT _timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(:mat_hypertable_id)) AS watermak_empty_cagg;
+       watermak_empty_cagg       
+---------------------------------
+ Sun Nov 23 16:00:00 4714 PST BC
+(1 row)
+
+-- Refresh the CAGG
+CALL refresh_continuous_aggregate('conditions_daily', NULL, NULL);
+-- Check the watermark after the refresh and before truncate the CAgg
+SELECT _timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(:mat_hypertable_id)) AS watermak_before;
+       watermak_before        
+------------------------------
+ Fri Nov 02 17:00:00 2018 PDT
+(1 row)
+
+-- Truncate the given CAgg, it should reset the watermark to the empty state
+TRUNCATE conditions_daily;
+-- Watermark should be reseted
+SELECT _timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(:mat_hypertable_id)) AS watermak_after;
+         watermak_after          
+---------------------------------
+ Sun Nov 23 16:00:00 4714 PST BC
+(1 row)
+
+-- Should return ROWS because the watermark was reseted by the TRUNCATE
+SELECT * FROM conditions_daily ORDER BY bucket, avg;
+ location |            bucket            | avg 
+----------+------------------------------+-----
+ SFO      | Sun Dec 31 16:00:00 2017 PST |  55
+ SFO      | Mon Jan 01 16:00:00 2018 PST |  65
+ NYC      | Mon Jan 01 16:00:00 2018 PST |  65
+ por      | Mon Jan 01 16:00:00 2018 PST | 100
+ NYC      | Wed Oct 31 17:00:00 2018 PDT |  65
+ NYC      | Thu Nov 01 17:00:00 2018 PDT |  15
+(6 rows)
+
diff --git a/tsl/test/expected/cagg_ddl-16.out b/tsl/test/expected/cagg_ddl-16.out
index cd99cd50b..cb8aabe1d 100644
--- a/tsl/test/expected/cagg_ddl-16.out
+++ b/tsl/test/expected/cagg_ddl-16.out
@@ -2025,3 +2025,52 @@ SELECT * FROM conditions_daily ORDER BY bucket, avg;
  NYC      | Thu Nov 01 17:00:00 2018 PDT |  15
 (6 rows)
 
+-- Test TRUNCATE over a Realtime CAgg
+DROP MATERIALIZED VIEW conditions_daily;
+NOTICE:  drop cascades to 2 other objects
+CREATE MATERIALIZED VIEW conditions_daily
+WITH (timescaledb.continuous, timescaledb.materialized_only=false) AS
+SELECT location,
+       time_bucket(INTERVAL '1 day', time) AS bucket,
+       AVG(temperature)
+  FROM conditions
+GROUP BY location, bucket
+WITH NO DATA;
+SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg WHERE user_view_name = 'conditions_daily' \gset
+-- Check the current watermark for an empty CAgg
+SELECT _timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(:mat_hypertable_id)) AS watermak_empty_cagg;
+       watermak_empty_cagg       
+---------------------------------
+ Sun Nov 23 16:00:00 4714 PST BC
+(1 row)
+
+-- Refresh the CAGG
+CALL refresh_continuous_aggregate('conditions_daily', NULL, NULL);
+-- Check the watermark after the refresh and before truncate the CAgg
+SELECT _timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(:mat_hypertable_id)) AS watermak_before;
+       watermak_before        
+------------------------------
+ Fri Nov 02 17:00:00 2018 PDT
+(1 row)
+
+-- Truncate the given CAgg, it should reset the watermark to the empty state
+TRUNCATE conditions_daily;
+-- Watermark should be reseted
+SELECT _timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(:mat_hypertable_id)) AS watermak_after;
+         watermak_after          
+---------------------------------
+ Sun Nov 23 16:00:00 4714 PST BC
+(1 row)
+
+-- Should return ROWS because the watermark was reseted by the TRUNCATE
+SELECT * FROM conditions_daily ORDER BY bucket, avg;
+ location |            bucket            | avg 
+----------+------------------------------+-----
+ SFO      | Sun Dec 31 16:00:00 2017 PST |  55
+ SFO      | Mon Jan 01 16:00:00 2018 PST |  65
+ NYC      | Mon Jan 01 16:00:00 2018 PST |  65
+ por      | Mon Jan 01 16:00:00 2018 PST | 100
+ NYC      | Wed Oct 31 17:00:00 2018 PDT |  65
+ NYC      | Thu Nov 01 17:00:00 2018 PDT |  15
+(6 rows)
+
diff --git a/tsl/test/sql/cagg_ddl.sql.in b/tsl/test/sql/cagg_ddl.sql.in
index bb7fd24f0..9ba0a7df0 100644
--- a/tsl/test/sql/cagg_ddl.sql.in
+++ b/tsl/test/sql/cagg_ddl.sql.in
@@ -1288,3 +1288,35 @@ ALTER MATERIALIZED VIEW conditions_daily SET (timescaledb.materialized_only=true
 \d+ conditions_daily
 CALL refresh_continuous_aggregate('conditions_daily', NULL, NULL);
 SELECT * FROM conditions_daily ORDER BY bucket, avg;
+
+-- Test TRUNCATE over a Realtime CAgg
+DROP MATERIALIZED VIEW conditions_daily;
+
+CREATE MATERIALIZED VIEW conditions_daily
+WITH (timescaledb.continuous, timescaledb.materialized_only=false) AS
+SELECT location,
+       time_bucket(INTERVAL '1 day', time) AS bucket,
+       AVG(temperature)
+  FROM conditions
+GROUP BY location, bucket
+WITH NO DATA;
+
+SELECT mat_hypertable_id FROM _timescaledb_catalog.continuous_agg WHERE user_view_name = 'conditions_daily' \gset
+
+-- Check the current watermark for an empty CAgg
+SELECT _timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(:mat_hypertable_id)) AS watermak_empty_cagg;
+
+-- Refresh the CAGG
+CALL refresh_continuous_aggregate('conditions_daily', NULL, NULL);
+
+-- Check the watermark after the refresh and before truncate the CAgg
+SELECT _timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(:mat_hypertable_id)) AS watermak_before;
+
+-- Truncate the given CAgg, it should reset the watermark to the empty state
+TRUNCATE conditions_daily;
+
+-- Watermark should be reseted
+SELECT _timescaledb_functions.to_timestamp(_timescaledb_functions.cagg_watermark(:mat_hypertable_id)) AS watermak_after;
+
+-- Should return ROWS because the watermark was reseted by the TRUNCATE
+SELECT * FROM conditions_daily ORDER BY bucket, avg;