diff --git a/src/scan_iterator.c b/src/scan_iterator.c index 268d80e38..1742f3d50 100644 --- a/src/scan_iterator.c +++ b/src/scan_iterator.c @@ -13,10 +13,18 @@ ts_scan_iterator_set_index(ScanIterator *iterator, CatalogTable table, int index iterator->ctx.index = catalog_get_index(ts_catalog_get(), table, indexid); } +void +ts_scan_iterator_end(ScanIterator *iterator) +{ + ts_scanner_end_scan(&iterator->ctx); +} + void ts_scan_iterator_close(ScanIterator *iterator) { - ts_scanner_end_and_close_scan(&iterator->ctx); + /* Ending a scan is a no-op if already ended */ + ts_scanner_end_scan(&iterator->ctx); + ts_scanner_close(&iterator->ctx); } TSDLLEXPORT void diff --git a/src/scan_iterator.h b/src/scan_iterator.h index 8c800d697..099e781f9 100644 --- a/src/scan_iterator.h +++ b/src/scan_iterator.h @@ -28,13 +28,13 @@ typedef struct ScanIterator .ctx = { \ .internal = { \ .ended = true, \ - .closed = true, \ }, \ .table = catalog_get_table_id(ts_catalog_get(), catalog_table_id), \ .nkeys = 0, \ .scandirection = ForwardScanDirection, \ .lockmode = lock_mode, \ .result_mctx = mctx, \ + .flags = SCANNER_F_NOFLAGS, \ }, \ .scankey_mcxt = CurrentMemoryContext, \ } @@ -66,13 +66,13 @@ ts_scan_iterator_tupledesc(const ScanIterator *iterator) static inline MemoryContext ts_scan_iterator_get_result_memory_context(const ScanIterator *iterator) { - return iterator->tinfo->mctx; + return iterator->ctx.result_mctx; } static inline void * ts_scan_iterator_alloc_result(const ScanIterator *iterator, Size size) { - return ts_scanner_alloc_result(iterator->tinfo, size); + return MemoryContextAllocZero(iterator->ctx.result_mctx, size); } static inline void @@ -96,8 +96,15 @@ ts_scan_iterator_scan_key_reset(ScanIterator *iterator) iterator->ctx.nkeys = 0; } +static inline bool +ts_scan_iterator_is_started(ScanIterator *iterator) +{ + return iterator->ctx.internal.started; +} + void TSDLLEXPORT ts_scan_iterator_set_index(ScanIterator *iterator, CatalogTable table, int indexid); +void TSDLLEXPORT ts_scan_iterator_end(ScanIterator *iterator); void TSDLLEXPORT ts_scan_iterator_close(ScanIterator *iterator); void TSDLLEXPORT ts_scan_iterator_scan_key_init(ScanIterator *iterator, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, @@ -110,6 +117,15 @@ void TSDLLEXPORT ts_scan_iterator_scan_key_init(ScanIterator *iterator, AttrNumb */ void TSDLLEXPORT ts_scan_iterator_rescan(ScanIterator *iterator); +static inline void +ts_scan_iterator_start_or_restart_scan(ScanIterator *iterator) +{ + if (ts_scan_iterator_is_started(iterator)) + ts_scan_iterator_rescan(iterator); + else + ts_scan_iterator_start_scan(iterator); +} + /* You must use `ts_scan_iterator_close` if terminating this loop early */ #define ts_scanner_foreach(scan_iterator) \ for (ts_scan_iterator_start_scan((scan_iterator)); \ diff --git a/src/scanner.c b/src/scanner.c index 4507c803b..84e985617 100644 --- a/src/scanner.c +++ b/src/scanner.c @@ -40,15 +40,15 @@ typedef struct Scanner static Relation table_scanner_open(ScannerCtx *ctx) { - ctx->internal.tablerel = table_open(ctx->table, ctx->lockmode); - return ctx->internal.tablerel; + ctx->tablerel = table_open(ctx->table, ctx->lockmode); + return ctx->tablerel; } static ScanDesc table_scanner_beginscan(ScannerCtx *ctx) { ctx->internal.scan.table_scan = - table_beginscan(ctx->internal.tablerel, ctx->snapshot, ctx->nkeys, ctx->scankey); + table_beginscan(ctx->tablerel, ctx->snapshot, ctx->nkeys, ctx->scankey); return ctx->internal.scan; } @@ -78,18 +78,18 @@ table_scanner_endscan(ScannerCtx *ctx) static void table_scanner_close(ScannerCtx *ctx) { - LOCKMODE lockmode = ctx->keeplock ? NoLock : ctx->lockmode; + LOCKMODE lockmode = (ctx->flags & SCANNER_F_KEEPLOCK) ? NoLock : ctx->lockmode; - table_close(ctx->internal.tablerel, lockmode); + table_close(ctx->tablerel, lockmode); } /* Functions implementing index scans */ static Relation index_scanner_open(ScannerCtx *ctx) { - ctx->internal.tablerel = table_open(ctx->table, ctx->lockmode); - ctx->internal.indexrel = index_open(ctx->index, ctx->lockmode); - return ctx->internal.indexrel; + ctx->tablerel = table_open(ctx->table, ctx->lockmode); + ctx->indexrel = index_open(ctx->index, ctx->lockmode); + return ctx->indexrel; } static ScanDesc @@ -98,7 +98,7 @@ index_scanner_beginscan(ScannerCtx *ctx) InternalScannerCtx *ictx = &ctx->internal; ictx->scan.index_scan = - index_beginscan(ictx->tablerel, ictx->indexrel, ctx->snapshot, ctx->nkeys, ctx->norderbys); + index_beginscan(ctx->tablerel, ctx->indexrel, ctx->snapshot, ctx->nkeys, ctx->norderbys); ictx->scan.index_scan->xs_want_itup = ctx->want_itup; index_rescan(ictx->scan.index_scan, ctx->scankey, ctx->nkeys, NULL, ctx->norderbys); return ictx->scan; @@ -132,9 +132,9 @@ index_scanner_endscan(ScannerCtx *ctx) static void index_scanner_close(ScannerCtx *ctx) { - LOCKMODE lockmode = ctx->keeplock ? NoLock : ctx->lockmode; - index_close(ctx->internal.indexrel, ctx->lockmode); - table_close(ctx->internal.tablerel, lockmode); + LOCKMODE lockmode = (ctx->flags & SCANNER_F_KEEPLOCK) ? NoLock : ctx->lockmode; + index_close(ctx->indexrel, ctx->lockmode); + table_close(ctx->tablerel, lockmode); } /* @@ -181,27 +181,11 @@ ts_scanner_rescan(ScannerCtx *ctx, const ScanKey scankey) scanner->rescan(ctx); } -/* - * Perform either a heap or index scan depending on the information in the - * ScannerCtx. ScannerCtx must be setup by caller with the proper information - * for the scan, including filters and callbacks for found tuples. - * - * Return the number of tuples that where found. - */ -TSDLLEXPORT void -ts_scanner_start_scan(ScannerCtx *ctx) +static void +prepare_scan(ScannerCtx *ctx) { - InternalScannerCtx *ictx = &ctx->internal; - TupleDesc tuple_desc; - Scanner *scanner; - - ictx->closed = false; - ictx->ended = false; - ictx->registered_snapshot = false; - - scanner = scanner_ctx_get_scanner(ctx); - - scanner->openscan(ctx); + ctx->internal.ended = false; + ctx->internal.registered_snapshot = false; if (ctx->snapshot == NULL) { @@ -230,20 +214,75 @@ ts_scanner_start_scan(ScannerCtx *ctx) * mode. */ ctx->snapshot = RegisterSnapshot(GetSnapshotData(SnapshotSelf)); - ictx->registered_snapshot = true; + ctx->internal.registered_snapshot = true; + } +} + +TSDLLEXPORT Relation +ts_scanner_open(ScannerCtx *ctx) +{ + Scanner *scanner = scanner_ctx_get_scanner(ctx); + + Assert(NULL == ctx->tablerel); + prepare_scan(ctx); + + return scanner->openscan(ctx); +} + +/* + * Start either a heap or index scan depending on the information in the + * ScannerCtx. ScannerCtx must be setup by caller with the proper information + * for the scan, including filters and callbacks for found tuples. + */ +TSDLLEXPORT void +ts_scanner_start_scan(ScannerCtx *ctx) +{ + InternalScannerCtx *ictx = &ctx->internal; + Scanner *scanner; + TupleDesc tuple_desc; + + if (ictx->started) + { + Assert(!ictx->ended); + Assert(ctx->tablerel); + Assert(OidIsValid(ctx->table)); + return; } + if (ctx->tablerel == NULL) + { + Assert(NULL == ctx->indexrel); + ts_scanner_open(ctx); + } + else + { + /* + * Relations already opened by caller: Only need to prepare the scan + * and set relation Oids so that the scanner knows which scanner + * implementation to use. Respect the auto-closing behavior set by the + * user, which is to auto close if unspecified. + */ + prepare_scan(ctx); + ctx->table = RelationGetRelid(ctx->tablerel); + + if (NULL != ctx->indexrel) + ctx->index = RelationGetRelid(ctx->indexrel); + } + + scanner = scanner_ctx_get_scanner(ctx); scanner->beginscan(ctx); - tuple_desc = RelationGetDescr(ictx->tablerel); + tuple_desc = RelationGetDescr(ctx->tablerel); - ictx->tinfo.scanrel = ictx->tablerel; + ictx->tinfo.scanrel = ctx->tablerel; ictx->tinfo.mctx = ctx->result_mctx == NULL ? CurrentMemoryContext : ctx->result_mctx; - ictx->tinfo.slot = MakeSingleTupleTableSlot(tuple_desc, table_slot_callbacks(ictx->tablerel)); + ictx->tinfo.slot = MakeSingleTupleTableSlot(tuple_desc, table_slot_callbacks(ctx->tablerel)); /* Call pre-scan handler, if any. */ if (ctx->prescan != NULL) ctx->prescan(ctx->data); + + ictx->started = true; } static inline bool @@ -252,6 +291,24 @@ ts_scanner_limit_reached(ScannerCtx *ctx) return ctx->limit > 0 && ctx->internal.tinfo.count >= ctx->limit; } +static void +scanner_cleanup(ScannerCtx *ctx) +{ + InternalScannerCtx *ictx = &ctx->internal; + + if (ictx->registered_snapshot) + { + UnregisterSnapshot(ctx->snapshot); + ctx->snapshot = NULL; + } + + if (NULL != ictx->tinfo.slot) + { + ExecDropSingleTupleTableSlot(ictx->tinfo.slot); + ictx->tinfo.slot = NULL; + } +} + TSDLLEXPORT void ts_scanner_end_scan(ScannerCtx *ctx) { @@ -266,32 +323,24 @@ ts_scanner_end_scan(ScannerCtx *ctx) ctx->postscan(ictx->tinfo.count, ctx->data); scanner->endscan(ctx); + scanner_cleanup(ctx); ictx->ended = true; + ictx->started = false; } TSDLLEXPORT void -ts_scanner_end_and_close_scan(ScannerCtx *ctx) +ts_scanner_close(ScannerCtx *ctx) { - InternalScannerCtx *ictx = &ctx->internal; Scanner *scanner = scanner_ctx_get_scanner(ctx); - if (ictx->closed) + Assert(ctx->internal.ended); + + if (NULL != ctx->tablerel) { - Assert(ictx->ended); - return; + scanner->closescan(ctx); + ctx->tablerel = NULL; + ctx->indexrel = NULL; } - - ts_scanner_end_scan(ctx); - - if (ictx->registered_snapshot) - { - UnregisterSnapshot(ctx->snapshot); - ctx->snapshot = NULL; - } - - scanner->closescan(ctx); - ExecDropSingleTupleTableSlot(ictx->tinfo.slot); - ictx->closed = true; } TSDLLEXPORT TupleInfo * @@ -312,7 +361,7 @@ ts_scanner_next(ScannerCtx *ctx) TupleTableSlot *slot = ictx->tinfo.slot; Assert(ctx->snapshot); - ictx->tinfo.lockresult = table_tuple_lock(ictx->tablerel, + ictx->tinfo.lockresult = table_tuple_lock(ctx->tablerel, &(slot->tts_tid), ctx->snapshot, slot, @@ -329,7 +378,11 @@ ts_scanner_next(ScannerCtx *ctx) is_valid = ts_scanner_limit_reached(ctx) ? false : scanner->getnext(ctx); } - ts_scanner_end_and_close_scan(ctx); + if (!(ctx->flags & SCANNER_F_NOEND)) + ts_scanner_end_scan(ctx); + + if (!(ctx->flags & SCANNER_F_NOEND_AND_NOCLOSE)) + ts_scanner_close(ctx); return NULL; } @@ -353,7 +406,11 @@ ts_scanner_scan(ScannerCtx *ctx) /* Call tuple_found handler. Abort the scan if the handler wants us to */ if (ctx->tuple_found != NULL && ctx->tuple_found(tinfo, ctx->data) == SCAN_DONE) { - ts_scanner_end_and_close_scan(ctx); + if (!(ctx->flags & SCANNER_F_NOEND)) + ts_scanner_end_scan(ctx); + + if (!(ctx->flags & SCANNER_F_NOEND_AND_NOCLOSE)) + ts_scanner_close(ctx); break; } } diff --git a/src/scanner.h b/src/scanner.h index cabf3ef5f..a74e52023 100644 --- a/src/scanner.h +++ b/src/scanner.h @@ -69,17 +69,24 @@ typedef union ScanDesc TableScanDesc table_scan; } ScanDesc; +typedef enum ScannerFlags +{ + SCANNER_F_NOFLAGS = 0x00, + SCANNER_F_KEEPLOCK = 0x01, + SCANNER_F_NOEND = 0x02, + SCANNER_F_NOEND_AND_NOCLOSE = 0x04 | SCANNER_F_NOEND, +} ScannerFlags; + /* * InternalScannerCtx is used for internal state during scanning and shouldn't * be initialized or touched by the user. */ typedef struct InternalScannerCtx { - Relation tablerel, indexrel; TupleInfo tinfo; ScanDesc scan; bool registered_snapshot; - bool closed; + bool started; bool ended; } InternalScannerCtx; @@ -89,11 +96,13 @@ typedef struct ScannerCtx /* Fields below this line can be initialized by the user */ Oid table; Oid index; + Relation tablerel; + Relation indexrel; ScanKey scankey; + int flags; int nkeys, norderbys, limit; /* Limit on number of tuples to return. 0 or * less means no limit */ bool want_itup; - bool keeplock; /* Keep the table lock after the scan finishes */ LOCKMODE lockmode; MemoryContext result_mctx; /* The memory context to allocate the result * on */ @@ -132,12 +141,13 @@ typedef struct ScannerCtx /* Performs an index scan or heap scan and returns the number of matching * tuples. */ +extern TSDLLEXPORT Relation ts_scanner_open(ScannerCtx *ctx); +extern TSDLLEXPORT void ts_scanner_close(ScannerCtx *ctx); extern TSDLLEXPORT int ts_scanner_scan(ScannerCtx *ctx); extern TSDLLEXPORT bool ts_scanner_scan_one(ScannerCtx *ctx, bool fail_if_not_found, const char *item_type); extern TSDLLEXPORT void ts_scanner_start_scan(ScannerCtx *ctx); extern TSDLLEXPORT void ts_scanner_end_scan(ScannerCtx *ctx); -extern TSDLLEXPORT void ts_scanner_end_and_close_scan(ScannerCtx *ctx); extern TSDLLEXPORT void ts_scanner_rescan(ScannerCtx *ctx, const ScanKey scankey); extern TSDLLEXPORT TupleInfo *ts_scanner_next(ScannerCtx *ctx); extern TSDLLEXPORT ItemPointer ts_scanner_get_tuple_tid(TupleInfo *ti); diff --git a/test/expected/test_utils.out b/test/expected/test_utils.out index 23806aa18..bc1418952 100644 --- a/test/expected/test_utils.out +++ b/test/expected/test_utils.out @@ -64,6 +64,7 @@ CREATE OR REPLACE FUNCTION test_error_injection(TEXT) RETURNS VOID AS :MODULE_PATHNAME, 'ts_test_error_injection' LANGUAGE C VOLATILE STRICT; +SET ROLE :ROLE_DEFAULT_PERM_USER; SELECT test_error_injection('test_error'); test_error_injection ---------------------- @@ -92,3 +93,29 @@ SELECT test_error_injection('test_error'); (1 row) +-- Test Scanner +RESET ROLE; +CREATE OR REPLACE FUNCTION test.scanner() RETURNS VOID + AS :MODULE_PATHNAME, 'ts_test_scanner' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE; +SET ROLE :ROLE_DEFAULT_PERM_USER; +-- Create two chunks to scan in the test +CREATE TABLE hyper (time timestamptz, temp float); +SELECT create_hypertable('hyper', 'time'); +NOTICE: adding not-null constraint to column "time" + create_hypertable +-------------------- + (1,public,hyper,t) +(1 row) + +INSERT INTO hyper VALUES ('2021-01-01', 1.0), ('2022-01-01', 2.0); +SELECT test.scanner(); +NOTICE: 1. Scan: "_timescaledb_internal._hyper_1_1_chunk" +NOTICE: 1. Scan: "_timescaledb_internal._hyper_1_2_chunk" +NOTICE: 2. Scan with filter: "_timescaledb_internal._hyper_1_1_chunk" +NOTICE: 3. ReScan: "_timescaledb_internal._hyper_1_2_chunk" +NOTICE: 4. IndexScan: "_timescaledb_internal._hyper_1_2_chunk" + scanner +--------- + +(1 row) + diff --git a/test/sql/test_utils.sql b/test/sql/test_utils.sql index 865a738eb..6c63f349d 100644 --- a/test/sql/test_utils.sql +++ b/test/sql/test_utils.sql @@ -55,6 +55,7 @@ CREATE OR REPLACE FUNCTION test_error_injection(TEXT) RETURNS VOID AS :MODULE_PATHNAME, 'ts_test_error_injection' LANGUAGE C VOLATILE STRICT; +SET ROLE :ROLE_DEFAULT_PERM_USER; SELECT test_error_injection('test_error'); @@ -65,3 +66,15 @@ SELECT test_error_injection('test_error'); SELECT debug_point_release('test_error'); SELECT test_error_injection('test_error'); + +-- Test Scanner +RESET ROLE; +CREATE OR REPLACE FUNCTION test.scanner() RETURNS VOID + AS :MODULE_PATHNAME, 'ts_test_scanner' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE; +SET ROLE :ROLE_DEFAULT_PERM_USER; + +-- Create two chunks to scan in the test +CREATE TABLE hyper (time timestamptz, temp float); +SELECT create_hypertable('hyper', 'time'); +INSERT INTO hyper VALUES ('2021-01-01', 1.0), ('2022-01-01', 2.0); +SELECT test.scanner(); diff --git a/test/src/CMakeLists.txt b/test/src/CMakeLists.txt index d4f7b5477..8b68e9844 100644 --- a/test/src/CMakeLists.txt +++ b/test/src/CMakeLists.txt @@ -1,5 +1,11 @@ -set(SOURCES adt_tests.c symbol_conflict.c test_time_to_internal.c - test_with_clause_parser.c test_time_utils.c test_utils.c) +set(SOURCES + adt_tests.c + symbol_conflict.c + test_time_to_internal.c + test_with_clause_parser.c + test_time_utils.c + test_utils.c + test_scanner.c) include(${PROJECT_SOURCE_DIR}/src/build-defs.cmake) diff --git a/test/src/test_scanner.c b/test/src/test_scanner.c new file mode 100644 index 000000000..5a0f452a2 --- /dev/null +++ b/test/src/test_scanner.c @@ -0,0 +1,106 @@ +/* + * This file and its contents are licensed under the Apache License 2.0. + * Please see the included NOTICE for copyright information and + * LICENSE-APACHE for a copy of the license. + */ +#include + +#include "scanner.h" +#include "scan_iterator.h" +#include "chunk.h" +#include "test_utils.h" + +TS_TEST_FN(ts_test_scanner) +{ + ScanIterator it; + Relation chunkrel; + int32 chunk_id[2] = { -1, -1 }; + int i = 0; + + /* Test pre-open relation */ + it = ts_chunk_scan_iterator_create(CurrentMemoryContext); + chunkrel = table_open(it.ctx.table, AccessShareLock); + it.ctx.tablerel = chunkrel; + + /* Explicit start scan to test that we can call it twice without + * issue. The loop will also call it */ + ts_scan_iterator_start_scan(&it); + + ts_scanner_foreach(&it) + { + TupleInfo *ti = ts_scan_iterator_tuple_info(&it); + FormData_chunk fd; + + ts_chunk_formdata_fill(&fd, ti); + + elog(NOTICE, "1. Scan: \"%s.%s\"", NameStr(fd.schema_name), NameStr(fd.table_name)); + + if (chunk_id[i] == -1 && i < lengthof(chunk_id)) + { + chunk_id[i] = fd.id; + i++; + } + } + + ts_scan_iterator_end(&it); + + /* Add a chunk filter and scan again */ + ts_scan_iterator_scan_key_init(&it, + Anum_chunk_idx_id, + BTEqualStrategyNumber, + F_INT4EQ, + Int32GetDatum(chunk_id[0])); + + ts_scanner_foreach(&it) + { + TupleInfo *ti = ts_scan_iterator_tuple_info(&it); + FormData_chunk fd; + + ts_chunk_formdata_fill(&fd, ti); + + elog(NOTICE, + "2. Scan with filter: \"%s.%s\"", + NameStr(fd.schema_name), + NameStr(fd.table_name)); + } + + /* Rescan */ + ts_scan_iterator_scan_key_reset(&it); + ts_scan_iterator_scan_key_init(&it, + Anum_chunk_idx_id, + BTEqualStrategyNumber, + F_INT4EQ, + Int32GetDatum(chunk_id[1])); + ts_scan_iterator_rescan(&it); + + ts_scanner_foreach(&it) + { + TupleInfo *ti = ts_scan_iterator_tuple_info(&it); + FormData_chunk fd; + + ts_chunk_formdata_fill(&fd, ti); + + elog(NOTICE, "3. ReScan: \"%s.%s\"", NameStr(fd.schema_name), NameStr(fd.table_name)); + } + + ts_scan_iterator_end(&it); + table_close(chunkrel, AccessShareLock); + + /* Do another scan, but an index scan this time */ + it.ctx.tablerel = NULL; + it.ctx.index = catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_ID_INDEX); + + ts_scanner_foreach(&it) + { + TupleInfo *ti = ts_scan_iterator_tuple_info(&it); + FormData_chunk fd; + + ts_chunk_formdata_fill(&fd, ti); + + elog(NOTICE, "4. IndexScan: \"%s.%s\"", NameStr(fd.schema_name), NameStr(fd.table_name)); + } + + ts_scan_iterator_close(&it); + + PG_RETURN_VOID(); +}