mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-26 00:00:54 +08:00
Improve pushdown handling of time functions
This change allows certain mutable functions to be whitelisted so that they can be safely pushed down to data nodes. Additionally, this change will no longer prevent queries containing the `now` function from being pushed down to data nodes, but will instead replace the function call with the transaction start time (which is the same value which would be used had the query been run solely on the access node).
This commit is contained in:
parent
a443fe5ba9
commit
5910f68305
@ -102,13 +102,15 @@ typedef struct foreign_glob_cxt
|
||||
*/
|
||||
typedef struct deparse_expr_cxt
|
||||
{
|
||||
PlannerInfo *root; /* global planner state */
|
||||
RelOptInfo *foreignrel; /* the foreign relation we are planning for */
|
||||
RelOptInfo *scanrel; /* the underlying scan relation. Same as
|
||||
* foreignrel, when that represents a join or
|
||||
* a base relation. */
|
||||
StringInfo buf; /* output buffer to append to */
|
||||
List **params_list; /* exprs that will become remote Params */
|
||||
PlannerInfo *root; /* global planner state */
|
||||
RelOptInfo *foreignrel; /* the foreign relation we are planning for */
|
||||
RelOptInfo *scanrel; /* the underlying scan relation. Same as
|
||||
* foreignrel, when that represents a join or
|
||||
* a base relation. */
|
||||
StringInfo buf; /* output buffer to append to */
|
||||
List **params_list; /* exprs that will become remote Params */
|
||||
List **current_time_idx; /* locations in the sql output that need to
|
||||
* have the current time appended */
|
||||
DataNodeChunkAssignment *sca;
|
||||
} deparse_expr_cxt;
|
||||
|
||||
@ -118,6 +120,33 @@ typedef struct deparse_expr_cxt
|
||||
#define SUBQUERY_REL_ALIAS_PREFIX "s"
|
||||
#define SUBQUERY_COL_ALIAS_PREFIX "c"
|
||||
|
||||
/* Oids of mutable functions determined to safe to pushdown to data nodes */
|
||||
static Oid PushdownSafeFunctionOIDs[] = {
|
||||
F_TIMESTAMPTZ_PL_INTERVAL,
|
||||
F_TIMESTAMPTZ_MI_INTERVAL,
|
||||
F_NOW, /* Special case, this will be evaluated prior to pushdown */
|
||||
F_TIMESTAMPTZ_TIMESTAMP,
|
||||
F_TIMESTAMP_TIMESTAMPTZ,
|
||||
F_TIMESTAMP_PL_INTERVAL,
|
||||
F_TIMESTAMP_MI_INTERVAL,
|
||||
F_TIMESTAMP_LT_TIMESTAMPTZ,
|
||||
F_TIMESTAMP_LE_TIMESTAMPTZ,
|
||||
F_TIMESTAMP_EQ_TIMESTAMPTZ,
|
||||
F_TIMESTAMP_GT_TIMESTAMPTZ,
|
||||
F_TIMESTAMP_GE_TIMESTAMPTZ,
|
||||
F_TIMESTAMP_NE_TIMESTAMPTZ,
|
||||
F_TIMESTAMP_CMP_TIMESTAMPTZ,
|
||||
F_TIMESTAMPTZ_LT_TIMESTAMP,
|
||||
F_TIMESTAMPTZ_LE_TIMESTAMP,
|
||||
F_TIMESTAMPTZ_EQ_TIMESTAMP,
|
||||
F_TIMESTAMPTZ_GT_TIMESTAMP,
|
||||
F_TIMESTAMPTZ_GE_TIMESTAMP,
|
||||
F_TIMESTAMPTZ_NE_TIMESTAMP,
|
||||
F_TIMESTAMPTZ_CMP_TIMESTAMP,
|
||||
};
|
||||
static const int NumPushdownSafeOIDs =
|
||||
sizeof(PushdownSafeFunctionOIDs) / sizeof(PushdownSafeFunctionOIDs[0]);
|
||||
|
||||
/*
|
||||
* Functions to determine whether an expression can be evaluated safely on
|
||||
* data node.
|
||||
@ -235,6 +264,35 @@ classify_conditions(PlannerInfo *root, RelOptInfo *baserel, List *input_conds, L
|
||||
}
|
||||
}
|
||||
|
||||
static int
|
||||
oid_comparator(const void *a, const void *b)
|
||||
{
|
||||
if (*(Oid *) a == *(Oid *) b)
|
||||
return 0;
|
||||
else if (*(Oid *) a < *(Oid *) b)
|
||||
return -1;
|
||||
else
|
||||
return 1;
|
||||
}
|
||||
|
||||
static bool
|
||||
function_is_whitelisted(Oid func_id)
|
||||
{
|
||||
static bool PushdownOIDsSorted = false;
|
||||
|
||||
if (!PushdownOIDsSorted)
|
||||
{
|
||||
qsort(PushdownSafeFunctionOIDs, NumPushdownSafeOIDs, sizeof(Oid), oid_comparator);
|
||||
PushdownOIDsSorted = true;
|
||||
}
|
||||
|
||||
return bsearch(&func_id,
|
||||
PushdownSafeFunctionOIDs,
|
||||
NumPushdownSafeOIDs,
|
||||
sizeof(Oid),
|
||||
oid_comparator) != NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Check for mutable functions in an expression.
|
||||
*
|
||||
@ -255,7 +313,14 @@ contain_mutable_functions_checker(Oid func_id, void *context)
|
||||
if (NULL != finfo)
|
||||
return false;
|
||||
|
||||
return (func_volatile(func_id) != PROVOLATILE_IMMUTABLE);
|
||||
if (func_volatile(func_id) == PROVOLATILE_IMMUTABLE)
|
||||
return false;
|
||||
|
||||
/* Certain functions are mutable but are known to safe to push down to the data node. */
|
||||
if (function_is_whitelisted(func_id))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -720,7 +785,8 @@ build_tlist_to_deparse(RelOptInfo *foreignrel)
|
||||
void
|
||||
deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel, List *tlist,
|
||||
List *remote_conds, List *pathkeys, bool is_subquery,
|
||||
List **retrieved_attrs, List **params_list, DataNodeChunkAssignment *sca)
|
||||
List **retrieved_attrs, List **params_list, DataNodeChunkAssignment *sca,
|
||||
List **current_time_idx)
|
||||
{
|
||||
deparse_expr_cxt context;
|
||||
TsFdwRelInfo *fpinfo = fdw_relinfo_get(rel);
|
||||
@ -738,6 +804,7 @@ deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel, List
|
||||
context.foreignrel = rel;
|
||||
context.scanrel = IS_UPPER_REL(rel) ? fpinfo->outerrel : rel;
|
||||
context.params_list = params_list;
|
||||
context.current_time_idx = current_time_idx;
|
||||
context.sca = sca;
|
||||
|
||||
/* Construct SELECT clause */
|
||||
@ -2772,8 +2839,17 @@ appendFunctionName(Oid funcid, deparse_expr_cxt *context)
|
||||
appendStringInfo(buf, "%s.", quote_identifier(schemaname));
|
||||
}
|
||||
|
||||
/* Always print the function name */
|
||||
proname = NameStr(procform->proname);
|
||||
|
||||
/*
|
||||
* If the function is the 'now' function, we'll have to replace it before pushing
|
||||
* it down to the data node. For now just make a note of the index at which we're
|
||||
* inserting it into the sql statement.
|
||||
*/
|
||||
if (funcid == F_NOW && context->current_time_idx)
|
||||
*context->current_time_idx = lappend_int(*context->current_time_idx, buf->len);
|
||||
|
||||
/* Always print the function name */
|
||||
appendStringInfoString(buf, quote_identifier(proname));
|
||||
|
||||
ReleaseSysCache(proctup);
|
||||
|
@ -50,7 +50,7 @@ extern List *build_tlist_to_deparse(RelOptInfo *foreignrel);
|
||||
extern void deparseSelectStmtForRel(StringInfo buf, PlannerInfo *root, RelOptInfo *rel, List *tlist,
|
||||
List *remote_conds, List *pathkeys, bool is_subquery,
|
||||
List **retrieved_attrs, List **params_list,
|
||||
DataNodeChunkAssignment *swa);
|
||||
DataNodeChunkAssignment *swa, List **current_time_idx);
|
||||
|
||||
extern const char *get_jointype_name(JoinType jointype);
|
||||
extern void deparseStringLiteral(StringInfo buf, const char *val);
|
||||
|
@ -153,7 +153,8 @@ get_remote_estimate(PlannerInfo *root, RelOptInfo *rel, List *param_join_conds,
|
||||
false,
|
||||
&retrieved_attrs,
|
||||
NULL,
|
||||
fpinfo->sca);
|
||||
fpinfo->sca,
|
||||
NULL);
|
||||
|
||||
/* Get the remote estimate */
|
||||
conn = remote_dist_txn_get_connection(fpinfo->cid, REMOTE_TXN_NO_PREP_STMT);
|
||||
|
@ -42,6 +42,8 @@ enum FdwScanPrivateIndex
|
||||
FdwScanPrivateServerId,
|
||||
/* OID list of chunk oids, used by EXPLAIN */
|
||||
FdwScanPrivateChunkOids,
|
||||
/* Places in the remote query that need to have the current timestamp inserted */
|
||||
FdwScanCurrentTimeIndexes,
|
||||
/*
|
||||
* String describing join i.e. names of relations being joined and types
|
||||
* of join, added when the scan is join
|
||||
@ -180,6 +182,54 @@ prepare_query_params(PlanState *node, List *fdw_exprs, int num_params, FmgrInfo
|
||||
*param_values = (const char **) palloc0(num_params * sizeof(char *));
|
||||
}
|
||||
|
||||
#ifdef TS_DEBUG
|
||||
/* Allow tests to specify the time to push down in place of now() */
|
||||
TimestampTz ts_current_timestamp_override_value = -1;
|
||||
|
||||
extern void
|
||||
fdw_scan_debug_override_pushdown_timestamp(TimestampTz time)
|
||||
{
|
||||
ts_current_timestamp_override_value = time;
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* This function takes a sql statement char string and list of indicies to occurrences of `now()`
|
||||
* within that string and then returns a new string which will be the same sql statement, only with
|
||||
* the now calls replaced with the current transaction timestamp.
|
||||
*/
|
||||
static char *
|
||||
generate_updated_sql_using_current_timestamp(const char *original_sql, List *now_indicies)
|
||||
{
|
||||
static const char string_to_replace[] = "now()";
|
||||
int replace_length = strlen(string_to_replace);
|
||||
StringInfoData new_query;
|
||||
ListCell *lc;
|
||||
int curr_index = 0;
|
||||
TimestampTz now;
|
||||
|
||||
initStringInfo(&new_query);
|
||||
now = GetSQLCurrentTimestamp(-1);
|
||||
#ifdef TS_DEBUG
|
||||
if (ts_current_timestamp_override_value >= 0)
|
||||
now = ts_current_timestamp_override_value;
|
||||
#endif
|
||||
|
||||
foreach (lc, now_indicies)
|
||||
{
|
||||
int next_index = lfirst_int(lc);
|
||||
|
||||
Assert(next_index < strlen(original_sql) &&
|
||||
strncmp(string_to_replace, original_sql + next_index, replace_length) == 0);
|
||||
appendBinaryStringInfo(&new_query, original_sql + curr_index, next_index - curr_index);
|
||||
appendStringInfo(&new_query, "('%s'::timestamptz)", timestamptz_to_str(now));
|
||||
curr_index = next_index + replace_length;
|
||||
}
|
||||
|
||||
appendStringInfo(&new_query, "%s", original_sql + curr_index);
|
||||
return new_query.data;
|
||||
}
|
||||
|
||||
static TSConnection *
|
||||
get_connection(ScanState *ss, Oid const server_id, Bitmapset *scanrelids, List *exprs)
|
||||
{
|
||||
@ -228,7 +278,14 @@ fdw_scan_init(ScanState *ss, TsFdwScanState *fsstate, Bitmapset *scanrelids, Lis
|
||||
fdw_exprs);
|
||||
|
||||
/* Get private info created by planner functions. */
|
||||
fsstate->query = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
|
||||
if (list_nth(fdw_private, FdwScanCurrentTimeIndexes) == NIL)
|
||||
fsstate->query = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
|
||||
else
|
||||
fsstate->query =
|
||||
generate_updated_sql_using_current_timestamp(strVal(list_nth(fdw_private,
|
||||
FdwScanPrivateSelectSql)),
|
||||
list_nth(fdw_private,
|
||||
FdwScanCurrentTimeIndexes));
|
||||
fsstate->retrieved_attrs = (List *) list_nth(fdw_private, FdwScanPrivateRetrievedAttrs);
|
||||
fsstate->fetch_size = intVal(list_nth(fdw_private, FdwScanPrivateFetchSize));
|
||||
|
||||
@ -401,6 +458,7 @@ fdw_scan_explain(ScanState *ss, List *fdw_private, ExplainState *es, TsFdwScanSt
|
||||
Oid server_id = intVal(list_nth(fdw_private, FdwScanPrivateServerId));
|
||||
ForeignServer *server = GetForeignServer(server_id);
|
||||
List *chunk_oids = (List *) list_nth(fdw_private, FdwScanPrivateChunkOids);
|
||||
char *sql;
|
||||
|
||||
ExplainPropertyText("Data node", server->servername, es);
|
||||
|
||||
@ -423,9 +481,17 @@ fdw_scan_explain(ScanState *ss, List *fdw_private, ExplainState *es, TsFdwScanSt
|
||||
ExplainPropertyText("Chunks", chunk_names.data, es);
|
||||
}
|
||||
|
||||
ExplainPropertyText("Remote SQL",
|
||||
strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)),
|
||||
es);
|
||||
if (list_nth(fdw_private, FdwScanCurrentTimeIndexes) != NIL)
|
||||
sql =
|
||||
generate_updated_sql_using_current_timestamp(strVal(
|
||||
list_nth(fdw_private,
|
||||
FdwScanPrivateSelectSql)),
|
||||
list_nth(fdw_private,
|
||||
FdwScanCurrentTimeIndexes));
|
||||
else
|
||||
sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
|
||||
|
||||
ExplainPropertyText("Remote SQL", sql, es);
|
||||
|
||||
if (ts_guc_enable_remote_explain)
|
||||
{
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include <utils/palloc.h>
|
||||
#include <fmgr.h>
|
||||
#include <access/htup.h>
|
||||
#include <commands/explain.h>
|
||||
|
||||
/*
|
||||
* Execution state of a foreign scan using timescaledb_fdw.
|
||||
@ -47,4 +48,9 @@ extern void fdw_scan_explain(ScanState *ss, List *fdw_private, ExplainState *es,
|
||||
|
||||
extern void create_cursor(ScanState *ss, TsFdwScanState *fsstate, bool block);
|
||||
|
||||
#ifdef TS_DEBUG
|
||||
/* Allow tests to specify the time to push down in place of now() */
|
||||
extern void fdw_scan_debug_override_pushdown_timestamp(TimestampTz time);
|
||||
#endif
|
||||
|
||||
#endif /* TIMESCALEDB_TSL_FDW_SCAN_EXEC_H */
|
||||
|
@ -340,6 +340,7 @@ fdw_scan_info_init(ScanInfo *scaninfo, PlannerInfo *root, RelOptInfo *rel, Path
|
||||
List *remote_exprs = NIL;
|
||||
List *local_exprs = NIL;
|
||||
List *params_list = NIL;
|
||||
List *current_time_idx = NIL;
|
||||
List *fdw_scan_tlist = NIL;
|
||||
List *fdw_recheck_quals = NIL;
|
||||
List *retrieved_attrs;
|
||||
@ -454,7 +455,8 @@ fdw_scan_info_init(ScanInfo *scaninfo, PlannerInfo *root, RelOptInfo *rel, Path
|
||||
false,
|
||||
&retrieved_attrs,
|
||||
¶ms_list,
|
||||
fpinfo->sca);
|
||||
fpinfo->sca,
|
||||
¤t_time_idx);
|
||||
|
||||
/* Remember remote_exprs for possible use by PlanDirectModify */
|
||||
fpinfo->final_remote_exprs = remote_exprs;
|
||||
@ -468,6 +470,7 @@ fdw_scan_info_init(ScanInfo *scaninfo, PlannerInfo *root, RelOptInfo *rel, Path
|
||||
makeInteger(fpinfo->fetch_size),
|
||||
makeInteger(fpinfo->server->serverid),
|
||||
(fpinfo->sca != NULL ? list_copy(fpinfo->sca->chunk_oids) : NIL));
|
||||
fdw_private = lappend(fdw_private, current_time_idx);
|
||||
Assert(!IS_JOIN_REL(rel));
|
||||
|
||||
if (IS_UPPER_REL(rel))
|
||||
|
@ -15,6 +15,9 @@ RETURNS VOID
|
||||
AS :TSL_MODULE_PATHNAME, 'ts_remote_exec'
|
||||
LANGUAGE C;
|
||||
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
|
||||
CREATE OR REPLACE FUNCTION test_override_pushdown_timestamptz(new_value TIMESTAMPTZ) RETURNS VOID
|
||||
AS :TSL_MODULE_PATHNAME, 'test_override_pushdown_timestamptz'
|
||||
LANGUAGE C VOLATILE STRICT;
|
||||
-- Cleanup from other potential tests that created these databases
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP DATABASE IF EXISTS data_node_1;
|
||||
@ -2116,6 +2119,154 @@ LIMIT 1;
|
||||
Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[2, 1]) AND (("time" < '2018-06-01 00:00:00-07'::timestamp with time zone)) ORDER BY "time" ASC NULLS LAST, device ASC NULLS LAST
|
||||
(21 rows)
|
||||
|
||||
-- contains whitelisted time expressions
|
||||
SELECT test_override_pushdown_timestamptz('2018-06-01 00:00'::timestamptz);
|
||||
test_override_pushdown_timestamptz
|
||||
------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
SELECT time, device, avg(temp)
|
||||
FROM pg2dim
|
||||
WHERE time < Now( ) - INTERVAL '3 days'
|
||||
GROUP BY 1, 2
|
||||
LIMIT 1;
|
||||
QUERY PLAN
|
||||
-----------------------------------------------------------------------------------------
|
||||
Limit
|
||||
Output: pg2dim_h1_t1."time", pg2dim_h1_t1.device, (avg(pg2dim_h1_t1.temp))
|
||||
-> Append
|
||||
-> HashAggregate
|
||||
Output: pg2dim_h1_t1."time", pg2dim_h1_t1.device, avg(pg2dim_h1_t1.temp)
|
||||
Group Key: pg2dim_h1_t1."time", pg2dim_h1_t1.device
|
||||
-> Foreign Scan on public.pg2dim_h1_t1
|
||||
Output: pg2dim_h1_t1."time", pg2dim_h1_t1.device, pg2dim_h1_t1.temp
|
||||
Filter: (pg2dim_h1_t1."time" < (now() - '@ 3 days'::interval))
|
||||
Remote SQL: SELECT "time", device, temp FROM public.pg2dim_h1_t1
|
||||
-> HashAggregate
|
||||
Output: pg2dim_h1_t2."time", pg2dim_h1_t2.device, avg(pg2dim_h1_t2.temp)
|
||||
Group Key: pg2dim_h1_t2."time", pg2dim_h1_t2.device
|
||||
-> Foreign Scan on public.pg2dim_h1_t2
|
||||
Output: pg2dim_h1_t2."time", pg2dim_h1_t2.device, pg2dim_h1_t2.temp
|
||||
Filter: (pg2dim_h1_t2."time" < (now() - '@ 3 days'::interval))
|
||||
Remote SQL: SELECT "time", device, temp FROM public.pg2dim_h1_t2
|
||||
-> HashAggregate
|
||||
Output: pg2dim_h1_t3."time", pg2dim_h1_t3.device, avg(pg2dim_h1_t3.temp)
|
||||
Group Key: pg2dim_h1_t3."time", pg2dim_h1_t3.device
|
||||
-> Foreign Scan on public.pg2dim_h1_t3
|
||||
Output: pg2dim_h1_t3."time", pg2dim_h1_t3.device, pg2dim_h1_t3.temp
|
||||
Filter: (pg2dim_h1_t3."time" < (now() - '@ 3 days'::interval))
|
||||
Remote SQL: SELECT "time", device, temp FROM public.pg2dim_h1_t3
|
||||
-> HashAggregate
|
||||
Output: pg2dim_h2_t1."time", pg2dim_h2_t1.device, avg(pg2dim_h2_t1.temp)
|
||||
Group Key: pg2dim_h2_t1."time", pg2dim_h2_t1.device
|
||||
-> Foreign Scan on public.pg2dim_h2_t1
|
||||
Output: pg2dim_h2_t1."time", pg2dim_h2_t1.device, pg2dim_h2_t1.temp
|
||||
Filter: (pg2dim_h2_t1."time" < (now() - '@ 3 days'::interval))
|
||||
Remote SQL: SELECT "time", device, temp FROM public.pg2dim_h2_t1
|
||||
-> HashAggregate
|
||||
Output: pg2dim_h2_t2."time", pg2dim_h2_t2.device, avg(pg2dim_h2_t2.temp)
|
||||
Group Key: pg2dim_h2_t2."time", pg2dim_h2_t2.device
|
||||
-> Foreign Scan on public.pg2dim_h2_t2
|
||||
Output: pg2dim_h2_t2."time", pg2dim_h2_t2.device, pg2dim_h2_t2.temp
|
||||
Filter: (pg2dim_h2_t2."time" < (now() - '@ 3 days'::interval))
|
||||
Remote SQL: SELECT "time", device, temp FROM public.pg2dim_h2_t2
|
||||
-> HashAggregate
|
||||
Output: pg2dim_h2_t3."time", pg2dim_h2_t3.device, avg(pg2dim_h2_t3.temp)
|
||||
Group Key: pg2dim_h2_t3."time", pg2dim_h2_t3.device
|
||||
-> Foreign Scan on public.pg2dim_h2_t3
|
||||
Output: pg2dim_h2_t3."time", pg2dim_h2_t3.device, pg2dim_h2_t3.temp
|
||||
Filter: (pg2dim_h2_t3."time" < (now() - '@ 3 days'::interval))
|
||||
Remote SQL: SELECT "time", device, temp FROM public.pg2dim_h2_t3
|
||||
(45 rows)
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
SELECT time, device, avg(temp)
|
||||
FROM hyper
|
||||
WHERE time < Now( ) - INTERVAL '3 days'
|
||||
GROUP BY 1, 2
|
||||
LIMIT 1;
|
||||
QUERY PLAN
|
||||
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||
Limit
|
||||
Output: "time", device, (avg(temp))
|
||||
-> Custom Scan (AsyncAppend)
|
||||
Output: "time", device, (avg(temp))
|
||||
-> Append
|
||||
-> Custom Scan (DataNodeScan)
|
||||
Output: hyper."time", hyper.device, (avg(hyper.temp))
|
||||
Relations: Aggregate on (public.hyper)
|
||||
Data node: data_node_1
|
||||
Chunks: _hyper_1_1_dist_chunk, _hyper_1_3_dist_chunk, _hyper_1_5_dist_chunk, _hyper_1_7_dist_chunk
|
||||
Remote SQL: SELECT "time", device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[1, 2, 3, 4]) AND (("time" < (('2018-06-01 00:00:00-07'::timestamptz) - '3 days'::interval))) GROUP BY 1, 2
|
||||
-> Custom Scan (DataNodeScan)
|
||||
Output: hyper_1."time", hyper_1.device, (avg(hyper_1.temp))
|
||||
Relations: Aggregate on (public.hyper)
|
||||
Data node: data_node_2
|
||||
Chunks: _hyper_1_2_dist_chunk, _hyper_1_4_dist_chunk, _hyper_1_6_dist_chunk
|
||||
Remote SQL: SELECT "time", device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[1, 2, 3]) AND (("time" < (('2018-06-01 00:00:00-07'::timestamptz) - '3 days'::interval))) GROUP BY 1, 2
|
||||
(17 rows)
|
||||
|
||||
-- Verify that repeated runs of the same plan will get different timestamps
|
||||
PREPARE timestamp_pushdown_test AS
|
||||
SELECT time, device, avg(temp)
|
||||
FROM hyper
|
||||
WHERE time < now() - INTERVAL '3 days'
|
||||
GROUP BY 1, 2
|
||||
LIMIT 1;
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
EXECUTE timestamp_pushdown_test;
|
||||
QUERY PLAN
|
||||
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||
Limit
|
||||
Output: "time", device, (avg(temp))
|
||||
-> Custom Scan (AsyncAppend)
|
||||
Output: "time", device, (avg(temp))
|
||||
-> Append
|
||||
-> Custom Scan (DataNodeScan)
|
||||
Output: hyper."time", hyper.device, (avg(hyper.temp))
|
||||
Relations: Aggregate on (public.hyper)
|
||||
Data node: data_node_1
|
||||
Chunks: _hyper_1_1_dist_chunk, _hyper_1_3_dist_chunk, _hyper_1_5_dist_chunk, _hyper_1_7_dist_chunk
|
||||
Remote SQL: SELECT "time", device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[1, 2, 3, 4]) AND (("time" < (('2018-06-01 00:00:00-07'::timestamptz) - '3 days'::interval))) GROUP BY 1, 2
|
||||
-> Custom Scan (DataNodeScan)
|
||||
Output: hyper_1."time", hyper_1.device, (avg(hyper_1.temp))
|
||||
Relations: Aggregate on (public.hyper)
|
||||
Data node: data_node_2
|
||||
Chunks: _hyper_1_2_dist_chunk, _hyper_1_4_dist_chunk, _hyper_1_6_dist_chunk
|
||||
Remote SQL: SELECT "time", device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[1, 2, 3]) AND (("time" < (('2018-06-01 00:00:00-07'::timestamptz) - '3 days'::interval))) GROUP BY 1, 2
|
||||
(17 rows)
|
||||
|
||||
SELECT test_override_pushdown_timestamptz('2019-10-15 00:00'::timestamptz);
|
||||
test_override_pushdown_timestamptz
|
||||
------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
EXECUTE timestamp_pushdown_test;
|
||||
QUERY PLAN
|
||||
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||
Limit
|
||||
Output: "time", device, (avg(temp))
|
||||
-> Custom Scan (AsyncAppend)
|
||||
Output: "time", device, (avg(temp))
|
||||
-> Append
|
||||
-> Custom Scan (DataNodeScan)
|
||||
Output: hyper."time", hyper.device, (avg(hyper.temp))
|
||||
Relations: Aggregate on (public.hyper)
|
||||
Data node: data_node_1
|
||||
Chunks: _hyper_1_1_dist_chunk, _hyper_1_3_dist_chunk, _hyper_1_5_dist_chunk, _hyper_1_7_dist_chunk
|
||||
Remote SQL: SELECT "time", device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[1, 2, 3, 4]) AND (("time" < (('2019-10-15 00:00:00-07'::timestamptz) - '3 days'::interval))) GROUP BY 1, 2
|
||||
-> Custom Scan (DataNodeScan)
|
||||
Output: hyper_1."time", hyper_1.device, (avg(hyper_1.temp))
|
||||
Relations: Aggregate on (public.hyper)
|
||||
Data node: data_node_2
|
||||
Chunks: _hyper_1_2_dist_chunk, _hyper_1_4_dist_chunk, _hyper_1_6_dist_chunk
|
||||
Remote SQL: SELECT "time", device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[1, 2, 3]) AND (("time" < (('2019-10-15 00:00:00-07'::timestamptz) - '3 days'::interval))) GROUP BY 1, 2
|
||||
(17 rows)
|
||||
|
||||
-- Test one-dimensional push down
|
||||
CREATE TABLE hyper1d (time timestamptz, device int, temp float);
|
||||
SELECT * FROM create_distributed_hypertable('hyper1d', 'time', chunk_time_interval => '3 months'::interval);
|
||||
|
@ -7,6 +7,10 @@
|
||||
\ir include/remote_exec.sql
|
||||
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
|
||||
|
||||
CREATE OR REPLACE FUNCTION test_override_pushdown_timestamptz(new_value TIMESTAMPTZ) RETURNS VOID
|
||||
AS :TSL_MODULE_PATHNAME, 'test_override_pushdown_timestamptz'
|
||||
LANGUAGE C VOLATILE STRICT;
|
||||
|
||||
-- Cleanup from other potential tests that created these databases
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP DATABASE IF EXISTS data_node_1;
|
||||
@ -645,6 +649,39 @@ AND time < '2018-06-01 00:00'
|
||||
GROUP BY 1, 2
|
||||
LIMIT 1;
|
||||
|
||||
-- contains whitelisted time expressions
|
||||
SELECT test_override_pushdown_timestamptz('2018-06-01 00:00'::timestamptz);
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
SELECT time, device, avg(temp)
|
||||
FROM pg2dim
|
||||
WHERE time < Now( ) - INTERVAL '3 days'
|
||||
GROUP BY 1, 2
|
||||
LIMIT 1;
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
SELECT time, device, avg(temp)
|
||||
FROM hyper
|
||||
WHERE time < Now( ) - INTERVAL '3 days'
|
||||
GROUP BY 1, 2
|
||||
LIMIT 1;
|
||||
|
||||
-- Verify that repeated runs of the same plan will get different timestamps
|
||||
PREPARE timestamp_pushdown_test AS
|
||||
SELECT time, device, avg(temp)
|
||||
FROM hyper
|
||||
WHERE time < now() - INTERVAL '3 days'
|
||||
GROUP BY 1, 2
|
||||
LIMIT 1;
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
EXECUTE timestamp_pushdown_test;
|
||||
|
||||
SELECT test_override_pushdown_timestamptz('2019-10-15 00:00'::timestamptz);
|
||||
|
||||
EXPLAIN (VERBOSE, COSTS OFF)
|
||||
EXECUTE timestamp_pushdown_test;
|
||||
|
||||
-- Test one-dimensional push down
|
||||
CREATE TABLE hyper1d (time timestamptz, device int, temp float);
|
||||
SELECT * FROM create_distributed_hypertable('hyper1d', 'time', chunk_time_interval => '3 months'::interval);
|
||||
|
@ -10,6 +10,7 @@ set(SOURCES
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/txn_persistent_record.c
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/txn_resolve.c
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/stmt_params.c
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/scan_exec_debug.c
|
||||
)
|
||||
|
||||
target_sources(${TSL_TESTS_LIB_NAME} PRIVATE ${SOURCES})
|
||||
|
23
tsl/test/src/remote/scan_exec_debug.c
Normal file
23
tsl/test/src/remote/scan_exec_debug.c
Normal file
@ -0,0 +1,23 @@
|
||||
/*
|
||||
* This file and its contents are licensed under the Timescale License.
|
||||
* Please see the included NOTICE for copyright information and
|
||||
* LICENSE-TIMESCALE for a copy of the license.
|
||||
*/
|
||||
#include <postgres.h>
|
||||
|
||||
#include "export.h"
|
||||
#include "fdw/scan_exec.h"
|
||||
|
||||
TS_FUNCTION_INFO_V1(test_override_pushdown_timestamptz);
|
||||
|
||||
Datum
|
||||
test_override_pushdown_timestamptz(PG_FUNCTION_ARGS)
|
||||
{
|
||||
#ifdef TS_DEBUG
|
||||
fdw_scan_debug_override_pushdown_timestamp(PG_GETARG_INT64(0));
|
||||
PG_RETURN_VOID();
|
||||
#else
|
||||
elog(ERROR, "unable to handle test_is_frontend_session without TS_DEBUG flag set");
|
||||
PG_RETURN_VOID();
|
||||
#endif
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user