Fix crash for concurrent drop and compress chunk

This change fixes a segfault that occurred when `drop_chunks` is
executed concurrently with `compress_chunk` and the chunk being
dropped is also being compressed.

The crash happened because the function that checks the tuple lock
status for a dimension slice was passed a dimension slice pointer
that was always NULL, and its error paths dereferenced that pointer.
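
For reference, a minimal sketch of the old call path (abbreviated from
dimension_slice_tuple_found() as shown in the diff below; it uses
TimescaleDB-internal types and is not standalone-compilable):

/* The scanner callback receives its output argument in "data" but only
 * fills it in after checking the lock status, so the slice passed to
 * lock_result_ok_or_abort() is still NULL at that point. */
DimensionSlice **slice = data;

lock_result_ok_or_abort(ti, *slice); /* old signature: slice is NULL here */
/* The error branches formatted messages such as
 * "dimension slice %d deleted by other transaction" from slice->fd.id,
 * dereferencing the NULL pointer and crashing. */

*slice = dimension_slice_from_slot(ti->slot); /* populated only afterwards */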

An isolation test is also added to cover concurrent compression and
dropping of the same chunk. To make the test pass with identical
errors on PG11 and PG12, the scanner API is also changed to pass on
the lock failure data, making it possible to distinguish between an
update and a delete on PG11.
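
The distinction relies on the ctid in the lock failure data that the
scanner now stores in ti->lockfd: for a deleted tuple it still points at
the locked tuple itself, while for an updated tuple it points at the new
tuple version. A minimal sketch of that check, mirroring the
tuple_is_deleted() helper added in the diff below (PG version guards and
assertions omitted):

static bool
tuple_is_deleted(TupleInfo *ti)
{
	/* On PG11 a concurrent delete is reported as an update; it can be
	 * recognized because the ctid in the failure data equals the tid of
	 * the tuple we tried to lock. PG12 reports TM_Deleted explicitly. */
	return ti->lockresult == TM_Updated &&
		   ItemPointerEquals(ts_scanner_get_tuple_tid(ti), &ti->lockfd.ctid);
}
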
Erik Nordström authored on 2020-11-26 15:57:38 +01:00, committed by Erik Nordström
parent 1e7f961055
commit c311b44a09
6 changed files with 327 additions and 17 deletions


@@ -99,8 +99,27 @@ ts_dimension_slice_cmp_coordinate(const DimensionSlice *slice, int64 coord)
return 0;
}
static bool
tuple_is_deleted(TupleInfo *ti)
{
#if PG12_GE
#ifdef USE_ASSERT_CHECKING
if (ti->lockresult == TM_Deleted)
Assert(ItemPointerEquals(ts_scanner_get_tuple_tid(ti), &ti->lockfd.ctid));
#endif
return ti->lockresult == TM_Deleted;
#else
/* If the tid and ctid in the lock failure data is the same, then this is
* a delete. Otherwise it is an update and ctid is the new tuple ID. This
* applies mostly to PG11, since PG12 has an explicit lockresult for
* deleted tuples. */
return ti->lockresult == TM_Updated &&
ItemPointerEquals(ts_scanner_get_tuple_tid(ti), &ti->lockfd.ctid);
#endif
}
static void
lock_result_ok_or_abort(TupleInfo *ti, DimensionSlice *slice)
lock_result_ok_or_abort(TupleInfo *ti)
{
switch (ti->lockresult)
{
@@ -109,20 +128,14 @@ lock_result_ok_or_abort(TupleInfo *ti, DimensionSlice *slice)
case TM_SelfModified:
case TM_Ok:
break;
#if PG12_GE
case TM_Deleted:
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("dimension slice %d deleted by other transaction", slice->fd.id),
errhint("Retry the operation again.")));
pg_unreachable();
break;
#endif
case TM_Updated:
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("dimension slice %d locked by other transaction", slice->fd.id),
errmsg("chunk %s by other transaction",
tuple_is_deleted(ti) ? "deleted" : "updated"),
errhint("Retry the operation again.")));
pg_unreachable();
break;
@@ -130,16 +143,14 @@ lock_result_ok_or_abort(TupleInfo *ti, DimensionSlice *slice)
case TM_BeingModified:
ereport(ERROR,
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
errmsg("dimension slice %d updated by other transaction", slice->fd.id),
errmsg("chunk updated by other transaction"),
errhint("Retry the operation again.")));
pg_unreachable();
break;
case TM_Invisible:
elog(ERROR, "attempt to lock invisible tuple");
pg_unreachable();
break;
case TM_WouldBlock:
default:
elog(ERROR, "unexpected tuple lock status: %d", ti->lockresult);
@@ -624,7 +635,7 @@ dimension_slice_tuple_found(TupleInfo *ti, void *data)
DimensionSlice **slice = data;
MemoryContext old;
lock_result_ok_or_abort(ti, *slice);
lock_result_ok_or_abort(ti);
old = MemoryContextSwitchTo(ti->mctx);
*slice = dimension_slice_from_slot(ti->slot);


@@ -290,8 +290,6 @@ ts_scanner_next(ScannerCtx *ctx, InternalScannerCtx *ictx)
if (ctx->tuplock)
{
TM_FailureData tmfd;
#if PG12_GE
TupleTableSlot *slot = ictx->tinfo.slot;
@@ -304,7 +302,7 @@ ts_scanner_next(ScannerCtx *ctx, InternalScannerCtx *ictx)
ctx->tuplock->lockmode,
ctx->tuplock->waitpolicy,
ctx->tuplock->lockflags,
&tmfd);
&ictx->tinfo.lockfd);
#else
HeapTuple tuple = ExecFetchSlotTuple(ictx->tinfo.slot);
@@ -317,7 +315,7 @@ ts_scanner_next(ScannerCtx *ctx, InternalScannerCtx *ictx)
ctx->tuplock->waitpolicy,
false,
&buffer,
&tmfd);
&ictx->tinfo.lockfd);
/*
* A tuple lock pins the underlying buffer, so we need to
* unpin it.


@@ -42,6 +42,8 @@ typedef struct TupleInfo
* in lockresult.
*/
TM_Result lockresult;
/* Failure data in case of failed tuple lock */
TM_FailureData lockfd;
int count;
/*


@@ -0,0 +1,237 @@
Parsed test spec with 2 sessions
starting permutation: s1_drop s1_commit s2_compress_chunk_1 s2_compress_chunk_2 s2_commit
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);
count
2
step s1_commit: COMMIT;
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;
ERROR: chunk not found
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;
ERROR: current transaction is aborted, commands ignored until end of transaction block
step s2_commit: COMMIT;
starting permutation: s1_drop s2_compress_chunk_1 s1_commit s2_compress_chunk_2 s2_commit
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);
count
2
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;
<waiting ...>
step s1_commit: COMMIT;
step s2_compress_chunk_1: <... completed>
error in steps s1_commit s2_compress_chunk_1: ERROR: chunk deleted by other transaction
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;
ERROR: current transaction is aborted, commands ignored until end of transaction block
step s2_commit: COMMIT;
starting permutation: s1_drop s2_compress_chunk_1 s2_compress_chunk_2 s1_commit s2_commit
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);
count
2
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;
<waiting ...>
step s2_compress_chunk_1: <... completed>
ERROR: canceling statement due to lock timeout
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;
ERROR: current transaction is aborted, commands ignored until end of transaction block
step s1_commit: COMMIT;
step s2_commit: COMMIT;
starting permutation: s1_drop s2_compress_chunk_1 s2_compress_chunk_2 s2_commit s1_commit
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);
count
2
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;
<waiting ...>
step s2_compress_chunk_1: <... completed>
ERROR: canceling statement due to lock timeout
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;
ERROR: current transaction is aborted, commands ignored until end of transaction block
step s2_commit: COMMIT;
step s1_commit: COMMIT;
starting permutation: s2_compress_chunk_1 s1_drop s1_commit s2_compress_chunk_2 s2_commit
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;
count
1
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);
<waiting ...>
step s1_drop: <... completed>
ERROR: some chunks could not be read since they are being concurrently updated
step s1_commit: COMMIT;
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;
count
1
step s2_commit: COMMIT;
starting permutation: s2_compress_chunk_1 s1_drop s2_compress_chunk_2 s1_commit s2_commit
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;
count
1
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);
<waiting ...>
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;
count
1
step s1_drop: <... completed>
ERROR: some chunks could not be read since they are being concurrently updated
step s1_commit: COMMIT;
step s2_commit: COMMIT;
starting permutation: s2_compress_chunk_1 s1_drop s2_compress_chunk_2 s2_commit s1_commit
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;
count
1
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);
<waiting ...>
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;
count
1
step s2_commit: COMMIT;
step s1_drop: <... completed>
count
2
step s1_commit: COMMIT;
starting permutation: s2_compress_chunk_1 s2_compress_chunk_2 s1_drop s1_commit s2_commit
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;
count
1
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;
count
1
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);
<waiting ...>
step s1_drop: <... completed>
ERROR: some chunks could not be read since they are being concurrently updated
step s1_commit: COMMIT;
step s2_commit: COMMIT;
starting permutation: s2_compress_chunk_1 s2_compress_chunk_2 s1_drop s2_commit s1_commit
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;
count
1
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;
count
1
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);
<waiting ...>
step s2_commit: COMMIT;
step s1_drop: <... completed>
count
2
step s1_commit: COMMIT;
starting permutation: s2_compress_chunk_1 s2_compress_chunk_2 s2_commit s1_drop s1_commit
step s2_compress_chunk_1:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;
count
1
step s2_compress_chunk_2:
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;
count
1
step s2_commit: COMMIT;
step s1_drop:
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);
count
2
step s1_commit: COMMIT;


@@ -14,6 +14,7 @@ list(APPEND TEST_FILES
continuous_aggs_insert.spec
continuous_aggs_multi.spec
continuous_aggs_concurrent_refresh.spec
deadlock_drop_chunks_compress.spec
)
if (CMAKE_BUILD_TYPE MATCHES Debug)


@@ -0,0 +1,61 @@
# This file and its contents are licensed under the Timescale License.
# Please see the included NOTICE for copyright information and
# LICENSE-TIMESCALE for a copy of the license.
# Test concurrent drop_chunks and compress_chunk
#
# Create three chunks
setup
{
CREATE TABLE conditions (time timestamptz, temp float);
SELECT create_hypertable('conditions', 'time', chunk_time_interval => interval '1 day');
INSERT INTO conditions
SELECT generate_series('2018-12-01 00:00'::timestamp, '2018-12-03 00:00','1 hour'), random() * 100;
ALTER TABLE conditions SET (timescaledb.compress = true);
}
teardown {
DROP TABLE conditions;
}
session "s1"
setup {
BEGIN;
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET LOCAL lock_timeout = '500ms';
SET LOCAL deadlock_timeout = '300ms';
}
# Drop two chunks
step "s1_drop" {
SELECT count (*)
FROM drop_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz);
}
step "s1_commit" { COMMIT; }
session "s2"
setup {
BEGIN;
SET TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET LOCAL lock_timeout = '500ms';
SET LOCAL deadlock_timeout = '300ms';
CREATE TEMPORARY TABLE IF NOT EXISTS chunks_to_compress ON COMMIT DROP AS
SELECT chunk
FROM show_chunks('conditions', older_than => '2018-12-03 00:00'::timestamptz) chunk
ORDER BY 1 LIMIT 2;
}
# Compress same two chunks as are being dropped
step "s2_compress_chunk_1" {
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 ASC LIMIT 1) AS chunk;
}
step "s2_compress_chunk_2" {
SELECT count(compress_chunk(chunk))
FROM (SELECT chunk FROM chunks_to_compress ORDER BY 1 DESC LIMIT 1) AS chunk;
}
step "s2_commit" { COMMIT; }