1
0
mirror of https://github.com/timescale/timescaledb.git synced 2025-05-23 06:22:03 +08:00

Add tableoid support for transparent decompression

This patch adds support for the tableoid system column to transparent
decompression. tableoid will be the relid of the uncompressed chunk.
All other system columns will still throw an error when queried.
This commit is contained in:
Sven Klemm 2019-09-19 19:48:48 +02:00 committed by Matvey Arye
parent 0606aeba9e
commit 32cb4a6af8
12 changed files with 3105 additions and 2549 deletions

@ -348,6 +348,10 @@ ts_ordered_append_should_optimize(PlannerInfo *root, RelOptInfo *rel, Hypertable
else
return false;
/* ordered append won't work for system columns / whole row orderings */
if (sort_var->varattno <= 0)
return false;
sort_relid = sort_var->varno;
tce = lookup_type_cache(sort_var->vartype,
TYPECACHE_EQ_OPR | TYPECACHE_LT_OPR | TYPECACHE_GT_OPR);

@ -747,7 +747,7 @@ can_order_by_pathkeys(RelOptInfo *chunk_rel, CompressionInfo *info, List *pathke
else
continue;
if (var->varno != chunk_rel->relid)
if (var->varno != chunk_rel->relid || var->varattno <= 0)
continue;
if (IsA(other, Const) || IsA(other, Param))
@ -782,6 +782,10 @@ can_order_by_pathkeys(RelOptInfo *chunk_rel, CompressionInfo *info, List *pathke
return false;
var = castNode(Var, expr);
if (var->varattno <= 0)
return false;
column_name = get_attname_compat(info->chunk_rte->relid, var->varattno, false);
ci = get_column_compressioninfo(info->hypertable_compression_info, column_name);
@ -818,6 +822,10 @@ can_order_by_pathkeys(RelOptInfo *chunk_rel, CompressionInfo *info, List *pathke
return false;
var = castNode(Var, expr);
if (var->varattno <= 0)
return false;
column_name = get_attname_compat(info->chunk_rte->relid, var->varattno, false);
ci = get_column_compressioninfo(info->hypertable_compression_info, column_name);

@ -46,7 +46,6 @@ typedef struct DecompressChunkPath
* that do not have a representation in the uncompressed chunk
*/
List *varattno_map;
bool reverse;
bool needs_sequence_num;
} DecompressChunkPath;

@ -6,6 +6,7 @@
#include <postgres.h>
#include <miscadmin.h>
#include <access/sysattr.h>
#include <executor/executor.h>
#include <nodes/bitmapset.h>
#include <nodes/makefuncs.h>
@ -70,6 +71,7 @@ typedef struct DecompressChunkState
bool initialized;
bool reverse;
int hypertable_id;
Oid chunk_relid;
List *hypertable_compression_info;
int counter;
MemoryContext per_batch_context;
@ -100,7 +102,7 @@ decompress_chunk_state_create(CustomScan *cscan)
settings = linitial(cscan->custom_private);
state->hypertable_id = linitial_int(settings);
state->reverse = lsecond_int(settings);
state->chunk_relid = lsecond_int(settings);
state->varattno_map = lsecond(cscan->custom_private);
return (Node *) state;
@ -165,6 +167,53 @@ initialize_column_state(DecompressChunkState *state)
}
}
/*
 * Context passed to constify_tableoid_walker: identifies which
 * range-table index refers to the uncompressed chunk and the relation
 * OID to substitute for tableoid references on that relation.
 */
typedef struct ConstifyTableOidContext
{
	Index chunk_index;	/* varno of the uncompressed chunk in the range table */
	Oid chunk_relid;	/* relid to substitute for tableoid references */
} ConstifyTableOidContext;
/*
 * Mutator (invoked through expression_tree_mutator) that replaces
 * references to the tableoid system column of the uncompressed chunk
 * with a Const holding the chunk's relation OID.
 *
 * Decompressed tuples are virtual tuples and therefore carry no system
 * columns, so any other system-column reference on the chunk relation
 * is rejected with an error. Vars of other relations pass through
 * unmodified.
 */
static Node *
constify_tableoid_walker(Node *node, ConstifyTableOidContext *ctx)
{
	if (node == NULL)
		return NULL;

	if (IsA(node, Var))
	{
		Var *var = castNode(Var, node);

		if (var->varno != ctx->chunk_index)
			return node;

		if (var->varattno == TableOidAttributeNumber)
			return (Node *) makeConst(OIDOID,
									  -1,
									  InvalidOid,
									  sizeof(Oid),
									  ObjectIdGetDatum(ctx->chunk_relid),
									  false,
									  true);

		/*
		 * we double-check system columns here because projection will
		 * segfault if any system columns get through
		 */
		if (var->varattno < 0)
			elog(ERROR, "transparent decompression only supports tableoid system column");

		return node;
	}

	return expression_tree_mutator(node, constify_tableoid_walker, (void *) ctx);
}
/*
 * Replace any tableoid references for the given chunk in the supplied
 * node list with a constant OID and return the mutated list.
 */
static List *
constify_tableoid(List *node, Index chunk_index, Oid chunk_relid)
{
	ConstifyTableOidContext context;

	context.chunk_index = chunk_index;
	context.chunk_relid = chunk_relid;

	return (List *) constify_tableoid_walker((Node *) node, &context);
}
/*
* Complete initialization of the supplied CustomScanState.
*
@ -179,8 +228,28 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
Plan *compressed_scan = linitial(cscan->custom_plans);
Assert(list_length(cscan->custom_plans) == 1);
if (eflags & EXEC_FLAG_BACKWARD)
state->reverse = !state->reverse;
if (node->ss.ps.ps_ProjInfo)
{
/*
* if we are projecting we need to constify tableoid references here
* because decompressed tuples are virtual tuples and don't have
* system columns.
*
* We do the constification in the executor because, even after plan
* creation, our targetlist might still get modified by parent nodes
* pushing down their targetlists.
*/
List *tlist = node->ss.ps.plan->targetlist;
PlanState *ps = &node->ss.ps;
tlist = constify_tableoid(tlist, cscan->scan.scanrelid, state->chunk_relid);
ps->ps_ProjInfo =
ExecBuildProjectionInfoCompat(tlist,
ps->ps_ExprContext,
ps->ps_ResultTupleSlot,
ps,
node->ss.ss_ScanTupleSlot->tts_tupleDescriptor);
}
state->hypertable_compression_info = ts_hypertable_compression_get(state->hypertable_id);

@ -144,9 +144,17 @@ build_scan_tlist(DecompressChunkPath *path)
scan_tlist = lappend(scan_tlist, tle);
}
/* check for system columns */
bit = bms_next_member(attrs_used, -1);
if (bit > 0 && bit + FirstLowInvalidHeapAttributeNumber < 0)
elog(ERROR, "transparent decompression does not support system attributes");
{
/* we support tableoid so skip that */
if (bit == TableOidAttributeNumber - FirstLowInvalidHeapAttributeNumber)
bit = bms_next_member(attrs_used, bit);
if (bit > 0 && bit + FirstLowInvalidHeapAttributeNumber < 0)
elog(ERROR, "transparent decompression only supports tableoid system column");
}
/* check for reference to whole row */
if (bms_is_member(0 - FirstLowInvalidHeapAttributeNumber, attrs_used))
@ -177,7 +185,8 @@ build_scan_tlist(DecompressChunkPath *path)
* we need to include junk columns because they might be needed for
* filtering or sorting
*/
for (bit = bms_next_member(attrs_used, -1); bit > 0; bit = bms_next_member(attrs_used, bit))
for (bit = bms_next_member(attrs_used, 0 - FirstLowInvalidHeapAttributeNumber); bit > 0;
bit = bms_next_member(attrs_used, bit))
{
/* bits are offset by FirstLowInvalidHeapAttributeNumber */
AttrNumber ht_attno = bit + FirstLowInvalidHeapAttributeNumber;
@ -284,6 +293,11 @@ replace_compressed_vars(Node *node, CompressionInfo *info)
Var *new_var;
char *colname;
/* constify tableoid in quals */
if (var->varno == info->chunk_rel->relid && var->varattno == TableOidAttributeNumber)
return (Node *)
makeConst(OIDOID, -1, InvalidOid, 4, (Datum) info->chunk_rte->relid, false, true);
/* Upper-level Vars should be long gone at this point */
Assert(var->varlevelsup == 0);
/* If not to be replaced, we can just return the Var unmodified */
@ -432,7 +446,7 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat
Assert(list_length(custom_plans) == 1);
settings = list_make2_int(dcpath->info->hypertable_id, dcpath->reverse);
settings = list_make2_int(dcpath->info->hypertable_id, dcpath->info->chunk_rte->relid);
cscan->custom_private = list_make2(settings, dcpath->varattno_map);
return &cscan->scan.plan;

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

@ -14,9 +14,13 @@ CREATE INDEX ON metrics_ordered(device_id,device_id_peer,time);
CREATE INDEX ON metrics_ordered(device_id,time);
CREATE INDEX ON metrics_ordered(device_id_peer,time);
SELECT compress_chunk('_timescaledb_internal._hyper_5_20_chunk');
SELECT compress_chunk('_timescaledb_internal._hyper_5_21_chunk');
SELECT compress_chunk('_timescaledb_internal._hyper_5_22_chunk');
-- compress all chunks
SELECT
compress_chunk(c.schema_name || '.' || c.table_name)
FROM _timescaledb_catalog.chunk c
INNER JOIN _timescaledb_catalog.hypertable ht ON c.hypertable_id=ht.id
WHERE ht.table_name = 'metrics_ordered'
ORDER BY c.id;
-- should not have ordered DecompressChunk path because segmentby columns are not part of pathkeys
:PREFIX SELECT * FROM metrics_ordered ORDER BY time DESC LIMIT 10;

@ -254,3 +254,12 @@ EXECUTE param_prep(1);
EXECUTE param_prep(2);
EXECUTE param_prep(1);
DEALLOCATE param_prep;
-- test continuous aggs
SET client_min_messages TO error;
CREATE VIEW cagg_test WITH (timescaledb.continuous) AS SELECT time_bucket('1d',time) AS time, device_id, avg(v1) FROM :TEST_TABLE WHERE device_id=1 GROUP BY 1,2;
REFRESH MATERIALIZED VIEW cagg_test;
SELECT time FROM cagg_test ORDER BY time LIMIT 1;
DROP VIEW cagg_test CASCADE;
RESET client_min_messages;

@ -0,0 +1,38 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\set TEST_TABLE 'metrics'
-- test system columns
-- all system columns except for tableoid should error
\set ON_ERROR_STOP 0
SELECT xmin FROM :TEST_TABLE ORDER BY time;
SELECT cmin FROM :TEST_TABLE ORDER BY time;
SELECT xmax FROM :TEST_TABLE ORDER BY time;
SELECT cmax FROM :TEST_TABLE ORDER BY time;
SELECT ctid FROM :TEST_TABLE ORDER BY time;
-- test system columns in WHERE and ORDER BY clause
-- mixing tableoid with an unsupported system column must still error
SELECT tableoid, xmin FROM :TEST_TABLE ORDER BY time;
SELECT FROM :TEST_TABLE ORDER BY cmin::text;
SELECT FROM :TEST_TABLE WHERE cmin IS NOT NULL;
\set ON_ERROR_STOP 1
-- test tableoid in different parts of query
-- (targetlist, ORDER BY, WHERE, GROUP BY, aggregate)
SELECT pg_typeof(tableoid) FROM :TEST_TABLE ORDER BY time LIMIT 1;
SELECT FROM :TEST_TABLE ORDER BY tableoid LIMIT 1;
SELECT FROM :TEST_TABLE WHERE tableoid::int > 0 LIMIT 1;
SELECT tableoid::regclass FROM :TEST_TABLE GROUP BY tableoid ORDER BY 1;
SELECT count(distinct tableoid) FROM :TEST_TABLE WHERE device_id=1 AND time < now();
-- test prepared statement
-- repeated executions presumably exercise the switch to a generic
-- plan after several custom-plan runs -- TODO confirm intent
PREPARE tableoid_prep AS SELECT tableoid::regclass FROM :TEST_TABLE WHERE device_id = 1 ORDER BY time LIMIT 1;
:PREFIX EXECUTE tableoid_prep;
EXECUTE tableoid_prep;
EXECUTE tableoid_prep;
EXECUTE tableoid_prep;
EXECUTE tableoid_prep;
EXECUTE tableoid_prep;
DEALLOCATE tableoid_prep;

@ -64,26 +64,27 @@ SELECT compress_chunk('_timescaledb_internal._hyper_2_6_chunk');
SELECT compress_chunk('_timescaledb_internal._hyper_2_10_chunk');
SELECT compress_chunk('_timescaledb_internal._hyper_2_11_chunk');
-- test system columns
\set ON_ERROR_STOP 0
SELECT tableoid FROM :TEST_TABLE ORDER BY time;
SELECT xmin FROM :TEST_TABLE ORDER BY time;
SELECT cmin FROM :TEST_TABLE ORDER BY time;
SELECT xmax FROM :TEST_TABLE ORDER BY time;
SELECT cmax FROM :TEST_TABLE ORDER BY time;
SELECT ctid FROM :TEST_TABLE ORDER BY time;
\set ON_ERROR_STOP 1
SELECT
ht.schema_name || '.' || ht.table_name AS "METRICS_COMPRESSED"
FROM _timescaledb_catalog.hypertable ht
INNER JOIN _timescaledb_catalog.hypertable ht2 ON ht.id=ht2.compressed_hypertable_id AND ht2.table_name = 'metrics'
\gset
SELECT
ht.schema_name || '.' || ht.table_name AS "METRICS_SPACE_COMPRESSED"
FROM _timescaledb_catalog.hypertable ht
INNER JOIN _timescaledb_catalog.hypertable ht2 ON ht.id=ht2.compressed_hypertable_id AND ht2.table_name = 'metrics_space'
\gset
\c :TEST_DBNAME :ROLE_SUPERUSER
-- TODO this should change once we have a canonical way to create indexes on compressed tables
CREATE INDEX c_index ON _timescaledb_internal._compressed_hypertable_3(device_id);
CREATE INDEX c_space_index ON _timescaledb_internal._compressed_hypertable_4(device_id);
CREATE INDEX c_index ON :METRICS_COMPRESSED(device_id);
CREATE INDEX c_space_index ON :METRICS_SPACE_COMPRESSED(device_id);
CREATE INDEX c_index_2 ON _timescaledb_internal._compressed_hypertable_3(device_id, _ts_meta_count);
CREATE INDEX c_space_index_2 ON _timescaledb_internal._compressed_hypertable_4(device_id, _ts_meta_count);
CREATE INDEX c_index_2 ON :METRICS_COMPRESSED(device_id, _ts_meta_count);
CREATE INDEX c_space_index_2 ON :METRICS_SPACE_COMPRESSED(device_id, _ts_meta_count);
CREATE INDEX ON _timescaledb_internal._compressed_hypertable_3(device_id_peer);
CREATE INDEX ON _timescaledb_internal._compressed_hypertable_4(device_id_peer);
CREATE INDEX ON :METRICS_COMPRESSED(device_id_peer);
CREATE INDEX ON :METRICS_SPACE_COMPRESSED(device_id_peer);
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
CREATE INDEX ON metrics_space(device_id,device_id_peer, v0, v1 desc, time);
@ -127,6 +128,7 @@ SET max_parallel_workers_per_gather TO 0;
\set TEST_TABLE 'metrics_space'
\ir :TEST_QUERY_NAME
\ir include/transparent_decompression_ordered.sql
\ir include/transparent_decompression_systemcolumns.sql
-- run query with parallel enabled to ensure nothing is preventing parallel execution
-- this is just a sanity check, the result queries dont run with parallel disabled