From 5910f6830501d1e54d607fa9f244cedeec081bd6 Mon Sep 17 00:00:00 2001 From: Brian Rowe Date: Thu, 26 Sep 2019 22:40:05 -0700 Subject: [PATCH] Improve pushdown handling of time functions This change allows certain mutable functions to be whitelisted so that they can be safely pushed down to data nodes. Additionally, this change will no longer prevent queries containing the `now` function from being pushed down to data nodes, but will instead replace the function call with the transaction start time (which is the same value which would be used had the query been run solely on the access node). --- tsl/src/fdw/deparse.c | 96 +++++++++-- tsl/src/fdw/deparse.h | 2 +- tsl/src/fdw/estimate.c | 3 +- tsl/src/fdw/scan_exec.c | 74 ++++++++- tsl/src/fdw/scan_exec.h | 6 + tsl/src/fdw/scan_plan.c | 5 +- .../expected/partitionwise_distributed-11.out | 151 ++++++++++++++++++ tsl/test/sql/partitionwise_distributed.sql.in | 37 +++++ tsl/test/src/remote/CMakeLists.txt | 1 + tsl/test/src/remote/scan_exec_debug.c | 23 +++ 10 files changed, 381 insertions(+), 17 deletions(-) create mode 100644 tsl/test/src/remote/scan_exec_debug.c diff --git a/tsl/src/fdw/deparse.c b/tsl/src/fdw/deparse.c index 8faf976c0..c0c3e2dd6 100644 --- a/tsl/src/fdw/deparse.c +++ b/tsl/src/fdw/deparse.c @@ -102,13 +102,15 @@ typedef struct foreign_glob_cxt */ typedef struct deparse_expr_cxt { - PlannerInfo *root; /* global planner state */ - RelOptInfo *foreignrel; /* the foreign relation we are planning for */ - RelOptInfo *scanrel; /* the underlying scan relation. Same as - * foreignrel, when that represents a join or - * a base relation. */ - StringInfo buf; /* output buffer to append to */ - List **params_list; /* exprs that will become remote Params */ + PlannerInfo *root; /* global planner state */ + RelOptInfo *foreignrel; /* the foreign relation we are planning for */ + RelOptInfo *scanrel; /* the underlying scan relation. Same as + * foreignrel, when that represents a join or + * a base relation. */ + StringInfo buf; /* output buffer to append to */ + List **params_list; /* exprs that will become remote Params */ + List **current_time_idx; /* locations in the sql output that need to + * have the current time appended */ DataNodeChunkAssignment *sca; } deparse_expr_cxt; @@ -118,6 +120,33 @@ typedef struct deparse_expr_cxt #define SUBQUERY_REL_ALIAS_PREFIX "s" #define SUBQUERY_COL_ALIAS_PREFIX "c" +/* Oids of mutable functions determined to safe to pushdown to data nodes */ +static Oid PushdownSafeFunctionOIDs[] = { + F_TIMESTAMPTZ_PL_INTERVAL, + F_TIMESTAMPTZ_MI_INTERVAL, + F_NOW, /* Special case, this will be evaluated prior to pushdown */ + F_TIMESTAMPTZ_TIMESTAMP, + F_TIMESTAMP_TIMESTAMPTZ, + F_TIMESTAMP_PL_INTERVAL, + F_TIMESTAMP_MI_INTERVAL, + F_TIMESTAMP_LT_TIMESTAMPTZ, + F_TIMESTAMP_LE_TIMESTAMPTZ, + F_TIMESTAMP_EQ_TIMESTAMPTZ, + F_TIMESTAMP_GT_TIMESTAMPTZ, + F_TIMESTAMP_GE_TIMESTAMPTZ, + F_TIMESTAMP_NE_TIMESTAMPTZ, + F_TIMESTAMP_CMP_TIMESTAMPTZ, + F_TIMESTAMPTZ_LT_TIMESTAMP, + F_TIMESTAMPTZ_LE_TIMESTAMP, + F_TIMESTAMPTZ_EQ_TIMESTAMP, + F_TIMESTAMPTZ_GT_TIMESTAMP, + F_TIMESTAMPTZ_GE_TIMESTAMP, + F_TIMESTAMPTZ_NE_TIMESTAMP, + F_TIMESTAMPTZ_CMP_TIMESTAMP, +}; +static const int NumPushdownSafeOIDs = + sizeof(PushdownSafeFunctionOIDs) / sizeof(PushdownSafeFunctionOIDs[0]); + /* * Functions to determine whether an expression can be evaluated safely on * data node. @@ -235,6 +264,35 @@ classify_conditions(PlannerInfo *root, RelOptInfo *baserel, List *input_conds, L } } +static int +oid_comparator(const void *a, const void *b) +{ + if (*(Oid *) a == *(Oid *) b) + return 0; + else if (*(Oid *) a < *(Oid *) b) + return -1; + else + return 1; +} + +static bool +function_is_whitelisted(Oid func_id) +{ + static bool PushdownOIDsSorted = false; + + if (!PushdownOIDsSorted) + { + qsort(PushdownSafeFunctionOIDs, NumPushdownSafeOIDs, sizeof(Oid), oid_comparator); + PushdownOIDsSorted = true; + } + + return bsearch(&func_id, + PushdownSafeFunctionOIDs, + NumPushdownSafeOIDs, + sizeof(Oid), + oid_comparator) != NULL; +} + /* * Check for mutable functions in an expression. * @@ -255,7 +313,14 @@ contain_mutable_functions_checker(Oid func_id, void *context) if (NULL != finfo) return false; - return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE); + if (func_volatile(func_id) == PROVOLATILE_IMMUTABLE) + return false; + + /* Certain functions are mutable but are known to safe to push down to the data node. */ + if (function_is_whitelisted(func_id)) + return false; + + return true; } /* @@ -720,7 +785,8 @@ build_tlist_to_deparse(RelOptInfo *foreignrel) void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel, List *tlist, List *remote_conds, List *pathkeys, bool is_subquery, - List **retrieved_attrs, List **params_list, DataNodeChunkAssignment *sca) + List **retrieved_attrs, List **params_list, DataNodeChunkAssignment *sca, + List **current_time_idx) { deparse_expr_cxt context; TsFdwRelInfo *fpinfo = fdw_relinfo_get(rel); @@ -738,6 +804,7 @@ deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel, List context.foreignrel = rel; context.scanrel = IS_UPPER_REL(rel) ? fpinfo->outerrel : rel; context.params_list = params_list; + context.current_time_idx = current_time_idx; context.sca = sca; /* Construct SELECT clause */ @@ -2772,8 +2839,17 @@ appendFunctionName(Oid funcid, deparse_expr_cxt *context) appendStringInfo(buf, "%s.", quote_identifier(schemaname)); } - /* Always print the function name */ proname = NameStr(procform->proname); + + /* + * If the function is the 'now' function, we'll have to replace it before pushing + * it down to the data node. For now just make a note of the index at which we're + * inserting it into the sql statement. + */ + if (funcid == F_NOW && context->current_time_idx) + *context->current_time_idx = lappend_int(*context->current_time_idx, buf->len); + + /* Always print the function name */ appendStringInfoString(buf, quote_identifier(proname)); ReleaseSysCache(proctup); diff --git a/tsl/src/fdw/deparse.h b/tsl/src/fdw/deparse.h index c24d1b1dc..b036f4cd5 100644 --- a/tsl/src/fdw/deparse.h +++ b/tsl/src/fdw/deparse.h @@ -50,7 +50,7 @@ extern List *build_tlist_to_deparse(RelOptInfo *foreignrel); extern void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel, List *tlist, List *remote_conds, List *pathkeys, bool is_subquery, List **retrieved_attrs, List **params_list, - DataNodeChunkAssignment *swa); + DataNodeChunkAssignment *swa, List **current_time_idx); extern const char *get_jointype_name(JoinType jointype); extern void deparseStringLiteral(StringInfo buf, const char *val); diff --git a/tsl/src/fdw/estimate.c b/tsl/src/fdw/estimate.c index 4feab5f57..895890408 100644 --- a/tsl/src/fdw/estimate.c +++ b/tsl/src/fdw/estimate.c @@ -153,7 +153,8 @@ get_remote_estimate(PlannerInfo *root, RelOptInfo *rel, List *param_join_conds, false, &retrieved_attrs, NULL, - fpinfo->sca); + fpinfo->sca, + NULL); /* Get the remote estimate */ conn = remote_dist_txn_get_connection(fpinfo->cid, REMOTE_TXN_NO_PREP_STMT); diff --git a/tsl/src/fdw/scan_exec.c b/tsl/src/fdw/scan_exec.c index 7e01bf1e5..aa57baeee 100644 --- a/tsl/src/fdw/scan_exec.c +++ b/tsl/src/fdw/scan_exec.c @@ -42,6 +42,8 @@ enum FdwScanPrivateIndex FdwScanPrivateServerId, /* OID list of chunk oids, used by EXPLAIN */ FdwScanPrivateChunkOids, + /* Places in the remote query that need to have the current timestamp inserted */ + FdwScanCurrentTimeIndexes, /* * String describing join i.e. names of relations being joined and types * of join, added when the scan is join @@ -180,6 +182,54 @@ prepare_query_params(PlanState *node, List *fdw_exprs, int num_params, FmgrInfo *param_values = (const char **) palloc0(num_params * sizeof(char *)); } +#ifdef TS_DEBUG +/* Allow tests to specify the time to push down in place of now() */ +TimestampTz ts_current_timestamp_override_value = -1; + +extern void +fdw_scan_debug_override_pushdown_timestamp(TimestampTz time) +{ + ts_current_timestamp_override_value = time; +} +#endif + +/* + * This function takes a sql statement char string and list of indicies to occurrences of `now()` + * within that string and then returns a new string which will be the same sql statement, only with + * the now calls replaced with the current transaction timestamp. + */ +static char * +generate_updated_sql_using_current_timestamp(const char *original_sql, List *now_indicies) +{ + static const char string_to_replace[] = "now()"; + int replace_length = strlen(string_to_replace); + StringInfoData new_query; + ListCell *lc; + int curr_index = 0; + TimestampTz now; + + initStringInfo(&new_query); + now = GetSQLCurrentTimestamp(-1); +#ifdef TS_DEBUG + if (ts_current_timestamp_override_value >= 0) + now = ts_current_timestamp_override_value; +#endif + + foreach (lc, now_indicies) + { + int next_index = lfirst_int(lc); + + Assert(next_index < strlen(original_sql) && + strncmp(string_to_replace, original_sql + next_index, replace_length) == 0); + appendBinaryStringInfo(&new_query, original_sql + curr_index, next_index - curr_index); + appendStringInfo(&new_query, "('%s'::timestamptz)", timestamptz_to_str(now)); + curr_index = next_index + replace_length; + } + + appendStringInfo(&new_query, "%s", original_sql + curr_index); + return new_query.data; +} + static TSConnection * get_connection(ScanState *ss, Oid const server_id, Bitmapset *scanrelids, List *exprs) { @@ -228,7 +278,14 @@ fdw_scan_init(ScanState *ss, TsFdwScanState *fsstate, Bitmapset *scanrelids, Lis fdw_exprs); /* Get private info created by planner functions. */ - fsstate->query = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); + if (list_nth(fdw_private, FdwScanCurrentTimeIndexes) == NIL) + fsstate->query = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); + else + fsstate->query = + generate_updated_sql_using_current_timestamp(strVal(list_nth(fdw_private, + FdwScanPrivateSelectSql)), + list_nth(fdw_private, + FdwScanCurrentTimeIndexes)); fsstate->retrieved_attrs = (List *) list_nth(fdw_private, FdwScanPrivateRetrievedAttrs); fsstate->fetch_size = intVal(list_nth(fdw_private, FdwScanPrivateFetchSize)); @@ -401,6 +458,7 @@ fdw_scan_explain(ScanState *ss, List *fdw_private, ExplainState *es, TsFdwScanSt Oid server_id = intVal(list_nth(fdw_private, FdwScanPrivateServerId)); ForeignServer *server = GetForeignServer(server_id); List *chunk_oids = (List *) list_nth(fdw_private, FdwScanPrivateChunkOids); + char *sql; ExplainPropertyText("Data node", server->servername, es); @@ -423,9 +481,17 @@ fdw_scan_explain(ScanState *ss, List *fdw_private, ExplainState *es, TsFdwScanSt ExplainPropertyText("Chunks", chunk_names.data, es); } - ExplainPropertyText("Remote SQL", - strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)), - es); + if (list_nth(fdw_private, FdwScanCurrentTimeIndexes) != NIL) + sql = + generate_updated_sql_using_current_timestamp(strVal( + list_nth(fdw_private, + FdwScanPrivateSelectSql)), + list_nth(fdw_private, + FdwScanCurrentTimeIndexes)); + else + sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); + + ExplainPropertyText("Remote SQL", sql, es); if (ts_guc_enable_remote_explain) { diff --git a/tsl/src/fdw/scan_exec.h b/tsl/src/fdw/scan_exec.h index c8944b80b..22495c854 100644 --- a/tsl/src/fdw/scan_exec.h +++ b/tsl/src/fdw/scan_exec.h @@ -11,6 +11,7 @@ #include #include #include +#include /* * Execution state of a foreign scan using timescaledb_fdw. @@ -47,4 +48,9 @@ extern void fdw_scan_explain(ScanState *ss, List *fdw_private, ExplainState *es, extern void create_cursor(ScanState *ss, TsFdwScanState *fsstate, bool block); +#ifdef TS_DEBUG +/* Allow tests to specify the time to push down in place of now() */ +extern void fdw_scan_debug_override_pushdown_timestamp(TimestampTz time); +#endif + #endif /* TIMESCALEDB_TSL_FDW_SCAN_EXEC_H */ diff --git a/tsl/src/fdw/scan_plan.c b/tsl/src/fdw/scan_plan.c index d0fbdeb8a..6f180c717 100644 --- a/tsl/src/fdw/scan_plan.c +++ b/tsl/src/fdw/scan_plan.c @@ -340,6 +340,7 @@ fdw_scan_info_init(ScanInfo *scaninfo, PlannerInfo *root, RelOptInfo *rel, Path List *remote_exprs = NIL; List *local_exprs = NIL; List *params_list = NIL; + List *current_time_idx = NIL; List *fdw_scan_tlist = NIL; List *fdw_recheck_quals = NIL; List *retrieved_attrs; @@ -454,7 +455,8 @@ fdw_scan_info_init(ScanInfo *scaninfo, PlannerInfo *root, RelOptInfo *rel, Path false, &retrieved_attrs, ¶ms_list, - fpinfo->sca); + fpinfo->sca, + ¤t_time_idx); /* Remember remote_exprs for possible use by PlanDirectModify */ fpinfo->final_remote_exprs = remote_exprs; @@ -468,6 +470,7 @@ fdw_scan_info_init(ScanInfo *scaninfo, PlannerInfo *root, RelOptInfo *rel, Path makeInteger(fpinfo->fetch_size), makeInteger(fpinfo->server->serverid), (fpinfo->sca != NULL ? list_copy(fpinfo->sca->chunk_oids) : NIL)); + fdw_private = lappend(fdw_private, current_time_idx); Assert(!IS_JOIN_REL(rel)); if (IS_UPPER_REL(rel)) diff --git a/tsl/test/expected/partitionwise_distributed-11.out b/tsl/test/expected/partitionwise_distributed-11.out index d8b5d003a..b5dc0f8d3 100644 --- a/tsl/test/expected/partitionwise_distributed-11.out +++ b/tsl/test/expected/partitionwise_distributed-11.out @@ -15,6 +15,9 @@ RETURNS VOID AS :TSL_MODULE_PATHNAME, 'ts_remote_exec' LANGUAGE C; CREATE EXTENSION IF NOT EXISTS postgres_fdw; +CREATE OR REPLACE FUNCTION test_override_pushdown_timestamptz(new_value TIMESTAMPTZ) RETURNS VOID +AS :TSL_MODULE_PATHNAME, 'test_override_pushdown_timestamptz' +LANGUAGE C VOLATILE STRICT; -- Cleanup from other potential tests that created these databases SET client_min_messages TO ERROR; DROP DATABASE IF EXISTS data_node_1; @@ -2116,6 +2119,154 @@ LIMIT 1; Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[2, 1]) AND (("time" < '2018-06-01 00:00:00-07'::timestamp with time zone)) ORDER BY "time" ASC NULLS LAST, device ASC NULLS LAST (21 rows) +-- contains whitelisted time expressions +SELECT test_override_pushdown_timestamptz('2018-06-01 00:00'::timestamptz); + test_override_pushdown_timestamptz +------------------------------------ + +(1 row) + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT time, device, avg(temp) +FROM pg2dim +WHERE time < Now( ) - INTERVAL '3 days' +GROUP BY 1, 2 +LIMIT 1; + QUERY PLAN +----------------------------------------------------------------------------------------- + Limit + Output: pg2dim_h1_t1."time", pg2dim_h1_t1.device, (avg(pg2dim_h1_t1.temp)) + -> Append + -> HashAggregate + Output: pg2dim_h1_t1."time", pg2dim_h1_t1.device, avg(pg2dim_h1_t1.temp) + Group Key: pg2dim_h1_t1."time", pg2dim_h1_t1.device + -> Foreign Scan on public.pg2dim_h1_t1 + Output: pg2dim_h1_t1."time", pg2dim_h1_t1.device, pg2dim_h1_t1.temp + Filter: (pg2dim_h1_t1."time" < (now() - '@ 3 days'::interval)) + Remote SQL: SELECT "time", device, temp FROM public.pg2dim_h1_t1 + -> HashAggregate + Output: pg2dim_h1_t2."time", pg2dim_h1_t2.device, avg(pg2dim_h1_t2.temp) + Group Key: pg2dim_h1_t2."time", pg2dim_h1_t2.device + -> Foreign Scan on public.pg2dim_h1_t2 + Output: pg2dim_h1_t2."time", pg2dim_h1_t2.device, pg2dim_h1_t2.temp + Filter: (pg2dim_h1_t2."time" < (now() - '@ 3 days'::interval)) + Remote SQL: SELECT "time", device, temp FROM public.pg2dim_h1_t2 + -> HashAggregate + Output: pg2dim_h1_t3."time", pg2dim_h1_t3.device, avg(pg2dim_h1_t3.temp) + Group Key: pg2dim_h1_t3."time", pg2dim_h1_t3.device + -> Foreign Scan on public.pg2dim_h1_t3 + Output: pg2dim_h1_t3."time", pg2dim_h1_t3.device, pg2dim_h1_t3.temp + Filter: (pg2dim_h1_t3."time" < (now() - '@ 3 days'::interval)) + Remote SQL: SELECT "time", device, temp FROM public.pg2dim_h1_t3 + -> HashAggregate + Output: pg2dim_h2_t1."time", pg2dim_h2_t1.device, avg(pg2dim_h2_t1.temp) + Group Key: pg2dim_h2_t1."time", pg2dim_h2_t1.device + -> Foreign Scan on public.pg2dim_h2_t1 + Output: pg2dim_h2_t1."time", pg2dim_h2_t1.device, pg2dim_h2_t1.temp + Filter: (pg2dim_h2_t1."time" < (now() - '@ 3 days'::interval)) + Remote SQL: SELECT "time", device, temp FROM public.pg2dim_h2_t1 + -> HashAggregate + Output: pg2dim_h2_t2."time", pg2dim_h2_t2.device, avg(pg2dim_h2_t2.temp) + Group Key: pg2dim_h2_t2."time", pg2dim_h2_t2.device + -> Foreign Scan on public.pg2dim_h2_t2 + Output: pg2dim_h2_t2."time", pg2dim_h2_t2.device, pg2dim_h2_t2.temp + Filter: (pg2dim_h2_t2."time" < (now() - '@ 3 days'::interval)) + Remote SQL: SELECT "time", device, temp FROM public.pg2dim_h2_t2 + -> HashAggregate + Output: pg2dim_h2_t3."time", pg2dim_h2_t3.device, avg(pg2dim_h2_t3.temp) + Group Key: pg2dim_h2_t3."time", pg2dim_h2_t3.device + -> Foreign Scan on public.pg2dim_h2_t3 + Output: pg2dim_h2_t3."time", pg2dim_h2_t3.device, pg2dim_h2_t3.temp + Filter: (pg2dim_h2_t3."time" < (now() - '@ 3 days'::interval)) + Remote SQL: SELECT "time", device, temp FROM public.pg2dim_h2_t3 +(45 rows) + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT time, device, avg(temp) +FROM hyper +WHERE time < Now( ) - INTERVAL '3 days' +GROUP BY 1, 2 +LIMIT 1; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Limit + Output: "time", device, (avg(temp)) + -> Custom Scan (AsyncAppend) + Output: "time", device, (avg(temp)) + -> Append + -> Custom Scan (DataNodeScan) + Output: hyper."time", hyper.device, (avg(hyper.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_1 + Chunks: _hyper_1_1_dist_chunk, _hyper_1_3_dist_chunk, _hyper_1_5_dist_chunk, _hyper_1_7_dist_chunk + Remote SQL: SELECT "time", device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[1, 2, 3, 4]) AND (("time" < (('2018-06-01 00:00:00-07'::timestamptz) - '3 days'::interval))) GROUP BY 1, 2 + -> Custom Scan (DataNodeScan) + Output: hyper_1."time", hyper_1.device, (avg(hyper_1.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_2 + Chunks: _hyper_1_2_dist_chunk, _hyper_1_4_dist_chunk, _hyper_1_6_dist_chunk + Remote SQL: SELECT "time", device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[1, 2, 3]) AND (("time" < (('2018-06-01 00:00:00-07'::timestamptz) - '3 days'::interval))) GROUP BY 1, 2 +(17 rows) + +-- Verify that repeated runs of the same plan will get different timestamps +PREPARE timestamp_pushdown_test AS +SELECT time, device, avg(temp) +FROM hyper +WHERE time < now() - INTERVAL '3 days' +GROUP BY 1, 2 +LIMIT 1; +EXPLAIN (VERBOSE, COSTS OFF) +EXECUTE timestamp_pushdown_test; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Limit + Output: "time", device, (avg(temp)) + -> Custom Scan (AsyncAppend) + Output: "time", device, (avg(temp)) + -> Append + -> Custom Scan (DataNodeScan) + Output: hyper."time", hyper.device, (avg(hyper.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_1 + Chunks: _hyper_1_1_dist_chunk, _hyper_1_3_dist_chunk, _hyper_1_5_dist_chunk, _hyper_1_7_dist_chunk + Remote SQL: SELECT "time", device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[1, 2, 3, 4]) AND (("time" < (('2018-06-01 00:00:00-07'::timestamptz) - '3 days'::interval))) GROUP BY 1, 2 + -> Custom Scan (DataNodeScan) + Output: hyper_1."time", hyper_1.device, (avg(hyper_1.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_2 + Chunks: _hyper_1_2_dist_chunk, _hyper_1_4_dist_chunk, _hyper_1_6_dist_chunk + Remote SQL: SELECT "time", device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[1, 2, 3]) AND (("time" < (('2018-06-01 00:00:00-07'::timestamptz) - '3 days'::interval))) GROUP BY 1, 2 +(17 rows) + +SELECT test_override_pushdown_timestamptz('2019-10-15 00:00'::timestamptz); + test_override_pushdown_timestamptz +------------------------------------ + +(1 row) + +EXPLAIN (VERBOSE, COSTS OFF) +EXECUTE timestamp_pushdown_test; + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Limit + Output: "time", device, (avg(temp)) + -> Custom Scan (AsyncAppend) + Output: "time", device, (avg(temp)) + -> Append + -> Custom Scan (DataNodeScan) + Output: hyper."time", hyper.device, (avg(hyper.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_1 + Chunks: _hyper_1_1_dist_chunk, _hyper_1_3_dist_chunk, _hyper_1_5_dist_chunk, _hyper_1_7_dist_chunk + Remote SQL: SELECT "time", device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[1, 2, 3, 4]) AND (("time" < (('2019-10-15 00:00:00-07'::timestamptz) - '3 days'::interval))) GROUP BY 1, 2 + -> Custom Scan (DataNodeScan) + Output: hyper_1."time", hyper_1.device, (avg(hyper_1.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_2 + Chunks: _hyper_1_2_dist_chunk, _hyper_1_4_dist_chunk, _hyper_1_6_dist_chunk + Remote SQL: SELECT "time", device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[1, 2, 3]) AND (("time" < (('2019-10-15 00:00:00-07'::timestamptz) - '3 days'::interval))) GROUP BY 1, 2 +(17 rows) + -- Test one-dimensional push down CREATE TABLE hyper1d (time timestamptz, device int, temp float); SELECT * FROM create_distributed_hypertable('hyper1d', 'time', chunk_time_interval => '3 months'::interval); diff --git a/tsl/test/sql/partitionwise_distributed.sql.in b/tsl/test/sql/partitionwise_distributed.sql.in index 7b0023f2c..040869a37 100644 --- a/tsl/test/sql/partitionwise_distributed.sql.in +++ b/tsl/test/sql/partitionwise_distributed.sql.in @@ -7,6 +7,10 @@ \ir include/remote_exec.sql CREATE EXTENSION IF NOT EXISTS postgres_fdw; +CREATE OR REPLACE FUNCTION test_override_pushdown_timestamptz(new_value TIMESTAMPTZ) RETURNS VOID +AS :TSL_MODULE_PATHNAME, 'test_override_pushdown_timestamptz' +LANGUAGE C VOLATILE STRICT; + -- Cleanup from other potential tests that created these databases SET client_min_messages TO ERROR; DROP DATABASE IF EXISTS data_node_1; @@ -645,6 +649,39 @@ AND time < '2018-06-01 00:00' GROUP BY 1, 2 LIMIT 1; +-- contains whitelisted time expressions +SELECT test_override_pushdown_timestamptz('2018-06-01 00:00'::timestamptz); + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT time, device, avg(temp) +FROM pg2dim +WHERE time < Now( ) - INTERVAL '3 days' +GROUP BY 1, 2 +LIMIT 1; + +EXPLAIN (VERBOSE, COSTS OFF) +SELECT time, device, avg(temp) +FROM hyper +WHERE time < Now( ) - INTERVAL '3 days' +GROUP BY 1, 2 +LIMIT 1; + +-- Verify that repeated runs of the same plan will get different timestamps +PREPARE timestamp_pushdown_test AS +SELECT time, device, avg(temp) +FROM hyper +WHERE time < now() - INTERVAL '3 days' +GROUP BY 1, 2 +LIMIT 1; + +EXPLAIN (VERBOSE, COSTS OFF) +EXECUTE timestamp_pushdown_test; + +SELECT test_override_pushdown_timestamptz('2019-10-15 00:00'::timestamptz); + +EXPLAIN (VERBOSE, COSTS OFF) +EXECUTE timestamp_pushdown_test; + -- Test one-dimensional push down CREATE TABLE hyper1d (time timestamptz, device int, temp float); SELECT * FROM create_distributed_hypertable('hyper1d', 'time', chunk_time_interval => '3 months'::interval); diff --git a/tsl/test/src/remote/CMakeLists.txt b/tsl/test/src/remote/CMakeLists.txt index 2d0598884..c12536997 100644 --- a/tsl/test/src/remote/CMakeLists.txt +++ b/tsl/test/src/remote/CMakeLists.txt @@ -10,6 +10,7 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/txn_persistent_record.c ${CMAKE_CURRENT_SOURCE_DIR}/txn_resolve.c ${CMAKE_CURRENT_SOURCE_DIR}/stmt_params.c + ${CMAKE_CURRENT_SOURCE_DIR}/scan_exec_debug.c ) target_sources(${TSL_TESTS_LIB_NAME} PRIVATE ${SOURCES}) diff --git a/tsl/test/src/remote/scan_exec_debug.c b/tsl/test/src/remote/scan_exec_debug.c new file mode 100644 index 000000000..3f425474b --- /dev/null +++ b/tsl/test/src/remote/scan_exec_debug.c @@ -0,0 +1,23 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#include + +#include "export.h" +#include "fdw/scan_exec.h" + +TS_FUNCTION_INFO_V1(test_override_pushdown_timestamptz); + +Datum +test_override_pushdown_timestamptz(PG_FUNCTION_ARGS) +{ +#ifdef TS_DEBUG + fdw_scan_debug_override_pushdown_timestamp(PG_GETARG_INT64(0)); + PG_RETURN_VOID(); +#else + elog(ERROR, "unable to handle test_is_frontend_session without TS_DEBUG flag set"); + PG_RETURN_VOID(); +#endif +}