Add rescan support to internal Scanner

This change adds rescan support to the internal Scanner module.

When scanning TimescaleDB catalog data, it is sometimes useful to be
able to restart an index scan without having to close and reopen the
scanned relation.
This commit is contained in:
Erik Nordström 2022-01-27 17:14:26 +01:00 committed by Erik Nordström
parent 7d41380991
commit 7f05448d2a
4 changed files with 120 additions and 17 deletions

View File

@ -7,25 +7,44 @@
#include "scan_iterator.h"
TSDLLEXPORT void
ts_scan_iterator_set_index(ScanIterator *iterator, CatalogTable table, int indexid)
{
iterator->ctx.index = catalog_get_index(ts_catalog_get(), table, indexid);
}
void
ts_scan_iterator_close(ScanIterator *iterator)
{
ts_scanner_end_scan(&iterator->ctx, &iterator->ictx);
ts_scanner_end_and_close_scan(&iterator->ctx, &iterator->ictx);
}
TSDLLEXPORT void
ts_scan_iterator_scan_key_init(ScanIterator *iterator, AttrNumber attributeNumber,
StrategyNumber strategy, RegProcedure procedure, Datum argument)
{
MemoryContext oldmcxt;
Assert(iterator->ctx.scankey == NULL || iterator->ctx.scankey == iterator->scankey);
iterator->ctx.scankey = iterator->scankey;
if (iterator->ctx.nkeys >= EMBEDDED_SCAN_KEY_SIZE)
elog(ERROR, "cannot scan more than %d keys", EMBEDDED_SCAN_KEY_SIZE);
/* For rescans, when the scan key is reinitialized during the scan, make
* sure the scan eky is initialized on the long-lived scankey memory
* context. */
oldmcxt = MemoryContextSwitchTo(iterator->scankey_mcxt);
ScanKeyInit(&iterator->scankey[iterator->ctx.nkeys++],
attributeNumber,
strategy,
procedure,
argument);
MemoryContextSwitchTo(oldmcxt);
}
TSDLLEXPORT void
ts_scan_iterator_rescan(ScanIterator *iterator)
{
ts_scanner_rescan(&iterator->ctx, &iterator->ictx, NULL);
}

View File

@ -10,6 +10,7 @@
#include <utils/palloc.h>
#include "scanner.h"
#include "ts_catalog/catalog.h"
#define EMBEDDED_SCAN_KEY_SIZE 5
@ -18,19 +19,25 @@ typedef struct ScanIterator
ScannerCtx ctx;
TupleInfo *tinfo;
InternalScannerCtx ictx;
MemoryContext scankey_mcxt;
ScanKeyData scankey[EMBEDDED_SCAN_KEY_SIZE];
} ScanIterator;
#define ts_scan_iterator_create(catalog_table_id, lock_mode, mctx) \
(ScanIterator) \
{ \
#define ts_scan_iterator_create(catalog_table_id, lock_mode, mctx) \
(ScanIterator) \
{ \
.ctx = { \
.table = catalog_get_table_id(ts_catalog_get(), catalog_table_id), \
.nkeys = 0, \
.scandirection = ForwardScanDirection, \
.lockmode = lock_mode, \
.result_mctx = mctx, \
} \
}, \
.scankey_mcxt = CurrentMemoryContext, \
.ictx = { \
.ended = true, \
.closed = true, \
}, \
}
static inline TupleInfo *
@ -69,16 +76,44 @@ ts_scan_iterator_alloc_result(const ScanIterator *iterator, Size size)
return ts_scanner_alloc_result(iterator->tinfo, size);
}
void TSDLLEXPORT ts_scan_iterator_close(ScanIterator *iterator);
static inline void
ts_scan_iterator_start_scan(ScanIterator *iterator)
{
MemoryContext oldmcxt = MemoryContextSwitchTo(iterator->scankey_mcxt);
ts_scanner_start_scan(&(iterator)->ctx, &(iterator)->ictx);
MemoryContextSwitchTo(oldmcxt);
}
static inline TupleInfo *
ts_scan_iterator_next(ScanIterator *iterator)
{
iterator->tinfo = ts_scanner_next(&(iterator)->ctx, &(iterator)->ictx);
return iterator->tinfo;
}
static inline void
ts_scan_iterator_scan_key_reset(ScanIterator *iterator)
{
iterator->ctx.nkeys = 0;
}
void TSDLLEXPORT ts_scan_iterator_set_index(ScanIterator *iterator, CatalogTable table,
int indexid);
void TSDLLEXPORT ts_scan_iterator_close(ScanIterator *iterator);
void TSDLLEXPORT ts_scan_iterator_scan_key_init(ScanIterator *iterator, AttrNumber attributeNumber,
StrategyNumber strategy, RegProcedure procedure,
Datum argument);
/*
* Reset the scan to use a new scan key.
*
* Note that the scan key should typically be reinitialized before a rescan.
*/
void TSDLLEXPORT ts_scan_iterator_rescan(ScanIterator *iterator);
/* You must use `ts_scan_iterator_close` if terminating this loop early */
#define ts_scanner_foreach(scan_iterator) \
for (ts_scanner_start_scan(&(scan_iterator)->ctx, &(scan_iterator)->ictx); \
((scan_iterator)->tinfo = \
ts_scanner_next(&(scan_iterator)->ctx, &(scan_iterator)->ictx)) != NULL;)
for (ts_scan_iterator_start_scan((scan_iterator)); \
ts_scan_iterator_next(scan_iterator) != NULL;)
#endif /* TIMESCALEDB_SCAN_ITERATOR_H */

View File

@ -31,6 +31,7 @@ typedef struct Scanner
Relation (*openscan)(InternalScannerCtx *ctx);
ScanDesc (*beginscan)(InternalScannerCtx *ctx);
bool (*getnext)(InternalScannerCtx *ctx);
void (*rescan)(InternalScannerCtx *ctx);
void (*endscan)(InternalScannerCtx *ctx);
void (*closescan)(InternalScannerCtx *ctx);
} Scanner;
@ -63,6 +64,12 @@ table_scanner_getnext(InternalScannerCtx *ctx)
return success;
}
static void
table_scanner_rescan(InternalScannerCtx *ctx)
{
table_rescan(ctx->scan.table_scan, ctx->sctx->scankey);
}
static void
table_scanner_endscan(InternalScannerCtx *ctx)
{
@ -109,6 +116,14 @@ index_scanner_getnext(InternalScannerCtx *ctx)
return success;
}
static void
index_scanner_rescan(InternalScannerCtx *ctx)
{
ScannerCtx *sctx = ctx->sctx;
index_rescan(ctx->scan.index_scan, sctx->scankey, sctx->nkeys, NULL, sctx->norderbys);
}
static void
index_scanner_endscan(InternalScannerCtx *ctx)
{
@ -119,9 +134,8 @@ static void
index_scanner_close(InternalScannerCtx *ctx)
{
LOCKMODE lockmode = ctx->sctx->keeplock ? NoLock : ctx->sctx->lockmode;
table_close(ctx->tablerel, lockmode);
index_close(ctx->indexrel, ctx->sctx->lockmode);
table_close(ctx->tablerel, lockmode);
}
/*
@ -132,6 +146,7 @@ static Scanner scanners[] = {
.openscan = table_scanner_open,
.beginscan = table_scanner_beginscan,
.getnext = table_scanner_getnext,
.rescan = table_scanner_rescan,
.endscan = table_scanner_endscan,
.closescan = table_scanner_close,
},
@ -139,6 +154,7 @@ static Scanner scanners[] = {
.openscan = index_scanner_open,
.beginscan = index_scanner_beginscan,
.getnext = index_scanner_getnext,
.rescan = index_scanner_rescan,
.endscan = index_scanner_endscan,
.closescan = index_scanner_close,
}
@ -153,6 +169,19 @@ scanner_ctx_get_scanner(ScannerCtx *ctx)
return &scanners[ScannerTypeTable];
}
TSDLLEXPORT void
ts_scanner_rescan(ScannerCtx *ctx, InternalScannerCtx *ictx, const ScanKey scankey)
{
Scanner *scanner = scanner_ctx_get_scanner(ctx);
/* If scankey is NULL, the existing scan key was already updated or the
* old should be reused */
if (NULL != scankey)
memcpy(ctx->scankey, scankey, sizeof(*ctx->scankey));
scanner->rescan(ictx);
}
/*
* Perform either a heap or index scan depending on the information in the
* ScannerCtx. ScannerCtx must be setup by caller with the proper information
@ -168,6 +197,7 @@ ts_scanner_start_scan(ScannerCtx *ctx, InternalScannerCtx *ictx)
ictx->sctx = ctx;
ictx->closed = false;
ictx->ended = false;
ictx->registered_snapshot = false;
scanner = scanner_ctx_get_scanner(ctx);
@ -226,16 +256,31 @@ ts_scanner_limit_reached(ScannerCtx *ctx, InternalScannerCtx *ictx)
TSDLLEXPORT void
ts_scanner_end_scan(ScannerCtx *ctx, InternalScannerCtx *ictx)
{
Scanner *scanner = scanner_ctx_get_scanner(ictx->sctx);
Scanner *scanner = scanner_ctx_get_scanner(ctx);
if (ictx->closed)
if (ictx->ended)
return;
/* Call post-scan handler, if any. */
if (ictx->sctx->postscan != NULL)
ictx->sctx->postscan(ictx->tinfo.count, ictx->sctx->data);
if (ctx->postscan != NULL)
ctx->postscan(ictx->tinfo.count, ictx->sctx->data);
scanner->endscan(ictx);
ictx->ended = true;
}
TSDLLEXPORT void
ts_scanner_end_and_close_scan(ScannerCtx *ctx, InternalScannerCtx *ictx)
{
Scanner *scanner = scanner_ctx_get_scanner(ctx);
if (ictx->closed)
{
Assert(ictx->ended);
return;
}
ts_scanner_end_scan(ctx, ictx);
if (ictx->registered_snapshot)
{
@ -282,7 +327,7 @@ ts_scanner_next(ScannerCtx *ctx, InternalScannerCtx *ictx)
is_valid = ts_scanner_limit_reached(ctx, ictx) ? false : scanner->getnext(ictx);
}
ts_scanner_end_scan(ctx, ictx);
ts_scanner_end_and_close_scan(ctx, ictx);
return NULL;
}
@ -305,7 +350,7 @@ ts_scanner_scan(ScannerCtx *ctx)
/* Call tuple_found handler. Abort the scan if the handler wants us to */
if (ctx->tuple_found != NULL && ctx->tuple_found(tinfo, ctx->data) == SCAN_DONE)
{
ts_scanner_end_scan(ctx, &ictx);
ts_scanner_end_and_close_scan(ctx, &ictx);
break;
}
}

View File

@ -140,10 +140,14 @@ typedef struct InternalScannerCtx
ScannerCtx *sctx;
bool registered_snapshot;
bool closed;
bool ended;
} InternalScannerCtx;
extern TSDLLEXPORT void ts_scanner_start_scan(ScannerCtx *ctx, InternalScannerCtx *ictx);
extern TSDLLEXPORT void ts_scanner_end_scan(ScannerCtx *ctx, InternalScannerCtx *ictx);
extern TSDLLEXPORT void ts_scanner_end_and_close_scan(ScannerCtx *ctx, InternalScannerCtx *ictx);
extern TSDLLEXPORT void ts_scanner_rescan(ScannerCtx *ctx, InternalScannerCtx *ictx,
const ScanKey scankey);
extern TSDLLEXPORT TupleInfo *ts_scanner_next(ScannerCtx *ctx, InternalScannerCtx *ictx);
extern TSDLLEXPORT ItemPointer ts_scanner_get_tuple_tid(TupleInfo *ti);
extern TSDLLEXPORT HeapTuple ts_scanner_fetch_heap_tuple(const TupleInfo *ti, bool materialize,