Don't reindex relation during decompress_chunk

Reindexing a relation requires an AccessExclusiveLock, which prevents
queries on that chunk. This patch changes decompress_chunk to update
the indexes during decompression instead of reindexing the relation
afterwards. It does not change the required locks, as locking
adjustments are needed in other places before it is safe to weaken
them.
Sven Klemm authored and committed, 2023-02-05 19:23:14 +01:00
parent 20ea406616
commit c02cb76b38
6 changed files with 195 additions and 149 deletions
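
The change is easiest to read bottom-up: the catalog hunks below export three
small helpers (ts_catalog_open_indexes, ts_catalog_index_insert,
ts_catalog_close_indexes), and the compression hunks wire them into
RowDecompressor so every decompressed row is inserted into the chunk's indexes
as it is written. The following is a minimal sketch of that flow, not the
literal patched code; next_decompressed_tuple() is a hypothetical stand-in for
the decompressor's row loop.

#include <postgres.h>
#include <access/heapam.h>
#include <executor/executor.h>

/* Sketch only: maintain indexes per row instead of one reindex_relation()
 * call at the end. The ts_catalog_* helpers are the ones added below. */
static void
decompress_rows_with_index_maintenance(Relation out_rel)
{
	ResultRelInfo *indexstate = ts_catalog_open_indexes(out_rel);
	BulkInsertState bistate = GetBulkInsertState();
	CommandId mycid = GetCurrentCommandId(true);
	HeapTuple tuple;

	/* next_decompressed_tuple() is hypothetical: one decompressed row per
	 * call, NULL once the compressed batch is exhausted. */
	while ((tuple = next_decompressed_tuple()) != NULL)
	{
		/* Bulk heap insert, then per-index insertion by hand. */
		heap_insert(out_rel, tuple, mycid, 0 /*=options*/, bistate);
		ts_catalog_index_insert(indexstate, tuple);
		heap_freetuple(tuple);
	}

	FreeBulkInsertState(bistate);
	ts_catalog_close_indexes(indexstate);
}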


@@ -808,3 +808,130 @@ ts_catalog_scan_all(CatalogTable table, int indexid, ScanKeyData *scankey, int n
 	ts_scanner_scan(&scanctx);
 }
+
+extern TSDLLEXPORT ResultRelInfo *
+ts_catalog_open_indexes(Relation heapRel)
+{
+	ResultRelInfo *resultRelInfo;
+
+	resultRelInfo = makeNode(ResultRelInfo);
+	resultRelInfo->ri_RangeTableIndex = 0; /* dummy */
+	resultRelInfo->ri_RelationDesc = heapRel;
+	resultRelInfo->ri_TrigDesc = NULL; /* we don't fire triggers */
+
+	ExecOpenIndices(resultRelInfo, false);
+
+	return resultRelInfo;
+}
+
+extern TSDLLEXPORT void
+ts_catalog_close_indexes(ResultRelInfo *indstate)
+{
+	ExecCloseIndices(indstate);
+	pfree(indstate);
+}
+
+/*
+ * Copied verbatim from CatalogIndexInsert, which is static in postgres
+ * source code.
+ * We need this function available because we do not want to use
+ * simple_heap_insert (used by CatalogTupleInsert), which would prevent
+ * using bulk inserts.
+ */
+extern TSDLLEXPORT void
+ts_catalog_index_insert(ResultRelInfo *indstate, HeapTuple heapTuple)
+{
+	int i;
+	int numIndexes;
+	RelationPtr relationDescs;
+	Relation heapRelation;
+	TupleTableSlot *slot;
+	IndexInfo **indexInfoArray;
+	Datum values[INDEX_MAX_KEYS];
+	bool isnull[INDEX_MAX_KEYS];
+
+	/*
+	 * HOT update does not require index inserts. But with asserts enabled we
+	 * want to check that it'd be legal to currently insert into the
+	 * table/index.
+	 */
+#ifndef USE_ASSERT_CHECKING
+	if (HeapTupleIsHeapOnly(heapTuple))
+		return;
+#endif
+
+	/*
+	 * Get information from the state structure. Fall out if nothing to do.
+	 */
+	numIndexes = indstate->ri_NumIndices;
+	if (numIndexes == 0)
+		return;
+	relationDescs = indstate->ri_IndexRelationDescs;
+	indexInfoArray = indstate->ri_IndexRelationInfo;
+	heapRelation = indstate->ri_RelationDesc;
+
+	/* Need a slot to hold the tuple being examined */
+	slot = MakeSingleTupleTableSlot(RelationGetDescr(heapRelation), &TTSOpsHeapTuple);
+	ExecStoreHeapTuple(heapTuple, slot, false);
+
+	/*
+	 * for each index, form and insert the index tuple
+	 */
+	for (i = 0; i < numIndexes; i++)
+	{
+		IndexInfo *indexInfo;
+		Relation index;
+
+		indexInfo = indexInfoArray[i];
+		index = relationDescs[i];
+
+		/* If the index is marked as read-only, ignore it */
+		if (!indexInfo->ii_ReadyForInserts)
+			continue;
+
+		/*
+		 * Expressional and partial indexes on system catalogs are not
+		 * supported, nor exclusion constraints, nor deferred uniqueness
+		 */
+		Assert(indexInfo->ii_Expressions == NIL);
+		Assert(indexInfo->ii_Predicate == NIL);
+		Assert(indexInfo->ii_ExclusionOps == NULL);
+		Assert(index->rd_index->indimmediate);
+		Assert(indexInfo->ii_NumIndexKeyAttrs != 0);
+
+		/* see earlier check above */
+#ifdef USE_ASSERT_CHECKING
+		if (HeapTupleIsHeapOnly(heapTuple))
+		{
+			Assert(!ReindexIsProcessingIndex(RelationGetRelid(index)));
+			continue;
+		}
+#endif /* USE_ASSERT_CHECKING */
+
+		/*
+		 * FormIndexDatum fills in its values and isnull parameters with the
+		 * appropriate values for the column(s) of the index.
+		 */
+		FormIndexDatum(indexInfo,
+					   slot,
+					   NULL, /* no expression eval to do */
+					   values,
+					   isnull);
+
+		/*
+		 * The index AM does the rest.
+		 */
+		index_insert(index, /* index relation */
+					 values, /* array of index Datums */
+					 isnull, /* is-null flags */
+					 &(heapTuple->t_self), /* tid of heap tuple */
+					 heapRelation,
+					 index->rd_index->indisunique ? UNIQUE_CHECK_YES : UNIQUE_CHECK_NO,
+#if PG14_GE
+					 false,
+#endif
+					 indexInfo);
+	}
+
+	ExecDropSingleTupleTableSlot(slot);
+}
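
One usage contract worth spelling out: ts_catalog_index_insert passes the
tuple's own t_self to index_insert, so the tuple must already have been
inserted into the heap (and thus have a valid TID) before the index entries
are formed. A hedged mini-sketch of a caller, with insert_tuple_with_indexes
as a hypothetical helper name:

/* Hypothetical helper: insert one already-formed heap tuple and keep the
 * relation's indexes current. The heap insertion must come first so that
 * tuple->t_self carries a valid TID, which ts_catalog_index_insert hands
 * to index_insert(). `rel` is assumed open and locked by the caller. */
static void
insert_tuple_with_indexes(Relation rel, HeapTuple tuple)
{
	ResultRelInfo *indstate = ts_catalog_open_indexes(rel);

	heap_insert(rel, tuple, GetCurrentCommandId(true), 0 /*=options*/, NULL);
	ts_catalog_index_insert(indstate, tuple);

	ts_catalog_close_indexes(indstate);
}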


@@ -1473,6 +1473,9 @@ extern TSDLLEXPORT void ts_catalog_update(Relation rel, HeapTuple tuple);
 extern TSDLLEXPORT void ts_catalog_delete_tid_only(Relation rel, ItemPointer tid);
 extern TSDLLEXPORT void ts_catalog_delete_tid(Relation rel, ItemPointer tid);
 extern TSDLLEXPORT void ts_catalog_invalidate_cache(Oid catalog_relid, CmdType operation);
+extern TSDLLEXPORT ResultRelInfo *ts_catalog_open_indexes(Relation heapRel);
+extern TSDLLEXPORT void ts_catalog_close_indexes(ResultRelInfo *indstate);
+extern TSDLLEXPORT void ts_catalog_index_insert(ResultRelInfo *indstate, HeapTuple heapTuple);
 
 bool TSDLLEXPORT ts_catalog_scan_one(CatalogTable table, int indexid, ScanKeyData *scankey,
 									 int num_keys, tuple_found_func tuple_found, LOCKMODE lockmode,


@@ -1503,6 +1503,7 @@ typedef struct RowDecompressor
 	TupleDesc out_desc;
 	Relation out_rel;
+	ResultRelInfo *indexstate;
 	CommandId mycid;
 	BulkInsertState bistate;
@@ -1545,6 +1546,7 @@ build_decompressor(Relation in_rel, Relation out_rel)
 		.out_desc = out_desc,
 		.out_rel = out_rel,
+		.indexstate = ts_catalog_open_indexes(out_rel),
 		.mycid = GetCurrentCommandId(true),
 		.bistate = GetBulkInsertState(),
@@ -1587,7 +1589,7 @@ decompress_chunk(Oid in_table, Oid out_table)
 	/*
 	 * We may as well allow readers to keep reading the compressed data while
	 * we are compressing, so we only take an ExclusiveLock instead of AccessExclusive.
	 */
-	Relation out_rel = table_open(out_table, ExclusiveLock);
+	Relation out_rel = table_open(out_table, AccessExclusiveLock);
 	Relation in_rel = relation_open(in_table, ExclusiveLock);
 
 	RowDecompressor decompressor = build_decompressor(in_rel, out_rel);
@@ -1611,24 +1613,7 @@ decompress_chunk(Oid in_table, Oid out_table)
 	FreeBulkInsertState(decompressor.bistate);
 	MemoryContextDelete(decompressor.per_compressed_row_ctx);
 
-	/* Recreate all indexes on out rel; we already have an exclusive lock on it,
-	 * so the strong locks taken by reindex_relation shouldn't matter. */
-#if PG14_LT
-	int options = 0;
-#else
-	ReindexParams params = { 0 };
-	ReindexParams *options = &params;
-#endif
-
-	/* The reindex_relation() function takes an AccessExclusiveLock on the
-	 * chunk index (if present). After calling this function, concurrent
-	 * SELECTs have to wait until the index lock is released. When no
-	 * index is present, concurrent SELECTs can still be performed in
-	 * parallel. */
-	DEBUG_WAITPOINT("decompress_chunk_impl_before_reindex");
-	reindex_relation(out_table, 0, options);
-	DEBUG_WAITPOINT("decompress_chunk_impl_after_reindex");
+	ts_catalog_close_indexes(decompressor.indexstate);
 
 	table_close(out_rel, NoLock);
 	table_close(in_rel, NoLock);
@@ -1767,6 +1752,8 @@ row_decompressor_decompress_row(RowDecompressor *decompressor)
 							 0 /*=options*/,
 							 decompressor->bistate);
 
+		ts_catalog_index_insert(decompressor->indexstate, decompressed_tuple);
+
 		heap_freetuple(decompressed_tuple);
 		wrote_data = true;
 	}


@@ -1,80 +1,27 @@
-Parsed test spec with 3 sessions
+Parsed test spec with 2 sessions
 
-starting permutation: s3_lock_decompression_locks s2_read_sensor_data s1_decompress s2_read_sensor_data s3_unlock_decompression_before_reindex_lock s2_read_sensor_data s3_unlock_decompression_after_reindex_lock
-step s3_lock_decompression_locks:
-	-- This waitpoint is defined before the decompressed chunk is re-indexed. Up to this
-	-- point parallel SELECTs should be possible.
-	SELECT debug_waitpoint_enable('decompress_chunk_impl_before_reindex');
-	-- This waitpoint is defined after all locks for the decompression and the deletion
-	-- of the compressed chunk are requested.
-	SELECT debug_waitpoint_enable('decompress_chunk_impl_after_reindex');
-
-debug_waitpoint_enable
-----------------------
-
-(1 row)
-
-debug_waitpoint_enable
-----------------------
-
-(1 row)
-
+starting permutation: s2_read_sensor_data s1_begin s1_decompress s2_read_sensor_data s1_commit s2_read_sensor_data
 step s2_read_sensor_data:
 	SELECT FROM sensor_data;
+	SELECT compression_status FROM chunk_compression_stats('sensor_data');
+
+compression_status
+------------------
+Compressed
+(1 row)
+
+step s1_begin:
+	BEGIN;
 
 step s1_decompress:
 	SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i;
 	SELECT compression_status FROM chunk_compression_stats('sensor_data');
-<waiting ...>
-step s2_read_sensor_data:
-	SELECT FROM sensor_data;
-
-step s3_unlock_decompression_before_reindex_lock:
-	-- Ensure that we are waiting on our debug waitpoint
-	-- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
-	-- decompress_chunk_impl_before_reindex = 3966149665.
-	SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
-	SELECT debug_waitpoint_release('decompress_chunk_impl_before_reindex');
-
-locktype|mode     |granted|     objid
---------+---------+-------+----------
-advisory|ShareLock|f      |3966149665
-(1 row)
-
-debug_waitpoint_release
------------------------
-
-(1 row)
-
-step s2_read_sensor_data:
-	SELECT FROM sensor_data;
- <waiting ...>
-step s3_unlock_decompression_after_reindex_lock:
-	-- Ensure that we are waiting on our debug waitpoint
-	-- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
-	-- decompress_chunk_impl_after_reindex = 1858017383.
-	SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
-	SELECT debug_waitpoint_release('decompress_chunk_impl_after_reindex');
-
-locktype|mode     |granted|     objid
---------+---------+-------+----------
-advisory|ShareLock|f      |1858017383
-(1 row)
-
-debug_waitpoint_release
------------------------
-
-(1 row)
-
-step s1_decompress: <... completed>
+
 count
 -----
     1
@@ -85,4 +32,25 @@ compression_status
 Uncompressed
 (1 row)
 
+step s2_read_sensor_data:
+	SELECT FROM sensor_data;
+	SELECT compression_status FROM chunk_compression_stats('sensor_data');
+<waiting ...>
+step s1_commit:
+	COMMIT;
+
+step s2_read_sensor_data: <... completed>
+compression_status
+------------------
+Uncompressed
+(1 row)
+
+step s2_read_sensor_data:
+	SELECT FROM sensor_data;
+	SELECT compression_status FROM chunk_compression_stats('sensor_data');
+
+compression_status
+------------------
+Uncompressed
+(1 row)


@@ -31,7 +31,17 @@ step s2_read_sensor_data:
 step s1_decompress:
 	SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i;
 	SELECT compression_status FROM chunk_compression_stats('sensor_data');
-<waiting ...>
+
+count
+-----
+    1
+(1 row)
+
+compression_status
+------------------
+Uncompressed
+(1 row)
+
 step s2_read_sensor_data:
 	SELECT FROM sensor_data;
@@ -43,10 +53,9 @@ step s3_unlock_decompression_before_reindex_lock:
 	SELECT debug_waitpoint_release('decompress_chunk_impl_before_reindex');
 
-locktype|mode     |granted|     objid
---------+---------+-------+----------
-advisory|ShareLock|f      |3966149665
-(1 row)
+locktype|mode|granted|objid
+--------+----+-------+-----
+(0 rows)
 
 debug_waitpoint_release
 -----------------------
@@ -64,27 +73,15 @@ step s3_unlock_decompression_after_reindex_lock:
 	SELECT debug_waitpoint_release('decompress_chunk_impl_after_reindex');
 
-locktype|mode     |granted|     objid
---------+---------+-------+----------
-advisory|ShareLock|f      |1858017383
-(1 row)
+locktype|mode|granted|objid
+--------+----+-------+-----
+(0 rows)
 
 debug_waitpoint_release
 -----------------------
 (1 row)
 
-step s1_decompress: <... completed>
-count
------
-    1
-(1 row)
-
-compression_status
-------------------
-Uncompressed
-(1 row)
-
 step s2_read_sensor_data:
 	SELECT FROM sensor_data;


@@ -10,11 +10,6 @@
 ###
 
 setup {
-	CREATE OR REPLACE FUNCTION debug_waitpoint_enable(TEXT) RETURNS VOID LANGUAGE C VOLATILE STRICT
-	AS '@TS_MODULE_PATHNAME@', 'ts_debug_point_enable';
-	CREATE OR REPLACE FUNCTION debug_waitpoint_release(TEXT) RETURNS VOID LANGUAGE C VOLATILE STRICT
-	AS '@TS_MODULE_PATHNAME@', 'ts_debug_point_release';
-
 	CREATE TABLE sensor_data (
 		time timestamptz not null,
@@ -57,56 +52,25 @@ teardown {
 
 session "s1"
+step "s1_begin" {
+	BEGIN;
+}
 step "s1_decompress" {
 	SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i;
 	SELECT compression_status FROM chunk_compression_stats('sensor_data');
 }
+step "s1_commit" {
+	COMMIT;
+}
 
 session "s2"
 step "s2_read_sensor_data" {
 	SELECT FROM sensor_data;
+	SELECT compression_status FROM chunk_compression_stats('sensor_data');
 }
 
-session "s3"
-step "s3_lock_decompression_locks" {
-	-- This waitpoint is defined before the decompressed chunk is re-indexed. Up to this
-	-- point parallel SELECTs should be possible.
-	SELECT debug_waitpoint_enable('decompress_chunk_impl_before_reindex');
-
-	-- This waitpoint is defined after all locks for the decompression and the deletion
-	-- of the compressed chunk are requested.
-	SELECT debug_waitpoint_enable('decompress_chunk_impl_after_reindex');
-}
-step "s3_unlock_decompression_before_reindex_lock" {
-	-- Ensure that we are waiting on our debug waitpoint
-	-- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
-	-- decompress_chunk_impl_before_reindex = 3966149665.
-	SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
-	SELECT debug_waitpoint_release('decompress_chunk_impl_before_reindex');
-}
-step "s3_unlock_decompression_after_reindex_lock" {
-	-- Ensure that we are waiting on our debug waitpoint
-	-- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
-	-- decompress_chunk_impl_after_reindex = 1858017383.
-	SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
-	SELECT debug_waitpoint_release('decompress_chunk_impl_after_reindex');
-}
-
-# Desired execution:
-# s3_lock_decompression_locks - Locks the decompression waitpoints.
-# s2_read_sensor_data - Read the compressed chunk. This should execute without blocking.
-# s1_decompress - Start the decompression and block on the first waitpoint.
-# s2_read_sensor_data - Read the compressed chunk again. This should still be possible without blocking.
-# s3_unlock_decompression_before_reindex_lock - Releases the decompress_chunk_impl_before_reindex waitpoint.
-# s1_decompress continues - The chunk is reindexed and the index is locked.
-# s2_read_sensor_data - Read the chunk. This blocks due to the locked index.
-# s3_unlock_decompression_after_reindex_lock - Releases the decompress_chunk_impl_after_reindex waitpoint.
-# s1_decompress continues - Finishes the decompression operation and releases the locks.
-# s2_read_sensor_data continues.
-permutation "s3_lock_decompression_locks" "s2_read_sensor_data" "s1_decompress" "s2_read_sensor_data" "s3_unlock_decompression_before_reindex_lock" "s2_read_sensor_data" "s3_unlock_decompression_after_reindex_lock"
+permutation "s2_read_sensor_data" "s1_begin" "s1_decompress" "s2_read_sensor_data" "s1_commit" "s2_read_sensor_data"