diff --git a/src/insert.c b/src/insert.c index d55fb1f5f..b8d14735b 100644 --- a/src/insert.c +++ b/src/insert.c @@ -49,7 +49,7 @@ get_close_if_needed_fn() } static void -close_if_needed(Hypertable * hci, Chunk * chunk) +close_if_needed(Hypertable *hci, Chunk *chunk) { ChunkReplica *cr; Catalog *catalog = catalog_get(); @@ -76,7 +76,7 @@ typedef struct ChunkInsertCtxRel EState *estate; ResultRelInfo *resultRelInfo; BulkInsertState bistate; -} ChunkInsertCtxRel; +} ChunkInsertCtxRel; static ChunkInsertCtxRel * chunk_insert_ctx_rel_new(Relation rel, ResultRelInfo *resultRelInfo, List *range_table) @@ -102,7 +102,7 @@ chunk_insert_ctx_rel_new(Relation rel, ResultRelInfo *resultRelInfo, List *range } static void -chunk_insert_ctx_rel_destroy(ChunkInsertCtxRel * rel_ctx) +chunk_insert_ctx_rel_destroy(ChunkInsertCtxRel *rel_ctx) { FreeBulkInsertState(rel_ctx->bistate); ExecCloseIndices(rel_ctx->resultRelInfo); @@ -113,7 +113,7 @@ chunk_insert_ctx_rel_destroy(ChunkInsertCtxRel * rel_ctx) static void -chunk_insert_ctx_rel_insert_tuple(ChunkInsertCtxRel * rel_ctx, HeapTuple tuple) +chunk_insert_ctx_rel_insert_tuple(ChunkInsertCtxRel *rel_ctx, HeapTuple tuple) { int hi_options = 0; /* no optimization */ CommandId mycid = GetCurrentCommandId(true); @@ -143,10 +143,10 @@ typedef struct ChunkInsertCtx Chunk *chunk; Cache *pinned; List *ctxs; -} ChunkInsertCtx; +} ChunkInsertCtx; static ChunkInsertCtx * -chunk_insert_ctx_new(Chunk * chunk, Cache * pinned) +chunk_insert_ctx_new(Chunk *chunk, Cache *pinned) { List *rel_ctx_list = NIL; ChunkInsertCtx *ctx; @@ -186,7 +186,7 @@ chunk_insert_ctx_new(Chunk * chunk, Cache * pinned) if (XactReadOnly && !rel->rd_islocaltemp) PreventCommandIfReadOnly("COPY FROM"); - + PreventCommandIfParallelMode("COPY FROM"); if (rel->rd_rel->relkind != RELKIND_RELATION) @@ -223,7 +223,7 @@ chunk_insert_ctx_new(Chunk * chunk, Cache * pinned) } static void -chunk_insert_ctx_destroy(ChunkInsertCtx * ctx) +chunk_insert_ctx_destroy(ChunkInsertCtx *ctx) { ListCell *lc; @@ -243,7 +243,7 @@ chunk_insert_ctx_destroy(ChunkInsertCtx * ctx) } static void -chunk_insert_ctx_insert_tuple(ChunkInsertCtx * ctx, HeapTuple tup) +chunk_insert_ctx_insert_tuple(ChunkInsertCtx *ctx, HeapTuple tup) { ListCell *lc; @@ -274,7 +274,7 @@ typedef struct InsertTriggerCtx TupleDesc expanded_tupdesc; HeapTuple first_tuple; MemoryContext mctx; -} InsertTriggerCtx; +} InsertTriggerCtx; static InsertTriggerCtx *insert_trigger_ctx; @@ -359,14 +359,14 @@ insert_trigger_ctx_create(HeapTuple tuple, Oid relid) } static void -insert_trigger_ctx_sort_init(InsertTriggerCtx * tctx, TupleDesc tupdesc) +insert_trigger_ctx_sort_init(InsertTriggerCtx *tctx, TupleDesc tupdesc) { tctx->expanded_tupdesc = tuple_desc_expand(tupdesc, &tctx->time_attno, tctx->hypertable->time_column_name); tctx->sort = tuple_sort_state_init(tctx->expanded_tupdesc, tctx->time_attno, tctx->hypertable->time_column_type); } static void -insert_trigger_ctx_free(InsertTriggerCtx * tctx) +insert_trigger_ctx_free(InsertTriggerCtx *tctx) { if (tctx->chunk_ctx != NULL) chunk_insert_ctx_destroy(tctx->chunk_ctx); @@ -400,7 +400,7 @@ heap_tuple_add_partition_index(HeapTuple tuple, TupleDesc tupdesc, TupleDesc new } static void -insert_tuple(InsertTriggerCtx * tctx, TupleInfo * ti, int partition_index, int64 time_pt) +insert_tuple(InsertTriggerCtx *tctx, TupleInfo *ti, int partition_index, int64 time_pt) { if (tctx->chunk_ctx != NULL && !chunk_timepoint_is_member(tctx->chunk_ctx->chunk, time_pt)) { @@ -444,7 +444,7 @@ insert_tuple(InsertTriggerCtx * tctx, TupleInfo * ti, int partition_index, int64 } static Partition * -insert_trigger_ctx_lookup_partition(InsertTriggerCtx * tctx, HeapTuple tuple, +insert_trigger_ctx_lookup_partition(InsertTriggerCtx *tctx, HeapTuple tuple, TupleDesc tupdesc, int64 *timepoint_out) { Datum datum; @@ -489,7 +489,7 @@ insert_trigger_ctx_lookup_partition(InsertTriggerCtx * tctx, HeapTuple tuple, static void -insert_trigger_ctx_tuplesort_put(InsertTriggerCtx * tctx, HeapTuple tuple, TupleDesc tupdesc) +insert_trigger_ctx_tuplesort_put(InsertTriggerCtx *tctx, HeapTuple tuple, TupleDesc tupdesc) { Partition *part; TupleTableSlot *slot; @@ -554,48 +554,68 @@ insert_main_table_trigger(PG_FUNCTION_ARGS) InsertTriggerCtx *tctx = insert_trigger_ctx; HeapTuple tuple; TupleDesc tupdesc = trigdata->tg_relation->rd_att; - MemoryContext oldctx; + MemoryContext oldctx = NULL; - /* Check that this is called the way it should be */ - if (!CALLED_AS_TRIGGER(fcinfo)) - elog(ERROR, "Trigger not called by trigger manager"); - - if (!TRIGGER_FIRED_BEFORE(trigdata->tg_event)) - elog(ERROR, "Trigger should only fire before insert"); - - if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) - tuple = trigdata->tg_newtuple; - else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) - tuple = trigdata->tg_trigtuple; - else - elog(ERROR, "Unsupported event for trigger"); - - if (insert_trigger_ctx == NULL) + PG_TRY(); { - /* This is the first row. Allocate a new insert context */ - insert_trigger_ctx = insert_trigger_ctx_create(tuple, trigdata->tg_relation->rd_id); - return PointerGetDatum(NULL); - } + /* Check that this is called the way it should be */ + if (!CALLED_AS_TRIGGER(fcinfo)) + elog(ERROR, "Trigger not called by trigger manager"); - /* - * Use the insert context's memory context so that the state we use - * survives across trigger invocations until the after trigger. - */ - oldctx = MemoryContextSwitchTo(tctx->mctx); + if (!TRIGGER_FIRED_BEFORE(trigdata->tg_event)) + elog(ERROR, "Trigger should only fire before insert"); + + if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) + tuple = trigdata->tg_newtuple; + else if (TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) + tuple = trigdata->tg_trigtuple; + else + elog(ERROR, "Unsupported event for trigger"); + + if (insert_trigger_ctx == NULL) + { + /* This is the first row. Allocate a new insert context */ + insert_trigger_ctx = insert_trigger_ctx_create(tuple, trigdata->tg_relation->rd_id); + return PointerGetDatum(NULL); + } - if (tctx->sort == NULL) - { /* - * Multi-tuple case, i.e., we must sort the tuples. Initialize the - * sort state and put the first tuple that we saved from the last row - * trigger in the same batch. + * Use the insert context's memory context so that the state we use + * survives across trigger invocations until the after trigger. */ - insert_trigger_ctx_sort_init(insert_trigger_ctx, tupdesc); - insert_trigger_ctx_tuplesort_put(insert_trigger_ctx, tctx->first_tuple, tupdesc); - } + oldctx = MemoryContextSwitchTo(tctx->mctx); - /* The rest of the tuples in the batch are put into the sort state here */ - insert_trigger_ctx_tuplesort_put(insert_trigger_ctx, tuple, tupdesc); + + if (tctx->sort == NULL) + { + /* + * Multi-tuple case, i.e., we must sort the tuples. Initialize the + * sort state and put the first tuple that we saved from the last + * row trigger in the same batch. + */ + insert_trigger_ctx_sort_init(insert_trigger_ctx, tupdesc); + insert_trigger_ctx_tuplesort_put(insert_trigger_ctx, tctx->first_tuple, tupdesc); + } + + /* + * The rest of the tuples in the batch are put into the sort state + * here + */ + insert_trigger_ctx_tuplesort_put(insert_trigger_ctx, tuple, tupdesc); + } + PG_CATCH(); + { + if (oldctx != NULL) + MemoryContextSwitchTo(oldctx); + + if (insert_trigger_ctx != NULL) + { + insert_trigger_ctx_free(tctx); + insert_trigger_ctx = NULL; + } + PG_RE_THROW(); + } + PG_END_TRY(); MemoryContextSwitchTo(oldctx); @@ -610,100 +630,116 @@ insert_main_table_trigger_after(PG_FUNCTION_ARGS) InsertTriggerCtx *tctx = insert_trigger_ctx; char *insert_guard; - if (!CALLED_AS_TRIGGER(fcinfo)) - elog(ERROR, "not called by trigger manager"); - - if (!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event) && - !TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) - elog(ERROR, "Unsupported event for trigger"); - - /* - * This guard protects against calling insert_data() twice in the same - * transaction, which might otherwise cause a deadlock in case the second - * insert_data() involves a chunk that was inserted into in the first call - * to insert_data(). This is a temporary safe guard that should ideally be - * removed once chunk management has been refactored and improved to avoid - * such deadlocks. NOTE: In its current form, this safe guard unfortunately - * prohibits transactions involving INSERTs on two different hypertables. - */ - insert_guard = GetConfigOptionByName("io.insert_data_guard", NULL, true); - - if (insert_guard != NULL && strcmp(insert_guard, "on") == 0) + PG_TRY(); { - ereport(ERROR, - (errcode(ERRCODE_IO_OPERATION_NOT_SUPPORTED), - errmsg("insert_data() can only be called once per transaction"))); - } - /* set the guard locally (for this transaction) */ - set_config_option("io.insert_data_guard", "on", PGC_USERSET, PGC_S_SESSION, - GUC_ACTION_LOCAL, true, 0, false); + if (!CALLED_AS_TRIGGER(fcinfo)) + elog(ERROR, "not called by trigger manager"); + if (!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event) && + !TRIGGER_FIRED_BY_INSERT(trigdata->tg_event)) + elog(ERROR, "Unsupported event for trigger"); - if (tctx->sort == NULL) - { - /* Single-tuple batch fast path */ - Partition *part; - int64 timepoint; - TupleInfo ti = { - .desc = trigdata->tg_relation->rd_att, - .tuple = tctx->first_tuple, - }; + /* + * This guard protects against inserting twice in the same + * transaction, which might otherwise cause a deadlock in case the + * second insert involves a chunk that was targeted in the first + * insert. This is a temporary safe guard that should ideally be + * removed once chunk management has been refactored and improved to + * avoid such deadlocks. NOTE: In its current form, this safe guard + * unfortunately prohibits transactions involving INSERTs on two + * different hypertables. + */ + insert_guard = GetConfigOptionByName("io.insert_data_guard", NULL, true); - /* We must set the time attno because it was not set on the fast path */ - tctx->time_attno = get_attnum(trigdata->tg_relation->rd_id, - tctx->hypertable->time_column_name); - part = insert_trigger_ctx_lookup_partition(insert_trigger_ctx, - ti.tuple, - ti.desc, - &timepoint); - - insert_tuple(insert_trigger_ctx, &ti, part->index, timepoint); - } - else - { - /* Multi-tuple batch slow path: sort the tuples */ - TupleDesc tupdesc = tctx->expanded_tupdesc; - TupleTableSlot *slot = MakeSingleTupleTableSlot(tupdesc); - bool doreplace[trigdata->tg_relation->rd_att->natts]; - bool validslot; - Datum datum; - - memset(doreplace, 0, sizeof(doreplace)); - - tuplesort_performsort(tctx->sort); - - /* Loop over the sorted tuples and insert one by one */ - validslot = tuplesort_gettupleslot(tctx->sort, true, slot, NULL); - - while (validslot) + if (insert_guard != NULL && strcmp(insert_guard, "on") == 0) { - bool isnull; - int partition_index; + ereport(ERROR, + (errcode(ERRCODE_IO_OPERATION_NOT_SUPPORTED), + errmsg("insert_data() can only be called once per transaction"))); + } + /* set the guard locally (for this transaction) */ + set_config_option("io.insert_data_guard", "on", PGC_USERSET, PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + + if (tctx->sort == NULL) + { + /* Single-tuple batch fast path */ + Partition *part; int64 timepoint; - HeapTuple tuple = ExecFetchSlotTuple(slot); TupleInfo ti = { .desc = trigdata->tg_relation->rd_att, - - /* - * Strip off the partition attribute from the tuple so that we - * do not add it to the chunk when we insert. - */ - .tuple = heap_modify_tuple(tuple, trigdata->tg_relation->rd_att, NULL, NULL, doreplace), + .tuple = tctx->first_tuple, }; - datum = heap_getattr(tuple, tupdesc->natts, tupdesc, &isnull); - partition_index = DatumGetInt32(datum); - datum = heap_getattr(tuple, tctx->time_attno, tupdesc, &isnull); - timepoint = time_value_to_internal(datum, tctx->hypertable->time_column_type); + /* + * We must set the time attno because it was not set on the fast + * path + */ + tctx->time_attno = get_attnum(trigdata->tg_relation->rd_id, + tctx->hypertable->time_column_name); + part = insert_trigger_ctx_lookup_partition(insert_trigger_ctx, + ti.tuple, + ti.desc, + &timepoint); - insert_tuple(insert_trigger_ctx, &ti, partition_index, timepoint); - - ExecClearTuple(slot); - validslot = tuplesort_gettupleslot(tctx->sort, true, slot, NULL); + insert_tuple(insert_trigger_ctx, &ti, part->index, timepoint); } + else + { + /* Multi-tuple batch slow path: sort the tuples */ + TupleDesc tupdesc = tctx->expanded_tupdesc; + TupleTableSlot *slot = MakeSingleTupleTableSlot(tupdesc); + bool doreplace[trigdata->tg_relation->rd_att->natts]; + bool validslot; + Datum datum; - tuplesort_end(tctx->sort); + memset(doreplace, 0, sizeof(doreplace)); + + tuplesort_performsort(tctx->sort); + + /* Loop over the sorted tuples and insert one by one */ + validslot = tuplesort_gettupleslot(tctx->sort, true, slot, NULL); + + while (validslot) + { + bool isnull; + int partition_index; + int64 timepoint; + HeapTuple tuple = ExecFetchSlotTuple(slot); + TupleInfo ti = { + .desc = trigdata->tg_relation->rd_att, + + /* + * Strip off the partition attribute from the tuple so + * that we do not add it to the chunk when we insert. + */ + .tuple = heap_modify_tuple(tuple, trigdata->tg_relation->rd_att, NULL, NULL, doreplace), + }; + + datum = heap_getattr(tuple, tupdesc->natts, tupdesc, &isnull); + partition_index = DatumGetInt32(datum); + datum = heap_getattr(tuple, tctx->time_attno, tupdesc, &isnull); + timepoint = time_value_to_internal(datum, tctx->hypertable->time_column_type); + + insert_tuple(insert_trigger_ctx, &ti, partition_index, timepoint); + + ExecClearTuple(slot); + validslot = tuplesort_gettupleslot(tctx->sort, true, slot, NULL); + } + + tuplesort_end(tctx->sort); + } } + PG_CATCH(); + { + if (insert_trigger_ctx != NULL) + { + insert_trigger_ctx_free(tctx); + insert_trigger_ctx = NULL; + } + PG_RE_THROW(); + } + PG_END_TRY(); insert_trigger_ctx_free(tctx); insert_trigger_ctx = NULL; diff --git a/test/expected/insert.out b/test/expected/insert.out index 3918a75d3..bebd7a18e 100644 --- a/test/expected/insert.out +++ b/test/expected/insert.out @@ -490,3 +490,29 @@ SELECT * FROM ONLY "testNs"; ------------+-----------+----------+----------+----------+------------- (0 rows) +CREATE TABLE error_test(time timestamp, temp float8, device text NOT NULL); +SELECT create_hypertable('error_test', 'time', 'device', 2); + create_hypertable +------------------- + +(1 row) + +INSERT INTO error_test VALUES ('Mon Mar 20 09:18:20.1 2017', 21.3, 'dev1'); +\set ON_ERROR_STOP 0 +-- generate insert error +INSERT INTO error_test VALUES ('Mon Mar 20 09:18:22.3 2017', 21.1, NULL); +ERROR: 23502: null value in column "device" violates not-null constraint +DETAIL: Failing row contains (Mon Mar 20 09:18:22.3 2017, 21.1, null). +SCHEMA NAME: _timescaledb_internal +TABLE NAME: _hyper_3_5_0_7_data +COLUMN NAME: device +LOCATION: ExecConstraints, execMain.c:1732 +\set ON_ERROR_STOP 1 +INSERT INTO error_test VALUES ('Mon Mar 20 09:18:25.7 2017', 22.4, 'dev2'); +SELECT * FROM error_test; + time | temp | device +----------------------------+------+-------- + Mon Mar 20 09:18:20.1 2017 | 21.3 | dev1 + Mon Mar 20 09:18:25.7 2017 | 22.4 | dev2 +(2 rows) + diff --git a/test/sql/insert.sql b/test/sql/insert.sql index 471d8e197..b8211b71d 100644 --- a/test/sql/insert.sql +++ b/test/sql/insert.sql @@ -13,3 +13,15 @@ SELECT * FROM chunk_closing_test; SELECT * FROM ONLY chunk_closing_test; SELECT * FROM "testNs"; SELECT * FROM ONLY "testNs"; + +CREATE TABLE error_test(time timestamp, temp float8, device text NOT NULL); +SELECT create_hypertable('error_test', 'time', 'device', 2); + +INSERT INTO error_test VALUES ('Mon Mar 20 09:18:20.1 2017', 21.3, 'dev1'); + +\set ON_ERROR_STOP 0 +-- generate insert error +INSERT INTO error_test VALUES ('Mon Mar 20 09:18:22.3 2017', 21.1, NULL); +\set ON_ERROR_STOP 1 +INSERT INTO error_test VALUES ('Mon Mar 20 09:18:25.7 2017', 22.4, 'dev2'); +SELECT * FROM error_test;