Add refresh function for continuous aggregates

This change adds a new refresh function called
`refresh_continuous_aggregate` that allows refreshing a continuous
aggregate over a given window of data, called the "refresh window".

This is the first step in a larger overhaul of the continuous
aggregate feature with the goal of cleaning up the API and separating
policy from the core functionality.

Currently, the refresh function does a brute-force refresh of a window
and it bypasses the whole invalidation framework. Future updates
intend to integrate with this framework (with modifications) to
optimize refreshes. An exclusive lock is taken on the continuous
aggregate's internal materialized hypertable in order to protect
against concurrent refreshing. However, as this serializes refreshes,
we might want to relax this locking in the future to allow, e.g.,
concurrent refreshes of non-overlapping windows.

The new refresh functionality includes basic tests for bad input and
refreshing across different windows. Unfortunately, a bug in the
optimization code for `time_bucket` causes timestamps to overflow the
allowed MAX time. Therefore, refresh windows that are close to the MAX
allowed size are not yet supported or tested.
This commit is contained in:
Erik Nordström 2020-07-09 15:29:29 +02:00 committed by Erik Nordström
parent a3a668e654
commit 84fd3b09b4
15 changed files with 847 additions and 39 deletions

View File

@ -211,5 +211,12 @@ AS '@MODULE_PATHNAME@', 'ts_distributed_exec' LANGUAGE C VOLATILE;
CREATE OR REPLACE FUNCTION set_replication_factor(
hypertable REGCLASS,
replication_factor INTEGER
) RETURNS VOID
) RETURNS VOID
AS '@MODULE_PATHNAME@', 'ts_hypertable_distributed_set_replication_factor' LANGUAGE C VOLATILE;
-- Refresh a continuous aggregate across the given window.
--
-- cagg:         the continuous aggregate to refresh (a NULL value is
--               rejected with an error).
-- window_start: start of the refresh window; NULL leaves the window
--               unbounded back in time.
-- window_end:   end of the refresh window; NULL leaves the window
--               unbounded forward in time.
--
-- The window arguments are declared as "any" so that values of different
-- time types can be passed. An untyped literal (e.g., '2020-05-01') is
-- converted to the time type of the continuous aggregate. The start of the
-- window must be before its end.
CREATE OR REPLACE FUNCTION refresh_continuous_aggregate(
cagg REGCLASS,
window_start "any",
window_end "any"
) RETURNS VOID AS '@MODULE_PATHNAME@', 'ts_continuous_agg_refresh' LANGUAGE C VOLATILE;

View File

@ -473,6 +473,21 @@ ts_continuous_agg_find_userview_name(const char *schema, const char *name)
return ca;
}
/*
 * Find a continuous agg object by the main relid.
 *
 * The relid is the user-facing object ID that represents the continuous
 * aggregate (i.e., the query view's ID).
 *
 * Returns NULL when the relid cannot be resolved to a relation name and a
 * schema name (e.g., the OID does not reference an existing relation).
 * Otherwise, returns the result of the lookup by schema and view name.
 */
ContinuousAgg *
ts_continuous_agg_find_by_relid(Oid relid)
{
	const char *relname = get_rel_name(relid);
	const char *schemaname;

	/* No such relation, so there is nothing to look up by name */
	if (NULL == relname)
		return NULL;

	schemaname = get_namespace_name(get_rel_namespace(relid));

	if (NULL == schemaname)
		return NULL;

	return ts_continuous_agg_find_userview_name(schemaname, relname);
}
ContinuousAgg *
ts_continuous_agg_find_by_job_id(int32 job_id)
{

View File

@ -74,6 +74,7 @@ int64 ts_continuous_agg_get_completed_threshold(int32 materialization_id);
extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_view_name(const char *schema,
const char *name);
extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_relid(Oid relid);
extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_job_id(int32 job_id);
extern void ts_continuous_agg_drop_view_callback(ContinuousAgg *ca, const char *schema,

View File

@ -58,6 +58,7 @@ CROSSMODULE_WRAPPER(decompress_chunk);
/* continuous aggregate */
CROSSMODULE_WRAPPER(continuous_agg_invalidation_trigger);
CROSSMODULE_WRAPPER(continuous_agg_refresh);
CROSSMODULE_WRAPPER(data_node_ping);
CROSSMODULE_WRAPPER(data_node_block_new_chunks);
@ -355,6 +356,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.process_cagg_viewstmt = process_cagg_viewstmt_default,
.continuous_agg_drop_chunks_by_chunk_id = continuous_agg_drop_chunks_by_chunk_id_default,
.continuous_agg_invalidation_trigger = error_no_default_fn_pg_community,
.continuous_agg_refresh = error_no_default_fn_pg_community,
.continuous_agg_update_options = continuous_agg_update_options_default,
.continuous_agg_materialize = cagg_materialize_default_fn,

View File

@ -88,6 +88,7 @@ typedef struct CrossModuleFunctions
Datum newer_than_datum, Oid older_than_type,
Oid newer_than_type, int32 log_level);
PGFunction continuous_agg_invalidation_trigger;
PGFunction continuous_agg_refresh;
void (*continuous_agg_update_options)(ContinuousAgg *cagg,
WithClauseResult *with_clause_options);

View File

@ -53,6 +53,7 @@ WHERE oid IN (
last
locf
move_chunk
refresh_continuous_aggregate
remove_compress_chunks_policy
remove_drop_chunks_policy
remove_reorder_policy
@ -70,5 +71,5 @@ WHERE oid IN (
timescaledb_fdw_validator
timescaledb_post_restore
timescaledb_pre_restore
(51 rows)
(52 rows)

View File

@ -5,5 +5,6 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/job.c
${CMAKE_CURRENT_SOURCE_DIR}/materialize.c
${CMAKE_CURRENT_SOURCE_DIR}/options.c
${CMAKE_CURRENT_SOURCE_DIR}/refresh.c
)
target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})

View File

@ -36,24 +36,6 @@
#include "continuous_aggs/materialize.h"
/***********************
* Time ranges
***********************/
typedef struct InternalTimeRange
{
Oid type;
int64 start; /* inclusive */
int64 end; /* exclusive */
} InternalTimeRange;
typedef struct TimeRange
{
Oid type;
Datum start;
Datum end;
} TimeRange;
static bool ranges_overlap(InternalTimeRange invalidation_range,
InternalTimeRange new_materialization_range);
static TimeRange internal_time_range_to_time_range(InternalTimeRange internal);
@ -1039,10 +1021,6 @@ insert_materialization_invalidation_logs(List *caggs, List *invalidations,
* materialization support *
***************************/
static void update_materializations(SchemaAndName partial_view, SchemaAndName materialization_table,
Name time_column_name,
InternalTimeRange new_materialization_range, int64 bucket_width,
InternalTimeRange invalidation_range);
static void spi_update_materializations(SchemaAndName partial_view,
SchemaAndName materialization_table, Name time_column_name,
TimeRange invalidation_range);
@ -1142,13 +1120,12 @@ continuous_agg_execute_materialization(int64 bucket_width, int32 hypertable_id,
materialization_table_name.name = &materialization_table->fd.table_name;
/* to prevent deadlocks later on, lock the tables we will be inserting to now */
update_materializations(partial_view,
materialization_table_name,
&time_column_name,
new_materialization_range,
bucket_width,
new_invalidation_range);
continuous_agg_update_materialization(partial_view,
materialization_table_name,
&time_column_name,
new_materialization_range,
new_invalidation_range,
bucket_width);
/* update the completed watermark */
ts_catalog_database_info_become_owner(ts_catalog_database_info_get(), &sec_ctx);
@ -1292,9 +1269,10 @@ invalidation_threshold_get(int32 hypertable_id)
}
void
update_materializations(SchemaAndName partial_view, SchemaAndName materialization_table,
Name time_column_name, InternalTimeRange new_materialization_range,
int64 bucket_width, InternalTimeRange invalidation_range)
continuous_agg_update_materialization(SchemaAndName partial_view,
SchemaAndName materialization_table, Name time_column_name,
InternalTimeRange new_materialization_range,
InternalTimeRange invalidation_range, int64 bucket_width)
{
InternalTimeRange combined_materialization_range = new_materialization_range;
bool materialize_invalidations_separately = range_length(invalidation_range) > 0;
@ -1552,7 +1530,6 @@ spi_delete_materializations(SchemaAndName materialization_table, Name time_colum
char *invalidation_end;
getTypeOutputInfo(invalidation_range.type, &out_fn, &type_is_varlena);
invalidation_start = OidOutputFunctionCall(out_fn, invalidation_range.start);
invalidation_end = OidOutputFunctionCall(out_fn, invalidation_range.end);
@ -1589,7 +1566,6 @@ spi_insert_materializations(SchemaAndName partial_view, SchemaAndName materializ
char *materialization_end;
getTypeOutputInfo(materialization_range.type, &out_fn, &type_is_varlena);
materialization_start = OidOutputFunctionCall(out_fn, materialization_range.start);
materialization_end = OidOutputFunctionCall(out_fn, materialization_range.end);
@ -1611,8 +1587,8 @@ spi_insert_materializations(SchemaAndName partial_view, SchemaAndName materializ
NULL /*=Values*/,
NULL /*=Nulls*/,
false /*=read_only*/,
0 /*count*/
);
0 /*count*/);
if (res < 0)
elog(ERROR, "could materialize values into the materialization table");
}

View File

@ -24,6 +24,24 @@ typedef struct Invalidation
int64 greatest_modified_value;
} Invalidation;
/***********************
 * Time ranges
 ***********************/
/*
 * A time range whose end points are given as Datum values.
 */
typedef struct TimeRange
{
/* Type OID of the start and end values */
Oid type;
/* Start of the range, as a value of the given type */
Datum start;
/* End of the range, as a value of the given type */
Datum end;
} TimeRange;
/*
 * A time range whose end points are given as 64-bit integer time values.
 * The range is half-open: the start is part of the range while the end is
 * not.
 */
typedef struct InternalTimeRange
{
/* Time type OID that the range relates to */
Oid type;
int64 start; /* inclusive */
int64 end; /* exclusive */
} InternalTimeRange;
int64 invalidation_threshold_get(int32 raw_hypertable_id);
bool continuous_agg_materialize(int32 materialization_id, ContinuousAggMatOptions *options);
void continuous_agg_execute_materialization(int64 bucket_width, int32 hypertable_id,
@ -31,5 +49,11 @@ void continuous_agg_execute_materialization(int64 bucket_width, int32 hypertable
int64 invalidation_range_start,
int64 invalidation_range_end,
int64 materialization_invalidation_threshold);
/*
 * Update the materialization of a continuous aggregate.
 *
 * The function takes two ranges: one for new data
 * (new_materialization_range) and one for invalidated data
 * (invalidation_range). A caller that only needs one of them can pass a
 * zero-length range for the other.
 *
 * NOTE(review): presumably data is read from partial_view and written to
 * materialization_table, using time_column_name as the time column of the
 * latter -- confirm against the definition in materialize.c.
 */
void continuous_agg_update_materialization(SchemaAndName partial_view,
SchemaAndName materialization_table,
Name time_column_name,
InternalTimeRange new_materialization_range,
InternalTimeRange invalidation_range,
int64 bucket_width);
#endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_MATERIALIZE_H */

View File

@ -0,0 +1,371 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
#include <utils/lsyscache.h>
#include <utils/fmgrprotos.h>
#include <storage/lmgr.h>
#include <miscadmin.h>
#include <fmgr.h>
#include <catalog.h>
#include <continuous_agg.h>
#include <dimension.h>
#include <hypertable.h>
#include <hypertable_cache.h>
#include <time_bucket.h>
#include <utils.h>
#include "refresh.h"
#include "materialize.h"
/*
 * State for executing one refresh of a continuous aggregate. It is filled in
 * by continuous_agg_refresh_init() and consumed by
 * continuous_agg_refresh_execute().
 */
typedef struct CaggRefreshState
{
/* By-value copy of the continuous aggregate being refreshed */
ContinuousAgg cagg;
/* The hypertable referenced by the aggregate's mat_hypertable_id */
Hypertable *cagg_ht;
/* The refresh window after it has been aligned with bucket boundaries */
InternalTimeRange refresh_window;
/* Schema and name of the aggregate's partial view */
SchemaAndName partial_view;
} CaggRefreshState;
/*
 * Look up a hypertable by its ID and raise an error if it cannot be found.
 *
 * The function never returns NULL; a failed lookup is reported as an
 * inconsistent continuous aggregate state.
 */
static Hypertable *
cagg_get_hypertable_or_fail(int32 hypertable_id)
{
	Hypertable *hypertable = ts_hypertable_get_by_id(hypertable_id);

	if (hypertable != NULL)
		return hypertable;

	ereport(ERROR,
			(errcode(ERRCODE_INTERNAL_ERROR),
			 errmsg("invalid continuous aggregate state"),
			 errdetail("A continuous aggregate references a hypertable that does not exist.")));

	/* Not reached; the error report above does not return */
	return NULL;
}
/*
 * Get the largest window allowed given the time type of a continuous
 * aggregate and other restrictions.
 *
 * The largest window allowed sets an upper bound on the range a refresh
 * window can have given a specific time type. Note that this bound might be
 * lower than supported by the time type itself. This is because we internally
 * use UNIX epoch time for all non-integer types and must be able to convert
 * back-and-forth between them.
 *
 * Note that each time type has its own range of possible time values. For
 * instance, Date has a much bigger range than timestamp.
 *
 * This is further complicated by the fact that we internally convert
 * timestamp and date types to UNIX time. The conversion ensures that all
 * hypertables use the same epoch and ranges internally, although this adds
 * further restrictions as we must be able to convert back-and-forth between
 * types and time epochs.
 *
 * The returned range has its type set to the given time type and its start
 * and end converted with ts_time_value_to_internal(). An unsupported time
 * type raises an error.
 */
static InternalTimeRange
get_max_window(Oid timetype)
{
	InternalTimeRange maxrange = {
		.type = timetype,
	};

	/* PG checks for valid timestamps and dates in the range MIN <= time <
	 * END. So, for those types we subtract 1 from the MAX to get into the
	 * valid range. The MAX values account for ability to convert to UNIX time
	 * without overflow. */
	switch (timetype)
	{
		case DATEOID:
			/* Since our internal conversion turns a date into a timestamp,
			 * dates are governed by the same limits as timestamps. This is
			 * probably a limitation we want to do away with, even though it
			 * has little effect in practice. */
			maxrange.start = DATETIME_MIN_JULIAN - POSTGRES_EPOCH_JDATE;
			maxrange.end = TIMESTAMP_END_JULIAN - (2 * POSTGRES_EPOCH_JDATE) + UNIX_EPOCH_JDATE - 1;
			break;
		case TIMESTAMPTZOID:
		case TIMESTAMPOID:
			maxrange.start = MIN_TIMESTAMP;
			maxrange.end = (END_TIMESTAMP - TS_EPOCH_DIFF_MICROSECONDS) - 1;
			break;
		/* There is no "date"/"time" range for integers so they can take any
		 * value of their type. The bounds must be those of the specific
		 * integer type, however: the values go through the type-specific
		 * conversion below, and the 64-bit extremes do not fit in the
		 * narrower integer types. */
		case INT8OID:
			maxrange.start = PG_INT64_MIN;
			maxrange.end = PG_INT64_MAX;
			break;
		case INT4OID:
			maxrange.start = PG_INT32_MIN;
			maxrange.end = PG_INT32_MAX;
			break;
		case INT2OID:
			maxrange.start = PG_INT16_MIN;
			maxrange.end = PG_INT16_MAX;
			break;
		default:
			/* Oid is an unsigned type, so print it as such */
			elog(ERROR, "unrecognized time type %u", timetype);
			break;
	}

	maxrange.start = ts_time_value_to_internal(Int64GetDatum(maxrange.start), timetype);
	maxrange.end = ts_time_value_to_internal(Int64GetDatum(maxrange.end), timetype);

	return maxrange;
}
/*
 * Compute the largest possible bucketed window given the time type and
 * internal restrictions.
 *
 * The largest bucketed window is governed by restrictions set by the type and
 * internal, TimescaleDB-specific legacy details (see get_max_window above for
 * further explanation).
 *
 * The returned range carries the given time type, and both its start and end
 * are the results of bucketing with the given bucket width.
 */
static InternalTimeRange
get_largest_bucketed_window(Oid timetype, int64 bucket_width)
{
	InternalTimeRange maxwindow = get_max_window(timetype);
	/* Set the type so that no field of the returned range is left
	 * uninitialized */
	InternalTimeRange maxbuckets = {
		.type = timetype,
	};

	/* For the MIN value, the corresponding bucket either falls on the exact
	 * MIN or it will be below it. Therefore, we add (bucket_width - 1) to
	 * move to the next bucket to be within the allowed range. */
	maxwindow.start = maxwindow.start + bucket_width - 1;
	maxbuckets.start = ts_time_bucket_by_type(bucket_width, maxwindow.start, timetype);
	maxbuckets.end = ts_time_bucket_by_type(bucket_width, maxwindow.end, timetype);

	return maxbuckets;
}
/*
 * Widen a refresh window so that it starts and ends on bucket boundaries.
 *
 * Refreshing more than the requested window is acceptable, while refreshing
 * less is not. Since a refresh can only happen along bucket boundaries, the
 * start is moved back to the start of its bucket and the end is moved
 * forward to the end of its bucket. For example, if the dotted region below
 * is the original window, the adjusted refresh window includes all four
 * buckets shown.
 *
 * |   ....|.....|..   |
 *
 * Either end is capped at the corresponding end of the largest bucketed
 * window supported for the time type.
 */
static InternalTimeRange
compute_bucketed_refresh_window(const InternalTimeRange *refresh_window, int64 bucket_width)
{
	const InternalTimeRange limits =
		get_largest_bucketed_window(refresh_window->type, bucket_width);
	InternalTimeRange aligned = *refresh_window;

	if (aligned.start > limits.start)
		aligned.start = ts_time_bucket_by_type(bucket_width, aligned.start, aligned.type);
	else
		aligned.start = limits.start;

	if (aligned.end < limits.end)
	{
		/* Bucketing yields the start of the bucket, so one bucket width is
		 * added to reach the end of that bucket */
		int64 bucket_start = ts_time_bucket_by_type(bucket_width, aligned.end, aligned.type);

		aligned.end = bucket_start + bucket_width;
	}
	else
		aligned.end = limits.end;

	return aligned;
}
/*
 * Set up the state needed to refresh a continuous aggregate.
 *
 * The state gets a copy of the continuous aggregate, the hypertable
 * referenced by its mat_hypertable_id, the bucket-aligned refresh window,
 * and the name of the partial view. An error is raised if the hypertable
 * cannot be found.
 */
static void
continuous_agg_refresh_init(CaggRefreshState *refresh, ContinuousAgg *cagg,
							const InternalTimeRange *refresh_window)
{
	Hypertable *mat_ht;

	MemSet(refresh, 0, sizeof(*refresh));
	refresh->cagg = *cagg;

	mat_ht = cagg_get_hypertable_or_fail(cagg->data.mat_hypertable_id);
	refresh->cagg_ht = mat_ht;

	refresh->refresh_window =
		compute_bucketed_refresh_window(refresh_window, cagg->data.bucket_width);

	/* The view name fields point into the caller's continuous aggregate
	 * object rather than into the copy held by the state */
	refresh->partial_view.schema = &cagg->data.partial_view_schema;
	refresh->partial_view.name = &cagg->data.partial_view_name;

	/* Take a lock on the continuous aggregate's materialized hypertable to
	 * guard against concurrent refreshes. Only reads will be allowed. The
	 * lock is heavy and serializes all refreshes. Relaxing it could be
	 * worthwhile in the future, e.g., to at least permit concurrent
	 * refreshes with non-overlapping refresh windows.
	 *
	 * One way to get concurrent refreshes on the same continuous aggregate
	 * is to protect the aggregate with a UNIQUE constraint on the GROUP BY
	 * columns. Overlapping refreshes might then fail with, e.g., unique
	 * violation errors, but those could be caught and ignored when it is
	 * known that someone else just did the same work.
	 */
	LockRelationOid(mat_ht->main_table_relid, ExclusiveLock);
}
/*
* Execute a refresh.
*
* The refresh will materialize the area given by the refresh window in the
* refresh state.
*/
static void
continuous_agg_refresh_execute(const CaggRefreshState *refresh)
{
SchemaAndName cagg_hypertable_name = {
.schema = &refresh->cagg_ht->fd.schema_name,
.name = &refresh->cagg_ht->fd.table_name,
};
/* The materialization function takes two ranges, one for new data and one
* for invalidated data. A refresh just uses one of them so the other one
* has a zero range. */
InternalTimeRange unused_invalidation_range = {
.type = refresh->refresh_window.type,
.start = 0,
.end = 0,
};
Dimension *time_dim = hyperspace_get_open_dimension(refresh->cagg_ht->space, 0);
Assert(time_dim != NULL);
continuous_agg_update_materialization(refresh->partial_view,
cagg_hypertable_name,
&time_dim->fd.column_name,
refresh->refresh_window,
unused_invalidation_range,
refresh->cagg.data.bucket_width);
}
/*
 * Refresh a continuous aggregate over the given window: first initialize the
 * refresh state, then execute the refresh with it.
 */
static void
continuous_agg_refresh_with_window(ContinuousAgg *cagg, const InternalTimeRange *refresh_window)
{
	CaggRefreshState state;

	continuous_agg_refresh_init(&state, cagg, refresh_window);
	continuous_agg_refresh_execute(&state);
}
/*
 * Convert a window argument to a time value in "internal" time.
 *
 * The window parameters have type "any", so the function receives no type
 * information for an argument unless the user casts it explicitly. A call
 * like
 *
 *   refresh_continuous_aggregate('daily_temp', '2020-10-01', '2020-10-04');
 *
 * therefore yields arguments of type UNKNOWNOID, while a call like
 *
 *   refresh_continuous_aggregate('daily_temp', '2020-10-01'::date, '2020-10-04'::date);
 *
 * yields typed arguments. The untyped case is still easy to support because
 * the continuous aggregate knows its time type, and the argument can be
 * run through that type's input function.
 *
 * In summary:
 *
 * 1. Explicit cast --> argtype holds the type and the value is converted
 *    from that type.
 * 2. No cast --> the argument is converted to the time type of the
 *    continuous aggregate first.
 *
 * An error is raised for unsupported types and for untyped arguments with a
 * string that the input function cannot parse.
 */
static int64
get_time_value_from_arg(Datum arg, Oid argtype, Oid cagg_timetype)
{
	Oid input_fn = InvalidOid;
	Oid ioparam;
	int nargs;

	/* The user gave a type via an explicit cast, so use it as is */
	if (OidIsValid(argtype) && argtype != UNKNOWNOID)
		return ts_time_value_to_internal(arg, argtype);

	/* No type information: parse the argument with the input function of
	 * the continuous aggregate's time type */
	getTypeInputInfo(cagg_timetype, &input_fn, &ioparam);
	nargs = get_func_nargs(input_fn);

	if (nargs == 1)
	{
		/* Input functions with a single argument, e.g., the one for Date */
		arg = OidFunctionCall1(input_fn, arg);
	}
	else if (nargs == 3)
	{
		/* Timestamp input functions take three arguments */
		arg = OidFunctionCall3(input_fn, arg, ObjectIdGetDatum(InvalidOid), Int32GetDatum(-1));
	}
	else
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid window parameter"),
				 errhint("The window parameter requires an explicit cast.")));

	return ts_time_value_to_internal(arg, cagg_timetype);
}
/*
 * Refresh a continuous aggregate across the given window.
 *
 * Arguments (SQL level):
 *
 * 0. The relation (OID) of the continuous aggregate. NULL is an error.
 * 1. The start of the refresh window. NULL leaves the start at the minimum
 *    64-bit value, i.e., unbounded back in time.
 * 2. The end of the refresh window. NULL leaves the end at the maximum
 *    64-bit value, i.e., unbounded forward in time.
 *
 * Errors are raised in a read-only transaction, if the relation is not a
 * continuous aggregate, if the aggregate references a hypertable that does
 * not exist, and if the start of the window is not before its end.
 */
Datum
continuous_agg_refresh(PG_FUNCTION_ARGS)
{
	Oid cagg_relid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
	ContinuousAgg *cagg;
	Hypertable *cagg_ht;
	Dimension *time_dim;
	InternalTimeRange refresh_window = {
		.type = InvalidOid,
		.start = PG_INT64_MIN,
		.end = PG_INT64_MAX,
	};

	PreventCommandIfReadOnly("refresh_continuous_aggregate()");

	if (!OidIsValid(cagg_relid))
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid continuous aggregate")));

	cagg = ts_continuous_agg_find_by_relid(cagg_relid);

	if (NULL == cagg)
	{
		const char *relname = get_rel_name(cagg_relid);

		/* Distinguish an OID without a relation from a relation that is
		 * not a continuous aggregate */
		if (relname == NULL)
			ereport(ERROR,
					(errcode(ERRCODE_UNDEFINED_TABLE),
					 (errmsg("continuous aggregate does not exist"))));
		else
			ereport(ERROR,
					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					 (errmsg("relation \"%s\" is not a continuous aggregate", relname))));
	}

	/* Raise a proper error on a missing hypertable. An assertion alone is
	 * compiled out in production builds and would leave a NULL pointer to
	 * be dereferenced below. */
	cagg_ht = cagg_get_hypertable_or_fail(cagg->data.mat_hypertable_id);
	time_dim = hyperspace_get_open_dimension(cagg_ht->space, 0);
	Assert(time_dim != NULL);
	refresh_window.type = ts_dimension_get_partition_type(time_dim);

	if (!PG_ARGISNULL(1))
		refresh_window.start = get_time_value_from_arg(PG_GETARG_DATUM(1),
													   get_fn_expr_argtype(fcinfo->flinfo, 1),
													   refresh_window.type);

	if (!PG_ARGISNULL(2))
		refresh_window.end = get_time_value_from_arg(PG_GETARG_DATUM(2),
													 get_fn_expr_argtype(fcinfo->flinfo, 2),
													 refresh_window.type);

	if (refresh_window.start >= refresh_window.end)
		ereport(ERROR,
				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
				 errmsg("invalid refresh window"),
				 errhint("The start of the window must be before the end.")));

	continuous_agg_refresh_with_window(cagg, &refresh_window);

	PG_RETURN_VOID();
}

View File

@ -0,0 +1,14 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#ifndef TIMESCALEDB_TSL_CONTINUOUS_AGGS_REFRESH_H
#define TIMESCALEDB_TSL_CONTINUOUS_AGGS_REFRESH_H
#include <postgres.h>
#include <fmgr.h>
extern Datum continuous_agg_refresh(PG_FUNCTION_ARGS);
#endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_REFRESH_H */

View File

@ -25,6 +25,7 @@
#include "continuous_aggs/insert.h"
#include "continuous_aggs/materialize.h"
#include "continuous_aggs/options.h"
#include "continuous_aggs/refresh.h"
#include "cross_module_fn.h"
#include "data_node_dispatch.h"
#include "data_node.h"
@ -118,6 +119,7 @@ CrossModuleFunctions tsl_cm_functions = {
.continuous_agg_drop_chunks_by_chunk_id = ts_continuous_agg_drop_chunks_by_chunk_id,
.continuous_agg_invalidation_trigger = continuous_agg_trigfn,
.continuous_agg_update_options = continuous_agg_update_options,
.continuous_agg_refresh = continuous_agg_refresh,
.compressed_data_decompress_forward = tsl_compressed_data_decompress_forward,
.compressed_data_decompress_reverse = tsl_compressed_data_decompress_reverse,
.compressed_data_send = tsl_compressed_data_send,

View File

@ -0,0 +1,268 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
-- Disable background workers since we are testing manual refresh
\c :TEST_DBNAME :ROLE_SUPERUSER
SELECT _timescaledb_internal.stop_background_workers();
stop_background_workers
-------------------------
t
(1 row)
SET ROLE :ROLE_DEFAULT_PERM_USER;
CREATE TABLE conditions (time timestamptz NOT NULL, device int, temp float);
SELECT create_hypertable('conditions', 'time');
create_hypertable
-------------------------
(1,public,conditions,t)
(1 row)
SELECT setseed(.12);
setseed
---------
(1 row)
INSERT INTO conditions
SELECT t, ceil(abs(timestamp_hash(t::timestamp))%4)::int, abs(timestamp_hash(t::timestamp))%40
FROM generate_series('2020-05-01', '2020-05-05', '10 minutes'::interval) t;
-- Show the most recent data
SELECT * FROM conditions
ORDER BY time DESC, device
LIMIT 10;
time | device | temp
------------------------------+--------+------
Tue May 05 00:00:00 2020 PDT | 2 | 30
Mon May 04 23:50:00 2020 PDT | 2 | 10
Mon May 04 23:40:00 2020 PDT | 0 | 20
Mon May 04 23:30:00 2020 PDT | 1 | 1
Mon May 04 23:20:00 2020 PDT | 2 | 34
Mon May 04 23:10:00 2020 PDT | 1 | 37
Mon May 04 23:00:00 2020 PDT | 0 | 4
Mon May 04 22:50:00 2020 PDT | 2 | 10
Mon May 04 22:40:00 2020 PDT | 1 | 37
Mon May 04 22:30:00 2020 PDT | 0 | 8
(10 rows)
CREATE VIEW daily_temp
WITH (timescaledb.continuous,
timescaledb.materialized_only=true)
AS
SELECT time_bucket('1 day', time) AS day, device, avg(temp) AS avg_temp
FROM conditions
GROUP BY 1,2;
NOTICE: adding index _materialized_hypertable_2_device_day_idx ON _timescaledb_internal._materialized_hypertable_2 USING BTREE(device, day)
-- The continuous aggregate should be empty
SELECT * FROM daily_temp
ORDER BY day DESC, device;
day | device | avg_temp
-----+--------+----------
(0 rows)
-- Refresh the most recent few days:
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-03', '2020-05-05');
refresh_continuous_aggregate
------------------------------
(1 row)
SELECT * FROM daily_temp
ORDER BY day DESC, device;
day | device | avg_temp
------------------------------+--------+------------------
Mon May 04 17:00:00 2020 PDT | 0 | 19.3846153846154
Mon May 04 17:00:00 2020 PDT | 1 | 16.5555555555556
Mon May 04 17:00:00 2020 PDT | 2 | 18.5714285714286
Mon May 04 17:00:00 2020 PDT | 3 | 23.5714285714286
Sun May 03 17:00:00 2020 PDT | 0 | 15.7647058823529
Sun May 03 17:00:00 2020 PDT | 1 | 24.3142857142857
Sun May 03 17:00:00 2020 PDT | 2 | 14.8205128205128
Sun May 03 17:00:00 2020 PDT | 3 | 18.1111111111111
Sat May 02 17:00:00 2020 PDT | 0 | 17
Sat May 02 17:00:00 2020 PDT | 1 | 18.75
Sat May 02 17:00:00 2020 PDT | 2 | 20
Sat May 02 17:00:00 2020 PDT | 3 | 21.5217391304348
(12 rows)
-- Refresh the rest
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-03');
refresh_continuous_aggregate
------------------------------
(1 row)
-- Compare the aggregate to the equivalent query on the source table
SELECT * FROM daily_temp
ORDER BY day DESC, device;
day | device | avg_temp
------------------------------+--------+------------------
Mon May 04 17:00:00 2020 PDT | 0 | 19.3846153846154
Mon May 04 17:00:00 2020 PDT | 1 | 16.5555555555556
Mon May 04 17:00:00 2020 PDT | 2 | 18.5714285714286
Mon May 04 17:00:00 2020 PDT | 3 | 23.5714285714286
Sun May 03 17:00:00 2020 PDT | 0 | 15.7647058823529
Sun May 03 17:00:00 2020 PDT | 1 | 24.3142857142857
Sun May 03 17:00:00 2020 PDT | 2 | 14.8205128205128
Sun May 03 17:00:00 2020 PDT | 3 | 18.1111111111111
Sat May 02 17:00:00 2020 PDT | 0 | 17
Sat May 02 17:00:00 2020 PDT | 1 | 18.75
Sat May 02 17:00:00 2020 PDT | 2 | 20
Sat May 02 17:00:00 2020 PDT | 3 | 21.5217391304348
Fri May 01 17:00:00 2020 PDT | 0 | 19
Fri May 01 17:00:00 2020 PDT | 1 | 15.1463414634146
Fri May 01 17:00:00 2020 PDT | 2 | 19.7674418604651
Fri May 01 17:00:00 2020 PDT | 3 | 22.25
Thu Apr 30 17:00:00 2020 PDT | 0 | 17.6666666666667
Thu Apr 30 17:00:00 2020 PDT | 1 | 18.8333333333333
Thu Apr 30 17:00:00 2020 PDT | 2 | 16.7586206896552
Thu Apr 30 17:00:00 2020 PDT | 3 | 20.76
(20 rows)
SELECT time_bucket('1 day', time) AS day, device, avg(temp) AS avg_temp
FROM conditions
GROUP BY 1,2
ORDER BY 1 DESC,2;
day | device | avg_temp
------------------------------+--------+------------------
Mon May 04 17:00:00 2020 PDT | 0 | 19.3846153846154
Mon May 04 17:00:00 2020 PDT | 1 | 16.5555555555556
Mon May 04 17:00:00 2020 PDT | 2 | 18.5714285714286
Mon May 04 17:00:00 2020 PDT | 3 | 23.5714285714286
Sun May 03 17:00:00 2020 PDT | 0 | 15.7647058823529
Sun May 03 17:00:00 2020 PDT | 1 | 24.3142857142857
Sun May 03 17:00:00 2020 PDT | 2 | 14.8205128205128
Sun May 03 17:00:00 2020 PDT | 3 | 18.1111111111111
Sat May 02 17:00:00 2020 PDT | 0 | 17
Sat May 02 17:00:00 2020 PDT | 1 | 18.75
Sat May 02 17:00:00 2020 PDT | 2 | 20
Sat May 02 17:00:00 2020 PDT | 3 | 21.5217391304348
Fri May 01 17:00:00 2020 PDT | 0 | 19
Fri May 01 17:00:00 2020 PDT | 1 | 15.1463414634146
Fri May 01 17:00:00 2020 PDT | 2 | 19.7674418604651
Fri May 01 17:00:00 2020 PDT | 3 | 22.25
Thu Apr 30 17:00:00 2020 PDT | 0 | 17.6666666666667
Thu Apr 30 17:00:00 2020 PDT | 1 | 18.8333333333333
Thu Apr 30 17:00:00 2020 PDT | 2 | 16.7586206896552
Thu Apr 30 17:00:00 2020 PDT | 3 | 20.76
(20 rows)
-- Test unusual, but valid input
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01'::timestamptz, '2020-05-03'::date);
refresh_continuous_aggregate
------------------------------
(1 row)
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01'::date, '2020-05-03'::date);
refresh_continuous_aggregate
------------------------------
(1 row)
SELECT refresh_continuous_aggregate('daily_temp', 0, '2020-05-01');
refresh_continuous_aggregate
------------------------------
(1 row)
-- Unbounded window forward in time
\set ON_ERROR_STOP 0
-- Currently doesn't work due to timestamp overflow bug in a query optimization
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-03', NULL);
ERROR: timestamp out of range
SELECT refresh_continuous_aggregate('daily_temp', NULL, NULL);
ERROR: timestamp out of range
\set ON_ERROR_STOP 1
-- Unbounded window back in time
SELECT refresh_continuous_aggregate('daily_temp', NULL, '2020-05-01');
refresh_continuous_aggregate
------------------------------
(1 row)
-- Test bad input
\set ON_ERROR_STOP 0
-- Bad continuous aggregate name
SELECT refresh_continuous_aggregate(NULL, '2020-05-03', '2020-05-05');
ERROR: invalid continuous aggregate
SELECT refresh_continuous_aggregate('xyz', '2020-05-03', '2020-05-05');
ERROR: relation "xyz" does not exist at character 37
-- Valid object, but not a continuous aggregate
SELECT refresh_continuous_aggregate('conditions', '2020-05-03', '2020-05-05');
ERROR: relation "conditions" is not a continuous aggregate
-- Object ID with no object
SELECT refresh_continuous_aggregate(1, '2020-05-03', '2020-05-05');
ERROR: continuous aggregate does not exist
-- Lacking arguments
SELECT refresh_continuous_aggregate('daily_temp');
ERROR: function refresh_continuous_aggregate(unknown) does not exist at character 8
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-03');
ERROR: function refresh_continuous_aggregate(unknown, unknown) does not exist at character 8
-- Bad time ranges
SELECT refresh_continuous_aggregate('daily_temp', 'xyz', '2020-05-05');
ERROR: invalid input syntax for type timestamp with time zone: "xyz"
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-03', 'xyz');
ERROR: invalid input syntax for type timestamp with time zone: "xyz"
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-03', '2020-05-01');
ERROR: invalid refresh window
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-03', '2020-05-03');
ERROR: invalid refresh window
-- Bad time input
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01'::text, '2020-05-03'::text);
ERROR: unknown time type OID 25
\set ON_ERROR_STOP 1
-- Test different time types
CREATE TABLE conditions_date (time date NOT NULL, device int, temp float);
SELECT create_hypertable('conditions_date', 'time');
create_hypertable
------------------------------
(3,public,conditions_date,t)
(1 row)
CREATE VIEW daily_temp_date
WITH (timescaledb.continuous,
timescaledb.materialized_only=true)
AS
SELECT time_bucket('1 day', time) AS day, device, avg(temp) AS avg_temp
FROM conditions_date
GROUP BY 1,2;
NOTICE: adding index _materialized_hypertable_4_device_day_idx ON _timescaledb_internal._materialized_hypertable_4 USING BTREE(device, day)
SELECT refresh_continuous_aggregate('daily_temp_date', '2020-05-01', '2020-05-03');
refresh_continuous_aggregate
------------------------------
(1 row)
CREATE TABLE conditions_int (time int NOT NULL, device int, temp float);
SELECT create_hypertable('conditions_int', 'time', chunk_time_interval => 10);
create_hypertable
-----------------------------
(5,public,conditions_int,t)
(1 row)
CREATE OR REPLACE FUNCTION integer_now_conditions()
RETURNS int LANGUAGE SQL STABLE AS
$$
SELECT coalesce(max(time), 0)
FROM conditions_int
$$;
SELECT set_integer_now_func('conditions_int', 'integer_now_conditions');
set_integer_now_func
----------------------
(1 row)
CREATE VIEW daily_temp_int
WITH (timescaledb.continuous,
timescaledb.materialized_only=true)
AS
SELECT time_bucket(4, time) AS day, device, avg(temp) AS avg_temp
FROM conditions_int
GROUP BY 1,2;
NOTICE: adding index _materialized_hypertable_6_device_day_idx ON _timescaledb_internal._materialized_hypertable_6 USING BTREE(device, day)
SELECT refresh_continuous_aggregate('daily_temp_int', 5, 10);
refresh_continuous_aggregate
------------------------------
(1 row)

View File

@ -3,6 +3,7 @@ set(TEST_FILES
continuous_aggs_errors.sql
continuous_aggs_permissions.sql
continuous_aggs_watermark.sql
continuous_aggs_refresh.sql
edition.sql
gapfill.sql
partialize_finalize.sql
@ -180,4 +181,3 @@ foreach(TEST_FILE ${TEST_FILES})
endif()
endforeach(TEST_FILE)
file(APPEND ${TEST_SCHEDULE} "\n")

View File

@ -0,0 +1,125 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
-- Disable background workers since we are testing manual refresh
-- The workers are stopped from a superuser session; the rest of the test
-- then runs under the default-permission role.
\c :TEST_DBNAME :ROLE_SUPERUSER
SELECT _timescaledb_internal.stop_background_workers();
SET ROLE :ROLE_DEFAULT_PERM_USER;
-- Source hypertable, partitioned on "time", that feeds the continuous
-- aggregate under test.
CREATE TABLE conditions (time timestamptz NOT NULL, device int, temp float);
SELECT create_hypertable('conditions', 'time');
-- NOTE(review): no random() call is visible in this script, so this seed
-- looks unused; confirm before relying on it or removing it.
SELECT setseed(.12);
-- One row every 10 minutes from 2020-05-01 through 2020-05-05. Both device
-- (hash modulo 4) and temp (hash modulo 40) are derived from
-- timestamp_hash() of the row's time rather than from random().
INSERT INTO conditions
SELECT t, ceil(abs(timestamp_hash(t::timestamp))%4)::int, abs(timestamp_hash(t::timestamp))%40
FROM generate_series('2020-05-01', '2020-05-05', '10 minutes'::interval) t;
-- Show the most recent data
SELECT * FROM conditions
ORDER BY time DESC, device
LIMIT 10;
-- Continuous aggregate under test: average temperature per device in
-- one-day buckets over "conditions".
-- NOTE(review): materialized_only=true presumably restricts the view to
-- already-materialized rows, so its contents change only through explicit
-- refreshes; confirm against the TimescaleDB documentation.
CREATE VIEW daily_temp
WITH (timescaledb.continuous,
timescaledb.materialized_only=true)
AS
SELECT time_bucket('1 day', time) AS day, device, avg(temp) AS avg_temp
FROM conditions
GROUP BY 1,2;
-- The continuous aggregate should be empty
SELECT * FROM daily_temp
ORDER BY day DESC, device;
-- Refresh the most recent few days:
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-03', '2020-05-05');
-- Show the aggregate after refreshing only the later window
SELECT * FROM daily_temp
ORDER BY day DESC, device;
-- Refresh the rest
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-03');
-- Compare the aggregate to the equivalent query on the source table
SELECT * FROM daily_temp
ORDER BY day DESC, device;
-- Same bucketing, grouping and ordering as above, computed directly from
-- the source hypertable
SELECT time_bucket('1 day', time) AS day, device, avg(temp) AS avg_temp
FROM conditions
GROUP BY 1,2
ORDER BY 1 DESC,2;
-- Test unusual, but valid input
-- Window bounds of different types (timestamptz start, date end)
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01'::timestamptz, '2020-05-03'::date);
-- Both bounds given as date although the hypertable's time column is
-- timestamptz
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01'::date, '2020-05-03'::date);
-- Integer start combined with a date-string end
-- NOTE(review): how the integer 0 is interpreted as a point in time is not
-- visible in this script; confirm against the implementation.
SELECT refresh_continuous_aggregate('daily_temp', 0, '2020-05-01');
-- Unbounded window forward in time
-- Let psql continue past the next statements, which are expected to fail
\set ON_ERROR_STOP 0
-- Currently doesn't work due to timestamp overflow bug in a query optimization
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-03', NULL);
-- NULL for both bounds, i.e., unbounded in both directions
SELECT refresh_continuous_aggregate('daily_temp', NULL, NULL);
\set ON_ERROR_STOP 1
-- Unbounded window back in time
SELECT refresh_continuous_aggregate('daily_temp', NULL, '2020-05-01');
-- Test bad input
-- Every statement in this section is expected to raise an error, so let
-- psql continue after each failure.
\set ON_ERROR_STOP 0
-- Bad continuous aggregate name
SELECT refresh_continuous_aggregate(NULL, '2020-05-03', '2020-05-05');
SELECT refresh_continuous_aggregate('xyz', '2020-05-03', '2020-05-05');
-- Valid object, but not a continuous aggregate
SELECT refresh_continuous_aggregate('conditions', '2020-05-03', '2020-05-05');
-- Object ID with no object
SELECT refresh_continuous_aggregate(1, '2020-05-03', '2020-05-05');
-- Lacking arguments
-- (the function takes three arguments; these calls pass one and two)
SELECT refresh_continuous_aggregate('daily_temp');
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-03');
-- Bad time ranges
-- Bounds that are not valid time values
SELECT refresh_continuous_aggregate('daily_temp', 'xyz', '2020-05-05');
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-03', 'xyz');
-- Window start after window end
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-03', '2020-05-01');
-- Empty window (start equals end)
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-03', '2020-05-03');
-- Bad time input
-- Arguments explicitly typed as text are rejected as an unknown time type
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01'::text, '2020-05-03'::text);
\set ON_ERROR_STOP 1
-- Test different time types
-- Hypertable whose time column is a date. No rows are inserted, so the
-- refresh below only checks that the call works with a date time column.
CREATE TABLE conditions_date (time date NOT NULL, device int, temp float);
SELECT create_hypertable('conditions_date', 'time');
-- Same daily average-per-device aggregate as daily_temp, but over the
-- date-typed hypertable
CREATE VIEW daily_temp_date
WITH (timescaledb.continuous,
timescaledb.materialized_only=true)
AS
SELECT time_bucket('1 day', time) AS day, device, avg(temp) AS avg_temp
FROM conditions_date
GROUP BY 1,2;
SELECT refresh_continuous_aggregate('daily_temp_date', '2020-05-01', '2020-05-03');
-- Hypertable with a plain integer time column, created with a
-- chunk_time_interval of 10. No rows are inserted into it in this test.
CREATE TABLE conditions_int (time int NOT NULL, device int, temp float);
SELECT create_hypertable('conditions_int', 'time', chunk_time_interval => 10);
-- "Now" function for the integer-time hypertable: returns the largest time
-- value currently stored in conditions_int, or 0 when the table is empty.
CREATE OR REPLACE FUNCTION integer_now_conditions()
RETURNS int LANGUAGE SQL STABLE AS
$$
SELECT coalesce(max(time), 0)
FROM conditions_int
$$;
-- Register the function above as the integer "now" function of
-- conditions_int.
-- NOTE(review): presumably required before a continuous aggregate can be
-- created on an integer time column; confirm against the TimescaleDB docs.
SELECT set_integer_now_func('conditions_int', 'integer_now_conditions');
-- Continuous aggregate over the integer-time hypertable using buckets of
-- width 4. The bucket column keeps the alias "day" (and the view the
-- "daily_" prefix) for symmetry with the other aggregates, even though the
-- bucket is an integer here.
CREATE VIEW daily_temp_int
WITH (timescaledb.continuous,
timescaledb.materialized_only=true)
AS
SELECT time_bucket(4, time) AS day, device, avg(temp) AS avg_temp
FROM conditions_int
GROUP BY 1,2;
-- Refresh using integer window bounds 5 and 10. conditions_int has no rows,
-- so this only checks that integer bounds are accepted.
SELECT refresh_continuous_aggregate('daily_temp_int', 5, 10);