Merged in enordstr/backend-database/enordstr/cleanup-insert-state-on-error (pull request #133)

Cleanup insert state on error

Approved-by: RobAtticus NA <rob.kiefer@gmail.com>
Approved-by: Matvey Arye <cevian@gmail.com>
Approved-by: ci-vast
This commit is contained in:
enordstr NA 2017-03-22 14:32:37 +00:00
commit e4928c0835
3 changed files with 204 additions and 130 deletions

View File

@ -49,7 +49,7 @@ get_close_if_needed_fn()
} }
static void static void
close_if_needed(Hypertable * hci, Chunk * chunk) close_if_needed(Hypertable *hci, Chunk *chunk)
{ {
ChunkReplica *cr; ChunkReplica *cr;
Catalog *catalog = catalog_get(); Catalog *catalog = catalog_get();
@ -102,7 +102,7 @@ chunk_insert_ctx_rel_new(Relation rel, ResultRelInfo *resultRelInfo, List *range
} }
static void static void
chunk_insert_ctx_rel_destroy(ChunkInsertCtxRel * rel_ctx) chunk_insert_ctx_rel_destroy(ChunkInsertCtxRel *rel_ctx)
{ {
FreeBulkInsertState(rel_ctx->bistate); FreeBulkInsertState(rel_ctx->bistate);
ExecCloseIndices(rel_ctx->resultRelInfo); ExecCloseIndices(rel_ctx->resultRelInfo);
@ -113,7 +113,7 @@ chunk_insert_ctx_rel_destroy(ChunkInsertCtxRel * rel_ctx)
static void static void
chunk_insert_ctx_rel_insert_tuple(ChunkInsertCtxRel * rel_ctx, HeapTuple tuple) chunk_insert_ctx_rel_insert_tuple(ChunkInsertCtxRel *rel_ctx, HeapTuple tuple)
{ {
int hi_options = 0; /* no optimization */ int hi_options = 0; /* no optimization */
CommandId mycid = GetCurrentCommandId(true); CommandId mycid = GetCurrentCommandId(true);
@ -146,7 +146,7 @@ typedef struct ChunkInsertCtx
} ChunkInsertCtx; } ChunkInsertCtx;
static ChunkInsertCtx * static ChunkInsertCtx *
chunk_insert_ctx_new(Chunk * chunk, Cache * pinned) chunk_insert_ctx_new(Chunk *chunk, Cache *pinned)
{ {
List *rel_ctx_list = NIL; List *rel_ctx_list = NIL;
ChunkInsertCtx *ctx; ChunkInsertCtx *ctx;
@ -223,7 +223,7 @@ chunk_insert_ctx_new(Chunk * chunk, Cache * pinned)
} }
static void static void
chunk_insert_ctx_destroy(ChunkInsertCtx * ctx) chunk_insert_ctx_destroy(ChunkInsertCtx *ctx)
{ {
ListCell *lc; ListCell *lc;
@ -243,7 +243,7 @@ chunk_insert_ctx_destroy(ChunkInsertCtx * ctx)
} }
static void static void
chunk_insert_ctx_insert_tuple(ChunkInsertCtx * ctx, HeapTuple tup) chunk_insert_ctx_insert_tuple(ChunkInsertCtx *ctx, HeapTuple tup)
{ {
ListCell *lc; ListCell *lc;
@ -359,14 +359,14 @@ insert_trigger_ctx_create(HeapTuple tuple, Oid relid)
} }
static void static void
insert_trigger_ctx_sort_init(InsertTriggerCtx * tctx, TupleDesc tupdesc) insert_trigger_ctx_sort_init(InsertTriggerCtx *tctx, TupleDesc tupdesc)
{ {
tctx->expanded_tupdesc = tuple_desc_expand(tupdesc, &tctx->time_attno, tctx->hypertable->time_column_name); tctx->expanded_tupdesc = tuple_desc_expand(tupdesc, &tctx->time_attno, tctx->hypertable->time_column_name);
tctx->sort = tuple_sort_state_init(tctx->expanded_tupdesc, tctx->time_attno, tctx->hypertable->time_column_type); tctx->sort = tuple_sort_state_init(tctx->expanded_tupdesc, tctx->time_attno, tctx->hypertable->time_column_type);
} }
static void static void
insert_trigger_ctx_free(InsertTriggerCtx * tctx) insert_trigger_ctx_free(InsertTriggerCtx *tctx)
{ {
if (tctx->chunk_ctx != NULL) if (tctx->chunk_ctx != NULL)
chunk_insert_ctx_destroy(tctx->chunk_ctx); chunk_insert_ctx_destroy(tctx->chunk_ctx);
@ -400,7 +400,7 @@ heap_tuple_add_partition_index(HeapTuple tuple, TupleDesc tupdesc, TupleDesc new
} }
static void static void
insert_tuple(InsertTriggerCtx * tctx, TupleInfo * ti, int partition_index, int64 time_pt) insert_tuple(InsertTriggerCtx *tctx, TupleInfo *ti, int partition_index, int64 time_pt)
{ {
if (tctx->chunk_ctx != NULL && !chunk_timepoint_is_member(tctx->chunk_ctx->chunk, time_pt)) if (tctx->chunk_ctx != NULL && !chunk_timepoint_is_member(tctx->chunk_ctx->chunk, time_pt))
{ {
@ -444,7 +444,7 @@ insert_tuple(InsertTriggerCtx * tctx, TupleInfo * ti, int partition_index, int64
} }
static Partition * static Partition *
insert_trigger_ctx_lookup_partition(InsertTriggerCtx * tctx, HeapTuple tuple, insert_trigger_ctx_lookup_partition(InsertTriggerCtx *tctx, HeapTuple tuple,
TupleDesc tupdesc, int64 *timepoint_out) TupleDesc tupdesc, int64 *timepoint_out)
{ {
Datum datum; Datum datum;
@ -489,7 +489,7 @@ insert_trigger_ctx_lookup_partition(InsertTriggerCtx * tctx, HeapTuple tuple,
static void static void
insert_trigger_ctx_tuplesort_put(InsertTriggerCtx * tctx, HeapTuple tuple, TupleDesc tupdesc) insert_trigger_ctx_tuplesort_put(InsertTriggerCtx *tctx, HeapTuple tuple, TupleDesc tupdesc)
{ {
Partition *part; Partition *part;
TupleTableSlot *slot; TupleTableSlot *slot;
@ -554,8 +554,10 @@ insert_main_table_trigger(PG_FUNCTION_ARGS)
InsertTriggerCtx *tctx = insert_trigger_ctx; InsertTriggerCtx *tctx = insert_trigger_ctx;
HeapTuple tuple; HeapTuple tuple;
TupleDesc tupdesc = trigdata->tg_relation->rd_att; TupleDesc tupdesc = trigdata->tg_relation->rd_att;
MemoryContext oldctx; MemoryContext oldctx = NULL;
PG_TRY();
{
/* Check that this is called the way it should be */ /* Check that this is called the way it should be */
if (!CALLED_AS_TRIGGER(fcinfo)) if (!CALLED_AS_TRIGGER(fcinfo))
elog(ERROR, "Trigger not called by trigger manager"); elog(ERROR, "Trigger not called by trigger manager");
@ -583,19 +585,37 @@ insert_main_table_trigger(PG_FUNCTION_ARGS)
*/ */
oldctx = MemoryContextSwitchTo(tctx->mctx); oldctx = MemoryContextSwitchTo(tctx->mctx);
if (tctx->sort == NULL) if (tctx->sort == NULL)
{ {
/* /*
* Multi-tuple case, i.e., we must sort the tuples. Initialize the * Multi-tuple case, i.e., we must sort the tuples. Initialize the
* sort state and put the first tuple that we saved from the last row * sort state and put the first tuple that we saved from the last
* trigger in the same batch. * row trigger in the same batch.
*/ */
insert_trigger_ctx_sort_init(insert_trigger_ctx, tupdesc); insert_trigger_ctx_sort_init(insert_trigger_ctx, tupdesc);
insert_trigger_ctx_tuplesort_put(insert_trigger_ctx, tctx->first_tuple, tupdesc); insert_trigger_ctx_tuplesort_put(insert_trigger_ctx, tctx->first_tuple, tupdesc);
} }
/* The rest of the tuples in the batch are put into the sort state here */ /*
* The rest of the tuples in the batch are put into the sort state
* here
*/
insert_trigger_ctx_tuplesort_put(insert_trigger_ctx, tuple, tupdesc); insert_trigger_ctx_tuplesort_put(insert_trigger_ctx, tuple, tupdesc);
}
PG_CATCH();
{
if (oldctx != NULL)
MemoryContextSwitchTo(oldctx);
if (insert_trigger_ctx != NULL)
{
insert_trigger_ctx_free(tctx);
insert_trigger_ctx = NULL;
}
PG_RE_THROW();
}
PG_END_TRY();
MemoryContextSwitchTo(oldctx); MemoryContextSwitchTo(oldctx);
@ -610,6 +630,8 @@ insert_main_table_trigger_after(PG_FUNCTION_ARGS)
InsertTriggerCtx *tctx = insert_trigger_ctx; InsertTriggerCtx *tctx = insert_trigger_ctx;
char *insert_guard; char *insert_guard;
PG_TRY();
{
if (!CALLED_AS_TRIGGER(fcinfo)) if (!CALLED_AS_TRIGGER(fcinfo))
elog(ERROR, "not called by trigger manager"); elog(ERROR, "not called by trigger manager");
@ -618,13 +640,14 @@ insert_main_table_trigger_after(PG_FUNCTION_ARGS)
elog(ERROR, "Unsupported event for trigger"); elog(ERROR, "Unsupported event for trigger");
/* /*
* This guard protects against calling insert_data() twice in the same * This guard protects against inserting twice in the same
* transaction, which might otherwise cause a deadlock in case the second * transaction, which might otherwise cause a deadlock in case the
* insert_data() involves a chunk that was inserted into in the first call * second insert involves a chunk that was targeted in the first
* to insert_data(). This is a temporary safe guard that should ideally be * insert. This is a temporary safe guard that should ideally be
* removed once chunk management has been refactored and improved to avoid * removed once chunk management has been refactored and improved to
* such deadlocks. NOTE: In its current form, this safe guard unfortunately * avoid such deadlocks. NOTE: In its current form, this safe guard
* prohibits transactions involving INSERTs on two different hypertables. * unfortunately prohibits transactions involving INSERTs on two
* different hypertables.
*/ */
insert_guard = GetConfigOptionByName("io.insert_data_guard", NULL, true); insert_guard = GetConfigOptionByName("io.insert_data_guard", NULL, true);
@ -638,7 +661,6 @@ insert_main_table_trigger_after(PG_FUNCTION_ARGS)
set_config_option("io.insert_data_guard", "on", PGC_USERSET, PGC_S_SESSION, set_config_option("io.insert_data_guard", "on", PGC_USERSET, PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false); GUC_ACTION_LOCAL, true, 0, false);
if (tctx->sort == NULL) if (tctx->sort == NULL)
{ {
/* Single-tuple batch fast path */ /* Single-tuple batch fast path */
@ -649,7 +671,10 @@ insert_main_table_trigger_after(PG_FUNCTION_ARGS)
.tuple = tctx->first_tuple, .tuple = tctx->first_tuple,
}; };
/* We must set the time attno because it was not set on the fast path */ /*
* We must set the time attno because it was not set on the fast
* path
*/
tctx->time_attno = get_attnum(trigdata->tg_relation->rd_id, tctx->time_attno = get_attnum(trigdata->tg_relation->rd_id,
tctx->hypertable->time_column_name); tctx->hypertable->time_column_name);
part = insert_trigger_ctx_lookup_partition(insert_trigger_ctx, part = insert_trigger_ctx_lookup_partition(insert_trigger_ctx,
@ -685,8 +710,8 @@ insert_main_table_trigger_after(PG_FUNCTION_ARGS)
.desc = trigdata->tg_relation->rd_att, .desc = trigdata->tg_relation->rd_att,
/* /*
* Strip off the partition attribute from the tuple so that we * Strip off the partition attribute from the tuple so
* do not add it to the chunk when we insert. * that we do not add it to the chunk when we insert.
*/ */
.tuple = heap_modify_tuple(tuple, trigdata->tg_relation->rd_att, NULL, NULL, doreplace), .tuple = heap_modify_tuple(tuple, trigdata->tg_relation->rd_att, NULL, NULL, doreplace),
}; };
@ -704,6 +729,17 @@ insert_main_table_trigger_after(PG_FUNCTION_ARGS)
tuplesort_end(tctx->sort); tuplesort_end(tctx->sort);
} }
}
PG_CATCH();
{
if (insert_trigger_ctx != NULL)
{
insert_trigger_ctx_free(tctx);
insert_trigger_ctx = NULL;
}
PG_RE_THROW();
}
PG_END_TRY();
insert_trigger_ctx_free(tctx); insert_trigger_ctx_free(tctx);
insert_trigger_ctx = NULL; insert_trigger_ctx = NULL;

View File

@ -490,3 +490,29 @@ SELECT * FROM ONLY "testNs";
------------+-----------+----------+----------+----------+------------- ------------+-----------+----------+----------+----------+-------------
(0 rows) (0 rows)
CREATE TABLE error_test(time timestamp, temp float8, device text NOT NULL);
SELECT create_hypertable('error_test', 'time', 'device', 2);
create_hypertable
-------------------
(1 row)
INSERT INTO error_test VALUES ('Mon Mar 20 09:18:20.1 2017', 21.3, 'dev1');
\set ON_ERROR_STOP 0
-- generate insert error
INSERT INTO error_test VALUES ('Mon Mar 20 09:18:22.3 2017', 21.1, NULL);
ERROR: 23502: null value in column "device" violates not-null constraint
DETAIL: Failing row contains (Mon Mar 20 09:18:22.3 2017, 21.1, null).
SCHEMA NAME: _timescaledb_internal
TABLE NAME: _hyper_3_5_0_7_data
COLUMN NAME: device
LOCATION: ExecConstraints, execMain.c:1732
\set ON_ERROR_STOP 1
INSERT INTO error_test VALUES ('Mon Mar 20 09:18:25.7 2017', 22.4, 'dev2');
SELECT * FROM error_test;
time | temp | device
----------------------------+------+--------
Mon Mar 20 09:18:20.1 2017 | 21.3 | dev1
Mon Mar 20 09:18:25.7 2017 | 22.4 | dev2
(2 rows)

View File

@ -13,3 +13,15 @@ SELECT * FROM chunk_closing_test;
SELECT * FROM ONLY chunk_closing_test; SELECT * FROM ONLY chunk_closing_test;
SELECT * FROM "testNs"; SELECT * FROM "testNs";
SELECT * FROM ONLY "testNs"; SELECT * FROM ONLY "testNs";
CREATE TABLE error_test(time timestamp, temp float8, device text NOT NULL);
SELECT create_hypertable('error_test', 'time', 'device', 2);
INSERT INTO error_test VALUES ('Mon Mar 20 09:18:20.1 2017', 21.3, 'dev1');
\set ON_ERROR_STOP 0
-- generate insert error
INSERT INTO error_test VALUES ('Mon Mar 20 09:18:22.3 2017', 21.1, NULL);
\set ON_ERROR_STOP 1
INSERT INTO error_test VALUES ('Mon Mar 20 09:18:25.7 2017', 22.4, 'dev2');
SELECT * FROM error_test;