diff --git a/src/scan_iterator.c b/src/scan_iterator.c index 94c62f825..03091e81c 100644 --- a/src/scan_iterator.c +++ b/src/scan_iterator.c @@ -7,25 +7,44 @@ #include "scan_iterator.h" +TSDLLEXPORT void +ts_scan_iterator_set_index(ScanIterator *iterator, CatalogTable table, int indexid) +{ + iterator->ctx.index = catalog_get_index(ts_catalog_get(), table, indexid); +} + void ts_scan_iterator_close(ScanIterator *iterator) { - ts_scanner_end_scan(&iterator->ctx, &iterator->ictx); + ts_scanner_end_and_close_scan(&iterator->ctx, &iterator->ictx); } TSDLLEXPORT void ts_scan_iterator_scan_key_init(ScanIterator *iterator, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument) { + MemoryContext oldmcxt; + Assert(iterator->ctx.scankey == NULL || iterator->ctx.scankey == iterator->scankey); iterator->ctx.scankey = iterator->scankey; if (iterator->ctx.nkeys >= EMBEDDED_SCAN_KEY_SIZE) elog(ERROR, "cannot scan more than %d keys", EMBEDDED_SCAN_KEY_SIZE); + /* For rescans, when the scan key is reinitialized during the scan, make + * sure the scan eky is initialized on the long-lived scankey memory + * context. */ + oldmcxt = MemoryContextSwitchTo(iterator->scankey_mcxt); ScanKeyInit(&iterator->scankey[iterator->ctx.nkeys++], attributeNumber, strategy, procedure, argument); + MemoryContextSwitchTo(oldmcxt); +} + +TSDLLEXPORT void +ts_scan_iterator_rescan(ScanIterator *iterator) +{ + ts_scanner_rescan(&iterator->ctx, &iterator->ictx, NULL); } diff --git a/src/scan_iterator.h b/src/scan_iterator.h index 843934475..4011ee086 100644 --- a/src/scan_iterator.h +++ b/src/scan_iterator.h @@ -10,6 +10,7 @@ #include #include "scanner.h" +#include "ts_catalog/catalog.h" #define EMBEDDED_SCAN_KEY_SIZE 5 @@ -18,19 +19,25 @@ typedef struct ScanIterator ScannerCtx ctx; TupleInfo *tinfo; InternalScannerCtx ictx; + MemoryContext scankey_mcxt; ScanKeyData scankey[EMBEDDED_SCAN_KEY_SIZE]; } ScanIterator; -#define ts_scan_iterator_create(catalog_table_id, lock_mode, mctx) \ - (ScanIterator) \ - { \ +#define ts_scan_iterator_create(catalog_table_id, lock_mode, mctx) \ + (ScanIterator) \ + { \ .ctx = { \ .table = catalog_get_table_id(ts_catalog_get(), catalog_table_id), \ .nkeys = 0, \ .scandirection = ForwardScanDirection, \ .lockmode = lock_mode, \ .result_mctx = mctx, \ - } \ + }, \ + .scankey_mcxt = CurrentMemoryContext, \ + .ictx = { \ + .ended = true, \ + .closed = true, \ + }, \ } static inline TupleInfo * @@ -69,16 +76,44 @@ ts_scan_iterator_alloc_result(const ScanIterator *iterator, Size size) return ts_scanner_alloc_result(iterator->tinfo, size); } -void TSDLLEXPORT ts_scan_iterator_close(ScanIterator *iterator); +static inline void +ts_scan_iterator_start_scan(ScanIterator *iterator) +{ + MemoryContext oldmcxt = MemoryContextSwitchTo(iterator->scankey_mcxt); + ts_scanner_start_scan(&(iterator)->ctx, &(iterator)->ictx); + MemoryContextSwitchTo(oldmcxt); +} +static inline TupleInfo * +ts_scan_iterator_next(ScanIterator *iterator) +{ + iterator->tinfo = ts_scanner_next(&(iterator)->ctx, &(iterator)->ictx); + return iterator->tinfo; +} + +static inline void +ts_scan_iterator_scan_key_reset(ScanIterator *iterator) +{ + iterator->ctx.nkeys = 0; +} + +void TSDLLEXPORT ts_scan_iterator_set_index(ScanIterator *iterator, CatalogTable table, + int indexid); +void TSDLLEXPORT ts_scan_iterator_close(ScanIterator *iterator); void TSDLLEXPORT ts_scan_iterator_scan_key_init(ScanIterator *iterator, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument); +/* + * Reset the scan to use a new scan key. + * + * Note that the scan key should typically be reinitialized before a rescan. + */ +void TSDLLEXPORT ts_scan_iterator_rescan(ScanIterator *iterator); + /* You must use `ts_scan_iterator_close` if terminating this loop early */ #define ts_scanner_foreach(scan_iterator) \ - for (ts_scanner_start_scan(&(scan_iterator)->ctx, &(scan_iterator)->ictx); \ - ((scan_iterator)->tinfo = \ - ts_scanner_next(&(scan_iterator)->ctx, &(scan_iterator)->ictx)) != NULL;) + for (ts_scan_iterator_start_scan((scan_iterator)); \ + ts_scan_iterator_next(scan_iterator) != NULL;) #endif /* TIMESCALEDB_SCAN_ITERATOR_H */ diff --git a/src/scanner.c b/src/scanner.c index 564b9c92f..5374a697f 100644 --- a/src/scanner.c +++ b/src/scanner.c @@ -31,6 +31,7 @@ typedef struct Scanner Relation (*openscan)(InternalScannerCtx *ctx); ScanDesc (*beginscan)(InternalScannerCtx *ctx); bool (*getnext)(InternalScannerCtx *ctx); + void (*rescan)(InternalScannerCtx *ctx); void (*endscan)(InternalScannerCtx *ctx); void (*closescan)(InternalScannerCtx *ctx); } Scanner; @@ -63,6 +64,12 @@ table_scanner_getnext(InternalScannerCtx *ctx) return success; } +static void +table_scanner_rescan(InternalScannerCtx *ctx) +{ + table_rescan(ctx->scan.table_scan, ctx->sctx->scankey); +} + static void table_scanner_endscan(InternalScannerCtx *ctx) { @@ -109,6 +116,14 @@ index_scanner_getnext(InternalScannerCtx *ctx) return success; } +static void +index_scanner_rescan(InternalScannerCtx *ctx) +{ + ScannerCtx *sctx = ctx->sctx; + + index_rescan(ctx->scan.index_scan, sctx->scankey, sctx->nkeys, NULL, sctx->norderbys); +} + static void index_scanner_endscan(InternalScannerCtx *ctx) { @@ -119,9 +134,8 @@ static void index_scanner_close(InternalScannerCtx *ctx) { LOCKMODE lockmode = ctx->sctx->keeplock ? NoLock : ctx->sctx->lockmode; - - table_close(ctx->tablerel, lockmode); index_close(ctx->indexrel, ctx->sctx->lockmode); + table_close(ctx->tablerel, lockmode); } /* @@ -132,6 +146,7 @@ static Scanner scanners[] = { .openscan = table_scanner_open, .beginscan = table_scanner_beginscan, .getnext = table_scanner_getnext, + .rescan = table_scanner_rescan, .endscan = table_scanner_endscan, .closescan = table_scanner_close, }, @@ -139,6 +154,7 @@ static Scanner scanners[] = { .openscan = index_scanner_open, .beginscan = index_scanner_beginscan, .getnext = index_scanner_getnext, + .rescan = index_scanner_rescan, .endscan = index_scanner_endscan, .closescan = index_scanner_close, } @@ -153,6 +169,19 @@ scanner_ctx_get_scanner(ScannerCtx *ctx) return &scanners[ScannerTypeTable]; } +TSDLLEXPORT void +ts_scanner_rescan(ScannerCtx *ctx, InternalScannerCtx *ictx, const ScanKey scankey) +{ + Scanner *scanner = scanner_ctx_get_scanner(ctx); + + /* If scankey is NULL, the existing scan key was already updated or the + * old should be reused */ + if (NULL != scankey) + memcpy(ctx->scankey, scankey, sizeof(*ctx->scankey)); + + scanner->rescan(ictx); +} + /* * Perform either a heap or index scan depending on the information in the * ScannerCtx. ScannerCtx must be setup by caller with the proper information @@ -168,6 +197,7 @@ ts_scanner_start_scan(ScannerCtx *ctx, InternalScannerCtx *ictx) ictx->sctx = ctx; ictx->closed = false; + ictx->ended = false; ictx->registered_snapshot = false; scanner = scanner_ctx_get_scanner(ctx); @@ -226,16 +256,31 @@ ts_scanner_limit_reached(ScannerCtx *ctx, InternalScannerCtx *ictx) TSDLLEXPORT void ts_scanner_end_scan(ScannerCtx *ctx, InternalScannerCtx *ictx) { - Scanner *scanner = scanner_ctx_get_scanner(ictx->sctx); + Scanner *scanner = scanner_ctx_get_scanner(ctx); - if (ictx->closed) + if (ictx->ended) return; /* Call post-scan handler, if any. */ - if (ictx->sctx->postscan != NULL) - ictx->sctx->postscan(ictx->tinfo.count, ictx->sctx->data); + if (ctx->postscan != NULL) + ctx->postscan(ictx->tinfo.count, ictx->sctx->data); scanner->endscan(ictx); + ictx->ended = true; +} + +TSDLLEXPORT void +ts_scanner_end_and_close_scan(ScannerCtx *ctx, InternalScannerCtx *ictx) +{ + Scanner *scanner = scanner_ctx_get_scanner(ctx); + + if (ictx->closed) + { + Assert(ictx->ended); + return; + } + + ts_scanner_end_scan(ctx, ictx); if (ictx->registered_snapshot) { @@ -282,7 +327,7 @@ ts_scanner_next(ScannerCtx *ctx, InternalScannerCtx *ictx) is_valid = ts_scanner_limit_reached(ctx, ictx) ? false : scanner->getnext(ictx); } - ts_scanner_end_scan(ctx, ictx); + ts_scanner_end_and_close_scan(ctx, ictx); return NULL; } @@ -305,7 +350,7 @@ ts_scanner_scan(ScannerCtx *ctx) /* Call tuple_found handler. Abort the scan if the handler wants us to */ if (ctx->tuple_found != NULL && ctx->tuple_found(tinfo, ctx->data) == SCAN_DONE) { - ts_scanner_end_scan(ctx, &ictx); + ts_scanner_end_and_close_scan(ctx, &ictx); break; } } diff --git a/src/scanner.h b/src/scanner.h index 1d1f12b3a..014ee21b2 100644 --- a/src/scanner.h +++ b/src/scanner.h @@ -140,10 +140,14 @@ typedef struct InternalScannerCtx ScannerCtx *sctx; bool registered_snapshot; bool closed; + bool ended; } InternalScannerCtx; extern TSDLLEXPORT void ts_scanner_start_scan(ScannerCtx *ctx, InternalScannerCtx *ictx); extern TSDLLEXPORT void ts_scanner_end_scan(ScannerCtx *ctx, InternalScannerCtx *ictx); +extern TSDLLEXPORT void ts_scanner_end_and_close_scan(ScannerCtx *ctx, InternalScannerCtx *ictx); +extern TSDLLEXPORT void ts_scanner_rescan(ScannerCtx *ctx, InternalScannerCtx *ictx, + const ScanKey scankey); extern TSDLLEXPORT TupleInfo *ts_scanner_next(ScannerCtx *ctx, InternalScannerCtx *ictx); extern TSDLLEXPORT ItemPointer ts_scanner_get_tuple_tid(TupleInfo *ti); extern TSDLLEXPORT HeapTuple ts_scanner_fetch_heap_tuple(const TupleInfo *ti, bool materialize,