Restrict watermark to max for continuous aggregates

Set the threshold for continuous aggregates to the
maximum time value in the raw hypertable when that maximum
is less than the computed now() time. This helps avoid
unnecessary materialization checks for data ranges
that do not exist. As a result, we also prevent
unnecessary writes to the threshold and invalidation
log tables.
gayyappan 2020-03-12 10:28:27 -04:00 committed by gayyappan
parent 9b3d2e6d52
commit ce624d61d3
14 changed files with 383 additions and 63 deletions
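
The core of the change: cap the now()-based materialization end time at the bucketed maximum time actually present in the raw hypertable. Below is a minimal standalone sketch of that clamping logic, using hypothetical helper names (the real patch works through ts_time_bucket_by_type() and int64_saturating_add()):

#include <stdint.h>
#include <stdio.h>

/* Illustrative stand-in for ts_time_bucket_by_type(): align a value down to its bucket start. */
static int64_t
bucket_start(int64_t bucket_width, int64_t value)
{
    int64_t rem = value % bucket_width;
    if (rem < 0)
        rem += bucket_width; /* floor semantics for negative values */
    return value - rem;
}

/* Cap the now()-based end time at the bucketed max of the raw data, so the
 * half-open range [start_time, end_time) still covers max_time. The real code
 * uses a saturating add to guard against int64 overflow. */
static int64_t
clamp_end_time(int64_t end_time, int64_t max_time, int64_t bucket_width)
{
    if (max_time < end_time)
    {
        int64_t max_corrected = bucket_start(bucket_width, max_time);
        if (max_corrected <= max_time)
            max_corrected += bucket_width;
        if (max_corrected < end_time)
            end_time = max_corrected;
    }
    return end_time;
}

int
main(void)
{
    /* Values mirroring the integer test added below: bucket width 10, max inserted time 10,
     * and a now()-derived end time far in the future. Prints 20, the expected watermark. */
    printf("%lld\n", (long long) clamp_end_time(1000, 10, 10));
    return 0;
}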


@ -377,8 +377,9 @@ get_continuous_agg(int32 mat_hypertable_id)
return cagg;
}
static int64 hypertable_get_min(SchemaAndName hypertable, Name time_column, Oid time_type,
bool *found);
static bool hypertable_get_min_and_max_time_value(SchemaAndName hypertable, Name time_column,
int64 search_start, Oid time_type, int64 *min_out,
int64 *max_out);
static int64
get_materialization_end_point_for_table(int32 raw_hypertable_id, int32 materialization_id,
@ -396,16 +397,22 @@ get_materialization_end_point_for_table(int32 raw_hypertable_id, int32 materiali
NameData time_column_name = time_column->fd.column_name;
Oid time_column_type = ts_dimension_get_partition_type(time_column);
int64 now_time = ts_get_now_internal(time_column);
int64 end_time, start_time;
int64 end_time, start_time, min_time, max_time;
bool found_new_tuples = false;
start_time = old_completed_threshold;
found_new_tuples = hypertable_get_min_and_max_time_value(hypertable,
&time_column_name,
old_completed_threshold,
time_column_type,
&min_time,
&max_time);
if (start_time == PG_INT64_MIN)
{
/* If there is no completion threshold yet set, find the minimum value stored in the
/* If there is no completion threshold yet set, use the minimum value stored in the
* hypertable */
bool found;
start_time = hypertable_get_min(hypertable, &time_column_name, time_column_type, &found);
if (!found)
if (!found_new_tuples)
{
if (verbose)
elog(LOG,
@ -417,6 +424,7 @@ get_materialization_end_point_for_table(int32 raw_hypertable_id, int32 materiali
*materializing_new_range = false;
return old_completed_threshold;
}
start_time = min_time;
}
/* check for values which would overflow 64 bit subtraction */
@ -445,7 +453,8 @@ get_materialization_end_point_for_table(int32 raw_hypertable_id, int32 materiali
end_time = now_time - refresh_lag;
end_time = ts_time_bucket_by_type(bucket_width, end_time, time_column_type);
if (end_time <= start_time)
if (!found_new_tuples || (end_time <= start_time))
{
if (verbose)
elog(LOG,
@ -459,7 +468,25 @@ get_materialization_end_point_for_table(int32 raw_hypertable_id, int32 materiali
*materializing_new_range = false;
return old_completed_threshold;
}
if (max_time < end_time)
{
/* end_time is based on now(), but there might not be any new data to materialize.
* So limit end_time to the max value in the hypertable's data, adjusted
* by the time_bucket width parameter (which is a positive value)
*/
int64 maxtime_corrected = ts_time_bucket_by_type(bucket_width, max_time, time_column_type);
if (maxtime_corrected <= max_time)
{
/* we need to include max_time in our materialization range, since we materialize up to
* end_time but not including it, i.e. [start_time, end_time)
*/
maxtime_corrected = int64_saturating_add(maxtime_corrected, bucket_width);
}
if (maxtime_corrected < end_time)
{
end_time = maxtime_corrected;
}
}
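/* Worked example (illustrative, with values from the integer test added in this
 * commit): for bucket_width = 10 and a raw-data max_time of 10, the bucketed
 * value is 10; since 10 <= max_time, bucket_width is added, giving
 * maxtime_corrected = 20. A now()-based end_time larger than 20 is therefore
 * capped at 20, which is the watermark the integer test expects.
 */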
/* pin the end time to start_time + max_interval_per_job
* so we don't materialize more than max_interval_per_job per run
*/
@ -492,25 +519,74 @@ get_materialization_end_point_for_table(int32 raw_hypertable_id, int32 materiali
return end_time;
}
static int64
hypertable_get_min(SchemaAndName hypertable, Name time_column, Oid time_type, bool *found)
/* get the min and max values of the time column for a hypertable */
static bool
hypertable_get_min_and_max_time_value(SchemaAndName hypertable, Name time_column,
int64 search_start, Oid time_type, int64 *min_out,
int64 *max_out)
{
Datum min_time_datum;
bool val_is_null;
Datum last_time_value;
Datum first_time_value;
bool fval_is_null, lval_is_null;
bool search_start_is_infinite = false;
bool found_new_tuples = false;
StringInfo command = makeStringInfo();
int res;
int64 min_time_internal = PG_INT64_MIN;
*found = false;
Datum search_start_val =
internal_to_time_value_or_infinite(search_start, time_type, &search_start_is_infinite);
if (search_start_is_infinite && search_start > 0)
{
/* the previous completed time was +infinity, there can be no new ranges */
return false;
}
res = SPI_connect();
if (res != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI while search for new tuples");
appendStringInfo(command,
"SELECT min(%s) FROM %s.%s",
quote_identifier(NameStr(*time_column)),
quote_identifier(NameStr(*hypertable.schema)),
quote_identifier(NameStr(*hypertable.name)));
/* We always SELECT both max and min in the following queries. There are two cases
* 1. there is no index on time: then we want to perform only one seqscan.
* 2. there is a btree index on time: then postgres will transform the query
* into two index-only scans, which should add very little extra work
* compared to the materialization.
* Ordered append also fires, so we never scan beyond the first and last chunks
*/
if (search_start_is_infinite)
{
/* previous completed time is -infinity, or does not exist, so we must scan from the
* beginning */
appendStringInfo(command,
"SELECT max(%s), min(%s) FROM %s.%s",
quote_identifier(NameStr(*time_column)),
quote_identifier(NameStr(*time_column)),
quote_identifier(NameStr(*hypertable.schema)),
quote_identifier(NameStr(*hypertable.name)));
}
else
{
Oid out_fn;
bool type_is_varlena;
char *search_start_str;
getTypeOutputInfo(time_type, &out_fn, &type_is_varlena);
search_start_str = OidOutputFunctionCall(out_fn, search_start_val);
*min_out = search_start;
/* normal case, add a WHERE to take advantage of chunk constraints */
/* We handled the +infinity case above */
Assert(!search_start_is_infinite);
appendStringInfo(command,
"SELECT max(%s), min(%s) FROM %s.%s WHERE %s >= %s",
quote_identifier(NameStr(*time_column)),
quote_identifier(NameStr(*time_column)),
quote_identifier(NameStr(*hypertable.schema)),
quote_identifier(NameStr(*hypertable.name)),
quote_identifier(NameStr(*time_column)),
quote_literal_cstr(search_start_str));
}
res = SPI_execute_with_args(command->data,
0 /*=nargs*/,
@ -520,25 +596,25 @@ hypertable_get_min(SchemaAndName hypertable, Name time_column, Oid time_type, bo
true /*=read_only*/,
0 /*count*/);
if (res < 0)
elog(ERROR, "could not find the minimum time value for a hypertable");
elog(ERROR, "could not find the minimum/maximum time value for hypertable");
Assert(SPI_gettypeid(SPI_tuptable->tupdesc, 1) == time_type);
Assert(SPI_gettypeid(SPI_tuptable->tupdesc, 2) == time_type);
if (SPI_processed == 1)
first_time_value =
SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 2, &fval_is_null);
last_time_value = SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &lval_is_null);
Assert(fval_is_null == lval_is_null);
if (!lval_is_null)
{
min_time_datum =
SPI_getbinval(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1, &val_is_null);
/* min on an empty table returns a NULL so only process not-nulls */
if (!val_is_null)
{
min_time_internal = ts_time_value_to_internal(min_time_datum, time_type);
*found = true;
}
*min_out = ts_time_value_to_internal(first_time_value, time_type);
*max_out = ts_time_value_to_internal(last_time_value, time_type);
found_new_tuples = true;
}
res = SPI_finish();
Assert(res == SPI_OK_FINISH);
return min_time_internal;
return found_new_tuples;
}
/* materialization_invalidation_threshold is used only for materialization_invalidation_log


@ -237,7 +237,6 @@ FROM conditions
GROUP BY device, time_bucket(INTERVAL '1 hour', "time");
NOTICE: adding index _materialized_hypertable_8_device_day_idx ON _timescaledb_internal._materialized_hypertable_8 USING BTREE(device, day)
REFRESH MATERIALIZED VIEW conditions_summary;
WARNING: REFRESH did not materialize the entire range since it was limited by the max_interval_per_job setting
ALTER TABLE conditions SET (timescaledb.compress);
ALTER VIEW conditions_summary SET (
timescaledb.ignore_invalidation_older_than = '15 days'


@ -1154,7 +1154,7 @@ select time_bucket(100, timec), aggregate_to_test_ffunc_extra(timec, 1, 3, 'test
from conditions
group by time_bucket(100, timec);
REFRESH MATERIALIZED VIEW mat_ffunc_test;
LOG: materializing continuous aggregate public.mat_ffunc_test: nothing to invalidate, new range up to 400
LOG: materializing continuous aggregate public.mat_ffunc_test: nothing to invalidate, new range up to 300
SELECT * FROM mat_ffunc_test;
WARNING: type integer text
WARNING: type integer text
@ -1175,7 +1175,7 @@ select time_bucket(100, timec), aggregate_to_test_ffunc_extra(timec, 4, 5, bigin
from conditions
group by time_bucket(100, timec);
REFRESH MATERIALIZED VIEW mat_ffunc_test;
LOG: materializing continuous aggregate public.mat_ffunc_test: nothing to invalidate, new range up to 400
LOG: materializing continuous aggregate public.mat_ffunc_test: nothing to invalidate, new range up to 300
SELECT * FROM mat_ffunc_test;
WARNING: type integer bigint
WARNING: type integer bigint
@ -1200,7 +1200,7 @@ NOTICE: adding index _materialized_hypertable_29_location_time_partition_col_id
insert into conditions
select generate_series(0, 50, 10), 'NYC', 55, 75, 40, 70, NULL;
REFRESH MATERIALIZED VIEW mat_refresh_test;
LOG: materializing continuous aggregate public.mat_refresh_test: nothing to invalidate, new range up to 400
LOG: materializing continuous aggregate public.mat_refresh_test: nothing to invalidate, new range up to 300
SELECT * FROM mat_refresh_test order by 1,2 ;
location | max
----------+-----
@ -1217,7 +1217,7 @@ from conditions
group by time_bucket(100, timec), location;
NOTICE: adding index _materialized_hypertable_30_grp_3_3_time_bucket_idx ON _timescaledb_internal._materialized_hypertable_30 USING BTREE(grp_3_3, time_bucket)
REFRESH MATERIALIZED VIEW conditions_grpby_view;
LOG: materializing continuous aggregate public.conditions_grpby_view: nothing to invalidate, new range up to 400
LOG: materializing continuous aggregate public.conditions_grpby_view: nothing to invalidate, new range up to 300
select * from conditions_grpby_view order by 1, 2;
time_bucket | sum
-------------+-----
@ -1235,7 +1235,7 @@ having avg(temperature) > 0
;
NOTICE: adding index _materialized_hypertable_31_grp_3_3_time_bucket_idx ON _timescaledb_internal._materialized_hypertable_31 USING BTREE(grp_3_3, time_bucket)
REFRESH MATERIALIZED VIEW conditions_grpby_view2;
LOG: materializing continuous aggregate public.conditions_grpby_view2: nothing to invalidate, new range up to 400
LOG: materializing continuous aggregate public.conditions_grpby_view2: nothing to invalidate, new range up to 300
select * from conditions_grpby_view2 order by 1, 2;
time_bucket | sum
-------------+-----
@ -1272,7 +1272,7 @@ SET timescaledb.current_timestamp_mock = '2001-03-11';
insert into conditions values('2001-03-10', '1');
insert into conditions values('2001-03-10', '2');
REFRESH MATERIALIZED VIEW mat_test5;
LOG: materializing continuous aggregate public.mat_test5: nothing to invalidate, new range up to Sun Mar 11 16:00:00 2001 PST
LOG: materializing continuous aggregate public.mat_test5: nothing to invalidate, new range up to Sat Mar 10 16:00:00 2001 PST
SELECT * FROM mat_test5;
timec | maxt
------------------------------+------
@ -1282,7 +1282,7 @@ SELECT * FROM mat_test5;
insert into conditions values('2001-02-15', '1');
insert into conditions values('2001-01-15', '1');
REFRESH MATERIALIZED VIEW mat_test5;
LOG: new materialization range not found for public.conditions (time column time): not enough new data past completion threshold of Sun Mar 11 16:00:00 2001 PST as of Sun Mar 11 00:00:00 2001 PST
LOG: new materialization range not found for public.conditions (time column time): not enough new data past completion threshold of Sat Mar 10 16:00:00 2001 PST as of Sun Mar 11 00:00:00 2001 PST
LOG: materializing continuous aggregate public.mat_test5: processing invalidations, no new range
--will see the feb but not the january change
SELECT * FROM mat_test5;
@ -1300,7 +1300,8 @@ SET timescaledb.current_timestamp_mock = '2001-05-11';
--but not this one
insert into conditions values('2001-02-20', '4');
REFRESH MATERIALIZED VIEW mat_test5;
LOG: materializing continuous aggregate public.mat_test5: processing invalidations, new range up to Fri May 11 17:00:00 2001 PDT
LOG: new materialization range not found for public.conditions (time column time): not enough new data past completion threshold of Sat Mar 10 16:00:00 2001 PST as of Fri May 11 00:00:00 2001 PDT
LOG: materializing continuous aggregate public.mat_test5: processing invalidations, no new range
SELECT * FROM mat_test5;
timec | maxt
------------------------------+------
@ -1308,3 +1309,26 @@ SELECT * FROM mat_test5;
Wed Feb 14 16:00:00 2001 PST | 3
(2 rows)
--verify that watermark is limited by max value and not by
-- the current time (now value)--
SET timescaledb.current_timestamp_mock = '2018-05-11';
SELECT view_name, completed_threshold, invalidation_threshold
FROM timescaledb_information.continuous_aggregate_stats
where view_name::text like 'mat_test5';
view_name | completed_threshold | invalidation_threshold
-----------+------------------------------+------------------------------
mat_test5 | Sat Mar 10 16:00:00 2001 PST | Sat Mar 10 16:00:00 2001 PST
(1 row)
REFRESH MATERIALIZED VIEW mat_test5;
LOG: new materialization range not found for public.conditions (time column time): not enough new data past completion threshold of Sat Mar 10 16:00:00 2001 PST as of Fri May 11 00:00:00 2018 PDT
LOG: materializing continuous aggregate public.mat_test5: nothing to invalidate, no new range
LOG: materializing continuous aggregate public.mat_test5: no new range to materialize or invalidations found, exiting early
SELECT view_name, completed_threshold, invalidation_threshold
FROM timescaledb_information.continuous_aggregate_stats
where view_name::text like 'mat_test5';
view_name | completed_threshold | invalidation_threshold
-----------+------------------------------+------------------------------
mat_test5 | Sat Mar 10 16:00:00 2001 PST | Sat Mar 10 16:00:00 2001 PST
(1 row)

View File

@ -101,7 +101,7 @@ NOTICE: adding index _materialized_hypertable_4_location_bucket_idx ON _timesca
SET client_min_messages TO LOG;
SET timescaledb.current_timestamp_mock = '2019-02-01 00:00';
REFRESH MATERIALIZED VIEW mat_before;
LOG: materializing continuous aggregate public.mat_before: nothing to invalidate, new range up to Sun Feb 24 16:00:00 2019 PST
LOG: materializing continuous aggregate public.mat_before: nothing to invalidate, new range up to Sun Jan 06 16:00:00 2019 PST
SELECT count(*) FROM conditions_before;
count
-------


@ -1493,3 +1493,107 @@ SELECT count(*) FROM timezone_test_summary;
DROP TABLE timezone_test CASCADE;
NOTICE: drop cascades to 2 other objects
NOTICE: drop cascades to table _timescaledb_internal._hyper_23_44_chunk
-- TESTS for integer-based table to verify the watermark is limited by the max value of the time column and not by now
CREATE TABLE continuous_agg_int(time BIGINT, data BIGINT);
SELECT create_hypertable('continuous_agg_int', 'time', chunk_time_interval=> 10);
NOTICE: adding not-null constraint to column "time"
create_hypertable
----------------------------------
(24,public,continuous_agg_int,t)
(1 row)
CREATE OR REPLACE FUNCTION integer_now_continuous_agg_max() returns BIGINT LANGUAGE SQL STABLE as $$ SELECT BIGINT '9223372036854775807' $$;
SELECT set_integer_now_func('continuous_agg_int', 'integer_now_continuous_agg_max');
set_integer_now_func
----------------------
(1 row)
CREATE VIEW continuous_agg_int_max
WITH (timescaledb.continuous, timescaledb.refresh_lag='0')
AS SELECT time_bucket('10', time), COUNT(data) as value
FROM continuous_agg_int
GROUP BY 1;
INSERT INTO continuous_agg_int values (-10, 100), (1,100), (10, 100);
select chunk_table, ranges from chunk_relation_size('continuous_agg_int');
chunk_table | ranges
------------------------------------------+-------------
_timescaledb_internal._hyper_24_45_chunk | {"[-10,0)"}
_timescaledb_internal._hyper_24_46_chunk | {"[0,10)"}
_timescaledb_internal._hyper_24_47_chunk | {"[10,20)"}
(3 rows)
REFRESH MATERIALIZED VIEW continuous_agg_int_max;
REFRESH MATERIALIZED VIEW continuous_agg_int_max;
REFRESH MATERIALIZED VIEW continuous_agg_int_max;
select * from continuous_agg_int_max;
time_bucket | value
-------------+-------
-10 | 1
0 | 1
10 | 1
(3 rows)
--watermark is 20
SELECT view_name, completed_threshold, invalidation_threshold
FROM timescaledb_information.continuous_aggregate_stats
where view_name::text like 'continuous_agg_int_max';
view_name | completed_threshold | invalidation_threshold
------------------------+---------------------+------------------------
continuous_agg_int_max | 20 | 20
(1 row)
-- TEST that watermark is limited by max value from data and not by now()
CREATE TABLE continuous_agg_ts_max_t(timecol TIMESTAMPTZ, data integer);
SELECT create_hypertable('continuous_agg_ts_max_t', 'timecol', chunk_time_interval=>'365 days'::interval);
NOTICE: adding not-null constraint to column "timecol"
create_hypertable
---------------------------------------
(26,public,continuous_agg_ts_max_t,t)
(1 row)
CREATE VIEW continuous_agg_ts_max_view
WITH (timescaledb.continuous, timescaledb.max_interval_per_job='365 days', timescaledb.refresh_lag='-2 hours')
AS SELECT time_bucket('2 hours', timecol), COUNT(data) as value
FROM continuous_agg_ts_max_t
GROUP BY 1;
INSERT INTO continuous_agg_ts_max_t
values ('1969-01-01 1:00'::timestamptz, 10);
REFRESH MATERIALIZED VIEW continuous_agg_ts_max_view;
SELECT view_name, completed_threshold, invalidation_threshold
FROM timescaledb_information.continuous_aggregate_stats
where view_name::text like 'continuous_agg_ts_max_view';
view_name | completed_threshold | invalidation_threshold
----------------------------+------------------------+------------------------
continuous_agg_ts_max_view | 1969-01-01 03:00:00+05 | 1969-01-01 03:00:00+05
(1 row)
INSERT INTO continuous_agg_ts_max_t
values ('1970-01-01 1:00'::timestamptz, 10);
select chunk_table, ranges from chunk_relation_size('continuous_agg_ts_max_t');
chunk_table | ranges
------------------------------------------+---------------------------------------
_timescaledb_internal._hyper_26_50_chunk | {"[-63072000000000,-31536000000000)"}
_timescaledb_internal._hyper_26_52_chunk | {"[-31536000000000,0)"}
(2 rows)
REFRESH MATERIALIZED VIEW continuous_agg_ts_max_view;
REFRESH MATERIALIZED VIEW continuous_agg_ts_max_view;
SELECT view_name, completed_threshold, invalidation_threshold
FROM timescaledb_information.continuous_aggregate_stats
where view_name::text like 'continuous_agg_ts_max_view';
view_name | completed_threshold | invalidation_threshold
----------------------------+------------------------+------------------------
continuous_agg_ts_max_view | 1970-01-01 03:00:00+05 | 1970-01-01 03:00:00+05
(1 row)
-- no more new data to materialize, threshold should not change
REFRESH MATERIALIZED VIEW continuous_agg_ts_max_view;
SELECT view_name, completed_threshold, invalidation_threshold
FROM timescaledb_information.continuous_aggregate_stats
where view_name::text like 'continuous_agg_ts_max_view';
view_name | completed_threshold | invalidation_threshold
----------------------------+------------------------+------------------------
continuous_agg_ts_max_view | 1970-01-01 03:00:00+05 | 1970-01-01 03:00:00+05
(1 row)


@ -49,8 +49,16 @@ from conditions
group by time_bucket('1day', timec), location;
NOTICE: adding index _materialized_hypertable_2_location_timec_idx ON _timescaledb_internal._materialized_hypertable_2 USING BTREE(location, timec)
SET timescaledb.current_timestamp_mock = '2018-12-31 00:00';
--compute time_bucketed max + bucket_width for the materialized view
SELECT time_bucket('1day' , q.timeval+ '1day'::interval)
FROM ( select max(timec)as timeval from conditions ) as q;
time_bucket
------------------------------
Sat Nov 03 17:00:00 2018 PDT
(1 row)
REFRESH MATERIALIZED VIEW mat_m1;
LOG: materializing continuous aggregate public.mat_m1: nothing to invalidate, new range up to Fri Dec 28 16:00:00 2018 PST
LOG: materializing continuous aggregate public.mat_m1: nothing to invalidate, new range up to Sat Nov 03 17:00:00 2018 PDT
--test first/last
create or replace view mat_m2(location, timec, firsth, lasth, maxtemp, mintemp)
WITH ( timescaledb.continuous, timescaledb.max_interval_per_job='365 days')
@ -61,8 +69,15 @@ group by time_bucket('1day', timec), location;
NOTICE: adding index _materialized_hypertable_3_location_timec_idx ON _timescaledb_internal._materialized_hypertable_3 USING BTREE(location, timec)
--time that refresh assumes as now() for repeatability
SET timescaledb.current_timestamp_mock = '2018-12-31 00:00';
SELECT time_bucket('1day' , q.timeval+ '1day'::interval)
FROM ( select max(timec)as timeval from conditions ) as q;
time_bucket
------------------------------
Sat Nov 03 17:00:00 2018 PDT
(1 row)
REFRESH MATERIALIZED VIEW mat_m2;
LOG: materializing continuous aggregate public.mat_m2: nothing to invalidate, new range up to Fri Dec 28 16:00:00 2018 PST
LOG: materializing continuous aggregate public.mat_m2: nothing to invalidate, new range up to Sat Nov 03 17:00:00 2018 PDT
--normal view --
create or replace view regview( location, timec, minl, sumt , sumh)
as


@ -49,8 +49,16 @@ from conditions
group by time_bucket('1day', timec), location;
NOTICE: adding index _materialized_hypertable_2_location_timec_idx ON _timescaledb_internal._materialized_hypertable_2 USING BTREE(location, timec)
SET timescaledb.current_timestamp_mock = '2018-12-31 00:00';
--compute time_bucketed max + bucket_width for the materialized view
SELECT time_bucket('1day' , q.timeval+ '1day'::interval)
FROM ( select max(timec)as timeval from conditions ) as q;
time_bucket
------------------------------
Sat Nov 03 17:00:00 2018 PDT
(1 row)
REFRESH MATERIALIZED VIEW mat_m1;
LOG: materializing continuous aggregate public.mat_m1: nothing to invalidate, new range up to Fri Dec 28 16:00:00 2018 PST
LOG: materializing continuous aggregate public.mat_m1: nothing to invalidate, new range up to Sat Nov 03 17:00:00 2018 PDT
--test first/last
create or replace view mat_m2(location, timec, firsth, lasth, maxtemp, mintemp)
WITH ( timescaledb.continuous, timescaledb.max_interval_per_job='365 days')
@ -61,8 +69,15 @@ group by time_bucket('1day', timec), location;
NOTICE: adding index _materialized_hypertable_3_location_timec_idx ON _timescaledb_internal._materialized_hypertable_3 USING BTREE(location, timec)
--time that refresh assumes as now() for repeatability
SET timescaledb.current_timestamp_mock = '2018-12-31 00:00';
SELECT time_bucket('1day' , q.timeval+ '1day'::interval)
FROM ( select max(timec)as timeval from conditions ) as q;
time_bucket
------------------------------
Sat Nov 03 17:00:00 2018 PDT
(1 row)
REFRESH MATERIALIZED VIEW mat_m2;
LOG: materializing continuous aggregate public.mat_m2: nothing to invalidate, new range up to Fri Dec 28 16:00:00 2018 PST
LOG: materializing continuous aggregate public.mat_m2: nothing to invalidate, new range up to Sat Nov 03 17:00:00 2018 PDT
--normal view --
create or replace view regview( location, timec, minl, sumt , sumh)
as


@ -49,8 +49,16 @@ from conditions
group by time_bucket('1day', timec), location;
NOTICE: adding index _materialized_hypertable_2_location_timec_idx ON _timescaledb_internal._materialized_hypertable_2 USING BTREE(location, timec)
SET timescaledb.current_timestamp_mock = '2018-12-31 00:00';
--compute time_bucketed max + bucket_width for the materialized view
SELECT time_bucket('1day' , q.timeval+ '1day'::interval)
FROM ( select max(timec)as timeval from conditions ) as q;
time_bucket
------------------------------
Sat Nov 03 17:00:00 2018 PDT
(1 row)
REFRESH MATERIALIZED VIEW mat_m1;
LOG: materializing continuous aggregate public.mat_m1: nothing to invalidate, new range up to Fri Dec 28 16:00:00 2018 PST
LOG: materializing continuous aggregate public.mat_m1: nothing to invalidate, new range up to Sat Nov 03 17:00:00 2018 PDT
--test first/last
create or replace view mat_m2(location, timec, firsth, lasth, maxtemp, mintemp)
WITH ( timescaledb.continuous, timescaledb.max_interval_per_job='365 days')
@ -61,8 +69,15 @@ group by time_bucket('1day', timec), location;
NOTICE: adding index _materialized_hypertable_3_location_timec_idx ON _timescaledb_internal._materialized_hypertable_3 USING BTREE(location, timec)
--time that refresh assumes as now() for repeatability
SET timescaledb.current_timestamp_mock = '2018-12-31 00:00';
SELECT time_bucket('1day' , q.timeval+ '1day'::interval)
FROM ( select max(timec)as timeval from conditions ) as q;
time_bucket
------------------------------
Sat Nov 03 17:00:00 2018 PDT
(1 row)
REFRESH MATERIALIZED VIEW mat_m2;
LOG: materializing continuous aggregate public.mat_m2: nothing to invalidate, new range up to Fri Dec 28 16:00:00 2018 PST
LOG: materializing continuous aggregate public.mat_m2: nothing to invalidate, new range up to Sat Nov 03 17:00:00 2018 PDT
--normal view --
create or replace view regview( location, timec, minl, sumt , sumh)
as


@ -23,7 +23,7 @@ lock_mattable
step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1; <waiting ...>
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 35
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 30
step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2; <waiting ...>
step UnlockCompleted: ROLLBACK;
step Refresh2: <... completed>
@ -49,7 +49,7 @@ step Setup2:
LOG: materializing continuous aggregate public.continuous_view_1: nothing to invalidate, new range up to 30
step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1;
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 35
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 30
step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2;
step LockCompleted: BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_completed_threshold IN SHARE MODE;
step LockMat1: BEGIN; select lock_mattable(materialization_hypertable::text) from timescaledb_information.continuous_aggregates where view_name::text like 'continuous_view_1';
@ -59,7 +59,7 @@ lock_mattable
step I1: INSERT INTO ts_continuous_test SELECT 0, i*10 FROM (SELECT generate_series(0, 10) AS i) AS i;
step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1; <waiting ...>
LOG: new materialization range not found for public.ts_continuous_test (time column time): not enough new data past completion threshold of 35 as of 29
LOG: new materialization range not found for public.ts_continuous_test (time column time): not enough new data past completion threshold of 30 as of 29
LOG: materializing continuous aggregate public.continuous_view_2: processing invalidations, no new range
step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2; <waiting ...>
step UnlockCompleted: ROLLBACK;
@ -96,7 +96,7 @@ step Setup2:
step AlterLag1: alter view continuous_view_1 set (timescaledb.refresh_lag = 10);
LOG: materializing continuous aggregate public.continuous_view_1: nothing to invalidate, new range up to 15
step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1;
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 35
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 30
step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2;
step Refresh1_sel: select * from continuous_view_1 where bkt = 0 or bkt > 30
bkt cnt
@ -114,7 +114,7 @@ lock_mattable
step I2: INSERT INTO ts_continuous_test SELECT 40, 1000 ;
step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1; <waiting ...>
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 50
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 45
step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2; <waiting ...>
step UnlockCompleted: ROLLBACK;
step Refresh2: <... completed>


@ -23,7 +23,7 @@ lock_mattable
step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1; <waiting ...>
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 35
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 30
step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2; <waiting ...>
step UnlockCompleted: ROLLBACK;
step Refresh2: <... completed>
@ -49,7 +49,7 @@ step Setup2:
LOG: materializing continuous aggregate public.continuous_view_1: nothing to invalidate, new range up to 30
step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1;
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 35
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 30
step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2;
step LockCompleted: BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_completed_threshold IN SHARE MODE;
step LockMat1: BEGIN; select lock_mattable(materialization_hypertable::text) from timescaledb_information.continuous_aggregates where view_name::text like 'continuous_view_1';
@ -59,7 +59,7 @@ lock_mattable
step I1: INSERT INTO ts_continuous_test SELECT 0, i*10 FROM (SELECT generate_series(0, 10) AS i) AS i;
step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1; <waiting ...>
LOG: new materialization range not found for public.ts_continuous_test (time column time): not enough new data past completion threshold of 35 as of 29
LOG: new materialization range not found for public.ts_continuous_test (time column time): not enough new data past completion threshold of 30 as of 29
LOG: materializing continuous aggregate public.continuous_view_2: processing invalidations, no new range
step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2; <waiting ...>
step UnlockCompleted: ROLLBACK;
@ -96,7 +96,7 @@ step Setup2:
step AlterLag1: alter view continuous_view_1 set (timescaledb.refresh_lag = 10);
LOG: materializing continuous aggregate public.continuous_view_1: nothing to invalidate, new range up to 15
step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1;
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 35
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 30
step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2;
step Refresh1_sel: select * from continuous_view_1 where bkt = 0 or bkt > 30
bkt cnt
@ -114,7 +114,7 @@ lock_mattable
step I2: INSERT INTO ts_continuous_test SELECT 40, 1000 ;
step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1; <waiting ...>
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 50
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 45
step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2; <waiting ...>
step UnlockCompleted: ROLLBACK;
step Refresh2: <... completed>


@ -23,7 +23,7 @@ lock_mattable
step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1; <waiting ...>
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 35
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 30
step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2; <waiting ...>
step UnlockCompleted: ROLLBACK;
step Refresh2: <... completed>
@ -49,7 +49,7 @@ step Setup2:
LOG: materializing continuous aggregate public.continuous_view_1: nothing to invalidate, new range up to 30
step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1;
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 35
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 30
step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2;
step LockCompleted: BEGIN; LOCK TABLE _timescaledb_catalog.continuous_aggs_completed_threshold IN SHARE MODE;
step LockMat1: BEGIN; select lock_mattable(materialization_hypertable::text) from timescaledb_information.continuous_aggregates where view_name::text like 'continuous_view_1';
@ -59,7 +59,7 @@ lock_mattable
step I1: INSERT INTO ts_continuous_test SELECT 0, i*10 FROM (SELECT generate_series(0, 10) AS i) AS i;
step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1; <waiting ...>
LOG: new materialization range not found for public.ts_continuous_test (time column time): not enough new data past completion threshold of 35 as of 29
LOG: new materialization range not found for public.ts_continuous_test (time column time): not enough new data past completion threshold of 30 as of 29
LOG: materializing continuous aggregate public.continuous_view_2: processing invalidations, no new range
step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2; <waiting ...>
step UnlockCompleted: ROLLBACK;
@ -96,7 +96,7 @@ step Setup2:
step AlterLag1: alter view continuous_view_1 set (timescaledb.refresh_lag = 10);
LOG: materializing continuous aggregate public.continuous_view_1: nothing to invalidate, new range up to 15
step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1;
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 35
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 30
step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2;
step Refresh1_sel: select * from continuous_view_1 where bkt = 0 or bkt > 30
bkt cnt
@ -114,7 +114,7 @@ lock_mattable
step I2: INSERT INTO ts_continuous_test SELECT 40, 1000 ;
step Refresh1: REFRESH MATERIALIZED VIEW continuous_view_1; <waiting ...>
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 50
LOG: materializing continuous aggregate public.continuous_view_2: nothing to invalidate, new range up to 45
step Refresh2: REFRESH MATERIALIZED VIEW continuous_view_2; <waiting ...>
step UnlockCompleted: ROLLBACK;
step Refresh2: <... completed>


@ -931,3 +931,13 @@ SET timescaledb.current_timestamp_mock = '2001-05-11';
insert into conditions values('2001-02-20', '4');
REFRESH MATERIALIZED VIEW mat_test5;
SELECT * FROM mat_test5;
--verify that watermark is limited by max value and not by
-- the current time (now value)--
SET timescaledb.current_timestamp_mock = '2018-05-11';
SELECT view_name, completed_threshold, invalidation_threshold
FROM timescaledb_information.continuous_aggregate_stats
where view_name::text like 'mat_test5';
REFRESH MATERIALIZED VIEW mat_test5;
SELECT view_name, completed_threshold, invalidation_threshold
FROM timescaledb_information.continuous_aggregate_stats
where view_name::text like 'mat_test5';


@ -696,3 +696,60 @@ REFRESH MATERIALIZED VIEW timezone_test_summary;
SELECT count(*) FROM timezone_test_summary;
DROP TABLE timezone_test CASCADE;
-- TESTS for integer-based table to verify the watermark is limited by the max value of the time column and not by now
CREATE TABLE continuous_agg_int(time BIGINT, data BIGINT);
SELECT create_hypertable('continuous_agg_int', 'time', chunk_time_interval=> 10);
CREATE OR REPLACE FUNCTION integer_now_continuous_agg_max() returns BIGINT LANGUAGE SQL STABLE as $$ SELECT BIGINT '9223372036854775807' $$;
SELECT set_integer_now_func('continuous_agg_int', 'integer_now_continuous_agg_max');
CREATE VIEW continuous_agg_int_max
WITH (timescaledb.continuous, timescaledb.refresh_lag='0')
AS SELECT time_bucket('10', time), COUNT(data) as value
FROM continuous_agg_int
GROUP BY 1;
INSERT INTO continuous_agg_int values (-10, 100), (1,100), (10, 100);
select chunk_table, ranges from chunk_relation_size('continuous_agg_int');
REFRESH MATERIALIZED VIEW continuous_agg_int_max;
REFRESH MATERIALIZED VIEW continuous_agg_int_max;
REFRESH MATERIALIZED VIEW continuous_agg_int_max;
select * from continuous_agg_int_max;
--watermark is 20
SELECT view_name, completed_threshold, invalidation_threshold
FROM timescaledb_information.continuous_aggregate_stats
where view_name::text like 'continuous_agg_int_max';
-- TEST that watermark is limited by max value from data and not by now()
CREATE TABLE continuous_agg_ts_max_t(timecol TIMESTAMPTZ, data integer);
SELECT create_hypertable('continuous_agg_ts_max_t', 'timecol', chunk_time_interval=>'365 days'::interval);
CREATE VIEW continuous_agg_ts_max_view
WITH (timescaledb.continuous, timescaledb.max_interval_per_job='365 days', timescaledb.refresh_lag='-2 hours')
AS SELECT time_bucket('2 hours', timecol), COUNT(data) as value
FROM continuous_agg_ts_max_t
GROUP BY 1;
INSERT INTO continuous_agg_ts_max_t
values ('1969-01-01 1:00'::timestamptz, 10);
REFRESH MATERIALIZED VIEW continuous_agg_ts_max_view;
SELECT view_name, completed_threshold, invalidation_threshold
FROM timescaledb_information.continuous_aggregate_stats
where view_name::text like 'continuous_agg_ts_max_view';
INSERT INTO continuous_agg_ts_max_t
values ('1970-01-01 1:00'::timestamptz, 10);
select chunk_table, ranges from chunk_relation_size('continuous_agg_ts_max_t');
REFRESH MATERIALIZED VIEW continuous_agg_ts_max_view;
REFRESH MATERIALIZED VIEW continuous_agg_ts_max_view;
SELECT view_name, completed_threshold, invalidation_threshold
FROM timescaledb_information.continuous_aggregate_stats
where view_name::text like 'continuous_agg_ts_max_view';
-- no more new data to materialize, threshold should not change
REFRESH MATERIALIZED VIEW continuous_agg_ts_max_view;
SELECT view_name, completed_threshold, invalidation_threshold
FROM timescaledb_information.continuous_aggregate_stats
where view_name::text like 'continuous_agg_ts_max_view';


@ -53,6 +53,9 @@ from conditions
group by time_bucket('1day', timec), location;
SET timescaledb.current_timestamp_mock = '2018-12-31 00:00';
--compute time_bucketed max + bucket_width for the materialized view
SELECT time_bucket('1day' , q.timeval+ '1day'::interval)
FROM ( select max(timec)as timeval from conditions ) as q;
REFRESH MATERIALIZED VIEW mat_m1;
--test first/last
@ -64,6 +67,8 @@ from conditions
group by time_bucket('1day', timec), location;
--time that refresh assumes as now() for repeatability
SET timescaledb.current_timestamp_mock = '2018-12-31 00:00';
SELECT time_bucket('1day' , q.timeval+ '1day'::interval)
FROM ( select max(timec)as timeval from conditions ) as q;
REFRESH MATERIALIZED VIEW mat_m2;
--normal view --