From fee27484cec7a01c2a98c16133e364f089689181 Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov Date: Thu, 15 Sep 2022 20:13:15 +0300 Subject: [PATCH] Do not use row-by-row fetcher for parameterized plans We have to prepare the data node statement in this case, and COPY queries don't work with prepared statements. --- src/planner/planner.c | 1 + tsl/src/fdw/scan_exec.c | 21 +++++++++ tsl/src/remote/row_by_row_fetcher.c | 13 +++-- .../shared/expected/dist_fetcher_type.out | 47 +++++++++++++++++++ tsl/test/shared/sql/dist_fetcher_type.sql | 37 +++++++++++++++ 5 files changed, 114 insertions(+), 5 deletions(-) diff --git a/src/planner/planner.c b/src/planner/planner.c index 45414e5ca..0ccd68dff 100644 --- a/src/planner/planner.c +++ b/src/planner/planner.c @@ -615,6 +615,7 @@ timescaledb_planner(Query *parse, int cursor_opts, ParamListInfo bound_params) PG_CATCH(); { ts_baserel_info = NULL; + ts_data_node_fetcher_scan_type = AutoFetcherType; /* Pop the cache, but do not release since caches are auto-released on * error */ planner_hcache_pop(false); diff --git a/tsl/src/fdw/scan_exec.c b/tsl/src/fdw/scan_exec.c index 402f02c42..2b5ef2b78 100644 --- a/tsl/src/fdw/scan_exec.c +++ b/tsl/src/fdw/scan_exec.c @@ -152,6 +152,27 @@ create_data_fetcher(ScanState *ss, TsFdwScanState *fsstate) } } + /* + * Row-by-row fetcher uses COPY statements, which don't work with prepared + * statements. If this plan is parameterized, this means we'll have to + * fall back to the cursor fetcher. 
+ */ + if (num_params > 0 && fsstate->planned_fetcher_type == RowByRowFetcherType) + { + if (ts_guc_remote_data_fetcher == AutoFetcherType) + { + fsstate->planned_fetcher_type = CursorFetcherType; + } + else + { + ereport(ERROR, + (errmsg("cannot use row-by-row fetcher because the plan is parameterized"), + errhint("Set \"timescaledb.remote_data_fetcher\" to \"cursor\" to explicitly " + "set the fetcher type or use \"auto\" to select the fetcher type " + "automatically."))); + } + } + if (fsstate->planned_fetcher_type == CursorFetcherType) { fetcher = cursor_fetcher_create_for_scan(fsstate->conn, fsstate->query, params, tf); diff --git a/tsl/src/remote/row_by_row_fetcher.c b/tsl/src/remote/row_by_row_fetcher.c index 1295c7cee..3d28e538c 100644 --- a/tsl/src/remote/row_by_row_fetcher.c +++ b/tsl/src/remote/row_by_row_fetcher.c @@ -116,15 +116,18 @@ row_by_row_fetcher_send_fetch_request(DataFetcher *df) fetcher->state.open = true; PGresult *res = PQgetResult(remote_connection_get_pg_conn(fetcher->state.conn)); - if (!res) + if (res == NULL) { - elog(ERROR, "unexpected NULL response when starting COPY mode"); + /* Shouldn't really happen but technically possible. */ + TSConnectionError err; + remote_connection_get_error(fetcher->state.conn, &err); + remote_connection_error_elog(&err, ERROR); } if (PQresultStatus(res) != PGRES_COPY_OUT) { - elog(ERROR, - "unexpected PQresult status %d when starting COPY mode", - PQresultStatus(res)); + TSConnectionError err; + remote_connection_get_result_error(res, &err); + remote_connection_error_elog(&err, ERROR); } PQclear(res); diff --git a/tsl/test/shared/expected/dist_fetcher_type.out b/tsl/test/shared/expected/dist_fetcher_type.out index 61253f30b..d09896dff 100644 --- a/tsl/test/shared/expected/dist_fetcher_type.out +++ b/tsl/test/shared/expected/dist_fetcher_type.out @@ -221,3 +221,50 @@ ORDER BY 1,2; Sun Jan 01 08:01:00 2017 PST | 1 | 7.3 (2 rows) +-- Check that we don't use row-by-row fetcher for parameterized plans. 
+CREATE TABLE lookup (id SERIAL NOT NULL, key TEXT, val TEXT); +CREATE TABLE metric (ts TIMESTAMPTZ NOT NULL, val FLOAT8 NOT NULL, lookup_id INT NOT NULL); +SELECT 1 FROM create_distributed_hypertable('metric', 'ts'); + ?column? + 1 +(1 row) + +INSERT INTO lookup (key, val) VALUES ('host', 'localhost'); +INSERT INTO metric (ts, val, lookup_id) SELECT s.*, 3.14+1, 1 +FROM generate_series('2021-08-17 00:00:00'::timestamp, '2021-08-17 00:59:59'::timestamp, '1 s'::interval) s; +SELECT + m.ts, + m.val +FROM metric m +WHERE + ARRAY[m.lookup_id] && (SELECT array_agg(l.id)::int[] FROM lookup l WHERE l.key = 'host' AND l.val = 'localhost') + AND m.ts BETWEEN '2021-08-17 00:00:00' AND '2021-08-17 01:00:00' +ORDER BY 1 DESC LIMIT 1; + ts | val +------------------------------+------ + Tue Aug 17 00:59:59 2021 PDT | 4.14 +(1 row) + +SELECT + m.ts, + m.val +FROM metric m +WHERE + m.lookup_id = ANY((SELECT array_agg(l.id) FROM lookup l WHERE l.key = 'host' AND l.val = 'localhost')::int[]) + AND m.ts BETWEEN '2021-08-17 00:00:00' AND '2021-08-17 01:00:00' +ORDER BY 1 DESC LIMIT 1; + ts | val +------------------------------+------ + Tue Aug 17 00:59:59 2021 PDT | 4.14 +(1 row) + +SET timescaledb.remote_data_fetcher = 'rowbyrow'; +SELECT + m.ts, + m.val +FROM metric m +WHERE + m.lookup_id = ANY((SELECT array_agg(l.id) FROM lookup l WHERE l.key = 'host' AND l.val = 'localhost')::int[]) + AND m.ts BETWEEN '2021-08-17 00:00:00' AND '2021-08-17 01:00:00' +ORDER BY 1 DESC LIMIT 1; +ERROR: cannot use row-by-row fetcher because the plan is parameterized diff --git a/tsl/test/shared/sql/dist_fetcher_type.sql b/tsl/test/shared/sql/dist_fetcher_type.sql index 0c8b2ff15..6f381defd 100644 --- a/tsl/test/shared/sql/dist_fetcher_type.sql +++ b/tsl/test/shared/sql/dist_fetcher_type.sql @@ -136,3 +136,40 @@ WHERE EXISTS ( (SELECT device_id FROM metrics_compressed limit 1 offset 3) >= ref_0.device ) ORDER BY 1,2; + +-- Check that we don't use row-by-row fetcher for parameterized plans. 
+CREATE TABLE lookup (id SERIAL NOT NULL, key TEXT, val TEXT); +CREATE TABLE metric (ts TIMESTAMPTZ NOT NULL, val FLOAT8 NOT NULL, lookup_id INT NOT NULL); +SELECT 1 FROM create_distributed_hypertable('metric', 'ts'); + +INSERT INTO lookup (key, val) VALUES ('host', 'localhost'); +INSERT INTO metric (ts, val, lookup_id) SELECT s.*, 3.14+1, 1 +FROM generate_series('2021-08-17 00:00:00'::timestamp, '2021-08-17 00:59:59'::timestamp, '1 s'::interval) s; + +SELECT + m.ts, + m.val +FROM metric m +WHERE + ARRAY[m.lookup_id] && (SELECT array_agg(l.id)::int[] FROM lookup l WHERE l.key = 'host' AND l.val = 'localhost') + AND m.ts BETWEEN '2021-08-17 00:00:00' AND '2021-08-17 01:00:00' +ORDER BY 1 DESC LIMIT 1; + +SELECT + m.ts, + m.val +FROM metric m +WHERE + m.lookup_id = ANY((SELECT array_agg(l.id) FROM lookup l WHERE l.key = 'host' AND l.val = 'localhost')::int[]) + AND m.ts BETWEEN '2021-08-17 00:00:00' AND '2021-08-17 01:00:00' +ORDER BY 1 DESC LIMIT 1; + +SET timescaledb.remote_data_fetcher = 'rowbyrow'; +SELECT + m.ts, + m.val +FROM metric m +WHERE + m.lookup_id = ANY((SELECT array_agg(l.id) FROM lookup l WHERE l.key = 'host' AND l.val = 'localhost')::int[]) + AND m.ts BETWEEN '2021-08-17 00:00:00' AND '2021-08-17 01:00:00' +ORDER BY 1 DESC LIMIT 1;