Fix leak during concurrent UPDATE/DELETE

When updating and deleting the same tuple while both transactions are
running at the same time, we end up with reference leak. This is because
one of the query in a transaction fails and we take error path. However
we fail to close the table.

This patch fixes the above mentioned problem by closing the required
tables.

Fixes #5674
This commit is contained in:
Bharathy 2023-05-11 17:55:27 +05:30
parent 656daf45f6
commit 2d71a5bca9
3 changed files with 548 additions and 17 deletions

View File

@ -2265,8 +2265,13 @@ build_update_delete_scankeys(RowDecompressor *decompressor, List *filters, int *
* This method will: * This method will:
* 1.scan compressed chunk * 1.scan compressed chunk
* 2.decompress the row * 2.decompress the row
* 3.insert decompressed rows to uncompressed chunk * 3.delete this row from compressed chunk
* 4.delete this row from compressed chunk * 4.insert decompressed rows to uncompressed chunk
*
* Return value:
* if all 4 steps defined above pass set chunk_status_changed to true and return true
* if step 4 fails return false. Step 3 will fail if there are conflicting concurrent operations on
* same chunk.
*/ */
static bool static bool
decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys, decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys,
@ -2307,7 +2312,6 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num
decompressor->compressed_datums, decompressor->compressed_datums,
decompressor->compressed_is_nulls); decompressor->compressed_is_nulls);
row_decompressor_decompress_row(decompressor, NULL);
TM_FailureData tmfd; TM_FailureData tmfd;
TM_Result result; TM_Result result;
result = table_tuple_delete(decompressor->in_rel, result = table_tuple_delete(decompressor->in_rel,
@ -2319,19 +2323,49 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num
&tmfd, &tmfd,
false); false);
if (result == TM_Updated || result == TM_Deleted) switch (result)
{ {
if (IsolationUsesXactSnapshot()) /* If the tuple has been already deleted, most likely somebody
ereport(ERROR, * decompressed the tuple already */
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), case TM_Deleted:
errmsg("could not serialize access due to concurrent update"))); {
return false; if (IsolationUsesXactSnapshot())
} {
if (result == TM_Invisible) /* For Repeatable Read isolation level report error */
{ table_endscan(heapScan);
elog(ERROR, "attempted to lock invisible tuple"); ereport(ERROR,
return false; (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("could not serialize access due to concurrent update")));
}
continue;
}
break;
/*
* If another transaction is updating the compressed data,
* we have to abort the transaction to keep consistency.
*/
case TM_Updated:
{
table_endscan(heapScan);
elog(ERROR, "tuple concurrently updated");
}
break;
case TM_Invisible:
{
table_endscan(heapScan);
elog(ERROR, "attempted to lock invisible tuple");
}
break;
case TM_Ok:
break;
default:
{
table_endscan(heapScan);
elog(ERROR, "unexpected tuple operation result: %d", result);
}
break;
} }
row_decompressor_decompress_row(decompressor, NULL);
*chunk_status_changed = true; *chunk_status_changed = true;
} }
if (scankeys) if (scankeys)

View File

@ -1,5 +1,3 @@
unused step name: IN1
unused step name: INc
unused step name: S1 unused step name: S1
unused step name: SA unused step name: SA
unused step name: SC1 unused step name: SC1
@ -60,3 +58,485 @@ total_chunks|number_compressed_chunks
3| 3 3| 3
(1 row) (1 row)
starting permutation: IN1 INc CA1 CAc SH SS DEL1 UPD1 DELc UPDc SH SS
step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100);
step INc: COMMIT;
step CA1:
BEGIN;
SELECT
CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress
FROM show_chunks('ts_device_table') AS ch
ORDER BY ch::text;
compress
--------
t
t
t
(3 rows)
step CAc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
1| 1| 200| 100
(1 row)
step DEL1: BEGIN; DELETE from ts_device_table WHERE location = 200;
step UPD1: BEGIN; UPDATE ts_device_table SET value = 4 WHERE location = 200; <waiting ...>
step DELc: COMMIT;
step UPD1: <... completed>
step UPDc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
(0 rows)
starting permutation: IN1 INc CA1 CAc SH SS UPD1 DEL1 UPDc DELc SH SS
step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100);
step INc: COMMIT;
step CA1:
BEGIN;
SELECT
CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress
FROM show_chunks('ts_device_table') AS ch
ORDER BY ch::text;
compress
--------
t
t
t
(3 rows)
step CAc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
1| 1| 200| 100
(1 row)
step UPD1: BEGIN; UPDATE ts_device_table SET value = 4 WHERE location = 200;
step DEL1: BEGIN; DELETE from ts_device_table WHERE location = 200; <waiting ...>
step UPDc: COMMIT;
step DEL1: <... completed>
step DELc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
(0 rows)
starting permutation: IN1 INc CA1 CAc SH SS DEL1 UPDrr DELc UPDc SH SS
step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100);
step INc: COMMIT;
step CA1:
BEGIN;
SELECT
CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress
FROM show_chunks('ts_device_table') AS ch
ORDER BY ch::text;
compress
--------
t
t
t
(3 rows)
step CAc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
1| 1| 200| 100
(1 row)
step DEL1: BEGIN; DELETE from ts_device_table WHERE location = 200;
step UPDrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; UPDATE ts_device_table SET value = 4 WHERE location = 200; <waiting ...>
step DELc: COMMIT;
step UPDrr: <... completed>
ERROR: could not serialize access due to concurrent update
step UPDc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
(0 rows)
starting permutation: IN1 INc CA1 CAc SH SS UPD1 DELrr UPDc DELc SH SS
step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100);
step INc: COMMIT;
step CA1:
BEGIN;
SELECT
CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress
FROM show_chunks('ts_device_table') AS ch
ORDER BY ch::text;
compress
--------
t
t
t
(3 rows)
step CAc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
1| 1| 200| 100
(1 row)
step UPD1: BEGIN; UPDATE ts_device_table SET value = 4 WHERE location = 200;
step DELrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; DELETE from ts_device_table WHERE location = 200; <waiting ...>
step UPDc: COMMIT;
step DELrr: <... completed>
ERROR: could not serialize access due to concurrent update
step DELc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
1| 1| 200| 4
(1 row)
starting permutation: IN1 INc CA1 CAc SH SS DELrr UPDrr DELc UPDc SH SS
step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100);
step INc: COMMIT;
step CA1:
BEGIN;
SELECT
CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress
FROM show_chunks('ts_device_table') AS ch
ORDER BY ch::text;
compress
--------
t
t
t
(3 rows)
step CAc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
1| 1| 200| 100
(1 row)
step DELrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; DELETE from ts_device_table WHERE location = 200;
step UPDrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; UPDATE ts_device_table SET value = 4 WHERE location = 200; <waiting ...>
step DELc: COMMIT;
step UPDrr: <... completed>
ERROR: could not serialize access due to concurrent update
step UPDc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
(0 rows)
starting permutation: IN1 INc CA1 CAc SH SS UPDrr DELrr UPDc DELc SH SS
step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100);
step INc: COMMIT;
step CA1:
BEGIN;
SELECT
CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress
FROM show_chunks('ts_device_table') AS ch
ORDER BY ch::text;
compress
--------
t
t
t
(3 rows)
step CAc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
1| 1| 200| 100
(1 row)
step UPDrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; UPDATE ts_device_table SET value = 4 WHERE location = 200;
step DELrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; DELETE from ts_device_table WHERE location = 200; <waiting ...>
step UPDc: COMMIT;
step DELrr: <... completed>
ERROR: could not serialize access due to concurrent update
step DELc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
1| 1| 200| 4
(1 row)
starting permutation: IN1 INc CA1 CAc SH SS DELrr UPDs DELc UPDc SH SS
step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100);
step INc: COMMIT;
step CA1:
BEGIN;
SELECT
CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress
FROM show_chunks('ts_device_table') AS ch
ORDER BY ch::text;
compress
--------
t
t
t
(3 rows)
step CAc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
1| 1| 200| 100
(1 row)
step DELrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; DELETE from ts_device_table WHERE location = 200;
step UPDs: BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; UPDATE ts_device_table SET value = 4 WHERE location = 200; <waiting ...>
step DELc: COMMIT;
step UPDs: <... completed>
ERROR: could not serialize access due to concurrent update
step UPDc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
(0 rows)
starting permutation: IN1 INc CA1 CAc SH SS UPDrr DELs UPDc DELc SH SS
step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100);
step INc: COMMIT;
step CA1:
BEGIN;
SELECT
CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress
FROM show_chunks('ts_device_table') AS ch
ORDER BY ch::text;
compress
--------
t
t
t
(3 rows)
step CAc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
1| 1| 200| 100
(1 row)
step UPDrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; UPDATE ts_device_table SET value = 4 WHERE location = 200;
step DELs: BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; DELETE from ts_device_table WHERE location = 200; <waiting ...>
step UPDc: COMMIT;
step DELs: <... completed>
ERROR: could not serialize access due to concurrent update
step DELc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
1| 1| 200| 4
(1 row)
starting permutation: IN1 INc CA1 CAc SH SS DELs UPDrr DELc UPDc SH SS
step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100);
step INc: COMMIT;
step CA1:
BEGIN;
SELECT
CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress
FROM show_chunks('ts_device_table') AS ch
ORDER BY ch::text;
compress
--------
t
t
t
(3 rows)
step CAc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
1| 1| 200| 100
(1 row)
step DELs: BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; DELETE from ts_device_table WHERE location = 200;
step UPDrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; UPDATE ts_device_table SET value = 4 WHERE location = 200; <waiting ...>
step DELc: COMMIT;
step UPDrr: <... completed>
ERROR: could not serialize access due to concurrent update
step UPDc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
(0 rows)
starting permutation: IN1 INc CA1 CAc SH SS UPDs DELrr UPDc DELc SH SS
step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100);
step INc: COMMIT;
step CA1:
BEGIN;
SELECT
CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress
FROM show_chunks('ts_device_table') AS ch
ORDER BY ch::text;
compress
--------
t
t
t
(3 rows)
step CAc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
1| 1| 200| 100
(1 row)
step UPDs: BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; UPDATE ts_device_table SET value = 4 WHERE location = 200;
step DELrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; DELETE from ts_device_table WHERE location = 200; <waiting ...>
step UPDc: COMMIT;
step DELrr: <... completed>
ERROR: could not serialize access due to concurrent update
step DELc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
------------+------------------------
3| 3
(1 row)
step SS: SELECT * FROM ts_device_table WHERE location = 200;
time|device|location|value
----+------+--------+-----
1| 1| 200| 4
(1 row)

View File

@ -36,10 +36,14 @@ step "INc" { COMMIT; }
session "DEL" session "DEL"
step "DEL1" { BEGIN; DELETE from ts_device_table WHERE location = 200; } step "DEL1" { BEGIN; DELETE from ts_device_table WHERE location = 200; }
step "DELrr" { BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; DELETE from ts_device_table WHERE location = 200; }
step "DELs" { BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; DELETE from ts_device_table WHERE location = 200; }
step "DELc" { COMMIT; } step "DELc" { COMMIT; }
session "UPD" session "UPD"
step "UPD1" { BEGIN; UPDATE ts_device_table SET value = 4 WHERE location = 200; } step "UPD1" { BEGIN; UPDATE ts_device_table SET value = 4 WHERE location = 200; }
step "UPDrr" { BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; UPDATE ts_device_table SET value = 4 WHERE location = 200; }
step "UPDs" { BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; UPDATE ts_device_table SET value = 4 WHERE location = 200; }
step "UPDc" { COMMIT; } step "UPDc" { COMMIT; }
session "SI" session "SI"
@ -51,6 +55,7 @@ step "S1" { SELECT count(*) from ts_device_table; }
step "SC1" { SELECT (count_chunktable(ch)).* FROM show_chunks('ts_device_table') AS ch ORDER BY ch::text LIMIT 1; } step "SC1" { SELECT (count_chunktable(ch)).* FROM show_chunks('ts_device_table') AS ch ORDER BY ch::text LIMIT 1; }
step "SH" { SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); } step "SH" { SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); }
step "SA" { SELECT * FROM ts_device_table; } step "SA" { SELECT * FROM ts_device_table; }
step "SS" { SELECT * FROM ts_device_table WHERE location = 200; }
session "CompressAll" session "CompressAll"
step "CA1" { step "CA1" {
@ -63,4 +68,16 @@ step "CA1" {
step "CAc" { COMMIT; } step "CAc" { COMMIT; }
# Test concurrent update/delete operations # Test concurrent update/delete operations
permutation "CA1" "CAc" "SH" "I1" "Ic" "SH" "UPD1" "UPDc" "SH" "DEL1" "DELc" "SH" "UPD1" "UPDc" "SH" permutation "CA1" "CAc" "SH" "I1" "Ic" "SH" "UPD1" "UPDc" "SH" "DEL1" "DELc" "SH" "UPD1" "UPDc" "SH"
permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "DEL1" "UPD1" "DELc" "UPDc" "SH" "SS"
permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "UPD1" "DEL1" "UPDc" "DELc" "SH" "SS"
#Test interaction with upper isolation levels
permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "DEL1" "UPDrr" "DELc" "UPDc" "SH" "SS"
permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "UPD1" "DELrr" "UPDc" "DELc" "SH" "SS"
permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "DELrr" "UPDrr" "DELc" "UPDc" "SH" "SS"
permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "UPDrr" "DELrr" "UPDc" "DELc" "SH" "SS"
permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "DELrr" "UPDs" "DELc" "UPDc" "SH" "SS"
permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "UPDrr" "DELs" "UPDc" "DELc" "SH" "SS"
permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "DELs" "UPDrr" "DELc" "UPDc" "SH" "SS"
permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "UPDs" "DELrr" "UPDc" "DELc" "SH" "SS"