Merged in enordstr/backend-database/enordstr/insert-trigger-sorting (pull request #113)

Refactor insert path to use triggers instead of temp copy table.

Approved-by: Matvey Arye
Approved-by: ci-vast
This commit is contained in:
enordstr NA 2017-03-10 19:21:04 +00:00
commit 98d6d6a6fa
16 changed files with 480 additions and 511 deletions

View File

@ -42,7 +42,6 @@ sql/main/chunk_replica_node_index_triggers.sql
sql/main/chunk_replica_node_triggers.sql
sql/main/chunk_triggers.sql
sql/main/chunk.sql
sql/main/insert.sql
sql/main/meta_info.sql
sql/main/ddl_util.sql
sql/main/ddl.sql

View File

@ -29,6 +29,12 @@ BEGIN
END
$BODY$;
-- Trigger function backed by the C symbol insert_root_table_trigger in the
-- timescaledb shared library. It is attached as a BEFORE INSERT ... FOR EACH
-- ROW trigger on hypertable root tables.
CREATE OR REPLACE FUNCTION _timescaledb_internal.root_table_insert_trigger()
    RETURNS TRIGGER
    LANGUAGE C
    AS '$libdir/timescaledb', 'insert_root_table_trigger';

-- Trigger function backed by the C symbol insert_root_table_trigger_after in
-- the timescaledb shared library. It is attached as an AFTER INSERT ... FOR
-- EACH STATEMENT trigger on hypertable root tables.
CREATE OR REPLACE FUNCTION _timescaledb_internal.root_table_after_insert_trigger()
    RETURNS TRIGGER
    LANGUAGE C
    AS '$libdir/timescaledb', 'insert_root_table_trigger_after';
-- Trigger for handling the addition of new hypertables.
CREATE OR REPLACE FUNCTION _timescaledb_internal.on_change_hypertable()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
@ -36,9 +42,7 @@ $BODY$
DECLARE
remote_node _timescaledb_catalog.node;
BEGIN
IF TG_OP = 'INSERT' THEN
EXECUTE format(
$$
CREATE SCHEMA IF NOT EXISTS %I
@ -62,6 +66,16 @@ BEGIN
CREATE TRIGGER insert_trigger AFTER INSERT ON %I.%I
FOR EACH STATEMENT EXECUTE PROCEDURE _timescaledb_internal.on_modify_main_table();
$$, NEW.schema_name, NEW.table_name);
EXECUTE format(
$$
CREATE TRIGGER insert_trigger BEFORE INSERT ON %I.%I
FOR EACH ROW EXECUTE PROCEDURE _timescaledb_internal.root_table_insert_trigger(%L);
$$, NEW.root_schema_name, NEW.root_table_name, NEW.id);
EXECUTE format(
$$
CREATE TRIGGER after_insert_trigger AFTER INSERT ON %I.%I
FOR EACH STATEMENT EXECUTE PROCEDURE _timescaledb_internal.root_table_after_insert_trigger(%L);
$$, NEW.root_schema_name, NEW.root_table_name, NEW.id);
RETURN NEW;
END IF;

View File

@ -1,4 +0,0 @@
-- This file contains functions that aid in inserting data into a hypertable.
-- Trigger function implemented in C; bound to the symbol
-- insert_trigger_on_copy_table_c in the timescaledb shared library.
CREATE OR REPLACE FUNCTION _timescaledb_internal.insert_trigger_on_copy_table_c()
    RETURNS TRIGGER
    LANGUAGE C
    AS '$libdir/timescaledb', 'insert_trigger_on_copy_table_c';

View File

@ -1,5 +1,2 @@
-- get_partition_for_key(text, int) -> smallint.
-- C implementation; declared IMMUTABLE and STRICT (NULL input yields NULL
-- without calling the C function).
CREATE OR REPLACE FUNCTION _timescaledb_catalog.get_partition_for_key(text, int)
    RETURNS smallint
    LANGUAGE C IMMUTABLE STRICT
    AS '$libdir/timescaledb', 'get_partition_for_key';

-- array_position_least(smallint[], smallint) -> smallint.
-- C implementation; declared IMMUTABLE and STRICT.
CREATE OR REPLACE FUNCTION _timescaledb_catalog.array_position_least(smallint[], smallint)
    RETURNS smallint
    LANGUAGE C IMMUTABLE STRICT
    AS '$libdir/timescaledb', 'array_position_least';

View File

@ -1,41 +1,23 @@
#include "postgres.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "access/relscan.h"
#include "commands/defrem.h"
#include "catalog/namespace.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_class.h"
#include "optimizer/planner.h"
#include "optimizer/clauses.h"
#include "nodes/nodes.h"
#include "nodes/print.h"
#include "nodes/nodeFuncs.h"
#include "nodes/makefuncs.h"
#include "parser/parsetree.h"
#include "utils/lsyscache.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/int8.h"
#include "executor/spi.h"
#include "commands/extension.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "tcop/tcopprot.h"
#include "tcop/utility.h"
#include "deps/dblink.h"
#include "parser/parse_utilcmd.h"
#include "parser/parser.h"
#include <postgres.h>
#include <funcapi.h>
#include <catalog/pg_type.h>
#include <catalog/pg_opfamily.h>
#include <utils/rel.h>
#include <utils/tuplesort.h>
#include <utils/tqual.h>
#include <utils/rls.h>
#include <utils/builtins.h>
#include <utils/lsyscache.h>
#include <utils/guc.h>
#include <commands/tablecmds.h>
#include <commands/trigger.h>
#include "access/xact.h"
#include "access/htup_details.h"
#include "parser/parse_oper.h"
#include "parser/parse_func.h"
#include <access/xact.h>
#include <access/htup_details.h>
#include <access/heapam.h>
#include "fmgr.h"
#include <miscadmin.h>
#include <fmgr.h>
#include "insert.h"
#include "cache.h"
@ -51,24 +33,6 @@
#include "chunk.h"
#include "timescaledb.h"
#include <utils/tqual.h>
#include <utils/rls.h>
#include <miscadmin.h>
#include <access/heapam.h>
#define INSERT_TRIGGER_COPY_TABLE_FN "insert_trigger_on_copy_table_c"
#define INSERT_TRIGGER_COPY_TABLE_NAME "insert_trigger"
/* private funcs */
static ObjectAddress create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo * part_info, epoch_and_partitions_set * epoch);
static Node *get_keyspace_fn_call(PartitioningInfo * part_info);
/*
* Inserts rows from the temporary copy table into correct hypertable child tables.
* hypertable_id - ID of the hypertable the data belongs to
*/
static FmgrInfo *
get_close_if_needed_fn()
{
@ -101,15 +65,11 @@ close_if_needed(hypertable_cache_entry * hci, Chunk * chunk)
}
/*
*
* Helper functions for inserting tuples into chunk tables
*
* We insert one chunk at a time and hold a context while we insert
* a particular chunk;
*
* */
*/
typedef struct ChunkInsertCtxRel
{
Relation rel;
@ -295,64 +255,170 @@ chunk_insert_ctx_insert_tuple(ChunkInsertCtx * ctx, HeapTuple tup)
}
}
typedef struct CopyTableQueryCtx
Datum insert_root_table_trigger(PG_FUNCTION_ARGS);
Datum insert_root_table_trigger_after(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(insert_root_table_trigger);
PG_FUNCTION_INFO_V1(insert_root_table_trigger_after);
typedef struct InsertTriggerCtx
{
Cache *hypertable_cache;
hypertable_cache_entry *hypertable;
epoch_and_partitions_set *epoch;
Oid relid;
AttrNumber time_attno;
Partition *part;
ChunkInsertCtx *chunk_ctx;
epoch_and_partitions_set *pe;
hypertable_cache_entry *hci;
} CopyTableQueryCtx;
Tuplesortstate *sort;
TupleDesc expanded_tupdesc;
HeapTuple first_tuple;
MemoryContext mctx;
} InsertTriggerCtx;
static bool
copy_table_tuple_found(TupleInfo * ti, void *data)
static InsertTriggerCtx *insert_trigger_ctx;
static TupleDesc
tuple_desc_expand(TupleDesc old_tupdesc, AttrNumber *time_attno, const char *time_column_name)
{
bool is_null;
CopyTableQueryCtx *ctx = data;
int16 keyspace_pt;
int64 time_pt;
TupleDesc new_tupdesc = CreateTemplateTupleDesc(old_tupdesc->natts + 1, false);
int i;
if (ctx->pe->num_partitions > 1)
for (i = 1; i <= old_tupdesc->natts; i++)
{
/*
* first element is partition index (used for sorting but not
* necessary here)
*/
Datum time_datum = index_getattr(ti->ituple, 2, ti->ituple_desc, &is_null);
Datum keyspace_datum = index_getattr(ti->ituple, 3, ti->ituple_desc, &is_null);
time_pt = time_value_to_internal(time_datum, ctx->hci->time_column_type);
keyspace_pt = DatumGetInt16(keyspace_datum);
}
else
{
Datum time_datum = index_getattr(ti->ituple, 1, ti->ituple_desc, &is_null);
time_pt = time_value_to_internal(time_datum, ctx->hci->time_column_type);
keyspace_pt = KEYSPACE_PT_NO_PARTITIONING;
if (strncmp(old_tupdesc->attrs[i - 1]->attname.data, time_column_name, NAMEDATALEN) == 0)
{
*time_attno = i;
}
TupleDescCopyEntry(new_tupdesc, (AttrNumber) i, old_tupdesc, (AttrNumber) i);
}
TupleDescInitEntry(new_tupdesc, (AttrNumber) new_tupdesc->natts,
CATALOG_SCHEMA_NAME "_partition_id", INT4OID, -1, 0);
if (ctx->chunk_ctx != NULL && !chunk_timepoint_is_member(ctx->chunk_ctx->chunk, time_pt))
return BlessTupleDesc(new_tupdesc);
}
/*
 * Map the type of a hypertable's time column to the type whose btree
 * "less than" operator is used to sort tuples on time.
 *
 * Timestamp types are sorted with the INT8 operator (presumably because
 * timestamps are stored as 64-bit integers -- TODO confirm the build uses
 * integer datetimes). Integer types sort as themselves.
 *
 * Raises an ERROR for any other type.
 */
static Oid
time_type_to_sort_type(Oid time_type)
{
	switch (time_type)
	{
		case TIMESTAMPOID:
		case TIMESTAMPTZOID:
			return INT8OID;
		case INT2OID:
		case INT4OID:
		case INT8OID:
			return time_type;
		default:
			elog(ERROR, "Unsupported time type %u", time_type);
	}

	/*
	 * Not reached: elog(ERROR) does not return. The explicit return keeps
	 * compilers that cannot prove this from warning about falling off the
	 * end of a non-void function.
	 */
	return InvalidOid;
}
/*
 * Create a heap tuple sort state that orders tuples first by the last
 * attribute of tupdesc and then by the time attribute.
 *
 * tupdesc    - descriptor of the tuples to be sorted; its last attribute is
 *              used as the primary sort key and compared as INT4
 * time_attno - attribute number of the time column (secondary sort key)
 * time_type  - type of the time column; must be supported by
 *              time_type_to_sort_type()
 *
 * Both keys use the integer btree "less than" operator with NULLs sorted
 * last. The sort is limited to work_mem and does not need random access.
 */
static Tuplesortstate *
tuple_sort_state_init(TupleDesc tupdesc, AttrNumber time_attno, Oid time_type)
{
	enum
	{
		NUM_SORT_KEYS = 2
	};

	Oid			time_sort_type = time_type_to_sort_type(time_type);
	AttrNumber	sort_attnos[NUM_SORT_KEYS];
	Oid			sort_ops[NUM_SORT_KEYS];
	Oid			sort_collations[NUM_SORT_KEYS];
	bool		sort_nulls_first[NUM_SORT_KEYS];

	/* Primary key: the last attribute of the tuple descriptor */
	sort_attnos[0] = tupdesc->natts;
	sort_ops[0] = get_opfamily_member(INTEGER_BTREE_FAM_OID, INT4OID, INT4OID, BTLessStrategyNumber);
	sort_collations[0] = InvalidOid;
	sort_nulls_first[0] = false;

	/* Secondary key: the time attribute */
	sort_attnos[1] = time_attno;
	sort_ops[1] = get_opfamily_member(INTEGER_BTREE_FAM_OID, time_sort_type, time_sort_type, BTLessStrategyNumber);
	sort_collations[1] = InvalidOid;
	sort_nulls_first[1] = false;

	return tuplesort_begin_heap(tupdesc,
								NUM_SORT_KEYS,
								sort_attnos,
								sort_ops,
								sort_collations,
								sort_nulls_first,
								work_mem,
								false);
}
/*
 * Allocate and initialize a new insert trigger context.
 *
 * The context lives in its own memory context, created as a child of
 * CacheMemoryContext, so that everything allocated for it can be released
 * with a single MemoryContextDelete().
 *
 * tuple         - first tuple of the batch; a copy is stored in the context
 * hypertable_id - ID used to look up the hypertable cache entry
 * relid         - stored in the context as-is
 *
 * The hypertable cache is pinned here; the pin is stored in the returned
 * context.
 */
static InsertTriggerCtx *
insert_trigger_ctx_create(HeapTuple tuple, int hypertable_id, Oid relid)
{
	MemoryContext insert_mctx;
	MemoryContext saved_mctx;
	InsertTriggerCtx *ctx;

	insert_mctx = AllocSetContextCreate(CacheMemoryContext,
										"Insert context",
										ALLOCSET_DEFAULT_SIZES);

	/* Everything below is allocated in the context's own memory context */
	saved_mctx = MemoryContextSwitchTo(insert_mctx);

	ctx = palloc0(sizeof(InsertTriggerCtx));
	ctx->mctx = insert_mctx;
	ctx->relid = relid;
	ctx->hypertable_cache = hypertable_cache_pin();
	ctx->hypertable = hypertable_cache_get_entry(ctx->hypertable_cache, hypertable_id);
	ctx->first_tuple = heap_copytuple(tuple);

	MemoryContextSwitchTo(saved_mctx);

	return ctx;
}
/*
 * Set up the sorting state of an insert context.
 *
 * Builds the expanded tuple descriptor from tupdesc (which also fills in
 * tctx->time_attno) and then creates the tuple sort state for it.
 */
static void
insert_trigger_ctx_sort_init(InsertTriggerCtx * tctx, TupleDesc tupdesc)
{
	hypertable_cache_entry *ht = tctx->hypertable;
	TupleDesc	expanded;

	expanded = tuple_desc_expand(tupdesc, &tctx->time_attno, ht->time_column_name);
	tctx->expanded_tupdesc = expanded;

	/* time_attno must be read after tuple_desc_expand() has set it */
	tctx->sort = tuple_sort_state_init(expanded, tctx->time_attno, ht->time_column_type);
}
/*
 * Tear down an insert context: destroy any open chunk insert context,
 * release the hypertable cache and delete the context's memory context.
 */
static void
insert_trigger_ctx_free(InsertTriggerCtx * tctx)
{
	ChunkInsertCtx *open_chunk_ctx = tctx->chunk_ctx;

	if (open_chunk_ctx != NULL)
	{
		chunk_insert_ctx_destroy(open_chunk_ctx);
	}

	cache_release(tctx->hypertable_cache);

	/* Must come last: all state of the context is dropped here */
	MemoryContextDelete(tctx->mctx);
}
/*
 * Build a new heap tuple that carries all attributes of the given tuple plus
 * the partition index as its last attribute.
 *
 * tuple           - the original tuple
 * tupdesc         - descriptor of the original tuple
 * new_tupdesc     - descriptor of the expanded tuple; its last attribute
 *                   receives partition_index
 * partition_index - value stored (non-NULL) in the last attribute
 *
 * The tuple's ctid, self pointer and table OID are carried over, and so is
 * the OID if the original descriptor has OIDs.
 */
static HeapTuple
heap_tuple_add_partition_index(HeapTuple tuple, TupleDesc tupdesc, TupleDesc new_tupdesc, int32 partition_index)
{
	const int	last = new_tupdesc->natts - 1;
	Datum		attr_values[new_tupdesc->natts];
	bool		attr_nulls[new_tupdesc->natts];
	HeapTuple	expanded;

	/* Extract the original attributes, then append the partition index */
	heap_deform_tuple(tuple, tupdesc, attr_values, attr_nulls);
	attr_values[last] = Int32GetDatum(partition_index);
	attr_nulls[last] = false;

	expanded = heap_form_tuple(new_tupdesc, attr_values, attr_nulls);

	/* Carry over the identity of the original tuple */
	expanded->t_data->t_ctid = tuple->t_data->t_ctid;
	expanded->t_self = tuple->t_self;
	expanded->t_tableOid = tuple->t_tableOid;

	if (tupdesc->tdhasoid)
	{
		HeapTupleSetOid(expanded, HeapTupleGetOid(tuple));
	}

	return expanded;
}
static void
insert_tuple(InsertTriggerCtx * tctx, TupleInfo * ti, int partition_index, int64 time_pt)
{
if (tctx->chunk_ctx != NULL && !chunk_timepoint_is_member(tctx->chunk_ctx->chunk, time_pt))
{
/* moving on to next chunk; */
chunk_insert_ctx_destroy(ctx->chunk_ctx);
ctx->chunk_ctx = NULL;
chunk_insert_ctx_destroy(tctx->chunk_ctx);
tctx->chunk_ctx = NULL;
}
if (ctx->part != NULL && !partition_keyspace_pt_is_member(ctx->part, keyspace_pt))
if (tctx->part != NULL && tctx->part->index != partition_index)
{
/* moving on to next ctx->partition. */
chunk_insert_ctx_destroy(ctx->chunk_ctx);
ctx->chunk_ctx = NULL;
ctx->part = NULL;
/* moving on to next partition. */
chunk_insert_ctx_destroy(tctx->chunk_ctx);
tctx->chunk_ctx = NULL;
}
if (ctx->part == NULL)
{
ctx->part = partition_epoch_get_partition(ctx->pe, keyspace_pt);
}
tctx->part = &tctx->epoch->partitions[partition_index];
if (ctx->chunk_ctx == NULL)
if (tctx->chunk_ctx == NULL)
{
Chunk *chunk;
Cache *pinned = chunk_cache_pin();
@ -361,79 +427,197 @@ copy_table_tuple_found(TupleInfo * ti, void *data)
* TODO: this first call should be non-locking and use a cache(for
* performance)
*/
chunk = chunk_cache_get(pinned, ctx->part, ctx->hci->num_replicas, time_pt, false);
close_if_needed(ctx->hci, chunk);
chunk = chunk_cache_get(pinned, tctx->part, tctx->hypertable->num_replicas, time_pt, false);
close_if_needed(tctx->hypertable, chunk);
/*
* chunk may have been closed and thus changed /or/ need to get share
* lock
*/
chunk = chunk_cache_get(pinned, ctx->part, ctx->hci->num_replicas, time_pt, true);
chunk = chunk_cache_get(pinned, tctx->part, tctx->hypertable->num_replicas, time_pt, true);
ctx->chunk_ctx = chunk_insert_ctx_new(chunk, pinned);
tctx->chunk_ctx = chunk_insert_ctx_new(chunk, pinned);
}
/* insert here: */
/* has to be a copy(not sure why) */
chunk_insert_ctx_insert_tuple(ctx->chunk_ctx, heap_copytuple(ti->tuple));
return true;
chunk_insert_ctx_insert_tuple(tctx->chunk_ctx, ti->tuple);
}
/*
 * Find the partition a tuple belongs to.
 *
 * The time attribute (tctx->time_attno) is read from the tuple and converted
 * to the internal time representation. It is used to look up the partition
 * epoch, which is also saved in tctx->epoch. When the epoch has more than one
 * partition, the partitioning function is applied to the tuple to get the
 * keyspace point; otherwise KEYSPACE_PT_NO_PARTITIONING is used.
 *
 * tctx          - insert context; tctx->epoch is updated as a side effect
 * tuple/tupdesc - the tuple and its descriptor
 * timepoint_out - if not NULL, receives the internal timepoint of the tuple
 *
 * Raises an ERROR if the time attribute is NULL.
 */
static Partition *
insert_trigger_ctx_lookup_partition(InsertTriggerCtx * tctx, HeapTuple tuple,
									TupleDesc tupdesc, int64 *timepoint_out)
{
	Datum		time_datum;
	bool		time_isnull;
	int64		time_pt;
	int64		keyspace_pt = KEYSPACE_PT_NO_PARTITIONING;
	epoch_and_partitions_set *pe;

	time_datum = heap_getattr(tuple, tctx->time_attno, tupdesc, &time_isnull);

	if (time_isnull)
		elog(ERROR, "No time attribute in tuple");

	time_pt = time_value_to_internal(time_datum, tctx->hypertable->time_column_type);

	pe = hypertable_cache_get_partition_epoch(tctx->hypertable_cache,
											  tctx->hypertable,
											  time_pt,
											  tctx->relid);

	/* Remember the epoch in the insert state */
	tctx->epoch = pe;

	/* Only apply the partitioning function when there is space partitioning */
	if (pe->num_partitions > 1)
		keyspace_pt = partitioning_func_apply_tuple(pe->partitioning, tuple, tupdesc);

	if (timepoint_out != NULL)
	{
		*timepoint_out = time_pt;
	}

	return partition_epoch_get_partition(pe, keyspace_pt);
}
static void
scan_copy_table_and_insert_post(int num_tuples, void *data)
insert_trigger_ctx_tuplesort_put(InsertTriggerCtx * tctx, HeapTuple tuple, TupleDesc tupdesc)
{
CopyTableQueryCtx *ctx = data;
Partition *part;
TupleTableSlot *slot;
if (ctx->chunk_ctx != NULL)
chunk_insert_ctx_destroy(ctx->chunk_ctx);
/*
* Get the epoch (time) and partition (space) based on the information in
* the tuple
*/
part = insert_trigger_ctx_lookup_partition(tctx, tuple, tupdesc, NULL);
/*
* Create a new (expanded) tuple from the old one that has the partition
* index appended as the last attribute
*/
tuple = heap_tuple_add_partition_index(tuple, tupdesc, tctx->expanded_tupdesc, part->index);
/* Put the new tuple into the tuple sort set */
slot = MakeSingleTupleTableSlot(tctx->expanded_tupdesc);
slot = ExecStoreTuple(tuple, slot, InvalidBuffer, false);
tuplesort_puttupleslot(tctx->sort, slot);
ExecClearTuple(slot);
}
static void
scan_copy_table_and_insert(hypertable_cache_entry * hci,
epoch_and_partitions_set * pe,
Oid table, Oid index)
{
CopyTableQueryCtx query_ctx = {
.pe = pe,
.hci = hci,
};
ScannerCtx scanCtx = {
.table = table,
.index = index,
.scantype = ScannerTypeIndex,
.want_itup = true,
.nkeys = 0,
.scankey = NULL,
.data = &query_ctx,
.tuple_found = copy_table_tuple_found,
.postscan = scan_copy_table_and_insert_post,
.lockmode = AccessShareLock,
.scandirection = ForwardScanDirection,
};
/* Perform an index scan on primary key. */
scanner_scan(&scanCtx);
}
PG_FUNCTION_INFO_V1(insert_trigger_on_copy_table_c);
/*
* This row-level trigger is called for every row INSERTed into a hypertable. We
* use it to redirect inserted tuples to the correct hypertable chunk in space
* and time.
*
* To avoid deadlocks, we need to insert tuples in partition and chunk/time
* order. Therefore, we collect tuples for every insert batch, sort them at the
* end of the batch, and finally insert in chunk order.
*
* Tuples are collected into a context that keeps the state across single
* invocations of the row trigger. The insert context is allocated on the first
* row of a batch, and reset in an 'after' trigger when the batch completes. The
* insert context is tracked via a static/global pointer.
*
* The insert trigger supports two processing paths, depending on single-row or
* multi-row batch inserts:
*
* Single-row (fast) path:
* =====================
*
* For single-row batches, no sorting is needed. Therefore, the trigger
* allocates only a partial insert state for the first tuple, eschewing the
* sorting state. If the after trigger is called without the sorting state,
* there is only one tuple and no sorting occurs in the 'after' trigger.
*
*
* Multi-row (slow) path:
* ======================
*
* For multi-row batches, sorting is required. On the second tuple encountered a
* sorting state is initialized and both the first tuple and all following
* tuples are inserted into the sorting state. All tuples are sorted in the
* 'after' trigger before insertion into chunks.
*/
Datum
insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS)
insert_root_table_trigger(PG_FUNCTION_ARGS)
{
TriggerData *trigdata = (TriggerData *) fcinfo->context;
InsertTriggerCtx *tctx = insert_trigger_ctx;
int32 hypertable_id = atoi(trigdata->tg_trigger->tgargs[0]);
HeapTuple tuple;
TupleDesc tupdesc = trigdata->tg_relation->rd_att;
MemoryContext oldctx;
/* arg 0 = hypertable id */
char *hypertable_id_arg = trigdata->tg_trigger->tgargs[0];
int32 hypertable_id = atoi(hypertable_id_arg);
/* Check that this is called the way it should be */
if (!CALLED_AS_TRIGGER(fcinfo))
elog(ERROR, "Trigger not called by trigger manager");
hypertable_cache_entry *hci;
if (!TRIGGER_FIRED_BEFORE(trigdata->tg_event))
elog(ERROR, "Trigger should only fire before insert");
epoch_and_partitions_set *pe;
Cache *hypertable_cache;
ObjectAddress idx;
if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
tuple = trigdata->tg_newtuple;
else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
tuple = trigdata->tg_trigtuple;
else
elog(ERROR, "Unsupported event for trigger");
DropStmt *drop = makeNode(DropStmt);
if (insert_trigger_ctx == NULL)
{
/* This is the first row. Allocate a new insert context */
insert_trigger_ctx = insert_trigger_ctx_create(tuple, hypertable_id, trigdata->tg_relation->rd_id);
return PointerGetDatum(NULL);
}
/*
* Use the insert context's memory context so that the state we use
* survives across trigger invocations until the after trigger.
*/
oldctx = MemoryContextSwitchTo(tctx->mctx);
if (tctx->sort == NULL)
{
/*
* Multi-tuple case, i.e., we must sort the tuples. Initialize the
* sort state and put the first tuple that we saved from the last row
* trigger in the same batch.
*/
insert_trigger_ctx_sort_init(insert_trigger_ctx, tupdesc);
insert_trigger_ctx_tuplesort_put(insert_trigger_ctx, tctx->first_tuple, tupdesc);
}
/* The rest of the tuples in the batch are put into the sort state here */
insert_trigger_ctx_tuplesort_put(insert_trigger_ctx, tuple, tupdesc);
MemoryContextSwitchTo(oldctx);
/* Return NULL since we do not want the tuple in the trigger's table */
return PointerGetDatum(NULL);
}
Datum
insert_root_table_trigger_after(PG_FUNCTION_ARGS)
{
TriggerData *trigdata = (TriggerData *) fcinfo->context;
int32 hypertable_id = atoi(trigdata->tg_trigger->tgargs[0]);
InsertTriggerCtx *tctx = insert_trigger_ctx;
char *insert_guard;
if (!CALLED_AS_TRIGGER(fcinfo))
elog(ERROR, "not called by trigger manager");
if (!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event) &&
!TRIGGER_FIRED_BY_INSERT(trigdata->tg_event))
elog(ERROR, "Unsupported event for trigger");
/*
* --This guard protects against calling insert_data() twice in the same
@ -445,7 +629,7 @@ insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS)
* safe guard unfortunately prohibits transactions --involving INSERTs on
* two different hypertables.
*/
char *insert_guard = GetConfigOptionByName("io.insert_data_guard", NULL, true);
insert_guard = GetConfigOptionByName("io.insert_data_guard", NULL, true);
if (insert_guard != NULL && strcmp(insert_guard, "on") == 0)
{
@ -457,252 +641,76 @@ insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS)
set_config_option("io.insert_data_guard", "on", PGC_USERSET, PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);
/*
* get the hypertable cache; use the time column name to figure out the
* column fnum for time field
*/
hypertable_cache = hypertable_cache_pin();
hci = hypertable_cache_get_entry(hypertable_cache, hypertable_id);
/* TODO: hack assumes single pe. */
pe = hypertable_cache_get_partition_epoch(hypertable_cache, hci, 0, trigdata->tg_relation->rd_id);
/*
* create an index that colocates row from the same chunk together and
* guarantees an order on chunk access as well
*/
idx = create_insert_index(hypertable_id, hci->time_column_name, pe->partitioning, pe);
scan_copy_table_and_insert(hci, pe, trigdata->tg_relation->rd_id, idx.objectId);
cache_release(hypertable_cache);
drop->removeType = OBJECT_INDEX;
drop->missing_ok = FALSE;
drop->objects = list_make1(list_make1(makeString("copy_insert")));
drop->arguments = NIL;
drop->behavior = DROP_RESTRICT;
drop->concurrent = false;
RemoveRelations(drop);
return PointerGetDatum(NULL);
}
/* Creates a temp table for INSERT and COPY commands. This table
* stores the data until it is distributed to the appropriate chunks.
* The copy table uses ON COMMIT DELETE ROWS and inherits from the root table.
* */
Oid
create_copy_table(int32 hypertable_id, Oid root_oid)
{
/*
* Inserting into a hypertable transformed into inserting into a "copy"
* temporary table that has a trigger which calls insert_data afterwords
*/
Oid copy_table_relid;
ObjectAddress copyTableRelationAddr;
StringInfo temp_table_name = makeStringInfo();
StringInfo hypertable_id_arg = makeStringInfo();
RangeVar *parent,
*rel;
CreateStmt *create;
CreateTrigStmt *createTrig;
appendStringInfo(temp_table_name, "_copy_temp_%d", hypertable_id);
appendStringInfo(hypertable_id_arg, "%d", hypertable_id);
parent = makeRangeVarFromRelid(root_oid);
rel = makeRangeVar("pg_temp", copy_table_name(hypertable_id), -1);
rel->relpersistence = RELPERSISTENCE_TEMP;
RangeVarGetAndCheckCreationNamespace(rel, NoLock, &copy_table_relid);
if (OidIsValid(copy_table_relid))
if (hypertable_id != tctx->hypertable->id)
{
return copy_table_relid;
elog(ERROR, "Mismatching hypertable IDs. Insert context in bad state!");
}
create = makeNode(CreateStmt);
/*
* Create the target relation by faking up a CREATE TABLE parsetree and
* passing it to DefineRelation.
*/
create->relation = rel;
create->tableElts = NIL;
create->inhRelations = list_make1(parent);
create->ofTypename = NULL;
create->constraints = NIL;
create->options = NIL;
create->oncommit = ONCOMMIT_DELETE_ROWS;
create->tablespacename = NULL;
create->if_not_exists = false;
copyTableRelationAddr = DefineRelation(create, RELKIND_RELATION, InvalidOid, NULL);
createTrig = makeNode(CreateTrigStmt);
createTrig->trigname = INSERT_TRIGGER_COPY_TABLE_NAME;
createTrig->relation = rel;
createTrig->funcname = list_make2(makeString(TIMESCALEDB_INTERNAL_SCHEMA), makeString(INSERT_TRIGGER_COPY_TABLE_FN));
createTrig->args = list_make1(makeString(hypertable_id_arg->data));
createTrig->row = false;
createTrig->timing = TRIGGER_TYPE_AFTER;
createTrig->events = TRIGGER_TYPE_INSERT;
createTrig->columns = NIL;
createTrig->whenClause = NULL;
createTrig->isconstraint = FALSE;
createTrig->deferrable = FALSE;
createTrig->initdeferred = FALSE;
createTrig->constrrel = NULL;
CreateTrigger(createTrig, NULL, copyTableRelationAddr.objectId, 0, 0, 0, false);
/* make trigger visible */
CommandCounterIncrement();
return copyTableRelationAddr.objectId;
}
/*
 * Build an IndexElem node for either a named column or an expression.
 *
 * Exactly one of name and expr must be non-NULL (checked by assertion).
 * All other fields get default values: no explicit index column name,
 * collation or operator class, and default ordering and NULLs ordering.
 */
static IndexElem *
makeIndexElem(char *name, Node *expr)
{
	IndexElem  *elem = makeNode(IndexElem);

	/* One and only one of name/expr may be given */
	Assert((name == NULL) != (expr == NULL));

	elem->name = name;
	elem->expr = expr;
	elem->indexcolname = NULL;
	elem->collation = NIL;
	elem->opclass = NIL;
	elem->ordering = SORTBY_DEFAULT;
	elem->nulls_ordering = SORTBY_NULLS_DEFAULT;

	return elem;
}
/* creates index for inserting in set chunk order.
*
* The index is the following:
* If there is a partitioning_func:
* partition_no, time, keyspace_value
* If there is no partitioning_func:
* time
*
* Partition_num is simply a unique number identifying the partition for the epoch the row belongs to.
* It is obtained by getting the maximal index in the end_time_partitions array such that the keyspace value
* is less than or equal to the value in the array.
*
* Keyspace_value without partition_num is not sufficient because:
* consider the partitions with keyspaces 0-5,6-10, and time partitions 100-200,201-300
* Then consider the following input:
* row 1: keyspace=0, time=100
* row 2: keyspace=2, time=250
* row 3: keyspace=4, time=100
* row 1 and 3 should be in the same chunk but they are now not together in the order (row 2 is between them).
*
* keyspace_value should probably be moved out of the index.
*
* */
static ObjectAddress
create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo * part_info, epoch_and_partitions_set * epoch)
{
IndexStmt *index_stmt = makeNode(IndexStmt);
IndexElem *time_elem;
Oid relid;
List *indexElem = NIL;
int i;
time_elem = makeIndexElem(time_field, NULL);
if (part_info != NULL)
if (tctx->sort == NULL)
{
IndexElem *partition_elem;
IndexElem *keyspace_elem;
List *array_pos_func_name = list_make2(makeString(CATALOG_SCHEMA_NAME), makeString(ARRAY_POSITION_LEAST_FN_NAME));
List *array_pos_args;
List *array_list = NIL;
A_ArrayExpr *array_expr;
FuncCall *array_pos_fc;
/* Single-tuple batch fast path */
Partition *part;
int64 timepoint;
TupleInfo ti = {
.desc = trigdata->tg_relation->rd_att,
.tuple = tctx->first_tuple,
};
for (i = 0; i < epoch->num_partitions; i++)
{
A_Const *end_time_const = makeNode(A_Const);
TypeCast *cast = makeNode(TypeCast);
/* We must set the time attno because it was not set on the fast path */
tctx->time_attno = get_attnum(trigdata->tg_relation->rd_id,
tctx->hypertable->time_column_name);
part = insert_trigger_ctx_lookup_partition(insert_trigger_ctx,
ti.tuple,
ti.desc,
&timepoint);
end_time_const->val = *makeInteger((int) epoch->partitions[i].keyspace_end);
end_time_const->location = -1;
cast->arg = (Node *) end_time_const;
cast->typeName = SystemTypeName("int2");
cast->location = -1;
array_list = lappend(array_list, cast);
}
array_expr = makeNode(A_ArrayExpr);
array_expr->elements = array_list;
array_expr->location = -1;
array_pos_args = list_make2(array_expr, get_keyspace_fn_call(part_info));
array_pos_fc = makeFuncCall(array_pos_func_name, array_pos_args, -1);
partition_elem = makeIndexElem(NULL, (Node *) array_pos_fc);
keyspace_elem = makeIndexElem(NULL, (Node *) get_keyspace_fn_call(part_info));
/* partition_number, time, keyspace */
/* can probably get rid of keyspace but later */
indexElem = list_make3(partition_elem, time_elem, keyspace_elem);
insert_tuple(insert_trigger_ctx, &ti, part->index, timepoint);
}
else
{
indexElem = list_make1(time_elem);
/* Multi-tuple batch slow path: sort the tuples */
TupleDesc tupdesc = tctx->expanded_tupdesc;
TupleTableSlot *slot = MakeSingleTupleTableSlot(tupdesc);
bool doreplace[trigdata->tg_relation->rd_att->natts];
bool validslot;
Datum datum;
memset(doreplace, 0, sizeof(doreplace));
tuplesort_performsort(tctx->sort);
/* Loop over the sorted tuples and insert one by one */
validslot = tuplesort_gettupleslot(tctx->sort, true, slot, NULL);
while (validslot)
{
bool isnull;
int partition_index;
int64 timepoint;
HeapTuple tuple = ExecFetchSlotTuple(slot);
TupleInfo ti = {
.desc = trigdata->tg_relation->rd_att,
/* Strip off the partition attribute from the tuple so that we
* do not add it to the chunk when we insert. */
.tuple = heap_modify_tuple(tuple, trigdata->tg_relation->rd_att, NULL, NULL, doreplace),
};
datum = heap_getattr(tuple, tupdesc->natts, tupdesc, &isnull);
partition_index = DatumGetInt32(datum);
datum = heap_getattr(tuple, tctx->time_attno, tupdesc, &isnull);
timepoint = time_value_to_internal(datum, tctx->hypertable->time_column_type);
insert_tuple(insert_trigger_ctx, &ti, partition_index, timepoint);
ExecClearTuple(slot);
validslot = tuplesort_gettupleslot(tctx->sort, true, slot, NULL);
}
tuplesort_end(tctx->sort);
}
index_stmt->idxname = "copy_insert";
index_stmt->relation = makeRangeVar("pg_temp", copy_table_name(hypertable_id), -1);
index_stmt->accessMethod = "btree";
index_stmt->indexParams = indexElem;
relid =
RangeVarGetRelidExtended(index_stmt->relation, ShareLock,
false, false,
RangeVarCallbackOwnsRelation,
NULL);
index_stmt = transformIndexStmt(relid, index_stmt, "");
return DefineIndex(relid, /* OID of heap relation */
index_stmt,
InvalidOid, /* no predefined OID */
false, /* is_alter_table */
true, /* check_rights */
false, /* skip_build */
false); /* quiet */
insert_trigger_ctx_free(tctx);
insert_trigger_ctx = NULL;
}
/* Helper function to create the FuncCall for calculating the keyspace_value. Used for
* creating the copy_insert index
*
*/
static Node *
get_keyspace_fn_call(PartitioningInfo * part_info)
{
ColumnRef *col_ref = makeNode(ColumnRef);
A_Const *mod_const;
List *part_func_name = list_make2(makeString(part_info->partfunc.schema), makeString(part_info->partfunc.name));
List *part_func_args;
col_ref->fields = list_make1(makeString(part_info->column));
col_ref->location = -1;
mod_const = makeNode(A_Const);
mod_const->val = *makeInteger(part_info->partfunc.modulos);
mod_const->location = -1;
part_func_args = list_make2(col_ref, mod_const);
return (Node *) makeFuncCall(part_func_name, part_func_args, -1);
return PointerGetDatum(NULL);
}

View File

@ -63,17 +63,34 @@ partitioning_info_create(int num_partitions,
}
int16
partitioning_func_apply(PartitioningFunc * pf, Datum value)
partitioning_func_apply(PartitioningInfo * pinfo, Datum value)
{
Datum text = FunctionCall1(&pf->textfunc_fmgr, value);
Datum text = FunctionCall1(&pinfo->partfunc.textfunc_fmgr, value);
char *partition_val = DatumGetCString(text);
Datum keyspace_datum = FunctionCall2(&pf->func_fmgr,
Datum keyspace_datum = FunctionCall2(&pinfo->partfunc.func_fmgr,
CStringGetTextDatum(partition_val),
Int32GetDatum(pf->modulos));
Int32GetDatum(pinfo->partfunc.modulos));
return DatumGetInt16(keyspace_datum);
}
/*
 * Apply the partitioning function to the partitioning column of a tuple.
 *
 * pinfo - partitioning info; pinfo->column_attnum selects the attribute
 * tuple - the tuple to read the partitioning value from
 * desc  - descriptor of the tuple
 *
 * Returns the keyspace point computed by partitioning_func_apply(), or 0 if
 * the partitioning attribute is NULL.
 */
int16
partitioning_func_apply_tuple(PartitioningInfo * pinfo, HeapTuple tuple, TupleDesc desc)
{
	bool		value_isnull;
	Datum		partition_value;

	partition_value = heap_getattr(tuple, pinfo->column_attnum, desc, &value_isnull);

	/* A NULL partitioning value maps to keyspace point 0 */
	if (!value_isnull)
		return partitioning_func_apply(pinfo, partition_value);

	return 0;
}
/* Partition epoch index column numbers from sql/common/table.sql */
#define PE_IDX_COL_HTID 1
#define PE_IDX_COL_STARTTIME 2
@ -238,6 +255,7 @@ partition_tuple_found(TupleInfo * ti, void *arg)
pe->partitions[pctx->num_partitions].keyspace_start = DatumGetInt16(datum);
datum = heap_getattr(ti->tuple, PARTITION_TBL_COL_KEYSPACE_END, ti->desc, &is_null);
pe->partitions[pctx->num_partitions].keyspace_end = DatumGetInt16(datum);
pe->partitions[pctx->num_partitions].index = pctx->num_partitions;
/* Abort the scan if we have found all partitions */
if (pctx->num_partitions == 0)

View File

@ -5,6 +5,7 @@
#include <postgres.h>
#include <access/attnum.h>
#include <access/htup.h>
#include <fmgr.h>
#define OPEN_START_TIME -1
@ -13,6 +14,8 @@
/*
 * A single space partition of a partition epoch. The partitions of an epoch
 * are kept in the epoch's partition array.
 */
typedef struct Partition
{
/* Identifier of the partition (presumably its catalog ID -- TODO confirm) */
int32 id;
int32 index; /* Index of this partition in the epoch's
* partition array */
/* Keyspace bounds of the partition, as read from the partition table */
int16 keyspace_start;
int16 keyspace_end;
} Partition;
@ -58,10 +61,12 @@ typedef struct epoch_and_partitions_set
typedef struct epoch_and_partitions_set epoch_and_partitions_set;
epoch_and_partitions_set *partition_epoch_scan(int32 hypertable_id, int64 timepoint, Oid relid);
int16 partitioning_func_apply(PartitioningFunc * pf, Datum value);
int16 partitioning_func_apply(PartitioningInfo * pinfo, Datum value);
int16 partitioning_func_apply_tuple(PartitioningInfo * pinfo, HeapTuple tuple, TupleDesc desc);
Partition *partition_epoch_get_partition(epoch_and_partitions_set * epoch, int16 keyspace_pt);
void partition_epoch_free(epoch_and_partitions_set * epoch);
bool partition_keyspace_pt_is_member(const Partition * part, const int16 keyspace_pt);
#endif /* TIMESCALEDB_PARTITIONING_H */

View File

@ -59,96 +59,3 @@ get_partition_for_key(PG_FUNCTION_ARGS)
PG_RETURN_INT16(res);
}
/*
 * array_position_least returns the position (using the array's own lower
 * bound) of the first element that is >= the searched element. If every
 * element is smaller than the searched element, the position of the last
 * element is returned. If the array holds sorted partition-keyspace-end
 * values, this gives a unique bucket for each keyspace value.
 *
 * Arg 0: sorted array of smallint; must be one-dimensional, non-empty and
 *        must not contain NULL elements
 * Arg 1: searched_element (smallint)
 *
 * Raises an ERROR if either argument is NULL, if the array is not a
 * non-empty one-dimensional smallint array, or if it contains a NULL.
 */
PG_FUNCTION_INFO_V1(array_position_least);

Datum
array_position_least(PG_FUNCTION_ARGS)
{
	ArrayType  *array;
	Datum		searched_element,
				value;
	bool		isnull;
	int			position;
	ArrayMetaState *my_extra;
	ArrayIterator array_iterator;

	if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
	{
		elog(ERROR, "neither parameter should be null");
	}

	array = PG_GETARG_ARRAYTYPE_P(0);

	if (INT2OID != ARR_ELEMTYPE(array))
	{
		elog(ERROR, "only support smallint arrays");
	}

	/* should be a single dimensioned array */
	if (ARR_NDIM(array) > 1)
		ereport(ERROR,
				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
				 errmsg("can only work with single-dimension array")));

	/*
	 * An empty array has zero dimensions and therefore no lower-bound entry.
	 * Reading ARR_LBOUND(array)[0] below would access memory past the array
	 * header, so reject it explicitly.
	 */
	if (ARR_NDIM(array) < 1)
	{
		elog(ERROR, "array should not be empty");
	}

	/* Arg 1 is known to be non-NULL from the check above */
	searched_element = PG_GETARG_DATUM(1);

	/* Start one before the first position; incremented per element below */
	position = (ARR_LBOUND(array))[0] - 1;

	/*
	 * We arrange to look up type info for array_create_iterator only once per
	 * series of calls, assuming the element type doesn't change underneath
	 * us.
	 */
	my_extra = (ArrayMetaState *) fcinfo->flinfo->fn_extra;
	if (my_extra == NULL)
	{
		fcinfo->flinfo->fn_extra = MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
													  sizeof(ArrayMetaState));
		my_extra = (ArrayMetaState *) fcinfo->flinfo->fn_extra;
		get_typlenbyvalalign(INT2OID,
							 &my_extra->typlen,
							 &my_extra->typbyval,
							 &my_extra->typalign);
		my_extra->element_type = INT2OID;
	}

	/* Examine each array element until the element is >= searched_element. */
	array_iterator = array_create_iterator(array, 0, my_extra);
	while (array_iterate(array_iterator, &value, &isnull))
	{
		position++;

		if (isnull)
		{
			elog(ERROR, "No element in array should be null");
		}

		if (DatumGetInt16(value) >= DatumGetInt16(searched_element))
			break;
	}

	array_free_iterator(array_iterator);

	/* Avoid leaking memory when handed toasted input */
	PG_FREE_IF_COPY(array, 0);

	PG_RETURN_INT16(position);
}

View File

@ -250,7 +250,7 @@ change_table_name_walker(Node *node, void *context)
if (hinfo != NULL)
{
rangeTableEntry->relid = create_copy_table(hinfo->hypertable_id, hinfo->root_oid);
rangeTableEntry->relid = hinfo->root_oid;
}
}
return false;
@ -792,7 +792,7 @@ timescaledb_ProcessUtility(Node *parsetree,
if (hinfo != NULL)
{
copystmt->relation = makeRangeVarFromRelid(create_copy_table(hinfo->hypertable_id, hinfo->root_oid));
copystmt->relation = makeRangeVarFromRelid(hinfo->root_oid);
}
}
prev_ProcessUtility((Node *) copystmt, queryString, context, params, dest, completionTag);

View File

@ -120,8 +120,10 @@ Triggers:
sensor_2 | numeric | not null default 1 | main | |
sensor_3 | numeric | not null default 1 | main | |
sensor_4 | numeric | not null default 1 | main | |
Child tables: _copy_temp_1,
_timescaledb_internal._hyper_1_0_replica
Triggers:
after_insert_trigger AFTER INSERT ON _timescaledb_internal._hyper_1_root FOR EACH STATEMENT EXECUTE PROCEDURE _timescaledb_internal.root_table_after_insert_trigger('1')
insert_trigger BEFORE INSERT ON _timescaledb_internal._hyper_1_root FOR EACH ROW EXECUTE PROCEDURE _timescaledb_internal.root_table_insert_trigger('1')
Child tables: _timescaledb_internal._hyper_1_0_replica
\d+ _timescaledb_internal._hyper_1_1_0_1_data
Table "_timescaledb_internal._hyper_1_1_0_1_data"
@ -208,8 +210,10 @@ Triggers:
temp_f | integer | not null default 31 | plain | |
sensor_3 | bigint | not null default 131 | plain | |
sensor_4 | bigint | not null default 131 | plain | |
Child tables: _copy_temp_1,
_timescaledb_internal._hyper_1_0_replica
Triggers:
after_insert_trigger AFTER INSERT ON _timescaledb_internal._hyper_1_root FOR EACH STATEMENT EXECUTE PROCEDURE _timescaledb_internal.root_table_after_insert_trigger('1')
insert_trigger BEFORE INSERT ON _timescaledb_internal._hyper_1_root FOR EACH ROW EXECUTE PROCEDURE _timescaledb_internal.root_table_insert_trigger('1')
Child tables: _timescaledb_internal._hyper_1_0_replica
SELECT * FROM PUBLIC."Hypertable_1";
time | Device_id | humidity | sensor_1 | sensor_2_renamed | sensor_3_renamed | temp_f | sensor_3 | sensor_4

View File

@ -141,8 +141,10 @@ Triggers:
temp_f | integer | not null default 31 | plain | |
sensor_3 | bigint | not null default 131 | plain | |
sensor_4 | bigint | not null default 131 | plain | |
Child tables: _copy_temp_1,
_timescaledb_internal._hyper_1_0_replica
Triggers:
after_insert_trigger AFTER INSERT ON _timescaledb_internal._hyper_1_root FOR EACH STATEMENT EXECUTE PROCEDURE _timescaledb_internal.root_table_after_insert_trigger('1')
insert_trigger BEFORE INSERT ON _timescaledb_internal._hyper_1_root FOR EACH ROW EXECUTE PROCEDURE _timescaledb_internal.root_table_insert_trigger('1')
Child tables: _timescaledb_internal._hyper_1_0_replica
\d+ _timescaledb_internal._hyper_1_1_0_1_data
Table "_timescaledb_internal._hyper_1_1_0_1_data"

View File

@ -1,4 +1,6 @@
\set ON_ERROR_STOP 1
\set VERBOSITY verbose
\set SHOW_CONTEXT never
\o /dev/null
\ir include/insert_two_partitions.sql
\set ON_ERROR_STOP 1
@ -278,6 +280,9 @@ Child tables: _timescaledb_internal._hyper_1_2_0_3_data
series_1 | double precision | | plain | |
series_2 | double precision | | plain | |
series_bool | boolean | | plain | |
Triggers:
after_insert_trigger AFTER INSERT ON _timescaledb_internal._hyper_1_root FOR EACH STATEMENT EXECUTE PROCEDURE _timescaledb_internal.root_table_after_insert_trigger('1')
insert_trigger BEFORE INSERT ON _timescaledb_internal._hyper_1_root FOR EACH ROW EXECUTE PROCEDURE _timescaledb_internal.root_table_insert_trigger('1')
Child tables: _timescaledb_internal._hyper_1_0_replica
Table "_timescaledb_internal._hyper_2_0_replica"
@ -352,6 +357,9 @@ Inherits: _timescaledb_internal._hyper_2_0_replica
time | bigint | | plain | |
metric | integer | | plain | |
device_id | text | | extended | |
Triggers:
after_insert_trigger AFTER INSERT ON _timescaledb_internal._hyper_2_root FOR EACH STATEMENT EXECUTE PROCEDURE _timescaledb_internal.root_table_after_insert_trigger('2')
insert_trigger BEFORE INSERT ON _timescaledb_internal._hyper_2_root FOR EACH ROW EXECUTE PROCEDURE _timescaledb_internal.root_table_insert_trigger('2')
Child tables: _timescaledb_internal._hyper_2_0_replica
-- Test that renaming hypertable is blocked

View File

@ -309,8 +309,10 @@ Child tables: _timescaledb_internal._hyper_1_2_0_3_data
series_1 | double precision | | plain | |
series_2 | double precision | | plain | |
series_bool | boolean | | plain | |
Child tables: _copy_temp_1,
_timescaledb_internal._hyper_1_0_replica
Triggers:
after_insert_trigger AFTER INSERT ON _timescaledb_internal._hyper_1_root FOR EACH STATEMENT EXECUTE PROCEDURE _timescaledb_internal.root_table_after_insert_trigger('1')
insert_trigger BEFORE INSERT ON _timescaledb_internal._hyper_1_root FOR EACH ROW EXECUTE PROCEDURE _timescaledb_internal.root_table_insert_trigger('1')
Child tables: _timescaledb_internal._hyper_1_0_replica
Table "_timescaledb_internal._hyper_2_0_replica"
Column | Type | Modifiers | Storage | Stats target | Description
@ -384,8 +386,10 @@ Inherits: _timescaledb_internal._hyper_2_0_replica
time | bigint | | plain | |
metric | integer | | plain | |
device_id | text | | extended | |
Child tables: _copy_temp_2,
_timescaledb_internal._hyper_2_0_replica
Triggers:
after_insert_trigger AFTER INSERT ON _timescaledb_internal._hyper_2_root FOR EACH STATEMENT EXECUTE PROCEDURE _timescaledb_internal.root_table_after_insert_trigger('2')
insert_trigger BEFORE INSERT ON _timescaledb_internal._hyper_2_root FOR EACH ROW EXECUTE PROCEDURE _timescaledb_internal.root_table_insert_trigger('2')
Child tables: _timescaledb_internal._hyper_2_0_replica
SELECT *
FROM "_timescaledb_internal"._hyper_1_0_replica;

View File

@ -238,6 +238,9 @@ Child tables: "testNs"._hyper_1_1_0_1_data,
series_1 | double precision | | plain | |
series_2 | double precision | | plain | |
series_bool | boolean | | plain | |
Triggers:
after_insert_trigger AFTER INSERT ON "testNs"._hyper_1_root FOR EACH STATEMENT EXECUTE PROCEDURE _timescaledb_internal.root_table_after_insert_trigger('1')
insert_trigger BEFORE INSERT ON "testNs"._hyper_1_root FOR EACH ROW EXECUTE PROCEDURE _timescaledb_internal.root_table_insert_trigger('1')
Child tables: "testNs"._hyper_1_0_replica
SELECT * FROM "testNs";

View File

@ -112,17 +112,18 @@ Child tables: _timescaledb_internal._hyper_1_2_0_2_data
time | timestamp without time zone | | plain | |
temp | double precision | | plain | |
device_id | text | | extended | |
Child tables: _copy_temp_1,
_timescaledb_internal._hyper_1_0_replica
Triggers:
after_insert_trigger AFTER INSERT ON _timescaledb_internal._hyper_1_root FOR EACH STATEMENT EXECUTE PROCEDURE _timescaledb_internal.root_table_after_insert_trigger('1')
insert_trigger BEFORE INSERT ON _timescaledb_internal._hyper_1_root FOR EACH ROW EXECUTE PROCEDURE _timescaledb_internal.root_table_insert_trigger('1')
Child tables: _timescaledb_internal._hyper_1_0_replica
--cleanup
\set VERBOSITY default
drop table test_tspace;
NOTICE: drop cascades to 6 other objects
NOTICE: drop cascades to 5 other objects
DETAIL: drop cascades to table _timescaledb_internal._hyper_1_0_replica
drop cascades to table _timescaledb_internal._hyper_1_1_0_partition
drop cascades to table _timescaledb_internal._hyper_1_1_0_1_data
drop cascades to table _timescaledb_internal._hyper_1_2_0_partition
drop cascades to table _timescaledb_internal._hyper_1_2_0_2_data
drop cascades to table _copy_temp_1
drop tablespace tspace1;

View File

@ -1,4 +1,7 @@
\set ON_ERROR_STOP 1
\set VERBOSITY verbose
\set SHOW_CONTEXT never
\o /dev/null
\ir include/insert_two_partitions.sql
\o