mirror of https://github.com/timescale/timescaledb.git
Fix DecompressChunk parallel execution
When DecompressChunk is used in parallel plans, the scan on the compressed hypertable chunk needs to be parallel aware to prevent workers from duplicating each other's work. This patch changes DecompressChunk to always create a non-parallel-safe path and, if requested, an additional parallel-safe partial path that uses a parallel-aware scan.
parent abbe5c84fd
commit 42a2c8666e
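The fix follows the standard PostgreSQL planner pattern: always add a serial path with add_path(), and additionally offer a partial path via add_partial_path() when the rel permits parallelism, so the planner can place a Gather on top. Below is a minimal sketch of that generic pattern, not code from this commit; my_rel_paths is a hypothetical name:

    /*
     * Illustrative sketch only (not part of the commit): the generic
     * serial-path-plus-partial-path pattern this patch follows.
     */
    #include "postgres.h"
    #include "optimizer/pathnode.h"

    static void
    my_rel_paths(PlannerInfo *root, RelOptInfo *rel)
    {
    	/* always add a serial path so the rel can be planned without workers */
    	add_path(rel, create_seqscan_path(root, rel, NULL, 0));

    	/* only offer a partial path when parallelism is allowed for this rel */
    	if (rel->consider_parallel)
    	{
    		/*
    		 * a nonzero parallel_workers argument makes the seqscan parallel
    		 * aware, so workers coordinate block assignment instead of each
    		 * scanning the whole table
    		 */
    		Path *partial = create_seqscan_path(root, rel, NULL, 2);

    		add_partial_path(rel, partial);
    	}
    }

In the diff below, decompress_chunk_path_create() plays this role for the DecompressChunk custom path, with a parallel-aware seqscan on the compressed chunk supplying the worker coordination.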
@@ -26,6 +26,7 @@
 #include "decompress_chunk/decompress_chunk.h"
 #include "decompress_chunk/planner.h"
 #include "decompress_chunk/qual_pushdown.h"
+#include "utils.h"

 #define DECOMPRESS_CHUNK_CPU_TUPLE_COST 0.01
 #define DECOMPRESS_CHUNK_BATCH_SIZE 1000
@@ -35,9 +36,65 @@ static CustomPathMethods decompress_chunk_path_methods = {
 	.PlanCustomPath = decompress_chunk_plan_create,
 };

-static RangeTblEntry *decompress_chunk_make_rte(PlannerInfo *root, Oid compressed_relid,
-												LOCKMODE lockmode);
-static void create_compressed_scan_paths(PlannerInfo *root, DecompressChunkPath *path);
+static RangeTblEntry *decompress_chunk_make_rte(Oid compressed_relid, LOCKMODE lockmode);
+static void create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel,
+										 int parallel_workers);
+
+static Path *decompress_chunk_path_create(PlannerInfo *root, RelOptInfo *chunk_rel,
+										   RelOptInfo *compressed_rel, Hypertable *ht,
+										   List *compression_info, int parallel_workers);
+
+static Index decompress_chunk_add_plannerinfo(PlannerInfo *root, Chunk *chunk);
+
+void
+ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *chunk_rel, Hypertable *ht,
+								   Chunk *chunk)
+{
+	Index compressed_index;
+	RelOptInfo *compressed_rel;
+	Path *path;
+	List *compression_info = get_hypertablecompression_info(ht->fd.id);
+	/*
+	 * since we rely on parallel coordination from the scan below
+	 * this node it is probably not beneficial to have more
+	 * than a single worker per chunk
+	 */
+	int parallel_workers = 1;
+
+	Assert(chunk->fd.compressed_chunk_id > 0);
+
+	chunk_rel->pathlist = NIL;
+	chunk_rel->partial_pathlist = NIL;
+
+	/* add RangeTblEntry and RelOptInfo for compressed chunk */
+	compressed_index = decompress_chunk_add_plannerinfo(root, chunk);
+	compressed_rel = root->simple_rel_array[compressed_index];
+
+	compressed_rel->consider_parallel = chunk_rel->consider_parallel;
+
+	pushdown_quals(root, chunk_rel, compressed_rel, compression_info);
+	set_baserel_size_estimates(root, compressed_rel);
+	chunk_rel->rows = compressed_rel->rows * DECOMPRESS_CHUNK_BATCH_SIZE;
+	create_compressed_scan_paths(root,
+								 compressed_rel,
+								 compressed_rel->consider_parallel ? parallel_workers : 0);
+
+	/* create non-parallel path */
+	path = decompress_chunk_path_create(root, chunk_rel, compressed_rel, ht, compression_info, 0);
+	add_path(chunk_rel, path);
+
+	/* create parallel path */
+	if (compressed_rel->consider_parallel && list_length(compressed_rel->partial_pathlist) > 0)
+	{
+		path = decompress_chunk_path_create(root,
+											chunk_rel,
+											compressed_rel,
+											ht,
+											compression_info,
+											parallel_workers);
+		add_partial_path(chunk_rel, path);
+	}
+}

 /*
  * calculate cost for DecompressChunkPath
@@ -56,48 +113,18 @@ cost_decompress_chunk(Path *path, Path *compressed_path)
 	path->total_cost = compressed_path->total_cost + path->rows * DECOMPRESS_CHUNK_CPU_TUPLE_COST;
 }

-Path *
-ts_decompress_chunk_path_create(PlannerInfo *root, RelOptInfo *rel, Hypertable *ht, Chunk *chunk,
-								Path *subpath)
+/*
+ * create RangeTblEntry and RelOptInfo for the compressed chunk
+ * and add it to PlannerInfo
+ */
+static Index
+decompress_chunk_add_plannerinfo(PlannerInfo *root, Chunk *chunk)
 {
-	DecompressChunkPath *path;
 	Index compressed_index = root->simple_rel_array_size;
-	Index chunk_index = rel->relid;
 	Chunk *compressed_chunk = ts_chunk_get_by_id(chunk->fd.compressed_chunk_id, 0, true);
 	Oid compressed_relid = compressed_chunk->table_id;
-	AppendRelInfo *appinfo;
+	RangeTblEntry *compressed_rte;

-	path = (DecompressChunkPath *) newNode(sizeof(DecompressChunkPath), T_CustomPath);
-
-	path->chunk_rel = rel;
-	path->chunk_rte = planner_rt_fetch(chunk_index, root);
-	path->hypertable_id = ht->fd.id;
-	path->compression_info = get_hypertablecompression_info(ht->fd.id);
-
-#if PG96 || PG10
-	appinfo = find_childrel_appendrelinfo(root, rel);
-#else
-	appinfo = root->append_rel_array[rel->relid];
-#endif
-
-	path->ht_rte = planner_rt_fetch(appinfo->parent_relid, root);
-
-	path->cpath.path.pathtype = T_CustomScan;
-	path->cpath.path.parent = rel;
-	path->cpath.path.pathtarget = rel->reltarget;
-	path->cpath.path.param_info = subpath->param_info;
-
-	path->cpath.path.parallel_aware = false;
-	path->cpath.path.parallel_safe = subpath->parallel_safe;
-	path->cpath.path.parallel_workers = subpath->parallel_workers;
-
-	path->cpath.flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN;
-	path->cpath.methods = &decompress_chunk_path_methods;
-
-	/*
-	 * create RangeTblEntry and RelOptInfo for the compressed chunk
-	 * and add it to PlannerInfo
-	 */
 	root->simple_rel_array_size++;
 	root->simple_rel_array =
 		repalloc(root->simple_rel_array, root->simple_rel_array_size * sizeof(RelOptInfo *));
@@ -109,25 +136,62 @@ ts_decompress_chunk_path_create(PlannerInfo *root, RelOptInfo *rel, Hypertable *
 	root->append_rel_array[compressed_index] = NULL;
 #endif

-	path->compressed_rte = decompress_chunk_make_rte(root, compressed_relid, AccessShareLock);
-
-	root->simple_rte_array[compressed_index] = path->compressed_rte;
-	root->parse->rtable = lappend(root->parse->rtable, path->compressed_rte);
+	compressed_rte = decompress_chunk_make_rte(compressed_relid, AccessShareLock);
+	root->simple_rte_array[compressed_index] = compressed_rte;
+
+	root->parse->rtable = lappend(root->parse->rtable, compressed_rte);

 	root->simple_rel_array[compressed_index] = NULL;
 #if PG96
-	path->compressed_rel = build_simple_rel(root, compressed_index, RELOPT_BASEREL);
+	root->simple_rel_array[compressed_index] =
+		build_simple_rel(root, compressed_index, RELOPT_BASEREL);
 #else
-	path->compressed_rel = build_simple_rel(root, compressed_index, NULL);
+	root->simple_rel_array[compressed_index] = build_simple_rel(root, compressed_index, NULL);
 #endif
-	root->simple_rel_array[compressed_index] = path->compressed_rel;

-	pushdown_quals(path);
-	set_baserel_size_estimates(root, path->compressed_rel);
+	return compressed_index;
+}

-	path->cpath.path.rows = path->compressed_rel->rows * DECOMPRESS_CHUNK_BATCH_SIZE;
+static Path *
+decompress_chunk_path_create(PlannerInfo *root, RelOptInfo *chunk_rel, RelOptInfo *compressed_rel,
+							 Hypertable *ht, List *compression_info, int parallel_workers)
+{
+	DecompressChunkPath *path;
+	AppendRelInfo *appinfo;
+	bool parallel_safe = false;
+
+	path = (DecompressChunkPath *) newNode(sizeof(DecompressChunkPath), T_CustomPath);
+
+	path->chunk_rel = chunk_rel;
+	path->chunk_rte = planner_rt_fetch(chunk_rel->relid, root);
+	path->compressed_rel = compressed_rel;
+	path->compressed_rte = planner_rt_fetch(compressed_rel->relid, root);
+	path->hypertable_id = ht->fd.id;
+	path->compression_info = compression_info;
+
+	appinfo = ts_get_appendrelinfo(root, chunk_rel->relid);
+	path->ht_rte = planner_rt_fetch(appinfo->parent_relid, root);
+
+	path->cpath.path.pathtype = T_CustomScan;
+	path->cpath.path.parent = chunk_rel;
+	path->cpath.path.pathtarget = chunk_rel->reltarget;
+
+	path->cpath.flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN;
+	path->cpath.methods = &decompress_chunk_path_methods;
+
+	path->cpath.path.rows = path->compressed_rel->rows * DECOMPRESS_CHUNK_BATCH_SIZE;

-	create_compressed_scan_paths(root, path);
-	path->cpath.custom_paths = list_make1(path->compressed_rel->cheapest_total_path);
+	if (parallel_workers > 0 && list_length(compressed_rel->partial_pathlist) > 0)
+		parallel_safe = true;
+
+	path->cpath.path.parallel_aware = false;
+	path->cpath.path.parallel_safe = parallel_safe;
+	path->cpath.path.parallel_workers = parallel_workers;
+
+	if (parallel_safe)
+		path->cpath.custom_paths = list_make1(linitial(compressed_rel->partial_pathlist));
+	else
+		path->cpath.custom_paths = list_make1(compressed_rel->cheapest_total_path);

 	cost_decompress_chunk(&path->cpath.path, path->compressed_rel->cheapest_total_path);

@@ -135,24 +199,30 @@ ts_decompress_chunk_path_create(PlannerInfo *root, RelOptInfo *rel, Hypertable *
 }

 static void
-create_compressed_scan_paths(PlannerInfo *root, DecompressChunkPath *path)
+create_compressed_scan_paths(PlannerInfo *root, RelOptInfo *compressed_rel, int parallel_workers)
 {
-	List *pathlist = NIL;
 	Path *compressed_path;

-	compressed_path = create_seqscan_path(root, path->compressed_rel, NULL, 0);
-	pathlist = lappend(pathlist, compressed_path);
-
-	path->compressed_rel->pathlist = pathlist;
+	/* create non parallel scan path */
+	compressed_path = create_seqscan_path(root, compressed_rel, NULL, 0);
+	add_path(compressed_rel, compressed_path);
+
+	/* create parallel scan path */
+	if (compressed_rel->consider_parallel && parallel_workers > 0)
+	{
+		compressed_path = create_seqscan_path(root, compressed_rel, NULL, parallel_workers);
+		Assert(compressed_path->parallel_aware);
+		add_partial_path(compressed_rel, compressed_path);
+	}

-	set_cheapest(path->compressed_rel);
+	set_cheapest(compressed_rel);
 }

 /*
  * create RangeTblEntry for compressed chunk
  */
 static RangeTblEntry *
-decompress_chunk_make_rte(PlannerInfo *root, Oid compressed_relid, LOCKMODE lockmode)
+decompress_chunk_make_rte(Oid compressed_relid, LOCKMODE lockmode)
 {
 	RangeTblEntry *rte = makeNode(RangeTblEntry);
 	Relation r = heap_open(compressed_relid, lockmode);
@@ -27,8 +27,8 @@ typedef struct DecompressChunkPath
 	bool reverse;
 } DecompressChunkPath;

-extern Path *ts_decompress_chunk_path_create(PlannerInfo *root, RelOptInfo *rel, Hypertable *ht,
-											 Chunk *chunk, Path *subpath);
+void ts_decompress_chunk_generate_paths(PlannerInfo *root, RelOptInfo *rel, Hypertable *ht,
+										Chunk *chunk);

 FormData_hypertable_compression *get_column_compressioninfo(List *hypertable_compression_info,
 															char *column_name);
@@ -7,6 +7,7 @@
 #include <postgres.h>
 #include <nodes/makefuncs.h>
+#include <nodes/nodeFuncs.h>
 #include <nodes/relation.h>
 #include <optimizer/clauses.h>
 #include <optimizer/pathnode.h>
 #include <optimizer/restrictinfo.h>
@@ -21,158 +22,94 @@
 #include "decompress_chunk/qual_pushdown.h"
 #include "hypertable_compression.h"

-static void pushdown_nulltest(DecompressChunkPath *path, NullTest *op);
-static void pushdown_opexpr(DecompressChunkPath *path, OpExpr *op);
-static void pushdown_scalararrayopexpr(DecompressChunkPath *path, ScalarArrayOpExpr *op);
-static bool can_pushdown_var(DecompressChunkPath *path, Var *chunk_var, Var **compressed_var);
+typedef struct QualPushdownContext
+{
+	RelOptInfo *chunk_rel;
+	RelOptInfo *compressed_rel;
+	RangeTblEntry *chunk_rte;
+	RangeTblEntry *compressed_rte;
+	List *compression_info;
+	bool can_pushdown;
+} QualPushdownContext;
+
+static bool adjust_expression(Node *node, QualPushdownContext *context);

 void
-pushdown_quals(DecompressChunkPath *path)
+pushdown_quals(PlannerInfo *root, RelOptInfo *chunk_rel, RelOptInfo *compressed_rel,
+			   List *compression_info)
 {
 	ListCell *lc;
+	QualPushdownContext context = {
+		.chunk_rel = chunk_rel,
+		.compressed_rel = compressed_rel,
+		.chunk_rte = planner_rt_fetch(chunk_rel->relid, root),
+		.compressed_rte = planner_rt_fetch(compressed_rel->relid, root),
+		.compression_info = compression_info,
+	};

-	foreach (lc, path->chunk_rel->baserestrictinfo)
+	foreach (lc, chunk_rel->baserestrictinfo)
 	{
 		RestrictInfo *ri = lfirst(lc);
+		Expr *expr = copyObject(ri->clause);

-		switch (nodeTag(ri->clause))
+		context.can_pushdown = true;
+		adjust_expression((Node *) expr, &context);
+
+		if (context.can_pushdown)
 		{
-			case T_NullTest:
-				pushdown_nulltest(path, castNode(NullTest, ri->clause));
-				break;
-			case T_OpExpr:
-				pushdown_opexpr(path, castNode(OpExpr, ri->clause));
-				break;
-			case T_ScalarArrayOpExpr:
-				pushdown_scalararrayopexpr(path, castNode(ScalarArrayOpExpr, ri->clause));
-				break;
-			default:
-				break;
+			compressed_rel->baserestrictinfo =
+				lappend(compressed_rel->baserestrictinfo, make_simple_restrictinfo(expr));
 		}
 	}
 }

 static bool
-can_pushdown_var(DecompressChunkPath *path, Var *chunk_var, Var **compressed_var)
+adjust_expression(Node *node, QualPushdownContext *context)
 {
-	char *column_name;
-	FormData_hypertable_compression *compressioninfo;
-
-	Assert(chunk_var->varno == path->chunk_rel->relid);
-
-	/* ignore system attibutes or whole row references */
-	if (!IsA(chunk_var, Var) || chunk_var->varattno <= 0)
+	if (node == NULL)
 		return false;

-	column_name = get_attname_compat(path->chunk_rte->relid, chunk_var->varattno, false);
-	compressioninfo = get_column_compressioninfo(path->compression_info, column_name);
-
-	/* we can only push down quals for segmentby columns */
-	if (compressioninfo->segmentby_column_index > 0)
+	switch (nodeTag(node))
 	{
-		AttrNumber compressed_attno = get_attnum(path->compressed_rte->relid, column_name);
-		Var *var = copyObject(chunk_var);
-
-		var->varno = path->compressed_rel->relid;
-		var->varattno = compressed_attno;
-
-		*compressed_var = var;
-
-		return true;
-	}
-
-	return false;
-}
-
-static void
-pushdown_nulltest(DecompressChunkPath *path, NullTest *op)
-{
-	Var *compressed_var;
-
-	if (!IsA(op->arg, Var))
-		return;
-
-	if (can_pushdown_var(path, castNode(Var, op->arg), &compressed_var))
-	{
-		NullTest *compressed_op = copyObject(op);
-		RestrictInfo *compressed_ri;
-		compressed_op->arg = (Expr *) compressed_var;
-
-		compressed_ri = make_simple_restrictinfo((Expr *) compressed_op);
-
-		path->compressed_rel->baserestrictinfo =
-			lappend(path->compressed_rel->baserestrictinfo, compressed_ri);
-	}
-}
-
-static void
-pushdown_opexpr(DecompressChunkPath *path, OpExpr *op)
-{
-	bool var_on_left = false;
-	Expr *left, *right;
-
-	if (list_length(op->args) != 2)
-		return;
-
-	left = linitial(op->args);
-	right = lsecond(op->args);
-
-	/* we only support Var OP Const / Const OP Var for now */
-	if ((IsA(left, Var) && IsA(right, Const)) || (IsA(left, Const) && IsA(right, Var)))
-	{
-		Var *var, *compressed_var;
-
-		if (IsA(left, Var))
-			var_on_left = true;
-
-		var = var_on_left ? (Var *) left : (Var *) right;
-
-		/* we can only push down quals for segmentby columns */
-		if (can_pushdown_var(path, var, &compressed_var))
-		{
-			OpExpr *compressed_op = copyObject(op);
-			RestrictInfo *compressed_ri;
-
-			if (var_on_left)
-				compressed_op->args = list_make2(compressed_var, copyObject(right));
-			else
-				compressed_op->args = list_make2(copyObject(left), compressed_var);
-
-			compressed_ri = make_simple_restrictinfo((Expr *) compressed_op);
-
-			path->compressed_rel->baserestrictinfo =
-				lappend(path->compressed_rel->baserestrictinfo, compressed_ri);
-		}
-	}
-}
-
-static void
-pushdown_scalararrayopexpr(DecompressChunkPath *path, ScalarArrayOpExpr *op)
-{
-	Expr *left, *right;
-
-	if (list_length(op->args) != 2)
-		return;
-
-	left = linitial(op->args);
-	right = lsecond(op->args);
-
-	if (IsA(left, Var) && IsA(right, Const))
-	{
-		Var *compressed_var;
-
-		/* we can only push down quals for segmentby columns */
-		if (can_pushdown_var(path, castNode(Var, left), &compressed_var))
-		{
-			ScalarArrayOpExpr *compressed_op = copyObject(op);
-			RestrictInfo *compressed_ri;
-
-			compressed_op->args = list_make2(compressed_var, copyObject(right));
-
-			compressed_ri = make_simple_restrictinfo((Expr *) compressed_op);
-
-			path->compressed_rel->baserestrictinfo =
-				lappend(path->compressed_rel->baserestrictinfo, compressed_ri);
-		}
-	}
-}
+		case T_OpExpr:
+		case T_ScalarArrayOpExpr:
+		case T_List:
+		case T_Const:
+		case T_NullTest:
+			break;
+		case T_Var:
+		{
+			Var *var = castNode(Var, node);
+			char *column_name;
+			FormData_hypertable_compression *compressioninfo;
+			AttrNumber compressed_attno;
+
+			/* ignore system attibutes or whole row references */
+			if (var->varattno <= 0)
+			{
+				context->can_pushdown = false;
+				return true;
+			}
+
+			column_name = get_attname_compat(context->chunk_rte->relid, var->varattno, false);
+			compressioninfo = get_column_compressioninfo(context->compression_info, column_name);
+
+			/* we can only push down quals for segmentby columns */
+			if (compressioninfo->segmentby_column_index <= 0)
+			{
+				context->can_pushdown = false;
+				return true;
+			}
+
+			compressed_attno = get_attnum(context->compressed_rte->relid, column_name);
+			var->varno = context->compressed_rel->relid;
+			var->varattno = compressed_attno;
+
+			break;
+		}
+		default:
+			context->can_pushdown = false;
+			return true;
+			break;
+	}
+
+	return expression_tree_walker(node, adjust_expression, context);
+}
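The rewritten qual pushdown walks each restriction clause with PostgreSQL's expression_tree_walker(): a walker returns false to keep recursing into child nodes and true to abort the walk, which adjust_expression() uses (together with can_pushdown) to flag expressions it cannot translate to the compressed rel. A minimal sketch of that walker contract, separate from the commit; count_vars_walker and CountVarsContext are hypothetical names:

    /*
     * Illustrative sketch (not from the commit): a walker that counts
     * Var nodes in an expression, using the same recursion contract
     * adjust_expression relies on.
     */
    #include "postgres.h"
    #include "nodes/nodeFuncs.h"
    #include "nodes/primnodes.h"

    typedef struct CountVarsContext
    {
    	int nvars;
    } CountVarsContext;

    static bool
    count_vars_walker(Node *node, CountVarsContext *context)
    {
    	if (node == NULL)
    		return false;

    	if (IsA(node, Var))
    		context->nvars++;

    	/*
    	 * expression_tree_walker recurses into the children of node;
    	 * returning its result propagates an abort (true) upward
    	 */
    	return expression_tree_walker(node, count_vars_walker, (void *) context);
    }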
@@ -8,4 +8,5 @@

 #include "decompress_chunk/decompress_chunk.h"

-void pushdown_quals(DecompressChunkPath *path);
+void pushdown_quals(PlannerInfo *root, RelOptInfo *chunk_rel, RelOptInfo *compressed_rel,
+					List *compression_info);
@@ -36,10 +36,6 @@ tsl_set_rel_pathlist_hook(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTb
 		Chunk *chunk = ts_chunk_get_by_relid(rte->relid, 0, true);

 		if (chunk->fd.compressed_chunk_id > 0)
-		{
-			rel->pathlist = list_make1(
-				ts_decompress_chunk_path_create(root, rel, ht, chunk, linitial(rel->pathlist)));
-			rel->partial_pathlist = NIL;
-		}
+			ts_decompress_chunk_generate_paths(root, rel, ht, chunk);
 	}
 }
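For context, tsl_set_rel_pathlist_hook above runs via PostgreSQL's set_rel_pathlist_hook, which an extension installs at load time. A minimal sketch of the usual installation idiom; the prev-hook chaining and the names my_set_rel_pathlist/prev_set_rel_pathlist_hook are the common convention, assumed here rather than taken from this diff:

    /*
     * Illustrative sketch (not from the commit): how an extension
     * typically installs a set_rel_pathlist hook.
     */
    #include "postgres.h"
    #include "fmgr.h"
    #include "optimizer/paths.h"

    PG_MODULE_MAGIC;

    void _PG_init(void);

    static set_rel_pathlist_hook_type prev_set_rel_pathlist_hook = NULL;

    static void
    my_set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte)
    {
    	/* let any previously installed hook run first */
    	if (prev_set_rel_pathlist_hook != NULL)
    		prev_set_rel_pathlist_hook(root, rel, rti, rte);

    	/* inspect rel/rte and replace or extend rel->pathlist here */
    }

    void
    _PG_init(void)
    {
    	prev_set_rel_pathlist_hook = set_rel_pathlist_hook;
    	set_rel_pathlist_hook = my_set_rel_pathlist;
    }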
3 file diffs suppressed because they are too large
@@ -4,10 +4,10 @@

 -- this should use DecompressChunk node
-:PREFIX SELECT * FROM metrics WHERE device_id = 1 ORDER BY time LIMIT 5;
+:PREFIX SELECT * FROM :TEST_TABLE WHERE device_id = 1 ORDER BY time LIMIT 5;

 -- test RECORD by itself
-:PREFIX SELECT * FROM metrics WHERE device_id = 1 ORDER BY time;
+:PREFIX SELECT * FROM :TEST_TABLE WHERE device_id = 1 ORDER BY time;

 -- test expressions
 :PREFIX SELECT
@@ -16,53 +16,53 @@
   COALESCE(NULL,v1,v2) AS "coalesce",
   NULL AS "NULL",
   'text' AS "text",
-  metrics AS "RECORD"
-FROM metrics WHERE device_id IN (1,2) ORDER BY time, device_id;
+  :TEST_TABLE AS "RECORD"
+FROM :TEST_TABLE WHERE device_id IN (1,2) ORDER BY time, device_id;

 -- test empty targetlist
-:PREFIX SELECT FROM metrics;
+:PREFIX SELECT FROM :TEST_TABLE;

 -- test empty resultset
-:PREFIX SELECT * FROM metrics WHERE device_id < 0;
+:PREFIX SELECT * FROM :TEST_TABLE WHERE device_id < 0;

 -- test targetlist not referencing columns
-:PREFIX SELECT 1 FROM metrics;
+:PREFIX SELECT 1 FROM :TEST_TABLE;

 -- test constraints not present in targetlist
-:PREFIX SELECT v1 FROM metrics WHERE device_id = 1 ORDER BY v1;
+:PREFIX SELECT v1 FROM :TEST_TABLE WHERE device_id = 1 ORDER BY v1;

 -- test order not present in targetlist
-:PREFIX SELECT v2 FROM metrics WHERE device_id = 1 ORDER BY v1;
+:PREFIX SELECT v2 FROM :TEST_TABLE WHERE device_id = 1 ORDER BY v1;

 -- test column with all NULL
-:PREFIX SELECT v3 FROM metrics WHERE device_id = 1;
+:PREFIX SELECT v3 FROM :TEST_TABLE WHERE device_id = 1;

 --
 -- test qual pushdown
 --

 -- time is not segment by column so should not be pushed down
-:PREFIX SELECT * FROM metrics WHERE time < '2000-01-08' ORDER BY time, device_id;
+:PREFIX SELECT * FROM :TEST_TABLE WHERE time < '2000-01-08' ORDER BY time, device_id;

 -- device_id constraint should be pushed down
-:PREFIX SELECT * FROM metrics WHERE device_id = 1 ORDER BY time, device_id LIMIT 10;
+:PREFIX SELECT * FROM :TEST_TABLE WHERE device_id = 1 ORDER BY time, device_id LIMIT 10;

 -- test IS NULL / IS NOT NULL
-:PREFIX SELECT * FROM metrics WHERE device_id IS NOT NULL ORDER BY time, device_id LIMIT 10;
-:PREFIX SELECT * FROM metrics WHERE device_id IS NULL ORDER BY time, device_id LIMIT 10;
+:PREFIX SELECT * FROM :TEST_TABLE WHERE device_id IS NOT NULL ORDER BY time, device_id LIMIT 10;
+:PREFIX SELECT * FROM :TEST_TABLE WHERE device_id IS NULL ORDER BY time, device_id LIMIT 10;

 -- test IN (Const,Const)
-:PREFIX SELECT * FROM metrics WHERE device_id IN (1,2) ORDER BY time, device_id LIMIT 10;
+:PREFIX SELECT * FROM :TEST_TABLE WHERE device_id IN (1,2) ORDER BY time, device_id LIMIT 10;

 -- test cast pushdown
-:PREFIX SELECT * FROM metrics WHERE device_id = '1'::text::int ORDER BY time, device_id LIMIT 10;
+:PREFIX SELECT * FROM :TEST_TABLE WHERE device_id = '1'::text::int ORDER BY time, device_id LIMIT 10;

 -- test expressions
-:PREFIX SELECT * FROM metrics WHERE device_id = 1 + 4/2 ORDER BY time, device_id LIMIT 10;
+:PREFIX SELECT * FROM :TEST_TABLE WHERE device_id = 1 + 4/2 ORDER BY time, device_id LIMIT 10;

 -- test function calls
 -- not yet pushed down
-:PREFIX SELECT * FROM metrics WHERE device_id = length(substring(version(),1,3)) ORDER BY time, device_id LIMIT 10;
+:PREFIX SELECT * FROM :TEST_TABLE WHERE device_id = length(substring(version(),1,3)) ORDER BY time, device_id LIMIT 10;

 --
 -- test constraint exclusion
@@ -70,34 +70,34 @@ FROM metrics WHERE device_id IN (1,2) ORDER BY time, device_id;

 -- test plan time exclusion
 -- first chunk should be excluded
-:PREFIX SELECT * FROM metrics WHERE time > '2000-01-08' ORDER BY time, device_id;
+:PREFIX SELECT * FROM :TEST_TABLE WHERE time > '2000-01-08' ORDER BY time, device_id;

 -- test runtime exclusion
 -- first chunk should be excluded
-:PREFIX SELECT * FROM metrics WHERE time > '2000-01-08'::text::timestamptz ORDER BY time, device_id;
+:PREFIX SELECT * FROM :TEST_TABLE WHERE time > '2000-01-08'::text::timestamptz ORDER BY time, device_id;

 -- test aggregate
-:PREFIX SELECT count(*) FROM metrics;
+:PREFIX SELECT count(*) FROM :TEST_TABLE;

 -- test aggregate with GROUP BY
-:PREFIX SELECT count(*) FROM metrics GROUP BY device_id ORDER BY device_id;
+:PREFIX SELECT count(*) FROM :TEST_TABLE GROUP BY device_id ORDER BY device_id;

 -- test window functions with GROUP BY
-:PREFIX SELECT sum(count(*)) OVER () FROM metrics GROUP BY device_id ORDER BY device_id;
+:PREFIX SELECT sum(count(*)) OVER () FROM :TEST_TABLE GROUP BY device_id ORDER BY device_id;

 -- test CTE
 :PREFIX WITH
-q AS (SELECT v1 FROM metrics ORDER BY time)
+q AS (SELECT v1 FROM :TEST_TABLE ORDER BY time)
 SELECT * FROM q ORDER BY v1;

 -- test CTE join
 :PREFIX WITH
-q1 AS (SELECT time, v1 FROM metrics WHERE device_id=1 ORDER BY time),
-q2 AS (SELECT time, v2 FROM metrics WHERE device_id=2 ORDER BY time)
+q1 AS (SELECT time, v1 FROM :TEST_TABLE WHERE device_id=1 ORDER BY time),
+q2 AS (SELECT time, v2 FROM :TEST_TABLE WHERE device_id=2 ORDER BY time)
 SELECT * FROM q1 INNER JOIN q2 ON q1.time=q2.time ORDER BY q1.time;

 -- test prepared statement
-PREPARE prep AS SELECT count(time) FROM metrics WHERE device_id = 1;
+PREPARE prep AS SELECT count(time) FROM :TEST_TABLE WHERE device_id = 1;
 :PREFIX EXECUTE prep;
 EXECUTE prep;
 EXECUTE prep;
@@ -109,17 +109,17 @@ DEALLOCATE prep;

 -- test explicit self-join
 -- XXX FIXME
--- :PREFIX SELECT * FROM metrics m1 INNER JOIN metrics m2 ON m1.time = m2.time ORDER BY m1.time;
+-- :PREFIX SELECT * FROM :TEST_TABLE m1 INNER JOIN :TEST_TABLE m2 ON m1.time = m2.time ORDER BY m1.time;

 -- test implicit self-join
 -- XXX FIXME
--- :PREFIX SELECT * FROM metrics m1, metrics m2 WHERE m1.time = m2.time ORDER BY m1.time;
+-- :PREFIX SELECT * FROM :TEST_TABLE m1, :TEST_TABLE m2 WHERE m1.time = m2.time ORDER BY m1.time;

 -- test self-join with sub-query
 -- XXX FIXME
--- :PREFIX SELECT * FROM (SELECT * FROM metrics m1) m1 INNER JOIN (SELECT * FROM metrics m2) m2 ON m1.time = m2.time ORDER BY m1.time;
+-- :PREFIX SELECT * FROM (SELECT * FROM :TEST_TABLE m1) m1 INNER JOIN (SELECT * FROM :TEST_TABLE m2) m2 ON m1.time = m2.time ORDER BY m1.time;

 -- test system columns
 -- XXX FIXME
---SELECT xmin FROM metrics ORDER BY time;
+--SELECT xmin FROM :TEST_TABLE ORDER BY time;
@@ -24,26 +24,53 @@ ALTER TABLE metrics DROP COLUMN filler_3;
 INSERT INTO metrics(time,device_id,v1,v2,v3) SELECT time, device_id, device_id + 0.25, device_id + 0.5, NULL FROM generate_series('2000-01-13 0:00:00+0'::timestamptz,'2000-01-19 23:55:00+0','5m') gtime(time), generate_series(1,5,1) gdevice(device_id);
 ANALYZE metrics;

+-- create identical hypertable with space partitioning
+CREATE TABLE metrics_space(filler_1 int, filler_2 int, filler_3 int, time timestamptz NOT NULL, device_id int, v1 float, v2 float, v3 float);
+SELECT create_hypertable('metrics_space','time','device_id',3);
+
+ALTER TABLE metrics_space DROP COLUMN filler_1;
+INSERT INTO metrics_space(time,device_id,v1,v2,v3) SELECT time, device_id, device_id + 0.25, device_id + 0.5, NULL FROM generate_series('2000-01-01 0:00:00+0'::timestamptz,'2000-01-05 23:55:00+0','5m') gtime(time), generate_series(1,5,1) gdevice(device_id);
+ALTER TABLE metrics_space DROP COLUMN filler_2;
+INSERT INTO metrics_space(time,device_id,v1,v2,v3) SELECT time, device_id, device_id + 0.25, device_id + 0.5, NULL FROM generate_series('2000-01-06 0:00:00+0'::timestamptz,'2000-01-12 23:55:00+0','5m') gtime(time), generate_series(1,5,1) gdevice(device_id);
+ALTER TABLE metrics_space DROP COLUMN filler_3;
+INSERT INTO metrics_space(time,device_id,v1,v2,v3) SELECT time, device_id, device_id + 0.25, device_id + 0.5, NULL FROM generate_series('2000-01-13 0:00:00+0'::timestamptz,'2000-01-19 23:55:00+0','5m') gtime(time), generate_series(1,5,1) gdevice(device_id);
+ANALYZE metrics_space;
+
 -- run queries on uncompressed hypertable and store result
 \set PREFIX ''
 \set ECHO none
 SET client_min_messages TO error;
 \o :TEST_RESULTS_UNCOMPRESSED
+\set TEST_TABLE 'metrics'
 \ir :TEST_QUERY_NAME
+\set TEST_TABLE 'metrics_space'
+\ir :TEST_QUERY_NAME
 \o
 RESET client_min_messages;
 \set ECHO all

--- compress some chunks on the hypertable
-ALTER TABLE metrics SET (timescaledb.compress, timescaledb.compress_orderby='time asc nulls last', timescaledb.compress_segmentby='device_id');
+-- compress first and last chunk on the hypertable
+ALTER TABLE metrics SET (timescaledb.compress, timescaledb.compress_orderby='time', timescaledb.compress_segmentby='device_id');
 SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk');
 SELECT compress_chunk('_timescaledb_internal._hyper_1_3_chunk');

+-- compress some chunks on space partitioned hypertable
+-- we compress all chunks of first time slice, none of second, and 2 of the last time slice
+ALTER TABLE metrics_space SET (timescaledb.compress, timescaledb.compress_orderby='time', timescaledb.compress_segmentby='device_id');
+SELECT compress_chunk('_timescaledb_internal._hyper_2_4_chunk');
+SELECT compress_chunk('_timescaledb_internal._hyper_2_5_chunk');
+SELECT compress_chunk('_timescaledb_internal._hyper_2_6_chunk');
+SELECT compress_chunk('_timescaledb_internal._hyper_2_10_chunk');
+SELECT compress_chunk('_timescaledb_internal._hyper_2_11_chunk');
+
 -- run queries on compressed hypertable and store result
 \set PREFIX ''
 \set ECHO none
 SET client_min_messages TO error;
 \o :TEST_RESULTS_COMPRESSED
+\set TEST_TABLE 'metrics'
 \ir :TEST_QUERY_NAME
+\set TEST_TABLE 'metrics_space'
+\ir :TEST_QUERY_NAME
 \o
 RESET client_min_messages;
@@ -62,7 +89,16 @@ SELECT
 SET max_parallel_workers_per_gather TO 0;

 -- get explain for queries on hypertable with compression
+\set TEST_TABLE 'metrics'
 \ir :TEST_QUERY_NAME
+\set TEST_TABLE 'metrics_space'
+\ir :TEST_QUERY_NAME

+-- run query with parallel enabled to ensure nothing is preventing parallel execution
+-- this is just a sanity check, the result queries dont run with parallel disabled
+SET max_parallel_workers_per_gather TO 4;
+EXPLAIN (costs off) SELECT * FROM metrics ORDER BY time, device_id;
+EXPLAIN (costs off) SELECT * FROM metrics_space ORDER BY time, device_id;
+
 -- diff compressed and uncompressed results
 :DIFF_CMD