diff --git a/src/ts_catalog/catalog.c b/src/ts_catalog/catalog.c
index 514c2ec05..c25210f4a 100644
--- a/src/ts_catalog/catalog.c
+++ b/src/ts_catalog/catalog.c
@@ -808,3 +808,130 @@ ts_catalog_scan_all(CatalogTable table, int indexid, ScanKeyData *scankey, int n
 	ts_scanner_scan(&scanctx);
 }
+
+extern TSDLLEXPORT ResultRelInfo *
+ts_catalog_open_indexes(Relation heapRel)
+{
+	ResultRelInfo *resultRelInfo;
+
+	resultRelInfo = makeNode(ResultRelInfo);
+	resultRelInfo->ri_RangeTableIndex = 0; /* dummy */
+	resultRelInfo->ri_RelationDesc = heapRel;
+	resultRelInfo->ri_TrigDesc = NULL; /* we don't fire triggers */
+
+	ExecOpenIndices(resultRelInfo, false);
+
+	return resultRelInfo;
+}
+
+extern TSDLLEXPORT void
+ts_catalog_close_indexes(ResultRelInfo *indstate)
+{
+	ExecCloseIndices(indstate);
+	pfree(indstate);
+}
+
+/*
+ * Copied verbatim from postgres source CatalogIndexInsert which is static
+ * in postgres source code.
+ * We need to have this function available because we do not want to use
+ * simple_heap_insert which is used by CatalogTupleInsert which would
+ * prevent using bulk inserts.
+ */
+extern TSDLLEXPORT void
+ts_catalog_index_insert(ResultRelInfo *indstate, HeapTuple heapTuple)
+{
+	int i;
+	int numIndexes;
+	RelationPtr relationDescs;
+	Relation heapRelation;
+	TupleTableSlot *slot;
+	IndexInfo **indexInfoArray;
+	Datum values[INDEX_MAX_KEYS];
+	bool isnull[INDEX_MAX_KEYS];
+
+	/*
+	 * HOT update does not require index inserts. But with asserts enabled we
+	 * want to check that it'd be legal to currently insert into the
+	 * table/index.
+	 */
+#ifndef USE_ASSERT_CHECKING
+	if (HeapTupleIsHeapOnly(heapTuple))
+		return;
+#endif
+
+	/*
+	 * Get information from the state structure. Fall out if nothing to do.
+	 */
+	numIndexes = indstate->ri_NumIndices;
+	if (numIndexes == 0)
+		return;
+	relationDescs = indstate->ri_IndexRelationDescs;
+	indexInfoArray = indstate->ri_IndexRelationInfo;
+	heapRelation = indstate->ri_RelationDesc;
+
+	/* Need a slot to hold the tuple being examined */
+	slot = MakeSingleTupleTableSlot(RelationGetDescr(heapRelation), &TTSOpsHeapTuple);
+	ExecStoreHeapTuple(heapTuple, slot, false);
+
+	/*
+	 * for each index, form and insert the index tuple
+	 */
+	for (i = 0; i < numIndexes; i++)
+	{
+		IndexInfo *indexInfo;
+		Relation index;
+
+		indexInfo = indexInfoArray[i];
+		index = relationDescs[i];
+
+		/* If the index is marked as read-only, ignore it */
+		if (!indexInfo->ii_ReadyForInserts)
+			continue;
+
+		/*
+		 * Expressional and partial indexes on system catalogs are not
+		 * supported, nor exclusion constraints, nor deferred uniqueness
+		 */
+		Assert(indexInfo->ii_Expressions == NIL);
+		Assert(indexInfo->ii_Predicate == NIL);
+		Assert(indexInfo->ii_ExclusionOps == NULL);
+		Assert(index->rd_index->indimmediate);
+		Assert(indexInfo->ii_NumIndexKeyAttrs != 0);
+
+		/* see earlier check above */
+#ifdef USE_ASSERT_CHECKING
+		if (HeapTupleIsHeapOnly(heapTuple))
+		{
+			Assert(!ReindexIsProcessingIndex(RelationGetRelid(index)));
+			continue;
+		}
+#endif /* USE_ASSERT_CHECKING */
+
+		/*
+		 * FormIndexDatum fills in its values and isnull parameters with the
+		 * appropriate values for the column(s) of the index.
+		 */
+		FormIndexDatum(indexInfo,
+					   slot,
+					   NULL, /* no expression eval to do */
+					   values,
+					   isnull);
+
+		/*
+		 * The index AM does the rest.
+		 */
+		index_insert(index,                   /* index relation */
+					 values,                  /* array of index Datums */
+					 isnull,                  /* is-null flags */
+					 &(heapTuple->t_self),    /* tid of heap tuple */
+					 heapRelation,
+					 index->rd_index->indisunique ? UNIQUE_CHECK_YES : UNIQUE_CHECK_NO,
+#if PG14_GE
+					 false,
+#endif
+					 indexInfo);
+	}
+
+	ExecDropSingleTupleTableSlot(slot);
+}
diff --git a/src/ts_catalog/catalog.h b/src/ts_catalog/catalog.h
index bc0408a40..0538b6cc3 100644
--- a/src/ts_catalog/catalog.h
+++ b/src/ts_catalog/catalog.h
@@ -1473,6 +1473,9 @@ extern TSDLLEXPORT void ts_catalog_update(Relation rel, HeapTuple tuple);
 extern TSDLLEXPORT void ts_catalog_delete_tid_only(Relation rel, ItemPointer tid);
 extern TSDLLEXPORT void ts_catalog_delete_tid(Relation rel, ItemPointer tid);
 extern TSDLLEXPORT void ts_catalog_invalidate_cache(Oid catalog_relid, CmdType operation);
+extern TSDLLEXPORT ResultRelInfo *ts_catalog_open_indexes(Relation heapRel);
+extern TSDLLEXPORT void ts_catalog_close_indexes(ResultRelInfo *indstate);
+extern TSDLLEXPORT void ts_catalog_index_insert(ResultRelInfo *indstate, HeapTuple heapTuple);
 
 bool TSDLLEXPORT ts_catalog_scan_one(CatalogTable table, int indexid, ScanKeyData *scankey,
									 int num_keys, tuple_found_func tuple_found, LOCKMODE lockmode,
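For orientation (not part of the patch): the three helpers above are meant to bracket a batch of heap inserts. Open the indexes once, call ts_catalog_index_insert() for each tuple after it has been written to the heap (heap_insert() fills in t_self, which the index entries point at), and close the index state when done. The extra false argument under PG14_GE corresponds to the indexUnchanged hint parameter that PostgreSQL 14 added to index_insert(); like the upstream CatalogIndexInsert(), no hint is given. A minimal, hypothetical caller could look like the sketch below; the function name and include paths are illustrative only.

/*
 * Hypothetical sketch, not part of this patch: insert one already-formed
 * heap tuple into "rel" and keep all of its indexes up to date.
 */
#include "postgres.h"
#include "access/heapam.h"
#include "access/xact.h"
#include "nodes/execnodes.h"
#include "utils/rel.h"

#include "ts_catalog/catalog.h"

static void
insert_tuple_with_indexes(Relation rel, HeapTuple tuple)
{
	/* Open every index on "rel" once (wraps ExecOpenIndices()) */
	ResultRelInfo *indstate = ts_catalog_open_indexes(rel);

	/* The heap insert must come first: it assigns tuple->t_self */
	heap_insert(rel, tuple, GetCurrentCommandId(true), 0 /* options */, NULL /* bistate */);
	ts_catalog_index_insert(indstate, tuple);

	/* Close the index relations and free the ResultRelInfo */
	ts_catalog_close_indexes(indstate);
}

The compression changes below follow this same pattern, just with a bulk-insert state and one ts_catalog_index_insert() call per decompressed row.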
diff --git a/tsl/src/compression/compression.c b/tsl/src/compression/compression.c
index 9ea6e617a..88f136e20 100644
--- a/tsl/src/compression/compression.c
+++ b/tsl/src/compression/compression.c
@@ -1503,6 +1503,7 @@ typedef struct RowDecompressor
 
 	TupleDesc out_desc;
 	Relation out_rel;
+	ResultRelInfo *indexstate;
 
 	CommandId mycid;
 	BulkInsertState bistate;
@@ -1545,6 +1546,7 @@ build_decompressor(Relation in_rel, Relation out_rel)
 
 		.out_desc = out_desc,
 		.out_rel = out_rel,
+		.indexstate = ts_catalog_open_indexes(out_rel),
 
 		.mycid = GetCurrentCommandId(true),
 		.bistate = GetBulkInsertState(),
@@ -1587,7 +1589,7 @@ decompress_chunk(Oid in_table, Oid out_table)
 	 * We may as well allow readers to keep reading the compressed data while
 	 * we are compressing, so we only take an ExclusiveLock instead of AccessExclusive.
 	 */
-	Relation out_rel = table_open(out_table, ExclusiveLock);
+	Relation out_rel = table_open(out_table, AccessExclusiveLock);
 	Relation in_rel = relation_open(in_table, ExclusiveLock);
 
 	RowDecompressor decompressor = build_decompressor(in_rel, out_rel);
@@ -1611,24 +1613,7 @@ decompress_chunk(Oid in_table, Oid out_table)
 
 	FreeBulkInsertState(decompressor.bistate);
 	MemoryContextDelete(decompressor.per_compressed_row_ctx);
-
-	/* Recreate all indexes on out rel, we already have an exclusive lock on it,
-	 * so the strong locks taken by reindex_relation shouldn't matter. */
-#if PG14_LT
-	int options = 0;
-#else
-	ReindexParams params = { 0 };
-	ReindexParams *options = &params;
-#endif
-
-	/* The reindex_relation() function creates an AccessExclusiveLock on the
-	 * chunk index (if present). After calling this function, concurrent
-	 * SELECTs have to wait until the index lock is released. When no
-	 * index is present concurrent SELECTs can be still performed in
-	 * parallel. */
-	DEBUG_WAITPOINT("decompress_chunk_impl_before_reindex");
-	reindex_relation(out_table, 0, options);
-	DEBUG_WAITPOINT("decompress_chunk_impl_after_reindex");
+	ts_catalog_close_indexes(decompressor.indexstate);
 
 	table_close(out_rel, NoLock);
 	table_close(in_rel, NoLock);
@@ -1767,6 +1752,8 @@ row_decompressor_decompress_row(RowDecompressor *decompressor)
 							0 /*=options*/,
 							decompressor->bistate);
 
+			ts_catalog_index_insert(decompressor->indexstate, decompressed_tuple);
+
 			heap_freetuple(decompressed_tuple);
 			wrote_data = true;
 		}
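Pulling the compression.c hunks above together: build_decompressor() now opens the chunk's indexes, row_decompressor_decompress_row() inserts every decompressed tuple into the heap and then into each index, and decompress_chunk() closes the index state instead of running reindex_relation() afterwards. The following is a condensed, illustrative sketch of that lifecycle, not the literal TimescaleDB code; names are taken from this patch, and the real code materializes the tuples from compressed batches and carries more state.

/*
 * Illustrative sketch only: condensed view of the new decompression
 * lifecycle after this patch.
 */
#include "postgres.h"
#include "access/heapam.h"
#include "access/hio.h"
#include "access/relation.h"
#include "access/table.h"
#include "access/xact.h"
#include "nodes/execnodes.h"

#include "ts_catalog/catalog.h"

static void
decompress_chunk_sketch(Oid in_table, Oid out_table, HeapTuple *rows, int nrows)
{
	/* Concurrent readers of the chunk now wait until this transaction commits */
	Relation out_rel = table_open(out_table, AccessExclusiveLock);
	Relation in_rel = relation_open(in_table, ExclusiveLock);
	ResultRelInfo *indexstate = ts_catalog_open_indexes(out_rel); /* as in build_decompressor() */
	BulkInsertState bistate = GetBulkInsertState();
	CommandId mycid = GetCurrentCommandId(true);
	int i;

	for (i = 0; i < nrows; i++)
	{
		heap_insert(out_rel, rows[i], mycid, 0 /* options */, bistate);
		/* Indexes stay current per row; this replaces the post-hoc reindex_relation() */
		ts_catalog_index_insert(indexstate, rows[i]);
	}

	FreeBulkInsertState(bistate);
	ts_catalog_close_indexes(indexstate);
	table_close(out_rel, NoLock);
	table_close(in_rel, NoLock);
}

Because the chunk is opened with AccessExclusiveLock, a concurrent SELECT simply blocks until the decompressing transaction commits, which is the behavior the updated isolation tests below exercise.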
diff --git a/tsl/test/isolation/expected/decompression_chunk_and_parallel_query.out b/tsl/test/isolation/expected/decompression_chunk_and_parallel_query.out
index b1e7d0ddc..1eab298fc 100644
--- a/tsl/test/isolation/expected/decompression_chunk_and_parallel_query.out
+++ b/tsl/test/isolation/expected/decompression_chunk_and_parallel_query.out
@@ -1,80 +1,27 @@
-Parsed test spec with 3 sessions
+Parsed test spec with 2 sessions
 
-starting permutation: s3_lock_decompression_locks s2_read_sensor_data s1_decompress s2_read_sensor_data s3_unlock_decompression_before_reindex_lock s2_read_sensor_data s3_unlock_decompression_after_reindex_lock
+starting permutation: s2_read_sensor_data s1_begin s1_decompress s2_read_sensor_data s1_commit s2_read_sensor_data
 compression_status
 ------------------
 Compressed
 (1 row)
 
-step s3_lock_decompression_locks:
-    -- This waitpoint is defined before the decompressed chunk is re-indexed. Up to this
-    -- point parallel SELECTs should be possible.
-    SELECT debug_waitpoint_enable('decompress_chunk_impl_before_reindex');
-
-    -- This waitpoint is defined after all locks for the decompression and the deletion
-    -- of the compressed chunk are requested.
-    SELECT debug_waitpoint_enable('decompress_chunk_impl_after_reindex');
-
-debug_waitpoint_enable
-----------------------
-
-(1 row)
-
-debug_waitpoint_enable
-----------------------
-
-(1 row)
-
 step s2_read_sensor_data:
     SELECT FROM sensor_data;
+    SELECT compression_status FROM chunk_compression_stats('sensor_data');
+
+compression_status
+------------------
+Compressed
+(1 row)
+
+step s1_begin:
+    BEGIN;
 
 step s1_decompress:
     SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i;
     SELECT compression_status FROM chunk_compression_stats('sensor_data');
-
-step s2_read_sensor_data:
-    SELECT FROM sensor_data;
-step s3_unlock_decompression_before_reindex_lock:
-    -- Ensure that we are waiting on our debug waitpoint
-    -- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
-    -- decompress_chunk_impl_before_reindex = 3966149665.
-    SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
-
-    SELECT debug_waitpoint_release('decompress_chunk_impl_before_reindex');
-
-locktype|mode     |granted|     objid
---------+---------+-------+----------
-advisory|ShareLock|f      |3966149665
-(1 row)
-
-debug_waitpoint_release
------------------------
-
-(1 row)
-
-step s2_read_sensor_data:
-    SELECT FROM sensor_data;
-
-step s3_unlock_decompression_after_reindex_lock:
-    -- Ensure that we are waiting on our debug waitpoint
-    -- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
-    -- decompress_chunk_impl_after_reindex = 1858017383.
-    SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
-
-    SELECT debug_waitpoint_release('decompress_chunk_impl_after_reindex');
-
-locktype|mode     |granted|     objid
---------+---------+-------+----------
-advisory|ShareLock|f      |1858017383
-(1 row)
-
-debug_waitpoint_release
------------------------
-
-(1 row)
-
-step s1_decompress: <... completed>
 count
 -----
     1
 (1 row)
@@ -85,4 +32,25 @@ compression_status
 ------------------
 Uncompressed
 (1 row)
+step s2_read_sensor_data:
+    SELECT FROM sensor_data;
+    SELECT compression_status FROM chunk_compression_stats('sensor_data');
+
+step s1_commit:
+    COMMIT;
+
 step s2_read_sensor_data: <... completed>
+compression_status
+------------------
+Uncompressed
+(1 row)
+
+step s2_read_sensor_data:
+    SELECT FROM sensor_data;
+    SELECT compression_status FROM chunk_compression_stats('sensor_data');
+
+compression_status
+------------------
+Uncompressed
+(1 row)
+
diff --git a/tsl/test/isolation/expected/decompression_chunk_and_parallel_query_wo_idx.out b/tsl/test/isolation/expected/decompression_chunk_and_parallel_query_wo_idx.out
index 334ea727d..a445d4dd2 100644
--- a/tsl/test/isolation/expected/decompression_chunk_and_parallel_query_wo_idx.out
+++ b/tsl/test/isolation/expected/decompression_chunk_and_parallel_query_wo_idx.out
@@ -31,7 +31,17 @@ step s2_read_sensor_data:
 step s1_decompress:
     SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i;
     SELECT compression_status FROM chunk_compression_stats('sensor_data');
-
+
+count
+-----
+    1
+(1 row)
+
+compression_status
+------------------
+Uncompressed
+(1 row)
+
 step s2_read_sensor_data:
     SELECT FROM sensor_data;
@@ -43,10 +53,9 @@ step s3_unlock_decompression_before_reindex_lock:
     SELECT debug_waitpoint_release('decompress_chunk_impl_before_reindex');
 
-locktype|mode     |granted|     objid
---------+---------+-------+----------
-advisory|ShareLock|f      |3966149665
-(1 row)
+locktype|mode|granted|objid
+--------+----+-------+-----
+(0 rows)
 
 debug_waitpoint_release
 -----------------------
 
 (1 row)
@@ -64,27 +73,15 @@ step s3_unlock_decompression_after_reindex_lock:
 
     SELECT debug_waitpoint_release('decompress_chunk_impl_after_reindex');
 
-locktype|mode     |granted|     objid
---------+---------+-------+----------
-advisory|ShareLock|f      |1858017383
-(1 row)
+locktype|mode|granted|objid
+--------+----+-------+-----
+(0 rows)
 
 debug_waitpoint_release
 -----------------------
 
 (1 row)
 
-step s1_decompress: <... completed>
-count
------
-    1
-(1 row)
-
-compression_status
-------------------
-Uncompressed
-(1 row)
-
 step s2_read_sensor_data:
     SELECT FROM sensor_data;
diff --git a/tsl/test/isolation/specs/decompression_chunk_and_parallel_query.in b/tsl/test/isolation/specs/decompression_chunk_and_parallel_query.in
index 7b2697874..9e37cfdbf 100644
--- a/tsl/test/isolation/specs/decompression_chunk_and_parallel_query.in
+++ b/tsl/test/isolation/specs/decompression_chunk_and_parallel_query.in
@@ -10,11 +10,6 @@
 ###
 
 setup {
-    CREATE OR REPLACE FUNCTION debug_waitpoint_enable(TEXT) RETURNS VOID LANGUAGE C VOLATILE STRICT
-    AS '@TS_MODULE_PATHNAME@', 'ts_debug_point_enable';
-
-    CREATE OR REPLACE FUNCTION debug_waitpoint_release(TEXT) RETURNS VOID LANGUAGE C VOLATILE STRICT
-    AS '@TS_MODULE_PATHNAME@', 'ts_debug_point_release';
 
     CREATE TABLE sensor_data (
         time timestamptz not null,
@@ -57,56 +52,25 @@ teardown {
 
 session "s1"
 
+step "s1_begin" {
+    BEGIN;
+}
+
 step "s1_decompress" {
     SELECT count(*) FROM (SELECT decompress_chunk(i, if_compressed => true) FROM show_chunks('sensor_data') i) i;
     SELECT compression_status FROM chunk_compression_stats('sensor_data');
 }
 
+step "s1_commit" {
+    COMMIT;
+}
+
 session "s2"
 
 step "s2_read_sensor_data" {
     SELECT FROM sensor_data;
+    SELECT compression_status FROM chunk_compression_stats('sensor_data');
 }
 
-session "s3"
+permutation "s2_read_sensor_data" "s1_begin" "s1_decompress" "s2_read_sensor_data" "s1_commit" "s2_read_sensor_data"
 
-step "s3_lock_decompression_locks" {
-    -- This waitpoint is defined before the decompressed chunk is re-indexed. Up to this
-    -- point parallel SELECTs should be possible.
-    SELECT debug_waitpoint_enable('decompress_chunk_impl_before_reindex');
-
-    -- This waitpoint is defined after all locks for the decompression and the deletion
-    -- of the compressed chunk are requested.
-    SELECT debug_waitpoint_enable('decompress_chunk_impl_after_reindex');
-}
-
-step "s3_unlock_decompression_before_reindex_lock" {
-    -- Ensure that we are waiting on our debug waitpoint
-    -- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
-    -- decompress_chunk_impl_before_reindex = 3966149665.
-    SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
-
-    SELECT debug_waitpoint_release('decompress_chunk_impl_before_reindex');
-}
-
-step "s3_unlock_decompression_after_reindex_lock" {
-    -- Ensure that we are waiting on our debug waitpoint
-    -- Note: The OIDs of the advisory locks are based on the hash value of the lock name (see debug_point_init())
-    -- decompress_chunk_impl_after_reindex = 1858017383.
-    SELECT locktype, mode, granted, objid FROM pg_locks WHERE not granted AND locktype = 'advisory' ORDER BY relation, locktype, mode, granted;
-
-    SELECT debug_waitpoint_release('decompress_chunk_impl_after_reindex');
-}
-
-# Desired execution:
-# s3_lock_decompression_locks - Locks the decompression waitpoints.
-# s2_read_sensor_data - Read the compressed chunk. This should be executed without blocking.
-# s1_decompress - Start the decompression and block on the first waitpoint.
-# s2_read_sensor_data - Read the compressed chunk again. This should be still possible without blocking.
-# s3_unlock_decompression_before_reindex_lock - Releases the decompress_chunk_impl_before_reindex waitpoint.
-# s1_decompress continues - The chunk is reindexed and the index is locked.
-# s2_read_sensor_data - Read the chunk. This blocks due to the locked index.
-# s3_unlock_decompression_after_reindex_lock - Releases the decompress_chunk_impl_after_compressed_chunk_lock.
-# s1_decompress continues - Finishes the decompression operation and releases the locks.
-# s2_read_sensor_data continues.
-permutation "s3_lock_decompression_locks" "s2_read_sensor_data" "s1_decompress" "s2_read_sensor_data" "s3_unlock_decompression_before_reindex_lock" "s2_read_sensor_data" "s3_unlock_decompression_after_reindex_lock"