Simplify Scanner by embedding internal state

As part of adding a scan iterator interface on top of the Scanner
module (commit 8baaa98), the internal scanner state that was
previously private, was made public. Now that it is public, it makes
more sense to make it part of the standard user-facing `ScannerCtx`
struct, which also simplifies the code elsewhere.
This commit is contained in:
Erik Nordström 2022-02-10 15:55:28 +01:00 committed by Erik Nordström
parent 91820e26f6
commit 0f351ff612
5 changed files with 114 additions and 121 deletions

View File

@ -16,7 +16,7 @@ ts_scan_iterator_set_index(ScanIterator *iterator, CatalogTable table, int index
void
ts_scan_iterator_close(ScanIterator *iterator)
{
ts_scanner_end_and_close_scan(&iterator->ctx, &iterator->ictx);
ts_scanner_end_and_close_scan(&iterator->ctx);
}
TSDLLEXPORT void
@ -48,5 +48,5 @@ ts_scan_iterator_scan_key_init(ScanIterator *iterator, AttrNumber attributeNumbe
TSDLLEXPORT void
ts_scan_iterator_rescan(ScanIterator *iterator)
{
ts_scanner_rescan(&iterator->ctx, &iterator->ictx, NULL);
ts_scanner_rescan(&iterator->ctx, NULL);
}

View File

@ -18,7 +18,6 @@ typedef struct ScanIterator
{
ScannerCtx ctx;
TupleInfo *tinfo;
InternalScannerCtx ictx;
MemoryContext scankey_mcxt;
ScanKeyData scankey[EMBEDDED_SCAN_KEY_SIZE];
} ScanIterator;
@ -27,6 +26,10 @@ typedef struct ScanIterator
(ScanIterator) \
{ \
.ctx = { \
.internal = { \
.ended = true, \
.closed = true, \
}, \
.table = catalog_get_table_id(ts_catalog_get(), catalog_table_id), \
.nkeys = 0, \
.scandirection = ForwardScanDirection, \
@ -34,10 +37,6 @@ typedef struct ScanIterator
.result_mctx = mctx, \
}, \
.scankey_mcxt = CurrentMemoryContext, \
.ictx = { \
.ended = true, \
.closed = true, \
}, \
}
static inline TupleInfo *
@ -80,14 +79,14 @@ static inline void
ts_scan_iterator_start_scan(ScanIterator *iterator)
{
MemoryContext oldmcxt = MemoryContextSwitchTo(iterator->scankey_mcxt);
ts_scanner_start_scan(&(iterator)->ctx, &(iterator)->ictx);
ts_scanner_start_scan(&(iterator)->ctx);
MemoryContextSwitchTo(oldmcxt);
}
static inline TupleInfo *
ts_scan_iterator_next(ScanIterator *iterator)
{
iterator->tinfo = ts_scanner_next(&(iterator)->ctx, &(iterator)->ictx);
iterator->tinfo = ts_scanner_next(&(iterator)->ctx);
return iterator->tinfo;
}

View File

@ -28,114 +28,113 @@ enum ScannerType
*/
typedef struct Scanner
{
Relation (*openscan)(InternalScannerCtx *ctx);
ScanDesc (*beginscan)(InternalScannerCtx *ctx);
bool (*getnext)(InternalScannerCtx *ctx);
void (*rescan)(InternalScannerCtx *ctx);
void (*endscan)(InternalScannerCtx *ctx);
void (*closescan)(InternalScannerCtx *ctx);
Relation (*openscan)(ScannerCtx *ctx);
ScanDesc (*beginscan)(ScannerCtx *ctx);
bool (*getnext)(ScannerCtx *ctx);
void (*rescan)(ScannerCtx *ctx);
void (*endscan)(ScannerCtx *ctx);
void (*closescan)(ScannerCtx *ctx);
} Scanner;
/* Functions implementing heap scans */
static Relation
table_scanner_open(InternalScannerCtx *ctx)
table_scanner_open(ScannerCtx *ctx)
{
ctx->tablerel = table_open(ctx->sctx->table, ctx->sctx->lockmode);
return ctx->tablerel;
ctx->internal.tablerel = table_open(ctx->table, ctx->lockmode);
return ctx->internal.tablerel;
}
static ScanDesc
table_scanner_beginscan(InternalScannerCtx *ctx)
table_scanner_beginscan(ScannerCtx *ctx)
{
ScannerCtx *sctx = ctx->sctx;
ctx->internal.scan.table_scan =
table_beginscan(ctx->internal.tablerel, ctx->snapshot, ctx->nkeys, ctx->scankey);
ctx->scan.table_scan =
table_beginscan(ctx->tablerel, sctx->snapshot, sctx->nkeys, sctx->scankey);
return ctx->scan;
return ctx->internal.scan;
}
static bool
table_scanner_getnext(InternalScannerCtx *ctx)
table_scanner_getnext(ScannerCtx *ctx)
{
bool success =
table_scan_getnextslot(ctx->scan.table_scan, ForwardScanDirection, ctx->tinfo.slot);
bool success = table_scan_getnextslot(ctx->internal.scan.table_scan,
ForwardScanDirection,
ctx->internal.tinfo.slot);
return success;
}
static void
table_scanner_rescan(InternalScannerCtx *ctx)
table_scanner_rescan(ScannerCtx *ctx)
{
table_rescan(ctx->scan.table_scan, ctx->sctx->scankey);
table_rescan(ctx->internal.scan.table_scan, ctx->scankey);
}
static void
table_scanner_endscan(InternalScannerCtx *ctx)
table_scanner_endscan(ScannerCtx *ctx)
{
table_endscan(ctx->scan.table_scan);
table_endscan(ctx->internal.scan.table_scan);
}
static void
table_scanner_close(InternalScannerCtx *ctx)
table_scanner_close(ScannerCtx *ctx)
{
LOCKMODE lockmode = ctx->sctx->keeplock ? NoLock : ctx->sctx->lockmode;
LOCKMODE lockmode = ctx->keeplock ? NoLock : ctx->lockmode;
table_close(ctx->tablerel, lockmode);
table_close(ctx->internal.tablerel, lockmode);
}
/* Functions implementing index scans */
static Relation
index_scanner_open(InternalScannerCtx *ctx)
index_scanner_open(ScannerCtx *ctx)
{
ctx->tablerel = table_open(ctx->sctx->table, ctx->sctx->lockmode);
ctx->indexrel = index_open(ctx->sctx->index, ctx->sctx->lockmode);
return ctx->indexrel;
ctx->internal.tablerel = table_open(ctx->table, ctx->lockmode);
ctx->internal.indexrel = index_open(ctx->index, ctx->lockmode);
return ctx->internal.indexrel;
}
static ScanDesc
index_scanner_beginscan(InternalScannerCtx *ctx)
index_scanner_beginscan(ScannerCtx *ctx)
{
ScannerCtx *sctx = ctx->sctx;
InternalScannerCtx *ictx = &ctx->internal;
ctx->scan.index_scan =
index_beginscan(ctx->tablerel, ctx->indexrel, sctx->snapshot, sctx->nkeys, sctx->norderbys);
ctx->scan.index_scan->xs_want_itup = ctx->sctx->want_itup;
index_rescan(ctx->scan.index_scan, sctx->scankey, sctx->nkeys, NULL, sctx->norderbys);
return ctx->scan;
ictx->scan.index_scan =
index_beginscan(ictx->tablerel, ictx->indexrel, ctx->snapshot, ctx->nkeys, ctx->norderbys);
ictx->scan.index_scan->xs_want_itup = ctx->want_itup;
index_rescan(ictx->scan.index_scan, ctx->scankey, ctx->nkeys, NULL, ctx->norderbys);
return ictx->scan;
}
static bool
index_scanner_getnext(InternalScannerCtx *ctx)
index_scanner_getnext(ScannerCtx *ctx)
{
InternalScannerCtx *ictx = &ctx->internal;
bool success;
success = index_getnext_slot(ctx->scan.index_scan, ctx->sctx->scandirection, ctx->tinfo.slot);
ctx->tinfo.ituple = ctx->scan.index_scan->xs_itup;
ctx->tinfo.ituple_desc = ctx->scan.index_scan->xs_itupdesc;
success = index_getnext_slot(ictx->scan.index_scan, ctx->scandirection, ictx->tinfo.slot);
ictx->tinfo.ituple = ictx->scan.index_scan->xs_itup;
ictx->tinfo.ituple_desc = ictx->scan.index_scan->xs_itupdesc;
return success;
}
static void
index_scanner_rescan(InternalScannerCtx *ctx)
index_scanner_rescan(ScannerCtx *ctx)
{
ScannerCtx *sctx = ctx->sctx;
index_rescan(ctx->scan.index_scan, sctx->scankey, sctx->nkeys, NULL, sctx->norderbys);
index_rescan(ctx->internal.scan.index_scan, ctx->scankey, ctx->nkeys, NULL, ctx->norderbys);
}
static void
index_scanner_endscan(InternalScannerCtx *ctx)
index_scanner_endscan(ScannerCtx *ctx)
{
index_endscan(ctx->scan.index_scan);
index_endscan(ctx->internal.scan.index_scan);
}
static void
index_scanner_close(InternalScannerCtx *ctx)
index_scanner_close(ScannerCtx *ctx)
{
LOCKMODE lockmode = ctx->sctx->keeplock ? NoLock : ctx->sctx->lockmode;
index_close(ctx->indexrel, ctx->sctx->lockmode);
table_close(ctx->tablerel, lockmode);
LOCKMODE lockmode = ctx->keeplock ? NoLock : ctx->lockmode;
index_close(ctx->internal.indexrel, ctx->lockmode);
table_close(ctx->internal.tablerel, lockmode);
}
/*
@ -170,7 +169,7 @@ scanner_ctx_get_scanner(ScannerCtx *ctx)
}
TSDLLEXPORT void
ts_scanner_rescan(ScannerCtx *ctx, InternalScannerCtx *ictx, const ScanKey scankey)
ts_scanner_rescan(ScannerCtx *ctx, const ScanKey scankey)
{
Scanner *scanner = scanner_ctx_get_scanner(ctx);
@ -179,7 +178,7 @@ ts_scanner_rescan(ScannerCtx *ctx, InternalScannerCtx *ictx, const ScanKey scank
if (NULL != scankey)
memcpy(ctx->scankey, scankey, sizeof(*ctx->scankey));
scanner->rescan(ictx);
scanner->rescan(ctx);
}
/*
@ -190,19 +189,19 @@ ts_scanner_rescan(ScannerCtx *ctx, InternalScannerCtx *ictx, const ScanKey scank
* Return the number of tuples that where found.
*/
TSDLLEXPORT void
ts_scanner_start_scan(ScannerCtx *ctx, InternalScannerCtx *ictx)
ts_scanner_start_scan(ScannerCtx *ctx)
{
InternalScannerCtx *ictx = &ctx->internal;
TupleDesc tuple_desc;
Scanner *scanner;
ictx->sctx = ctx;
ictx->closed = false;
ictx->ended = false;
ictx->registered_snapshot = false;
scanner = scanner_ctx_get_scanner(ctx);
scanner->openscan(ictx);
scanner->openscan(ctx);
if (ctx->snapshot == NULL)
{
@ -234,7 +233,7 @@ ts_scanner_start_scan(ScannerCtx *ctx, InternalScannerCtx *ictx)
ictx->registered_snapshot = true;
}
scanner->beginscan(ictx);
scanner->beginscan(ctx);
tuple_desc = RelationGetDescr(ictx->tablerel);
@ -248,14 +247,15 @@ ts_scanner_start_scan(ScannerCtx *ctx, InternalScannerCtx *ictx)
}
static inline bool
ts_scanner_limit_reached(ScannerCtx *ctx, InternalScannerCtx *ictx)
ts_scanner_limit_reached(ScannerCtx *ctx)
{
return ctx->limit > 0 && ictx->tinfo.count >= ctx->limit;
return ctx->limit > 0 && ctx->internal.tinfo.count >= ctx->limit;
}
TSDLLEXPORT void
ts_scanner_end_scan(ScannerCtx *ctx, InternalScannerCtx *ictx)
ts_scanner_end_scan(ScannerCtx *ctx)
{
InternalScannerCtx *ictx = &ctx->internal;
Scanner *scanner = scanner_ctx_get_scanner(ctx);
if (ictx->ended)
@ -263,15 +263,16 @@ ts_scanner_end_scan(ScannerCtx *ctx, InternalScannerCtx *ictx)
/* Call post-scan handler, if any. */
if (ctx->postscan != NULL)
ctx->postscan(ictx->tinfo.count, ictx->sctx->data);
ctx->postscan(ictx->tinfo.count, ctx->data);
scanner->endscan(ictx);
scanner->endscan(ctx);
ictx->ended = true;
}
TSDLLEXPORT void
ts_scanner_end_and_close_scan(ScannerCtx *ctx, InternalScannerCtx *ictx)
ts_scanner_end_and_close_scan(ScannerCtx *ctx)
{
InternalScannerCtx *ictx = &ctx->internal;
Scanner *scanner = scanner_ctx_get_scanner(ctx);
if (ictx->closed)
@ -280,7 +281,7 @@ ts_scanner_end_and_close_scan(ScannerCtx *ctx, InternalScannerCtx *ictx)
return;
}
ts_scanner_end_scan(ctx, ictx);
ts_scanner_end_scan(ctx);
if (ictx->registered_snapshot)
{
@ -288,16 +289,17 @@ ts_scanner_end_and_close_scan(ScannerCtx *ctx, InternalScannerCtx *ictx)
ctx->snapshot = NULL;
}
scanner->closescan(ictx);
scanner->closescan(ctx);
ExecDropSingleTupleTableSlot(ictx->tinfo.slot);
ictx->closed = true;
}
TSDLLEXPORT TupleInfo *
ts_scanner_next(ScannerCtx *ctx, InternalScannerCtx *ictx)
ts_scanner_next(ScannerCtx *ctx)
{
InternalScannerCtx *ictx = &ctx->internal;
Scanner *scanner = scanner_ctx_get_scanner(ctx);
bool is_valid = ts_scanner_limit_reached(ctx, ictx) ? false : scanner->getnext(ictx);
bool is_valid = ts_scanner_limit_reached(ctx) ? false : scanner->getnext(ctx);
while (is_valid)
{
@ -324,10 +326,10 @@ ts_scanner_next(ScannerCtx *ctx, InternalScannerCtx *ictx)
/* stop at a valid tuple */
return &ictx->tinfo;
}
is_valid = ts_scanner_limit_reached(ctx, ictx) ? false : scanner->getnext(ictx);
is_valid = ts_scanner_limit_reached(ctx) ? false : scanner->getnext(ctx);
}
ts_scanner_end_and_close_scan(ctx, ictx);
ts_scanner_end_and_close_scan(ctx);
return NULL;
}
@ -342,20 +344,21 @@ ts_scanner_next(ScannerCtx *ctx, InternalScannerCtx *ictx)
TSDLLEXPORT int
ts_scanner_scan(ScannerCtx *ctx)
{
InternalScannerCtx ictx = { 0 };
TupleInfo *tinfo;
for (ts_scanner_start_scan(ctx, &ictx); (tinfo = ts_scanner_next(ctx, &ictx));)
MemSet(&ctx->internal, 0, sizeof(ctx->internal));
for (ts_scanner_start_scan(ctx); (tinfo = ts_scanner_next(ctx));)
{
/* Call tuple_found handler. Abort the scan if the handler wants us to */
if (ctx->tuple_found != NULL && ctx->tuple_found(tinfo, ctx->data) == SCAN_DONE)
{
ts_scanner_end_and_close_scan(ctx, &ictx);
ts_scanner_end_and_close_scan(ctx);
break;
}
}
return ictx.tinfo.count;
return ctx->internal.tinfo.count;
}
TSDLLEXPORT bool

View File

@ -63,8 +63,30 @@ typedef ScanTupleResult (*tuple_found_func)(TupleInfo *ti, void *data);
typedef ScanFilterResult (*tuple_filter_func)(const TupleInfo *ti, void *data);
typedef void (*postscan_func)(int num_tuples, void *data);
typedef union ScanDesc
{
IndexScanDesc index_scan;
TableScanDesc table_scan;
} ScanDesc;
/*
* InternalScannerCtx is used for internal state during scanning and shouldn't
* be initialized or touched by the user.
*/
typedef struct InternalScannerCtx
{
Relation tablerel, indexrel;
TupleInfo tinfo;
ScanDesc scan;
bool registered_snapshot;
bool closed;
bool ended;
} InternalScannerCtx;
typedef struct ScannerCtx
{
InternalScannerCtx internal;
/* Fields below this line can be initialized by the user */
Oid table;
Oid index;
ScanKey scankey;
@ -113,42 +135,11 @@ typedef struct ScannerCtx
extern TSDLLEXPORT int ts_scanner_scan(ScannerCtx *ctx);
extern TSDLLEXPORT bool ts_scanner_scan_one(ScannerCtx *ctx, bool fail_if_not_found,
const char *item_type);
/*
* Internal types and functions below.
*
* The below functions and types are really only exposed for the scan_iterator.
* The type definitions are needed for struct embedding and the functions are needed
* for iteration.
*/
typedef union ScanDesc
{
IndexScanDesc index_scan;
TableScanDesc table_scan;
} ScanDesc;
/*
* InternalScannerCtx is the context passed to Scanner functions.
* It holds a pointer to the user-given ScannerCtx as well as
* internal state used during scanning. Should not be used outside scanner.c
* but is embedded in ScanIterator.
*/
typedef struct InternalScannerCtx
{
Relation tablerel, indexrel;
TupleInfo tinfo;
ScanDesc scan;
ScannerCtx *sctx;
bool registered_snapshot;
bool closed;
bool ended;
} InternalScannerCtx;
extern TSDLLEXPORT void ts_scanner_start_scan(ScannerCtx *ctx, InternalScannerCtx *ictx);
extern TSDLLEXPORT void ts_scanner_end_scan(ScannerCtx *ctx, InternalScannerCtx *ictx);
extern TSDLLEXPORT void ts_scanner_end_and_close_scan(ScannerCtx *ctx, InternalScannerCtx *ictx);
extern TSDLLEXPORT void ts_scanner_rescan(ScannerCtx *ctx, InternalScannerCtx *ictx,
const ScanKey scankey);
extern TSDLLEXPORT TupleInfo *ts_scanner_next(ScannerCtx *ctx, InternalScannerCtx *ictx);
extern TSDLLEXPORT void ts_scanner_start_scan(ScannerCtx *ctx);
extern TSDLLEXPORT void ts_scanner_end_scan(ScannerCtx *ctx);
extern TSDLLEXPORT void ts_scanner_end_and_close_scan(ScannerCtx *ctx);
extern TSDLLEXPORT void ts_scanner_rescan(ScannerCtx *ctx, const ScanKey scankey);
extern TSDLLEXPORT TupleInfo *ts_scanner_next(ScannerCtx *ctx);
extern TSDLLEXPORT ItemPointer ts_scanner_get_tuple_tid(TupleInfo *ti);
extern TSDLLEXPORT HeapTuple ts_scanner_fetch_heap_tuple(const TupleInfo *ti, bool materialize,
bool *should_free);

View File

@ -156,8 +156,8 @@ ts_hypertable_compression_get_by_pkey(int32 htid, const char *attname)
F_NAMEEQ,
CStringGetDatum(attname));
ts_scanner_start_scan(&iterator.ctx, &iterator.ictx);
TupleInfo *ti = ts_scanner_next(&iterator.ctx, &iterator.ictx);
ts_scanner_start_scan(&iterator.ctx);
TupleInfo *ti = ts_scanner_next(&iterator.ctx);
if (!ti)
return NULL;
@ -209,8 +209,8 @@ ts_hypertable_compression_delete_by_pkey(int32 htid, const char *attname)
F_NAMEEQ,
CStringGetDatum(attname));
ts_scanner_start_scan(&iterator.ctx, &iterator.ictx);
TupleInfo *ti = ts_scanner_next(&iterator.ctx, &iterator.ictx);
ts_scanner_start_scan(&iterator.ctx);
TupleInfo *ti = ts_scanner_next(&iterator.ctx);
if (!ti)
return false;