mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-14 17:43:34 +08:00
Fix file trailer handling in the COPY fetcher
The copy fetcher fetches tuples in batches. When the last element in the batch is the file trailer, the trailer was not handled correctly. The existing logic did not perform a PQgetCopyData in that case. Therefore the state of the fetcher was not set to EOF and the copy operation was not correctly finished at this point. Fixes: #5323
This commit is contained in:
parent
a854b2760f
commit
7b8177aa74
@ -13,6 +13,10 @@ accidentally triggering the load of a previous DB version.**
|
||||
|
||||
**Bugfixes**
|
||||
* #5396 Fix SEGMENTBY columns predicates to be pushed down
|
||||
* #5410 Fix file trailer handling in the COPY fetcher
|
||||
|
||||
**Thanks**
|
||||
* @nikolaps for reporting an issue with the COPY fetcher
|
||||
|
||||
## 2.10.1 (2023-03-07)
|
||||
|
||||
|
@ -333,6 +333,50 @@ copy_fetcher_read_fetch_response(CopyFetcher *fetcher)
|
||||
PQclear(res);
|
||||
}
|
||||
|
||||
/*
|
||||
* Read the next data from the connection and store the data in copy_data.
|
||||
* If no data can be read return false, or throw an error, otherwise
|
||||
* return true.
|
||||
*/
|
||||
static bool
|
||||
copy_fetcher_read_data(CopyFetcher *fetcher, PGconn *conn, char *volatile *dataptr,
|
||||
StringInfoData *copy_data)
|
||||
{
|
||||
copy_data->len = PQgetCopyData(conn,
|
||||
©_data->data,
|
||||
/* async = */ false);
|
||||
|
||||
/* Set dataptr to ensure data is freed with PQfreemem() in
|
||||
* PG_CATCH() clause in case error is thrown. */
|
||||
*dataptr = copy_data->data;
|
||||
|
||||
if (copy_data->len == -1)
|
||||
{
|
||||
/* Note: it is possible to get EOF without having received the
|
||||
* file trailer in case there's e.g., a remote error. */
|
||||
fetcher->state.eof = true;
|
||||
|
||||
/* Should read final result with PQgetResult() until it
|
||||
* returns NULL. This happens later in end_copy. */
|
||||
return false;
|
||||
}
|
||||
else if (copy_data->len == -2)
|
||||
{
|
||||
/*
|
||||
* Error. The docs say: consult PQerrorMessage() for the reason.
|
||||
* remote_connection_elog() will do this for us.
|
||||
*/
|
||||
remote_connection_elog(fetcher->state.conn, ERROR);
|
||||
|
||||
/* remote_connection_elog should raise an ERROR */
|
||||
pg_unreachable();
|
||||
}
|
||||
|
||||
copy_data->maxlen = copy_data->len;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Process response for ongoing async request
|
||||
*/
|
||||
@ -378,34 +422,12 @@ copy_fetcher_complete(CopyFetcher *fetcher)
|
||||
MemoryContextSwitchTo(fetcher->state.req_mctx);
|
||||
|
||||
StringInfoData copy_data = { 0 };
|
||||
bool tuple_read = copy_fetcher_read_data(fetcher, conn, &dataptr, ©_data);
|
||||
|
||||
copy_data.len = PQgetCopyData(conn,
|
||||
©_data.data,
|
||||
/* async = */ false);
|
||||
|
||||
/* Set dataptr to ensure data is freed with PQfreemem() in
|
||||
* PG_CATCH() clause in case error is thrown. */
|
||||
dataptr = copy_data.data;
|
||||
|
||||
if (copy_data.len == -1)
|
||||
{
|
||||
/* Note: it is possible to get EOF without having received the
|
||||
* file trailer in case there's e.g., a remote error. */
|
||||
fetcher->state.eof = true;
|
||||
/* Should read final result with PQgetResult() until it
|
||||
* returns NULL. This happens below. */
|
||||
/* Were we able to fetch new data? */
|
||||
if (!tuple_read)
|
||||
break;
|
||||
}
|
||||
else if (copy_data.len == -2)
|
||||
{
|
||||
/*
|
||||
* Error. The docs say: consult PQerrorMessage() for the reason.
|
||||
* remote_connection_elog() will do this for us.
|
||||
*/
|
||||
remote_connection_elog(fetcher->state.conn, ERROR);
|
||||
}
|
||||
|
||||
copy_data.maxlen = copy_data.len;
|
||||
Assert(copy_data.cursor == 0);
|
||||
|
||||
if (fetcher->state.batch_count == 0 && row == 0)
|
||||
@ -432,7 +454,16 @@ copy_fetcher_complete(CopyFetcher *fetcher)
|
||||
/* Next PQgetCopyData() should return -1, indicating EOF and
|
||||
* that the remote side ended the copy. The final result
|
||||
* (PGRES_COMMAND_OK) should then be read with
|
||||
* PQgetResult(). */
|
||||
* PQgetResult().
|
||||
*
|
||||
* Perform a PQgetCopyData directly in this branch because
|
||||
* if row = state.fetch_size - 1 (i.e., file_trailer is the last
|
||||
* tuple of the batch), the for loop will not executed
|
||||
* and PQgetCopyData will never be called.
|
||||
*/
|
||||
tuple_read = copy_fetcher_read_data(fetcher, conn, &dataptr, ©_data);
|
||||
Assert(tuple_read == false);
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -510,15 +541,14 @@ copy_fetcher_complete(CopyFetcher *fetcher)
|
||||
dataptr = NULL;
|
||||
}
|
||||
|
||||
/* Don't count the file trailer as a row if this was the last batch */
|
||||
fetcher->state.num_tuples = fetcher->file_trailer_received ? row - 1 : row;
|
||||
fetcher->state.num_tuples = row;
|
||||
fetcher->state.next_tuple_idx = 0;
|
||||
|
||||
/* Must be EOF if we didn't get as many tuples as we asked for. */
|
||||
#ifdef USE_ASSERT_CHECKING
|
||||
if (fetcher->state.num_tuples < fetcher->state.fetch_size)
|
||||
{
|
||||
Assert(fetcher->state.eof);
|
||||
}
|
||||
#endif
|
||||
|
||||
fetcher->state.batch_count++;
|
||||
|
||||
|
@ -41,6 +41,26 @@ SELECT setseed(1);
|
||||
INSERT INTO disttable
|
||||
SELECT t, (abs(timestamp_hash(t::timestamp)) % 10) + 1, random() * 10
|
||||
FROM generate_series('2019-01-01'::timestamptz, '2019-01-02'::timestamptz, '1 second') as t;
|
||||
-- This table contains the content for precisely one batch of the copy fetcher. The fetch_size
|
||||
-- will be set to 100 below and this table contains 99 tuples and the last element on the first
|
||||
-- copy batch is the file trailer (#5323).
|
||||
CREATE table one_batch(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL);
|
||||
SELECT create_distributed_hypertable('one_batch', 'ts');
|
||||
create_distributed_hypertable
|
||||
-------------------------------
|
||||
(2,public,one_batch,t)
|
||||
(1 row)
|
||||
|
||||
INSERT INTO one_batch SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 99, 1) AS g1(sensor_id) ORDER BY time;
|
||||
-- Same but for the DEFAULT_FDW_FETCH_SIZE (10000)
|
||||
CREATE table one_batch_default(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL);
|
||||
SELECT create_distributed_hypertable('one_batch_default', 'ts');
|
||||
create_distributed_hypertable
|
||||
--------------------------------
|
||||
(3,public,one_batch_default,t)
|
||||
(1 row)
|
||||
|
||||
INSERT INTO one_batch_default SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 9999, 1) AS g1(sensor_id) ORDER BY time;
|
||||
SET client_min_messages TO error;
|
||||
-- Set a smaller fetch size to ensure that the result is split into
|
||||
-- mutliple batches.
|
||||
@ -59,6 +79,10 @@ SELECT time_bucket('1 hour', time) AS time, device, avg(temp)
|
||||
FROM disttable
|
||||
GROUP BY 1,2
|
||||
ORDER BY 1,2;
|
||||
-- Test for #5323 - ensure that no NULL tuples are generated
|
||||
-- if the last element of the batch is the file trailer.
|
||||
SELECT count(*), count(value) FROM one_batch;
|
||||
SELECT count(*), count(value) FROM one_batch_default;
|
||||
\o
|
||||
\set ON_ERROR_STOP 1
|
||||
-- run queries using cursor fetcher
|
||||
@ -74,6 +98,10 @@ SELECT time_bucket('1 hour', time) AS time, device, avg(temp)
|
||||
FROM disttable
|
||||
GROUP BY 1,2
|
||||
ORDER BY 1,2;
|
||||
-- Test for #5323 - ensure that no NULL tuples are generated
|
||||
-- if the last element of the batch is the file trailer.
|
||||
SELECT count(*), count(value) FROM one_batch;
|
||||
SELECT count(*), count(value) FROM one_batch_default;
|
||||
\o
|
||||
-- compare results
|
||||
:DIFF_CMD
|
||||
|
@ -31,6 +31,18 @@ INSERT INTO disttable
|
||||
SELECT t, (abs(timestamp_hash(t::timestamp)) % 10) + 1, random() * 10
|
||||
FROM generate_series('2019-01-01'::timestamptz, '2019-01-02'::timestamptz, '1 second') as t;
|
||||
|
||||
-- This table contains the content for precisely one batch of the copy fetcher. The fetch_size
|
||||
-- will be set to 100 below and this table contains 99 tuples and the last element on the first
|
||||
-- copy batch is the file trailer (#5323).
|
||||
CREATE table one_batch(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL);
|
||||
SELECT create_distributed_hypertable('one_batch', 'ts');
|
||||
INSERT INTO one_batch SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 99, 1) AS g1(sensor_id) ORDER BY time;
|
||||
|
||||
-- Same but for the DEFAULT_FDW_FETCH_SIZE (10000)
|
||||
CREATE table one_batch_default(ts timestamptz NOT NULL, sensor_id int NOT NULL, value float NOT NULL);
|
||||
SELECT create_distributed_hypertable('one_batch_default', 'ts');
|
||||
INSERT INTO one_batch_default SELECT '2023-01-01'::timestamptz AS time, sensor_id, random() AS value FROM generate_series(1, 9999, 1) AS g1(sensor_id) ORDER BY time;
|
||||
|
||||
SET client_min_messages TO error;
|
||||
|
||||
-- Set a smaller fetch size to ensure that the result is split into
|
||||
|
@ -10,3 +10,10 @@ SELECT time_bucket('1 hour', time) AS time, device, avg(temp)
|
||||
FROM disttable
|
||||
GROUP BY 1,2
|
||||
ORDER BY 1,2;
|
||||
|
||||
-- Test for #5323 - ensure that no NULL tuples are generated
|
||||
-- if the last element of the batch is the file trailer.
|
||||
SELECT count(*), count(value) FROM one_batch;
|
||||
|
||||
SELECT count(*), count(value) FROM one_batch_default;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user