From 32c1e3aef293bcece63550e09c3dd3496368a687 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Nordstr=C3=B6m?= Date: Thu, 10 Feb 2022 16:42:56 +0100 Subject: [PATCH] Allow control of relation open/close in Scanner Make the Scanner module more flexible by allowing optional control over when the scanned relation is opened and closed. Relations can then remain open over multiple scans, which can improve performance and efficiency. Closes #2173 --- src/scan_iterator.c | 10 ++- src/scan_iterator.h | 22 ++++- src/scanner.c | 169 +++++++++++++++++++++++------------ src/scanner.h | 18 +++- test/expected/test_utils.out | 27 ++++++ test/sql/test_utils.sql | 13 +++ test/src/CMakeLists.txt | 10 ++- test/src/test_scanner.c | 106 ++++++++++++++++++++++ 8 files changed, 309 insertions(+), 66 deletions(-) create mode 100644 test/src/test_scanner.c diff --git a/src/scan_iterator.c b/src/scan_iterator.c index 268d80e38..1742f3d50 100644 --- a/src/scan_iterator.c +++ b/src/scan_iterator.c @@ -13,10 +13,18 @@ ts_scan_iterator_set_index(ScanIterator *iterator, CatalogTable table, int index iterator->ctx.index = catalog_get_index(ts_catalog_get(), table, indexid); } +void +ts_scan_iterator_end(ScanIterator *iterator) +{ + ts_scanner_end_scan(&iterator->ctx); +} + void ts_scan_iterator_close(ScanIterator *iterator) { - ts_scanner_end_and_close_scan(&iterator->ctx); + /* Ending a scan is a no-op if already ended */ + ts_scanner_end_scan(&iterator->ctx); + ts_scanner_close(&iterator->ctx); } TSDLLEXPORT void diff --git a/src/scan_iterator.h b/src/scan_iterator.h index 8c800d697..099e781f9 100644 --- a/src/scan_iterator.h +++ b/src/scan_iterator.h @@ -28,13 +28,13 @@ typedef struct ScanIterator .ctx = { \ .internal = { \ .ended = true, \ - .closed = true, \ }, \ .table = catalog_get_table_id(ts_catalog_get(), catalog_table_id), \ .nkeys = 0, \ .scandirection = ForwardScanDirection, \ .lockmode = lock_mode, \ .result_mctx = mctx, \ + .flags = SCANNER_F_NOFLAGS, \ }, \ .scankey_mcxt = 
CurrentMemoryContext, \ } @@ -66,13 +66,13 @@ ts_scan_iterator_tupledesc(const ScanIterator *iterator) static inline MemoryContext ts_scan_iterator_get_result_memory_context(const ScanIterator *iterator) { - return iterator->tinfo->mctx; + return iterator->ctx.result_mctx; } static inline void * ts_scan_iterator_alloc_result(const ScanIterator *iterator, Size size) { - return ts_scanner_alloc_result(iterator->tinfo, size); + return MemoryContextAllocZero(iterator->ctx.result_mctx, size); } static inline void @@ -96,8 +96,15 @@ ts_scan_iterator_scan_key_reset(ScanIterator *iterator) iterator->ctx.nkeys = 0; } +static inline bool +ts_scan_iterator_is_started(ScanIterator *iterator) +{ + return iterator->ctx.internal.started; +} + void TSDLLEXPORT ts_scan_iterator_set_index(ScanIterator *iterator, CatalogTable table, int indexid); +void TSDLLEXPORT ts_scan_iterator_end(ScanIterator *iterator); void TSDLLEXPORT ts_scan_iterator_close(ScanIterator *iterator); void TSDLLEXPORT ts_scan_iterator_scan_key_init(ScanIterator *iterator, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, @@ -110,6 +117,15 @@ void TSDLLEXPORT ts_scan_iterator_scan_key_init(ScanIterator *iterator, AttrNumb */ void TSDLLEXPORT ts_scan_iterator_rescan(ScanIterator *iterator); +static inline void +ts_scan_iterator_start_or_restart_scan(ScanIterator *iterator) +{ + if (ts_scan_iterator_is_started(iterator)) + ts_scan_iterator_rescan(iterator); + else + ts_scan_iterator_start_scan(iterator); +} + /* You must use `ts_scan_iterator_close` if terminating this loop early */ #define ts_scanner_foreach(scan_iterator) \ for (ts_scan_iterator_start_scan((scan_iterator)); \ diff --git a/src/scanner.c b/src/scanner.c index 4507c803b..84e985617 100644 --- a/src/scanner.c +++ b/src/scanner.c @@ -40,15 +40,15 @@ typedef struct Scanner static Relation table_scanner_open(ScannerCtx *ctx) { - ctx->internal.tablerel = table_open(ctx->table, ctx->lockmode); - return ctx->internal.tablerel; + 
ctx->tablerel = table_open(ctx->table, ctx->lockmode); + return ctx->tablerel; } static ScanDesc table_scanner_beginscan(ScannerCtx *ctx) { ctx->internal.scan.table_scan = - table_beginscan(ctx->internal.tablerel, ctx->snapshot, ctx->nkeys, ctx->scankey); + table_beginscan(ctx->tablerel, ctx->snapshot, ctx->nkeys, ctx->scankey); return ctx->internal.scan; } @@ -78,18 +78,18 @@ table_scanner_endscan(ScannerCtx *ctx) static void table_scanner_close(ScannerCtx *ctx) { - LOCKMODE lockmode = ctx->keeplock ? NoLock : ctx->lockmode; + LOCKMODE lockmode = (ctx->flags & SCANNER_F_KEEPLOCK) ? NoLock : ctx->lockmode; - table_close(ctx->internal.tablerel, lockmode); + table_close(ctx->tablerel, lockmode); } /* Functions implementing index scans */ static Relation index_scanner_open(ScannerCtx *ctx) { - ctx->internal.tablerel = table_open(ctx->table, ctx->lockmode); - ctx->internal.indexrel = index_open(ctx->index, ctx->lockmode); - return ctx->internal.indexrel; + ctx->tablerel = table_open(ctx->table, ctx->lockmode); + ctx->indexrel = index_open(ctx->index, ctx->lockmode); + return ctx->indexrel; } static ScanDesc @@ -98,7 +98,7 @@ index_scanner_beginscan(ScannerCtx *ctx) InternalScannerCtx *ictx = &ctx->internal; ictx->scan.index_scan = - index_beginscan(ictx->tablerel, ictx->indexrel, ctx->snapshot, ctx->nkeys, ctx->norderbys); + index_beginscan(ctx->tablerel, ctx->indexrel, ctx->snapshot, ctx->nkeys, ctx->norderbys); ictx->scan.index_scan->xs_want_itup = ctx->want_itup; index_rescan(ictx->scan.index_scan, ctx->scankey, ctx->nkeys, NULL, ctx->norderbys); return ictx->scan; @@ -132,9 +132,9 @@ index_scanner_endscan(ScannerCtx *ctx) static void index_scanner_close(ScannerCtx *ctx) { - LOCKMODE lockmode = ctx->keeplock ? NoLock : ctx->lockmode; - index_close(ctx->internal.indexrel, ctx->lockmode); - table_close(ctx->internal.tablerel, lockmode); + LOCKMODE lockmode = (ctx->flags & SCANNER_F_KEEPLOCK) ? 
NoLock : ctx->lockmode; + index_close(ctx->indexrel, ctx->lockmode); + table_close(ctx->tablerel, lockmode); } /* @@ -181,27 +181,11 @@ ts_scanner_rescan(ScannerCtx *ctx, const ScanKey scankey) scanner->rescan(ctx); } -/* - * Perform either a heap or index scan depending on the information in the - * ScannerCtx. ScannerCtx must be setup by caller with the proper information - * for the scan, including filters and callbacks for found tuples. - * - * Return the number of tuples that where found. - */ -TSDLLEXPORT void -ts_scanner_start_scan(ScannerCtx *ctx) +static void +prepare_scan(ScannerCtx *ctx) { - InternalScannerCtx *ictx = &ctx->internal; - TupleDesc tuple_desc; - Scanner *scanner; - - ictx->closed = false; - ictx->ended = false; - ictx->registered_snapshot = false; - - scanner = scanner_ctx_get_scanner(ctx); - - scanner->openscan(ctx); + ctx->internal.ended = false; + ctx->internal.registered_snapshot = false; if (ctx->snapshot == NULL) { @@ -230,20 +214,75 @@ ts_scanner_start_scan(ScannerCtx *ctx) * mode. */ ctx->snapshot = RegisterSnapshot(GetSnapshotData(SnapshotSelf)); - ictx->registered_snapshot = true; + ctx->internal.registered_snapshot = true; + } +} + +TSDLLEXPORT Relation +ts_scanner_open(ScannerCtx *ctx) +{ + Scanner *scanner = scanner_ctx_get_scanner(ctx); + + Assert(NULL == ctx->tablerel); + prepare_scan(ctx); + + return scanner->openscan(ctx); +} + +/* + * Start either a heap or index scan depending on the information in the + * ScannerCtx. ScannerCtx must be setup by caller with the proper information + * for the scan, including filters and callbacks for found tuples. 
+ */ +TSDLLEXPORT void +ts_scanner_start_scan(ScannerCtx *ctx) +{ + InternalScannerCtx *ictx = &ctx->internal; + Scanner *scanner; + TupleDesc tuple_desc; + + if (ictx->started) + { + Assert(!ictx->ended); + Assert(ctx->tablerel); + Assert(OidIsValid(ctx->table)); + return; } + if (ctx->tablerel == NULL) + { + Assert(NULL == ctx->indexrel); + ts_scanner_open(ctx); + } + else + { + /* + * Relations already opened by caller: Only need to prepare the scan + * and set relation Oids so that the scanner knows which scanner + * implementation to use. Respect the auto-closing behavior set by the + * user, which is to auto close if unspecified. + */ + prepare_scan(ctx); + ctx->table = RelationGetRelid(ctx->tablerel); + + if (NULL != ctx->indexrel) + ctx->index = RelationGetRelid(ctx->indexrel); + } + + scanner = scanner_ctx_get_scanner(ctx); scanner->beginscan(ctx); - tuple_desc = RelationGetDescr(ictx->tablerel); + tuple_desc = RelationGetDescr(ctx->tablerel); - ictx->tinfo.scanrel = ictx->tablerel; + ictx->tinfo.scanrel = ctx->tablerel; ictx->tinfo.mctx = ctx->result_mctx == NULL ? CurrentMemoryContext : ctx->result_mctx; - ictx->tinfo.slot = MakeSingleTupleTableSlot(tuple_desc, table_slot_callbacks(ictx->tablerel)); + ictx->tinfo.slot = MakeSingleTupleTableSlot(tuple_desc, table_slot_callbacks(ctx->tablerel)); /* Call pre-scan handler, if any. 
*/ if (ctx->prescan != NULL) ctx->prescan(ctx->data); + + ictx->started = true; } static inline bool @@ -252,6 +291,24 @@ ts_scanner_limit_reached(ScannerCtx *ctx) return ctx->limit > 0 && ctx->internal.tinfo.count >= ctx->limit; } +static void +scanner_cleanup(ScannerCtx *ctx) +{ + InternalScannerCtx *ictx = &ctx->internal; + + if (ictx->registered_snapshot) + { + UnregisterSnapshot(ctx->snapshot); + ctx->snapshot = NULL; + } + + if (NULL != ictx->tinfo.slot) + { + ExecDropSingleTupleTableSlot(ictx->tinfo.slot); + ictx->tinfo.slot = NULL; + } +} + TSDLLEXPORT void ts_scanner_end_scan(ScannerCtx *ctx) { @@ -266,32 +323,24 @@ ts_scanner_end_scan(ScannerCtx *ctx) ctx->postscan(ictx->tinfo.count, ctx->data); scanner->endscan(ctx); + scanner_cleanup(ctx); ictx->ended = true; + ictx->started = false; } TSDLLEXPORT void -ts_scanner_end_and_close_scan(ScannerCtx *ctx) +ts_scanner_close(ScannerCtx *ctx) { - InternalScannerCtx *ictx = &ctx->internal; Scanner *scanner = scanner_ctx_get_scanner(ctx); - if (ictx->closed) + Assert(ctx->internal.ended); + + if (NULL != ctx->tablerel) { - Assert(ictx->ended); - return; + scanner->closescan(ctx); + ctx->tablerel = NULL; + ctx->indexrel = NULL; } - - ts_scanner_end_scan(ctx); - - if (ictx->registered_snapshot) - { - UnregisterSnapshot(ctx->snapshot); - ctx->snapshot = NULL; - } - - scanner->closescan(ctx); - ExecDropSingleTupleTableSlot(ictx->tinfo.slot); - ictx->closed = true; } TSDLLEXPORT TupleInfo * @@ -312,7 +361,7 @@ ts_scanner_next(ScannerCtx *ctx) TupleTableSlot *slot = ictx->tinfo.slot; Assert(ctx->snapshot); - ictx->tinfo.lockresult = table_tuple_lock(ictx->tablerel, + ictx->tinfo.lockresult = table_tuple_lock(ctx->tablerel, &(slot->tts_tid), ctx->snapshot, slot, @@ -329,7 +378,11 @@ ts_scanner_next(ScannerCtx *ctx) is_valid = ts_scanner_limit_reached(ctx) ? 
false : scanner->getnext(ctx); } - ts_scanner_end_and_close_scan(ctx); + if (!(ctx->flags & SCANNER_F_NOEND)) + ts_scanner_end_scan(ctx); + + if (!(ctx->flags & SCANNER_F_NOEND_AND_NOCLOSE)) + ts_scanner_close(ctx); return NULL; } @@ -353,7 +406,11 @@ ts_scanner_scan(ScannerCtx *ctx) /* Call tuple_found handler. Abort the scan if the handler wants us to */ if (ctx->tuple_found != NULL && ctx->tuple_found(tinfo, ctx->data) == SCAN_DONE) { - ts_scanner_end_and_close_scan(ctx); + if (!(ctx->flags & SCANNER_F_NOEND)) + ts_scanner_end_scan(ctx); + + if (!(ctx->flags & SCANNER_F_NOEND_AND_NOCLOSE)) + ts_scanner_close(ctx); break; } } diff --git a/src/scanner.h b/src/scanner.h index cabf3ef5f..a74e52023 100644 --- a/src/scanner.h +++ b/src/scanner.h @@ -69,17 +69,24 @@ typedef union ScanDesc TableScanDesc table_scan; } ScanDesc; +typedef enum ScannerFlags +{ + SCANNER_F_NOFLAGS = 0x00, + SCANNER_F_KEEPLOCK = 0x01, + SCANNER_F_NOEND = 0x02, + SCANNER_F_NOEND_AND_NOCLOSE = 0x04 | SCANNER_F_NOEND, +} ScannerFlags; + /* * InternalScannerCtx is used for internal state during scanning and shouldn't * be initialized or touched by the user. */ typedef struct InternalScannerCtx { - Relation tablerel, indexrel; TupleInfo tinfo; ScanDesc scan; bool registered_snapshot; - bool closed; + bool started; bool ended; } InternalScannerCtx; @@ -89,11 +96,13 @@ typedef struct ScannerCtx /* Fields below this line can be initialized by the user */ Oid table; Oid index; + Relation tablerel; + Relation indexrel; ScanKey scankey; + int flags; int nkeys, norderbys, limit; /* Limit on number of tuples to return. 0 or * less means no limit */ bool want_itup; - bool keeplock; /* Keep the table lock after the scan finishes */ LOCKMODE lockmode; MemoryContext result_mctx; /* The memory context to allocate the result * on */ @@ -132,12 +141,13 @@ typedef struct ScannerCtx /* Performs an index scan or heap scan and returns the number of matching * tuples. 
*/ +extern TSDLLEXPORT Relation ts_scanner_open(ScannerCtx *ctx); +extern TSDLLEXPORT void ts_scanner_close(ScannerCtx *ctx); extern TSDLLEXPORT int ts_scanner_scan(ScannerCtx *ctx); extern TSDLLEXPORT bool ts_scanner_scan_one(ScannerCtx *ctx, bool fail_if_not_found, const char *item_type); extern TSDLLEXPORT void ts_scanner_start_scan(ScannerCtx *ctx); extern TSDLLEXPORT void ts_scanner_end_scan(ScannerCtx *ctx); -extern TSDLLEXPORT void ts_scanner_end_and_close_scan(ScannerCtx *ctx); extern TSDLLEXPORT void ts_scanner_rescan(ScannerCtx *ctx, const ScanKey scankey); extern TSDLLEXPORT TupleInfo *ts_scanner_next(ScannerCtx *ctx); extern TSDLLEXPORT ItemPointer ts_scanner_get_tuple_tid(TupleInfo *ti); diff --git a/test/expected/test_utils.out b/test/expected/test_utils.out index 23806aa18..bc1418952 100644 --- a/test/expected/test_utils.out +++ b/test/expected/test_utils.out @@ -64,6 +64,7 @@ CREATE OR REPLACE FUNCTION test_error_injection(TEXT) RETURNS VOID AS :MODULE_PATHNAME, 'ts_test_error_injection' LANGUAGE C VOLATILE STRICT; +SET ROLE :ROLE_DEFAULT_PERM_USER; SELECT test_error_injection('test_error'); test_error_injection ---------------------- @@ -92,3 +93,29 @@ SELECT test_error_injection('test_error'); (1 row) +-- Test Scanner +RESET ROLE; +CREATE OR REPLACE FUNCTION test.scanner() RETURNS VOID + AS :MODULE_PATHNAME, 'ts_test_scanner' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE; +SET ROLE :ROLE_DEFAULT_PERM_USER; +-- Create two chunks to scan in the test +CREATE TABLE hyper (time timestamptz, temp float); +SELECT create_hypertable('hyper', 'time'); +NOTICE: adding not-null constraint to column "time" + create_hypertable +-------------------- + (1,public,hyper,t) +(1 row) + +INSERT INTO hyper VALUES ('2021-01-01', 1.0), ('2022-01-01', 2.0); +SELECT test.scanner(); +NOTICE: 1. Scan: "_timescaledb_internal._hyper_1_1_chunk" +NOTICE: 1. Scan: "_timescaledb_internal._hyper_1_2_chunk" +NOTICE: 2. 
Scan with filter: "_timescaledb_internal._hyper_1_1_chunk" +NOTICE: 3. ReScan: "_timescaledb_internal._hyper_1_2_chunk" +NOTICE: 4. IndexScan: "_timescaledb_internal._hyper_1_2_chunk" + scanner +--------- + +(1 row) + diff --git a/test/sql/test_utils.sql b/test/sql/test_utils.sql index 865a738eb..6c63f349d 100644 --- a/test/sql/test_utils.sql +++ b/test/sql/test_utils.sql @@ -55,6 +55,7 @@ CREATE OR REPLACE FUNCTION test_error_injection(TEXT) RETURNS VOID AS :MODULE_PATHNAME, 'ts_test_error_injection' LANGUAGE C VOLATILE STRICT; +SET ROLE :ROLE_DEFAULT_PERM_USER; SELECT test_error_injection('test_error'); @@ -65,3 +66,15 @@ SELECT test_error_injection('test_error'); SELECT debug_point_release('test_error'); SELECT test_error_injection('test_error'); + +-- Test Scanner +RESET ROLE; +CREATE OR REPLACE FUNCTION test.scanner() RETURNS VOID + AS :MODULE_PATHNAME, 'ts_test_scanner' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE; +SET ROLE :ROLE_DEFAULT_PERM_USER; + +-- Create two chunks to scan in the test +CREATE TABLE hyper (time timestamptz, temp float); +SELECT create_hypertable('hyper', 'time'); +INSERT INTO hyper VALUES ('2021-01-01', 1.0), ('2022-01-01', 2.0); +SELECT test.scanner(); diff --git a/test/src/CMakeLists.txt b/test/src/CMakeLists.txt index d4f7b5477..8b68e9844 100644 --- a/test/src/CMakeLists.txt +++ b/test/src/CMakeLists.txt @@ -1,5 +1,11 @@ -set(SOURCES adt_tests.c symbol_conflict.c test_time_to_internal.c - test_with_clause_parser.c test_time_utils.c test_utils.c) +set(SOURCES + adt_tests.c + symbol_conflict.c + test_time_to_internal.c + test_with_clause_parser.c + test_time_utils.c + test_utils.c + test_scanner.c) include(${PROJECT_SOURCE_DIR}/src/build-defs.cmake) diff --git a/test/src/test_scanner.c b/test/src/test_scanner.c new file mode 100644 index 000000000..5a0f452a2 --- /dev/null +++ b/test/src/test_scanner.c @@ -0,0 +1,106 @@ +/* + * This file and its contents are licensed under the Apache License 2.0. 
+ * Please see the included NOTICE for copyright information and
+ * LICENSE-APACHE for a copy of the license.
+ */
+#include <postgres.h>
+
+#include "scanner.h"
+#include "scan_iterator.h"
+#include "chunk.h"
+#include "test_utils.h"
+/* Scanner test (SQL: test.scanner()): pre-opened relation scan, filtered scan, rescan, index scan */
+TS_TEST_FN(ts_test_scanner)
+{
+	ScanIterator it;
+	Relation chunkrel;
+	int32 chunk_id[2] = { -1, -1 }; /* -1 marks a slot not yet filled */
+	int i = 0;
+
+	/* Test pre-open relation */
+	it = ts_chunk_scan_iterator_create(CurrentMemoryContext);
+	chunkrel = table_open(it.ctx.table, AccessShareLock);
+	it.ctx.tablerel = chunkrel;
+
+	/* Explicit start scan to test that we can call it twice without
+	 * issue. The loop will also call it */
+	ts_scan_iterator_start_scan(&it);
+
+	ts_scanner_foreach(&it)
+	{
+		TupleInfo *ti = ts_scan_iterator_tuple_info(&it);
+		FormData_chunk fd;
+
+		ts_chunk_formdata_fill(&fd, ti);
+
+		elog(NOTICE, "1. Scan: \"%s.%s\"", NameStr(fd.schema_name), NameStr(fd.table_name));
+		/* Check the bound first so chunk_id[] is never read past its end */
+		if (i < lengthof(chunk_id) && chunk_id[i] == -1)
+		{
+			chunk_id[i] = fd.id;
+			i++;
+		}
+	}
+
+	ts_scan_iterator_end(&it);
+
+	/* Add a chunk filter and scan again */
+	ts_scan_iterator_scan_key_init(&it,
+								   Anum_chunk_idx_id,
+								   BTEqualStrategyNumber,
+								   F_INT4EQ,
+								   Int32GetDatum(chunk_id[0]));
+
+	ts_scanner_foreach(&it)
+	{
+		TupleInfo *ti = ts_scan_iterator_tuple_info(&it);
+		FormData_chunk fd;
+
+		ts_chunk_formdata_fill(&fd, ti);
+
+		elog(NOTICE,
+			 "2. Scan with filter: \"%s.%s\"",
+			 NameStr(fd.schema_name),
+			 NameStr(fd.table_name));
+	}
+
+	/* Rescan */
+	ts_scan_iterator_scan_key_reset(&it);
+	ts_scan_iterator_scan_key_init(&it,
+								   Anum_chunk_idx_id,
+								   BTEqualStrategyNumber,
+								   F_INT4EQ,
+								   Int32GetDatum(chunk_id[1]));
+	ts_scan_iterator_rescan(&it);
+
+	ts_scanner_foreach(&it)
+	{
+		TupleInfo *ti = ts_scan_iterator_tuple_info(&it);
+		FormData_chunk fd;
+
+		ts_chunk_formdata_fill(&fd, ti);
+
+		elog(NOTICE, "3. ReScan: \"%s.%s\"", NameStr(fd.schema_name), NameStr(fd.table_name));
+	}
+
+	ts_scan_iterator_end(&it);
+	table_close(chunkrel, AccessShareLock);
+
+	/* Do another scan, but an index scan this time */
+	it.ctx.tablerel = NULL;
+	it.ctx.index = catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_ID_INDEX);
+
+	ts_scanner_foreach(&it)
+	{
+		TupleInfo *ti = ts_scan_iterator_tuple_info(&it);
+		FormData_chunk fd;
+
+		ts_chunk_formdata_fill(&fd, ti);
+
+		elog(NOTICE, "4. IndexScan: \"%s.%s\"", NameStr(fd.schema_name), NameStr(fd.table_name));
+	}
+
+	ts_scan_iterator_close(&it);
+
+	PG_RETURN_VOID();
+}