mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-16 02:23:49 +08:00
Move trigger handling from PLPGSQL to C
Applying triggers to chunks requires taking the definition of a trigger on a hypertable and executing it on a chunk. Previously this was done with string replacement in the trigger definition. This was not especially safe, and thus we moved the logic to C where we can do proper parsing/deparsing and replacement of the table name. Another positive aspect is that we got rid of some DDL triggers.
This commit is contained in:
parent
238003316e
commit
51821b3709
1
Makefile
1
Makefile
@ -42,6 +42,7 @@ SRCS = \
|
||||
src/cache.c \
|
||||
src/cache_invalidate.c \
|
||||
src/process_utility.c \
|
||||
src/trigger.c \
|
||||
src/chunk.c \
|
||||
src/scanner.c \
|
||||
src/hypertable_cache.c \
|
||||
|
@ -100,11 +100,5 @@ BEGIN
|
||||
WHERE conrelid = main_table_oid
|
||||
AND _timescaledb_internal.need_chunk_constraint(oid);
|
||||
|
||||
PERFORM _timescaledb_internal.create_chunk_trigger(chunk_row.id, tgname,
|
||||
_timescaledb_internal.get_general_trigger_definition(oid))
|
||||
FROM pg_trigger
|
||||
WHERE tgrelid = main_table_oid
|
||||
AND _timescaledb_internal.need_chunk_trigger(chunk_row.hypertable_id, oid);
|
||||
|
||||
END
|
||||
$BODY$;
|
||||
|
@ -1,81 +0,0 @@
|
||||
-- Convert a general trigger definition to a create trigger sql command for a
|
||||
-- particular table and trigger name.
|
||||
-- static
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.get_trigger_definition_for_table(
|
||||
chunk_id INTEGER,
|
||||
general_definition TEXT
|
||||
)
|
||||
RETURNS TEXT LANGUAGE PLPGSQL AS
|
||||
$BODY$
|
||||
DECLARE
|
||||
chunk_row _timescaledb_catalog.chunk;
|
||||
sql_code TEXT;
|
||||
BEGIN
|
||||
SELECT * INTO STRICT chunk_row FROM _timescaledb_catalog.chunk WHERE id = chunk_id;
|
||||
sql_code := replace(general_definition, '/*TABLE_NAME*/', format('%I.%I', chunk_row.schema_name, chunk_row.table_name));
|
||||
RETURN sql_code;
|
||||
END
|
||||
$BODY$;
|
||||
|
||||
-- Creates a chunk_trigger_row.
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.create_chunk_trigger(
|
||||
chunk_id INTEGER,
|
||||
trigger_name NAME,
|
||||
def TEXT
|
||||
)
|
||||
RETURNS VOID LANGUAGE PLPGSQL AS
|
||||
$BODY$
|
||||
DECLARE
|
||||
sql_code TEXT;
|
||||
BEGIN
|
||||
sql_code := _timescaledb_internal.get_trigger_definition_for_table(chunk_id, def);
|
||||
EXECUTE sql_code;
|
||||
END
|
||||
$BODY$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.drop_chunk_trigger(
|
||||
chunk_id INTEGER,
|
||||
trigger_name NAME
|
||||
)
|
||||
RETURNS VOID LANGUAGE PLPGSQL AS
|
||||
$BODY$
|
||||
DECLARE
|
||||
chunk_row _timescaledb_catalog.chunk;
|
||||
BEGIN
|
||||
SELECT * INTO STRICT chunk_row FROM _timescaledb_catalog.chunk WHERE id = chunk_id;
|
||||
EXECUTE format($$ DROP TRIGGER IF EXISTS %I ON %I.%I $$, trigger_name, chunk_row.schema_name, chunk_row.table_name);
|
||||
END
|
||||
$BODY$
|
||||
SET client_min_messages = WARNING;
|
||||
|
||||
-- Creates a trigger on all chunk for a hypertable.
|
||||
-- static
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.create_trigger_on_all_chunks(
|
||||
hypertable_id INTEGER,
|
||||
trigger_name NAME,
|
||||
definition TEXT
|
||||
)
|
||||
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
|
||||
$BODY$
|
||||
DECLARE
|
||||
BEGIN
|
||||
PERFORM _timescaledb_internal.create_chunk_trigger(c.id, trigger_name, definition)
|
||||
FROM _timescaledb_catalog.chunk c
|
||||
WHERE c.hypertable_id = create_trigger_on_all_chunks.hypertable_id;
|
||||
END
|
||||
$BODY$;
|
||||
|
||||
-- Drops trigger on all chunks for a hypertable.
|
||||
-- static
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.drop_trigger_on_all_chunks(
|
||||
hypertable_id INTEGER,
|
||||
trigger_name NAME
|
||||
)
|
||||
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
|
||||
$BODY$
|
||||
BEGIN
|
||||
PERFORM _timescaledb_internal.drop_chunk_trigger(c.id, trigger_name)
|
||||
FROM _timescaledb_catalog.chunk c
|
||||
WHERE c.hypertable_id = drop_trigger_on_all_chunks.hypertable_id;
|
||||
END
|
||||
$BODY$;
|
@ -153,15 +153,6 @@ BEGIN
|
||||
WHERE indrelid = main_table AND _timescaledb_internal.need_chunk_index(hypertable_row.id, pg_index.indexrelid)
|
||||
ORDER BY pg_index.indexrelid;
|
||||
|
||||
PERFORM 1
|
||||
FROM pg_trigger,
|
||||
LATERAL _timescaledb_internal.add_trigger(
|
||||
hypertable_row.id,
|
||||
oid
|
||||
)
|
||||
WHERE tgrelid = main_table
|
||||
AND _timescaledb_internal.need_chunk_trigger(hypertable_row.id, oid);
|
||||
|
||||
IF create_default_indexes THEN
|
||||
PERFORM _timescaledb_internal.create_default_indexes(hypertable_row, main_table, partitioning_column);
|
||||
END IF;
|
||||
|
@ -198,39 +198,6 @@ INSERT INTO _timescaledb_catalog.hypertable_index (hypertable_id, main_schema_na
|
||||
VALUES (hypertable_id, main_schema_name, main_index_name, definition);
|
||||
$BODY$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.trigger_is_row_trigger(tgtype int2) RETURNS BOOLEAN
|
||||
AS '$libdir/timescaledb', 'trigger_is_row_trigger' LANGUAGE C IMMUTABLE STRICT;
|
||||
|
||||
-- do I need to add a hypertable trigger to the chunks?
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.need_chunk_trigger(
|
||||
hypertable_id INTEGER,
|
||||
trigger_oid OID
|
||||
)
|
||||
RETURNS BOOLEAN LANGUAGE SQL STABLE AS
|
||||
$BODY$
|
||||
-- row trigger and not an internal trigger used for constraints
|
||||
SELECT _timescaledb_internal.trigger_is_row_trigger(t.tgtype) AND NOT t.tgisinternal FROM pg_trigger t WHERE OID = trigger_oid;
|
||||
$BODY$;
|
||||
|
||||
|
||||
-- Add a trigger to a hypertable
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.add_trigger(
|
||||
hypertable_id INTEGER,
|
||||
trigger_oid OID
|
||||
)
|
||||
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
|
||||
$BODY$
|
||||
DECLARE
|
||||
trigger_row record;
|
||||
BEGIN
|
||||
IF _timescaledb_internal.need_chunk_trigger(hypertable_id, trigger_oid) THEN
|
||||
SELECT * INTO STRICT trigger_row FROM pg_trigger WHERE OID = trigger_oid;
|
||||
PERFORM _timescaledb_internal.create_trigger_on_all_chunks(hypertable_id, trigger_row.tgname,
|
||||
_timescaledb_internal.get_general_trigger_definition(trigger_oid));
|
||||
END IF;
|
||||
END
|
||||
$BODY$;
|
||||
|
||||
-- Drops the index for a hypertable
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.drop_index(
|
||||
main_schema_name NAME,
|
||||
@ -379,42 +346,6 @@ END
|
||||
$BODY$;
|
||||
|
||||
|
||||
|
||||
-- Create the "general definition" of a trigger. The general definition
|
||||
-- is the corresponding create trigger command with the placeholders /*TABLE_NAME*/
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.get_general_trigger_definition(
|
||||
trigger_oid REGCLASS
|
||||
)
|
||||
RETURNS text
|
||||
LANGUAGE plpgsql VOLATILE AS
|
||||
$BODY$
|
||||
DECLARE
|
||||
def TEXT;
|
||||
c INTEGER;
|
||||
trigger_row RECORD;
|
||||
BEGIN
|
||||
-- Get trigger definition
|
||||
def := pg_get_triggerdef(trigger_oid);
|
||||
|
||||
IF def IS NULL THEN
|
||||
RAISE EXCEPTION 'Cannot process trigger with no definition: %', trigger_oid::TEXT;
|
||||
END IF;
|
||||
|
||||
SELECT * INTO STRICT trigger_row FROM pg_trigger WHERE oid = trigger_oid;
|
||||
|
||||
SELECT count(*) INTO c
|
||||
FROM regexp_matches(def, 'ON '|| trigger_row.tgrelid::regclass::TEXT, 'g');
|
||||
IF c <> 1 THEN
|
||||
RAISE EXCEPTION 'Cannot process trigger with definition(no table name match: %): %', trigger_row.tgrelid::regclass::TEXT, def
|
||||
USING ERRCODE = 'IO103';
|
||||
END IF;
|
||||
|
||||
def := replace(def, 'ON '|| trigger_row.tgrelid::regclass::TEXT, 'ON /*TABLE_NAME*/');
|
||||
RETURN def;
|
||||
END
|
||||
$BODY$;
|
||||
|
||||
|
||||
-- Creates the default indexes on a hypertable.
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.create_default_indexes(
|
||||
hypertable_row _timescaledb_catalog.hypertable,
|
||||
|
@ -46,60 +46,6 @@ BEGIN
|
||||
END
|
||||
$BODY$;
|
||||
|
||||
-- Handles ddl create trigger commands on hypertables
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.ddl_process_create_trigger()
|
||||
RETURNS event_trigger LANGUAGE plpgsql
|
||||
SECURITY DEFINER SET search_path = ''
|
||||
AS
|
||||
$BODY$
|
||||
DECLARE
|
||||
info record;
|
||||
table_oid regclass;
|
||||
index_oid OID;
|
||||
trigger_name TEXT;
|
||||
hypertable_row _timescaledb_catalog.hypertable;
|
||||
BEGIN
|
||||
--NOTE: pg_event_trigger_ddl_commands prevents this SECURITY DEFINER function from being called outside trigger.
|
||||
FOR info IN SELECT * FROM pg_event_trigger_ddl_commands()
|
||||
LOOP
|
||||
SELECT OID, tgrelid, tgname INTO STRICT index_oid, table_oid, trigger_name
|
||||
FROM pg_catalog.pg_trigger
|
||||
WHERE oid = info.objid;
|
||||
|
||||
IF _timescaledb_internal.is_main_table(table_oid) THEN
|
||||
hypertable_row := _timescaledb_internal.hypertable_from_main_table(table_oid);
|
||||
PERFORM _timescaledb_internal.add_trigger(hypertable_row.id, index_oid);
|
||||
END IF;
|
||||
END LOOP;
|
||||
END
|
||||
$BODY$;
|
||||
|
||||
-- Handles ddl drop index commands on hypertables
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.ddl_process_drop_trigger()
|
||||
RETURNS event_trigger LANGUAGE plpgsql
|
||||
SECURITY DEFINER SET search_path = ''
|
||||
AS
|
||||
$BODY$
|
||||
DECLARE
|
||||
info record;
|
||||
hypertable_row _timescaledb_catalog.hypertable;
|
||||
BEGIN
|
||||
--NOTE: pg_event_trigger_ddl_commands prevents this SECURITY DEFINER function from being called outside trigger.
|
||||
FOR info IN SELECT * FROM pg_event_trigger_dropped_objects()
|
||||
LOOP
|
||||
IF info.classid = 'pg_trigger'::regclass THEN
|
||||
SELECT * INTO hypertable_row
|
||||
FROM _timescaledb_catalog.hypertable
|
||||
WHERE schema_name = info.address_names[1] AND table_name = info.address_names[2];
|
||||
|
||||
IF hypertable_row IS NOT NULL THEN
|
||||
PERFORM _timescaledb_internal.drop_trigger_on_all_chunks(hypertable_row.id, info.address_names[3]);
|
||||
END IF;
|
||||
END IF;
|
||||
END LOOP;
|
||||
END
|
||||
$BODY$;
|
||||
|
||||
-- Handles ddl alter index commands on hypertables
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.ddl_process_alter_index()
|
||||
RETURNS event_trigger LANGUAGE plpgsql
|
||||
|
@ -3,7 +3,6 @@ sql/chunk.sql
|
||||
sql/ddl_internal.sql
|
||||
sql/util_time.sql
|
||||
sql/util_internal_table_ddl.sql
|
||||
sql/chunk_trigger.sql
|
||||
sql/chunk_constraint.sql
|
||||
sql/hypertable_triggers.sql
|
||||
sql/hypertable_index_triggers.sql
|
||||
|
@ -53,20 +53,10 @@ BEGIN
|
||||
WHEN tag IN ('drop index')
|
||||
EXECUTE PROCEDURE _timescaledb_internal.ddl_process_drop_index();
|
||||
|
||||
CREATE EVENT TRIGGER ddl_create_trigger ON ddl_command_end
|
||||
WHEN tag IN ('create trigger')
|
||||
EXECUTE PROCEDURE _timescaledb_internal.ddl_process_create_trigger();
|
||||
|
||||
CREATE EVENT TRIGGER ddl_drop_trigger
|
||||
ON sql_drop
|
||||
EXECUTE PROCEDURE _timescaledb_internal.ddl_process_drop_trigger();
|
||||
|
||||
IF restore THEN
|
||||
ALTER EXTENSION timescaledb ADD EVENT TRIGGER ddl_create_index;
|
||||
ALTER EXTENSION timescaledb ADD EVENT TRIGGER ddl_alter_index;
|
||||
ALTER EXTENSION timescaledb ADD EVENT TRIGGER ddl_drop_index;
|
||||
ALTER EXTENSION timescaledb ADD EVENT TRIGGER ddl_create_trigger;
|
||||
ALTER EXTENSION timescaledb ADD EVENT TRIGGER ddl_drop_trigger;
|
||||
END IF;
|
||||
|
||||
END
|
||||
|
@ -54,3 +54,17 @@ DROP FUNCTION _timescaledb_internal.chunk_create_table(int);
|
||||
DROP FUNCTION _timescaledb_internal.ddl_process_drop_table();
|
||||
DROP FUNCTION _timescaledb_internal.on_change_chunk();
|
||||
DROP FUNCTION _timescaledb_internal.drop_hypertable(name, name);
|
||||
|
||||
DROP EVENT TRIGGER ddl_create_trigger;
|
||||
DROP EVENT TRIGGER ddl_drop_trigger;
|
||||
|
||||
DROP FUNCTION _timescaledb_internal.add_trigger(int, oid);
|
||||
DROP FUNCTION _timescaledb_internal.create_chunk_trigger(int, name, text);
|
||||
DROP FUNCTION _timescaledb_internal.create_trigger_on_all_chunks(int, name, text);
|
||||
DROP FUNCTION _timescaledb_internal.ddl_process_create_trigger();
|
||||
DROP FUNCTION _timescaledb_internal.ddl_process_drop_trigger();
|
||||
DROP FUNCTION _timescaledb_internal.drop_chunk_trigger(int, name);
|
||||
DROP FUNCTION _timescaledb_internal.drop_trigger_on_all_chunks(INTEGER, NAME);
|
||||
DROP FUNCTION _timescaledb_internal.get_general_trigger_definition(regclass);
|
||||
DROP FUNCTION _timescaledb_internal.get_trigger_definition_for_table(INTEGER, text);
|
||||
DROP FUNCTION _timescaledb_internal.need_chunk_trigger(int, oid);
|
||||
|
18
src/chunk.c
18
src/chunk.c
@ -1,5 +1,12 @@
|
||||
#include <postgres.h>
|
||||
#include <catalog/namespace.h>
|
||||
#include <catalog/pg_trigger.h>
|
||||
#include <catalog/indexing.h>
|
||||
#include <commands/trigger.h>
|
||||
#include <tcop/tcopprot.h>
|
||||
#include <access/htup.h>
|
||||
#include <access/htup_details.h>
|
||||
#include <access/xact.h>
|
||||
#include <utils/builtins.h>
|
||||
#include <utils/lsyscache.h>
|
||||
#include <utils/hsearch.h>
|
||||
@ -15,6 +22,7 @@
|
||||
#include "metadata_queries.h"
|
||||
#include "scanner.h"
|
||||
#include "process_utility.h"
|
||||
#include "trigger.h"
|
||||
|
||||
typedef bool (*on_chunk_func) (ChunkScanCtx *ctx, Chunk *chunk);
|
||||
|
||||
@ -260,9 +268,10 @@ chunk_collision_resolve(Hyperspace *hs, Hypercube *cube, Point *p)
|
||||
}
|
||||
|
||||
static Chunk *
|
||||
chunk_create_after_lock(Hyperspace *hs, Point *p, const char *schema, const char *prefix)
|
||||
chunk_create_after_lock(Hypertable *ht, Point *p, const char *schema, const char *prefix)
|
||||
{
|
||||
Oid schema_oid = get_namespace_oid(schema, false);
|
||||
Hyperspace *hs = ht->space;
|
||||
Catalog *catalog = catalog_get();
|
||||
CatalogSecurityContext sec_ctx;
|
||||
Hypercube *cube;
|
||||
@ -306,15 +315,18 @@ chunk_create_after_lock(Hyperspace *hs, Point *p, const char *schema, const char
|
||||
|
||||
chunk->table_id = get_relname_relid(NameStr(chunk->fd.table_name), schema_oid);
|
||||
|
||||
trigger_create_on_all_chunks(ht, chunk);
|
||||
|
||||
catalog_restore_user(&sec_ctx);
|
||||
|
||||
return chunk;
|
||||
}
|
||||
|
||||
Chunk *
|
||||
chunk_create(Hyperspace *hs, Point *p, const char *schema, const char *prefix)
|
||||
chunk_create(Hypertable *ht, Point *p, const char *schema, const char *prefix)
|
||||
{
|
||||
Catalog *catalog = catalog_get();
|
||||
Hyperspace *hs = ht->space;
|
||||
Chunk *chunk;
|
||||
Relation rel;
|
||||
|
||||
@ -324,7 +336,7 @@ chunk_create(Hyperspace *hs, Point *p, const char *schema, const char *prefix)
|
||||
chunk = chunk_find(hs, p);
|
||||
|
||||
if (NULL == chunk)
|
||||
chunk = chunk_create_after_lock(hs, p, schema, prefix);
|
||||
chunk = chunk_create_after_lock(ht, p, schema, prefix);
|
||||
|
||||
heap_close(rel, ExclusiveLock);
|
||||
|
||||
|
@ -8,6 +8,7 @@
|
||||
|
||||
#include "catalog.h"
|
||||
#include "chunk_constraint.h"
|
||||
#include "hypertable.h"
|
||||
|
||||
typedef struct Hypercube Hypercube;
|
||||
typedef struct Point Point;
|
||||
@ -64,7 +65,7 @@ typedef struct ChunkScanEntry
|
||||
} ChunkScanEntry;
|
||||
|
||||
extern Chunk *chunk_create_from_tuple(HeapTuple tuple, int16 num_constraints);
|
||||
extern Chunk *chunk_create(Hyperspace *hs, Point *p, const char *schema, const char *prefix);
|
||||
extern Chunk *chunk_create(Hypertable *ht, Point *p, const char *schema, const char *prefix);
|
||||
extern Chunk *chunk_create_stub(int32 id, int16 num_constraints);
|
||||
extern bool chunk_add_constraint(Chunk *chunk, ChunkConstraint *constraint);
|
||||
extern bool chunk_add_constraint_from_tuple(Chunk *chunk, HeapTuple constraint_tuple);
|
||||
|
@ -44,7 +44,7 @@ hypertable_get_chunk(Hypertable *h, Point *point)
|
||||
chunk = chunk_find(h->space, point);
|
||||
|
||||
if (NULL == chunk)
|
||||
chunk = chunk_create(h->space, point,
|
||||
chunk = chunk_create(h, point,
|
||||
NameStr(h->fd.associated_schema_name),
|
||||
NameStr(h->fd.associated_table_prefix));
|
||||
|
||||
|
@ -5,10 +5,14 @@
|
||||
#include <catalog/pg_inherits_fn.h>
|
||||
#include <catalog/index.h>
|
||||
#include <catalog/objectaddress.h>
|
||||
#include <catalog/pg_trigger.h>
|
||||
#include <commands/copy.h>
|
||||
#include <commands/vacuum.h>
|
||||
#include <commands/defrem.h>
|
||||
#include <commands/trigger.h>
|
||||
#include <commands/tablecmds.h>
|
||||
#include <access/htup_details.h>
|
||||
#include <access/xact.h>
|
||||
#include <utils/rel.h>
|
||||
#include <utils/lsyscache.h>
|
||||
#include <utils/builtins.h>
|
||||
@ -24,6 +28,7 @@
|
||||
#include "copy.h"
|
||||
#include "chunk.h"
|
||||
#include "guc.h"
|
||||
#include "trigger.h"
|
||||
|
||||
void _process_utility_init(void);
|
||||
void _process_utility_fini(void);
|
||||
@ -185,9 +190,8 @@ typedef void (*process_chunk_t) (Oid chunk_relid, void *arg);
|
||||
* hypertable.
|
||||
*/
|
||||
static int
|
||||
foreach_chunk(RangeVar *rv, process_chunk_t process_chunk, void *arg)
|
||||
foreach_chunk_relid(Oid relid, process_chunk_t process_chunk, void *arg)
|
||||
{
|
||||
Oid relid = hypertable_relid(rv);
|
||||
List *chunks;
|
||||
ListCell *lc;
|
||||
int n = 0;
|
||||
@ -207,6 +211,14 @@ foreach_chunk(RangeVar *rv, process_chunk_t process_chunk, void *arg)
|
||||
return n;
|
||||
}
|
||||
|
||||
static int
|
||||
foreach_chunk(RangeVar *rv, process_chunk_t process_chunk, void *arg)
|
||||
{
|
||||
Oid relid = hypertable_relid(rv);
|
||||
|
||||
return foreach_chunk_relid(relid, process_chunk, arg);
|
||||
}
|
||||
|
||||
typedef struct VacuumCtx
|
||||
{
|
||||
VacuumStmt *stmt;
|
||||
@ -271,18 +283,12 @@ reindex_chunk(Oid chunk_relid, void *arg)
|
||||
}
|
||||
|
||||
static bool
|
||||
process_drop(Node *parsetree)
|
||||
process_drop_table(DropStmt *stmt)
|
||||
{
|
||||
DropStmt *stmt = (DropStmt *) parsetree;
|
||||
ListCell *cell1;
|
||||
Cache *hcache = NULL;
|
||||
bool handled = false;
|
||||
|
||||
if (stmt->removeType != OBJECT_TABLE)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
hcache = hypertable_cache_pin();
|
||||
|
||||
foreach(cell1, stmt->objects)
|
||||
@ -321,6 +327,78 @@ process_drop(Node *parsetree)
|
||||
return handled;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
drop_trigger_chunk(Oid chunk_relid, void *arg)
|
||||
{
|
||||
const char *trigger_name = (const char *) arg;
|
||||
Oid trigger_oid = get_trigger_oid(chunk_relid, trigger_name, false);
|
||||
|
||||
RemoveTriggerById(trigger_oid);
|
||||
}
|
||||
|
||||
static bool
|
||||
process_drop_trigger(DropStmt *stmt)
|
||||
{
|
||||
ListCell *cell1;
|
||||
bool handled = false;
|
||||
|
||||
foreach(cell1, stmt->objects)
|
||||
{
|
||||
List *objname = lfirst(cell1);
|
||||
List *objargs = NIL;
|
||||
Relation relation = NULL;
|
||||
Form_pg_trigger trigger;
|
||||
|
||||
ObjectAddress address;
|
||||
|
||||
address = get_object_address(stmt->removeType,
|
||||
objname, objargs,
|
||||
&relation,
|
||||
AccessExclusiveLock,
|
||||
stmt->missing_ok);
|
||||
|
||||
if (!OidIsValid(address.objectId))
|
||||
{
|
||||
Assert(stmt->missing_ok);
|
||||
continue;
|
||||
}
|
||||
|
||||
trigger = trigger_by_oid(address.objectId, stmt->missing_ok);
|
||||
if (trigger_is_chunk_trigger(trigger))
|
||||
{
|
||||
if (is_hypertable(relation->rd_id))
|
||||
{
|
||||
foreach_chunk_relid(relation->rd_id, drop_trigger_chunk, trigger_name(trigger));
|
||||
handled = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (relation != NULL)
|
||||
heap_close(relation, AccessShareLock);
|
||||
}
|
||||
|
||||
return handled;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static bool
|
||||
process_drop(Node *parsetree)
|
||||
{
|
||||
DropStmt *stmt = (DropStmt *) parsetree;
|
||||
|
||||
switch (stmt->removeType)
|
||||
{
|
||||
case OBJECT_TABLE:
|
||||
return process_drop_table(stmt);
|
||||
case OBJECT_TRIGGER:
|
||||
return process_drop_trigger(stmt);
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Reindex a hypertable and all its chunks. Currently works only for REINDEX
|
||||
* TABLE.
|
||||
@ -662,6 +740,47 @@ process_altertable(Node *parsetree)
|
||||
cache_release(hcache);
|
||||
}
|
||||
|
||||
typedef struct CreateIndexCtx
|
||||
{
|
||||
CreateTrigStmt *stmt;
|
||||
const char *queryString;
|
||||
Oid hypertable_relid;
|
||||
} CreateIndexCtx;
|
||||
|
||||
static void
|
||||
create_trigger_chunk(Oid chunk_relid, void *arg)
|
||||
{
|
||||
CreateIndexCtx *ctx = (CreateIndexCtx *) arg;
|
||||
CreateTrigStmt *stmt = ctx->stmt;
|
||||
|
||||
Oid trigger_oid = get_trigger_oid(ctx->hypertable_relid, stmt->trigname, false);
|
||||
|
||||
char *relschema = get_namespace_name(get_rel_namespace(chunk_relid));
|
||||
char *relname = get_rel_name(chunk_relid);
|
||||
|
||||
trigger_create_on_chunk(trigger_oid, relschema, relname);
|
||||
}
|
||||
|
||||
static void
|
||||
process_create_trigger(CreateTrigStmt *stmt, const char *query_string)
|
||||
{
|
||||
Oid relid = RangeVarGetRelid(stmt->relation, NoLock, true);
|
||||
CreateIndexCtx ctx = {
|
||||
.stmt = stmt,
|
||||
.queryString = query_string,
|
||||
.hypertable_relid = relid
|
||||
};
|
||||
|
||||
if (!stmt->row || !OidIsValid(relid))
|
||||
return;
|
||||
|
||||
if (is_hypertable(relid))
|
||||
{
|
||||
CommandCounterIncrement(); /* allow following code to see
|
||||
* inserted hypertable trigger */
|
||||
foreach_chunk(stmt->relation, create_trigger_chunk, &ctx);
|
||||
}
|
||||
}
|
||||
|
||||
/* Hook-intercept for ProcessUtility. */
|
||||
static void
|
||||
@ -714,6 +833,11 @@ timescaledb_ProcessUtility(Node *parsetree,
|
||||
if (process_vacuum(parsetree, context))
|
||||
return;
|
||||
break;
|
||||
case T_CreateTrigStmt:
|
||||
/* Process on main table then add to otherrs */
|
||||
prev_ProcessUtility(parsetree, query_string, context, params, dest, completion_tag);
|
||||
process_create_trigger((CreateTrigStmt *) parsetree, query_string);
|
||||
return;
|
||||
case T_ReindexStmt:
|
||||
if (process_reindex(parsetree))
|
||||
return;
|
||||
|
112
src/trigger.c
Normal file
112
src/trigger.c
Normal file
@ -0,0 +1,112 @@
|
||||
#include "trigger.h"
|
||||
|
||||
#include <access/htup_details.h>
|
||||
#include <access/heapam.h>
|
||||
#include <utils/relcache.h>
|
||||
#include <utils/builtins.h>
|
||||
#include <utils/fmgroids.h>
|
||||
#include <tcop/tcopprot.h>
|
||||
#include <catalog/objectaddress.h>
|
||||
#include <catalog/indexing.h>
|
||||
#include <commands/trigger.h>
|
||||
#include <access/xact.h>
|
||||
#include <fmgr.h>
|
||||
|
||||
|
||||
Form_pg_trigger
|
||||
trigger_by_oid(Oid trigger_oid, bool missing_ok)
|
||||
{
|
||||
Relation trigRel;
|
||||
HeapTuple tup;
|
||||
Form_pg_trigger trig = NULL;
|
||||
|
||||
trigRel = heap_open(TriggerRelationId, AccessShareLock);
|
||||
|
||||
tup = get_catalog_object_by_oid(trigRel, trigger_oid);
|
||||
|
||||
if (!HeapTupleIsValid(tup))
|
||||
{
|
||||
if (!missing_ok)
|
||||
elog(ERROR, "could not find tuple for trigger %u",
|
||||
trigger_oid);
|
||||
}
|
||||
else
|
||||
{
|
||||
trig = (Form_pg_trigger) GETSTRUCT(tup);
|
||||
}
|
||||
|
||||
heap_close(trigRel, AccessShareLock);
|
||||
return trig;
|
||||
}
|
||||
|
||||
|
||||
bool
|
||||
trigger_is_chunk_trigger(const Form_pg_trigger trigger)
|
||||
{
|
||||
return trigger != NULL && TRIGGER_FOR_ROW(trigger->tgtype) && !trigger->tgisinternal;
|
||||
}
|
||||
|
||||
char *
|
||||
trigger_name(const Form_pg_trigger trigger)
|
||||
{
|
||||
return NameStr(trigger->tgname);
|
||||
}
|
||||
|
||||
/* all creation of triggers on chunks should go through this. Strictly speaking,
|
||||
* this deparsing is not necessary in all cases, but this keeps things consistent. */
|
||||
void
|
||||
trigger_create_on_chunk(Oid trigger_oid, char *chunk_schema_name, char *chunk_table_name)
|
||||
{
|
||||
Datum datum_def = DirectFunctionCall1(pg_get_triggerdef, ObjectIdGetDatum(trigger_oid));
|
||||
const char *def = TextDatumGetCString(datum_def);
|
||||
List *deparsed_list;
|
||||
Node *deparsed_node;
|
||||
CreateTrigStmt *stmt;
|
||||
|
||||
deparsed_list = pg_parse_query(def);
|
||||
Assert(list_length(deparsed_list) == 1);
|
||||
deparsed_node = linitial(deparsed_list);
|
||||
Assert(IsA(deparsed_node, CreateTrigStmt));
|
||||
stmt = (CreateTrigStmt *) deparsed_node;
|
||||
|
||||
stmt->relation->relname = chunk_table_name;
|
||||
stmt->relation->schemaname = chunk_schema_name;
|
||||
|
||||
CreateTrigger(stmt, def, InvalidOid, InvalidOid,
|
||||
InvalidOid, InvalidOid, false);
|
||||
|
||||
CommandCounterIncrement(); /* needed to prevent pg_class being updated
|
||||
* twice */
|
||||
}
|
||||
|
||||
|
||||
|
||||
void
|
||||
trigger_create_on_all_chunks(Hypertable *ht, Chunk *chunk)
|
||||
{
|
||||
ScanKeyData skey;
|
||||
Relation tgrel;
|
||||
SysScanDesc tgscan;
|
||||
HeapTuple htup;
|
||||
|
||||
ScanKeyInit(&skey,
|
||||
Anum_pg_trigger_tgrelid,
|
||||
BTEqualStrategyNumber, F_OIDEQ, ht->main_table_relid);
|
||||
|
||||
tgrel = heap_open(TriggerRelationId, AccessShareLock);
|
||||
tgscan = systable_beginscan(tgrel, TriggerRelidNameIndexId, true,
|
||||
NULL, 1, &skey);
|
||||
|
||||
while (HeapTupleIsValid(htup = systable_getnext(tgscan)))
|
||||
{
|
||||
Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(htup);
|
||||
Oid trigger_oid = HeapTupleGetOid(htup);
|
||||
|
||||
if (trigger_is_chunk_trigger(pg_trigger))
|
||||
{
|
||||
trigger_create_on_chunk(trigger_oid, NameStr(chunk->fd.schema_name), NameStr(chunk->fd.table_name));
|
||||
}
|
||||
}
|
||||
systable_endscan(tgscan);
|
||||
heap_close(tgrel, AccessShareLock);
|
||||
}
|
16
src/trigger.h
Normal file
16
src/trigger.h
Normal file
@ -0,0 +1,16 @@
|
||||
#ifndef TIMESCALEDB_TRIGGER_H
|
||||
#define TIMESCALEDB_TRIGGER_H
|
||||
|
||||
#include <postgres.h>
|
||||
#include <catalog/pg_trigger.h>
|
||||
#include "hypertable.h"
|
||||
#include "chunk.h"
|
||||
|
||||
extern Form_pg_trigger trigger_by_oid(Oid trigger_oid, bool missing_ok);
|
||||
extern bool trigger_is_chunk_trigger(const Form_pg_trigger trigger);
|
||||
extern char *trigger_name(const Form_pg_trigger trigger);
|
||||
|
||||
void trigger_create_on_chunk(Oid trigger_oid, char *chunk_schema_name, char *chunk_table_name);
|
||||
void trigger_create_on_all_chunks(Hypertable *ht, Chunk *chunk);
|
||||
|
||||
#endif /* TIMESCALEDB_TRIGGER_H */
|
10
src/utils.c
10
src/utils.c
@ -356,13 +356,3 @@ date_bucket(PG_FUNCTION_ARGS)
|
||||
bucketed = DirectFunctionCall2(timestamp_bucket, PG_GETARG_DATUM(0), converted_ts);
|
||||
return DirectFunctionCall1(timestamp_date, bucketed);
|
||||
}
|
||||
|
||||
PG_FUNCTION_INFO_V1(trigger_is_row_trigger);
|
||||
|
||||
Datum
|
||||
trigger_is_row_trigger(PG_FUNCTION_ARGS)
|
||||
{
|
||||
int16 tgtype = PG_GETARG_INT16(0);
|
||||
|
||||
PG_RETURN_BOOL(TRIGGER_FOR_ROW(tgtype));
|
||||
}
|
||||
|
@ -43,7 +43,7 @@ SELECT count(*)
|
||||
AND refobjid = (SELECT oid FROM pg_extension WHERE extname = 'timescaledb');
|
||||
count
|
||||
-------
|
||||
132
|
||||
119
|
||||
(1 row)
|
||||
|
||||
\c postgres
|
||||
@ -67,7 +67,7 @@ SELECT count(*)
|
||||
AND refobjid = (SELECT oid FROM pg_extension WHERE extname = 'timescaledb');
|
||||
count
|
||||
-------
|
||||
132
|
||||
119
|
||||
(1 row)
|
||||
|
||||
\c single
|
||||
|
@ -45,6 +45,11 @@ CREATE TRIGGER z_test_trigger_all
|
||||
CREATE TRIGGER _0_test_trigger_insert_after
|
||||
AFTER INSERT ON hyper
|
||||
FOR EACH ROW EXECUTE PROCEDURE test_trigger();
|
||||
CREATE TRIGGER _0_test_trigger_insert_after_when_dev1
|
||||
AFTER INSERT ON hyper
|
||||
FOR EACH ROW
|
||||
WHEN (NEW.device_id = 'dev1')
|
||||
EXECUTE PROCEDURE test_trigger();
|
||||
CREATE TRIGGER _0_test_trigger_update_after
|
||||
AFTER UPDATE ON hyper
|
||||
FOR EACH ROW EXECUTE PROCEDURE test_trigger();
|
||||
@ -87,6 +92,7 @@ WARNING: FIRING trigger when: BEFORE level: STATEMENT op: INSERT cnt: 0 trigger
|
||||
WARNING: FIRING trigger when: BEFORE level: ROW op: INSERT cnt: 0 trigger_name _0_test_trigger_insert
|
||||
WARNING: FIRING trigger when: BEFORE level: ROW op: INSERT cnt: 0 trigger_name z_test_trigger_all
|
||||
WARNING: FIRING trigger when: AFTER level: ROW op: INSERT cnt: 1 trigger_name _0_test_trigger_insert_after
|
||||
WARNING: FIRING trigger when: AFTER level: ROW op: INSERT cnt: 1 trigger_name _0_test_trigger_insert_after_when_dev1
|
||||
WARNING: FIRING trigger when: AFTER level: ROW op: INSERT cnt: 1 trigger_name z_test_trigger_all_after
|
||||
WARNING: FIRING trigger when: AFTER level: STATEMENT op: INSERT cnt: 1 trigger_name _0_test_trigger_insert_s_after
|
||||
INSERT INTO hyper(time, device_id,sensor_1) VALUES
|
||||
@ -139,6 +145,7 @@ DROP TRIGGER _0_test_trigger_insert_s_after ON hyper;
|
||||
INSERT INTO hyper(time, device_id,sensor_1) VALUES
|
||||
(1257987600000000000, 'dev1', 1);
|
||||
WARNING: FIRING trigger when: BEFORE level: ROW op: INSERT cnt: 0 trigger_name z_test_trigger_all
|
||||
WARNING: FIRING trigger when: AFTER level: ROW op: INSERT cnt: 1 trigger_name _0_test_trigger_insert_after_when_dev1
|
||||
WARNING: FIRING trigger when: AFTER level: ROW op: INSERT cnt: 1 trigger_name z_test_trigger_all_after
|
||||
INSERT INTO hyper(time, device_id,sensor_1) VALUES
|
||||
(1257987700000000000, 'dev2', 1), (1257987800000000000, 'dev2', 1);
|
||||
@ -223,6 +230,7 @@ WARNING: FIRING trigger when: BEFORE level: STATEMENT op: INSERT cnt: 0 trigger
|
||||
WARNING: FIRING trigger when: BEFORE level: ROW op: INSERT cnt: 0 trigger_name _0_test_trigger_insert
|
||||
WARNING: FIRING trigger when: BEFORE level: ROW op: INSERT cnt: 0 trigger_name z_test_trigger_all
|
||||
WARNING: FIRING trigger when: AFTER level: ROW op: INSERT cnt: 1 trigger_name _0_test_trigger_insert_after
|
||||
WARNING: FIRING trigger when: AFTER level: ROW op: INSERT cnt: 1 trigger_name _0_test_trigger_insert_after_when_dev1
|
||||
WARNING: FIRING trigger when: AFTER level: ROW op: INSERT cnt: 1 trigger_name z_test_trigger_all_after
|
||||
WARNING: FIRING trigger when: AFTER level: STATEMENT op: INSERT cnt: 1 trigger_name _0_test_trigger_insert_s_after
|
||||
INSERT INTO hyper(time, device_id,sensor_1) VALUES
|
||||
|
@ -46,6 +46,12 @@ CREATE TRIGGER _0_test_trigger_insert_after
|
||||
AFTER INSERT ON hyper
|
||||
FOR EACH ROW EXECUTE PROCEDURE test_trigger();
|
||||
|
||||
CREATE TRIGGER _0_test_trigger_insert_after_when_dev1
|
||||
AFTER INSERT ON hyper
|
||||
FOR EACH ROW
|
||||
WHEN (NEW.device_id = 'dev1')
|
||||
EXECUTE PROCEDURE test_trigger();
|
||||
|
||||
CREATE TRIGGER _0_test_trigger_update_after
|
||||
AFTER UPDATE ON hyper
|
||||
FOR EACH ROW EXECUTE PROCEDURE test_trigger();
|
||||
|
Loading…
x
Reference in New Issue
Block a user