mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-17 11:03:36 +08:00
Make continuous agg queries more efficient
Our Chunk Exclusion and Ordered Append cannot understand the variables passed in to the SPI, and thus were not firing from our materialization worker. To work around this issue, we input the arguments as constants using string formatting. Note that the same issue may occur for prepared statements, so we may need to fix this more generally in the executor.
This commit is contained in:
parent
c463d2634c
commit
bc55ca984e
@ -377,15 +377,13 @@ hypertable_get_min_and_max(SchemaAndName hypertable, Name time_column, int64 sea
|
||||
Datum first_time_value;
|
||||
bool val_is_null;
|
||||
bool search_start_is_infinite = false;
|
||||
Oid arg_types[] = { time_type };
|
||||
Datum args[] = {
|
||||
internal_to_time_value_or_infinite(search_start, time_type, &search_start_is_infinite)
|
||||
};
|
||||
int nargs = 1;
|
||||
bool found_new_tuples = false;
|
||||
StringInfo command = makeStringInfo();
|
||||
int res;
|
||||
|
||||
Datum search_start_val =
|
||||
internal_to_time_value_or_infinite(search_start, time_type, &search_start_is_infinite);
|
||||
|
||||
if (search_start_is_infinite && search_start > 0)
|
||||
{
|
||||
/* the previous completed time was +infinity, there can be no new ranges */
|
||||
@ -403,11 +401,10 @@ hypertable_get_min_and_max(SchemaAndName hypertable, Name time_column, int64 sea
|
||||
* compared to the materialization.
|
||||
* Ordered append append also fires, so we never scan beyond the first and last chunks
|
||||
*/
|
||||
if (search_start_is_infinite && search_start < 0)
|
||||
if (search_start_is_infinite)
|
||||
{
|
||||
/* previous completed time is -infinity, or does not exist, so we must scan from the
|
||||
* beginning */
|
||||
nargs = 0;
|
||||
appendStringInfo(command,
|
||||
"SELECT max(%s), min(%s) FROM %s.%s",
|
||||
quote_identifier(NameStr(*time_column)),
|
||||
@ -417,24 +414,32 @@ hypertable_get_min_and_max(SchemaAndName hypertable, Name time_column, int64 sea
|
||||
}
|
||||
else
|
||||
{
|
||||
Oid out_fn;
|
||||
bool type_is_varlena;
|
||||
char *search_start_str;
|
||||
|
||||
getTypeOutputInfo(time_type, &out_fn, &type_is_varlena);
|
||||
|
||||
search_start_str = OidOutputFunctionCall(out_fn, search_start_val);
|
||||
|
||||
*min_out = search_start;
|
||||
/* normal case, add a WHERE to take advantage of chunk constraints */
|
||||
/*We handled the +infinity case above*/
|
||||
Assert(!search_start_is_infinite);
|
||||
nargs = 1;
|
||||
appendStringInfo(command,
|
||||
"SELECT max(%s), min(%s) FROM %s.%s WHERE %s >= $1",
|
||||
"SELECT max(%s), min(%s) FROM %s.%s WHERE %s >= %s",
|
||||
quote_identifier(NameStr(*time_column)),
|
||||
quote_identifier(NameStr(*time_column)),
|
||||
quote_identifier(NameStr(*hypertable.schema)),
|
||||
quote_identifier(NameStr(*hypertable.name)),
|
||||
quote_identifier(NameStr(*time_column)));
|
||||
quote_identifier(NameStr(*time_column)),
|
||||
quote_literal_cstr(search_start_str));
|
||||
}
|
||||
|
||||
res = SPI_execute_with_args(command->data,
|
||||
nargs /*=nargs*/,
|
||||
arg_types,
|
||||
args,
|
||||
0 /*=nargs*/,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL /*=Nulls*/,
|
||||
true /*=read_only*/,
|
||||
0 /*count*/);
|
||||
@ -993,25 +998,30 @@ spi_delete_materializations(SchemaAndName materialization_table, Name time_colum
|
||||
{
|
||||
int res;
|
||||
StringInfo command = makeStringInfo();
|
||||
Oid arg_types[2];
|
||||
Datum args[2];
|
||||
Oid out_fn;
|
||||
bool type_is_varlena;
|
||||
char *invalidation_start;
|
||||
char *invalidation_end;
|
||||
|
||||
arg_types[0] = invalidation_range.type;
|
||||
args[0] = invalidation_range.start;
|
||||
getTypeOutputInfo(invalidation_range.type, &out_fn, &type_is_varlena);
|
||||
|
||||
invalidation_start = OidOutputFunctionCall(out_fn, invalidation_range.start);
|
||||
invalidation_end = OidOutputFunctionCall(out_fn, invalidation_range.end);
|
||||
|
||||
arg_types[1] = invalidation_range.type;
|
||||
args[1] = invalidation_range.end;
|
||||
appendStringInfo(command,
|
||||
"DELETE FROM %s.%s AS D WHERE "
|
||||
"D.%s >= $1 AND D.%s < $2;",
|
||||
"D.%s >= %s AND D.%s < %s;",
|
||||
quote_identifier(NameStr(*materialization_table.schema)),
|
||||
quote_identifier(NameStr(*materialization_table.name)),
|
||||
quote_identifier(NameStr(*time_column_name)),
|
||||
quote_identifier(NameStr(*time_column_name)));
|
||||
quote_literal_cstr(invalidation_start),
|
||||
quote_identifier(NameStr(*time_column_name)),
|
||||
quote_literal_cstr(invalidation_end));
|
||||
|
||||
res = SPI_execute_with_args(command->data,
|
||||
2 /*=nargs*/,
|
||||
arg_types,
|
||||
args,
|
||||
0 /*=nargs*/,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL /*=Nulls*/,
|
||||
false /*=read_only*/,
|
||||
0 /*count*/);
|
||||
@ -1025,28 +1035,32 @@ spi_insert_materializations(SchemaAndName partial_view, SchemaAndName materializ
|
||||
{
|
||||
int res;
|
||||
StringInfo command = makeStringInfo();
|
||||
Oid arg_types[2];
|
||||
Datum args[2];
|
||||
Oid out_fn;
|
||||
bool type_is_varlena;
|
||||
char *materialization_start;
|
||||
char *materialization_end;
|
||||
|
||||
arg_types[0] = materialization_range.type;
|
||||
args[0] = materialization_range.start;
|
||||
getTypeOutputInfo(materialization_range.type, &out_fn, &type_is_varlena);
|
||||
|
||||
materialization_start = OidOutputFunctionCall(out_fn, materialization_range.start);
|
||||
materialization_end = OidOutputFunctionCall(out_fn, materialization_range.end);
|
||||
|
||||
arg_types[1] = materialization_range.type;
|
||||
args[1] = materialization_range.end;
|
||||
appendStringInfo(command,
|
||||
"INSERT INTO %s.%s SELECT * FROM %s.%s AS I "
|
||||
"WHERE I.%s >= $1 AND I.%s < $2;",
|
||||
"WHERE I.%s >= %s AND I.%s < %s;",
|
||||
quote_identifier(NameStr(*materialization_table.schema)),
|
||||
quote_identifier(NameStr(*materialization_table.name)),
|
||||
quote_identifier(NameStr(*partial_view.schema)),
|
||||
quote_identifier(NameStr(*partial_view.name)),
|
||||
quote_identifier(NameStr(*time_column_name)),
|
||||
quote_identifier(NameStr(*time_column_name)));
|
||||
quote_literal_cstr(materialization_start),
|
||||
quote_identifier(NameStr(*time_column_name)),
|
||||
quote_literal_cstr(materialization_end));
|
||||
|
||||
res = SPI_execute_with_args(command->data,
|
||||
2 /*=nargs*/,
|
||||
arg_types,
|
||||
args,
|
||||
0 /*=nargs*/,
|
||||
NULL /*=argtypes*/,
|
||||
NULL /*=Values*/,
|
||||
NULL /*=Nulls*/,
|
||||
false /*=read_only*/,
|
||||
0 /*count*/
|
||||
|
@ -613,11 +613,6 @@ CREATE VIEW test_t_mat_view
|
||||
FROM continuous_agg_test_t
|
||||
GROUP BY 1;
|
||||
NOTICE: adding not-null constraint to column "time_partition_col"
|
||||
--TODO this should be created as part of CREATE VIEW
|
||||
SELECT id as raw_table_id FROM _timescaledb_catalog.hypertable WHERE table_name='continuous_agg_test_t' \gset
|
||||
CREATE TRIGGER continuous_agg_insert_trigger
|
||||
AFTER INSERT ON continuous_agg_test_t
|
||||
FOR EACH ROW EXECUTE PROCEDURE _timescaledb_internal.continuous_agg_invalidation_trigger(:raw_table_id);
|
||||
SELECT mat_hypertable_id, raw_hypertable_id, user_view_schema, user_view_name,
|
||||
partial_view_schema, partial_view_name,
|
||||
_timescaledb_internal.to_timestamp(bucket_width), _timescaledb_internal.to_interval(refresh_lag),
|
||||
|
@ -264,11 +264,6 @@ CREATE VIEW test_t_mat_view
|
||||
AS SELECT time_bucket('2 hours', time), COUNT(data) as value
|
||||
FROM continuous_agg_test_t
|
||||
GROUP BY 1;
|
||||
--TODO this should be created as part of CREATE VIEW
|
||||
SELECT id as raw_table_id FROM _timescaledb_catalog.hypertable WHERE table_name='continuous_agg_test_t' \gset
|
||||
CREATE TRIGGER continuous_agg_insert_trigger
|
||||
AFTER INSERT ON continuous_agg_test_t
|
||||
FOR EACH ROW EXECUTE PROCEDURE _timescaledb_internal.continuous_agg_invalidation_trigger(:raw_table_id);
|
||||
|
||||
SELECT mat_hypertable_id, raw_hypertable_id, user_view_schema, user_view_name,
|
||||
partial_view_schema, partial_view_name,
|
||||
|
Loading…
x
Reference in New Issue
Block a user