Fix failure on job execution by background worker

If the registered procedure has COMMIT inside the code, the execution
of the job by the background worker was failing.

Fixed it checking if there are an ActivePortal, if NO then create a
Portal from scratch and execute the existing job execution code.
This commit is contained in:
Fabrízio de Royes Mello 2021-09-14 16:58:02 -03:00
parent 9ee9a1e1c0
commit e22aaad48c
3 changed files with 333 additions and 21 deletions

View File

@ -16,8 +16,10 @@
#include <nodes/primnodes.h>
#include <parser/parse_func.h>
#include <parser/parser.h>
#include <tcop/pquery.h>
#include <utils/builtins.h>
#include <utils/lsyscache.h>
#include <utils/portal.h>
#include <utils/syscache.h>
#include <utils/snapmgr.h>
#include <utils/timestamp.h>
@ -32,6 +34,7 @@
#include "bgw_policy/policy_utils.h"
#include "bgw_policy/reorder_api.h"
#include "bgw_policy/retention_api.h"
#include "compat/compat.h"
#include "compression/compress_utils.h"
#include "continuous_aggs/materialize.h"
#include "continuous_aggs/refresh.h"
@ -752,25 +755,30 @@ bool
job_execute(BgwJob *job)
{
Const *arg1, *arg2;
bool transaction_started = false;
bool pushed_snapshot = false;
bool portal_created = false;
char prokind;
Oid proc;
ObjectWithArgs *object;
FuncExpr *funcexpr;
MemoryContext parent_ctx = CurrentMemoryContext;
StringInfo query;
if (!IsTransactionOrTransactionBlock())
{
transaction_started = true;
StartTransactionCommand();
}
Portal portal = ActivePortal;
/* executing sql functions requires snapshot. */
if (!ActiveSnapshotSet())
/* Create a portal if there's no active */
if (!PortalIsValid(portal))
{
pushed_snapshot = true;
portal_created = true;
portal = CreatePortal("", true, true);
portal->visible = false;
portal->resowner = CurrentResourceOwner;
ActivePortal = portal;
StartTransactionCommand();
#if (PG12 && PG_VERSION_NUM >= 120008) || (PG13 && PG_VERSION_NUM >= 130004) || PG14_GE
EnsurePortalSnapshotExists();
#else
PushActiveSnapshot(GetTransactionSnapshot());
#endif
}
object = makeNode(ObjectWithArgs);
@ -829,16 +837,15 @@ job_execute(BgwJob *job)
break;
}
/* Both checks are needed: if the executed procedure commit the
* transaction---which `continuous_agg_refresh_internal` does, for
* example---it will remove the active snapshot and start a new
* transaction with no active snapshots. In that case, we should not pop a
* snapshot. */
if (pushed_snapshot && ActiveSnapshotSet())
PopActiveSnapshot();
if (transaction_started)
/* Drop portal if it was created */
if (portal_created)
{
if (ActiveSnapshotSet())
PopActiveSnapshot();
CommitTransactionCommand();
PortalDrop(portal, false);
ActivePortal = NULL;
}
return true;
}

View File

@ -245,7 +245,7 @@ SELECT add_job( proc=>'custom_func',
1005
(1 row)
SELECT job_id, next_start, scheduled, schedule_interval
SELECT job_id, next_start, scheduled, schedule_interval
FROM timescaledb_information.jobs WHERE job_id > 1000;
job_id | next_start | scheduled | schedule_interval
--------+------------------------------+-----------+-------------------
@ -269,3 +269,185 @@ total_successes | 0
total_failures | 0
\x
-- tests for #3545
CREATE FUNCTION wait_for_job_to_run(job_param_id INTEGER, expected_runs INTEGER, spins INTEGER=:TEST_SPINWAIT_ITERS) RETURNS BOOLEAN LANGUAGE PLPGSQL AS
$BODY$
DECLARE
r RECORD;
BEGIN
FOR i in 1..spins
LOOP
SELECT total_successes, total_failures FROM _timescaledb_internal.bgw_job_stat WHERE job_id=job_param_id INTO r;
IF (r.total_failures > 0) THEN
EXIT;
ELSEIF (r.total_successes = expected_runs) THEN
RETURN true;
ELSEIF (r.total_successes > expected_runs) THEN
RAISE 'num_runs > expected';
ELSE
PERFORM pg_sleep(0.1);
END IF;
END LOOP;
RETURN false;
END
$BODY$;
TRUNCATE custom_log;
-- Nested procedure call
CREATE OR REPLACE PROCEDURE custom_proc_nested(job_id int, args jsonb) LANGUAGE PLPGSQL AS
$$
BEGIN
INSERT INTO custom_log VALUES($1, $2, 'custom_proc_nested 1 COMMIT');
COMMIT;
INSERT INTO custom_log VALUES($1, $2, 'custom_proc_nested 2 ROLLBACK');
ROLLBACK;
INSERT INTO custom_log VALUES($1, $2, 'custom_proc_nested 3 COMMIT');
COMMIT;
END
$$;
CREATE OR REPLACE PROCEDURE custom_proc3(job_id int, args jsonb) LANGUAGE PLPGSQL AS
$$
BEGIN
CALL custom_proc_nested(job_id, args);
END
$$;
CREATE OR REPLACE PROCEDURE custom_proc4(job_id int, args jsonb) LANGUAGE PLPGSQL AS
$$
BEGIN
INSERT INTO custom_log VALUES($1, $2, 'custom_proc4 1 COMMIT');
COMMIT;
INSERT INTO custom_log VALUES($1, $2, 'custom_proc4 2 ROLLBACK');
ROLLBACK;
RAISE EXCEPTION 'forced exception';
INSERT INTO custom_log VALUES($1, $2, 'custom_proc4 3 ABORT');
COMMIT;
END
$$;
-- Remove any default jobs, e.g., telemetry
\c :TEST_DBNAME :ROLE_SUPERUSER
TRUNCATE _timescaledb_config.bgw_job RESTART IDENTITY CASCADE;
NOTICE: truncate cascades to table "bgw_job_stat"
NOTICE: truncate cascades to table "bgw_policy_chunk_stats"
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
SELECT add_job('custom_proc2', '1h', config := '{"type":"procedure"}'::jsonb, initial_start := now()) AS job_id_1 \gset
SELECT add_job('custom_proc3', '1h', config := '{"type":"procedure"}'::jsonb, initial_start := now()) AS job_id_2 \gset
\c :TEST_DBNAME :ROLE_SUPERUSER
-- Start Background Workers
SELECT _timescaledb_internal.start_background_workers();
start_background_workers
--------------------------
t
(1 row)
-- Wait for jobs
SELECT wait_for_job_to_run(:job_id_1, 1);
wait_for_job_to_run
---------------------
t
(1 row)
SELECT wait_for_job_to_run(:job_id_2, 1);
wait_for_job_to_run
---------------------
t
(1 row)
-- Check results
SELECT * FROM custom_log ORDER BY job_id, extra;
job_id | args | extra | runner
--------+-----------------------+-----------------------------+-------------------
1000 | {"type": "procedure"} | custom_proc 1 COMMIT | default_perm_user
1000 | {"type": "procedure"} | custom_proc 3 COMMIT | default_perm_user
1001 | {"type": "procedure"} | custom_proc_nested 1 COMMIT | default_perm_user
1001 | {"type": "procedure"} | custom_proc_nested 3 COMMIT | default_perm_user
(4 rows)
-- Delete previous jobs
SELECT delete_job(:job_id_1);
delete_job
------------
(1 row)
SELECT delete_job(:job_id_2);
delete_job
------------
(1 row)
TRUNCATE custom_log;
-- Forced Exception
SELECT add_job('custom_proc4', '1h', config := '{"type":"procedure"}'::jsonb, initial_start := now()) AS job_id_3 \gset
SELECT wait_for_job_to_run(:job_id_3, 1);
wait_for_job_to_run
---------------------
f
(1 row)
-- Check results
SELECT * FROM custom_log ORDER BY job_id, extra;
job_id | args | extra | runner
--------+-----------------------+-----------------------+------------
1002 | {"type": "procedure"} | custom_proc4 1 COMMIT | super_user
(1 row)
-- Delete previous jobs
SELECT delete_job(:job_id_3);
delete_job
------------
(1 row)
CREATE TABLE conditions (
time TIMESTAMPTZ NOT NULL,
location TEXT NOT NULL,
location2 char(10) NOT NULL,
temperature DOUBLE PRECISION NULL,
humidity DOUBLE PRECISION NULL
);
SELECT create_hypertable('conditions', 'time', chunk_time_interval := '15 days'::interval);
create_hypertable
-------------------------
(1,public,conditions,t)
(1 row)
ALTER TABLE conditions
SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'location',
timescaledb.compress_orderby = 'time'
);
INSERT INTO conditions
SELECT generate_series('2021-08-01 00:00'::timestamp, '2021-08-31 00:00'::timestamp, '1 day'), 'POR', 'klick', 55, 75;
-- Chunk compress stats
SELECT * FROM _timescaledb_internal.compressed_chunk_stats ORDER BY chunk_name;
hypertable_schema | hypertable_name | chunk_schema | chunk_name | compression_status | uncompressed_heap_size | uncompressed_index_size | uncompressed_toast_size | uncompressed_total_size | compressed_heap_size | compressed_index_size | compressed_toast_size | compressed_total_size
-------------------+-----------------+-----------------------+------------------+--------------------+------------------------+-------------------------+-------------------------+-------------------------+----------------------+-----------------------+-----------------------+-----------------------
public | conditions | _timescaledb_internal | _hyper_1_1_chunk | Uncompressed | | | | | | | |
public | conditions | _timescaledb_internal | _hyper_1_2_chunk | Uncompressed | | | | | | | |
public | conditions | _timescaledb_internal | _hyper_1_3_chunk | Uncompressed | | | | | | | |
(3 rows)
-- Compression policy
SELECT add_compression_policy('conditions', interval '1 day') AS job_id_4 \gset
SELECT wait_for_job_to_run(:job_id_4, 1);
wait_for_job_to_run
---------------------
t
(1 row)
-- Chunk compress stats
SELECT * FROM _timescaledb_internal.compressed_chunk_stats ORDER BY chunk_name;
hypertable_schema | hypertable_name | chunk_schema | chunk_name | compression_status | uncompressed_heap_size | uncompressed_index_size | uncompressed_toast_size | uncompressed_total_size | compressed_heap_size | compressed_index_size | compressed_toast_size | compressed_total_size
-------------------+-----------------+-----------------------+------------------+--------------------+------------------------+-------------------------+-------------------------+-------------------------+----------------------+-----------------------+-----------------------+-----------------------
public | conditions | _timescaledb_internal | _hyper_1_1_chunk | Compressed | 8192 | 16384 | 8192 | 32768 | 8192 | 16384 | 8192 | 32768
public | conditions | _timescaledb_internal | _hyper_1_2_chunk | Compressed | 8192 | 16384 | 8192 | 32768 | 8192 | 16384 | 8192 | 32768
public | conditions | _timescaledb_internal | _hyper_1_3_chunk | Compressed | 8192 | 16384 | 8192 | 32768 | 8192 | 16384 | 8192 | 32768
(3 rows)
-- Stop Background Workers
SELECT _timescaledb_internal.stop_background_workers();
stop_background_workers
-------------------------
t
(1 row)

View File

@ -115,8 +115,131 @@ SELECT * FROM timescaledb_information.jobs WHERE job_id = 1;
SELECT add_job( proc=>'custom_func',
schedule_interval=>'1h', initial_start =>'2018-01-01 10:00:00-05');
SELECT job_id, next_start, scheduled, schedule_interval
SELECT job_id, next_start, scheduled, schedule_interval
FROM timescaledb_information.jobs WHERE job_id > 1000;
\x
SELECT * FROM timescaledb_information.job_stats WHERE job_id > 1000;
\x
-- tests for #3545
CREATE FUNCTION wait_for_job_to_run(job_param_id INTEGER, expected_runs INTEGER, spins INTEGER=:TEST_SPINWAIT_ITERS) RETURNS BOOLEAN LANGUAGE PLPGSQL AS
$BODY$
DECLARE
r RECORD;
BEGIN
FOR i in 1..spins
LOOP
SELECT total_successes, total_failures FROM _timescaledb_internal.bgw_job_stat WHERE job_id=job_param_id INTO r;
IF (r.total_failures > 0) THEN
EXIT;
ELSEIF (r.total_successes = expected_runs) THEN
RETURN true;
ELSEIF (r.total_successes > expected_runs) THEN
RAISE 'num_runs > expected';
ELSE
PERFORM pg_sleep(0.1);
END IF;
END LOOP;
RETURN false;
END
$BODY$;
TRUNCATE custom_log;
-- Nested procedure call
CREATE OR REPLACE PROCEDURE custom_proc_nested(job_id int, args jsonb) LANGUAGE PLPGSQL AS
$$
BEGIN
INSERT INTO custom_log VALUES($1, $2, 'custom_proc_nested 1 COMMIT');
COMMIT;
INSERT INTO custom_log VALUES($1, $2, 'custom_proc_nested 2 ROLLBACK');
ROLLBACK;
INSERT INTO custom_log VALUES($1, $2, 'custom_proc_nested 3 COMMIT');
COMMIT;
END
$$;
CREATE OR REPLACE PROCEDURE custom_proc3(job_id int, args jsonb) LANGUAGE PLPGSQL AS
$$
BEGIN
CALL custom_proc_nested(job_id, args);
END
$$;
CREATE OR REPLACE PROCEDURE custom_proc4(job_id int, args jsonb) LANGUAGE PLPGSQL AS
$$
BEGIN
INSERT INTO custom_log VALUES($1, $2, 'custom_proc4 1 COMMIT');
COMMIT;
INSERT INTO custom_log VALUES($1, $2, 'custom_proc4 2 ROLLBACK');
ROLLBACK;
RAISE EXCEPTION 'forced exception';
INSERT INTO custom_log VALUES($1, $2, 'custom_proc4 3 ABORT');
COMMIT;
END
$$;
-- Remove any default jobs, e.g., telemetry
\c :TEST_DBNAME :ROLE_SUPERUSER
TRUNCATE _timescaledb_config.bgw_job RESTART IDENTITY CASCADE;
\c :TEST_DBNAME :ROLE_DEFAULT_PERM_USER
SELECT add_job('custom_proc2', '1h', config := '{"type":"procedure"}'::jsonb, initial_start := now()) AS job_id_1 \gset
SELECT add_job('custom_proc3', '1h', config := '{"type":"procedure"}'::jsonb, initial_start := now()) AS job_id_2 \gset
\c :TEST_DBNAME :ROLE_SUPERUSER
-- Start Background Workers
SELECT _timescaledb_internal.start_background_workers();
-- Wait for jobs
SELECT wait_for_job_to_run(:job_id_1, 1);
SELECT wait_for_job_to_run(:job_id_2, 1);
-- Check results
SELECT * FROM custom_log ORDER BY job_id, extra;
-- Delete previous jobs
SELECT delete_job(:job_id_1);
SELECT delete_job(:job_id_2);
TRUNCATE custom_log;
-- Forced Exception
SELECT add_job('custom_proc4', '1h', config := '{"type":"procedure"}'::jsonb, initial_start := now()) AS job_id_3 \gset
SELECT wait_for_job_to_run(:job_id_3, 1);
-- Check results
SELECT * FROM custom_log ORDER BY job_id, extra;
-- Delete previous jobs
SELECT delete_job(:job_id_3);
CREATE TABLE conditions (
time TIMESTAMPTZ NOT NULL,
location TEXT NOT NULL,
location2 char(10) NOT NULL,
temperature DOUBLE PRECISION NULL,
humidity DOUBLE PRECISION NULL
);
SELECT create_hypertable('conditions', 'time', chunk_time_interval := '15 days'::interval);
ALTER TABLE conditions
SET (
timescaledb.compress,
timescaledb.compress_segmentby = 'location',
timescaledb.compress_orderby = 'time'
);
INSERT INTO conditions
SELECT generate_series('2021-08-01 00:00'::timestamp, '2021-08-31 00:00'::timestamp, '1 day'), 'POR', 'klick', 55, 75;
-- Chunk compress stats
SELECT * FROM _timescaledb_internal.compressed_chunk_stats ORDER BY chunk_name;
-- Compression policy
SELECT add_compression_policy('conditions', interval '1 day') AS job_id_4 \gset
SELECT wait_for_job_to_run(:job_id_4, 1);
-- Chunk compress stats
SELECT * FROM _timescaledb_internal.compressed_chunk_stats ORDER BY chunk_name;
-- Stop Background Workers
SELECT _timescaledb_internal.stop_background_workers();