From 473d743927944d763616b9bd98ae0d3fc9278ea3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Nordstro=CC=88m?= Date: Wed, 22 Mar 2017 13:41:51 +0100 Subject: [PATCH] Cleanup insert state on error If an error is generated in any of the insert triggers, the insert state kept during a batch insert might be left in an undefined state, breaking the next insert. This patch makes sure errors are captured in the insert triggers so that the state can be cleaned up. --- src/insert.c | 296 ++++++++++++++++++++++----------------- test/expected/insert.out | 26 ++++ test/sql/insert.sql | 12 ++ 3 files changed, 204 insertions(+), 130 deletions(-) diff --git a/src/insert.c b/src/insert.c index d55fb1f5f..b8d14735b 100644 --- a/src/insert.c +++ b/src/insert.c @@ -49,7 +49,7 @@ get_close_if_needed_fn() } static void -close_if_needed(Hypertable * hci, Chunk * chunk) +close_if_needed(Hypertable *hci, Chunk *chunk) { ChunkReplica *cr; Catalog *catalog = catalog_get(); @@ -76,7 +76,7 @@ typedef struct ChunkInsertCtxRel EState *estate; ResultRelInfo *resultRelInfo; BulkInsertState bistate; -} ChunkInsertCtxRel; +} ChunkInsertCtxRel; static ChunkInsertCtxRel * chunk_insert_ctx_rel_new(Relation rel, ResultRelInfo *resultRelInfo, List *range_table) @@ -102,7 +102,7 @@ chunk_insert_ctx_rel_new(Relation rel, ResultRelInfo *resultRelInfo, List *range } static void -chunk_insert_ctx_rel_destroy(ChunkInsertCtxRel * rel_ctx) +chunk_insert_ctx_rel_destroy(ChunkInsertCtxRel *rel_ctx) { FreeBulkInsertState(rel_ctx->bistate); ExecCloseIndices(rel_ctx->resultRelInfo); @@ -113,7 +113,7 @@ chunk_insert_ctx_rel_destroy(ChunkInsertCtxRel * rel_ctx) static void -chunk_insert_ctx_rel_insert_tuple(ChunkInsertCtxRel * rel_ctx, HeapTuple tuple) +chunk_insert_ctx_rel_insert_tuple(ChunkInsertCtxRel *rel_ctx, HeapTuple tuple) { int hi_options = 0; /* no optimization */ CommandId mycid = GetCurrentCommandId(true); @@ -143,10 +143,10 @@ typedef struct ChunkInsertCtx Chunk *chunk; Cache *pinned; List *ctxs; -} ChunkInsertCtx; +} ChunkInsertCtx; static ChunkInsertCtx * -chunk_insert_ctx_new(Chunk * chunk, Cache * pinned) +chunk_insert_ctx_new(Chunk *chunk, Cache *pinned) { List *rel_ctx_list = NIL; ChunkInsertCtx *ctx; @@ -186,7 +186,7 @@ chunk_insert_ctx_new(Chunk * chunk, Cache * pinned) if (XactReadOnly && !rel->rd_islocaltemp) PreventCommandIfReadOnly("COPY FROM"); - + PreventCommandIfParallelMode("COPY FROM"); if (rel->rd_rel->relkind != RELKIND_RELATION) @@ -223,7 +223,7 @@ chunk_insert_ctx_new(Chunk * chunk, Cache * pinned) } static void -chunk_insert_ctx_destroy(ChunkInsertCtx * ctx) +chunk_insert_ctx_destroy(ChunkInsertCtx *ctx) { ListCell *lc; @@ -243,7 +243,7 @@ chunk_insert_ctx_destroy(ChunkInsertCtx * ctx) } static void -chunk_insert_ctx_insert_tuple(ChunkInsertCtx * ctx, HeapTuple tup) +chunk_insert_ctx_insert_tuple(ChunkInsertCtx *ctx, HeapTuple tup) { ListCell *lc; @@ -274,7 +274,7 @@ typedef struct InsertTriggerCtx TupleDesc expanded_tupdesc; HeapTuple first_tuple; MemoryContext mctx; -} InsertTriggerCtx; +} InsertTriggerCtx; static InsertTriggerCtx *insert_trigger_ctx; @@ -359,14 +359,14 @@ insert_trigger_ctx_create(HeapTuple tuple, Oid relid) } static void -insert_trigger_ctx_sort_init(InsertTriggerCtx * tctx, TupleDesc tupdesc) +insert_trigger_ctx_sort_init(InsertTriggerCtx *tctx, TupleDesc tupdesc) { tctx->expanded_tupdesc = tuple_desc_expand(tupdesc, &tctx->time_attno, tctx->hypertable->time_column_name); tctx->sort = tuple_sort_state_init(tctx->expanded_tupdesc, tctx->time_attno, tctx->hypertable->time_column_type); } 
static void -insert_trigger_ctx_free(InsertTriggerCtx * tctx) +insert_trigger_ctx_free(InsertTriggerCtx *tctx) { if (tctx->chunk_ctx != NULL) chunk_insert_ctx_destroy(tctx->chunk_ctx); @@ -400,7 +400,7 @@ heap_tuple_add_partition_index(HeapTuple tuple, TupleDesc tupdesc, TupleDesc new } static void -insert_tuple(InsertTriggerCtx * tctx, TupleInfo * ti, int partition_index, int64 time_pt) +insert_tuple(InsertTriggerCtx *tctx, TupleInfo *ti, int partition_index, int64 time_pt) { if (tctx->chunk_ctx != NULL && !chunk_timepoint_is_member(tctx->chunk_ctx->chunk, time_pt)) { @@ -444,7 +444,7 @@ insert_tuple(InsertTriggerCtx * tctx, TupleInfo * ti, int partition_index, int64 } static Partition * -insert_trigger_ctx_lookup_partition(InsertTriggerCtx * tctx, HeapTuple tuple, +insert_trigger_ctx_lookup_partition(InsertTriggerCtx *tctx, HeapTuple tuple, TupleDesc tupdesc, int64 *timepoint_out) { Datum datum; @@ -489,7 +489,7 @@ insert_trigger_ctx_lookup_partition(InsertTriggerCtx * tctx, HeapTuple tuple, static void -insert_trigger_ctx_tuplesort_put(InsertTriggerCtx * tctx, HeapTuple tuple, TupleDesc tupdesc) +insert_trigger_ctx_tuplesort_put(InsertTriggerCtx *tctx, HeapTuple tuple, TupleDesc tupdesc) { Partition *part; TupleTableSlot *slot; @@ -554,48 +554,68 @@ insert_main_table_trigger(PG_FUNCTION_ARGS) InsertTriggerCtx *tctx = insert_trigger_ctx; HeapTuple tuple; TupleDesc tupdesc = trigdata->tg_relation->rd_att; - MemoryContext oldctx; + MemoryContext oldctx = NULL; - /* Check that this is called the way it should be */ - if (!CALLED_AS_TRIGGER(fcinfo)) - elog(ERROR, "Trigger not called by trigger manager"); - - if (!TRIGGER_FIRED_BEFORE(trigdata->tg_event)) - elog(ERROR, "Trigger should only fire before insert"); - - if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) - tuple = trigdata->tg_newtuple; - else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) - tuple = trigdata->tg_trigtuple; - else - elog(ERROR, "Unsupported event for trigger"); - - if (insert_trigger_ctx == NULL) + PG_TRY(); { - /* This is the first row. Allocate a new insert context */ - insert_trigger_ctx = insert_trigger_ctx_create(tuple, trigdata->tg_relation->rd_id); - return PointerGetDatum(NULL); - } + /* Check that this is called the way it should be */ + if (!CALLED_AS_TRIGGER(fcinfo)) + elog(ERROR, "Trigger not called by trigger manager"); - /* - * Use the insert context's memory context so that the state we use - * survives across trigger invocations until the after trigger. - */ - oldctx = MemoryContextSwitchTo(tctx->mctx); + if (!TRIGGER_FIRED_BEFORE(trigdata->tg_event)) + elog(ERROR, "Trigger should only fire before insert"); + + if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) + tuple = trigdata->tg_newtuple; + else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) + tuple = trigdata->tg_trigtuple; + else + elog(ERROR, "Unsupported event for trigger"); + + if (insert_trigger_ctx == NULL) + { + /* This is the first row. Allocate a new insert context */ + insert_trigger_ctx = insert_trigger_ctx_create(tuple, trigdata->tg_relation->rd_id); + return PointerGetDatum(NULL); + } - if (tctx->sort == NULL) - { /* - * Multi-tuple case, i.e., we must sort the tuples. Initialize the - * sort state and put the first tuple that we saved from the last row - * trigger in the same batch. + * Use the insert context's memory context so that the state we use + * survives across trigger invocations until the after trigger. 
*/ - insert_trigger_ctx_sort_init(insert_trigger_ctx, tupdesc); - insert_trigger_ctx_tuplesort_put(insert_trigger_ctx, tctx->first_tuple, tupdesc); - } + oldctx = MemoryContextSwitchTo(tctx->mctx); - /* The rest of the tuples in the batch are put into the sort state here */ - insert_trigger_ctx_tuplesort_put(insert_trigger_ctx, tuple, tupdesc); + + if (tctx->sort == NULL) + { + /* + * Multi-tuple case, i.e., we must sort the tuples. Initialize the + * sort state and put the first tuple that we saved from the last + * row trigger in the same batch. + */ + insert_trigger_ctx_sort_init(insert_trigger_ctx, tupdesc); + insert_trigger_ctx_tuplesort_put(insert_trigger_ctx, tctx->first_tuple, tupdesc); + } + + /* + * The rest of the tuples in the batch are put into the sort state + * here + */ + insert_trigger_ctx_tuplesort_put(insert_trigger_ctx, tuple, tupdesc); + } + PG_CATCH(); + { + if (oldctx != NULL) + MemoryContextSwitchTo(oldctx); + + if (insert_trigger_ctx != NULL) + { + insert_trigger_ctx_free(tctx); + insert_trigger_ctx = NULL; + } + PG_RE_THROW(); + } + PG_END_TRY(); MemoryContextSwitchTo(oldctx); @@ -610,100 +630,116 @@ insert_main_table_trigger_after(PG_FUNCTION_ARGS) InsertTriggerCtx *tctx = insert_trigger_ctx; char *insert_guard; - if (!CALLED_AS_TRIGGER(fcinfo)) - elog(ERROR, "not called by trigger manager"); - - if (!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event) && - !TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) - elog(ERROR, "Unsupported event for trigger"); - - /* - * This guard protects against calling insert_data() twice in the same - * transaction, which might otherwise cause a deadlock in case the second - * insert_data() involves a chunk that was inserted into in the first call - * to insert_data(). This is a temporary safe guard that should ideally be - * removed once chunk management has been refactored and improved to avoid - * such deadlocks. NOTE: In its current form, this safe guard unfortunately - * prohibits transactions involving INSERTs on two different hypertables. - */ - insert_guard = GetConfigOptionByName("io.insert_data_guard", NULL, true); - - if (insert_guard != NULL && strcmp(insert_guard, "on") == 0) + PG_TRY(); { - ereport(ERROR, - (errcode(ERRCODE_IO_OPERATION_NOT_SUPPORTED), - errmsg("insert_data() can only be called once per transaction"))); - } - /* set the guard locally (for this transaction) */ - set_config_option("io.insert_data_guard", "on", PGC_USERSET, PGC_S_SESSION, - GUC_ACTION_LOCAL, true, 0, false); + if (!CALLED_AS_TRIGGER(fcinfo)) + elog(ERROR, "not called by trigger manager"); + if (!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event) && + !TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) + elog(ERROR, "Unsupported event for trigger"); - if (tctx->sort == NULL) - { - /* Single-tuple batch fast path */ - Partition *part; - int64 timepoint; - TupleInfo ti = { - .desc = trigdata->tg_relation->rd_att, - .tuple = tctx->first_tuple, - }; + /* + * This guard protects against inserting twice in the same + * transaction, which might otherwise cause a deadlock in case the + * second insert involves a chunk that was targeted in the first + * insert. This is a temporary safe guard that should ideally be + * removed once chunk management has been refactored and improved to + * avoid such deadlocks. NOTE: In its current form, this safe guard + * unfortunately prohibits transactions involving INSERTs on two + * different hypertables. 
+ */ + insert_guard = GetConfigOptionByName("io.insert_data_guard", NULL, true); - /* We must set the time attno because it was not set on the fast path */ - tctx->time_attno = get_attnum(trigdata->tg_relation->rd_id, - tctx->hypertable->time_column_name); - part = insert_trigger_ctx_lookup_partition(insert_trigger_ctx, - ti.tuple, - ti.desc, - &timepoint); - - insert_tuple(insert_trigger_ctx, &ti, part->index, timepoint); - } - else - { - /* Multi-tuple batch slow path: sort the tuples */ - TupleDesc tupdesc = tctx->expanded_tupdesc; - TupleTableSlot *slot = MakeSingleTupleTableSlot(tupdesc); - bool doreplace[trigdata->tg_relation->rd_att->natts]; - bool validslot; - Datum datum; - - memset(doreplace, 0, sizeof(doreplace)); - - tuplesort_performsort(tctx->sort); - - /* Loop over the sorted tuples and insert one by one */ - validslot = tuplesort_gettupleslot(tctx->sort, true, slot, NULL); - - while (validslot) + if (insert_guard != NULL && strcmp(insert_guard, "on") == 0) { - bool isnull; - int partition_index; + ereport(ERROR, + (errcode(ERRCODE_IO_OPERATION_NOT_SUPPORTED), + errmsg("insert_data() can only be called once per transaction"))); + } + /* set the guard locally (for this transaction) */ + set_config_option("io.insert_data_guard", "on", PGC_USERSET, PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + + if (tctx->sort == NULL) + { + /* Single-tuple batch fast path */ + Partition *part; int64 timepoint; - HeapTuple tuple = ExecFetchSlotTuple(slot); TupleInfo ti = { .desc = trigdata->tg_relation->rd_att, - - /* - * Strip off the partition attribute from the tuple so that we - * do not add it to the chunk when we insert. - */ - .tuple = heap_modify_tuple(tuple, trigdata->tg_relation->rd_att, NULL, NULL, doreplace), + .tuple = tctx->first_tuple, }; - datum = heap_getattr(tuple, tupdesc->natts, tupdesc, &isnull); - partition_index = DatumGetInt32(datum); - datum = heap_getattr(tuple, tctx->time_attno, tupdesc, &isnull); - timepoint = time_value_to_internal(datum, tctx->hypertable->time_column_type); + /* + * We must set the time attno because it was not set on the fast + * path + */ + tctx->time_attno = get_attnum(trigdata->tg_relation->rd_id, + tctx->hypertable->time_column_name); + part = insert_trigger_ctx_lookup_partition(insert_trigger_ctx, + ti.tuple, + ti.desc, + &timepoint); - insert_tuple(insert_trigger_ctx, &ti, partition_index, timepoint); - - ExecClearTuple(slot); - validslot = tuplesort_gettupleslot(tctx->sort, true, slot, NULL); + insert_tuple(insert_trigger_ctx, &ti, part->index, timepoint); } + else + { + /* Multi-tuple batch slow path: sort the tuples */ + TupleDesc tupdesc = tctx->expanded_tupdesc; + TupleTableSlot *slot = MakeSingleTupleTableSlot(tupdesc); + bool doreplace[trigdata->tg_relation->rd_att->natts]; + bool validslot; + Datum datum; - tuplesort_end(tctx->sort); + memset(doreplace, 0, sizeof(doreplace)); + + tuplesort_performsort(tctx->sort); + + /* Loop over the sorted tuples and insert one by one */ + validslot = tuplesort_gettupleslot(tctx->sort, true, slot, NULL); + + while (validslot) + { + bool isnull; + int partition_index; + int64 timepoint; + HeapTuple tuple = ExecFetchSlotTuple(slot); + TupleInfo ti = { + .desc = trigdata->tg_relation->rd_att, + + /* + * Strip off the partition attribute from the tuple so + * that we do not add it to the chunk when we insert. 
+ */ + .tuple = heap_modify_tuple(tuple, trigdata->tg_relation->rd_att, NULL, NULL, doreplace), + }; + + datum = heap_getattr(tuple, tupdesc->natts, tupdesc, &isnull); + partition_index = DatumGetInt32(datum); + datum = heap_getattr(tuple, tctx->time_attno, tupdesc, &isnull); + timepoint = time_value_to_internal(datum, tctx->hypertable->time_column_type); + + insert_tuple(insert_trigger_ctx, &ti, partition_index, timepoint); + + ExecClearTuple(slot); + validslot = tuplesort_gettupleslot(tctx->sort, true, slot, NULL); + } + + tuplesort_end(tctx->sort); + } } + PG_CATCH(); + { + if (insert_trigger_ctx != NULL) + { + insert_trigger_ctx_free(tctx); + insert_trigger_ctx = NULL; + } + PG_RE_THROW(); + } + PG_END_TRY(); insert_trigger_ctx_free(tctx); insert_trigger_ctx = NULL; diff --git a/test/expected/insert.out b/test/expected/insert.out index 3918a75d3..bebd7a18e 100644 --- a/test/expected/insert.out +++ b/test/expected/insert.out @@ -490,3 +490,29 @@ SELECT * FROM ONLY "testNs"; ------------+-----------+----------+----------+----------+------------- (0 rows) +CREATE TABLE error_test(time timestamp, temp float8, device text NOT NULL); +SELECT create_hypertable('error_test', 'time', 'device', 2); + create_hypertable +------------------- + +(1 row) + +INSERT INTO error_test VALUES ('Mon Mar 20 09:18:20.1 2017', 21.3, 'dev1'); +\set ON_ERROR_STOP 0 +-- generate insert error +INSERT INTO error_test VALUES ('Mon Mar 20 09:18:22.3 2017', 21.1, NULL); +ERROR: 23502: null value in column "device" violates not-null constraint +DETAIL: Failing row contains (Mon Mar 20 09:18:22.3 2017, 21.1, null). +SCHEMA NAME: _timescaledb_internal +TABLE NAME: _hyper_3_5_0_7_data +COLUMN NAME: device +LOCATION: ExecConstraints, execMain.c:1732 +\set ON_ERROR_STOP 1 +INSERT INTO error_test VALUES ('Mon Mar 20 09:18:25.7 2017', 22.4, 'dev2'); +SELECT * FROM error_test; + time | temp | device +----------------------------+------+-------- + Mon Mar 20 09:18:20.1 2017 | 21.3 | dev1 + Mon Mar 20 09:18:25.7 2017 | 22.4 | dev2 +(2 rows) + diff --git a/test/sql/insert.sql b/test/sql/insert.sql index 471d8e197..b8211b71d 100644 --- a/test/sql/insert.sql +++ b/test/sql/insert.sql @@ -13,3 +13,15 @@ SELECT * FROM chunk_closing_test; SELECT * FROM ONLY chunk_closing_test; SELECT * FROM "testNs"; SELECT * FROM ONLY "testNs"; + +CREATE TABLE error_test(time timestamp, temp float8, device text NOT NULL); +SELECT create_hypertable('error_test', 'time', 'device', 2); + +INSERT INTO error_test VALUES ('Mon Mar 20 09:18:20.1 2017', 21.3, 'dev1'); + +\set ON_ERROR_STOP 0 +-- generate insert error +INSERT INTO error_test VALUES ('Mon Mar 20 09:18:22.3 2017', 21.1, NULL); +\set ON_ERROR_STOP 1 +INSERT INTO error_test VALUES ('Mon Mar 20 09:18:25.7 2017', 22.4, 'dev2'); +SELECT * FROM error_test;
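
As background for the change above: any elog(ERROR)/ereport(ERROR) raised inside a trigger longjmps out of the function, so module-level batch state that is normally torn down in the AFTER trigger would otherwise survive into the next INSERT in an undefined state. Below is a minimal, hypothetical sketch of the PG_TRY()/PG_CATCH() cleanup pattern the patch applies; BatchState, batch_state, batch_state_free() and batch_trigger() are illustrative stand-ins for the patch's InsertTriggerCtx, insert_trigger_ctx, insert_trigger_ctx_free() and insert_main_table_trigger(), not code from the repository.

#include "postgres.h"
#include "fmgr.h"
#include "commands/trigger.h"
#include "utils/memutils.h"

PG_MODULE_MAGIC;

/*
 * Hypothetical stand-in for the patch's InsertTriggerCtx: state that must
 * survive across row-trigger invocations within one batch insert.
 */
typedef struct BatchState
{
	MemoryContext mctx;			/* created on the first row of a batch */
} BatchState;

static BatchState *batch_state = NULL;

static void
batch_state_free(BatchState *state)
{
	/* The real patch also destroys chunk insert state and sort state here */
	MemoryContextDelete(state->mctx);
}

PG_FUNCTION_INFO_V1(batch_trigger);

Datum
batch_trigger(PG_FUNCTION_ARGS)
{
	TriggerData *trigdata = (TriggerData *) fcinfo->context;

	PG_TRY();
	{
		if (!CALLED_AS_TRIGGER(fcinfo))
			elog(ERROR, "not called by trigger manager");

		/* ... per-row batching work that may elog(ERROR) goes here ... */
	}
	PG_CATCH();
	{
		/*
		 * An error longjmps past the normal end-of-batch cleanup, so free
		 * and reset the module-level state before re-throwing; otherwise
		 * the next INSERT would see a half-built batch.
		 */
		if (batch_state != NULL)
		{
			batch_state_free(batch_state);
			batch_state = NULL;
		}
		PG_RE_THROW();
	}
	PG_END_TRY();

	return PointerGetDatum(trigdata->tg_trigtuple);
}

One property of this macro pair worth keeping in mind is that PG_END_TRY() restores the error-handling stack saved by PG_TRY(), so control must reach it on the non-error path; early returns are safest placed outside the PG_TRY() block.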
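
The hunk that re-indents the io.insert_data_guard check also documents why insert_data() is limited to once per transaction. For readers unfamiliar with that mechanism, here is a hedged sketch of the underlying pattern: a placeholder GUC is read with GetConfigOptionByName() and set with set_config_option() using GUC_ACTION_LOCAL, so the flag rolls back automatically when the transaction ends. The function name and the error code below are illustrative; only the GUC name and the two GUC calls mirror the diff (the real code uses an extension-specific ERRCODE_IO_OPERATION_NOT_SUPPORTED).

#include "postgres.h"
#include "utils/guc.h"

/*
 * Sketch of a transaction-local "run once" guard built on a placeholder GUC,
 * mirroring the io.insert_data_guard logic visible in the diff above.
 */
static void
check_and_set_insert_guard(void)
{
	char	   *guard = GetConfigOptionByName("io.insert_data_guard", NULL, true);

	if (guard != NULL && strcmp(guard, "on") == 0)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("insert_data() can only be called once per transaction")));

	/*
	 * GUC_ACTION_LOCAL gives SET LOCAL semantics: the value reverts at the
	 * end of the (sub)transaction, so the guard clears itself on COMMIT or
	 * ABORT without any explicit reset.
	 */
	set_config_option("io.insert_data_guard", "on", PGC_USERSET, PGC_S_SESSION,
					  GUC_ACTION_LOCAL, true, 0, false);
}

Because the setting is applied with GUC_ACTION_LOCAL, an aborted transaction does not leave the guard set for the rest of the session, which is what makes it usable as a per-transaction latch.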