mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-18 19:59:48 +08:00
Allow pushdown of reference table joins
This patch adds the functionality that is needed to perform distributed, parallel joins on reference tables on access nodes. This code allows the pushdown of a join if: * (1) The setting "ts_guc_enable_per_data_node_queries" is enabled * (2) The outer relation is a distributed hypertable * (3) The inner relation is marked as a reference table * (4) The join is a left join or an inner join
This commit is contained in:
parent
f12a361ef7
commit
e0be9eaa28
@ -42,6 +42,7 @@ Sooner to that time, we will announce the specific version of TimescaleDB in whi
|
||||
* #5262 Extend enabling compression on a continuous aggregrate with 'compress_segmentby' and 'compress_orderby' parameters
|
||||
* #5343 Set PortalContext when starting job
|
||||
* #5312 Add timeout support to the ping_data_node()
|
||||
* #5212 Allow pushdown of reference table joins
|
||||
|
||||
**Bugfixes**
|
||||
* #5214 Fix use of prepared statement in async module
|
||||
|
@ -402,6 +402,13 @@ ts_tsl_loaded(PG_FUNCTION_ARGS)
|
||||
PG_RETURN_BOOL(ts_cm_functions != &ts_cm_functions_default);
|
||||
}
|
||||
|
||||
static void
|
||||
mn_get_foreign_join_path_default_fn_pg_community(PlannerInfo *root, RelOptInfo *joinrel,
|
||||
RelOptInfo *outerrel, RelOptInfo *innerrel,
|
||||
JoinType jointype, JoinPathExtraData *extra)
|
||||
{
|
||||
}
|
||||
|
||||
/*
|
||||
* Define cross-module functions' default values:
|
||||
* If the submodule isn't activated, using one of the cm functions will throw an
|
||||
@ -555,6 +562,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
|
||||
.hypertable_distributed_set_replication_factor = error_no_default_fn_pg_community,
|
||||
.update_compressed_chunk_relstats = update_compressed_chunk_relstats_default,
|
||||
.health_check = error_no_default_fn_pg_community,
|
||||
.mn_get_foreign_join_paths = mn_get_foreign_join_path_default_fn_pg_community,
|
||||
};
|
||||
|
||||
TSDLLEXPORT CrossModuleFunctions *ts_cm_functions = &ts_cm_functions_default;
|
||||
|
@ -203,6 +203,9 @@ typedef struct CrossModuleFunctions
|
||||
PGFunction chunks_drop_stale;
|
||||
void (*update_compressed_chunk_relstats)(Oid uncompressed_relid, Oid compressed_relid);
|
||||
PGFunction health_check;
|
||||
void (*mn_get_foreign_join_paths)(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
|
||||
RelOptInfo *innerrel, JoinType jointype,
|
||||
JoinPathExtraData *extra);
|
||||
} CrossModuleFunctions;
|
||||
|
||||
extern TSDLLEXPORT CrossModuleFunctions *ts_cm_functions;
|
||||
|
@ -110,6 +110,7 @@ void _planner_fini(void);
|
||||
|
||||
static planner_hook_type prev_planner_hook;
|
||||
static set_rel_pathlist_hook_type prev_set_rel_pathlist_hook;
|
||||
static set_join_pathlist_hook_type prev_set_join_pathlist_hook;
|
||||
static get_relation_info_hook_type prev_get_relation_info_hook;
|
||||
static create_upper_paths_hook_type prev_create_upper_paths_hook;
|
||||
static void cagg_reorder_groupby_clause(RangeTblEntry *subq_rte, Index rtno, List *outer_sortcl,
|
||||
@ -1662,13 +1663,36 @@ cagg_reorder_groupby_clause(RangeTblEntry *subq_rte, Index rtno, List *outer_sor
|
||||
}
|
||||
}
|
||||
|
||||
/* Register our join pushdown hook. Becuase for PostgreSQL the tables we are operating
|
||||
* on are local tables. So, the FDW hooks are not called. Register our join path
|
||||
* generation as a generic planer hook.
|
||||
*/
|
||||
|
||||
static void
|
||||
timescaledb_set_join_pathlist_hook(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
|
||||
RelOptInfo *innerrel, JoinType jointype,
|
||||
JoinPathExtraData *extra)
|
||||
{
|
||||
/* Left table has to be a distributed hypertable */
|
||||
TimescaleDBPrivate *outerrel_private = outerrel->fdw_private;
|
||||
if (outerrel_private != NULL && outerrel_private->fdw_relation_info != NULL)
|
||||
ts_cm_functions
|
||||
->mn_get_foreign_join_paths(root, joinrel, outerrel, innerrel, jointype, extra);
|
||||
|
||||
/* Call next hook in chain */
|
||||
if (prev_set_join_pathlist_hook != NULL)
|
||||
(*prev_set_join_pathlist_hook)(root, joinrel, outerrel, innerrel, jointype, extra);
|
||||
}
|
||||
|
||||
void
|
||||
_planner_init(void)
|
||||
{
|
||||
prev_planner_hook = planner_hook;
|
||||
planner_hook = timescaledb_planner;
|
||||
prev_set_rel_pathlist_hook = set_rel_pathlist_hook;
|
||||
prev_set_join_pathlist_hook = set_join_pathlist_hook;
|
||||
set_rel_pathlist_hook = timescaledb_set_rel_pathlist;
|
||||
set_join_pathlist_hook = timescaledb_set_join_pathlist_hook;
|
||||
|
||||
prev_get_relation_info_hook = get_relation_info_hook;
|
||||
get_relation_info_hook = timescaledb_get_relation_info_hook;
|
||||
@ -1682,6 +1706,7 @@ _planner_fini(void)
|
||||
{
|
||||
planner_hook = prev_planner_hook;
|
||||
set_rel_pathlist_hook = prev_set_rel_pathlist_hook;
|
||||
set_join_pathlist_hook = prev_set_join_pathlist_hook;
|
||||
get_relation_info_hook = prev_get_relation_info_hook;
|
||||
create_upper_paths_hook = prev_create_upper_paths_hook;
|
||||
}
|
||||
|
@ -43,6 +43,10 @@
|
||||
#include "fdw_utils.h"
|
||||
#include "relinfo.h"
|
||||
#include "scan_plan.h"
|
||||
#include "estimate.h"
|
||||
#include "planner/planner.h"
|
||||
#include "chunk.h"
|
||||
#include "debug_assert.h"
|
||||
|
||||
/*
|
||||
* DataNodeScan is a custom scan implementation for scanning hypertables on
|
||||
@ -70,11 +74,21 @@ static Path *data_node_scan_path_create(PlannerInfo *root, RelOptInfo *rel, Path
|
||||
double rows, Cost startup_cost, Cost total_cost,
|
||||
List *pathkeys, Relids required_outer, Path *fdw_outerpath,
|
||||
List *private);
|
||||
|
||||
static Path *data_node_join_path_create(PlannerInfo *root, RelOptInfo *rel, PathTarget *target,
|
||||
double rows, Cost startup_cost, Cost total_cost,
|
||||
List *pathkeys, Relids required_outer, Path *fdw_outerpath,
|
||||
List *private);
|
||||
|
||||
static Path *data_node_scan_upper_path_create(PlannerInfo *root, RelOptInfo *rel,
|
||||
PathTarget *target, double rows, Cost startup_cost,
|
||||
Cost total_cost, List *pathkeys, Path *fdw_outerpath,
|
||||
List *private);
|
||||
|
||||
static bool fdw_pushdown_foreign_join(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype,
|
||||
RelOptInfo *outerrel, RelOptInfo *innerrel,
|
||||
JoinPathExtraData *extra);
|
||||
|
||||
static AppendRelInfo *
|
||||
create_append_rel_info(PlannerInfo *root, Index childrelid, Index parentrelid)
|
||||
{
|
||||
@ -741,6 +755,658 @@ push_down_group_bys(PlannerInfo *root, RelOptInfo *hyper_rel, Hyperspace *hs,
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Check if the query performs a join between a hypertable (outer) and a reference
|
||||
* table (inner) and the join type is a LEFT JOIN, an INNER JOIN, or an implicit
|
||||
* join.
|
||||
*/
|
||||
static bool
|
||||
is_safe_to_pushdown_reftable_join(PlannerInfo *root, List *join_reference_tables,
|
||||
RangeTblEntry *innertableref, JoinType jointype)
|
||||
{
|
||||
Assert(root != NULL);
|
||||
Assert(innertableref != NULL);
|
||||
|
||||
/*
|
||||
* We support pushing down of INNER and LEFT joins only.
|
||||
*
|
||||
* Constructing queries representing partitioned FULL, SEMI, and ANTI
|
||||
* joins is hard, hence not considered right now.
|
||||
*/
|
||||
if (jointype != JOIN_INNER && jointype != JOIN_LEFT)
|
||||
return false;
|
||||
|
||||
/* Check that at least one reference table is defined. */
|
||||
if (join_reference_tables == NIL)
|
||||
return false;
|
||||
|
||||
/* Only queries with two tables are supported. */
|
||||
if (bms_num_members(root->all_baserels) != 2)
|
||||
return false;
|
||||
|
||||
/* Right table has to be a distributed hypertable */
|
||||
if (!list_member_oid(join_reference_tables, innertableref->relid))
|
||||
return false;
|
||||
|
||||
/* Join can be pushed down */
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Assess whether the join between inner and outer relations can be pushed down
|
||||
* to the foreign server. As a side effect, save information we obtain in this
|
||||
* function to TsFdwRelInfo passed in.
|
||||
*
|
||||
* The code is based on PostgreSQL's foreign_join_ok function (version 15.1).
|
||||
*/
|
||||
static bool
|
||||
fdw_pushdown_foreign_join(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype,
|
||||
RelOptInfo *outerrel, RelOptInfo *innerrel, JoinPathExtraData *extra)
|
||||
{
|
||||
TsFdwRelInfo *fpinfo;
|
||||
TsFdwRelInfo *fpinfo_o;
|
||||
TsFdwRelInfo *fpinfo_i;
|
||||
ListCell *lc;
|
||||
List *joinclauses;
|
||||
|
||||
/*
|
||||
* If either of the joining relations is marked as unsafe to pushdown, the
|
||||
* join can not be pushed down.
|
||||
*/
|
||||
fpinfo = fdw_relinfo_get(joinrel);
|
||||
fpinfo_o = fdw_relinfo_get(outerrel);
|
||||
fpinfo_i = fdw_relinfo_get(innerrel);
|
||||
|
||||
Assert(fpinfo_o != NULL);
|
||||
Assert(fpinfo_o->pushdown_safe);
|
||||
Assert(fpinfo_i != NULL);
|
||||
Assert(fpinfo_i->pushdown_safe);
|
||||
|
||||
/*
|
||||
* If joining relations have local conditions, those conditions are
|
||||
* required to be applied before joining the relations. Hence the join can
|
||||
* not be pushed down (shouldn't happen in the current implementation).
|
||||
*/
|
||||
Assert(fpinfo_o->local_conds == NULL);
|
||||
Assert(fpinfo_i->local_conds == NULL);
|
||||
|
||||
fpinfo->server = fpinfo_o->server;
|
||||
|
||||
/*
|
||||
* Separate restrict list into join quals and pushed-down (other) quals.
|
||||
*
|
||||
* Join quals belonging to an outer join must all be shippable, else we
|
||||
* cannot execute the join remotely. Add such quals to 'joinclauses'.
|
||||
*
|
||||
* Add other quals to fpinfo->remote_conds if they are shippable, else to
|
||||
* fpinfo->local_conds. In an inner join it's okay to execute conditions
|
||||
* either locally or remotely; the same is true for pushed-down conditions
|
||||
* at an outer join.
|
||||
*
|
||||
* Note we might return failure after having already scribbled on
|
||||
* fpinfo->remote_conds and fpinfo->local_conds. That's okay because we
|
||||
* won't consult those lists again if we deem the join unshippable.
|
||||
*/
|
||||
joinclauses = NIL;
|
||||
foreach (lc, extra->restrictlist)
|
||||
{
|
||||
RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
|
||||
bool is_remote_clause = ts_is_foreign_expr(root, joinrel, rinfo->clause);
|
||||
|
||||
if (IS_OUTER_JOIN(jointype) && !RINFO_IS_PUSHED_DOWN(rinfo, joinrel->relids))
|
||||
{
|
||||
if (!is_remote_clause)
|
||||
return false;
|
||||
joinclauses = lappend(joinclauses, rinfo);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (is_remote_clause)
|
||||
fpinfo->remote_conds = lappend(fpinfo->remote_conds, rinfo);
|
||||
else
|
||||
fpinfo->local_conds = lappend(fpinfo->local_conds, rinfo);
|
||||
}
|
||||
}
|
||||
|
||||
if (fpinfo->local_conds != NIL)
|
||||
return false;
|
||||
|
||||
/* Save the join clauses, for later use. */
|
||||
fpinfo->joinclauses = joinclauses;
|
||||
|
||||
/*
|
||||
* deparseExplicitTargetList() isn't smart enough to handle anything other
|
||||
* than a Var. In particular, if there's some PlaceHolderVar that would
|
||||
* need to be evaluated within this join tree (because there's an upper
|
||||
* reference to a quantity that may go to NULL as a result of an outer
|
||||
* join), then we can't try to push the join down because we'll fail when
|
||||
* we get to deparseExplicitTargetList(). However, a PlaceHolderVar that
|
||||
* needs to be evaluated *at the top* of this join tree is OK, because we
|
||||
* can do that locally after fetching the results from the remote side.
|
||||
*
|
||||
* Note: At the moment, the placeholder code is not used in our current join
|
||||
* pushdown implementation.
|
||||
*/
|
||||
#ifdef ENABLE_DEAD_CODE
|
||||
foreach (lc, root->placeholder_list)
|
||||
{
|
||||
PlaceHolderInfo *phinfo = lfirst(lc);
|
||||
Relids relids;
|
||||
|
||||
/* PlaceHolderInfo refers to parent relids, not child relids. */
|
||||
relids = IS_OTHER_REL(joinrel) ? joinrel->top_parent_relids : joinrel->relids;
|
||||
|
||||
if (bms_is_subset(phinfo->ph_eval_at, relids) &&
|
||||
bms_nonempty_difference(relids, phinfo->ph_eval_at))
|
||||
return false;
|
||||
}
|
||||
#endif
|
||||
|
||||
fpinfo->outerrel = outerrel;
|
||||
fpinfo->innerrel = innerrel;
|
||||
fpinfo->jointype = jointype;
|
||||
|
||||
/*
|
||||
* By default, both the input relations are not required to be deparsed as
|
||||
* subqueries, but there might be some relations covered by the input
|
||||
* relations that are required to be deparsed as subqueries, so save the
|
||||
* relids of those relations for later use by the deparser.
|
||||
*/
|
||||
fpinfo->make_outerrel_subquery = false;
|
||||
fpinfo->make_innerrel_subquery = false;
|
||||
Assert(bms_is_subset(fpinfo_o->lower_subquery_rels, outerrel->relids));
|
||||
Assert(bms_is_subset(fpinfo_i->lower_subquery_rels, innerrel->relids));
|
||||
fpinfo->lower_subquery_rels =
|
||||
bms_union(fpinfo_o->lower_subquery_rels, fpinfo_i->lower_subquery_rels);
|
||||
|
||||
/*
|
||||
* Pull the other remote conditions from the joining relations into join
|
||||
* clauses or other remote clauses (remote_conds) of this relation
|
||||
* wherever possible. This avoids building subqueries at every join step.
|
||||
*
|
||||
* For an inner join, clauses from both the relations are added to the
|
||||
* other remote clauses. For LEFT and RIGHT OUTER join, the clauses from
|
||||
* the outer side are added to remote_conds since those can be evaluated
|
||||
* after the join is evaluated. The clauses from inner side are added to
|
||||
* the joinclauses, since they need to be evaluated while constructing the
|
||||
* join.
|
||||
*
|
||||
* For a FULL OUTER JOIN, the other clauses from either relation can not
|
||||
* be added to the joinclauses or remote_conds, since each relation acts
|
||||
* as an outer relation for the other.
|
||||
*
|
||||
* The joining sides can not have local conditions, thus no need to test
|
||||
* shippability of the clauses being pulled up.
|
||||
*/
|
||||
switch (jointype)
|
||||
{
|
||||
case JOIN_INNER:
|
||||
#if PG14_GE
|
||||
fpinfo->remote_conds = list_concat(fpinfo->remote_conds, fpinfo_i->remote_conds);
|
||||
fpinfo->remote_conds = list_concat(fpinfo->remote_conds, fpinfo_o->remote_conds);
|
||||
#else
|
||||
fpinfo->remote_conds =
|
||||
list_concat(fpinfo->remote_conds, list_copy(fpinfo_i->remote_conds));
|
||||
fpinfo->remote_conds =
|
||||
list_concat(fpinfo->remote_conds, list_copy(fpinfo_o->remote_conds));
|
||||
#endif
|
||||
break;
|
||||
|
||||
case JOIN_LEFT:
|
||||
#if PG14_GE
|
||||
fpinfo->joinclauses = list_concat(fpinfo->joinclauses, fpinfo_i->remote_conds);
|
||||
fpinfo->remote_conds = list_concat(fpinfo->remote_conds, fpinfo_o->remote_conds);
|
||||
#else
|
||||
fpinfo->joinclauses =
|
||||
list_concat(fpinfo->joinclauses, list_copy(fpinfo_i->remote_conds));
|
||||
fpinfo->remote_conds =
|
||||
list_concat(fpinfo->remote_conds, list_copy(fpinfo_o->remote_conds));
|
||||
#endif
|
||||
break;
|
||||
|
||||
/* Right and full joins are not supported at the moment */
|
||||
#ifdef ENABLE_DEAD_CODE
|
||||
case JOIN_RIGHT:
|
||||
#if PG14_GE
|
||||
fpinfo->joinclauses = list_concat(fpinfo->joinclauses, fpinfo_o->remote_conds);
|
||||
fpinfo->remote_conds = list_concat(fpinfo->remote_conds, fpinfo_i->remote_conds);
|
||||
#else
|
||||
fpinfo->joinclauses =
|
||||
list_concat(fpinfo->joinclauses, list_copy(fpinfo_o->remote_conds));
|
||||
fpinfo->remote_conds =
|
||||
list_concat(fpinfo->remote_conds, list_copy(fpinfo_i->remote_conds));
|
||||
#endif
|
||||
break;
|
||||
|
||||
case JOIN_FULL:
|
||||
|
||||
/*
|
||||
* In this case, if any of the input relations has conditions, we
|
||||
* need to deparse that relation as a subquery so that the
|
||||
* conditions can be evaluated before the join. Remember it in
|
||||
* the fpinfo of this relation so that the deparser can take
|
||||
* appropriate action. Also, save the relids of base relations
|
||||
* covered by that relation for later use by the deparser.
|
||||
*/
|
||||
if (fpinfo_o->remote_conds)
|
||||
{
|
||||
fpinfo->make_outerrel_subquery = true;
|
||||
fpinfo->lower_subquery_rels =
|
||||
bms_add_members(fpinfo->lower_subquery_rels, outerrel->relids);
|
||||
}
|
||||
if (fpinfo_i->remote_conds)
|
||||
{
|
||||
fpinfo->make_innerrel_subquery = true;
|
||||
fpinfo->lower_subquery_rels =
|
||||
bms_add_members(fpinfo->lower_subquery_rels, innerrel->relids);
|
||||
}
|
||||
break;
|
||||
#endif
|
||||
|
||||
default:
|
||||
/* Should not happen, we have just checked this above */
|
||||
elog(ERROR, "unsupported join type %d", jointype);
|
||||
}
|
||||
|
||||
/*
|
||||
* For an inner join, all restrictions can be treated alike. Treating the
|
||||
* pushed down conditions as join conditions allows a top level full outer
|
||||
* join to be deparsed without requiring subqueries.
|
||||
*/
|
||||
if (jointype == JOIN_INNER)
|
||||
{
|
||||
Assert(!fpinfo->joinclauses);
|
||||
fpinfo->joinclauses = fpinfo->remote_conds;
|
||||
fpinfo->remote_conds = NIL;
|
||||
}
|
||||
/* Mark that this join can be pushed down safely */
|
||||
fpinfo->pushdown_safe = true;
|
||||
|
||||
/*
|
||||
* Set the string describing this join relation to be used in EXPLAIN
|
||||
* output of corresponding ForeignScan. Note that the decoration we add
|
||||
* to the base relation names mustn't include any digits, or it'll confuse
|
||||
* postgresExplainForeignScan.
|
||||
*/
|
||||
fpinfo->relation_name = makeStringInfo();
|
||||
appendStringInfo(fpinfo->relation_name,
|
||||
"(%s) %s JOIN (%s)",
|
||||
fpinfo_o->relation_name->data,
|
||||
get_jointype_name(fpinfo->jointype),
|
||||
fpinfo_i->relation_name->data);
|
||||
|
||||
/*
|
||||
* Set the relation index. This is defined as the position of this
|
||||
* joinrel in the join_rel_list list plus the length of the rtable list.
|
||||
* Note that since this joinrel is at the end of the join_rel_list list
|
||||
* when we are called, we can get the position by list_length.
|
||||
*/
|
||||
fpinfo->relation_index = list_length(root->parse->rtable) + list_length(root->join_rel_list);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Check if the given hypertable is a distributed hypertable.
|
||||
*/
|
||||
static bool
|
||||
is_distributed_hypertable(Oid hypertable_reloid)
|
||||
{
|
||||
Cache *hcache;
|
||||
|
||||
Hypertable *ht =
|
||||
ts_hypertable_cache_get_cache_and_entry(hypertable_reloid, CACHE_FLAG_MISSING_OK, &hcache);
|
||||
|
||||
/* perform check before cache is released */
|
||||
bool ht_is_distributed = (ht != NULL && hypertable_is_distributed(ht));
|
||||
ts_cache_release(hcache);
|
||||
|
||||
return ht_is_distributed;
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a new join partition RelOptInfo data structure for a partition. The data
|
||||
* structure is based on the parameter joinrel. The paramater is taken as template
|
||||
* and adjusted for the partition provided by the parameter data_node_rel.
|
||||
*/
|
||||
static RelOptInfo *
|
||||
create_data_node_joinrel(PlannerInfo *root, RelOptInfo *innerrel, RelOptInfo *joinrel,
|
||||
RelOptInfo *data_node_rel, AppendRelInfo *appinfo)
|
||||
{
|
||||
RelOptInfo *join_partition = palloc(sizeof(RelOptInfo));
|
||||
memcpy(join_partition, joinrel, sizeof(RelOptInfo));
|
||||
|
||||
/* Create a new relinfo for the join partition */
|
||||
join_partition->fdw_private = NULL;
|
||||
TsFdwRelInfo *join_part_fpinfo = fdw_relinfo_create(root,
|
||||
join_partition,
|
||||
data_node_rel->serverid,
|
||||
InvalidOid,
|
||||
TS_FDW_RELINFO_REFERENCE_JOIN_PARTITION);
|
||||
|
||||
Assert(join_part_fpinfo != NULL);
|
||||
|
||||
TsFdwRelInfo *data_node_rel_fpinfo = fdw_relinfo_get(data_node_rel);
|
||||
Assert(data_node_rel_fpinfo != NULL);
|
||||
|
||||
/* Copy chunk assignment from hypertable */
|
||||
join_part_fpinfo->sca = data_node_rel_fpinfo->sca;
|
||||
|
||||
/* Set parameters of the join partition */
|
||||
join_partition->relid = data_node_rel->relid;
|
||||
join_partition->relids = bms_copy(data_node_rel->relids);
|
||||
join_partition->relids = bms_add_members(join_partition->relids, innerrel->relids);
|
||||
join_partition->pathlist = NIL;
|
||||
join_partition->partial_pathlist = NIL;
|
||||
|
||||
/* Set the reltarget expressions of the partition based on the reltarget expressions
|
||||
* of the join and adjust them for the partition */
|
||||
join_partition->reltarget = create_empty_pathtarget();
|
||||
join_partition->reltarget->sortgrouprefs = joinrel->reltarget->sortgrouprefs;
|
||||
join_partition->reltarget->cost = joinrel->reltarget->cost;
|
||||
join_partition->reltarget->width = joinrel->reltarget->width;
|
||||
#if PG14_GE
|
||||
join_partition->reltarget->has_volatile_expr = joinrel->reltarget->has_volatile_expr;
|
||||
#endif
|
||||
join_partition->reltarget->exprs =
|
||||
castNode(List,
|
||||
adjust_appendrel_attrs(root, (Node *) joinrel->reltarget->exprs, 1, &appinfo));
|
||||
|
||||
return join_partition;
|
||||
}
|
||||
|
||||
/*
|
||||
* Create a JoinPathExtraData data structure for a partition. The new struct is based on the
|
||||
* original JoinPathExtraData of the join and the AppendRelInfo of the partition.
|
||||
*/
|
||||
static JoinPathExtraData *
|
||||
create_data_node_joinrel_extra(PlannerInfo *root, JoinPathExtraData *extra, AppendRelInfo *appinfo)
|
||||
{
|
||||
JoinPathExtraData *partition_extra = palloc(sizeof(JoinPathExtraData));
|
||||
partition_extra->inner_unique = extra->inner_unique;
|
||||
partition_extra->sjinfo = extra->sjinfo;
|
||||
partition_extra->semifactors = extra->semifactors;
|
||||
partition_extra->param_source_rels = extra->param_source_rels;
|
||||
partition_extra->mergeclause_list =
|
||||
castNode(List, adjust_appendrel_attrs(root, (Node *) extra->mergeclause_list, 1, &appinfo));
|
||||
partition_extra->restrictlist =
|
||||
castNode(List, adjust_appendrel_attrs(root, (Node *) extra->restrictlist, 1, &appinfo));
|
||||
|
||||
return partition_extra;
|
||||
}
|
||||
|
||||
/*
|
||||
* Generate the paths for a pushed down join. Each data node will be considered as a partition
|
||||
* of the join. The join can be pushed down if:
|
||||
*
|
||||
* (1) The setting "ts_guc_enable_per_data_node_queries" is enabled
|
||||
* (2) The outer relation is a distributed hypertable
|
||||
* (3) The inner relation is marked as a reference table
|
||||
* (4) The join is a left join or an inner join
|
||||
*
|
||||
* The join will be performed between the multiple DataNodeRels (see function
|
||||
* build_data_node_part_rels) and the original innerrel of the join (the reftable).
|
||||
*
|
||||
* The code is based on PostgreSQL's postgresGetForeignJoinPaths function
|
||||
* (version 15.1).
|
||||
*/
|
||||
void
|
||||
data_node_generate_pushdown_join_paths(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
|
||||
RelOptInfo *innerrel, JoinType jointype,
|
||||
JoinPathExtraData *extra)
|
||||
{
|
||||
TsFdwRelInfo *fpinfo;
|
||||
Path *joinpath;
|
||||
double rows = 0;
|
||||
int width = 0;
|
||||
Cost startup_cost = 0;
|
||||
Cost total_cost = 0;
|
||||
Path *epq_path = NULL;
|
||||
RelOptInfo **hyper_table_rels;
|
||||
RelOptInfo **join_partition_rels;
|
||||
int nhyper_table_rels;
|
||||
List *join_part_rels_list = NIL;
|
||||
#if PG15_GE
|
||||
Bitmapset *data_node_live_rels = NULL;
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Skip check if the join result has been considered already.
|
||||
*/
|
||||
if (joinrel->fdw_private)
|
||||
return;
|
||||
|
||||
#ifdef ENABLE_DEAD_CODE
|
||||
/*
|
||||
* This code does not work for joins with lateral references, since those
|
||||
* must have parameterized paths, which we don't generate yet.
|
||||
*/
|
||||
if (!bms_is_empty(joinrel->lateral_relids))
|
||||
return;
|
||||
#endif
|
||||
|
||||
/* Get the hypertable from the outer relation. */
|
||||
RangeTblEntry *rte_outer = planner_rt_fetch(outerrel->relid, root);
|
||||
|
||||
/* Test that the fetched outer relation is an actual RTE and a
|
||||
* distributed hypertable. */
|
||||
if (rte_outer == NULL || !is_distributed_hypertable(rte_outer->relid))
|
||||
return;
|
||||
|
||||
#ifdef USE_ASSERT_CHECKING
|
||||
/* The outerrel has to be distributed. This condition should be always hold
|
||||
* because otherwise we should not start the planning for distributed tables
|
||||
* (see timescaledb_set_join_pathlist_hook).
|
||||
*/
|
||||
TimescaleDBPrivate *outerrel_private = outerrel->fdw_private;
|
||||
Assert(outerrel_private != NULL);
|
||||
Assert(outerrel_private->fdw_relation_info != NULL);
|
||||
#endif
|
||||
|
||||
/* We know at this point that outerrel is a distributed hypertable.
|
||||
* So, outerrel has to be partitioned. */
|
||||
Assert(outerrel->nparts > 0);
|
||||
|
||||
/* Test if inner table has a range table. */
|
||||
RangeTblEntry *rte_inner = planner_rt_fetch(innerrel->relid, root);
|
||||
if (rte_inner == NULL)
|
||||
return;
|
||||
|
||||
/* Get current partitioning of the outerrel. */
|
||||
hyper_table_rels = outerrel->part_rels;
|
||||
nhyper_table_rels = outerrel->nparts;
|
||||
|
||||
Assert(nhyper_table_rels > 0);
|
||||
Assert(hyper_table_rels != NULL);
|
||||
|
||||
/*
|
||||
* Create an PgFdwRelationInfo entry that is used to indicate
|
||||
* that the join relation is already considered, so that we won't waste
|
||||
* time in judging safety of join pushdown and adding the same paths again
|
||||
* if found safe. Once we know that this join can be pushed down, we fill
|
||||
* the entry.
|
||||
*/
|
||||
fpinfo = fdw_relinfo_create(root, joinrel, InvalidOid, InvalidOid, TS_FDW_RELINFO_JOIN);
|
||||
Assert(fpinfo->type == TS_FDW_RELINFO_JOIN);
|
||||
|
||||
/* attrs_used is only for base relations. */
|
||||
fpinfo->attrs_used = NULL;
|
||||
fpinfo->pushdown_safe = false;
|
||||
|
||||
/*
|
||||
* We need the FDW information to get retrieve the information about the
|
||||
* configured reference join tables. So, create the data structure for
|
||||
* the first server. The reference tables are the same for all servers.
|
||||
*/
|
||||
Oid server_oid = hyper_table_rels[0]->serverid;
|
||||
fpinfo->server = GetForeignServer(server_oid);
|
||||
apply_fdw_and_server_options(fpinfo);
|
||||
|
||||
if (!is_safe_to_pushdown_reftable_join(root,
|
||||
fpinfo->join_reference_tables,
|
||||
rte_inner,
|
||||
jointype))
|
||||
{
|
||||
/*
|
||||
* Reset fdw_private to allow further planner calls with different arguments
|
||||
* (e.g., swapped inner and outer relation) to replan the pushdown.
|
||||
*/
|
||||
pfree(joinrel->fdw_private);
|
||||
joinrel->fdw_private = NULL;
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Join pushdown only works if the data node rels are created in
|
||||
* data_node_scan_add_node_paths during scan planning.
|
||||
*/
|
||||
if (!ts_guc_enable_per_data_node_queries)
|
||||
{
|
||||
ereport(DEBUG1,
|
||||
(errmsg("join on reference table is not considered to be pushed down because "
|
||||
"'enable_per_data_node_queries' GUC is disabled")));
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
/* The inner table can be a distributed hypertable or a plain table. Plain tables don't have
|
||||
* a TsFdwRelInfo at this point. So, it needs to be created.
|
||||
*/
|
||||
if (innerrel->fdw_private == NULL)
|
||||
fdw_relinfo_create(root, innerrel, InvalidOid, InvalidOid, TS_FDW_RELINFO_REFERENCE_TABLE);
|
||||
|
||||
/* Allow pushdown of the inner rel (the reference table) */
|
||||
TsFdwRelInfo *fpinfo_i = fdw_relinfo_get(innerrel);
|
||||
fpinfo_i->pushdown_safe = true;
|
||||
|
||||
ereport(DEBUG1, (errmsg("try to push down a join on a reference table")));
|
||||
|
||||
join_partition_rels = palloc(sizeof(RelOptInfo *) * nhyper_table_rels);
|
||||
|
||||
/* Create join paths and cost estimations per data node / join relation. */
|
||||
for (int i = 0; i < nhyper_table_rels; i++)
|
||||
{
|
||||
RelOptInfo *data_node_rel = hyper_table_rels[i];
|
||||
Assert(data_node_rel);
|
||||
|
||||
/* Adjust join target expression list */
|
||||
AppendRelInfo *appinfo = root->append_rel_array[data_node_rel->relid];
|
||||
Assert(appinfo != NULL);
|
||||
|
||||
RelOptInfo *join_partition =
|
||||
create_data_node_joinrel(root, innerrel, joinrel, data_node_rel, appinfo);
|
||||
join_partition_rels[i] = join_partition;
|
||||
fpinfo = fdw_relinfo_get(join_partition);
|
||||
|
||||
/* Create a new join path extra for this join partition */
|
||||
JoinPathExtraData *partition_extra = create_data_node_joinrel_extra(root, extra, appinfo);
|
||||
|
||||
/* Pushdown the join expressions */
|
||||
bool join_pushdown_ok = fdw_pushdown_foreign_join(root,
|
||||
join_partition,
|
||||
jointype,
|
||||
data_node_rel,
|
||||
innerrel,
|
||||
partition_extra);
|
||||
|
||||
/* Join cannot be pushed down */
|
||||
if (!join_pushdown_ok)
|
||||
{
|
||||
ereport(DEBUG1,
|
||||
(errmsg(
|
||||
"join pushdown on reference table is not supported for the used query")));
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Compute the selectivity and cost of the local_conds, so we don't have
|
||||
* to do it over again for each path. The best we can do for these
|
||||
* conditions is to estimate selectivity on the basis of local statistics.
|
||||
* The local conditions are applied after the join has been computed on
|
||||
* the remote side like quals in WHERE clause, so pass jointype as
|
||||
* JOIN_INNER.
|
||||
*/
|
||||
fpinfo->local_conds_sel =
|
||||
clauselist_selectivity(root, fpinfo->local_conds, 0, JOIN_INNER, NULL);
|
||||
cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
|
||||
|
||||
/*
|
||||
* If we are going to estimate costs locally, estimate the join clause
|
||||
* selectivity here while we have special join info.
|
||||
*/
|
||||
fpinfo->joinclause_sel =
|
||||
clauselist_selectivity(root, fpinfo->joinclauses, 0, fpinfo->jointype, extra->sjinfo);
|
||||
|
||||
/* Estimate costs for bare join relation */
|
||||
fdw_estimate_path_cost_size(root,
|
||||
join_partition,
|
||||
NIL,
|
||||
&rows,
|
||||
&width,
|
||||
&startup_cost,
|
||||
&total_cost);
|
||||
|
||||
/* Now update this information in the joinrel */
|
||||
join_partition->rows = rows;
|
||||
join_partition->reltarget->width = width;
|
||||
fpinfo->rows = rows;
|
||||
fpinfo->width = width;
|
||||
fpinfo->startup_cost = startup_cost;
|
||||
fpinfo->total_cost = total_cost;
|
||||
|
||||
/*
|
||||
* Create a new join path and add it to the joinrel which represents a
|
||||
* join between foreign tables.
|
||||
*/
|
||||
joinpath = data_node_join_path_create(root,
|
||||
join_partition,
|
||||
NULL, /* default pathtarget */
|
||||
rows,
|
||||
startup_cost,
|
||||
total_cost,
|
||||
NIL, /* no pathkeys */
|
||||
join_partition->lateral_relids,
|
||||
epq_path,
|
||||
NIL); /* no fdw_private */
|
||||
|
||||
Assert(joinpath != NULL);
|
||||
|
||||
if (!bms_is_empty(fpinfo->sca->chunk_relids))
|
||||
{
|
||||
/* Add generated path into joinrel by add_path(). */
|
||||
fdw_utils_add_path(join_partition, (Path *) joinpath);
|
||||
join_part_rels_list = lappend(join_part_rels_list, join_partition);
|
||||
|
||||
#if PG15_GE
|
||||
data_node_live_rels = bms_add_member(data_node_live_rels, i);
|
||||
#endif
|
||||
|
||||
/* Consider pathkeys for the join relation */
|
||||
fdw_add_paths_with_pathkeys_for_rel(root,
|
||||
join_partition,
|
||||
epq_path,
|
||||
data_node_join_path_create);
|
||||
}
|
||||
else
|
||||
ts_set_dummy_rel_pathlist(join_partition);
|
||||
|
||||
set_cheapest(join_partition);
|
||||
}
|
||||
|
||||
Assert(list_length(join_part_rels_list) > 0);
|
||||
|
||||
/* Must keep partitioning info consistent with the join partition paths we have created */
|
||||
joinrel->part_rels = join_partition_rels;
|
||||
joinrel->nparts = nhyper_table_rels;
|
||||
#if PG15_GE
|
||||
joinrel->live_parts = data_node_live_rels;
|
||||
#endif
|
||||
|
||||
add_paths_to_append_rel(root, joinrel, join_part_rels_list);
|
||||
|
||||
/* XXX Consider parameterized paths for the join relation */
|
||||
}
|
||||
|
||||
/*
|
||||
* Turn chunk append paths into data node append paths.
|
||||
*
|
||||
@ -1042,6 +1708,57 @@ data_node_scan_path_create(PlannerInfo *root, RelOptInfo *rel, PathTarget *targe
|
||||
return &scanpath->cpath.path;
|
||||
}
|
||||
|
||||
/*
|
||||
* data_node_join_path_create
|
||||
* Creates a path corresponding to a scan of a foreign join,
|
||||
* returning the pathnode.
|
||||
*
|
||||
* There is a usually-sane default for the pathtarget (rel->reltarget),
|
||||
* so we let a NULL for "target" select that.
|
||||
*
|
||||
* The code is based on PostgreSQL's create_foreign_join_path function
|
||||
* (version 15.1).
|
||||
*/
|
||||
static Path *
|
||||
data_node_join_path_create(PlannerInfo *root, RelOptInfo *rel, PathTarget *target, double rows,
|
||||
Cost startup_cost, Cost total_cost, List *pathkeys,
|
||||
Relids required_outer, Path *fdw_outerpath, List *private)
|
||||
{
|
||||
DataNodeScanPath *scanpath = palloc0(sizeof(DataNodeScanPath));
|
||||
|
||||
#ifdef ENABLE_DEAD_CODE
|
||||
if (rel->lateral_relids && !bms_is_subset(rel->lateral_relids, required_outer))
|
||||
required_outer = bms_union(required_outer, rel->lateral_relids);
|
||||
|
||||
/*
|
||||
* We should use get_joinrel_parampathinfo to handle parameterized paths,
|
||||
* but the API of this function doesn't support it, and existing
|
||||
* extensions aren't yet trying to build such paths anyway. For the
|
||||
* moment just throw an error if someone tries it; eventually we should
|
||||
* revisit this.
|
||||
*/
|
||||
if (!bms_is_empty(required_outer) || !bms_is_empty(rel->lateral_relids))
|
||||
elog(ERROR, "parameterized foreign joins are not supported yet");
|
||||
#endif
|
||||
|
||||
scanpath->cpath.path.type = T_CustomPath;
|
||||
scanpath->cpath.path.pathtype = T_CustomScan;
|
||||
scanpath->cpath.custom_paths = fdw_outerpath == NULL ? NIL : list_make1(fdw_outerpath);
|
||||
scanpath->cpath.methods = &data_node_scan_path_methods;
|
||||
scanpath->cpath.path.parent = rel;
|
||||
scanpath->cpath.path.pathtarget = target ? target : rel->reltarget;
|
||||
scanpath->cpath.path.param_info = NULL; /* XXX see above */
|
||||
scanpath->cpath.path.parallel_aware = false;
|
||||
scanpath->cpath.path.parallel_safe = rel->consider_parallel;
|
||||
scanpath->cpath.path.parallel_workers = 0;
|
||||
scanpath->cpath.path.rows = rows;
|
||||
scanpath->cpath.path.startup_cost = startup_cost;
|
||||
scanpath->cpath.path.total_cost = total_cost;
|
||||
scanpath->cpath.path.pathkeys = pathkeys;
|
||||
|
||||
return &scanpath->cpath.path;
|
||||
}
|
||||
|
||||
static Path *
|
||||
data_node_scan_upper_path_create(PlannerInfo *root, RelOptInfo *rel, PathTarget *target,
|
||||
double rows, Cost startup_cost, Cost total_cost, List *pathkeys,
|
||||
|
@ -17,6 +17,10 @@ extern void data_node_scan_create_upper_paths(PlannerInfo *root, UpperRelationKi
|
||||
RelOptInfo *input_rel, RelOptInfo *output_rel,
|
||||
void *extra);
|
||||
|
||||
extern void data_node_generate_pushdown_join_paths(PlannerInfo *root, RelOptInfo *joinrel,
|
||||
RelOptInfo *outerrel, RelOptInfo *innerrel,
|
||||
JoinType jointype, JoinPathExtraData *extra);
|
||||
|
||||
/* Indexes of fields in ForeignScan->custom_private */
|
||||
typedef enum
|
||||
{
|
||||
|
@ -372,6 +372,17 @@ get_foreign_upper_paths(PlannerInfo *root, UpperRelationKind stage, RelOptInfo *
|
||||
#endif
|
||||
}
|
||||
|
||||
/*
|
||||
* get_foreign_join_paths
|
||||
* Add possible ForeignPath to joinrel, if join is safe to push down.
|
||||
*/
|
||||
void
|
||||
tsl_mn_get_foreign_join_paths(PlannerInfo *root, RelOptInfo *joinrel, RelOptInfo *outerrel,
|
||||
RelOptInfo *innerrel, JoinType jointype, JoinPathExtraData *extra)
|
||||
{
|
||||
data_node_generate_pushdown_join_paths(root, joinrel, outerrel, innerrel, jointype, extra);
|
||||
}
|
||||
|
||||
static FdwRoutine timescaledb_fdw_routine = {
|
||||
.type = T_FdwRoutine,
|
||||
/* scan (mandatory) */
|
||||
|
@ -10,6 +10,10 @@
|
||||
#include <fmgr.h>
|
||||
#include <extension_constants.h>
|
||||
|
||||
extern void tsl_mn_get_foreign_join_paths(PlannerInfo *root, RelOptInfo *joinrel,
|
||||
RelOptInfo *outerrel, RelOptInfo *innerrel,
|
||||
JoinType jointype, JoinPathExtraData *extra);
|
||||
|
||||
extern Datum timescaledb_fdw_handler(PG_FUNCTION_ARGS);
|
||||
extern Datum timescaledb_fdw_validator(PG_FUNCTION_ARGS);
|
||||
|
||||
|
@ -55,7 +55,7 @@
|
||||
*
|
||||
* New options might also require tweaking merge_fdw_options().
|
||||
*/
|
||||
static void
|
||||
void
|
||||
apply_fdw_and_server_options(TsFdwRelInfo *fpinfo)
|
||||
{
|
||||
ListCell *lc;
|
||||
@ -93,6 +93,7 @@ TsFdwRelInfo *
|
||||
fdw_relinfo_get(RelOptInfo *rel)
|
||||
{
|
||||
TimescaleDBPrivate *rel_private = rel->fdw_private;
|
||||
Assert(rel_private);
|
||||
TsFdwRelInfo *fdw_relation_info = rel_private->fdw_relation_info;
|
||||
|
||||
/*
|
||||
@ -393,9 +394,13 @@ fdw_relinfo_create(PlannerInfo *root, RelOptInfo *rel, Oid server_oid, Oid local
|
||||
* planning.
|
||||
*/
|
||||
fpinfo = fdw_relinfo_alloc_or_get(rel);
|
||||
Assert(fpinfo->type == TS_FDW_RELINFO_UNINITIALIZED || fpinfo->type == type);
|
||||
Assert(fpinfo->type == TS_FDW_RELINFO_UNINITIALIZED || fpinfo->type == TS_FDW_RELINFO_JOIN ||
|
||||
fpinfo->type == type);
|
||||
fpinfo->type = type;
|
||||
|
||||
if (type == TS_FDW_RELINFO_UNINITIALIZED || type == TS_FDW_RELINFO_JOIN)
|
||||
return fpinfo;
|
||||
|
||||
/*
|
||||
* Set the name of relation in fpinfo, while we are constructing it here.
|
||||
* It will be used to build the string describing the join relation in
|
||||
@ -404,26 +409,24 @@ fdw_relinfo_create(PlannerInfo *root, RelOptInfo *rel, Oid server_oid, Oid local
|
||||
*/
|
||||
|
||||
fpinfo->relation_name = makeStringInfo();
|
||||
refname = rte->eref->aliasname;
|
||||
appendStringInfo(fpinfo->relation_name,
|
||||
"%s.%s",
|
||||
quote_identifier(get_namespace_name(get_rel_namespace(rte->relid))),
|
||||
quote_identifier(get_rel_name(rte->relid)));
|
||||
if (*refname && strcmp(refname, get_rel_name(rte->relid)) != 0)
|
||||
appendStringInfo(fpinfo->relation_name, " %s", quote_identifier(rte->eref->aliasname));
|
||||
|
||||
if (type == TS_FDW_RELINFO_HYPERTABLE)
|
||||
if (rte != NULL)
|
||||
{
|
||||
/* nothing more to do for hypertables */
|
||||
Assert(!OidIsValid(server_oid));
|
||||
|
||||
return fpinfo;
|
||||
refname = rte->eref->aliasname;
|
||||
appendStringInfo(fpinfo->relation_name,
|
||||
"%s.%s",
|
||||
quote_identifier(get_namespace_name(get_rel_namespace(rte->relid))),
|
||||
quote_identifier(get_rel_name(rte->relid)));
|
||||
if (*refname && strcmp(refname, get_rel_name(rte->relid)) != 0)
|
||||
appendStringInfo(fpinfo->relation_name, " %s", quote_identifier(rte->eref->aliasname));
|
||||
}
|
||||
/* Base foreign tables need to be pushed down always. */
|
||||
fpinfo->pushdown_safe = true;
|
||||
|
||||
/* Look up foreign-table catalog info. */
|
||||
fpinfo->server = GetForeignServer(server_oid);
|
||||
if (OidIsValid(server_oid))
|
||||
{
|
||||
fpinfo->server = GetForeignServer(server_oid);
|
||||
apply_fdw_and_server_options(fpinfo);
|
||||
}
|
||||
|
||||
/*
|
||||
* Extract user-settable option values. Note that per-table setting
|
||||
@ -434,8 +437,6 @@ fdw_relinfo_create(PlannerInfo *root, RelOptInfo *rel, Oid server_oid, Oid local
|
||||
fpinfo->shippable_extensions = list_make1_oid(ts_extension_get_oid());
|
||||
fpinfo->fetch_size = DEFAULT_FDW_FETCH_SIZE;
|
||||
|
||||
apply_fdw_and_server_options(fpinfo);
|
||||
|
||||
/*
|
||||
* Identify which baserestrictinfo clauses can be sent to the data
|
||||
* node and which can't.
|
||||
@ -446,6 +447,16 @@ fdw_relinfo_create(PlannerInfo *root, RelOptInfo *rel, Oid server_oid, Oid local
|
||||
&fpinfo->remote_conds,
|
||||
&fpinfo->local_conds);
|
||||
|
||||
if (type == TS_FDW_RELINFO_HYPERTABLE)
|
||||
{
|
||||
/* nothing more to do for hypertables */
|
||||
Assert(!OidIsValid(server_oid));
|
||||
|
||||
return fpinfo;
|
||||
}
|
||||
/* Base foreign tables need to be pushed down always. */
|
||||
fpinfo->pushdown_safe = true;
|
||||
|
||||
/*
|
||||
* Identify which attributes will need to be retrieved from the data
|
||||
* node. These include all attrs needed for joins or final output, plus
|
||||
@ -502,7 +513,7 @@ fdw_relinfo_create(PlannerInfo *root, RelOptInfo *rel, Oid server_oid, Oid local
|
||||
* rels (there's no corresponding table in the system to associate
|
||||
* stats with). Instead, data node rels already have basic stats set
|
||||
* at creation time based on data-node-chunk assignment. */
|
||||
if (fpinfo->type != TS_FDW_RELINFO_HYPERTABLE_DATA_NODE)
|
||||
if (fpinfo->type != TS_FDW_RELINFO_HYPERTABLE_DATA_NODE && OidIsValid(rel->relid))
|
||||
set_baserel_size_estimates(root, rel);
|
||||
|
||||
/* Fill in basically-bogus cost estimates for use later. */
|
||||
|
@ -29,6 +29,9 @@ typedef enum
|
||||
TS_FDW_RELINFO_HYPERTABLE_DATA_NODE,
|
||||
TS_FDW_RELINFO_HYPERTABLE,
|
||||
TS_FDW_RELINFO_FOREIGN_TABLE,
|
||||
TS_FDW_RELINFO_REFERENCE_JOIN_PARTITION,
|
||||
TS_FDW_RELINFO_REFERENCE_TABLE,
|
||||
TS_FDW_RELINFO_JOIN
|
||||
} TsFdwRelInfoType;
|
||||
|
||||
#ifdef TS_DEBUG
|
||||
@ -156,5 +159,6 @@ extern TsFdwRelInfo *fdw_relinfo_create(PlannerInfo *root, RelOptInfo *rel, Oid
|
||||
Oid local_table_id, TsFdwRelInfoType type);
|
||||
extern TsFdwRelInfo *fdw_relinfo_alloc_or_get(RelOptInfo *rel);
|
||||
extern TsFdwRelInfo *fdw_relinfo_get(RelOptInfo *rel);
|
||||
extern void apply_fdw_and_server_options(TsFdwRelInfo *fpinfo);
|
||||
|
||||
#endif /* TIMESCALEDB_TSL_FDW_RELINFO_H */
|
||||
|
@ -193,6 +193,10 @@ is_shippable(Oid objectId, Oid classId, TsFdwRelInfo *fpinfo)
|
||||
if (fpinfo->shippable_extensions == NIL)
|
||||
return false;
|
||||
|
||||
/* Give up if we don't have a remote server. */
|
||||
if (fpinfo->server == NULL)
|
||||
return false;
|
||||
|
||||
/* Initialize cache if first time through. */
|
||||
if (!ShippableCacheHash)
|
||||
InitializeShippableCache();
|
||||
|
@ -36,6 +36,7 @@
|
||||
#include "dist_util.h"
|
||||
#include "export.h"
|
||||
#include "fdw/fdw.h"
|
||||
#include "fdw/relinfo.h"
|
||||
#include "hypertable.h"
|
||||
#include "license_guc.h"
|
||||
#include "nodes/decompress_chunk/planner.h"
|
||||
@ -235,6 +236,7 @@ CrossModuleFunctions tsl_cm_functions = {
|
||||
.cache_syscache_invalidate = cache_syscache_invalidate,
|
||||
.update_compressed_chunk_relstats = update_compressed_chunk_relstats,
|
||||
.health_check = ts_dist_health_check,
|
||||
.mn_get_foreign_join_paths = tsl_mn_get_foreign_join_paths,
|
||||
};
|
||||
|
||||
static void
|
||||
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
@ -106,3 +106,346 @@ SELECT fdwoptions FROM pg_foreign_data_wrapper WHERE fdwname = 'timescaledb_fdw'
|
||||
-- Set options again
|
||||
ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (ADD reference_tables 'metric_name, metric_name_dht, reference_table2');
|
||||
SELECT fdwoptions FROM pg_foreign_data_wrapper WHERE fdwname = 'timescaledb_fdw';
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
|
||||
\set PREFIX 'EXPLAIN (analyze, verbose, costs off, timing off, summary off)'
|
||||
|
||||
-- Analyze tables
|
||||
ANALYZE metric;
|
||||
ANALYZE metric_name;
|
||||
ANALYZE metric_name_dht;
|
||||
|
||||
-------
|
||||
-- Tests based on results
|
||||
-------
|
||||
|
||||
-- Simple join
|
||||
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id);
|
||||
|
||||
-- Filter
|
||||
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id) WHERE name LIKE 'cpu%' AND ts BETWEEN '2022-02-02 02:02:02+03' AND '2022-02-02 02:12:02+03';
|
||||
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id) WHERE name = 'cpu1';
|
||||
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id) WHERE name LIKE 'cpu1' AND name LIKE 'cpu2';
|
||||
|
||||
-- Ordering
|
||||
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id) order by metric_name.name ASC;
|
||||
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id) order by metric_name.name DESC;
|
||||
|
||||
-- Aggregations
|
||||
|
||||
SELECT SUM(metric.value) FROM metric LEFT JOIN metric_name USING (id) WHERE name = 'cpu1';
|
||||
|
||||
SELECT MAX(metric.value), MIN(metric.value) FROM metric LEFT JOIN metric_name USING (id) WHERE name = 'cpu1';
|
||||
|
||||
SELECT COUNT(*) FROM metric LEFT JOIN metric_name USING (id) WHERE name = 'cpu1';
|
||||
|
||||
-- Aggregations and Renaming
|
||||
|
||||
SELECT SUM(m1.value) FROM metric m1 LEFT JOIN metric_name m2 USING (id) WHERE name = 'cpu1';
|
||||
|
||||
SELECT MAX(m1.value), MIN(m1.value) FROM metric AS m1 LEFT JOIN metric_name AS m2 USING (id) WHERE name = 'cpu1';
|
||||
|
||||
SELECT COUNT(*) FROM metric AS ma LEFT JOIN metric_name as m2 USING (id) WHERE name = 'cpu1';
|
||||
|
||||
-- Grouping
|
||||
|
||||
SELECT name, max(value), min(value) FROM metric LEFT JOIN metric_name USING (id) WHERE name LIKE 'cpu%' GROUP BY name;
|
||||
|
||||
SELECT name, max(value), min(value) FROM metric LEFT JOIN metric_name USING (id) WHERE name LIKE 'cpu%' GROUP BY name ORDER BY name DESC;
|
||||
|
||||
SELECT name, max(value), min(value) FROM metric LEFT JOIN metric_name USING (id) WHERE name LIKE 'cpu%' GROUP BY name HAVING min(value) > 60 ORDER BY name DESC;
|
||||
|
||||
-------
|
||||
-- Tests based on query plans
|
||||
-------
|
||||
|
||||
-- Tests without filter (vanilla PostgreSQL reftable)
|
||||
:PREFIX
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id);
|
||||
|
||||
:PREFIX
|
||||
SELECT * FROM metric LEFT JOIN metric_name ON metric.id = metric_name.id;
|
||||
|
||||
-- Tests without filter (DHT reftable)
|
||||
:PREFIX
|
||||
SELECT * FROM metric LEFT JOIN metric_name_dht USING (id);
|
||||
|
||||
-- Tests with filter pushdown
|
||||
:PREFIX
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id) WHERE value > 10;
|
||||
|
||||
PREPARE prepared_join_pushdown_value (int) AS
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id) WHERE value > $1;
|
||||
|
||||
:PREFIX
|
||||
EXECUTE prepared_join_pushdown_value(10);
|
||||
|
||||
:PREFIX
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id) WHERE ts > '2022-02-02 02:02:02+03';
|
||||
|
||||
:PREFIX
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id) WHERE ts BETWEEN '2022-02-02 02:02:02+03' AND '2022-02-02 02:12:02+03';
|
||||
|
||||
:PREFIX
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id) WHERE name LIKE 'cpu%' AND ts BETWEEN '2022-02-02 02:02:02+03' AND '2022-02-02 02:12:02+03';
|
||||
|
||||
:PREFIX
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id) WHERE name LIKE 'cpu%';
|
||||
|
||||
:PREFIX
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id) WHERE name = 'cpu2';
|
||||
|
||||
:PREFIX
|
||||
SELECT * FROM metric LEFT JOIN metric_name_dht USING (id) WHERE name LIKE 'cpu%';
|
||||
|
||||
-- Tests with an expression that evaluates to false
|
||||
:PREFIX
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id) WHERE name LIKE 'cpu1' AND name LIKE 'cpu2';
|
||||
|
||||
-- Tests with aliases
|
||||
:PREFIX
|
||||
SELECT * FROM metric m1 LEFT JOIN metric_name m2 USING (id);
|
||||
|
||||
:PREFIX
|
||||
SELECT * FROM metric m1 LEFT JOIN metric_name m2 ON m1.id = m2.id;
|
||||
|
||||
:PREFIX
|
||||
SELECT * FROM metric m1 LEFT JOIN metric_name m2 ON m1.id = m2.id WHERE m1.value > 10;
|
||||
|
||||
:PREFIX
|
||||
SELECT * FROM metric m1 LEFT JOIN metric_name m2 ON m1.id = m2.id WHERE m1.value > 10 AND m2.name LIKE 'cpu%';
|
||||
|
||||
-- Tests with projections
|
||||
:PREFIX
|
||||
SELECT name, value FROM metric LEFT JOIN metric_name USING (id) WHERE name LIKE 'cpu%' AND ts BETWEEN '2022-02-02 02:02:02+03' AND '2022-02-02 02:12:02+03';
|
||||
|
||||
:PREFIX
|
||||
SELECT m1.ts, m1.value FROM metric m1 LEFT JOIN metric_name m2 USING (id) WHERE name LIKE 'cpu%' AND ts BETWEEN '2022-02-02 02:02:02+03' AND '2022-02-02 02:12:02+03';
|
||||
|
||||
:PREFIX
|
||||
SELECT m1.id, m1.id FROM metric m1 LEFT JOIN metric_name m2 USING (id) WHERE name LIKE 'cpu%' AND ts BETWEEN '2022-02-02 02:02:02+03' AND '2022-02-02 02:12:02+03';
|
||||
|
||||
:PREFIX
|
||||
SELECT m1.id, m2.id FROM metric m1 LEFT JOIN metric_name m2 USING (id) WHERE name LIKE 'cpu%' AND ts BETWEEN '2022-02-02 02:02:02+03' AND '2022-02-02 02:12:02+03';
|
||||
|
||||
:PREFIX
|
||||
SELECT m1.*, m2.* FROM metric m1 LEFT JOIN metric_name m2 USING (id) WHERE name LIKE 'cpu%' AND ts BETWEEN '2022-02-02 02:02:02+03' AND '2022-02-02 02:12:02+03';
|
||||
|
||||
:PREFIX
|
||||
SELECT * FROM metric m1 LEFT JOIN metric_name m2 USING (id) WHERE name LIKE 'cpu%' AND ts BETWEEN '2022-02-02 02:02:02+03' AND '2022-02-02 02:12:02+03';
|
||||
|
||||
-- Ordering
|
||||
:PREFIX
|
||||
SELECT name, value FROM metric LEFT JOIN metric_name USING (id) ORDER BY name;
|
||||
|
||||
:PREFIX
|
||||
SELECT name, value FROM metric LEFT JOIN metric_name USING (id) ORDER BY name ASC;
|
||||
|
||||
:PREFIX
|
||||
SELECT name, value FROM metric LEFT JOIN metric_name USING (id) ORDER BY name DESC;
|
||||
|
||||
:PREFIX
|
||||
SELECT name, value FROM metric LEFT JOIN metric_name USING (id) ORDER BY name ASC NULLS first;
|
||||
|
||||
:PREFIX
|
||||
SELECT name, value FROM metric LEFT JOIN metric_name USING (id) ORDER BY name ASC NULLS last;
|
||||
|
||||
:PREFIX
|
||||
SELECT name, value FROM metric LEFT JOIN metric_name USING (id) ORDER BY name DESC NULLS first;
|
||||
|
||||
:PREFIX
|
||||
SELECT name, value FROM metric LEFT JOIN metric_name USING (id) ORDER BY name DESC NULLS last;
|
||||
|
||||
:PREFIX
|
||||
SELECT name, value FROM metric LEFT JOIN metric_name USING (id) ORDER BY name, value DESC;
|
||||
|
||||
:PREFIX
|
||||
SELECT name, value FROM metric LEFT JOIN metric_name USING (id) ORDER BY value, name DESC;
|
||||
|
||||
:PREFIX
|
||||
SELECT name, value FROM metric LEFT JOIN metric_name USING (id) ORDER BY value ASC, name DESC;
|
||||
|
||||
:PREFIX
|
||||
SELECT name, value FROM metric LEFT JOIN metric_name USING (id) ORDER BY value ASC NULLS last, name DESC NULLS first;
|
||||
|
||||
-- Ordering with explicit table qualification
|
||||
:PREFIX
|
||||
SELECT name, value FROM metric LEFT JOIN metric_name USING (id) ORDER BY value, name, metric_name.id;
|
||||
|
||||
:PREFIX
|
||||
SELECT name, value FROM metric LEFT JOIN metric_name USING (id) ORDER BY value, name, metric_name.id, metric.id;
|
||||
|
||||
-- Ordering with explicit table qualification and aliases
|
||||
:PREFIX
|
||||
SELECT name, value FROM metric m1 LEFT JOIN metric_name m2 USING (id) ORDER BY value, name, m1.id, m2.id;
|
||||
|
||||
-- Grouping
|
||||
:PREFIX
|
||||
SELECT name FROM metric LEFT JOIN metric_name USING (id) WHERE name LIKE 'cpu%' GROUP BY name;
|
||||
|
||||
:PREFIX
|
||||
SELECT name, max(value), min(value) FROM metric LEFT JOIN metric_name USING (id) WHERE name LIKE 'cpu%' GROUP BY name;
|
||||
|
||||
:PREFIX
|
||||
SELECT name, max(value) FROM metric LEFT JOIN metric_name USING (id) WHERE name LIKE 'cpu%' AND ts BETWEEN '2022-02-02 02:02:02+03' AND '2022-02-02 02:12:02+03' GROUP BY name;
|
||||
|
||||
-- Grouping and sorting
|
||||
:PREFIX
|
||||
SELECT name, max(value), min(value) FROM metric LEFT JOIN metric_name USING (id) WHERE name LIKE 'cpu%' and ts BETWEEN '2000-02-02 02:02:02+03' and '2022-02-02 02:12:02+03' GROUP BY name ORDER BY name DESC;
|
||||
|
||||
-- Having
|
||||
:PREFIX
|
||||
SELECT name, max(value), min(value) FROM metric LEFT JOIN metric_name USING (id) WHERE name LIKE 'cpu%' and ts BETWEEN '2000-02-02 02:02:02+03' and '2022-02-02 02:12:02+03' GROUP BY name having min(value) > 0 ORDER BY name DESC;
|
||||
|
||||
-- Rank
|
||||
:PREFIX
|
||||
SELECT name, value, RANK () OVER (ORDER by value) from metric join metric_name_local USING (id);
|
||||
|
||||
-- Check returned types
|
||||
SELECT pg_typeof("name"), pg_typeof("id"), pg_typeof("value"), name, id, value FROM metric
|
||||
LEFT JOIN metric_name USING (id) WHERE name LIKE 'cpu%' LIMIT 1;
|
||||
|
||||
-- Left join and reference table on the left hypertable on the right (no pushdown)
|
||||
:PREFIX
|
||||
SELECT * FROM metric_name LEFT JOIN metric USING (id) WHERE name LIKE 'cpu%';
|
||||
|
||||
-- Right join reference table on the left, hypertable on the right (can be converted into a left join by PostgreSQL, pushdown)
|
||||
:PREFIX
|
||||
SELECT * FROM metric_name RIGHT JOIN metric USING (id) WHERE name LIKE 'cpu%';
|
||||
|
||||
-- Right join hypertable on the left, reference table on the right (no pushdown)
|
||||
:PREFIX
|
||||
SELECT * FROM metric RIGHT JOIN metric_name USING (id) WHERE name LIKE 'cpu%';
|
||||
|
||||
-- Inner join and reference table left, hypertable on the right (pushdown)
|
||||
:PREFIX
|
||||
SELECT * FROM metric_name INNER JOIN metric USING (id) WHERE name LIKE 'cpu%';
|
||||
|
||||
-- Implicit join on two tables, hypertable left, reference table right (pushdown)
|
||||
:PREFIX
|
||||
SELECT * FROM metric m1, metric_name m2 WHERE m1.id=m2.id AND name LIKE 'cpu%';
|
||||
|
||||
-- Implicit join on two tables, reference table left, hypertable right (pushdown)
|
||||
:PREFIX
|
||||
SELECT * FROM metric m2, metric_name m1 WHERE m1.id=m2.id AND name LIKE 'cpu%';
|
||||
|
||||
-- Implicit join on three tables (no pushdown)
|
||||
:PREFIX
|
||||
SELECT * FROM metric m1, metric_name m2, metric_name m3 WHERE m1.id=m2.id AND m2.id = m3.id AND m3.name LIKE 'cpu%';
|
||||
|
||||
-- Left join on a DHT and a subselect on a reference table (subselect can be removed, pushdown)
|
||||
:PREFIX
|
||||
SELECT * FROM metric LEFT JOIN (SELECT * FROM metric_name) AS sub ON metric.id=sub.id;
|
||||
|
||||
-- Left join on a DHT and a subselect with filter on a reference table (subselect can be removed, pushdown)
|
||||
:PREFIX
|
||||
SELECT * FROM metric LEFT JOIN (SELECT * FROM metric_name WHERE name LIKE 'cpu%') AS sub ON metric.id=sub.id;
|
||||
|
||||
-- Left join on a subselect on a DHT and a reference table (subselect can be removed, pushdown)
|
||||
:PREFIX
|
||||
SELECT * FROM (SELECT * FROM metric) as sub LEFT JOIN metric_name ON sub.id=metric_name.id WHERE name LIKE 'cpu%';
|
||||
|
||||
-- Left join and hypertable on left and right (no pushdown)
|
||||
:PREFIX
|
||||
SELECT * FROM metric m1 LEFT JOIN metric m2 USING (id) WHERE m1.id = 2;
|
||||
|
||||
-- Left join and reference table on left and right
|
||||
:PREFIX
|
||||
SELECT * FROM metric_name m1 LEFT JOIN metric_name m2 USING (id) WHERE m1.name LIKE 'cpu%';
|
||||
|
||||
-- Only aggregation no values needs to be transferred
|
||||
:PREFIX
|
||||
SELECT count(*) FROM metric m1 LEFT JOIN metric_name m2 USING (id) WHERE m2.name LIKE 'cpu%';
|
||||
|
||||
-- Lateral joins that can be converted into regular joins
|
||||
:PREFIX
|
||||
SELECT * FROM metric m1 LEFT JOIN LATERAL (SELECT * FROM metric_name m2 WHERE m1.id = m2.id) t ON TRUE;
|
||||
|
||||
:PREFIX
|
||||
SELECT * FROM metric m1 LEFT JOIN LATERAL (SELECT * FROM metric_name m2 WHERE m1.id > m2.id) t ON TRUE;
|
||||
|
||||
-- Lateral join that can not be converted and pushed down
|
||||
:PREFIX
|
||||
SELECT * FROM metric m1 LEFT JOIN LATERAL (SELECT * FROM metric_name m2 WHERE m1.id > m2.id ORDER BY m2.name LIMIT 1) t ON TRUE;
|
||||
|
||||
-- Two left joins (no pushdown)
|
||||
:PREFIX
|
||||
SELECT * FROM metric m1 LEFT JOIN metric m2 USING (id) LEFT JOIN metric_name mn USING(id);
|
||||
|
||||
-------
|
||||
-- Tests with shippable and non-shippable joins / EquivalenceClass
|
||||
-- See 'dist_param.sql' for an explanation of the used textin / int4out
|
||||
-- functions.
|
||||
-------
|
||||
|
||||
-- Shippable non-EquivalenceClass join
|
||||
:PREFIX
|
||||
SELECT name, max(value), count(*)
|
||||
FROM metric JOIN metric_name ON texteq('cpu' || textin(int4out(metric.id)), name)
|
||||
GROUP BY name
|
||||
ORDER BY name;
|
||||
|
||||
-- Non-shippable equality class join
|
||||
:PREFIX
|
||||
SELECT name, max(value), count(*)
|
||||
FROM metric JOIN metric_name ON name = concat('cpu', metric.id)
|
||||
GROUP BY name
|
||||
ORDER BY name;
|
||||
|
||||
-- Non-shippable non-EquivalenceClass join
|
||||
:PREFIX
|
||||
SELECT name, max(value), count(*)
|
||||
FROM metric JOIN metric_name ON texteq(concat('cpu', textin(int4out(metric.id))), name)
|
||||
GROUP BY name
|
||||
ORDER BY name;
|
||||
|
||||
-------
|
||||
-- Tests without enable_per_data_node_queries (no pushdown supported)
|
||||
-------
|
||||
SET timescaledb.enable_per_data_node_queries = false;
|
||||
|
||||
:PREFIX
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id);
|
||||
|
||||
SET timescaledb.enable_per_data_node_queries = true;
|
||||
|
||||
-------
|
||||
-- Tests with empty reftable
|
||||
-------
|
||||
RESET client_min_messages;
|
||||
TRUNCATE metric_name;
|
||||
CALL distributed_exec($$TRUNCATE metric_name;$$);
|
||||
|
||||
-- Left join
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id);
|
||||
|
||||
:PREFIX
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id);
|
||||
|
||||
-- Inner join
|
||||
SELECT * FROM metric JOIN metric_name USING (id);
|
||||
|
||||
:PREFIX
|
||||
SELECT * FROM metric JOIN metric_name USING (id);
|
||||
|
||||
-- Filter on the NULL column
|
||||
:PREFIX
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id) WHERE name IS NOT NULL;
|
||||
|
||||
:PREFIX
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id) WHERE name = 'cpu1';
|
||||
|
||||
-------
|
||||
-- Drop reftable on DNs and check proper error reporting
|
||||
-------
|
||||
\set ON_ERROR_STOP 0
|
||||
CALL distributed_exec($$DROP table metric_name;$$);
|
||||
|
||||
SELECT * FROM metric LEFT JOIN metric_name USING (id);
|
||||
|
Loading…
x
Reference in New Issue
Block a user