Support parameterized data node scans in joins

This allows us to perform a nested loop join of a small outer local
table to an inner distributed hypertable, without downloading the
entire hypertable to the access node.
Alexander Kuzmenkov, 2022-11-14 18:09:54 +04:00 (committed by Alexander Kuzmenkov)
parent 9964ba8ba6
commit 121631c70f
9 changed files with 1007 additions and 22 deletions
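
For illustration, a query of this shape from the dist_param test added below
can now be executed as a nested loop whose inner side is a parameterized
DataNodeScan, so the join key is sent to the data node as a runtime parameter
instead of fetching the entire metric_dist hypertable to the access node:

    select name, max(value), count(*)
    from metric_dist join metric_name using (id)
    where name like 'cpu%'
    group by name;

In the resulting plan, the remote SQL of the inner scan carries the pushed-down
join condition ($1::integer = id), with $1 supplied for each row of the small
local metric_name table.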

tsl/src/fdw/data_node_scan_plan.c

@@ -22,23 +22,27 @@
 #include <parser/parsetree.h>
 #include <utils/memutils.h>

 #include <math.h>

+#include <compat/compat.h>
+#include <debug.h>
+#include <debug_guc.h>
+#include <dimension.h>
 #include <export.h>
+#include <func_cache.h>
 #include <hypertable_cache.h>
-#include <planner.h>
 #include <import/allpaths.h>
 #include <import/planner.h>
-#include <func_cache.h>
-#include <dimension.h>
-#include <compat/compat.h>
-#include <debug_guc.h>
-#include <debug.h>
+#include <planner.h>

-#include "relinfo.h"
-#include "data_node_chunk_assignment.h"
-#include "scan_plan.h"
 #include "data_node_scan_plan.h"
+#include "data_node_chunk_assignment.h"
 #include "data_node_scan_exec.h"
 #include "deparse.h"
 #include "fdw_utils.h"
+#include "relinfo.h"
+#include "scan_plan.h"

 /*
  * DataNodeScan is a custom scan implementation for scanning hypertables on
@@ -256,6 +260,43 @@ build_data_node_part_rels(PlannerInfo *root, RelOptInfo *hyper_rel, int *nparts)
    return part_rels;
}

/* Callback argument for ts_ec_member_matches_foreign */
typedef struct
{
    Expr *current;      /* current expr, or NULL if not yet found */
    List *already_used; /* expressions already dealt with */
} ts_ec_member_foreign_arg;

/*
 * Detect whether we want to process an EquivalenceClass member.
 *
 * This is a callback for use by generate_implied_equalities_for_column.
 */
static bool
ts_ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, EquivalenceClass *ec,
                             EquivalenceMember *em, void *arg)
{
    ts_ec_member_foreign_arg *state = (ts_ec_member_foreign_arg *) arg;
    Expr *expr = em->em_expr;

    /*
     * If we've identified what we're processing in the current scan, we only
     * want to match that expression.
     */
    if (state->current != NULL)
        return equal(expr, state->current);

    /*
     * Otherwise, ignore anything we've already processed.
     */
    if (list_member(state->already_used, expr))
        return false;

    /* This is the new target to process. */
    state->current = expr;
    return true;
}
static void
add_data_node_scan_paths(PlannerInfo *root, RelOptInfo *baserel)
{
@@ -282,6 +323,222 @@ add_data_node_scan_paths(PlannerInfo *root, RelOptInfo *baserel)
    /* Add paths with pathkeys */
    fdw_add_paths_with_pathkeys_for_rel(root, baserel, NULL, data_node_scan_path_create);

    /*
     * Thumb through all join clauses for the rel to identify which outer
     * relations could supply one or more safe-to-send-to-remote join clauses.
     * We'll build a parameterized path for each such outer relation.
     *
     * Note that in case we have multiple local tables, this outer relation
     * here may be the result of joining the local tables together. For an
     * example, see the multiple join in the dist_param test.
     *
     * It's convenient to represent each candidate outer relation by the
     * ParamPathInfo node for it. We can then use the ppi_clauses list in the
     * ParamPathInfo node directly as a list of the interesting join clauses for
     * that rel. This takes care of the possibility that there are multiple
     * safe join clauses for such a rel, and also ensures that we account for
     * unsafe join clauses that we'll still have to enforce locally (since the
     * parameterized-path machinery insists that we handle all movable clauses).
     */
    List *ppi_list = NIL;
    ListCell *lc;
    foreach (lc, baserel->joininfo)
    {
        RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
        Relids required_outer;
        ParamPathInfo *param_info;

        /* Check if clause can be moved to this rel */
        if (!join_clause_is_movable_to(rinfo, baserel))
        {
            continue;
        }

        /* See if it is safe to send to remote */
        if (!ts_is_foreign_expr(root, baserel, rinfo->clause))
        {
            continue;
        }

        /* Calculate required outer rels for the resulting path */
        required_outer = bms_union(rinfo->clause_relids, baserel->lateral_relids);
        /* We do not want the data node rel itself listed in required_outer */
        required_outer = bms_del_member(required_outer, baserel->relid);

        /*
         * required_outer probably can't be empty here, but if it were, we
         * couldn't make a parameterized path.
         */
        if (bms_is_empty(required_outer))
        {
            continue;
        }

        /* Get the ParamPathInfo */
        param_info = get_baserel_parampathinfo(root, baserel, required_outer);
        Assert(param_info != NULL);

        /*
         * Add it to list unless we already have it. Testing pointer equality
         * is OK since get_baserel_parampathinfo won't make duplicates.
         */
        ppi_list = list_append_unique_ptr(ppi_list, param_info);
    }

    /*
     * The above scan examined only "generic" join clauses, not those that
     * were absorbed into EquivalenceClauses. See if we can make anything out
     * of EquivalenceClauses.
     */
    if (baserel->has_eclass_joins)
    {
        /*
         * We repeatedly scan the eclass list looking for column references
         * (or expressions) belonging to the data node rel. Each time we find
         * one, we generate a list of equivalence joinclauses for it, and then
         * see if any are safe to send to the remote. Repeat till there are
         * no more candidate EC members.
         */
        ts_ec_member_foreign_arg arg;

        arg.already_used = NIL;
        for (;;)
        {
            List *clauses;

            /* Make clauses, skipping any that join to lateral_referencers */
            arg.current = NULL;
            clauses = generate_implied_equalities_for_column(root,
                                                             baserel,
                                                             ts_ec_member_matches_foreign,
                                                             (void *) &arg,
                                                             baserel->lateral_referencers);

            /* Done if there are no more expressions in the data node rel */
            if (arg.current == NULL)
            {
                Assert(clauses == NIL);
                break;
            }

            /* Scan the extracted join clauses */
            foreach (lc, clauses)
            {
                RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
                Relids required_outer;
                ParamPathInfo *param_info;

                /* Check if clause can be moved to this rel */
                if (!join_clause_is_movable_to(rinfo, baserel))
                {
                    continue;
                }

                /* See if it is safe to send to remote */
                if (!ts_is_foreign_expr(root, baserel, rinfo->clause))
                {
                    continue;
                }

                /* Calculate required outer rels for the resulting path */
                required_outer = bms_union(rinfo->clause_relids, baserel->lateral_relids);
                required_outer = bms_del_member(required_outer, baserel->relid);
                if (bms_is_empty(required_outer))
                {
                    continue;
                }

                /* Get the ParamPathInfo */
                param_info = get_baserel_parampathinfo(root, baserel, required_outer);
                Assert(param_info != NULL);

                /* Add it to list unless we already have it */
                ppi_list = list_append_unique_ptr(ppi_list, param_info);
            }

            /* Try again, now ignoring the expression we found this time */
            arg.already_used = lappend(arg.already_used, arg.current);
        }
    }

    /*
     * Now build a path for each useful outer relation.
     */
    foreach (lc, ppi_list)
    {
        ParamPathInfo *param_info = (ParamPathInfo *) lfirst(lc);
        Cost startup_cost = 0;
        Cost run_cost = 0;
        double rows = baserel->tuples > 1 ? baserel->tuples : 123456;

        /* Run remote non-join clauses. */
        const double remote_sel_sane =
            (fpinfo->remote_conds_sel > 0 && fpinfo->remote_conds_sel <= 1) ?
                fpinfo->remote_conds_sel :
                0.1;

        startup_cost += baserel->reltarget->cost.startup;
        startup_cost += fpinfo->remote_conds_cost.startup;
        run_cost += fpinfo->remote_conds_cost.per_tuple * rows;
        run_cost += cpu_tuple_cost * rows;
        run_cost += seq_page_cost * baserel->pages;
        rows *= remote_sel_sane;

        /* Run remote join clauses. */
        QualCost remote_join_cost;
        cost_qual_eval(&remote_join_cost, param_info->ppi_clauses, root);
        /*
         * We don't have up-to-date per-column statistics for distributed
         * hypertables currently, so the join estimates are going to be way off.
         * The worst case is when they are too low and we end up transferring
         * many more rows from the data node than we expected. Just hardcode the
         * selectivity at 0.1 per clause for now.
         */
        const double remote_join_sel = pow(0.1, list_length(param_info->ppi_clauses));

        startup_cost += remote_join_cost.startup;
        run_cost += remote_join_cost.per_tuple * rows;
        rows *= remote_join_sel;

        /* Transfer the resulting tuples over the network. */
        startup_cost += fpinfo->fdw_startup_cost;
        run_cost += fpinfo->fdw_tuple_cost * rows;

        /* Run local filters. */
        const double local_sel_sane =
            (fpinfo->local_conds_sel > 0 && fpinfo->local_conds_sel <= 1) ?
                fpinfo->local_conds_sel :
                0.5;

        startup_cost += fpinfo->local_conds_cost.startup;
        run_cost += fpinfo->local_conds_cost.per_tuple * rows;
        run_cost += cpu_tuple_cost * rows;
        rows *= local_sel_sane;

        /* Compute the output targetlist. */
        run_cost += baserel->reltarget->cost.per_tuple * rows;

        /*
         * ppi_rows currently won't get looked at by anything, but still we
         * may as well ensure that it matches our idea of the rowcount.
         */
        param_info->ppi_rows = rows;

        /* Make the path */
        path = data_node_scan_path_create(root,
                                          baserel,
                                          NULL, /* default pathtarget */
                                          rows,
                                          startup_cost,
                                          startup_cost + run_cost,
                                          NIL, /* no pathkeys */
                                          param_info->ppi_req_outer,
                                          NULL,
                                          NIL); /* no fdw_private list */
        add_path(baserel, (Path *) path);
    }
}
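
As a rough worked example of the cost model above (the numbers are assumed for
illustration, not taken from the commit): with baserel->tuples = 1000000, a
clamped remote_conds_sel of 0.1, and a single pushed-down join clause, the
number of tuples expected to cross the network per rescan is

    select 1000000 * 0.1 * pow(0.1, 1) as transferred_rows; -- 10000

after which only the local filter and output targetlist costs apply.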
tsl/src/fdw/deparse.c

@@ -262,7 +262,7 @@ classify_conditions(PlannerInfo *root, RelOptInfo *baserel, List *input_conds, L
    {
        RestrictInfo *ri = lfirst_node(RestrictInfo, lc);

-       if (is_foreign_expr(root, baserel, ri->clause))
+       if (ts_is_foreign_expr(root, baserel, ri->clause))
            *remote_conds = lappend(*remote_conds, ri);
        else
            *local_conds = lappend(*local_conds, ri);
@@ -391,7 +391,7 @@ foreign_expr_contains_mutable_functions(Node *clause)
 * Returns true if given expr is safe to evaluate on the data node.
 */
bool
-is_foreign_expr(PlannerInfo *root, RelOptInfo *baserel, Expr *expr)
+ts_is_foreign_expr(PlannerInfo *root, RelOptInfo *baserel, Expr *expr)
{
    foreign_glob_cxt glob_cxt;
    TsFdwRelInfo *fpinfo = fdw_relinfo_get(baserel);

tsl/src/fdw/deparse.h

@@ -39,7 +39,7 @@ extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte, Index rtindex,
extern void deparseDeleteSql(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel,
                             List *returningList, List **retrieved_attrs);
-extern bool is_foreign_expr(PlannerInfo *root, RelOptInfo *baserel, Expr *expr);
+extern bool ts_is_foreign_expr(PlannerInfo *root, RelOptInfo *baserel, Expr *expr);
extern void classify_conditions(PlannerInfo *root, RelOptInfo *baserel, List *input_conds,
                                List **remote_conds, List **local_conds);

tsl/src/fdw/relinfo.c

@@ -457,15 +457,21 @@ fdw_relinfo_create(PlannerInfo *root, RelOptInfo *rel, Oid server_oid, Oid local
    }

    /*
-    * Compute the selectivity and cost of the local_conds, so we don't have
-    * to do it over again for each path. The best we can do for these
-    * conditions is to estimate selectivity on the basis of local statistics.
+    * Compute the selectivity and cost of the local and remote conditions, so
+    * that we don't have to do it over again for each path. The best we can do
+    * for these conditions is to estimate selectivity on the basis of local
+    * statistics.
     */
    fpinfo->local_conds_sel =
        clauselist_selectivity(root, fpinfo->local_conds, rel->relid, JOIN_INNER, NULL);
    cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);

+   fpinfo->remote_conds_sel =
+       clauselist_selectivity(root, fpinfo->remote_conds, rel->relid, JOIN_INNER, NULL);
+   cost_qual_eval(&fpinfo->remote_conds_cost, fpinfo->remote_conds, root);
+
    /*
     * Set cached relation costs to some negative value, so that we can detect
     * when they are set to some sensible costs during one (usually the first)

tsl/src/fdw/relinfo.h

@@ -68,6 +68,10 @@ typedef struct TsFdwRelInfo
    QualCost local_conds_cost;
    Selectivity local_conds_sel;

    /* Cost and selectivity of remote_conds. */
    QualCost remote_conds_cost;
    Selectivity remote_conds_sel;

    /* Selectivity of join conditions */
    Selectivity joinclause_sel;

tsl/src/fdw/scan_plan.c

@@ -75,7 +75,7 @@ get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel)
         * checking ec_has_volatile here saves some cycles.
         */
        if (pathkey_ec->ec_has_volatile || !(em_expr = find_em_expr_for_rel(pathkey_ec, rel)) ||
-           !is_foreign_expr(root, rel, em_expr))
+           !ts_is_foreign_expr(root, rel, em_expr))
        {
            query_pathkeys_ok = false;
            break;
@@ -490,7 +490,7 @@ fdw_scan_info_init(ScanInfo *scaninfo, PlannerInfo *root, RelOptInfo *rel, Path
            remote_where = lappend(remote_where, rinfo->clause);
        else if (list_member_ptr(fpinfo->local_conds, rinfo))
            local_exprs = lappend(local_exprs, rinfo->clause);
-       else if (is_foreign_expr(root, rel, rinfo->clause))
+       else if (ts_is_foreign_expr(root, rel, rinfo->clause))
            remote_where = lappend(remote_where, rinfo->clause);
        else
            local_exprs = lappend(local_exprs, rinfo->clause);
@@ -706,7 +706,7 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, GroupPathExtraDa
             * If any GROUP BY expression is not shippable, then we cannot
             * push down aggregation to the data node.
             */
-           if (!is_foreign_expr(root, grouped_rel, expr))
+           if (!ts_is_foreign_expr(root, grouped_rel, expr))
                return false;

            /*
@@ -726,7 +726,7 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, GroupPathExtraDa
            /*
             * Non-grouping expression we need to compute. Is it shippable?
             */
-           if (is_foreign_expr(root, grouped_rel, expr))
+           if (ts_is_foreign_expr(root, grouped_rel, expr))
            {
                /* Yes, so add to tlist as-is; OK to suppress duplicates */
                tlist = add_to_flat_tlist(tlist, list_make1(expr));
@@ -740,7 +740,7 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, GroupPathExtraDa
                 * If any aggregate expression is not shippable, then we
                 * cannot push down aggregation to the data node.
                 */
-               if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
+               if (!ts_is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
                    return false;

                /*
@@ -801,7 +801,7 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, GroupPathExtraDa
                                          grouped_rel->relids,
                                          NULL,
                                          NULL);
-       if (is_foreign_expr(root, grouped_rel, expr))
+       if (ts_is_foreign_expr(root, grouped_rel, expr))
            fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
        else
            fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
@@ -837,7 +837,7 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, GroupPathExtraDa
         */
        if (IsA(expr, Aggref))
        {
-           if (!is_foreign_expr(root, grouped_rel, expr))
+           if (!ts_is_foreign_expr(root, grouped_rel, expr))
                return false;

            tlist = add_to_flat_tlist(tlist, list_make1(expr));

tsl/test/expected/dist_param.out Normal file (483 lines)

@@ -0,0 +1,483 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
-- Test parameterized data node scan.
\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER;
\set DN_DBNAME_1 :TEST_DBNAME _1
-- pg_regress doesn't drop these databases between repeated invocations, such
-- as in the flaky check.
set client_min_messages to ERROR;
drop database if exists :"DN_DBNAME_1";
select 1 from add_data_node('data_node_1', host => 'localhost',
database => :'DN_DBNAME_1');
?column?
----------
1
(1 row)
grant usage on foreign server data_node_1 to public;
set role :ROLE_1;
reset client_min_messages;
-- helper function: float -> pseudorandom float [0..1].
create or replace function mix(x float4) returns float4 as $$ select ((hashfloat4(x) / (pow(2., 31) - 1) + 1) / 2)::float4 $$ language sql;
-- distributed hypertable
create table metric_dist(ts timestamptz, id int, value float);
select create_distributed_hypertable('metric_dist', 'ts', 'id');
WARNING: only one data node was assigned to the hypertable
NOTICE: adding not-null constraint to column "ts"
create_distributed_hypertable
-------------------------------
(1,public,metric_dist,t)
(1 row)
insert into metric_dist
select '2022-02-02 02:02:02+03'::timestamptz + interval '1 year' * mix(x),
mix(x + 1.) * 20,
mix(x + 2.) * 50
from generate_series(1, 1000000) x(x)
;
analyze metric_dist;
select count(*) from show_chunks('metric_dist');
count
-------
53
(1 row)
-- dictionary
create table metric_name(id int primary key, name text);
insert into metric_name values (1, 'cpu1'), (3, 'cpu3'), (7, 'cpu7');
analyze metric_name;
-- for predictable plans
set enable_hashjoin to off;
set enable_mergejoin to off;
set enable_hashagg to off;
-- Subquery + IN
select id, max(value), count(*)
from metric_dist
where id in (select id from metric_name where name like 'cpu%')
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by id
order by id
;
id | max | count
----+------------------+-------
1 | 49.9941974878311 | 139
3 | 49.3596792221069 | 138
7 | 49.795538187027 | 146
(3 rows)
explain (costs off, verbose)
select id, max(value), count(*)
from metric_dist
where id in (select id from metric_name where name like 'cpu%')
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by id
order by id
;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
GroupAggregate
Output: metric_dist.id, max(metric_dist.value), count(*)
Group Key: metric_dist.id
-> Nested Loop
Output: metric_dist.id, metric_dist.value
-> Index Scan using metric_name_pkey on public.metric_name
Output: metric_name.id, metric_name.name
Filter: (metric_name.name ~~ 'cpu%'::text)
-> Custom Scan (DataNodeScan) on public.metric_dist
Output: metric_dist.id, metric_dist.value
Data node: data_node_1
Chunks: _dist_hyper_1_52_chunk
Remote SQL: SELECT id, value FROM public.metric_dist WHERE _timescaledb_internal.chunks_in(public.metric_dist.*, ARRAY[52]) AND ((ts >= '2022-02-01 15:02:02-08'::timestamp with time zone)) AND ((ts <= '2022-02-02 15:02:02-08'::timestamp with time zone)) AND (($1::integer = id))
(13 rows)
-- Shippable EC join
select name, max(value), count(*)
from metric_dist join metric_name using (id)
where name like 'cpu%'
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by name
order by name
;
name | max | count
------+------------------+-------
cpu1 | 49.9941974878311 | 139
cpu3 | 49.3596792221069 | 138
cpu7 | 49.795538187027 | 146
(3 rows)
explain (costs off, verbose)
select name, max(value), count(*)
from metric_dist join metric_name using (id)
where name like 'cpu%'
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by name
order by name
;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
GroupAggregate
Output: metric_name.name, max(metric_dist.value), count(*)
Group Key: metric_name.name
-> Sort
Output: metric_name.name, metric_dist.value
Sort Key: metric_name.name
-> Nested Loop
Output: metric_name.name, metric_dist.value
-> Seq Scan on public.metric_name
Output: metric_name.id, metric_name.name
Filter: (metric_name.name ~~ 'cpu%'::text)
-> Custom Scan (DataNodeScan) on public.metric_dist
Output: metric_dist.value, metric_dist.id
Data node: data_node_1
Chunks: _dist_hyper_1_52_chunk
Remote SQL: SELECT id, value FROM public.metric_dist WHERE _timescaledb_internal.chunks_in(public.metric_dist.*, ARRAY[52]) AND ((ts >= '2022-02-01 15:02:02-08'::timestamp with time zone)) AND ((ts <= '2022-02-02 15:02:02-08'::timestamp with time zone)) AND (($1::integer = id))
(16 rows)
-- Non-shippable EC join
explain (costs off, verbose)
select name, max(value), count(*)
from metric_dist join metric_name on name = concat('cpu', metric_dist.id)
where ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by name
order by name
;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
GroupAggregate
Output: metric_name.name, max(metric_dist.value), count(*)
Group Key: metric_name.name
-> Sort
Output: metric_name.name, metric_dist.value
Sort Key: metric_name.name
-> Nested Loop
Output: metric_name.name, metric_dist.value
Join Filter: (concat('cpu', metric_dist.id) = metric_name.name)
-> Custom Scan (DataNodeScan) on public.metric_dist
Output: metric_dist.value, metric_dist.id
Data node: data_node_1
Chunks: _dist_hyper_1_52_chunk
Remote SQL: SELECT id, value FROM public.metric_dist WHERE _timescaledb_internal.chunks_in(public.metric_dist.*, ARRAY[52]) AND ((ts >= '2022-02-01 15:02:02-08'::timestamp with time zone)) AND ((ts <= '2022-02-02 15:02:02-08'::timestamp with time zone))
-> Materialize
Output: metric_name.name
-> Seq Scan on public.metric_name
Output: metric_name.name
(18 rows)
-- Shippable non-EC join. The weird condition is to only use immutable functions
-- that can be shipped to the remote node. `id::text` does CoerceViaIO which is
-- not generally shippable. And `int4out` returns cstring, not text, that's why
-- the `textin` is needed.
select name, max(value), count(*)
from metric_dist join metric_name
on texteq('cpu' || textin(int4out(metric_dist.id)), name)
where ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by name
order by name
;
name | max | count
------+------------------+-------
cpu1 | 49.9941974878311 | 139
cpu3 | 49.3596792221069 | 138
cpu7 | 49.795538187027 | 146
(3 rows)
explain (costs off, verbose)
select name, max(value), count(*)
from metric_dist join metric_name
on texteq('cpu' || textin(int4out(metric_dist.id)), name)
where ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by name
order by name
;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
GroupAggregate
Output: metric_name.name, max(metric_dist.value), count(*)
Group Key: metric_name.name
-> Sort
Output: metric_name.name, metric_dist.value
Sort Key: metric_name.name
-> Nested Loop
Output: metric_name.name, metric_dist.value
-> Seq Scan on public.metric_name
Output: metric_name.id, metric_name.name
-> Custom Scan (DataNodeScan) on public.metric_dist
Output: metric_dist.value, metric_dist.id
Data node: data_node_1
Chunks: _dist_hyper_1_52_chunk
Remote SQL: SELECT id, value FROM public.metric_dist WHERE _timescaledb_internal.chunks_in(public.metric_dist.*, ARRAY[52]) AND ((ts >= '2022-02-01 15:02:02-08'::timestamp with time zone)) AND ((ts <= '2022-02-02 15:02:02-08'::timestamp with time zone)) AND (texteq(('cpu'::text || textin(int4out(id))), $1::text))
(15 rows)
-- Non-shippable non-EC join.
explain (costs off, verbose)
select name, max(value), count(*)
from metric_dist join metric_name
on texteq(concat('cpu', textin(int4out(metric_dist.id))), name)
where ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by name
order by name
;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
GroupAggregate
Output: metric_name.name, max(metric_dist.value), count(*)
Group Key: metric_name.name
-> Sort
Output: metric_name.name, metric_dist.value
Sort Key: metric_name.name
-> Nested Loop
Output: metric_name.name, metric_dist.value
Join Filter: texteq(concat('cpu', textin(int4out(metric_dist.id))), metric_name.name)
-> Custom Scan (DataNodeScan) on public.metric_dist
Output: metric_dist.value, metric_dist.id
Data node: data_node_1
Chunks: _dist_hyper_1_52_chunk
Remote SQL: SELECT id, value FROM public.metric_dist WHERE _timescaledb_internal.chunks_in(public.metric_dist.*, ARRAY[52]) AND ((ts >= '2022-02-01 15:02:02-08'::timestamp with time zone)) AND ((ts <= '2022-02-02 15:02:02-08'::timestamp with time zone))
-> Materialize
Output: metric_name.name
-> Seq Scan on public.metric_name
Output: metric_name.name
(18 rows)
-- distinct on, order by, limit 1, with subquery
select distinct on (id)
id, ts, value
from metric_dist
where id in (select id from metric_name where name like 'cpu%')
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
order by id, ts, value
limit 1
;
id | ts | value
----+----------------------------------+------------------
1 | Tue Feb 01 15:03:56.048 2022 PST | 36.1639380455017
(1 row)
explain (costs off, verbose)
select distinct on (id)
id, ts, value
from metric_dist
where id in (select id from metric_name where name like 'cpu%')
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
order by id, ts, value
limit 1
;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit
Output: metric_dist.id, metric_dist.ts, metric_dist.value
-> Unique
Output: metric_dist.id, metric_dist.ts, metric_dist.value
-> Nested Loop
Output: metric_dist.id, metric_dist.ts, metric_dist.value
Inner Unique: true
Join Filter: (metric_dist.id = metric_name.id)
-> Custom Scan (DataNodeScan) on public.metric_dist
Output: metric_dist.id, metric_dist.ts, metric_dist.value
Data node: data_node_1
Chunks: _dist_hyper_1_52_chunk
Remote SQL: SELECT DISTINCT ON (id) ts, id, value FROM public.metric_dist WHERE _timescaledb_internal.chunks_in(public.metric_dist.*, ARRAY[52]) AND ((ts >= '2022-02-01 15:02:02-08'::timestamp with time zone)) AND ((ts <= '2022-02-02 15:02:02-08'::timestamp with time zone)) ORDER BY id ASC NULLS LAST, ts ASC NULLS LAST, value ASC NULLS LAST
-> Materialize
Output: metric_name.id
-> Seq Scan on public.metric_name
Output: metric_name.id
Filter: (metric_name.name ~~ 'cpu%'::text)
(18 rows)
-- distinct on, order by, limit 1, with explicit join
select distinct on (name)
name, ts, value
from metric_dist join metric_name using (id)
where name like 'cpu%'
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
order by name, ts, value
limit 1
;
name | ts | value
------+----------------------------------+------------------
cpu1 | Tue Feb 01 15:03:56.048 2022 PST | 36.1639380455017
(1 row)
explain (costs off, verbose)
select distinct on (name)
name, ts, value
from metric_dist join metric_name using (id)
where name like 'cpu%'
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
order by name, ts, value
limit 1
;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit
Output: metric_name.name, metric_dist.ts, metric_dist.value
-> Unique
Output: metric_name.name, metric_dist.ts, metric_dist.value
-> Sort
Output: metric_name.name, metric_dist.ts, metric_dist.value
Sort Key: metric_name.name, metric_dist.ts, metric_dist.value
-> Nested Loop
Output: metric_name.name, metric_dist.ts, metric_dist.value
-> Seq Scan on public.metric_name
Output: metric_name.id, metric_name.name
Filter: (metric_name.name ~~ 'cpu%'::text)
-> Custom Scan (DataNodeScan) on public.metric_dist
Output: metric_dist.ts, metric_dist.value, metric_dist.id
Data node: data_node_1
Chunks: _dist_hyper_1_52_chunk
Remote SQL: SELECT ts, id, value FROM public.metric_dist WHERE _timescaledb_internal.chunks_in(public.metric_dist.*, ARRAY[52]) AND ((ts >= '2022-02-01 15:02:02-08'::timestamp with time zone)) AND ((ts <= '2022-02-02 15:02:02-08'::timestamp with time zone)) AND (($1::integer = id))
(17 rows)
-- If the local table is very big, the parameterized nested loop might download
-- the entire dist table, or even more than that (in the case of a non-equi-join).
-- Check that the parameterized plan is not chosen in this case.
create table metric_name_big as select * from metric_name;
insert into metric_name_big select x, 'other' || x
from generate_series(1000, 10000) x
;
analyze metric_name_big;
explain (costs off, verbose)
select name, max(value), count(*)
from metric_dist
join metric_name_big using (id)
where ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by name
order by name
;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
GroupAggregate
Output: metric_name_big.name, max(metric_dist.value), count(*)
Group Key: metric_name_big.name
-> Sort
Output: metric_name_big.name, metric_dist.value
Sort Key: metric_name_big.name
-> Nested Loop
Output: metric_name_big.name, metric_dist.value
Join Filter: (metric_dist.id = metric_name_big.id)
-> Seq Scan on public.metric_name_big
Output: metric_name_big.id, metric_name_big.name
-> Materialize
Output: metric_dist.value, metric_dist.id
-> Custom Scan (DataNodeScan) on public.metric_dist
Output: metric_dist.value, metric_dist.id
Data node: data_node_1
Chunks: _dist_hyper_1_52_chunk
Remote SQL: SELECT id, value FROM public.metric_dist WHERE _timescaledb_internal.chunks_in(public.metric_dist.*, ARRAY[52]) AND ((ts >= '2022-02-01 15:02:02-08'::timestamp with time zone)) AND ((ts <= '2022-02-02 15:02:02-08'::timestamp with time zone))
(18 rows)
-- An interesting special case is when the remote SQL has a parameter, but it is
-- the result of an initplan. It's not "parameterized" in the join sense, because
-- there is only one param value. This is the most efficient plan for querying a
-- small number of ids.
explain (costs off, verbose)
select id, max(value)
from metric_dist
where id = any((select array_agg(id) from metric_name where name like 'cpu%')::int[])
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by id
order by id
;
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
GroupAggregate
Output: metric_dist.id, max(metric_dist.value)
Group Key: metric_dist.id
InitPlan 1 (returns $0)
-> Aggregate
Output: array_agg(metric_name.id)
-> Seq Scan on public.metric_name
Output: metric_name.id, metric_name.name
Filter: (metric_name.name ~~ 'cpu%'::text)
-> Custom Scan (DataNodeScan) on public.metric_dist
Output: metric_dist.id, metric_dist.value
Data node: data_node_1
Chunks: _dist_hyper_1_52_chunk
Remote SQL: SELECT id, value FROM public.metric_dist WHERE _timescaledb_internal.chunks_in(public.metric_dist.*, ARRAY[52]) AND ((ts >= '2022-02-01 15:02:02-08'::timestamp with time zone)) AND ((ts <= '2022-02-02 15:02:02-08'::timestamp with time zone)) AND ((id = ANY ($1::integer[]))) ORDER BY id ASC NULLS LAST
(14 rows)
-- Multiple joins. Test both EC and non-EC (texteq) join in one query.
create table metric_location(id int, location text);
insert into metric_location values (1, 'Yerevan'), (3, 'Dilijan'), (7, 'Stepanakert');
analyze metric_location;
select id, max(value)
from metric_dist natural join metric_location natural join metric_name
where name like 'cpu%' and texteq(location, 'Yerevan')
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by id
;
id | max
----+------------------
1 | 49.9941974878311
(1 row)
explain (costs off, verbose)
select id, max(value)
from metric_dist natural join metric_location natural join metric_name
where name like 'cpu%' and texteq(location, 'Yerevan')
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by id
;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
GroupAggregate
Output: metric_dist.id, max(metric_dist.value)
Group Key: metric_dist.id
-> Sort
Output: metric_dist.id, metric_dist.value
Sort Key: metric_dist.id
-> Nested Loop
Output: metric_dist.id, metric_dist.value
-> Nested Loop
Output: metric_location.id, metric_name.id
Inner Unique: true
Join Filter: (metric_location.id = metric_name.id)
-> Seq Scan on public.metric_location
Output: metric_location.id, metric_location.location
Filter: texteq(metric_location.location, 'Yerevan'::text)
-> Seq Scan on public.metric_name
Output: metric_name.id, metric_name.name
Filter: (metric_name.name ~~ 'cpu%'::text)
-> Custom Scan (DataNodeScan) on public.metric_dist
Output: metric_dist.id, metric_dist.value
Data node: data_node_1
Chunks: _dist_hyper_1_52_chunk
Remote SQL: SELECT id, value FROM public.metric_dist WHERE _timescaledb_internal.chunks_in(public.metric_dist.*, ARRAY[52]) AND ((ts >= '2022-02-01 15:02:02-08'::timestamp with time zone)) AND ((ts <= '2022-02-02 15:02:02-08'::timestamp with time zone)) AND (($1::integer = id))
(23 rows)
-- Multiple joins on different variables. Use a table instead of a CTE for saner
-- stats.
create table max_value_times as
select distinct on (id) id, ts from metric_dist
where ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
order by id, value desc
;
analyze max_value_times;
explain (costs off, verbose)
select id, value
from metric_dist natural join max_value_times natural join metric_name
where name like 'cpu%'
order by 1
;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Nested Loop
Output: metric_dist.id, metric_dist.value
-> Nested Loop
Output: max_value_times.ts, max_value_times.id, metric_name.id
Join Filter: (max_value_times.id = metric_name.id)
-> Index Scan using metric_name_pkey on public.metric_name
Output: metric_name.id, metric_name.name
Filter: (metric_name.name ~~ 'cpu%'::text)
-> Materialize
Output: max_value_times.ts, max_value_times.id
-> Seq Scan on public.max_value_times
Output: max_value_times.ts, max_value_times.id
-> Custom Scan (DataNodeScan) on public.metric_dist
Output: metric_dist.id, metric_dist.value, metric_dist.ts
Data node: data_node_1
Chunks: _dist_hyper_1_1_chunk, _dist_hyper_1_2_chunk, _dist_hyper_1_3_chunk, _dist_hyper_1_4_chunk, _dist_hyper_1_5_chunk, _dist_hyper_1_6_chunk, _dist_hyper_1_7_chunk, _dist_hyper_1_8_chunk, _dist_hyper_1_9_chunk, _dist_hyper_1_10_chunk, _dist_hyper_1_11_chunk, _dist_hyper_1_12_chunk, _dist_hyper_1_13_chunk, _dist_hyper_1_14_chunk, _dist_hyper_1_15_chunk, _dist_hyper_1_16_chunk, _dist_hyper_1_17_chunk, _dist_hyper_1_18_chunk, _dist_hyper_1_19_chunk, _dist_hyper_1_20_chunk, _dist_hyper_1_21_chunk, _dist_hyper_1_22_chunk, _dist_hyper_1_23_chunk, _dist_hyper_1_24_chunk, _dist_hyper_1_25_chunk, _dist_hyper_1_26_chunk, _dist_hyper_1_27_chunk, _dist_hyper_1_28_chunk, _dist_hyper_1_29_chunk, _dist_hyper_1_30_chunk, _dist_hyper_1_31_chunk, _dist_hyper_1_32_chunk, _dist_hyper_1_33_chunk, _dist_hyper_1_34_chunk, _dist_hyper_1_35_chunk, _dist_hyper_1_36_chunk, _dist_hyper_1_37_chunk, _dist_hyper_1_38_chunk, _dist_hyper_1_39_chunk, _dist_hyper_1_40_chunk, _dist_hyper_1_41_chunk, _dist_hyper_1_42_chunk, _dist_hyper_1_43_chunk, _dist_hyper_1_44_chunk, _dist_hyper_1_45_chunk, _dist_hyper_1_46_chunk, _dist_hyper_1_47_chunk, _dist_hyper_1_48_chunk, _dist_hyper_1_49_chunk, _dist_hyper_1_50_chunk, _dist_hyper_1_51_chunk, _dist_hyper_1_52_chunk, _dist_hyper_1_53_chunk
Remote SQL: SELECT ts, id, value FROM public.metric_dist WHERE _timescaledb_internal.chunks_in(public.metric_dist.*, ARRAY[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53]) AND (($1::timestamp with time zone = ts)) AND (($2::integer = id))
(17 rows)

tsl/test/sql/CMakeLists.txt

@@ -16,6 +16,7 @@ set(TEST_FILES
    compression_bgw.sql
    compression_permissions.sql
    compression_qualpushdown.sql
    dist_param.sql
    dist_views.sql
    exp_cagg_monthly.sql
    exp_cagg_next_gen.sql

tsl/test/sql/dist_param.sql Normal file (234 lines)

@@ -0,0 +1,234 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
-- Test parameterized data node scan.
\c :TEST_DBNAME :ROLE_CLUSTER_SUPERUSER;
\set DN_DBNAME_1 :TEST_DBNAME _1
-- pg_regress doesn't drop these databases between repeated invocations, such
-- as in the flaky check.
set client_min_messages to ERROR;
drop database if exists :"DN_DBNAME_1";
select 1 from add_data_node('data_node_1', host => 'localhost',
database => :'DN_DBNAME_1');
grant usage on foreign server data_node_1 to public;
set role :ROLE_1;
reset client_min_messages;
-- helper function: float -> pseudorandom float [0..1].
create or replace function mix(x float4) returns float4 as $$ select ((hashfloat4(x) / (pow(2., 31) - 1) + 1) / 2)::float4 $$ language sql;
-- distributed hypertable
create table metric_dist(ts timestamptz, id int, value float);
select create_distributed_hypertable('metric_dist', 'ts', 'id');
insert into metric_dist
select '2022-02-02 02:02:02+03'::timestamptz + interval '1 year' * mix(x),
mix(x + 1.) * 20,
mix(x + 2.) * 50
from generate_series(1, 1000000) x(x)
;
analyze metric_dist;
select count(*) from show_chunks('metric_dist');
-- dictionary
create table metric_name(id int primary key, name text);
insert into metric_name values (1, 'cpu1'), (3, 'cpu3'), (7, 'cpu7');
analyze metric_name;
-- for predictable plans
set enable_hashjoin to off;
set enable_mergejoin to off;
set enable_hashagg to off;
-- Subquery + IN
select id, max(value), count(*)
from metric_dist
where id in (select id from metric_name where name like 'cpu%')
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by id
order by id
;
explain (costs off, verbose)
select id, max(value), count(*)
from metric_dist
where id in (select id from metric_name where name like 'cpu%')
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by id
order by id
;
-- Shippable EC join
select name, max(value), count(*)
from metric_dist join metric_name using (id)
where name like 'cpu%'
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by name
order by name
;
explain (costs off, verbose)
select name, max(value), count(*)
from metric_dist join metric_name using (id)
where name like 'cpu%'
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by name
order by name
;
-- Non-shippable EC join
explain (costs off, verbose)
select name, max(value), count(*)
from metric_dist join metric_name on name = concat('cpu', metric_dist.id)
where ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by name
order by name
;
-- Shippable non-EC join. The weird condition is to only use immutable functions
-- that can be shipped to the remote node. `id::text` does CoerceViaIO which is
-- not generally shippable. And `int4out` returns cstring, not text, that's why
-- the `textin` is needed.
select name, max(value), count(*)
from metric_dist join metric_name
on texteq('cpu' || textin(int4out(metric_dist.id)), name)
where ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by name
order by name
;
explain (costs off, verbose)
select name, max(value), count(*)
from metric_dist join metric_name
on texteq('cpu' || textin(int4out(metric_dist.id)), name)
where ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by name
order by name
;
-- Non-shippable non-EC join.
explain (costs off, verbose)
select name, max(value), count(*)
from metric_dist join metric_name
on texteq(concat('cpu', textin(int4out(metric_dist.id))), name)
where ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by name
order by name
;
-- distinct on, order by, limit 1, with subquery
select distinct on (id)
id, ts, value
from metric_dist
where id in (select id from metric_name where name like 'cpu%')
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
order by id, ts, value
limit 1
;
explain (costs off, verbose)
select distinct on (id)
id, ts, value
from metric_dist
where id in (select id from metric_name where name like 'cpu%')
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
order by id, ts, value
limit 1
;
-- distinct on, order by, limit 1, with explicit join
select distinct on (name)
name, ts, value
from metric_dist join metric_name using (id)
where name like 'cpu%'
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
order by name, ts, value
limit 1
;
explain (costs off, verbose)
select distinct on (name)
name, ts, value
from metric_dist join metric_name using (id)
where name like 'cpu%'
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
order by name, ts, value
limit 1
;
-- If the local table is very big, the parameterized nested loop might download
-- the entire dist table, or even more than that (in the case of a non-equi-join).
-- Check that the parameterized plan is not chosen in this case.
create table metric_name_big as select * from metric_name;
insert into metric_name_big select x, 'other' || x
from generate_series(1000, 10000) x
;
analyze metric_name_big;
explain (costs off, verbose)
select name, max(value), count(*)
from metric_dist
join metric_name_big using (id)
where ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by name
order by name
;
-- An interesting special case is when the remote SQL has a parameter, but it is
-- the result of an initplan. It's not "parameterized" in the join sense, because
-- there is only one param value. This is the most efficient plan for querying a
-- small number of ids.
explain (costs off, verbose)
select id, max(value)
from metric_dist
where id = any((select array_agg(id) from metric_name where name like 'cpu%')::int[])
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by id
order by id
;
-- Multiple joins. Test both EC and non-EC (texteq) join in one query.
create table metric_location(id int, location text);
insert into metric_location values (1, 'Yerevan'), (3, 'Dilijan'), (7, 'Stepanakert');
analyze metric_location;
select id, max(value)
from metric_dist natural join metric_location natural join metric_name
where name like 'cpu%' and texteq(location, 'Yerevan')
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by id
;
explain (costs off, verbose)
select id, max(value)
from metric_dist natural join metric_location natural join metric_name
where name like 'cpu%' and texteq(location, 'Yerevan')
and ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
group by id
;
-- Multiple joins on different variables. Use a table instead of a CTE for saner
-- stats.
create table max_value_times as
select distinct on (id) id, ts from metric_dist
where ts between '2022-02-02 02:02:02+03' and '2022-02-03 02:02:02+03'
order by id, value desc
;
analyze max_value_times;
explain (costs off, verbose)
select id, value
from metric_dist natural join max_value_times natural join metric_name
where name like 'cpu%'
order by 1
;