Allow control of relation open/close in Scanner

Make the Scanner module more flexible by allowing optional control
over when the scanned relation is opened and closed. Relations can
then remain open over multiple scans, which can improve performance
and efficiency.

Closes #2173
This commit is contained in:
Erik Nordström 2022-02-10 16:42:56 +01:00 committed by Erik Nordström
parent 0f351ff612
commit 32c1e3aef2
8 changed files with 309 additions and 66 deletions

View File

@ -13,10 +13,18 @@ ts_scan_iterator_set_index(ScanIterator *iterator, CatalogTable table, int index
iterator->ctx.index = catalog_get_index(ts_catalog_get(), table, indexid);
}
/*
 * End the iterator's current scan by delegating to the scanner.
 *
 * Only the scan is ended here; closing the scanned relation is done
 * separately via ts_scan_iterator_close().
 */
void
ts_scan_iterator_end(ScanIterator *iterator)
{
	ScannerCtx *scanctx = &iterator->ctx;

	ts_scanner_end_scan(scanctx);
}
void
ts_scan_iterator_close(ScanIterator *iterator)
{
ts_scanner_end_and_close_scan(&iterator->ctx);
/* Ending a scan is a no-op if already ended */
ts_scanner_end_scan(&iterator->ctx);
ts_scanner_close(&iterator->ctx);
}
TSDLLEXPORT void

View File

@ -28,13 +28,13 @@ typedef struct ScanIterator
.ctx = { \
.internal = { \
.ended = true, \
.closed = true, \
}, \
.table = catalog_get_table_id(ts_catalog_get(), catalog_table_id), \
.nkeys = 0, \
.scandirection = ForwardScanDirection, \
.lockmode = lock_mode, \
.result_mctx = mctx, \
.flags = SCANNER_F_NOFLAGS, \
}, \
.scankey_mcxt = CurrentMemoryContext, \
}
@ -66,13 +66,13 @@ ts_scan_iterator_tupledesc(const ScanIterator *iterator)
static inline MemoryContext
ts_scan_iterator_get_result_memory_context(const ScanIterator *iterator)
{
return iterator->tinfo->mctx;
return iterator->ctx.result_mctx;
}
static inline void *
ts_scan_iterator_alloc_result(const ScanIterator *iterator, Size size)
{
return ts_scanner_alloc_result(iterator->tinfo, size);
return MemoryContextAllocZero(iterator->ctx.result_mctx, size);
}
static inline void
@ -96,8 +96,15 @@ ts_scan_iterator_scan_key_reset(ScanIterator *iterator)
iterator->ctx.nkeys = 0;
}
/*
 * Report whether the iterator's scan has been started, i.e., whether the
 * scanner's internal "started" flag is currently set.
 */
static inline bool
ts_scan_iterator_is_started(ScanIterator *iterator)
{
	const InternalScannerCtx *state = &iterator->ctx.internal;

	return state->started;
}
void TSDLLEXPORT ts_scan_iterator_set_index(ScanIterator *iterator, CatalogTable table,
int indexid);
void TSDLLEXPORT ts_scan_iterator_end(ScanIterator *iterator);
void TSDLLEXPORT ts_scan_iterator_close(ScanIterator *iterator);
void TSDLLEXPORT ts_scan_iterator_scan_key_init(ScanIterator *iterator, AttrNumber attributeNumber,
StrategyNumber strategy, RegProcedure procedure,
@ -110,6 +117,15 @@ void TSDLLEXPORT ts_scan_iterator_scan_key_init(ScanIterator *iterator, AttrNumb
*/
void TSDLLEXPORT ts_scan_iterator_rescan(ScanIterator *iterator);
/*
 * Begin scanning with the iterator regardless of its current state: start a
 * fresh scan if none has been started yet, otherwise rescan the one that is
 * already started.
 */
static inline void
ts_scan_iterator_start_or_restart_scan(ScanIterator *iterator)
{
	if (!ts_scan_iterator_is_started(iterator))
	{
		ts_scan_iterator_start_scan(iterator);
		return;
	}

	ts_scan_iterator_rescan(iterator);
}
/* You must use `ts_scan_iterator_close` if terminating this loop early */
#define ts_scanner_foreach(scan_iterator) \
for (ts_scan_iterator_start_scan((scan_iterator)); \

View File

@ -40,15 +40,15 @@ typedef struct Scanner
static Relation
table_scanner_open(ScannerCtx *ctx)
{
ctx->internal.tablerel = table_open(ctx->table, ctx->lockmode);
return ctx->internal.tablerel;
ctx->tablerel = table_open(ctx->table, ctx->lockmode);
return ctx->tablerel;
}
static ScanDesc
table_scanner_beginscan(ScannerCtx *ctx)
{
ctx->internal.scan.table_scan =
table_beginscan(ctx->internal.tablerel, ctx->snapshot, ctx->nkeys, ctx->scankey);
table_beginscan(ctx->tablerel, ctx->snapshot, ctx->nkeys, ctx->scankey);
return ctx->internal.scan;
}
@ -78,18 +78,18 @@ table_scanner_endscan(ScannerCtx *ctx)
static void
table_scanner_close(ScannerCtx *ctx)
{
LOCKMODE lockmode = ctx->keeplock ? NoLock : ctx->lockmode;
LOCKMODE lockmode = (ctx->flags & SCANNER_F_KEEPLOCK) ? NoLock : ctx->lockmode;
table_close(ctx->internal.tablerel, lockmode);
table_close(ctx->tablerel, lockmode);
}
/* Functions implementing index scans */
static Relation
index_scanner_open(ScannerCtx *ctx)
{
ctx->internal.tablerel = table_open(ctx->table, ctx->lockmode);
ctx->internal.indexrel = index_open(ctx->index, ctx->lockmode);
return ctx->internal.indexrel;
ctx->tablerel = table_open(ctx->table, ctx->lockmode);
ctx->indexrel = index_open(ctx->index, ctx->lockmode);
return ctx->indexrel;
}
static ScanDesc
@ -98,7 +98,7 @@ index_scanner_beginscan(ScannerCtx *ctx)
InternalScannerCtx *ictx = &ctx->internal;
ictx->scan.index_scan =
index_beginscan(ictx->tablerel, ictx->indexrel, ctx->snapshot, ctx->nkeys, ctx->norderbys);
index_beginscan(ctx->tablerel, ctx->indexrel, ctx->snapshot, ctx->nkeys, ctx->norderbys);
ictx->scan.index_scan->xs_want_itup = ctx->want_itup;
index_rescan(ictx->scan.index_scan, ctx->scankey, ctx->nkeys, NULL, ctx->norderbys);
return ictx->scan;
@ -132,9 +132,9 @@ index_scanner_endscan(ScannerCtx *ctx)
static void
index_scanner_close(ScannerCtx *ctx)
{
LOCKMODE lockmode = ctx->keeplock ? NoLock : ctx->lockmode;
index_close(ctx->internal.indexrel, ctx->lockmode);
table_close(ctx->internal.tablerel, lockmode);
LOCKMODE lockmode = (ctx->flags & SCANNER_F_KEEPLOCK) ? NoLock : ctx->lockmode;
index_close(ctx->indexrel, ctx->lockmode);
table_close(ctx->tablerel, lockmode);
}
/*
@ -181,27 +181,11 @@ ts_scanner_rescan(ScannerCtx *ctx, const ScanKey scankey)
scanner->rescan(ctx);
}
/*
* Perform either a heap or index scan depending on the information in the
* ScannerCtx. ScannerCtx must be setup by caller with the proper information
* for the scan, including filters and callbacks for found tuples.
*
* Return the number of tuples that were found.
*/
TSDLLEXPORT void
ts_scanner_start_scan(ScannerCtx *ctx)
static void
prepare_scan(ScannerCtx *ctx)
{
InternalScannerCtx *ictx = &ctx->internal;
TupleDesc tuple_desc;
Scanner *scanner;
ictx->closed = false;
ictx->ended = false;
ictx->registered_snapshot = false;
scanner = scanner_ctx_get_scanner(ctx);
scanner->openscan(ctx);
ctx->internal.ended = false;
ctx->internal.registered_snapshot = false;
if (ctx->snapshot == NULL)
{
@ -230,20 +214,75 @@ ts_scanner_start_scan(ScannerCtx *ctx)
* mode.
*/
ctx->snapshot = RegisterSnapshot(GetSnapshotData(SnapshotSelf));
ictx->registered_snapshot = true;
ctx->internal.registered_snapshot = true;
}
}
/*
 * Open the relation(s) for a scan described by the given ScannerCtx.
 *
 * The context must not already have a table relation set. The scan is
 * prepared first, then the relation is opened through the scanner
 * implementation selected for the context.
 *
 * Returns the relation handed back by the implementation's openscan callback.
 */
TSDLLEXPORT Relation
ts_scanner_open(ScannerCtx *ctx)
{
	Scanner *impl;
	Relation rel;

	Assert(ctx->tablerel == NULL);

	impl = scanner_ctx_get_scanner(ctx);
	prepare_scan(ctx);
	rel = impl->openscan(ctx);

	return rel;
}
/*
* Start either a heap or index scan depending on the information in the
* ScannerCtx. ScannerCtx must be setup by caller with the proper information
* for the scan, including filters and callbacks for found tuples.
*/
TSDLLEXPORT void
ts_scanner_start_scan(ScannerCtx *ctx)
{
InternalScannerCtx *ictx = &ctx->internal;
Scanner *scanner;
TupleDesc tuple_desc;
if (ictx->started)
{
Assert(!ictx->ended);
Assert(ctx->tablerel);
Assert(OidIsValid(ctx->table));
return;
}
if (ctx->tablerel == NULL)
{
Assert(NULL == ctx->indexrel);
ts_scanner_open(ctx);
}
else
{
/*
* Relations already opened by caller: Only need to prepare the scan
* and set relation Oids so that the scanner knows which scanner
* implementation to use. Respect the auto-closing behavior set by the
* user, which is to auto close if unspecified.
*/
prepare_scan(ctx);
ctx->table = RelationGetRelid(ctx->tablerel);
if (NULL != ctx->indexrel)
ctx->index = RelationGetRelid(ctx->indexrel);
}
scanner = scanner_ctx_get_scanner(ctx);
scanner->beginscan(ctx);
tuple_desc = RelationGetDescr(ictx->tablerel);
tuple_desc = RelationGetDescr(ctx->tablerel);
ictx->tinfo.scanrel = ictx->tablerel;
ictx->tinfo.scanrel = ctx->tablerel;
ictx->tinfo.mctx = ctx->result_mctx == NULL ? CurrentMemoryContext : ctx->result_mctx;
ictx->tinfo.slot = MakeSingleTupleTableSlot(tuple_desc, table_slot_callbacks(ictx->tablerel));
ictx->tinfo.slot = MakeSingleTupleTableSlot(tuple_desc, table_slot_callbacks(ctx->tablerel));
/* Call pre-scan handler, if any. */
if (ctx->prescan != NULL)
ctx->prescan(ctx->data);
ictx->started = true;
}
static inline bool
@ -252,6 +291,24 @@ ts_scanner_limit_reached(ScannerCtx *ctx)
return ctx->limit > 0 && ctx->internal.tinfo.count >= ctx->limit;
}
/*
 * Release per-scan resources held by the scanner context.
 *
 * The snapshot is unregistered only when the scanner registered it itself
 * (registered_snapshot is set), and the tuple slot is dropped if one exists.
 * Both pointers are reset to NULL after being released.
 */
static void
scanner_cleanup(ScannerCtx *ctx)
{
	InternalScannerCtx *state = &ctx->internal;
	TupleTableSlot *slot = state->tinfo.slot;

	if (state->registered_snapshot)
	{
		UnregisterSnapshot(ctx->snapshot);
		ctx->snapshot = NULL;
	}

	if (slot != NULL)
	{
		ExecDropSingleTupleTableSlot(slot);
		state->tinfo.slot = NULL;
	}
}
TSDLLEXPORT void
ts_scanner_end_scan(ScannerCtx *ctx)
{
@ -266,32 +323,24 @@ ts_scanner_end_scan(ScannerCtx *ctx)
ctx->postscan(ictx->tinfo.count, ctx->data);
scanner->endscan(ctx);
scanner_cleanup(ctx);
ictx->ended = true;
ictx->started = false;
}
TSDLLEXPORT void
ts_scanner_end_and_close_scan(ScannerCtx *ctx)
ts_scanner_close(ScannerCtx *ctx)
{
InternalScannerCtx *ictx = &ctx->internal;
Scanner *scanner = scanner_ctx_get_scanner(ctx);
if (ictx->closed)
Assert(ctx->internal.ended);
if (NULL != ctx->tablerel)
{
Assert(ictx->ended);
return;
scanner->closescan(ctx);
ctx->tablerel = NULL;
ctx->indexrel = NULL;
}
ts_scanner_end_scan(ctx);
if (ictx->registered_snapshot)
{
UnregisterSnapshot(ctx->snapshot);
ctx->snapshot = NULL;
}
scanner->closescan(ctx);
ExecDropSingleTupleTableSlot(ictx->tinfo.slot);
ictx->closed = true;
}
TSDLLEXPORT TupleInfo *
@ -312,7 +361,7 @@ ts_scanner_next(ScannerCtx *ctx)
TupleTableSlot *slot = ictx->tinfo.slot;
Assert(ctx->snapshot);
ictx->tinfo.lockresult = table_tuple_lock(ictx->tablerel,
ictx->tinfo.lockresult = table_tuple_lock(ctx->tablerel,
&(slot->tts_tid),
ctx->snapshot,
slot,
@ -329,7 +378,11 @@ ts_scanner_next(ScannerCtx *ctx)
is_valid = ts_scanner_limit_reached(ctx) ? false : scanner->getnext(ctx);
}
ts_scanner_end_and_close_scan(ctx);
if (!(ctx->flags & SCANNER_F_NOEND))
ts_scanner_end_scan(ctx);
if (!(ctx->flags & SCANNER_F_NOEND_AND_NOCLOSE))
ts_scanner_close(ctx);
return NULL;
}
@ -353,7 +406,11 @@ ts_scanner_scan(ScannerCtx *ctx)
/* Call tuple_found handler. Abort the scan if the handler wants us to */
if (ctx->tuple_found != NULL && ctx->tuple_found(tinfo, ctx->data) == SCAN_DONE)
{
ts_scanner_end_and_close_scan(ctx);
if (!(ctx->flags & SCANNER_F_NOEND))
ts_scanner_end_scan(ctx);
if (!(ctx->flags & SCANNER_F_NOEND_AND_NOCLOSE))
ts_scanner_close(ctx);
break;
}
}

View File

@ -69,17 +69,24 @@ typedef union ScanDesc
TableScanDesc table_scan;
} ScanDesc;
/*
 * Bit flags that can be combined in ScannerCtx.flags.
 */
typedef enum ScannerFlags
{
	/* No flags set */
	SCANNER_F_NOFLAGS = 0,
	/* Close the table with NoLock instead of the scan's lock mode */
	SCANNER_F_KEEPLOCK = 1 << 0,
	/* Do not end the scan automatically when the scan finishes */
	SCANNER_F_NOEND = 1 << 1,
	/*
	 * Neither end the scan nor close the relation automatically. This value
	 * includes the SCANNER_F_NOEND bit.
	 */
	SCANNER_F_NOEND_AND_NOCLOSE = (1 << 2) | SCANNER_F_NOEND,
} ScannerFlags;
/*
* InternalScannerCtx is used for internal state during scanning and shouldn't
* be initialized or touched by the user.
*/
typedef struct InternalScannerCtx
{
Relation tablerel, indexrel;
TupleInfo tinfo;
ScanDesc scan;
bool registered_snapshot;
bool closed;
bool started;
bool ended;
} InternalScannerCtx;
@ -89,11 +96,13 @@ typedef struct ScannerCtx
/* Fields below this line can be initialized by the user */
Oid table;
Oid index;
Relation tablerel;
Relation indexrel;
ScanKey scankey;
int flags;
int nkeys, norderbys, limit; /* Limit on number of tuples to return. 0 or
* less means no limit */
bool want_itup;
bool keeplock; /* Keep the table lock after the scan finishes */
LOCKMODE lockmode;
MemoryContext result_mctx; /* The memory context to allocate the result
* on */
@ -132,12 +141,13 @@ typedef struct ScannerCtx
/* Performs an index scan or heap scan and returns the number of matching
* tuples. */
extern TSDLLEXPORT Relation ts_scanner_open(ScannerCtx *ctx);
extern TSDLLEXPORT void ts_scanner_close(ScannerCtx *ctx);
extern TSDLLEXPORT int ts_scanner_scan(ScannerCtx *ctx);
extern TSDLLEXPORT bool ts_scanner_scan_one(ScannerCtx *ctx, bool fail_if_not_found,
const char *item_type);
extern TSDLLEXPORT void ts_scanner_start_scan(ScannerCtx *ctx);
extern TSDLLEXPORT void ts_scanner_end_scan(ScannerCtx *ctx);
extern TSDLLEXPORT void ts_scanner_end_and_close_scan(ScannerCtx *ctx);
extern TSDLLEXPORT void ts_scanner_rescan(ScannerCtx *ctx, const ScanKey scankey);
extern TSDLLEXPORT TupleInfo *ts_scanner_next(ScannerCtx *ctx);
extern TSDLLEXPORT ItemPointer ts_scanner_get_tuple_tid(TupleInfo *ti);

View File

@ -64,6 +64,7 @@ CREATE OR REPLACE FUNCTION test_error_injection(TEXT)
RETURNS VOID
AS :MODULE_PATHNAME, 'ts_test_error_injection'
LANGUAGE C VOLATILE STRICT;
SET ROLE :ROLE_DEFAULT_PERM_USER;
SELECT test_error_injection('test_error');
test_error_injection
----------------------
@ -92,3 +93,29 @@ SELECT test_error_injection('test_error');
(1 row)
-- Test Scanner
RESET ROLE;
CREATE OR REPLACE FUNCTION test.scanner() RETURNS VOID
AS :MODULE_PATHNAME, 'ts_test_scanner' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
SET ROLE :ROLE_DEFAULT_PERM_USER;
-- Create two chunks to scan in the test
CREATE TABLE hyper (time timestamptz, temp float);
SELECT create_hypertable('hyper', 'time');
NOTICE: adding not-null constraint to column "time"
create_hypertable
--------------------
(1,public,hyper,t)
(1 row)
INSERT INTO hyper VALUES ('2021-01-01', 1.0), ('2022-01-01', 2.0);
SELECT test.scanner();
NOTICE: 1. Scan: "_timescaledb_internal._hyper_1_1_chunk"
NOTICE: 1. Scan: "_timescaledb_internal._hyper_1_2_chunk"
NOTICE: 2. Scan with filter: "_timescaledb_internal._hyper_1_1_chunk"
NOTICE: 3. ReScan: "_timescaledb_internal._hyper_1_2_chunk"
NOTICE: 4. IndexScan: "_timescaledb_internal._hyper_1_2_chunk"
scanner
---------
(1 row)

View File

@ -55,6 +55,7 @@ CREATE OR REPLACE FUNCTION test_error_injection(TEXT)
RETURNS VOID
AS :MODULE_PATHNAME, 'ts_test_error_injection'
LANGUAGE C VOLATILE STRICT;
SET ROLE :ROLE_DEFAULT_PERM_USER;
SELECT test_error_injection('test_error');
@ -65,3 +66,15 @@ SELECT test_error_injection('test_error');
SELECT debug_point_release('test_error');
SELECT test_error_injection('test_error');
-- Test Scanner
RESET ROLE;
CREATE OR REPLACE FUNCTION test.scanner() RETURNS VOID
AS :MODULE_PATHNAME, 'ts_test_scanner' LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
SET ROLE :ROLE_DEFAULT_PERM_USER;
-- Create two chunks to scan in the test
CREATE TABLE hyper (time timestamptz, temp float);
SELECT create_hypertable('hyper', 'time');
INSERT INTO hyper VALUES ('2021-01-01', 1.0), ('2022-01-01', 2.0);
SELECT test.scanner();

View File

@ -1,5 +1,11 @@
set(SOURCES adt_tests.c symbol_conflict.c test_time_to_internal.c
test_with_clause_parser.c test_time_utils.c test_utils.c)
set(SOURCES
adt_tests.c
symbol_conflict.c
test_time_to_internal.c
test_with_clause_parser.c
test_time_utils.c
test_utils.c
test_scanner.c)
include(${PROJECT_SOURCE_DIR}/src/build-defs.cmake)

106
test/src/test_scanner.c Normal file
View File

@ -0,0 +1,106 @@
/*
* This file and its contents are licensed under the Apache License 2.0.
* Please see the included NOTICE for copyright information and
* LICENSE-APACHE for a copy of the license.
*/
#include <postgres.h>
#include "scanner.h"
#include "scan_iterator.h"
#include "chunk.h"
#include "test_utils.h"
/*
 * Exercise the Scanner/ScanIterator API on the chunk catalog table:
 *
 * 1. Scan with a relation that the caller has opened beforehand.
 * 2. Scan again with a scan key on the first chunk ID seen in step 1.
 * 3. Reset the scan key, filter on the second chunk ID and rescan.
 * 4. Scan with the chunk ID index set on the scanner context.
 *
 * Each tuple found is reported with a NOTICE so that the output can be
 * compared against the expected test output.
 */
TS_TEST_FN(ts_test_scanner)
{
	ScanIterator it;
	Relation chunkrel;
	int32 chunk_id[2] = { -1, -1 };
	int i = 0;

	/* Test pre-open relation */
	it = ts_chunk_scan_iterator_create(CurrentMemoryContext);
	chunkrel = table_open(it.ctx.table, AccessShareLock);
	it.ctx.tablerel = chunkrel;

	/* Explicit start scan to test that we can call it twice without
	 * issue. The loop will also call it */
	ts_scan_iterator_start_scan(&it);

	ts_scanner_foreach(&it)
	{
		TupleInfo *ti = ts_scan_iterator_tuple_info(&it);
		FormData_chunk fd;

		ts_chunk_formdata_fill(&fd, ti);
		elog(NOTICE, "1. Scan: \"%s.%s\"", NameStr(fd.schema_name), NameStr(fd.table_name));

		/*
		 * Remember the first chunk IDs for use as filters in later scans.
		 * The bounds check must come first so that chunk_id[] is never
		 * indexed out of range when the catalog holds more chunks than the
		 * array has room for.
		 */
		if (i < lengthof(chunk_id) && chunk_id[i] == -1)
		{
			chunk_id[i] = fd.id;
			i++;
		}
	}

	ts_scan_iterator_end(&it);

	/* Add a chunk filter and scan again */
	ts_scan_iterator_scan_key_init(&it,
								   Anum_chunk_idx_id,
								   BTEqualStrategyNumber,
								   F_INT4EQ,
								   Int32GetDatum(chunk_id[0]));

	ts_scanner_foreach(&it)
	{
		TupleInfo *ti = ts_scan_iterator_tuple_info(&it);
		FormData_chunk fd;

		ts_chunk_formdata_fill(&fd, ti);
		elog(NOTICE,
			 "2. Scan with filter: \"%s.%s\"",
			 NameStr(fd.schema_name),
			 NameStr(fd.table_name));
	}

	/* Rescan */
	ts_scan_iterator_scan_key_reset(&it);
	ts_scan_iterator_scan_key_init(&it,
								   Anum_chunk_idx_id,
								   BTEqualStrategyNumber,
								   F_INT4EQ,
								   Int32GetDatum(chunk_id[1]));
	ts_scan_iterator_rescan(&it);

	ts_scanner_foreach(&it)
	{
		TupleInfo *ti = ts_scan_iterator_tuple_info(&it);
		FormData_chunk fd;

		ts_chunk_formdata_fill(&fd, ti);
		elog(NOTICE, "3. ReScan: \"%s.%s\"", NameStr(fd.schema_name), NameStr(fd.table_name));
	}

	ts_scan_iterator_end(&it);
	table_close(chunkrel, AccessShareLock);

	/* Do another scan, but an index scan this time */
	it.ctx.tablerel = NULL;
	it.ctx.index = catalog_get_index(ts_catalog_get(), CHUNK, CHUNK_ID_INDEX);

	ts_scanner_foreach(&it)
	{
		TupleInfo *ti = ts_scan_iterator_tuple_info(&it);
		FormData_chunk fd;

		ts_chunk_formdata_fill(&fd, ti);
		elog(NOTICE, "4. IndexScan: \"%s.%s\"", NameStr(fd.schema_name), NameStr(fd.table_name));
	}

	ts_scan_iterator_close(&it);

	PG_RETURN_VOID();
}