From bc55ca984ef457588cf059e91022702f8e16c880 Mon Sep 17 00:00:00 2001 From: Joshua Lockerman Date: Tue, 23 Apr 2019 12:43:45 -0400 Subject: [PATCH] Make continuous agg queries more efficient Our Chunk Exclusion and Ordered Append cannot understand the variables passed in to the SPI, and thus were not firing from our materialization worker. To work around this issue, we input the arguments as constants using string formatting. Note that the same issue may occur for prepared statements, so we may need to fix this more generally in the executor. --- tsl/src/continuous_aggs/materialize.c | 84 +++++++++++-------- .../expected/continuous_aggs_materialize.out | 5 -- tsl/test/sql/continuous_aggs_materialize.sql | 5 -- 3 files changed, 49 insertions(+), 45 deletions(-) diff --git a/tsl/src/continuous_aggs/materialize.c b/tsl/src/continuous_aggs/materialize.c index b2a292bf1..a619440dd 100644 --- a/tsl/src/continuous_aggs/materialize.c +++ b/tsl/src/continuous_aggs/materialize.c @@ -377,15 +377,13 @@ hypertable_get_min_and_max(SchemaAndName hypertable, Name time_column, int64 sea Datum first_time_value; bool val_is_null; bool search_start_is_infinite = false; - Oid arg_types[] = { time_type }; - Datum args[] = { - internal_to_time_value_or_infinite(search_start, time_type, &search_start_is_infinite) - }; - int nargs = 1; bool found_new_tuples = false; StringInfo command = makeStringInfo(); int res; + Datum search_start_val = + internal_to_time_value_or_infinite(search_start, time_type, &search_start_is_infinite); + if (search_start_is_infinite && search_start > 0) { /* the previous completed time was +infinity, there can be no new ranges */ @@ -403,11 +401,10 @@ hypertable_get_min_and_max(SchemaAndName hypertable, Name time_column, int64 sea * compared to the materialization. * Ordered append append also fires, so we never scan beyond the first and last chunks */ - if (search_start_is_infinite && search_start < 0) + if (search_start_is_infinite) { /* previous completed time is -infinity, or does not exist, so we must scan from the * beginning */ - nargs = 0; appendStringInfo(command, "SELECT max(%s), min(%s) FROM %s.%s", quote_identifier(NameStr(*time_column)), @@ -417,24 +414,32 @@ hypertable_get_min_and_max(SchemaAndName hypertable, Name time_column, int64 sea } else { + Oid out_fn; + bool type_is_varlena; + char *search_start_str; + + getTypeOutputInfo(time_type, &out_fn, &type_is_varlena); + + search_start_str = OidOutputFunctionCall(out_fn, search_start_val); + *min_out = search_start; /* normal case, add a WHERE to take advantage of chunk constraints */ /*We handled the +infinity case above*/ Assert(!search_start_is_infinite); - nargs = 1; appendStringInfo(command, - "SELECT max(%s), min(%s) FROM %s.%s WHERE %s >= $1", + "SELECT max(%s), min(%s) FROM %s.%s WHERE %s >= %s", quote_identifier(NameStr(*time_column)), quote_identifier(NameStr(*time_column)), quote_identifier(NameStr(*hypertable.schema)), quote_identifier(NameStr(*hypertable.name)), - quote_identifier(NameStr(*time_column))); + quote_identifier(NameStr(*time_column)), + quote_literal_cstr(search_start_str)); } res = SPI_execute_with_args(command->data, - nargs /*=nargs*/, - arg_types, - args, + 0 /*=nargs*/, + NULL, + NULL, NULL /*=Nulls*/, true /*=read_only*/, 0 /*count*/); @@ -993,25 +998,30 @@ spi_delete_materializations(SchemaAndName materialization_table, Name time_colum { int res; StringInfo command = makeStringInfo(); - Oid arg_types[2]; - Datum args[2]; + Oid out_fn; + bool type_is_varlena; + char *invalidation_start; + char *invalidation_end; - arg_types[0] = invalidation_range.type; - args[0] = invalidation_range.start; + getTypeOutputInfo(invalidation_range.type, &out_fn, &type_is_varlena); + + invalidation_start = OidOutputFunctionCall(out_fn, invalidation_range.start); + invalidation_end = OidOutputFunctionCall(out_fn, invalidation_range.end); - arg_types[1] = invalidation_range.type; - args[1] = invalidation_range.end; appendStringInfo(command, "DELETE FROM %s.%s AS D WHERE " - "D.%s >= $1 AND D.%s < $2;", + "D.%s >= %s AND D.%s < %s;", quote_identifier(NameStr(*materialization_table.schema)), quote_identifier(NameStr(*materialization_table.name)), quote_identifier(NameStr(*time_column_name)), - quote_identifier(NameStr(*time_column_name))); + quote_literal_cstr(invalidation_start), + quote_identifier(NameStr(*time_column_name)), + quote_literal_cstr(invalidation_end)); + res = SPI_execute_with_args(command->data, - 2 /*=nargs*/, - arg_types, - args, + 0 /*=nargs*/, + NULL, + NULL, NULL /*=Nulls*/, false /*=read_only*/, 0 /*count*/); @@ -1025,28 +1035,32 @@ spi_insert_materializations(SchemaAndName partial_view, SchemaAndName materializ { int res; StringInfo command = makeStringInfo(); - Oid arg_types[2]; - Datum args[2]; + Oid out_fn; + bool type_is_varlena; + char *materialization_start; + char *materialization_end; - arg_types[0] = materialization_range.type; - args[0] = materialization_range.start; + getTypeOutputInfo(materialization_range.type, &out_fn, &type_is_varlena); + + materialization_start = OidOutputFunctionCall(out_fn, materialization_range.start); + materialization_end = OidOutputFunctionCall(out_fn, materialization_range.end); - arg_types[1] = materialization_range.type; - args[1] = materialization_range.end; appendStringInfo(command, "INSERT INTO %s.%s SELECT * FROM %s.%s AS I " - "WHERE I.%s >= $1 AND I.%s < $2;", + "WHERE I.%s >= %s AND I.%s < %s;", quote_identifier(NameStr(*materialization_table.schema)), quote_identifier(NameStr(*materialization_table.name)), quote_identifier(NameStr(*partial_view.schema)), quote_identifier(NameStr(*partial_view.name)), quote_identifier(NameStr(*time_column_name)), - quote_identifier(NameStr(*time_column_name))); + quote_literal_cstr(materialization_start), + quote_identifier(NameStr(*time_column_name)), + quote_literal_cstr(materialization_end)); res = SPI_execute_with_args(command->data, - 2 /*=nargs*/, - arg_types, - args, + 0 /*=nargs*/, + NULL /*=argtypes*/, + NULL /*=Values*/, NULL /*=Nulls*/, false /*=read_only*/, 0 /*count*/ diff --git a/tsl/test/expected/continuous_aggs_materialize.out b/tsl/test/expected/continuous_aggs_materialize.out index 93e4ab883..364c448bf 100644 --- a/tsl/test/expected/continuous_aggs_materialize.out +++ b/tsl/test/expected/continuous_aggs_materialize.out @@ -613,11 +613,6 @@ CREATE VIEW test_t_mat_view FROM continuous_agg_test_t GROUP BY 1; NOTICE: adding not-null constraint to column "time_partition_col" ---TODO this should be created as part of CREATE VIEW -SELECT id as raw_table_id FROM _timescaledb_catalog.hypertable WHERE table_name='continuous_agg_test_t' \gset -CREATE TRIGGER continuous_agg_insert_trigger - AFTER INSERT ON continuous_agg_test_t - FOR EACH ROW EXECUTE PROCEDURE _timescaledb_internal.continuous_agg_invalidation_trigger(:raw_table_id); SELECT mat_hypertable_id, raw_hypertable_id, user_view_schema, user_view_name, partial_view_schema, partial_view_name, _timescaledb_internal.to_timestamp(bucket_width), _timescaledb_internal.to_interval(refresh_lag), diff --git a/tsl/test/sql/continuous_aggs_materialize.sql b/tsl/test/sql/continuous_aggs_materialize.sql index 826883ec5..d2c414950 100644 --- a/tsl/test/sql/continuous_aggs_materialize.sql +++ b/tsl/test/sql/continuous_aggs_materialize.sql @@ -264,11 +264,6 @@ CREATE VIEW test_t_mat_view AS SELECT time_bucket('2 hours', time), COUNT(data) as value FROM continuous_agg_test_t GROUP BY 1; ---TODO this should be created as part of CREATE VIEW -SELECT id as raw_table_id FROM _timescaledb_catalog.hypertable WHERE table_name='continuous_agg_test_t' \gset -CREATE TRIGGER continuous_agg_insert_trigger - AFTER INSERT ON continuous_agg_test_t - FOR EACH ROW EXECUTE PROCEDURE _timescaledb_internal.continuous_agg_invalidation_trigger(:raw_table_id); SELECT mat_hypertable_id, raw_hypertable_id, user_view_schema, user_view_name, partial_view_schema, partial_view_name,