Push down partitionwise aggregates to servers

This change adds support for pushing down FULL partitionwise
aggregates to remote servers. Partial partitionwise aggregates cannot
yet be pushed down, since that requires a way to tell the remote
server to compute a specific partial aggregate. For example, an
aggregate grouped by the partitioning columns can be computed in full
on each server, whereas other groupings would only be able to produce
per-server partial aggregate states that must be combined on the
frontend.

NOTE: Aggregate push-down is a PG11-only feature, since it builds on
partitionwise aggregation, which is only available in PG11. A number
of query-running tests therefore run only on PG11, as they produce
different output on PG10.

To make push-down work on a per-server basis, hypertables are now
first expanded into per-chunk append plans. This lets the planner do
chunk exclusion and cost estimation for individual chunks. The append
path is then converted into a per-server plan by grouping chunks by
server, with reduced cost because there is only one startup cost per
server instead of one per chunk.
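
Conceptually, the per-server cost folding works as in the following
sketch, a simplified version of the cost accounting added in
server_chunk_assignment.c below (the helper name is illustrative):

    /* Fold a chunk path's cost into its server's assignment: pay the
     * startup cost once per server, then accumulate only each chunk's
     * run cost. */
    static void
    sca_fold_chunk_cost(ServerChunkAssignment *sca, const Path *chunk_path)
    {
        if (sca->startup_cost == 0.0)
        {
            /* First chunk assigned to this server */
            sca->startup_cost = chunk_path->startup_cost;
            sca->total_cost = chunk_path->startup_cost;
        }

        /* Each chunk contributes its run cost (total minus startup) */
        sca->total_cost += (chunk_path->total_cost - chunk_path->startup_cost);
    }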

Future optimizations might consider avoiding the original per-chunk
plan computation in order to increase planning speed.

To make use of existing PostgreSQL planning code for partitionwise
aggregates, we need to create range table entries for the server
relations even though these aren't "real" tables in the system. This
is because the planner code expects those entries to be present for
any "partitions" it is planning aggregates on (note that with
declarative partitioning, all partitions are real tables known to the
system). For this purpose, we create a range table entry for each
server, each pointing to the root hypertable relation. This is in a
sense "correct" since each per-server relation is an identical
(albeit partial) hypertable on the remote server. The upside of
pointing the server rel's range table entry to the root hypertable is
that the planner can make use of the hypertable's indexes for
planning purposes. This leads to more efficient remote queries when,
e.g., ordering is important (i.e., we get push-down of sorts for
free).
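
In code, the per-server range table entries are created roughly as
follows (condensed from the hypertable expansion change in this
commit; all names are taken from the diff below):

    foreach (l, priv->serverids)
    {
        /* Clone the hypertable's RTE so that the server rel resolves
         * to the root hypertable table, and thus to its indexes */
        RangeTblEntry *server_rte = copyObject(rte);

        server_rte->inh = false; /* do not expand the server rel again */
        parse->rtable = lappend(parse->rtable, server_rte);
        rti = list_length(parse->rtable);
        root->simple_rte_array[rti] = server_rte;
        root->simple_rel_array[rti] = NULL;
        priv->server_relids = bms_add_member(priv->server_relids, rti);
    }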
Erik Nordström 2019-04-12 16:23:35 +02:00 committed by Erik Nordström
parent e517d1d1a9
commit 2f43408eb5
22 changed files with 2414 additions and 393 deletions


@ -525,7 +525,6 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.set_rel_pathlist_dml = NULL,
.set_rel_pathlist_query = NULL,
.set_rel_pathlist = NULL,
.hypertable_should_be_expanded = NULL,
.gapfill_marker = error_no_default_fn_pg_community,
.gapfill_int16_time_bucket = error_no_default_fn_pg_community,
.gapfill_int32_time_bucket = error_no_default_fn_pg_community,


@ -57,8 +57,6 @@ typedef struct CrossModuleFunctions
void (*set_rel_pathlist_query)(PlannerInfo *, RelOptInfo *, Index, RangeTblEntry *,
Hypertable *);
void (*set_rel_pathlist)(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte);
bool (*hypertable_should_be_expanded)(RelOptInfo *rel, RangeTblEntry *rte, Hypertable *ht,
List *chunk_oids);
PGFunction gapfill_marker;
PGFunction gapfill_int16_time_bucket;
PGFunction gapfill_int32_time_bucket;


@ -2425,3 +2425,19 @@ ts_hypertable_get_servername_list(Hypertable *ht)
return list;
}
List *
ts_hypertable_get_serverids_list(Hypertable *ht)
{
List *serverids = NIL;
ListCell *lc;
foreach (lc, ht->servers)
{
HypertableServer *server = lfirst(lc);
serverids = lappend_oid(serverids, server->foreign_server_oid);
}
return serverids;
}


@ -134,6 +134,7 @@ extern TSDLLEXPORT void ts_hypertable_clone_constraints_to_compressed(Hypertable
List *constraint_list);
extern List *ts_hypertable_assign_chunk_servers(Hypertable *ht, Hypercube *cube);
extern TSDLLEXPORT List *ts_hypertable_get_servername_list(Hypertable *ht);
extern TSDLLEXPORT List *ts_hypertable_get_serverids_list(Hypertable *ht);
#define hypertable_scan(schema, table, tuple_found, data, lockmode, tuplock) \
ts_hypertable_scan_with_memory_context(schema, \


@ -277,7 +277,11 @@ set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *r
}
#if PG12_GE
/* copied from allpaths.c */
/*
* set_dummy_rel_pathlist, copied from allpaths.c.
*
* This was a public function prior to PG12.
*/
static void
set_dummy_rel_pathlist(RelOptInfo *rel)
{
@ -310,8 +314,20 @@ set_dummy_rel_pathlist(RelOptInfo *rel)
*/
set_cheapest(rel);
}
#endif /* PG12_GE */
/*
* Exported version of set_dummy_rel_pathlist.
*
 * Note that in PostgreSQL versions prior to PG12, set_dummy_rel_pathlist was public.
*/
void
ts_set_dummy_rel_pathlist(RelOptInfo *rel)
{
return set_dummy_rel_pathlist(rel);
}
/* copied from allpaths.c */
static void
set_rel_consider_parallel(PlannerInfo *root, RelOptInfo *rel, RangeTblEntry *rte)


@ -9,6 +9,7 @@
#include <postgres.h>
#include "compat.h"
#include "export.h"
#if PG12_GE
#include <nodes/pathnodes.h>
@ -19,5 +20,11 @@
extern void ts_set_rel_size(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte);
extern void ts_set_append_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti,
RangeTblEntry *rte);
#if PG12_GE
extern TSDLLEXPORT void ts_set_dummy_rel_pathlist(RelOptInfo *rel);
#else
/* Prior to PostgreSQL 12, set_dummy_rel_pathlist was public. */
#define ts_set_dummy_rel_pathlist(rel) set_dummy_rel_pathlist(rel)
#endif
#endif /* TIMESCALEDB_IMPORT_ALLPATHS_H */


@ -24,8 +24,8 @@
#include "export.h"
extern void ts_make_inh_translation_list(Relation oldrelation, Relation newrelation, Index newvarno,
List **translated_vars);
extern TSDLLEXPORT void ts_make_inh_translation_list(Relation oldrelation, Relation newrelation,
Index newvarno, List **translated_vars);
extern size_t ts_estimate_hashagg_tablesize(struct Path *path,
const struct AggClauseCosts *agg_costs,
double dNumGroups);


@ -1005,6 +1005,7 @@ ts_plan_expand_timebucket_annotate(PlannerInfo *root, RelOptInfo *rel)
void
ts_plan_expand_hypertable_chunks(Hypertable *ht, PlannerInfo *root, RelOptInfo *rel)
{
TimescaleDBPrivate *priv = rel->fdw_private;
RangeTblEntry *rte = rt_fetch(rel->relid, root->parse->rtable);
Oid parent_oid = rte->relid;
List *inh_oids;
@ -1055,19 +1056,15 @@ ts_plan_expand_hypertable_chunks(Hypertable *ht, PlannerInfo *root, RelOptInfo *
propagate_join_quals(root, rel, &ctx);
inh_oids = get_chunk_oids(&ctx, root, rel, ht);
if (ts_cm_functions->hypertable_should_be_expanded != NULL &&
!ts_cm_functions->hypertable_should_be_expanded(rel, rte, ht, inh_oids))
return;
oldrelation = table_open(parent_oid, NoLock);
/*
 * the simple_*_array structures have already been set; we need to add the
* children to them
* children to them. We include potential server rels we might need to
* create in case of a distributed hypertable.
*/
old_rel_array_len = root->simple_rel_array_size;
root->simple_rel_array_size += list_length(inh_oids);
root->simple_rel_array_size += (list_length(inh_oids) + list_length(ht->servers));
root->simple_rel_array =
repalloc(root->simple_rel_array, root->simple_rel_array_size * sizeof(RelOptInfo *));
/* postgres expects these arrays to be 0'ed until initialized */
@ -1086,7 +1083,7 @@ ts_plan_expand_hypertable_chunks(Hypertable *ht, PlannerInfo *root, RelOptInfo *
/* Adding partition info will make PostgreSQL consider the inheritance
* children as part of a partitioned relation. This will enable
* partitionwise aggregation. */
if (enable_partitionwise_aggregate || ht->fd.replication_factor > 0)
if (enable_partitionwise_aggregate || hypertable_is_distributed(ht))
build_hypertable_partition_info(ht, root, rel, list_length(inh_oids));
#endif
@ -1161,6 +1158,37 @@ ts_plan_expand_hypertable_chunks(Hypertable *ht, PlannerInfo *root, RelOptInfo *
table_close(oldrelation, NoLock);
priv->serverids = ts_hypertable_get_serverids_list(ht);
/* For distributed hypertables, we'd like to turn per-chunk plans into
* per-server plans. We proactively add RTEs for the per-server rels here
* because the PostgreSQL planning code that we call to replan the
* per-server queries assumes there are RTEs for each rel that is considered
* a "partition."
*
* Note that each per-server RTE reuses the relid (OID) of the parent
* hypertable relation. This makes sense since each remote server's
* hypertable is an identical (albeit partial) version of the frontend's
* hypertable. The upside of this is that the planner can plan remote
* queries to take into account the indexes on the hypertable to produce
* more efficient remote queries. In contrast, chunks are foreign tables so
* they do not have indexes.
*/
foreach (l, priv->serverids)
{
RangeTblEntry *server_rte = copyObject(rte);
server_rte->inh = false;
server_rte->ctename = NULL;
server_rte->requiredPerms = 0;
server_rte->securityQuals = NIL;
parse->rtable = lappend(parse->rtable, server_rte);
rti = list_length(parse->rtable);
root->simple_rte_array[rti] = server_rte;
root->simple_rel_array[rti] = NULL;
priv->server_relids = bms_add_member(priv->server_relids, rti);
}
root->append_rel_list = list_concat(root->append_rel_list, appinfos);
#if PG11_GE


@ -461,9 +461,11 @@ classify_relation(const PlannerInfo *root, const RelOptInfo *rel, Hypertable **p
extern void ts_sort_transform_optimization(PlannerInfo *root, RelOptInfo *rel);
static inline bool
should_chunk_append(PlannerInfo *root, RelOptInfo *rel, Path *path, bool ordered, int order_attno)
should_chunk_append(Hypertable *ht, PlannerInfo *root, RelOptInfo *rel, Path *path, bool ordered,
int order_attno)
{
if (root->parse->commandType != CMD_SELECT || !ts_guc_enable_chunk_append)
if (root->parse->commandType != CMD_SELECT || !ts_guc_enable_chunk_append ||
hypertable_is_distributed(ht))
return false;
switch (nodeTag(path))
@ -540,6 +542,21 @@ should_chunk_append(PlannerInfo *root, RelOptInfo *rel, Path *path, bool ordered
}
}
static inline bool
should_constraint_aware_append(Hypertable *ht, Path *path)
{
/* Constraint-aware append currently expects children that scan a real
* "relation" (e.g., not an "upper" relation). So, we do not run it on a
* distributed hypertable because the append children are typically
* per-server relations without a corresponding "real" table in the
* system. Further, per-server appends shouldn't need runtime pruning in any
* case. */
if (hypertable_is_distributed(ht))
return false;
return ts_constraint_aware_append_possible(path);
}
#if PG12_GE
static bool
@ -688,7 +705,7 @@ apply_optimizations(PlannerInfo *root, TsRelType reltype, RelOptInfo *rel, Range
{
case T_AppendPath:
case T_MergeAppendPath:
if (should_chunk_append(root, rel, *pathptr, ordered, order_attno))
if (should_chunk_append(ht, root, rel, *pathptr, ordered, order_attno))
*pathptr = ts_chunk_append_path_create(root,
rel,
ht,
@ -696,7 +713,7 @@ apply_optimizations(PlannerInfo *root, TsRelType reltype, RelOptInfo *rel, Range
false,
ordered,
nested_oids);
else if (ts_constraint_aware_append_possible(*pathptr))
else if (should_constraint_aware_append(ht, *pathptr))
*pathptr = ts_constraint_aware_append_path_create(root, ht, *pathptr);
break;
default:
@ -712,10 +729,10 @@ apply_optimizations(PlannerInfo *root, TsRelType reltype, RelOptInfo *rel, Range
{
case T_AppendPath:
case T_MergeAppendPath:
if (should_chunk_append(root, rel, *pathptr, false, 0))
if (should_chunk_append(ht, root, rel, *pathptr, false, 0))
*pathptr =
ts_chunk_append_path_create(root, rel, ht, *pathptr, true, false, NIL);
else if (ts_constraint_aware_append_possible(*pathptr))
else if (should_constraint_aware_append(ht, *pathptr))
*pathptr = ts_constraint_aware_append_path_create(root, ht, *pathptr);
break;
default:


@ -27,6 +27,8 @@ typedef struct TimescaleDBPrivate
List *nested_oids;
bool compressed;
List *chunk_oids;
List *serverids;
Relids server_relids;
TsFdwRelationInfo *fdw_relation_info;
} TimescaleDBPrivate;


@ -878,7 +878,6 @@ build_tlist_to_deparse(RelOptInfo *foreignrel)
*/
if (IS_UPPER_REL(foreignrel))
return fpinfo->grouped_tlist;
/*
* We require columns specified in foreignrel->reltarget->exprs and those
* required for evaluating the local conditions.
@ -1068,7 +1067,8 @@ deparseFromExpr(List *quals, deparse_expr_cxt *context)
{
StringInfo buf = context->buf;
RelOptInfo *scanrel = context->scanrel;
bool use_alias = bms_num_members(scanrel->relids) > 1;
/* Use alias if scan is on multiple rels, unless it is a per-server scan */
bool use_alias = bms_num_members(scanrel->relids) > 1 && context->sca == NULL;
/* For upper relations, scanrel must be either a joinrel or a baserel */
Assert(!IS_UPPER_REL(context->foreignrel) || IS_JOIN_REL(scanrel) || IS_SIMPLE_REL(scanrel));
@ -2130,8 +2130,9 @@ deparseVar(Var *node, deparse_expr_cxt *context)
int relno;
int colno;
/* Qualify columns when multiple relations are involved. */
bool qualify_col = (bms_num_members(relids) > 1);
/* Qualify columns when multiple relations are involved, unless it is a
* per-server scan. */
bool qualify_col = (bms_num_members(relids) > 1 && context->sca == NULL);
/*
* If the Var belongs to the foreign relation that is deparsed as a


@ -7,6 +7,8 @@
#include <foreign/foreign.h>
#include <utils/hsearch.h>
#include <utils/fmgrprotos.h>
#include <parser/parsetree.h>
#include <nodes/bitmapset.h>
#include "server_chunk_assignment.h"
#include "chunk.h"
@ -28,53 +30,121 @@ get_remote_chunk_id_from_relid(Oid server_oid, Oid chunk_relid)
return cs->fd.server_chunk_id;
}
static List *
server_chunk_assignment_list_add_chunk(List *server_chunk_assignment_list, Oid server_oid,
Oid chunk_oid)
/*
* Find an existing server chunk assignment or initialize a new one.
*/
static ServerChunkAssignment *
get_or_create_sca(ServerChunkAssignments *scas, Oid serverid, RelOptInfo *rel)
{
ListCell *lc;
ServerChunkAssignment *sca_match = NULL;
ServerChunkAssignment *sca;
bool found;
foreach (lc, server_chunk_assignment_list)
Assert(rel == NULL || rel->serverid == serverid);
sca = hash_search(scas->assignments, &serverid, HASH_ENTER, &found);
if (!found)
{
ServerChunkAssignment *sca = lfirst(lc);
/* New entry */
memset(sca, 0, sizeof(*sca));
sca->server_oid = serverid;
}
if (sca->server_oid == server_oid)
return sca;
}
/*
* Assign the given chunk relation to a server.
*
* The chunk is assigned according to the strategy set in the
* ServerChunkAssignments state.
*/
ServerChunkAssignment *
server_chunk_assignment_assign_chunk(ServerChunkAssignments *scas, RelOptInfo *chunkrel)
{
ServerChunkAssignment *sca = get_or_create_sca(scas, chunkrel->serverid, NULL);
if (!bms_is_member(chunkrel->relid, sca->chunk_relids))
{
RangeTblEntry *rte = planner_rt_fetch(chunkrel->relid, scas->root);
Path *path = chunkrel->cheapest_total_path;
MemoryContext old = MemoryContextSwitchTo(scas->mctx);
sca->chunk_relids = bms_add_member(sca->chunk_relids, chunkrel->relid);
sca->chunk_oids = lappend_oid(sca->chunk_oids, rte->relid);
sca->remote_chunk_ids =
lappend_int(sca->remote_chunk_ids,
get_remote_chunk_id_from_relid(chunkrel->serverid, rte->relid));
sca->rows += chunkrel->rows;
if (path != NULL)
{
sca_match = sca;
break;
if (sca->startup_cost == 0.0)
{
sca->startup_cost = path->startup_cost;
sca->total_cost = path->startup_cost;
}
sca->total_cost += (path->total_cost - path->startup_cost);
}
MemoryContextSwitchTo(old);
}
if (sca_match == NULL)
{
sca_match = palloc0(sizeof(*sca_match));
sca_match->server_oid = server_oid;
server_chunk_assignment_list = lappend(server_chunk_assignment_list, sca_match);
}
sca_match->chunk_oids = lappend_oid(sca_match->chunk_oids, chunk_oid);
sca_match->remote_chunk_ids =
lappend_int(sca_match->remote_chunk_ids,
get_remote_chunk_id_from_relid(server_oid, chunk_oid));
return server_chunk_assignment_list;
return sca;
}
List *
server_chunk_assignment_by_using_attached_server(List *chunk_oids)
/*
* Initialize a new chunk assignment state with a specific assignment strategy.
*/
void
server_chunk_assignments_init(ServerChunkAssignments *scas, ServerChunkAssignmentStrategy strategy,
PlannerInfo *root, unsigned int nrels_hint)
{
ListCell *lc;
List *server_chunk_assignment_list = NIL;
HASHCTL hctl = {
.keysize = sizeof(Oid),
.entrysize = sizeof(ServerChunkAssignment),
.hcxt = CurrentMemoryContext,
};
foreach (lc, chunk_oids)
{
Oid chunk_oid = lfirst_oid(lc);
ForeignTable *ftable = GetForeignTable(chunk_oid);
server_chunk_assignment_list =
server_chunk_assignment_list_add_chunk(server_chunk_assignment_list,
ftable->serverid,
chunk_oid);
}
return server_chunk_assignment_list;
scas->strategy = strategy;
scas->root = root;
scas->mctx = hctl.hcxt;
scas->assignments = hash_create("server chunk assignments",
nrels_hint,
&hctl,
HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
}
/*
* Assign chunks to servers.
*
 * Each chunk in the chunkrels array is assigned a server using the strategy
* set in the ServerChunkAssignments state.
*/
ServerChunkAssignments *
server_chunk_assignment_assign_chunks(ServerChunkAssignments *scas, RelOptInfo **chunkrels,
unsigned int nrels)
{
unsigned int i;
Assert(scas->assignments != NULL && scas->root != NULL);
for (i = 0; i < nrels; i++)
{
RelOptInfo *chunkrel = chunkrels[i];
Assert(IS_SIMPLE_REL(chunkrel) && chunkrel->fdw_private != NULL);
server_chunk_assignment_assign_chunk(scas, chunkrel);
}
return scas;
}
/*
* Get the server assignment for the given relation (chunk).
*/
ServerChunkAssignment *
server_chunk_assignment_get_or_create(ServerChunkAssignments *scas, RelOptInfo *rel)
{
return get_or_create_sca(scas, rel->serverid, rel);
}


@ -5,9 +5,18 @@
*/
#ifndef TIMESCALEDB_TSL_SERVER_CHUNK_ASSIGNMENT
#define TIMESCALEDB_TSL_SERVER_CHUNK_ASSIGNMENT
#include <postgres.h>
#include <nodes/pg_list.h>
#include "compat.h"
#if PG12_GE
#include <nodes/pathnodes.h>
#else
#include <nodes/relation.h>
#endif
/*
* server-chunk assignments map chunks to the servers that will be responsible
* for handling those chunks. For replicated chunks several such strategies
@ -19,14 +28,43 @@
typedef struct ServerChunkAssignment
{
Oid server_oid;
double rows;
Cost startup_cost;
Cost total_cost;
Relids chunk_relids;
List *chunk_oids;
List *remote_chunk_ids;
} ServerChunkAssignment;
/*
* Use the attached foreign server on a chunk to decide which server is used for a chunk.
* Returns a list of ServerChunkAssignments.
* Only "attached server" strategy is supported at this time. This strategy
* picks the server that is associated with a chunk's foreign table
*/
extern List *server_chunk_assignment_by_using_attached_server(List *chunk_oids);
typedef enum ServerChunkAssignmentStrategy
{
SCA_STRATEGY_ATTACHED_SERVER,
} ServerChunkAssignmentStrategy;
typedef struct ServerChunkAssignments
{
ServerChunkAssignmentStrategy strategy;
PlannerInfo *root;
HTAB *assignments;
MemoryContext mctx;
} ServerChunkAssignments;
extern ServerChunkAssignment *server_chunk_assignment_assign_chunk(ServerChunkAssignments *scas,
RelOptInfo *chunkrel);
extern ServerChunkAssignments *server_chunk_assignment_assign_chunks(ServerChunkAssignments *scas,
RelOptInfo **chunkrels,
unsigned int nrels);
extern ServerChunkAssignment *server_chunk_assignment_get_or_create(ServerChunkAssignments *scas,
RelOptInfo *rel);
extern void server_chunk_assignments_init(ServerChunkAssignments *scas,
ServerChunkAssignmentStrategy strategy, PlannerInfo *root,
unsigned int nrels_hint);
#endif /* TIMESCALEDB_TSL_SERVER_CHUNK_ASSIGNMENT */
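
A typical call sequence for this API, condensed from
add_per_server_paths() later in this commit (variable names as in that
function; path creation and dummy-rel handling omitted):

    ServerChunkAssignments scas;
    ServerChunkAssignment *sca;

    /* Size the hash table by the expected number of server rels */
    server_chunk_assignments_init(&scas, SCA_STRATEGY_ATTACHED_SERVER, root, nserver_rels);

    /* Assign each chunk rel to the server its foreign table is attached to */
    server_chunk_assignment_assign_chunks(&scas, chunk_rels, nchunk_rels);

    /* Look up the assignment for a given server rel */
    sca = server_chunk_assignment_get_or_create(&scas, server_rel);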


@ -34,6 +34,7 @@
#include <utils/selfuncs.h>
#include <commands/defrem.h>
#include <commands/explain.h>
#include <rewrite/rewriteManip.h>
#include <miscadmin.h>
#include <libpq-fe.h>
@ -56,6 +57,7 @@
#include <remote/dist_txn.h>
#include <remote/async.h>
#include <remote/stmt_params.h>
#include <import/allpaths.h>
#include <compat.h>
#include <planner.h>
@ -637,11 +639,28 @@ fdw_relation_info_get(RelOptInfo *baserel)
return (TsFdwRelationInfo *) rel_private->fdw_relation_info;
}
static void
static TsFdwRelationInfo *
fdw_relation_info_alloc(RelOptInfo *baserel)
{
TimescaleDBPrivate *rel_private;
TsFdwRelationInfo *fpinfo;
if (NULL == baserel->fdw_private)
baserel->fdw_private = palloc0(sizeof(*rel_private));
rel_private = baserel->fdw_private;
fpinfo = (TsFdwRelationInfo *) palloc0(sizeof(*fpinfo));
rel_private->fdw_relation_info = (void *) fpinfo;
fpinfo->type = TS_FDW_RELATION_INFO_FOREIGN_TABLE;
return fpinfo;
}
static TsFdwRelationInfo *
fdw_relation_info_create(PlannerInfo *root, RelOptInfo *baserel, Oid server_oid, Oid local_table_id,
TsFdwRelationInfoType type)
{
TimescaleDBPrivate *rel_private = baserel->fdw_private;
TsFdwRelationInfo *fpinfo;
ListCell *lc;
RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
@ -653,20 +672,15 @@ fdw_relation_info_create(PlannerInfo *root, RelOptInfo *baserel, Oid server_oid,
* We use TsFdwRelationInfo to pass various information to subsequent
* functions.
*/
if (baserel->fdw_private == NULL)
baserel->fdw_private = palloc0(sizeof(*rel_private));
rel_private = baserel->fdw_private;
fpinfo = (TsFdwRelationInfo *) palloc0(sizeof(*fpinfo));
rel_private->fdw_relation_info = (void *) fpinfo;
fpinfo = fdw_relation_info_alloc(baserel);
fpinfo->type = type;
if (type == TS_FDW_RELATION_INFO_HYPERTABLE)
{
/* nothing more to do for hypertables */
Assert(!OidIsValid(server_oid));
return;
return fpinfo;
}
/* Base foreign tables need to be pushed down always. */
fpinfo->pushdown_safe = true;
@ -828,7 +842,8 @@ fdw_relation_info_create(PlannerInfo *root, RelOptInfo *baserel, Oid server_oid,
fpinfo->lower_subquery_rels = NULL;
/* Set the relation index. */
fpinfo->relation_index = baserel->relid;
return;
return fpinfo;
}
/* This creates the fdw_relation_info object for hypertables and foreign table type objects. */
@ -1124,84 +1139,287 @@ add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, Path *epq_pa
}
}
/*
* This function adds hypertable-server paths for hypertables when the per-server queries
 * optimization is enabled. In this case, the base hypertable will not have its chunks expanded and
* so will be a plain relation. Here, we determine server-chunk assignment and create an append
* relation with children corresponding to each hypertable-server. In the future, different
* assignments can create their own append paths and have the cost optimizer pick the best one.
*/
static void
add_remote_hypertable_paths(PlannerInfo *root, RelOptInfo *hypertable_rel, Oid hypertable_relid)
static AppendRelInfo *
create_append_rel_info(PlannerInfo *root, Index childrelid, Index parentrelid)
{
RangeTblEntry *parent_rte = planner_rt_fetch(parentrelid, root);
Relation relation = table_open(parent_rte->relid, NoLock);
AppendRelInfo *appinfo;
appinfo = makeNode(AppendRelInfo);
appinfo->parent_relid = parentrelid;
appinfo->child_relid = childrelid;
appinfo->parent_reltype = relation->rd_rel->reltype;
appinfo->child_reltype = relation->rd_rel->reltype;
ts_make_inh_translation_list(relation, relation, childrelid, &appinfo->translated_vars);
appinfo->parent_reloid = parent_rte->relid;
table_close(relation, NoLock);
return appinfo;
}
static void
add_append_rel_info_to_plannerinfo(PlannerInfo *root, Index childrelid, Index parentrelid)
{
AppendRelInfo *appinfo = create_append_rel_info(root, childrelid, parentrelid);
root->append_rel_list = lappend(root->append_rel_list, appinfo);
#if PG11_GE
root->append_rel_array[childrelid] = appinfo;
#endif
}
/*
* Build a new RelOptInfo representing a server.
*
* Note that the relid index should point to the corresponding range table
* entry (RTE) we created for the server rel when expanding the
* hypertable. Each such RTE's relid (OID) refers to the hypertable's root
* table. This has the upside that the planner can use the hypertable's
* indexes to plan remote queries more efficiently. In contrast, chunks are
* foreign tables and they cannot have indexes.
*/
static RelOptInfo *
build_server_rel(PlannerInfo *root, Index relid, Oid serverid, RelOptInfo *parent)
{
RelOptInfo *rel = build_simple_rel(root, relid, parent);
/* Use relevant exprs and restrictinfos from the parent rel */
rel->reltarget->exprs = copyObject(parent->reltarget->exprs);
rel->baserestrictinfo = parent->baserestrictinfo;
rel->baserestrictcost = parent->baserestrictcost;
rel->baserestrict_min_security = parent->baserestrict_min_security;
rel->lateral_vars = parent->lateral_vars;
rel->lateral_referencers = parent->lateral_referencers;
rel->lateral_relids = parent->lateral_relids;
rel->fdwroutine = GetFdwRoutineByServerId(serverid);
rel->serverid = serverid;
/* Add the hypertable relid to the relids set. This is mostly to beautify
* the explain output by referencing the same table when projecting */
rel->relids = bms_add_member(rel->relids, parent->relid);
Assert(rel->fdwroutine == &timescaledb_fdw_routine);
return rel;
}
/*
* Build new RelOptInfos for each server.
*
* Each server rel will point to the root hypertable table, which is
* conceptually correct since we query the identical (partial) hypertables on
* the servers.
*/
static RelOptInfo **
build_server_part_rels(PlannerInfo *root, RelOptInfo *hypertable_rel, int *nparts)
{
TimescaleDBPrivate *priv = hypertable_rel->fdw_private;
RangeTblEntry *hypertable_rte = planner_rt_fetch(hypertable_rel->relid, root);
/* Update the partitioning to reflect the new per-server plan */
RelOptInfo **part_rels = palloc(sizeof(RelOptInfo *) * list_length(priv->serverids));
ListCell *lc;
List *subpaths = NIL;
AppendPath *appendpath;
List *server_chunk_assignment_list;
TimescaleDBPrivate *rel_info = hypertable_rel->fdw_private;
List *chunk_oids = rel_info->chunk_oids;
int n = 0;
int i;
/* nothing to do for empty hypertables */
if (chunk_oids == NIL)
return;
Assert(list_length(priv->serverids) == bms_num_members(priv->server_relids));
i = -1;
/* remove the parent if it is in there */
chunk_oids = list_delete_oid(chunk_oids, hypertable_relid);
server_chunk_assignment_list = server_chunk_assignment_by_using_attached_server(chunk_oids);
/* Each per-server rel actually references the real "root" hypertable rel
* in their relids set. This is to, among other things, have per-server
 * rels reference the root hypertable in, e.g., EXPLAIN output. However,
* find_appinfos_by_relids() (prepunion.c) assumes that all relids in that
* set are also in the append_rel_array, so we need to also add the root
* hypertable rel to that array. */
add_append_rel_info_to_plannerinfo(root, hypertable_rel->relid, hypertable_rel->relid);
/* remove all existing paths, they are not valid because they contain the un-expanded parent */
hypertable_rel->pathlist = NIL;
foreach (lc, server_chunk_assignment_list)
foreach (lc, priv->serverids)
{
ForeignPath *subpath;
RelOptInfo *hypertable_server_rel;
TsFdwRelationInfo *hypertable_server_fpinfo;
ServerChunkAssignment *sca = lfirst(lc);
Oid serverid = lfirst_oid(lc);
RelOptInfo *server_rel;
Assert(list_length(sca->chunk_oids) == list_length(sca->remote_chunk_ids));
i = bms_next_member(priv->server_relids, i);
hypertable_server_rel = palloc(sizeof(RelOptInfo));
memcpy(hypertable_server_rel, hypertable_rel, sizeof(*hypertable_server_rel));
hypertable_server_rel->serverid = sca->server_oid;
Assert(i > 0);
/* set the fdwroutine, which should always be the timescale_fdw routine because
* hypertables should always be associated with the timescale_fdw */
hypertable_server_rel->fdwroutine = GetFdwRoutineByServerId(sca->server_oid);
Assert(hypertable_server_rel->fdwroutine == &timescaledb_fdw_routine);
/* The planner expects an AppendRelInfo for any part_rels. Needs to be
* added prior to creating the rel because build_simple_rel will
* invoke our planner hooks that classify relations using this
* information. */
add_append_rel_info_to_plannerinfo(root, i, hypertable_rel->relid);
server_rel = build_server_rel(root, i, serverid, hypertable_rel);
hypertable_server_rel->fdw_private = NULL;
fdw_relation_info_create(root,
hypertable_server_rel,
sca->server_oid,
hypertable_relid,
server_rel,
serverid,
hypertable_rte->relid,
TS_FDW_RELATION_INFO_HYPERTABLE_SERVER);
hypertable_server_fpinfo = fdw_relation_info_get(hypertable_server_rel);
hypertable_server_fpinfo->sca = sca;
subpath = create_foreignscan_path(root,
hypertable_server_rel,
NULL, /* default pathtarget */
hypertable_server_fpinfo->rows,
hypertable_server_fpinfo->startup_cost,
hypertable_server_fpinfo->total_cost,
NIL, /* no pathkeys */
NULL, /* no outer rel either */
NULL, /* no extra plan */
NIL); /* no fdw_private list */
subpaths = lappend(subpaths, subpath);
part_rels[n++] = server_rel;
}
appendpath = create_append_path_compat(root,
hypertable_rel,
subpaths,
NIL,
NULL,
NULL,
0,
false,
NIL,
-1);
add_path(hypertable_rel, (Path *) appendpath);
if (nparts != NULL)
*nparts = n;
return part_rels;
}
/*
* Get all chunk rels in an array.
*
* This provides backwards compatibility for PG10, which does not maintain a
* part_rels array for partitioned rels.
*/
static RelOptInfo **
get_chunk_part_rels(RelOptInfo *hypertable_rel, int *nrels)
{
RelOptInfo **part_rels = NULL;
int nparts = 0;
#if PG10
ListCell *lc;
List *chunk_paths = NIL;
foreach (lc, hypertable_rel->pathlist)
{
Path *path = lfirst(lc);
switch (path->type)
{
case T_AppendPath:
chunk_paths = ((AppendPath *) path)->subpaths;
break;
case T_MergeAppendPath:
chunk_paths = ((MergeAppendPath *) path)->subpaths;
break;
default:
break;
}
if (chunk_paths != NIL)
{
ListCell *lc_path;
part_rels = palloc(sizeof(RelOptInfo *) * list_length(chunk_paths));
foreach (lc_path, chunk_paths)
{
Path *path = lfirst(lc_path);
part_rels[nparts++] = path->parent;
}
break;
}
}
#else
part_rels = hypertable_rel->part_rels;
nparts = hypertable_rel->nparts;
#endif
if (nrels != NULL)
*nrels = nparts;
return part_rels;
}
#if PG10
/*
* Naive implementation of PG11-only add_paths_to_append_rel().
*
* This version only produces a basic append plan, while the PG11 equivalent
* produces append, merge append, partial and parallel append plans.
*/
static void
add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *parent, List *childrels)
{
AppendPath *appendpath;
List *subpaths = NIL;
ListCell *lc;
foreach (lc, childrels)
{
RelOptInfo *rel = lfirst(lc);
subpaths = lappend(subpaths, rel->cheapest_total_path);
}
appendpath = create_append_path_compat(root, parent, subpaths, NIL, NULL, 0, false, NIL, -1);
add_path(parent, (Path *) appendpath);
}
#endif /* PG10 */
/*
* Turn chunk append paths into server append paths.
*
* By default, a hypertable produces append plans where each child is a chunk
* to be scanned. This function computes alternative append plans where each
* child corresponds to a server.
*
* In the future, additional assignment algorithms can create their own
* append paths and have the cost optimizer pick the best one.
*/
static void
add_per_server_paths(PlannerInfo *root, RelOptInfo *hypertable_rel, Oid hypertable_relid)
{
RelOptInfo **server_rels;
RelOptInfo **chunk_rels;
int nserver_rels;
int nchunk_rels;
List *server_rels_list = NIL;
ServerChunkAssignments scas;
int i;
/* Get all chunk rels */
chunk_rels = get_chunk_part_rels(hypertable_rel, &nchunk_rels);
if (nchunk_rels <= 0)
return;
/* Create the RelOptInfo for each server */
server_rels = build_server_part_rels(root, hypertable_rel, &nserver_rels);
Assert(nserver_rels > 0);
server_chunk_assignments_init(&scas, SCA_STRATEGY_ATTACHED_SERVER, root, nserver_rels);
/* Assign chunks to servers */
server_chunk_assignment_assign_chunks(&scas, chunk_rels, nchunk_rels);
/* Create paths for each server rel and set server chunk assignments */
for (i = 0; i < nserver_rels; i++)
{
RelOptInfo *server_rel = server_rels[i];
RangeTblEntry *rte = planner_rt_fetch(server_rel->relid, root);
TsFdwRelationInfo *fpinfo = fdw_relation_info_get(server_rel);
ServerChunkAssignment *sca = server_chunk_assignment_get_or_create(&scas, server_rel);
fpinfo->sca = sca;
if (!bms_is_empty(sca->chunk_relids))
{
server_rel->rows = sca->rows;
fpinfo->rows = sca->rows;
fpinfo->startup_cost = sca->startup_cost;
fpinfo->total_cost = sca->total_cost;
server_rel->fdwroutine->GetForeignPaths(root, server_rel, rte->relid);
server_rels_list = lappend(server_rels_list, server_rel);
}
else
ts_set_dummy_rel_pathlist(server_rel);
set_cheapest(server_rel);
}
hypertable_rel->pathlist = NIL;
#if !PG10
/* Must keep partitioning info consistent with the append paths we create */
hypertable_rel->part_rels = server_rels;
hypertable_rel->nparts = nserver_rels;
#endif
Assert(list_length(server_rels_list) > 0);
add_paths_to_append_rel(root, hypertable_rel, server_rels_list);
}
static void
@ -1211,7 +1429,11 @@ get_foreign_paths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid)
ForeignPath *path;
if (fpinfo->type == TS_FDW_RELATION_INFO_HYPERTABLE)
return add_remote_hypertable_paths(root, baserel, foreigntableid);
{
if (ts_guc_enable_per_server_queries)
add_per_server_paths(root, baserel, foreigntableid);
return;
}
if (baserel->reloptkind == RELOPT_JOINREL)
ereport(ERROR,
@ -1275,9 +1497,9 @@ get_foreign_plan(PlannerInfo *root, RelOptInfo *foreignrel, Oid foreigntableid,
* to check for remote-safety.
*
* Note: the join clauses we see here should be the exact same ones
* previously examined by postgresGetForeignPaths. Possibly it'd be
* worth passing forward the classification work done then, rather
* than repeating it here.
* previously examined by GetForeignPaths. Possibly it'd be worth
* passing forward the classification work done then, rather than
* repeating it here.
*
* This code must match "extract_actual_clauses(scan_clauses, false)"
* except for the additional decision about remote versus local
@ -1319,6 +1541,7 @@ get_foreign_plan(PlannerInfo *root, RelOptInfo *foreignrel, Oid foreigntableid,
* there can be several associated with the chunk).
*/
scan_relid = 0;
/* since scan_relid is 0, we have to build the tlist explicitly */
fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
}
@ -1353,8 +1576,8 @@ get_foreign_plan(PlannerInfo *root, RelOptInfo *foreignrel, Oid foreigntableid,
/*
* We leave fdw_recheck_quals empty in this case, since we never need
* to apply EPQ recheck clauses. In the case of a joinrel, EPQ
* recheck is handled elsewhere --- see postgresGetForeignJoinPaths().
* If we're planning an upperrel (ie, remote grouping or aggregation)
* recheck is handled elsewhere --- see GetForeignJoinPaths(). If
* we're planning an upperrel (ie, remote grouping or aggregation)
* then there's no EPQ to do because SELECT FOR UPDATE wouldn't be
* allowed, and indeed we *can't* put the remote clauses into
* fdw_recheck_quals because the unaggregated Vars won't be available
@ -1381,7 +1604,7 @@ get_foreign_plan(PlannerInfo *root, RelOptInfo *foreignrel, Oid foreigntableid,
&params_list,
fpinfo->sca);
/* Remember remote_exprs for possible use by postgresPlanDirectModify */
/* Remember remote_exprs for possible use by PlanDirectModify */
fpinfo->final_remote_exprs = remote_exprs;
/*
@ -1548,8 +1771,8 @@ prepare_query_params(PlanState *node, List *fdw_exprs, int num_params, FmgrInfo
* practice, we expect that all these expressions will be just Params, so
* we could possibly do something more efficient than using the full
* expression-eval machinery for this. But probably there would be little
* benefit, and it'd require postgres_fdw to know more than is desirable
* about Param evaluation.)
* benefit, and it'd require the foreign data wrapper to know more than is
* desirable about Param evaluation.)
*/
*param_exprs = ExecInitExprList(fdw_exprs, node);
@ -2141,7 +2364,7 @@ plan_foreign_modify(PlannerInfo *root, ModifyTable *plan, Index result_relation,
* Core code already has some lock on each rel being planned, so we can
* use NoLock here.
*/
rel = heap_open(rte->relid, NoLock);
rel = table_open(rte->relid, NoLock);
/*
* Construct the SQL command string
@ -2706,6 +2929,7 @@ analyze_foreign_table(Relation relation, AcquireSampleRowsFunc *func, BlockNumbe
return false;
}
#if !PG10
/*
* Merge FDW options from input relations into a new set of options for a join
* or an upper rel.
@ -2716,16 +2940,18 @@ analyze_foreign_table(Relation relation, AcquireSampleRowsFunc *func, BlockNumbe
* expected to NULL.
*/
static void
merge_fdw_options(TsFdwRelationInfo *fpinfo,
const TsFdwRelationInfo *fpinfo_o,
merge_fdw_options(TsFdwRelationInfo *fpinfo, const TsFdwRelationInfo *fpinfo_o,
const TsFdwRelationInfo *fpinfo_i)
{
/* We must always have fpinfo_o. */
Assert(fpinfo_o);
/* fpinfo_i may be NULL, but if present the servers must both match. */
Assert(!fpinfo_i ||
fpinfo_i->server->serverid == fpinfo_o->server->serverid);
Assert(!fpinfo_i || fpinfo_i->server->serverid == fpinfo_o->server->serverid);
/* Currently, we don't support JOINs, so we assert that fpinfo_i is NULL
 * here in the meantime. */
Assert(fpinfo_i == NULL);
/*
* Copy the server specific FDW options. (For a join, both relations come
@ -2737,28 +2963,6 @@ merge_fdw_options(TsFdwRelationInfo *fpinfo,
fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
fpinfo->fetch_size = fpinfo_o->fetch_size;
/* Merge the table level options from either side of the join. */
if (fpinfo_i)
{
/*
* We'll prefer to use remote estimates for this join if any table
* from either side of the join is using remote estimates. This is
* most likely going to be preferred since they're already willing to
* pay the price of a round trip to get the remote EXPLAIN. In any
* case it's not entirely clear how we might otherwise handle this
* best.
*/
fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
fpinfo_i->use_remote_estimate;
/*
* Set fetch size to maximum of the joining sides, since we are
* expecting the rows returned by the join to be proportional to the
* relation sizes.
*/
fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
}
}
/*
@ -2767,24 +2971,23 @@ merge_fdw_options(TsFdwRelationInfo *fpinfo,
* this function to TsFdwRelationInfo of the input relation.
*/
static bool
foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
Node *havingQual)
foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel, Node *havingQual)
{
Query *query = root->parse;
TsFdwRelationInfo *fpinfo = (TsFdwRelationInfo *) grouped_rel->fdw_private;
Query *query = root->parse;
TsFdwRelationInfo *fpinfo = fdw_relation_info_get(grouped_rel);
PathTarget *grouping_target = grouped_rel->reltarget;
TsFdwRelationInfo *ofpinfo;
List *aggvars;
ListCell *lc;
int i;
List *tlist = NIL;
List *aggvars;
ListCell *lc;
int i;
List *tlist = NIL;
/* We currently don't support pushing Grouping Sets. */
if (query->groupingSets)
return false;
/* Cannot have grouping sets since that wouldn't be a distinct coverage of
* all partition keys */
Assert(query->groupingSets == NIL);
/* Get the fpinfo of the underlying scan relation. */
ofpinfo = (TsFdwRelationInfo *) fpinfo->outerrel->fdw_private;
ofpinfo = (TsFdwRelationInfo *) fdw_relation_info_get(fpinfo->outerrel);
/*
* If underlying scan relation has any local conditions, those conditions
@ -2802,11 +3005,11 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
* expressions into target list which will be passed to foreign server.
*/
i = 0;
foreach(lc, grouping_target->exprs)
foreach (lc, grouping_target->exprs)
{
Expr *expr = (Expr *) lfirst(lc);
Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
ListCell *l;
Expr *expr = (Expr *) lfirst(lc);
Index sgref = get_pathtarget_sortgroupref(grouping_target, i);
ListCell *l;
/* Check whether this expression is part of GROUP BY clause */
if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
@ -2845,8 +3048,7 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
else
{
/* Not pushable as a whole; extract its Vars and aggregates */
aggvars = pull_var_clause((Node *) expr,
PVC_INCLUDE_AGGREGATES);
aggvars = pull_var_clause((Node *) expr, PVC_INCLUDE_AGGREGATES);
/*
* If any aggregate expression is not shippable, then we
@ -2865,9 +3067,9 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
* GROUP BY column would cause the foreign server to complain
* that the shipped query is invalid.
*/
foreach(l, aggvars)
foreach (l, aggvars)
{
Expr *expr = (Expr *) lfirst(l);
Expr *expr = (Expr *) lfirst(l);
if (IsA(expr, Aggref))
tlist = add_to_flat_tlist(tlist, list_make1(expr));
@ -2884,11 +3086,11 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
*/
if (havingQual)
{
ListCell *lc;
ListCell *lc;
foreach(lc, (List *) havingQual)
foreach (lc, (List *) havingQual)
{
Expr *expr = (Expr *) lfirst(lc);
Expr *expr = (Expr *) lfirst(lc);
RestrictInfo *rinfo;
/*
@ -2917,21 +3119,20 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
*/
if (fpinfo->local_conds)
{
List *aggvars = NIL;
ListCell *lc;
List *aggvars = NIL;
ListCell *lc;
foreach(lc, fpinfo->local_conds)
foreach (lc, fpinfo->local_conds)
{
RestrictInfo *rinfo = lfirst_node(RestrictInfo, lc);
aggvars = list_concat(aggvars,
pull_var_clause((Node *) rinfo->clause,
PVC_INCLUDE_AGGREGATES));
pull_var_clause((Node *) rinfo->clause, PVC_INCLUDE_AGGREGATES));
}
foreach(lc, aggvars)
foreach (lc, aggvars)
{
Expr *expr = (Expr *) lfirst(lc);
Expr *expr = (Expr *) lfirst(lc);
/*
* If aggregates within local conditions are not safe to push
@ -2968,12 +3169,33 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
* output of corresponding ForeignScan.
*/
fpinfo->relation_name = makeStringInfo();
appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)",
ofpinfo->relation_name->data);
appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)", ofpinfo->relation_name->data);
return true;
}
#if PG12_LT
#define create_foreign_upper_path(root, \
rel, \
target, \
rows, \
startup_cost, \
total_cost, \
pathkeys, \
fdw_outerpath, \
fdw_private) \
create_foreignscan_path(root, \
rel, \
(rel)->reltarget, \
rows, \
startup_cost, \
total_cost, \
pathkeys, \
(rel)->lateral_relids, \
fdw_outerpath, \
fdw_private)
#endif
/*
* add_foreign_grouping_paths
* Add foreign path for grouping and/or aggregation.
@ -2982,22 +3204,20 @@ foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel,
* given grouped_rel.
*/
static void
add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
RelOptInfo *grouped_rel,
add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, RelOptInfo *grouped_rel,
GroupPathExtraData *extra)
{
Query *parse = root->parse;
TsFdwRelationInfo *ifpinfo = input_rel->fdw_private;
TsFdwRelationInfo *fpinfo = grouped_rel->fdw_private;
Query *parse = root->parse;
TsFdwRelationInfo *ifpinfo = fdw_relation_info_get(input_rel);
TsFdwRelationInfo *fpinfo = fdw_relation_info_get(grouped_rel);
ForeignPath *grouppath;
double rows;
int width;
Cost startup_cost;
Cost total_cost;
double rows;
int width;
Cost startup_cost;
Cost total_cost;
/* Nothing to be done, if there is no grouping or aggregation required. */
if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs &&
!root->hasHavingQual)
if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs && !root->hasHavingQual)
return;
Assert(extra->patype == PARTITIONWISE_AGGREGATE_NONE ||
@ -3013,6 +3233,7 @@ add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
fpinfo->table = ifpinfo->table;
fpinfo->server = ifpinfo->server;
fpinfo->user = ifpinfo->user;
fpinfo->sca = ifpinfo->sca;
merge_fdw_options(fpinfo, ifpinfo, NULL);
/*
@ -3025,8 +3246,7 @@ add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
return;
/* Estimate the cost of push down */
estimate_path_cost_size(root, grouped_rel, NIL, NIL, &rows,
&width, &startup_cost, &total_cost);
estimate_path_cost_size(root, grouped_rel, NIL, NIL, &rows, &width, &startup_cost, &total_cost);
/* Now update this information in the fpinfo */
fpinfo->rows = rows;
@ -3035,54 +3255,50 @@ add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
fpinfo->total_cost = total_cost;
/* Create and add foreign path to the grouping relation. */
grouppath = create_foreignscan_path(root,
grouped_rel,
grouped_rel->reltarget,
rows,
startup_cost,
total_cost,
NIL, /* no pathkeys */
grouped_rel->lateral_relids,
NULL,
NIL); /* no fdw_private */
grouppath = create_foreign_upper_path(root,
grouped_rel,
grouped_rel->reltarget,
rows,
startup_cost,
total_cost,
NIL, /* no pathkeys */
NULL,
NIL); /* no fdw_private */
/* Add generated path into grouped_rel by add_path(). */
add_path(grouped_rel, (Path *) grouppath);
}
/*
* postgresGetForeignUpperPaths
* get_foreign_upper_paths
* Add paths for post-join operations like aggregation, grouping etc. if
* corresponding operations are safe to push down.
*
* Right now, we only support aggregate, grouping and having clause pushdown.
*/
static void
get_foreign_upper_paths(PlannerInfo *root, UpperRelationKind stage,
RelOptInfo *input_rel, RelOptInfo *output_rel,
void *extra)
get_foreign_upper_paths(PlannerInfo *root, UpperRelationKind stage, RelOptInfo *input_rel,
RelOptInfo *output_rel, void *extra)
{
TsFdwRelationInfo *fpinfo;
TsFdwRelationInfo *fpinfo = input_rel->fdw_private ? fdw_relation_info_get(input_rel) : NULL;
/*
* If input rel is not safe to pushdown, then simply return as we cannot
* perform any post-join operations on the foreign server.
*/
if (!input_rel->fdw_private ||
!((TsFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe)
if (NULL == fpinfo || !fpinfo->pushdown_safe)
return;
/* Ignore stages we don't support; and skip any duplicate calls. */
if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private)
return;
fpinfo = (TsFdwRelationInfo *) palloc0(sizeof(TsFdwRelationInfo));
fpinfo = fdw_relation_info_alloc(output_rel);
fpinfo->pushdown_safe = false;
output_rel->fdw_private = fpinfo;
add_foreign_grouping_paths(root, input_rel, output_rel,
(GroupPathExtraData *) extra);
add_foreign_grouping_paths(root, input_rel, output_rel, (GroupPathExtraData *) extra);
}
#endif /* !PG10 */
static FdwRoutine timescaledb_fdw_routine = {
.type = T_FdwRoutine,
@ -3094,7 +3310,9 @@ static FdwRoutine timescaledb_fdw_routine = {
.IterateForeignScan = iterate_foreign_scan,
.EndForeignScan = end_foreign_scan,
.ReScanForeignScan = rescan_foreign_scan,
#if !PG10
.GetForeignUpperPaths = get_foreign_upper_paths,
#endif
/* update */
.IsForeignRelUpdatable = is_foreign_rel_updatable,
.PlanForeignModify = plan_foreign_modify,


@ -230,7 +230,6 @@ CrossModuleFunctions tsl_cm_functions = {
.remote_txn_id_in = error_not_supported_default_fn,
.remote_txn_id_out = error_not_supported_default_fn,
.set_rel_pathlist = NULL,
.hypertable_should_be_expanded = NULL,
.server_dispatch_path_create = error_server_dispatch_path_create_not_supported,
.distributed_copy = error_distributed_copy_not_supported,
.ddl_command_start = NULL,
@ -256,7 +255,6 @@ CrossModuleFunctions tsl_cm_functions = {
.remote_txn_id_out = remote_txn_id_out_pg,
.remote_txn_heal_server = remote_txn_heal_server,
.set_rel_pathlist = tsl_set_rel_pathlist,
.hypertable_should_be_expanded = tsl_hypertable_should_be_expanded,
.server_dispatch_path_create = server_dispatch_path_create,
.distributed_copy = remote_distributed_copy,
.ddl_command_start = tsl_ddl_command_start,


@ -94,27 +94,4 @@ tsl_set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntr
ts_cache_release(hcache);
}
/*
* For distributed hypertables, we skip expanding the hypertable chunks when using the
* per-server-queries optimization. This is because we don't want the chunk relations in the plan.
 * Instead, the fdw will create paths for hypertable-server relations during path creation.
*/
bool
tsl_hypertable_should_be_expanded(RelOptInfo *rel, RangeTblEntry *rte, Hypertable *ht,
List *chunk_oids)
{
if (ts_guc_enable_per_server_queries && hypertable_is_distributed(ht))
{
TimescaleDBPrivate *rel_info;
Assert(rel->fdw_private != NULL);
rel_info = rel->fdw_private;
rel_info->chunk_oids = chunk_oids;
/* turn off expansion */
rte->inh = false;
return false;
}
return true;
}
#endif


@ -247,17 +247,17 @@ SELECT * FROM disttable;
----------------------------------------------------------------------------------------------------------------------------------
Append
-> Foreign Scan
Output: "time", device, temp
Output: disttable."time", disttable.device, disttable.temp
Server: server_1
Chunks: _hyper_1_1_dist_chunk
Remote SQL: SELECT "time", device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1])
-> Foreign Scan
Output: "time", device, temp
Output: disttable."time", disttable.device, disttable.temp
Server: server_2
Chunks: _hyper_1_2_dist_chunk
Remote SQL: SELECT "time", device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1])
-> Foreign Scan
Output: "time", device, temp
Output: disttable."time", disttable.device, disttable.temp
Server: server_3
Chunks: _hyper_1_3_dist_chunk
Remote SQL: SELECT "time", device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1])
@ -277,33 +277,31 @@ EXPLAIN (VERBOSE, COSTS FALSE)
SELECT time_bucket('3 hours', time) AS time, device, avg(temp) AS avg_temp
FROM disttable GROUP BY 1, 2
ORDER BY 1;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Sort
Output: (time_bucket('@ 3 hours'::interval, "time")), device, (avg(temp))
Sort Key: (time_bucket('@ 3 hours'::interval, disttable."time"))
-> HashAggregate
Output: (time_bucket('@ 3 hours'::interval, "time")), device, avg(temp)
Group Key: time_bucket('@ 3 hours'::interval, disttable."time"), disttable.device
-> Result
Output: time_bucket('@ 3 hours'::interval, "time"), device, temp
-> Append
-> Foreign Scan
Output: "time", device, temp
Server: server_1
Chunks: _hyper_1_1_dist_chunk
Remote SQL: SELECT "time", device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1])
-> Foreign Scan
Output: "time", device, temp
Server: server_2
Chunks: _hyper_1_2_dist_chunk
Remote SQL: SELECT "time", device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1])
-> Foreign Scan
Output: "time", device, temp
Server: server_3
Chunks: _hyper_1_3_dist_chunk
Remote SQL: SELECT "time", device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1])
(24 rows)
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------
GroupAggregate
Output: (time_bucket('@ 3 hours'::interval, disttable."time")), disttable.device, avg(disttable.temp)
Group Key: (time_bucket('@ 3 hours'::interval, disttable."time")), disttable.device
-> Sort
Output: (time_bucket('@ 3 hours'::interval, disttable."time")), disttable.device, disttable.temp
Sort Key: (time_bucket('@ 3 hours'::interval, disttable."time")), disttable.device
-> Append
-> Foreign Scan
Output: time_bucket('@ 3 hours'::interval, disttable."time"), disttable.device, disttable.temp
Server: server_1
Chunks: _hyper_1_1_dist_chunk
Remote SQL: SELECT "time", device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1])
-> Foreign Scan
Output: time_bucket('@ 3 hours'::interval, disttable."time"), disttable.device, disttable.temp
Server: server_2
Chunks: _hyper_1_2_dist_chunk
Remote SQL: SELECT "time", device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1])
-> Foreign Scan
Output: time_bucket('@ 3 hours'::interval, disttable."time"), disttable.device, disttable.temp
Server: server_3
Chunks: _hyper_1_3_dist_chunk
Remote SQL: SELECT "time", device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1])
(22 rows)
-- Execute some queries on the frontend and return the results
SELECT * FROM disttable;
@ -1098,17 +1096,17 @@ SELECT * FROM disttable_replicated;
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
Append (actual rows=8 loops=1)
-> Foreign Scan (actual rows=4 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_1
Chunks: _hyper_6_6_dist_chunk, _hyper_6_9_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[3, 5])
-> Foreign Scan (actual rows=2 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_2
Chunks: _hyper_6_7_dist_chunk, _hyper_6_10_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[5, 7])
-> Foreign Scan (actual rows=2 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_3
Chunks: _hyper_6_8_dist_chunk, _hyper_6_11_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[4, 6])
@ -1155,17 +1153,17 @@ SELECT * FROM disttable_replicated WHERE temp > 2.0;
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Append (actual rows=2 loops=1)
-> Foreign Scan (actual rows=0 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_1
Chunks: _hyper_6_6_dist_chunk, _hyper_6_9_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[3, 5]) AND ((temp > 2::double precision))
-> Foreign Scan (actual rows=0 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_2
Chunks: _hyper_6_7_dist_chunk, _hyper_6_10_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[5, 7]) AND ((temp > 2::double precision))
-> Foreign Scan (actual rows=2 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_3
Chunks: _hyper_6_8_dist_chunk, _hyper_6_11_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[4, 6]) AND ((temp > 2::double precision))
@ -1178,17 +1176,17 @@ SELECT * FROM disttable_replicated WHERE temp > 2.0 or "Color" = 11;
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Append (actual rows=3 loops=1)
-> Foreign Scan (actual rows=1 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_1
Chunks: _hyper_6_6_dist_chunk, _hyper_6_9_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[3, 5]) AND (((temp > 2::double precision) OR ("Color" = 11)))
-> Foreign Scan (actual rows=0 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_2
Chunks: _hyper_6_7_dist_chunk, _hyper_6_10_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[5, 7]) AND (((temp > 2::double precision) OR ("Color" = 11)))
-> Foreign Scan (actual rows=2 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_3
Chunks: _hyper_6_8_dist_chunk, _hyper_6_11_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[4, 6]) AND (((temp > 2::double precision) OR ("Color" = 11)))
@ -1201,12 +1199,12 @@ SELECT * FROM disttable_replicated WHERE time < '2018-01-01 09:11';
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Append (actual rows=2 loops=1)
-> Foreign Scan (actual rows=2 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_1
Chunks: _hyper_6_6_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[3]) AND (("time" < '2018-01-01 09:11:00-08'::timestamp with time zone))
-> Foreign Scan (actual rows=0 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_2
Chunks: _hyper_6_7_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[5]) AND (("time" < '2018-01-01 09:11:00-08'::timestamp with time zone))
@ -1215,11 +1213,11 @@ SELECT * FROM disttable_replicated WHERE time < '2018-01-01 09:11';
--test all chunks excluded
EXPLAIN (VERBOSE, ANALYZE, COSTS FALSE, TIMING FALSE, SUMMARY FALSE)
SELECT * FROM disttable_replicated WHERE time < '2002-01-01 09:11';
QUERY PLAN
----------------------------------------------------------------------------------------------------
Seq Scan on public.disttable_replicated (actual rows=0 loops=1)
Output: "time", device, temp, "Color"
Filter: (disttable_replicated."time" < 'Tue Jan 01 09:11:00 2002 PST'::timestamp with time zone)
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------
Result (actual rows=0 loops=1)
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
One-Time Filter: false
(3 rows)
--test cte

View File

@ -247,17 +247,17 @@ SELECT * FROM disttable;
----------------------------------------------------------------------------------------------------------------------------------
Append
-> Foreign Scan
Output: "time", device, temp
Output: disttable."time", disttable.device, disttable.temp
Server: server_1
Chunks: _hyper_1_1_dist_chunk
Remote SQL: SELECT "time", device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1])
-> Foreign Scan
Output: "time", device, temp
Output: disttable."time", disttable.device, disttable.temp
Server: server_2
Chunks: _hyper_1_2_dist_chunk
Remote SQL: SELECT "time", device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1])
-> Foreign Scan
Output: "time", device, temp
Output: disttable."time", disttable.device, disttable.temp
Server: server_3
Chunks: _hyper_1_3_dist_chunk
Remote SQL: SELECT "time", device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1])
@ -277,33 +277,31 @@ EXPLAIN (VERBOSE, COSTS FALSE)
SELECT time_bucket('3 hours', time) AS time, device, avg(temp) AS avg_temp
FROM disttable GROUP BY 1, 2
ORDER BY 1;
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------------
Sort
Output: (time_bucket('@ 3 hours'::interval, "time")), device, (avg(temp))
Sort Key: (time_bucket('@ 3 hours'::interval, disttable."time"))
-> HashAggregate
Output: (time_bucket('@ 3 hours'::interval, "time")), device, avg(temp)
Group Key: time_bucket('@ 3 hours'::interval, disttable."time"), disttable.device
-> Result
Output: time_bucket('@ 3 hours'::interval, "time"), device, temp
-> Append
-> Foreign Scan
Output: "time", device, temp
Server: server_1
Chunks: _hyper_1_1_dist_chunk
Remote SQL: SELECT "time", device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1])
-> Foreign Scan
Output: "time", device, temp
Server: server_2
Chunks: _hyper_1_2_dist_chunk
Remote SQL: SELECT "time", device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1])
-> Foreign Scan
Output: "time", device, temp
Server: server_3
Chunks: _hyper_1_3_dist_chunk
Remote SQL: SELECT "time", device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1])
(24 rows)
QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------------------
GroupAggregate
Output: (time_bucket('@ 3 hours'::interval, disttable."time")), disttable.device, avg(disttable.temp)
Group Key: (time_bucket('@ 3 hours'::interval, disttable."time")), disttable.device
-> Sort
Output: (time_bucket('@ 3 hours'::interval, disttable."time")), disttable.device, disttable.temp
Sort Key: (time_bucket('@ 3 hours'::interval, disttable."time")), disttable.device
-> Append
-> Foreign Scan
Output: time_bucket('@ 3 hours'::interval, disttable."time"), disttable.device, disttable.temp
Server: server_1
Chunks: _hyper_1_1_dist_chunk
Remote SQL: SELECT "time", device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1])
-> Foreign Scan
Output: time_bucket('@ 3 hours'::interval, disttable."time"), disttable.device, disttable.temp
Server: server_2
Chunks: _hyper_1_2_dist_chunk
Remote SQL: SELECT "time", device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1])
-> Foreign Scan
Output: time_bucket('@ 3 hours'::interval, disttable."time"), disttable.device, disttable.temp
Server: server_3
Chunks: _hyper_1_3_dist_chunk
Remote SQL: SELECT "time", device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1])
(22 rows)
-- Execute some queries on the frontend and return the results
SELECT * FROM disttable;
@ -1098,17 +1096,17 @@ SELECT * FROM disttable_replicated;
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
Append (actual rows=8 loops=1)
-> Foreign Scan (actual rows=4 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_1
Chunks: _hyper_6_6_dist_chunk, _hyper_6_9_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[3, 5])
-> Foreign Scan (actual rows=2 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_2
Chunks: _hyper_6_7_dist_chunk, _hyper_6_10_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[5, 7])
-> Foreign Scan (actual rows=2 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_3
Chunks: _hyper_6_8_dist_chunk, _hyper_6_11_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[4, 6])
@ -1155,17 +1153,17 @@ SELECT * FROM disttable_replicated WHERE temp > 2.0;
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Append (actual rows=2 loops=1)
-> Foreign Scan (actual rows=0 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_1
Chunks: _hyper_6_6_dist_chunk, _hyper_6_9_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[3, 5]) AND ((temp > 2::double precision))
-> Foreign Scan (actual rows=0 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_2
Chunks: _hyper_6_7_dist_chunk, _hyper_6_10_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[5, 7]) AND ((temp > 2::double precision))
-> Foreign Scan (actual rows=2 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_3
Chunks: _hyper_6_8_dist_chunk, _hyper_6_11_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[4, 6]) AND ((temp > 2::double precision))
@ -1178,17 +1176,17 @@ SELECT * FROM disttable_replicated WHERE temp > 2.0 or "Color" = 11;
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Append (actual rows=3 loops=1)
-> Foreign Scan (actual rows=1 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_1
Chunks: _hyper_6_6_dist_chunk, _hyper_6_9_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[3, 5]) AND (((temp > 2::double precision) OR ("Color" = 11)))
-> Foreign Scan (actual rows=0 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_2
Chunks: _hyper_6_7_dist_chunk, _hyper_6_10_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[5, 7]) AND (((temp > 2::double precision) OR ("Color" = 11)))
-> Foreign Scan (actual rows=2 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_3
Chunks: _hyper_6_8_dist_chunk, _hyper_6_11_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[4, 6]) AND (((temp > 2::double precision) OR ("Color" = 11)))
@ -1201,12 +1199,12 @@ SELECT * FROM disttable_replicated WHERE time < '2018-01-01 09:11';
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Append (actual rows=2 loops=1)
-> Foreign Scan (actual rows=2 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_1
Chunks: _hyper_6_6_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[3]) AND (("time" < '2018-01-01 09:11:00-08'::timestamp with time zone))
-> Foreign Scan (actual rows=0 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
Server: server_2
Chunks: _hyper_6_7_dist_chunk
Remote SQL: SELECT "time", device, temp, "Color" FROM public.disttable_replicated WHERE _timescaledb_internal.chunks_in(disttable_replicated, ARRAY[5]) AND (("time" < '2018-01-01 09:11:00-08'::timestamp with time zone))
@ -1215,10 +1213,10 @@ SELECT * FROM disttable_replicated WHERE time < '2018-01-01 09:11';
--test all chunks excluded
EXPLAIN (VERBOSE, ANALYZE, COSTS FALSE, TIMING FALSE, SUMMARY FALSE)
SELECT * FROM disttable_replicated WHERE time < '2002-01-01 09:11';
QUERY PLAN
-----------------------------------------
QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------------
Result (actual rows=0 loops=1)
Output: "time", device, temp, "Color"
Output: disttable_replicated."time", disttable_replicated.device, disttable_replicated.temp, disttable_replicated."Color"
One-Time Filter: false
(3 rows)

File diff suppressed because it is too large

View File

@ -75,16 +75,14 @@ EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM test_ft;
(4 rows)
EXPLAIN (VERBOSE, COSTS FALSE) SELECT avg(c0) FROM test_ft GROUP BY c1;
QUERY PLAN
-------------------------------------------------------
HashAggregate
Output: avg(c0), c1
Group Key: test_ft.c1
-> Foreign Scan on public.test_ft
Output: c0, c1
Server: server_1
Remote SQL: SELECT c0, c1 FROM public.test_ft
(7 rows)
QUERY PLAN
-----------------------------------------------------------------
Foreign Scan
Output: (avg(c0)), c1
Relations: Aggregate on (public.test_ft)
Server: server_1
Remote SQL: SELECT avg(c0), c1 FROM public.test_ft GROUP BY 2
(5 rows)
SELECT * FROM test_ft;
c0 | c1
@ -97,8 +95,8 @@ SELECT * FROM test_ft;
SELECT avg(c0) FROM test_ft GROUP BY c1;
avg
--------------------
1.5000000000000000
2.0000000000000000
1.5000000000000000
(2 rows)
-- Update rows

View File

@ -83,6 +83,7 @@ if (PG_VERSION_SUPPORTS_MULTINODE)
list(APPEND TEST_FILES
chunk_api.sql
timescaledb_fdw.sql
partitionwise_distributed.sql
)
list(APPEND TEST_FILES_DEBUG
deparse.sql
@ -90,7 +91,6 @@ if (PG_VERSION_SUPPORTS_MULTINODE)
dist_commands.sql
dist_ddl.sql
dist_util.sql
partitionwise_distributed.sql
remote_connection.sql
remote_connection_cache.sql
remote_copy.sql

View File

@ -4,7 +4,7 @@
-- Need to be super user to create extension and add servers
\c :TEST_DBNAME :ROLE_SUPERUSER;
\ir include/remote_exec.sql
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
-- Need explicit password for non-super users to connect
@ -17,47 +17,390 @@ SET ROLE :ROLE_DEFAULT_CLUSTER_USER;
SET client_min_messages TO ERROR;
DROP DATABASE IF EXISTS server_1;
DROP DATABASE IF EXISTS server_2;
DROP DATABASE IF EXISTS server_3;
SET client_min_messages TO NOTICE;
CREATE DATABASE server_1;
CREATE DATABASE server_2;
CREATE DATABASE server_3;
\c server_1
-- Creating the extension is only possible as a superuser
\c server_1 :ROLE_SUPERUSER
SET client_min_messages TO ERROR;
CREATE EXTENSION timescaledb;
\c server_2
\c server_2 :ROLE_SUPERUSER
SET client_min_messages TO ERROR;
CREATE EXTENSION timescaledb;
\c server_3
SET client_min_messages TO ERROR;
CREATE EXTENSION timescaledb;
\c :TEST_DBNAME :ROLE_SUPERUSER;
\c :TEST_DBNAME :ROLE_SUPERUSER
SET ROLE :ROLE_DEFAULT_CLUSTER_USER;
-- Add servers using the TimescaleDB server management API
SELECT * FROM add_server('server_1', database => 'server_1', password => 'pass', if_not_exists => true);
SELECT * FROM add_server('server_2', database => 'server_2', password => 'pass', if_not_exists => true);
SELECT inet_server_port() AS "port" \gset
CREATE TABLE hyper (time timestamptz, device int, temp float);
SELECT * FROM create_hypertable('hyper', 'time', 'device', 2, replication_factor => 1);
-- Create a similar PostgreSQL partitioned table
CREATE SERVER IF NOT EXISTS server_pg1 FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'localhost', dbname 'server_1', port inet_server_port());
OPTIONS (host 'localhost', dbname 'server_1', port :'port');
CREATE SERVER IF NOT EXISTS server_pg2 FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'localhost', dbname 'server_2', port '5432');
OPTIONS (host 'localhost', dbname 'server_2', port :'port');
CREATE USER MAPPING IF NOT EXISTS FOR :ROLE_DEFAULT_CLUSTER_USER server server_pg1
OPTIONS (user :'ROLE_DEFAULT_CLUSTER_USER', password 'pass');
CREATE USER MAPPING IF NOT EXISTS FOR :ROLE_DEFAULT_CLUSTER_USER server server_pg2
OPTIONS (user :'ROLE_DEFAULT_CLUSTER_USER', password 'pass');
-- Create a 2-dimensional partitioned table for comparison
CREATE TABLE pg2dim (time timestamptz, device int, temp float) PARTITION BY HASH (device);
CREATE TABLE pg2dim_h1 PARTITION OF pg2dim FOR VALUES WITH (MODULUS 2, REMAINDER 0) PARTITION BY RANGE(time);
CREATE TABLE pg2dim_h2 PARTITION OF pg2dim FOR VALUES WITH (MODULUS 2, REMAINDER 1) PARTITION BY RANGE(time);
CREATE FOREIGN TABLE pg2dim_h1_t1 PARTITION OF pg2dim_h1 FOR VALUES FROM ('2018-01-01 00:00') TO ('2018-09-01 00:00') SERVER server_pg1;
CREATE FOREIGN TABLE pg2dim_h1_t2 PARTITION OF pg2dim_h1 FOR VALUES FROM ('2018-09-01 00:00') TO ('2018-12-01 00:00') SERVER server_pg1;
CREATE FOREIGN TABLE pg2dim_h2_t1 PARTITION OF pg2dim_h2 FOR VALUES FROM ('2018-01-01 00:00') TO ('2018-09-01 00:00') SERVER server_pg2;
CREATE FOREIGN TABLE pg2dim_h2_t2 PARTITION OF pg2dim_h2 FOR VALUES FROM ('2018-09-01 00:00') TO ('2018-12-01 00:00') SERVER server_pg2;
CREATE FOREIGN TABLE pg2dim_h1_t1 PARTITION OF pg2dim_h1 FOR VALUES FROM ('2018-01-18 00:00') TO ('2018-04-18 00:00') SERVER server_pg1;
CREATE FOREIGN TABLE pg2dim_h1_t2 PARTITION OF pg2dim_h1 FOR VALUES FROM ('2018-04-18 00:00') TO ('2018-07-18 00:00') SERVER server_pg1;
CREATE FOREIGN TABLE pg2dim_h2_t1 PARTITION OF pg2dim_h2 FOR VALUES FROM ('2018-01-18 00:00') TO ('2018-04-18 00:00') SERVER server_pg2;
CREATE FOREIGN TABLE pg2dim_h2_t2 PARTITION OF pg2dim_h2 FOR VALUES FROM ('2018-04-18 00:00') TO ('2018-07-18 00:00') SERVER server_pg2;
-- Create a 1-dimensional partitioned table for comparison
CREATE TABLE pg1dim (time timestamptz, device int, temp float) PARTITION BY HASH (device);
CREATE FOREIGN TABLE pg1dim_h1 PARTITION OF pg1dim FOR VALUES WITH (MODULUS 2, REMAINDER 0) SERVER server_pg1;
CREATE FOREIGN TABLE pg1dim_h2 PARTITION OF pg1dim FOR VALUES WITH (MODULUS 2, REMAINDER 1) SERVER server_pg2;
-- Create these partitioned tables on the servers
SELECT * FROM test.remote_exec('{ server_pg1, server_pg2 }', $$
CREATE TABLE pg2dim (time timestamptz, device int, temp float) PARTITION BY HASH (device);
CREATE TABLE pg2dim_h1 PARTITION OF pg2dim FOR VALUES WITH (MODULUS 2, REMAINDER 0) PARTITION BY RANGE(time);
CREATE TABLE pg2dim_h2 PARTITION OF pg2dim FOR VALUES WITH (MODULUS 2, REMAINDER 1) PARTITION BY RANGE(time);
CREATE TABLE pg2dim_h1_t1 PARTITION OF pg2dim_h1 FOR VALUES FROM ('2018-01-18 00:00') TO ('2018-04-18 00:00');
CREATE TABLE pg2dim_h1_t2 PARTITION OF pg2dim_h1 FOR VALUES FROM ('2018-04-18 00:00') TO ('2018-07-18 00:00');
CREATE TABLE pg2dim_h2_t1 PARTITION OF pg2dim_h2 FOR VALUES FROM ('2018-01-18 00:00') TO ('2018-04-18 00:00');
CREATE TABLE pg2dim_h2_t2 PARTITION OF pg2dim_h2 FOR VALUES FROM ('2018-04-18 00:00') TO ('2018-07-18 00:00');
$$);
-- Add servers using the TimescaleDB server management API
SELECT * FROM add_server('server_1', database => 'server_1', password => 'pass', if_not_exists => true);
SELECT * FROM add_server('server_2', database => 'server_2', password => 'pass', if_not_exists => true);
CREATE TABLE hyper (time timestamptz, device int, temp float);
SELECT * FROM create_hypertable('hyper', 'time', 'device', 2, replication_factor => 1, chunk_time_interval => '3 months'::interval);
INSERT INTO hyper VALUES
('2018-01-19 13:01', 1, 2.3),
('2018-01-20 15:05', 1, 5.3),
('2018-02-21 13:01', 3, 1.5),
('2018-02-28 15:05', 1, 5.6),
('2018-02-19 13:02', 3, 3.1),
('2018-02-19 13:02', 3, 6.7),
('2018-04-19 13:01', 1, 7.6),
('2018-04-20 15:08', 3, 8.4),
('2018-05-19 13:01', 1, 5.1),
('2018-05-20 15:08', 1, 9.4),
('2018-05-30 13:02', 3, 9.0);
INSERT INTO pg2dim VALUES
('2018-01-19 13:01', 1, 2.3),
('2018-01-20 15:05', 1, 5.3),
('2018-02-21 13:01', 3, 1.5),
('2018-02-28 15:05', 1, 5.6),
('2018-02-19 13:02', 3, 3.1),
('2018-02-19 13:02', 3, 6.7),
('2018-04-19 13:01', 1, 7.6),
('2018-04-20 15:08', 3, 8.4),
('2018-05-19 13:01', 1, 5.1),
('2018-05-20 15:08', 1, 9.4),
('2018-05-30 13:02', 3, 9.0);
SELECT * FROM test.show_subtables('hyper');
SELECT * FROM pg2dim_h1_t1;
SELECT * FROM pg2dim_h1_t2;
SELECT * FROM pg2dim_h2_t1;
SELECT * FROM pg2dim_h2_t2;
SELECT * FROM _timescaledb_internal._hyper_1_1_dist_chunk;
SELECT * FROM _timescaledb_internal._hyper_1_2_dist_chunk;
SELECT * FROM _timescaledb_internal._hyper_1_3_dist_chunk;
SELECT * FROM _timescaledb_internal._hyper_1_4_dist_chunk;
---------------------------------------------------------------------
-- PARTIAL partitionwise - Not all partition keys are covered by GROUP
-- BY
---------------------------------------------------------------------
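-- A minimal sketch of the partial state avg() would need per server
-- (hypothetical; partial aggregates cannot be requested from remote
-- servers yet): each server computes sum and count per device, and
-- the access node finalizes sum/count.
SELECT device, sum(temp), count(temp)
FROM hyper
GROUP BY 1
ORDER BY 1;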
SET enable_partitionwise_aggregate = OFF;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT device, avg(temp)
FROM pg2dim
GROUP BY 1
ORDER BY 1;
SELECT device, avg(temp)
FROM pg2dim
GROUP BY 1
ORDER BY 1;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT device, avg(temp)
FROM hyper
GROUP BY 1
ORDER BY 1;
-- Show result
SELECT device, avg(temp)
FROM hyper
GROUP BY 1
ORDER BY 1;
SET enable_partitionwise_aggregate = ON;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT device, avg(temp)
FROM pg2dim
GROUP BY 1
ORDER BY 1;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT device, avg(temp)
FROM hyper
GROUP BY 1
ORDER BY 1;
-- Show result
SELECT device, avg(temp)
FROM hyper
GROUP BY 1
ORDER BY 1;
--------------------------------------------------------------
-- FULL partitionwise - All partition keys covered by GROUP BY
--------------------------------------------------------------
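-- With both partition keys (time, device) in the GROUP BY, every
-- group falls entirely within one chunk, so per-server aggregates
-- are final and can simply be appended.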
SET enable_partitionwise_aggregate = OFF;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp)
FROM pg2dim
GROUP BY 1, 2
ORDER BY 1, 2;
SELECT time, device, avg(temp)
FROM pg2dim
GROUP BY 1, 2
ORDER BY 1, 2;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp)
FROM hyper
GROUP BY 1, 2
ORDER BY 1, 2;
-- Show result
SELECT time, device, avg(temp)
FROM hyper
GROUP BY 1, 2
ORDER BY 1, 2;
SET enable_partitionwise_aggregate = ON;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp)
FROM pg2dim
GROUP BY 1, 2
ORDER BY 1, 2;
-- On the hypertable, first show partitionwise aggs without per-server queries
SET timescaledb.enable_per_server_queries = OFF;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp)
FROM hyper
GROUP BY 1, 2
ORDER BY 1, 2;
SELECT time, device, avg(temp)
FROM hyper
GROUP BY 1, 2
ORDER BY 1, 2;
-- Enable per-server queries. Aggregate should be pushed down per
-- server instead of per chunk.
SET timescaledb.enable_per_server_queries = ON;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp)
FROM hyper
GROUP BY 1, 2
ORDER BY 1, 2;
-- Show result
SELECT time, device, avg(temp)
FROM hyper
GROUP BY 1, 2
ORDER BY 1, 2;
-- Only one chunk per server, but the per-server plan is still used.
-- The pushed-down aggregate plan is not chosen here, probably due to costing.
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp)
FROM hyper
WHERE time > '2018-04-19 00:01'
GROUP BY 1, 2
ORDER BY 1, 2;
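-- Show the result for reference (mirrors the EXPLAIN above)
SELECT time, device, avg(temp)
FROM hyper
WHERE time > '2018-04-19 00:01'
GROUP BY 1, 2
ORDER BY 1, 2;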
-- Test HAVING qual
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp) AS temp
FROM pg2dim
WHERE time > '2018-04-19 00:01'
GROUP BY 1, 2
HAVING avg(temp) > 4
ORDER BY 1, 2;
SELECT time, device, avg(temp) AS temp
FROM pg2dim
WHERE time > '2018-04-19 00:01'
GROUP BY 1, 2
HAVING avg(temp) > 4
ORDER BY 1, 2;
-- Test HAVING qual. The pushed-down aggregate plan is not chosen
-- here, probably due to costing.
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp) AS temp
FROM hyper
WHERE time > '2018-04-19 00:01'
GROUP BY 1, 2
HAVING avg(temp) > 4
ORDER BY 1, 2;
SELECT time, device, avg(temp) AS temp
FROM hyper
WHERE time > '2018-04-19 00:01'
GROUP BY 1, 2
HAVING avg(temp) > 4
ORDER BY 1, 2;
-------------------------------------------------------------------
-- All partition keys not covered by GROUP BY because of date_trunc
-- expression on time (partial partitionwise). This won't be pushed
-- down at the moment, since there is no way to request partial
-- aggregate states from the remote servers
-------------------------------------------------------------------
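-- As in the PARTIAL section above, only a (sum, count)-style partial
-- per chunk would be valid here, since date_trunc('month', time) does
-- not provably preserve the chunk partitioning.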
SET enable_partitionwise_aggregate = OFF;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT date_trunc('month', time), device, avg(temp)
FROM pg2dim
GROUP BY 1, 2
ORDER BY 1, 2;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT date_trunc('month', time), device, avg(temp)
FROM hyper
GROUP BY 1, 2
ORDER BY 1, 2;
-- Show result
SELECT date_trunc('month', time), device, avg(temp)
FROM hyper
GROUP BY 1, 2
ORDER BY 1, 2;
SET enable_partitionwise_aggregate = ON;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT date_trunc('month', time), device, avg(temp)
FROM pg2dim
GROUP BY 1, 2
ORDER BY 1, 2;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT date_trunc('month', time), device, avg(temp)
FROM hyper
GROUP BY 1, 2
ORDER BY 1, 2;
-- Show result by month
SELECT date_trunc('month', time), device, avg(temp)
FROM hyper
GROUP BY 1, 2
ORDER BY 1, 2;
-- Show result by year
SELECT date_trunc('year', time), device, avg(temp)
FROM hyper
GROUP BY 1, 2
ORDER BY 1, 2;
-------------------------------------------------------
-- Test time_bucket (only supports up to days grouping)
-------------------------------------------------------
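-- Note: time_bucket() only accepts fixed-width intervals, so buckets
-- with a month or year component are rejected (hence "up to days");
-- e.g. the following would error out (hedged sketch, not run here):
-- SELECT time_bucket('1 month', time) FROM hyper LIMIT 1;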
SET enable_partitionwise_aggregate = OFF;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time_bucket('1 day', time), device, avg(temp)
FROM hyper
GROUP BY 1, 2
ORDER BY 1, 2;
-- Show result
SELECT time_bucket('1 day', time), device, avg(temp)
FROM hyper
GROUP BY 1, 2
ORDER BY 1, 2;
SET enable_partitionwise_aggregate = ON;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time_bucket('1 day', time), device, avg(temp)
FROM hyper
GROUP BY 1, 2
ORDER BY 1, 2;
-- Show result
SELECT time_bucket('1 day', time), device, avg(temp)
FROM hyper
GROUP BY 1, 2
ORDER BY 1, 2;
---------------------------------------------------------------------
-- Test expressions that either aren't pushed down or are only pushed
-- down in part
---------------------------------------------------------------------
-- Create a custom aggregate that does not exist on the data nodes
CREATE AGGREGATE custom_sum(int4) (
SFUNC = int4_sum,
STYPE = int8
);
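-- Quick sanity check of the aggregate (a sketch; this runs entirely
-- on the access node since custom_sum is unknown to the servers)
SELECT custom_sum(device) FROM hyper;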
-- The sum contains the volatile random(), so it is not pushed down to the servers
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp), sum(temp * (random() <= 1)::int) as sum
FROM pg2dim
GROUP BY 1, 2
LIMIT 1;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp), sum(temp * (random() <= 1)::int) as sum
FROM hyper
GROUP BY 1, 2
LIMIT 1;
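-- For contrast, the same aggregate without the volatile random()
-- call is shippable (a sketch):
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp), sum(temp) AS sum
FROM hyper
GROUP BY 1, 2
LIMIT 1;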
-- Pushed down with non-pushable expression taken out
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp), random() * device as rand_dev, custom_sum(device)
FROM pg2dim
GROUP BY 1, 2
LIMIT 1;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp), random() * device as rand_dev, custom_sum(device)
FROM hyper
GROUP BY 1, 2
LIMIT 1;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp), sum(temp) * random() * device as sum_temp
FROM pg2dim
GROUP BY 1, 2
HAVING avg(temp) * custom_sum(device) > 0.8
LIMIT 1;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp), sum(temp) * random() * device as sum_temp
FROM hyper
GROUP BY 1, 2
HAVING avg(temp) * custom_sum(device) > 0.8
LIMIT 1;
-- Not pushed down because of a non-shippable expression on the
-- underlying rel
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp)
FROM pg2dim
WHERE (pg2dim.temp * random() <= 20)
GROUP BY 1, 2
LIMIT 1;
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp)
FROM hyper
WHERE (hyper.temp * random() <= 20)
GROUP BY 1, 2
LIMIT 1;