diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c index caacd108d..f11359898 100644 --- a/tsl/src/compression/compression.c +++ b/tsl/src/compression/compression.c @@ -2265,8 +2265,13 @@ build_update_delete_scankeys(RowDecompressor *decompressor, List *filters, int * * This method will: * 1.scan compressed chunk * 2.decompress the row - * 3.insert decompressed rows to uncompressed chunk - * 4.delete this row from compressed chunk + * 3.delete this row from compressed chunk + * 4.insert decompressed rows to uncompressed chunk + * + * Return value: + * if all 4 steps defined above pass set chunk_status_changed to true and return true + * if step 4 fails return false. Step 3 will fail if there are conflicting concurrent operations on + * same chunk. */ static bool decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num_scankeys, @@ -2307,7 +2312,6 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num decompressor->compressed_datums, decompressor->compressed_is_nulls); - row_decompressor_decompress_row(decompressor, NULL); TM_FailureData tmfd; TM_Result result; result = table_tuple_delete(decompressor->in_rel, @@ -2319,19 +2323,49 @@ decompress_batches(RowDecompressor *decompressor, ScanKeyData *scankeys, int num &tmfd, false); - if (result == TM_Updated || result == TM_Deleted) + switch (result) { - if (IsolationUsesXactSnapshot()) - ereport(ERROR, - (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), - errmsg("could not serialize access due to concurrent update"))); - return false; - } - if (result == TM_Invisible) - { - elog(ERROR, "attempted to lock invisible tuple"); - return false; + /* If the tuple has been already deleted, most likely somebody + * decompressed the tuple already */ + case TM_Deleted: + { + if (IsolationUsesXactSnapshot()) + { + /* For Repeatable Read isolation level report error */ + table_endscan(heapScan); + ereport(ERROR, + (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), + errmsg("could not serialize access due to concurrent update"))); + } + continue; + } + break; + /* + * If another transaction is updating the compressed data, + * we have to abort the transaction to keep consistency. + */ + case TM_Updated: + { + table_endscan(heapScan); + elog(ERROR, "tuple concurrently updated"); + } + break; + case TM_Invisible: + { + table_endscan(heapScan); + elog(ERROR, "attempted to lock invisible tuple"); + } + break; + case TM_Ok: + break; + default: + { + table_endscan(heapScan); + elog(ERROR, "unexpected tuple operation result: %d", result); + } + break; } + row_decompressor_decompress_row(decompressor, NULL); *chunk_status_changed = true; } if (scankeys) diff --git a/tsl/test/isolation/expected/compression_dml_iso.out b/tsl/test/isolation/expected/compression_dml_iso.out index 2f2f3e7fe..4313b22e5 100644 --- a/tsl/test/isolation/expected/compression_dml_iso.out +++ b/tsl/test/isolation/expected/compression_dml_iso.out @@ -1,5 +1,3 @@ -unused step name: IN1 -unused step name: INc unused step name: S1 unused step name: SA unused step name: SC1 @@ -60,3 +58,485 @@ total_chunks|number_compressed_chunks 3| 3 (1 row) + +starting permutation: IN1 INc CA1 CAc SH SS DEL1 UPD1 DELc UPDc SH SS +step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100); +step INc: COMMIT; +step CA1: + BEGIN; + SELECT + CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress + FROM show_chunks('ts_device_table') AS ch + ORDER BY ch::text; + +compress +-------- +t +t +t +(3 rows) + +step CAc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- + 1| 1| 200| 100 +(1 row) + +step DEL1: BEGIN; DELETE from ts_device_table WHERE location = 200; +step UPD1: BEGIN; UPDATE ts_device_table SET value = 4 WHERE location = 200; +step DELc: COMMIT; +step UPD1: <... completed> +step UPDc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- +(0 rows) + + +starting permutation: IN1 INc CA1 CAc SH SS UPD1 DEL1 UPDc DELc SH SS +step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100); +step INc: COMMIT; +step CA1: + BEGIN; + SELECT + CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress + FROM show_chunks('ts_device_table') AS ch + ORDER BY ch::text; + +compress +-------- +t +t +t +(3 rows) + +step CAc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- + 1| 1| 200| 100 +(1 row) + +step UPD1: BEGIN; UPDATE ts_device_table SET value = 4 WHERE location = 200; +step DEL1: BEGIN; DELETE from ts_device_table WHERE location = 200; +step UPDc: COMMIT; +step DEL1: <... completed> +step DELc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- +(0 rows) + + +starting permutation: IN1 INc CA1 CAc SH SS DEL1 UPDrr DELc UPDc SH SS +step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100); +step INc: COMMIT; +step CA1: + BEGIN; + SELECT + CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress + FROM show_chunks('ts_device_table') AS ch + ORDER BY ch::text; + +compress +-------- +t +t +t +(3 rows) + +step CAc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- + 1| 1| 200| 100 +(1 row) + +step DEL1: BEGIN; DELETE from ts_device_table WHERE location = 200; +step UPDrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; UPDATE ts_device_table SET value = 4 WHERE location = 200; +step DELc: COMMIT; +step UPDrr: <... completed> +ERROR: could not serialize access due to concurrent update +step UPDc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- +(0 rows) + + +starting permutation: IN1 INc CA1 CAc SH SS UPD1 DELrr UPDc DELc SH SS +step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100); +step INc: COMMIT; +step CA1: + BEGIN; + SELECT + CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress + FROM show_chunks('ts_device_table') AS ch + ORDER BY ch::text; + +compress +-------- +t +t +t +(3 rows) + +step CAc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- + 1| 1| 200| 100 +(1 row) + +step UPD1: BEGIN; UPDATE ts_device_table SET value = 4 WHERE location = 200; +step DELrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; DELETE from ts_device_table WHERE location = 200; +step UPDc: COMMIT; +step DELrr: <... completed> +ERROR: could not serialize access due to concurrent update +step DELc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- + 1| 1| 200| 4 +(1 row) + + +starting permutation: IN1 INc CA1 CAc SH SS DELrr UPDrr DELc UPDc SH SS +step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100); +step INc: COMMIT; +step CA1: + BEGIN; + SELECT + CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress + FROM show_chunks('ts_device_table') AS ch + ORDER BY ch::text; + +compress +-------- +t +t +t +(3 rows) + +step CAc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- + 1| 1| 200| 100 +(1 row) + +step DELrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; DELETE from ts_device_table WHERE location = 200; +step UPDrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; UPDATE ts_device_table SET value = 4 WHERE location = 200; +step DELc: COMMIT; +step UPDrr: <... completed> +ERROR: could not serialize access due to concurrent update +step UPDc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- +(0 rows) + + +starting permutation: IN1 INc CA1 CAc SH SS UPDrr DELrr UPDc DELc SH SS +step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100); +step INc: COMMIT; +step CA1: + BEGIN; + SELECT + CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress + FROM show_chunks('ts_device_table') AS ch + ORDER BY ch::text; + +compress +-------- +t +t +t +(3 rows) + +step CAc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- + 1| 1| 200| 100 +(1 row) + +step UPDrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; UPDATE ts_device_table SET value = 4 WHERE location = 200; +step DELrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; DELETE from ts_device_table WHERE location = 200; +step UPDc: COMMIT; +step DELrr: <... completed> +ERROR: could not serialize access due to concurrent update +step DELc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- + 1| 1| 200| 4 +(1 row) + + +starting permutation: IN1 INc CA1 CAc SH SS DELrr UPDs DELc UPDc SH SS +step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100); +step INc: COMMIT; +step CA1: + BEGIN; + SELECT + CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress + FROM show_chunks('ts_device_table') AS ch + ORDER BY ch::text; + +compress +-------- +t +t +t +(3 rows) + +step CAc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- + 1| 1| 200| 100 +(1 row) + +step DELrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; DELETE from ts_device_table WHERE location = 200; +step UPDs: BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; UPDATE ts_device_table SET value = 4 WHERE location = 200; +step DELc: COMMIT; +step UPDs: <... completed> +ERROR: could not serialize access due to concurrent update +step UPDc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- +(0 rows) + + +starting permutation: IN1 INc CA1 CAc SH SS UPDrr DELs UPDc DELc SH SS +step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100); +step INc: COMMIT; +step CA1: + BEGIN; + SELECT + CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress + FROM show_chunks('ts_device_table') AS ch + ORDER BY ch::text; + +compress +-------- +t +t +t +(3 rows) + +step CAc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- + 1| 1| 200| 100 +(1 row) + +step UPDrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; UPDATE ts_device_table SET value = 4 WHERE location = 200; +step DELs: BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; DELETE from ts_device_table WHERE location = 200; +step UPDc: COMMIT; +step DELs: <... completed> +ERROR: could not serialize access due to concurrent update +step DELc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- + 1| 1| 200| 4 +(1 row) + + +starting permutation: IN1 INc CA1 CAc SH SS DELs UPDrr DELc UPDc SH SS +step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100); +step INc: COMMIT; +step CA1: + BEGIN; + SELECT + CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress + FROM show_chunks('ts_device_table') AS ch + ORDER BY ch::text; + +compress +-------- +t +t +t +(3 rows) + +step CAc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- + 1| 1| 200| 100 +(1 row) + +step DELs: BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; DELETE from ts_device_table WHERE location = 200; +step UPDrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; UPDATE ts_device_table SET value = 4 WHERE location = 200; +step DELc: COMMIT; +step UPDrr: <... completed> +ERROR: could not serialize access due to concurrent update +step UPDc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- +(0 rows) + + +starting permutation: IN1 INc CA1 CAc SH SS UPDs DELrr UPDc DELc SH SS +step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100); +step INc: COMMIT; +step CA1: + BEGIN; + SELECT + CASE WHEN compress_chunk(ch) IS NOT NULL THEN true ELSE false END AS compress + FROM show_chunks('ts_device_table') AS ch + ORDER BY ch::text; + +compress +-------- +t +t +t +(3 rows) + +step CAc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- + 1| 1| 200| 100 +(1 row) + +step UPDs: BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; UPDATE ts_device_table SET value = 4 WHERE location = 200; +step DELrr: BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; DELETE from ts_device_table WHERE location = 200; +step UPDc: COMMIT; +step DELrr: <... completed> +ERROR: could not serialize access due to concurrent update +step DELc: COMMIT; +step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); +total_chunks|number_compressed_chunks +------------+------------------------ + 3| 3 +(1 row) + +step SS: SELECT * FROM ts_device_table WHERE location = 200; +time|device|location|value +----+------+--------+----- + 1| 1| 200| 4 +(1 row) + diff --git a/tsl/test/isolation/specs/compression_dml_iso.spec b/tsl/test/isolation/specs/compression_dml_iso.spec index f4cea37e2..0c7184b2d 100644 --- a/tsl/test/isolation/specs/compression_dml_iso.spec +++ b/tsl/test/isolation/specs/compression_dml_iso.spec @@ -36,10 +36,14 @@ step "INc" { COMMIT; } session "DEL" step "DEL1" { BEGIN; DELETE from ts_device_table WHERE location = 200; } +step "DELrr" { BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; DELETE from ts_device_table WHERE location = 200; } +step "DELs" { BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; DELETE from ts_device_table WHERE location = 200; } step "DELc" { COMMIT; } session "UPD" step "UPD1" { BEGIN; UPDATE ts_device_table SET value = 4 WHERE location = 200; } +step "UPDrr" { BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; UPDATE ts_device_table SET value = 4 WHERE location = 200; } +step "UPDs" { BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE; UPDATE ts_device_table SET value = 4 WHERE location = 200; } step "UPDc" { COMMIT; } session "SI" @@ -51,6 +55,7 @@ step "S1" { SELECT count(*) from ts_device_table; } step "SC1" { SELECT (count_chunktable(ch)).* FROM show_chunks('ts_device_table') AS ch ORDER BY ch::text LIMIT 1; } step "SH" { SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table'); } step "SA" { SELECT * FROM ts_device_table; } +step "SS" { SELECT * FROM ts_device_table WHERE location = 200; } session "CompressAll" step "CA1" { @@ -63,4 +68,16 @@ step "CA1" { step "CAc" { COMMIT; } # Test concurrent update/delete operations -permutation "CA1" "CAc" "SH" "I1" "Ic" "SH" "UPD1" "UPDc" "SH" "DEL1" "DELc" "SH" "UPD1" "UPDc" "SH" \ No newline at end of file +permutation "CA1" "CAc" "SH" "I1" "Ic" "SH" "UPD1" "UPDc" "SH" "DEL1" "DELc" "SH" "UPD1" "UPDc" "SH" +permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "DEL1" "UPD1" "DELc" "UPDc" "SH" "SS" +permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "UPD1" "DEL1" "UPDc" "DELc" "SH" "SS" + +#Test interaction with upper isolation levels +permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "DEL1" "UPDrr" "DELc" "UPDc" "SH" "SS" +permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "UPD1" "DELrr" "UPDc" "DELc" "SH" "SS" +permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "DELrr" "UPDrr" "DELc" "UPDc" "SH" "SS" +permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "UPDrr" "DELrr" "UPDc" "DELc" "SH" "SS" +permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "DELrr" "UPDs" "DELc" "UPDc" "SH" "SS" +permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "UPDrr" "DELs" "UPDc" "DELc" "SH" "SS" +permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "DELs" "UPDrr" "DELc" "UPDc" "SH" "SS" +permutation "IN1" "INc" "CA1" "CAc" "SH" "SS" "UPDs" "DELrr" "UPDc" "DELc" "SH" "SS"