From 4ee8872177908f3ae48be1491cb098fae577e54d Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov Date: Thu, 31 Mar 2022 17:49:31 +0300 Subject: [PATCH] Use virtual tuples in row-by-row fetcher We needlessly form/deform the heap tuples currently. Sometimes we do need this when we have row marks and need a ctid (UPDATE RETURNING), but not in this case. The implementation has three parts: 1. Change data fetcher interface to store a tuple into given slot instead of returning a heap tuple. 2. Expose the creation of virtual tuple in tuple factory. 3. Use these facilities in row-by-row fetcher. This gives some small speedup. It will become more important in the future, as other parts of row-by-row fetcher are optimized. --- tsl/src/fdw/scan_exec.c | 13 +----- tsl/src/remote/cursor_fetcher.c | 20 ++------ tsl/src/remote/data_fetcher.c | 26 +++++++---- tsl/src/remote/data_fetcher.h | 7 ++- tsl/src/remote/row_by_row_fetcher.c | 71 +++++++++++++++++++++++------ tsl/src/remote/tuplefactory.c | 56 ++++++++++++++--------- tsl/src/remote/tuplefactory.h | 3 ++ 7 files changed, 120 insertions(+), 76 deletions(-) diff --git a/tsl/src/fdw/scan_exec.c b/tsl/src/fdw/scan_exec.c index 553a00c4e..4a6e9a50a 100644 --- a/tsl/src/fdw/scan_exec.c +++ b/tsl/src/fdw/scan_exec.c @@ -283,23 +283,12 @@ TupleTableSlot * fdw_scan_iterate(ScanState *ss, TsFdwScanState *fsstate) { TupleTableSlot *slot = ss->ss_ScanTupleSlot; - HeapTuple tuple; DataFetcher *fetcher = fsstate->fetcher; if (NULL == fetcher) fetcher = create_data_fetcher(ss, fsstate); - tuple = fetcher->funcs->get_next_tuple(fetcher); - - if (NULL == tuple) - return ExecClearTuple(slot); - - /* - * Return the next tuple. Must force the tuple into the slot since - * CustomScan initializes ss_ScanTupleSlot to a VirtualTupleTableSlot - * while we're storing a HeapTuple. - */ - ExecForceStoreHeapTuple(tuple, slot, false); + fetcher->funcs->store_next_tuple(fetcher, slot); return slot; } diff --git a/tsl/src/remote/cursor_fetcher.c b/tsl/src/remote/cursor_fetcher.c index d73c292fb..31ab3d0b8 100644 --- a/tsl/src/remote/cursor_fetcher.c +++ b/tsl/src/remote/cursor_fetcher.c @@ -50,8 +50,7 @@ static void cursor_fetcher_send_fetch_request(DataFetcher *df); static int cursor_fetcher_fetch_data(DataFetcher *df); static void cursor_fetcher_set_fetch_size(DataFetcher *df, int fetch_size); static void cursor_fetcher_set_tuple_memcontext(DataFetcher *df, MemoryContext mctx); -static HeapTuple cursor_fetcher_get_next_tuple(DataFetcher *df); -static HeapTuple cursor_fetcher_get_tuple(DataFetcher *df, int row); +static void cursor_fetcher_store_next_tuple(DataFetcher *df, TupleTableSlot *slot); static void cursor_fetcher_rewind(DataFetcher *df); static void cursor_fetcher_close(DataFetcher *df); @@ -60,8 +59,7 @@ static DataFetcherFuncs funcs = { .fetch_data = cursor_fetcher_fetch_data, .set_fetch_size = cursor_fetcher_set_fetch_size, .set_tuple_mctx = cursor_fetcher_set_tuple_memcontext, - .get_next_tuple = cursor_fetcher_get_next_tuple, - .get_tuple = cursor_fetcher_get_tuple, + .store_next_tuple = cursor_fetcher_store_next_tuple, .rewind = cursor_fetcher_rewind, .close = cursor_fetcher_close, }; @@ -384,20 +382,12 @@ cursor_fetcher_fetch_data(DataFetcher *df) return cursor_fetcher_fetch_data_complete(cursor); } -static HeapTuple -cursor_fetcher_get_tuple(DataFetcher *df, int row) +static void +cursor_fetcher_store_next_tuple(DataFetcher *df, TupleTableSlot *slot) { CursorFetcher *cursor = cast_fetcher(CursorFetcher, df); - return data_fetcher_get_tuple(&cursor->state, row); -} - -static HeapTuple -cursor_fetcher_get_next_tuple(DataFetcher *df) -{ - CursorFetcher *cursor = cast_fetcher(CursorFetcher, df); - - return data_fetcher_get_next_tuple(&cursor->state); + data_fetcher_store_next_tuple(&cursor->state, slot); } static void diff --git a/tsl/src/remote/data_fetcher.c b/tsl/src/remote/data_fetcher.c index 29412b886..5a55ff4d6 100644 --- a/tsl/src/remote/data_fetcher.c +++ b/tsl/src/remote/data_fetcher.c @@ -51,14 +51,17 @@ data_fetcher_validate(DataFetcher *df) errhint("Shouldn't fetch new data before consuming existing."))); } -HeapTuple -data_fetcher_get_tuple(DataFetcher *df, int row) +void +data_fetcher_store_tuple(DataFetcher *df, int row, TupleTableSlot *slot) { if (row >= df->num_tuples) { /* No point in another fetch if we already detected EOF, though. */ if (df->eof || df->funcs->fetch_data(df) == 0) - return NULL; + { + ExecClearTuple(slot); + return; + } /* More data was fetched so need to reset row index */ row = 0; @@ -68,20 +71,23 @@ data_fetcher_get_tuple(DataFetcher *df, int row) Assert(df->tuples != NULL); Assert(row >= 0 && row < df->num_tuples); - return df->tuples[row]; + /* + * Return the next tuple. Must force the tuple into the slot since + * CustomScan initializes ss_ScanTupleSlot to a VirtualTupleTableSlot + * while we're storing a HeapTuple. + */ + ExecForceStoreHeapTuple(df->tuples[row], slot, /* shouldFree = */ false); } -HeapTuple -data_fetcher_get_next_tuple(DataFetcher *df) +void +data_fetcher_store_next_tuple(DataFetcher *df, TupleTableSlot *slot) { - HeapTuple tuple = data_fetcher_get_tuple(df, df->next_tuple_idx); + data_fetcher_store_tuple(df, df->next_tuple_idx, slot); - if (tuple != NULL) + if (!TupIsNull(slot)) df->next_tuple_idx++; Assert(df->next_tuple_idx <= df->num_tuples); - - return tuple; } void diff --git a/tsl/src/remote/data_fetcher.h b/tsl/src/remote/data_fetcher.h index 69ad5ed89..cf7cbdd43 100644 --- a/tsl/src/remote/data_fetcher.h +++ b/tsl/src/remote/data_fetcher.h @@ -28,8 +28,7 @@ typedef struct DataFetcherFuncs /* Set the fetch (batch) size */ void (*set_fetch_size)(DataFetcher *data_fetcher, int fetch_size); void (*set_tuple_mctx)(DataFetcher *data_fetcher, MemoryContext mctx); - HeapTuple (*get_next_tuple)(DataFetcher *data_fetcher); - HeapTuple (*get_tuple)(DataFetcher *data_fetcher, int row); + void (*store_next_tuple)(DataFetcher *data_fetcher, TupleTableSlot *slot); void (*rewind)(DataFetcher *data_fetcher); void (*close)(DataFetcher *data_fetcher); } DataFetcherFuncs; @@ -67,8 +66,8 @@ extern void data_fetcher_init(DataFetcher *df, TSConnection *conn, const char *s StmtParams *params, Relation rel, ScanState *ss, List *retrieved_attrs); -extern HeapTuple data_fetcher_get_tuple(DataFetcher *df, int row); -extern HeapTuple data_fetcher_get_next_tuple(DataFetcher *df); +extern void data_fetcher_store_tuple(DataFetcher *df, int row, TupleTableSlot *slot); +extern void data_fetcher_store_next_tuple(DataFetcher *df, TupleTableSlot *slot); extern void data_fetcher_set_fetch_size(DataFetcher *df, int fetch_size); extern void data_fetcher_set_tuple_mctx(DataFetcher *df, MemoryContext mctx); extern void data_fetcher_validate(DataFetcher *df); diff --git a/tsl/src/remote/row_by_row_fetcher.c b/tsl/src/remote/row_by_row_fetcher.c index 4b4223a32..256d9d90a 100644 --- a/tsl/src/remote/row_by_row_fetcher.c +++ b/tsl/src/remote/row_by_row_fetcher.c @@ -13,6 +13,10 @@ typedef struct RowByRowFetcher { DataFetcher state; + + /* Data for virtual tuples of the current retrieved batch. */ + Datum *batch_values; + bool *batch_nulls; } RowByRowFetcher; static void row_by_row_fetcher_send_fetch_request(DataFetcher *df); @@ -20,8 +24,7 @@ static void row_by_row_fetcher_reset(RowByRowFetcher *fetcher); static int row_by_row_fetcher_fetch_data(DataFetcher *df); static void row_by_row_fetcher_set_fetch_size(DataFetcher *df, int fetch_size); static void row_by_row_fetcher_set_tuple_memcontext(DataFetcher *df, MemoryContext mctx); -static HeapTuple row_by_row_fetcher_get_next_tuple(DataFetcher *df); -static HeapTuple row_by_row_fetcher_get_tuple(DataFetcher *df, int row); +static void row_by_row_fetcher_store_next_tuple(DataFetcher *df, TupleTableSlot *slot); static void row_by_row_fetcher_rescan(DataFetcher *df); static void row_by_row_fetcher_close(DataFetcher *df); @@ -30,8 +33,7 @@ static DataFetcherFuncs funcs = { .fetch_data = row_by_row_fetcher_fetch_data, .set_fetch_size = row_by_row_fetcher_set_fetch_size, .set_tuple_mctx = row_by_row_fetcher_set_tuple_memcontext, - .get_next_tuple = row_by_row_fetcher_get_next_tuple, - .get_tuple = row_by_row_fetcher_get_tuple, + .store_next_tuple = row_by_row_fetcher_store_next_tuple, .rewind = row_by_row_fetcher_rescan, .close = row_by_row_fetcher_close, }; @@ -149,7 +151,14 @@ row_by_row_fetcher_complete(RowByRowFetcher *fetcher) */ MemoryContextReset(fetcher->state.batch_mctx); oldcontext = MemoryContextSwitchTo(fetcher->state.batch_mctx); - fetcher->state.tuples = palloc0(fetcher->state.fetch_size * sizeof(HeapTuple)); + const int nattrs = tuplefactory_get_nattrs(fetcher->state.tf); + const int total = nattrs * fetcher->state.fetch_size; + fetcher->batch_nulls = palloc(sizeof(bool) * total); + for (int i = 0; i < total; i++) + { + fetcher->batch_nulls[i] = true; + } + fetcher->batch_values = palloc0(sizeof(Datum) * total); PG_TRY(); { @@ -199,8 +208,19 @@ row_by_row_fetcher_complete(RowByRowFetcher *fetcher) * it explicitly, otherwise same as batch_mctx */ MemoryContextSwitchTo(fetcher->state.tuple_mctx); - fetcher->state.tuples[i] = - tuplefactory_make_tuple(fetcher->state.tf, res, 0, PQbinaryTuples(res)); + PG_USED_FOR_ASSERTS_ONLY ItemPointer ctid = + tuplefactory_make_virtual_tuple(fetcher->state.tf, + res, + 0, + PQbinaryTuples(res), + &fetcher->batch_values[i * nattrs], + &fetcher->batch_nulls[i * nattrs]); + + /* + * This fetcher uses virtual tuples that can't hold ctid, so if we're + * receiving a ctid here, we're doing something wrong. + */ + Assert(ctid == NULL); async_response_result_close(response); response = NULL; @@ -259,20 +279,43 @@ row_by_row_fetcher_fetch_data(DataFetcher *df) return row_by_row_fetcher_complete(fetcher); } -static HeapTuple -row_by_row_fetcher_get_tuple(DataFetcher *df, int row) +static void +row_by_row_fetcher_store_tuple(DataFetcher *df, int row, TupleTableSlot *slot) { RowByRowFetcher *fetcher = cast_fetcher(RowByRowFetcher, df); - return data_fetcher_get_tuple(&fetcher->state, row); + ExecClearTuple(slot); + + if (row >= df->num_tuples) + { + if (df->eof || df->funcs->fetch_data(df) == 0) + { + return; + } + + row = 0; + Assert(row == df->next_tuple_idx); + } + + Assert(fetcher->batch_values != NULL); + Assert(fetcher->batch_nulls != NULL); + Assert(row >= 0 && row < df->num_tuples); + + const int nattrs = tuplefactory_get_nattrs(fetcher->state.tf); + slot->tts_values = &fetcher->batch_values[nattrs * row]; + slot->tts_isnull = &fetcher->batch_nulls[nattrs * row]; + ExecStoreVirtualTuple(slot); } -static HeapTuple -row_by_row_fetcher_get_next_tuple(DataFetcher *df) +static void +row_by_row_fetcher_store_next_tuple(DataFetcher *df, TupleTableSlot *slot) { - RowByRowFetcher *fetcher = cast_fetcher(RowByRowFetcher, df); + row_by_row_fetcher_store_tuple(df, df->next_tuple_idx, slot); - return data_fetcher_get_next_tuple(&fetcher->state); + if (!TupIsNull(slot)) + df->next_tuple_idx++; + + Assert(df->next_tuple_idx <= df->num_tuples); } DataFetcher * diff --git a/tsl/src/remote/tuplefactory.c b/tsl/src/remote/tuplefactory.c index 9f7f015ba..5fdcc70aa 100644 --- a/tsl/src/remote/tuplefactory.c +++ b/tsl/src/remote/tuplefactory.c @@ -231,24 +231,23 @@ tuplefactory_reset_mctx(TupleFactory *tf) MemoryContextReset(tf->temp_mctx); } -HeapTuple -tuplefactory_make_tuple(TupleFactory *tf, PGresult *res, int row, int format) +int +tuplefactory_get_nattrs(TupleFactory *tf) +{ + return tf->tupdesc->natts; +} + +ItemPointer +tuplefactory_make_virtual_tuple(TupleFactory *tf, PGresult *res, int row, int format, Datum *values, + bool *nulls) { - HeapTuple tuple; ItemPointer ctid = NULL; - MemoryContext oldcontext; ListCell *lc; int j; StringInfo buf; Assert(row < PQntuples(res)); - /* - * Do the following work in a temp context that we reset after each tuple. - * This cleans up not only the data we have direct access to, but any - * cruft the I/O functions might leak. - */ - oldcontext = MemoryContextSwitchTo(tf->temp_mctx); buf = makeStringInfo(); /* Install error callback */ @@ -290,27 +289,27 @@ tuplefactory_make_tuple(TupleFactory *tf, PGresult *res, int row, int format) { /* ordinary column */ Assert(i <= tf->tupdesc->natts); - tf->nulls[i - 1] = (valstr == NULL); + nulls[i - 1] = (valstr == NULL); if (format == FORMAT_TEXT) { Assert(!tf->attconv->binary); /* Apply the input function even to nulls, to support domains */ - tf->values[i - 1] = InputFunctionCall(&tf->attconv->conv_funcs[i - 1], - valstr, - tf->attconv->ioparams[i - 1], - tf->attconv->typmods[i - 1]); + values[i - 1] = InputFunctionCall(&tf->attconv->conv_funcs[i - 1], + valstr, + tf->attconv->ioparams[i - 1], + tf->attconv->typmods[i - 1]); } else { Assert(tf->attconv->binary); if (valstr != NULL) - tf->values[i - 1] = ReceiveFunctionCall(&tf->attconv->conv_funcs[i - 1], - buf, - tf->attconv->ioparams[i - 1], - tf->attconv->typmods[i - 1]); + values[i - 1] = ReceiveFunctionCall(&tf->attconv->conv_funcs[i - 1], + buf, + tf->attconv->ioparams[i - 1], + tf->attconv->typmods[i - 1]); else - tf->values[i - 1] = PointerGetDatum(NULL); + values[i - 1] = PointerGetDatum(NULL); } } else if (i == SelfItemPointerAttributeNumber) @@ -341,12 +340,27 @@ tuplefactory_make_tuple(TupleFactory *tf, PGresult *res, int row, int format) if (j > 0 && j != PQnfields(res)) elog(ERROR, "remote query result does not match the foreign table"); + return ctid; +} + +HeapTuple +tuplefactory_make_tuple(TupleFactory *tf, PGresult *res, int row, int format) +{ + /* + * Do the following work in a temp context that we reset after each tuple. + * This cleans up not only the data we have direct access to, but any + * cruft the I/O functions might leak. + */ + MemoryContext oldcontext = MemoryContextSwitchTo(tf->temp_mctx); + + ItemPointer ctid = tuplefactory_make_virtual_tuple(tf, res, row, format, tf->values, tf->nulls); + /* * Build the result tuple in caller's memory context. */ MemoryContextSwitchTo(oldcontext); - tuple = heap_form_tuple(tf->tupdesc, tf->values, tf->nulls); + HeapTuple tuple = heap_form_tuple(tf->tupdesc, tf->values, tf->nulls); /* * If we have a CTID to return, install it in both t_self and t_ctid. diff --git a/tsl/src/remote/tuplefactory.h b/tsl/src/remote/tuplefactory.h index 74cd84299..dd2790cf3 100644 --- a/tsl/src/remote/tuplefactory.h +++ b/tsl/src/remote/tuplefactory.h @@ -21,8 +21,11 @@ extern TupleFactory *tuplefactory_create_for_tupdesc(TupleDesc tupdesc, bool for extern TupleFactory *tuplefactory_create_for_rel(Relation rel, List *retrieved_attrs); extern TupleFactory *tuplefactory_create_for_scan(ScanState *ss, List *retrieved_attrs); extern HeapTuple tuplefactory_make_tuple(TupleFactory *tf, PGresult *res, int row, int format); +extern ItemPointer tuplefactory_make_virtual_tuple(TupleFactory *tf, PGresult *res, int row, + int format, Datum *values, bool *nulls); extern bool tuplefactory_is_binary(TupleFactory *tf); extern void tuplefactory_set_per_tuple_mctx_reset(TupleFactory *tf, bool reset); extern void tuplefactory_reset_mctx(TupleFactory *tf); +extern int tuplefactory_get_nattrs(TupleFactory *tf); #endif /* TIMESCALEDB_TSL_REMOTE_TUPLEFACTORY_H */