Avoid having to cast time arg for cagg policy

This patch does a minor refactoring and adds a way to infer the
interval argument type from the continuous aggregate the policy is
attached to.
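
For illustration, a minimal before/after taken from the updated regression
tests below; with this change the start and end offsets passed to
add_continuous_aggregate_policy no longer need explicit casts:

-- before: interval arguments had to be cast explicitly
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 day'::interval, '1 day'::interval, '1 day'::interval);
-- after: the argument type is inferred from the cagg's time dimension type
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', '1 day', '1 day'::interval);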

Issue: #2286
Dmitry Simonenko 2020-09-21 14:29:43 +03:00
parent e1a00eb517
commit c15d8be7f7
5 changed files with 143 additions and 88 deletions


@@ -41,6 +41,45 @@ subtract_interval_from_now(Oid timetype, const Interval *interval)
    return res;
}

Datum
ts_time_datum_convert_arg(Datum arg, Oid *argtype, Oid timetype)
{
    Oid type = *argtype;

    if (!OidIsValid(type) || type == UNKNOWNOID)
    {
        Oid infuncid = InvalidOid;
        Oid typeioparam;

        type = timetype;
        getTypeInputInfo(type, &infuncid, &typeioparam);

        switch (get_func_nargs(infuncid))
        {
            case 1:
                /* Functions that take one input argument, e.g., the Date function */
                arg = OidFunctionCall1(infuncid, arg);
                break;
            case 3:
                /* Timestamp functions take three input arguments */
                arg = OidFunctionCall3(infuncid,
                                       arg,
                                       ObjectIdGetDatum(InvalidOid),
                                       Int32GetDatum(-1));
                break;
            default:
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("invalid time argument"),
                         errhint("Time argument requires an explicit cast.")));
        }

        *argtype = type;
    }

    return arg;
}
/*
* Get the internal time value from a pseudo-type function argument.
*
@@ -75,36 +114,9 @@ subtract_interval_from_now(Oid timetype, const Interval *interval)
int64
ts_time_value_from_arg(Datum arg, Oid argtype, Oid timetype)
{
    if (!OidIsValid(argtype) || argtype == UNKNOWNOID)
    {
        /* No explicit cast was done by the user. Try to convert the argument
         * to the time type used by the continuous aggregate. */
        Oid infuncid = InvalidOid;
        Oid typeioparam;

        argtype = timetype;
        getTypeInputInfo(argtype, &infuncid, &typeioparam);

        switch (get_func_nargs(infuncid))
        {
            case 1:
                /* Functions that take one input argument, e.g., the Date function */
                arg = OidFunctionCall1(infuncid, arg);
                break;
            case 3:
                /* Timestamp functions take three input arguments */
                arg = OidFunctionCall3(infuncid,
                                       arg,
                                       ObjectIdGetDatum(InvalidOid),
                                       Int32GetDatum(-1));
                break;
            default:
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("invalid time argument"),
                         errhint("Time argument requires an explicit cast.")));
        }
    }

    /* If no explicit cast was done by the user, try to convert the argument
     * to the time type used by the continuous aggregate. */
    arg = ts_time_datum_convert_arg(arg, &argtype, timetype);

    if (argtype == INTERVALOID)
    {


@@ -68,6 +68,7 @@
TS_TIME_IS_NOEND(timeval, type))
extern TSDLLEXPORT int64 ts_time_value_from_arg(Datum arg, Oid argtype, Oid timetype);
extern TSDLLEXPORT Datum ts_time_datum_convert_arg(Datum arg, Oid *argtype, Oid timetype);
extern TSDLLEXPORT Datum ts_time_datum_get_min(Oid timetype);
extern TSDLLEXPORT Datum ts_time_datum_get_max(Oid timetype);
extern TSDLLEXPORT Datum ts_time_datum_get_end(Oid timetype);


@@ -6,16 +6,18 @@
#include <postgres.h>
#include <miscadmin.h>
#include <parser/parse_coerce.h>
#include <jsonb_utils.h>
#include <miscadmin.h>
#include <utils/builtins.h>
#include "bgw_policy/continuous_aggregate_api.h"
#include "bgw_policy/job.h"
#include "bgw/job.h"
#include "continuous_agg.h"
#include "continuous_aggs/materialize.h"
#include "dimension.h"
#include "hypertable_cache.h"
#include "time_utils.h"
#include "policy_utils.h"
#define POLICY_REFRESH_CAGG_PROC_NAME "policy_refresh_continuous_aggregate"
@@ -143,26 +145,35 @@ json_add_dim_interval_value(JsonbParseState *parse_state, const char *json_label
    }
}

static void
check_valid_interval(Oid dim_type, Oid interval_type, const char *str_msg)
static Datum
convert_interval_arg(Oid dim_type, Datum interval, Oid *interval_type, const char *str_msg)
{
    if (IS_INTEGER_TYPE(dim_type))
    Oid convert_to = dim_type;

    if (*interval_type != convert_to)
    {
        if (interval_type != dim_type)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                     errmsg("invalid parameter value for %s", str_msg),
                     errhint("Use time interval of type %s with the continuous aggregate.",
                             format_type_be(dim_type))));
    }
    else if (IS_TIMESTAMP_TYPE(dim_type) && (interval_type != INTERVALOID))
    {
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("invalid parameter value for %s", str_msg),
                 errhint("Use time interval with a continuous aggregate using timestamp-based time "
                         "bucket.")));
        if (IS_TIMESTAMP_TYPE(dim_type))
            convert_to = INTERVALOID;

        if (!can_coerce_type(1, interval_type, &convert_to, COERCION_IMPLICIT))
        {
            if (IS_INTEGER_TYPE(dim_type))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("invalid parameter value for %s", str_msg),
                         errhint("Use time interval of type %s with the continuous aggregate.",
                                 format_type_be(dim_type))));
            else if (IS_TIMESTAMP_TYPE(dim_type))
                ereport(ERROR,
                        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                         errmsg("invalid parameter value for %s", str_msg),
                         errhint("Use time interval with a continuous aggregate using "
                                 "timestamp-based time "
                                 "bucket.")));
        }
    }

    return ts_time_datum_convert_arg(interval, interval_type, convert_to);
}
Datum
@@ -182,39 +193,48 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
    Oid cagg_oid, owner_id;
    List *jobs;
    bool if_not_exists, start_isnull, end_isnull;

    if (PG_ARGISNULL(3))
    {
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("cannot use NULL schedule interval")));
    }

    /* Verify that the owner can create a background worker */
    cagg_oid = PG_GETARG_OID(0);
    owner_id = ts_cagg_permissions_check(cagg_oid, GetUserId());
    ts_bgw_job_validate_job_owner(owner_id);

    cagg = ts_continuous_agg_find_by_relid(cagg_oid);
    if (!cagg)
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("\"%s\" is not a continuous aggregate", get_rel_name(cagg_oid))));

    hcache = ts_hypertable_cache_pin();
    mat_htid = cagg->data.mat_hypertable_id;
    mat_ht = ts_hypertable_cache_get_entry_by_id(hcache, mat_htid);
    dim = hyperspace_get_open_dimension(mat_ht->space, 0);
    dim_type = ts_dimension_get_partition_type(dim);
    ts_cache_release(hcache);

    /* Try to convert the argument to the time type used by the
     * continuous aggregate */
    start_interval = PG_GETARG_DATUM(1);
    end_interval = PG_GETARG_DATUM(2);
    start_isnull = PG_ARGISNULL(1);
    end_isnull = PG_ARGISNULL(2);
    start_interval_type = get_fn_expr_argtype(fcinfo->flinfo, 1);
    end_interval_type = get_fn_expr_argtype(fcinfo->flinfo, 2);

    if (!start_isnull)
        start_interval =
            convert_interval_arg(dim_type, start_interval, &start_interval_type, "start_interval");
    if (!end_isnull)
        end_interval =
            convert_interval_arg(dim_type, end_interval, &end_interval_type, "end_interval");

    if (PG_ARGISNULL(3))
        ereport(ERROR,
                (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                 errmsg("cannot use NULL schedule interval")));

    refresh_interval = *PG_GETARG_INTERVAL_P(3);
    if_not_exists = PG_GETARG_BOOL(4);

    cagg = ts_continuous_agg_find_by_relid(cagg_oid);
    if (!cagg)
    {
        ereport(ERROR,
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg("\"%s\" is not a continuous aggregate", get_rel_name(cagg_oid))));
    }
    mat_htid = cagg->data.mat_hypertable_id;

    /* Verify that the owner can create a background worker */
    owner_id = ts_cagg_permissions_check(cagg_oid, GetUserId());
    ts_bgw_job_validate_job_owner(owner_id);

    hcache = ts_hypertable_cache_pin();
    mat_ht = ts_hypertable_cache_get_entry_by_id(hcache, mat_htid);
    dim = hyperspace_get_open_dimension(mat_ht->space, 0);
    dim_type = ts_dimension_get_partition_type(dim);
    ts_cache_release(hcache);

    /* Make sure there is only 1 refresh policy on the cagg */
    jobs = ts_bgw_job_find_by_proc_and_hypertable_id(POLICY_REFRESH_CAGG_PROC_NAME,
@@ -265,11 +285,6 @@ policy_refresh_cagg_add(PG_FUNCTION_ARGS)
    namestrcpy(&proc_schema, INTERNAL_SCHEMA_NAME);
    namestrcpy(&owner, GetUserNameFromId(owner_id, false));

    if (!start_isnull)
        check_valid_interval(dim_type, start_interval_type, "start_interval");
    if (!end_isnull)
        check_valid_interval(dim_type, end_interval_type, "end_interval");

    JsonbParseState *parse_state = NULL;

    pushJsonbValue(&parse_state, WJB_BEGIN_OBJECT, NULL);
    ts_jsonb_add_int32(parse_state, CONFIG_KEY_MAT_HYPERTABLE_ID, mat_htid);


@@ -133,10 +133,10 @@ CREATE MATERIALIZED VIEW max_mat_view_date
FROM continuous_agg_max_mat_date
GROUP BY 1 WITH NO DATA;
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days'::interval, 10 , '1 day'::interval);
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', 10, '1 day'::interval);
ERROR: invalid parameter value for end_interval
\set ON_ERROR_STOP 1
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 day'::interval, '1 day'::interval , '1 day'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', '1 day', '1 day'::interval) as job_id \gset
SELECT config FROM _timescaledb_config.bgw_job
WHERE id = :job_id;
config
@@ -161,7 +161,7 @@ CREATE MATERIALIZED VIEW max_mat_view_timestamp
AS SELECT time_bucket('7 days', time)
FROM continuous_agg_timestamp
GROUP BY 1 WITH NO DATA;
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day'::interval, '1 h'::interval , '1 h'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 h'::interval , '1 h'::interval) as job_id \gset
CALL run_job(:job_id);
SELECT config FROM _timescaledb_config.bgw_job
WHERE id = :job_id;
@@ -182,8 +182,10 @@ SELECT config FROM _timescaledb_config.bgw_job where id = :job_id;
(1 row)
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day'::interval, '1 day'::interval, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 day', '1h'::interval, if_not_exists=>true);
ERROR: could not find start_interval in config for job
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', 'xyz', '1 day', '1h'::interval, if_not_exists=>true);
ERROR: invalid input syntax for type interval: "xyz"
\set ON_ERROR_STOP 1
DROP MATERIALIZED VIEW max_mat_view_timestamp;
--smallint table
@@ -226,8 +228,24 @@ SELECT * FROM mat_smallint;
10 | 1
(2 rows)
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('mat_smallint', 15::smallint, 10::smallint, '1h'::interval, if_not_exists=>true);
WARNING: could not add refresh policy due to existing policy on continuous aggregate with different arguments
add_continuous_aggregate_policy
---------------------------------
-1
(1 row)
\set ON_ERROR_STOP 1
-- end of coverage tests
-- tests for interval argument conversions
--
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('mat_smallint', 15, 10, '1h'::interval, if_not_exists=>true);
ERROR: invalid parameter value for start_interval
SELECT add_continuous_aggregate_policy('mat_smallint', '15', 10, '1h'::interval, if_not_exists=>true);
ERROR: invalid parameter value for end_interval
SELECT add_continuous_aggregate_policy('mat_smallint', '15', '10', '1h'::interval, if_not_exists=>true);
WARNING: could not add refresh policy due to existing policy on continuous aggregate with different arguments
add_continuous_aggregate_policy
---------------------------------
@@ -237,4 +255,3 @@ WARNING: could not add refresh policy due to existing policy on continuous aggr
\set ON_ERROR_STOP 1
DROP MATERIALIZED VIEW mat_smallint;
NOTICE: drop cascades to table _timescaledb_internal._hyper_8_7_chunk
-- end of coverage tests


@@ -86,9 +86,9 @@ CREATE MATERIALIZED VIEW max_mat_view_date
GROUP BY 1 WITH NO DATA;
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days'::interval, 10 , '1 day'::interval);
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', 10, '1 day'::interval);
\set ON_ERROR_STOP 1
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 day'::interval, '1 day'::interval , '1 day'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('max_mat_view_date', '2 days', '1 day', '1 day'::interval) as job_id \gset
SELECT config FROM _timescaledb_config.bgw_job
WHERE id = :job_id;
@@ -106,7 +106,7 @@ CREATE MATERIALIZED VIEW max_mat_view_timestamp
FROM continuous_agg_timestamp
GROUP BY 1 WITH NO DATA;
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day'::interval, '1 h'::interval , '1 h'::interval) as job_id \gset
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 h'::interval , '1 h'::interval) as job_id \gset
CALL run_job(:job_id);
SELECT config FROM _timescaledb_config.bgw_job
@@ -120,7 +120,8 @@ WHERE id = :job_id;
SET ROLE :ROLE_DEFAULT_PERM_USER;
SELECT config FROM _timescaledb_config.bgw_job where id = :job_id;
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day'::interval, '1 day'::interval, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', '10 day', '1 day', '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('max_mat_view_timestamp', 'xyz', '1 day', '1h'::interval, if_not_exists=>true);
\set ON_ERROR_STOP 1
DROP MATERIALIZED VIEW max_mat_view_timestamp;
@@ -149,8 +150,17 @@ CALL run_job(:job_id);
SELECT * FROM mat_smallint;
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('mat_smallint', 15, 10, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('mat_smallint', 15::smallint, 10::smallint, '1h'::interval, if_not_exists=>true);
\set ON_ERROR_STOP 1
DROP MATERIALIZED VIEW mat_smallint;
-- end of coverage tests
-- tests for interval argument conversions
--
\set ON_ERROR_STOP 0
SELECT add_continuous_aggregate_policy('mat_smallint', 15, 10, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('mat_smallint', '15', 10, '1h'::interval, if_not_exists=>true);
SELECT add_continuous_aggregate_policy('mat_smallint', '15', '10', '1h'::interval, if_not_exists=>true);
\set ON_ERROR_STOP 1
DROP MATERIALIZED VIEW mat_smallint;