Fix execution of refresh_caggs from user actions

Segmentation fault was occurring when calling the procedure
`refresh_continuous_aggregate` from a user-defined action (job).

Fixed it by adding the `SPI_connect_ext/SPI_finish` calls during the
execution because there are underlying SPI calls that were leading
us to an invalid SPI state (nonexistent `_SPI_current` global).

Fixes #3145
This commit is contained in:
Fabrízio de Royes Mello 2021-09-23 17:44:13 -03:00
parent 27d547853d
commit b3380e8174
3 changed files with 89 additions and 19 deletions

View File

@ -14,6 +14,7 @@
#include <storage/lmgr.h>
#include <miscadmin.h>
#include <fmgr.h>
#include <executor/spi.h>
#include <catalog.h>
#include <continuous_agg.h>
@ -505,6 +506,7 @@ continuous_agg_refresh(PG_FUNCTION_ARGS)
refresh_window.end = ts_time_get_noend_or_max(refresh_window.type);
continuous_agg_refresh_internal(cagg, &refresh_window, CAGG_REFRESH_WINDOW);
PG_RETURN_VOID();
}
@ -573,6 +575,11 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
int64 computed_invalidation_threshold;
int64 invalidation_threshold;
int64 max_bucket_width;
int rc;

/* Connect to the SPI manager: the refresh path below performs SPI calls,
 * and without an active SPI connection those calls would hit an invalid
 * SPI state (nonexistent _SPI_current). Note the assignment must be fully
 * parenthesized BEFORE the comparison; with the parenthesis misplaced,
 * rc would receive the boolean result of the != operator and the error
 * message would report a bogus result code. */
if ((rc = SPI_connect_ext(SPI_OPT_NONATOMIC)) != SPI_OK_CONNECT)
	elog(ERROR, "SPI_connect failed: %s", SPI_result_code_string(rc));
/* Like regular materialized views, require owner to refresh. */
if (!pg_class_ownercheck(cagg->relid, GetUserId()))
@ -651,29 +658,26 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
if (refresh_window.start >= refresh_window.end)
{
emit_up_to_date_notice(cagg, callctx);
if ((rc = SPI_finish()) != SPI_OK_FINISH)
elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(rc));
return;
}
/* Process invalidations in the hypertable invalidation log */
invalidation_process_hypertable_log(cagg, refresh_window.type);
/* Start a new transaction. Note that this invalidates previous memory
* allocations (and locks). */
PopActiveSnapshot();
CommitTransactionCommand();
/* See PG commit:84f5c2908dad81e8622b0406beea580e40bb03ac
* When we manage multiple transactions in a procedure, ensure that
* an active outer snapshot exists prior to executing SPI
* (see EnsurePortalSnapshotExists)
*/
StartTransactionCommand();
PushActiveSnapshot(GetTransactionSnapshot());
/* Commit and Start a new transaction */
SPI_commit_and_chain();
cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_id);
if (!process_cagg_invalidations_and_refresh(cagg, &refresh_window, callctx, INVALID_CHUNK_ID))
emit_up_to_date_notice(cagg, callctx);
PopActiveSnapshot();
if ((rc = SPI_finish()) != SPI_OK_FINISH)
elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(rc));
}
/*

View File

@ -279,7 +279,8 @@ BEGIN
LOOP
SELECT total_successes, total_failures FROM _timescaledb_internal.bgw_job_stat WHERE job_id=job_param_id INTO r;
IF (r.total_failures > 0) THEN
EXIT;
RAISE INFO 'wait_for_job_to_run: job execution failed';
RETURN false;
ELSEIF (r.total_successes = expected_runs) THEN
RETURN true;
ELSEIF (r.total_successes > expected_runs) THEN
@ -288,6 +289,7 @@ BEGIN
PERFORM pg_sleep(0.1);
END IF;
END LOOP;
RAISE INFO 'wait_for_job_to_run: timeout after % tries', spins;
RETURN false;
END
$BODY$;
@ -322,6 +324,12 @@ BEGIN
COMMIT;
END
$$;
-- Custom job procedure: refreshes the daily conditions cagg over a fixed
-- August 2021 window, exercising refresh_continuous_aggregate from a job.
CREATE OR REPLACE PROCEDURE custom_proc5(job_id int, args jsonb)
LANGUAGE PLPGSQL
AS $$
BEGIN
    CALL refresh_continuous_aggregate('conditions_summary_daily', '2021-08-01 00:00', '2021-08-31 00:00');
END
$$;
-- Remove any default jobs, e.g., telemetry
\c :TEST_DBNAME :ROLE_SUPERUSER
TRUNCATE _timescaledb_config.bgw_job RESTART IDENTITY CASCADE;
@ -378,6 +386,7 @@ TRUNCATE custom_log;
-- Forced Exception
SELECT add_job('custom_proc4', '1h', config := '{"type":"procedure"}'::jsonb, initial_start := now()) AS job_id_3 \gset
SELECT wait_for_job_to_run(:job_id_3, 1);
INFO: wait_for_job_to_run: job execution failed
wait_for_job_to_run
---------------------
f
@ -398,12 +407,12 @@ SELECT delete_job(:job_id_3);
(1 row)
CREATE TABLE conditions (
time TIMESTAMPTZ NOT NULL,
time TIMESTAMP NOT NULL,
location TEXT NOT NULL,
location2 char(10) NOT NULL,
temperature DOUBLE PRECISION NULL,
humidity DOUBLE PRECISION NULL
);
) WITH (autovacuum_enabled = FALSE);
SELECT create_hypertable('conditions', 'time', chunk_time_interval := '15 days'::interval);
create_hypertable
-------------------------
@ -444,6 +453,34 @@ SELECT * FROM _timescaledb_internal.compressed_chunk_stats ORDER BY chunk_name;
public | conditions | _timescaledb_internal | _hyper_1_3_chunk | Compressed | 8192 | 16384 | 8192 | 32768 | 8192 | 16384 | 8192 | 32768
(3 rows)
-- Decompress chunks before creating the cagg
SELECT decompress_chunk(c) FROM show_chunks('conditions') c;
decompress_chunk
----------------------------------------
_timescaledb_internal._hyper_1_1_chunk
_timescaledb_internal._hyper_1_2_chunk
_timescaledb_internal._hyper_1_3_chunk
(3 rows)
-- Continuous Aggregate
-- Daily per-location temperature rollup over the conditions hypertable;
-- created empty (WITH NO DATA) so the background job performs the first refresh.
CREATE MATERIALIZED VIEW conditions_summary_daily
WITH (timescaledb.continuous) AS
SELECT
    location,
    time_bucket(INTERVAL '1 day', time) AS bucket,
    AVG(temperature) AS avg,
    MAX(temperature) AS max,
    MIN(temperature) AS min
FROM conditions
GROUP BY location, bucket
WITH NO DATA;
-- Refresh Continuous Aggregate by Job
SELECT add_job('custom_proc5', '1h', config := '{"type":"procedure"}'::jsonb, initial_start := now()) AS job_id_5 \gset
SELECT wait_for_job_to_run(:job_id_5, 1);
wait_for_job_to_run
---------------------
t
(1 row)
-- Stop Background Workers
SELECT _timescaledb_internal.stop_background_workers();
stop_background_workers

View File

@ -70,7 +70,6 @@ CALL run_job(1004);
SELECT * FROM custom_log ORDER BY job_id, extra;
\set ON_ERROR_STOP 0
-- test bad input
SELECT delete_job(NULL);
@ -131,7 +130,8 @@ BEGIN
LOOP
SELECT total_successes, total_failures FROM _timescaledb_internal.bgw_job_stat WHERE job_id=job_param_id INTO r;
IF (r.total_failures > 0) THEN
EXIT;
RAISE INFO 'wait_for_job_to_run: job execution failed';
RETURN false;
ELSEIF (r.total_successes = expected_runs) THEN
RETURN true;
ELSEIF (r.total_successes > expected_runs) THEN
@ -140,6 +140,7 @@ BEGIN
PERFORM pg_sleep(0.1);
END IF;
END LOOP;
RAISE INFO 'wait_for_job_to_run: timeout after % tries', spins;
RETURN false;
END
$BODY$;
@ -179,6 +180,13 @@ BEGIN
END
$$;
-- Custom job procedure: refreshes the daily conditions cagg over a fixed
-- August 2021 window, exercising refresh_continuous_aggregate from a job.
CREATE OR REPLACE PROCEDURE custom_proc5(job_id int, args jsonb)
LANGUAGE PLPGSQL
AS $$
BEGIN
    CALL refresh_continuous_aggregate('conditions_summary_daily', '2021-08-01 00:00', '2021-08-31 00:00');
END
$$;
-- Remove any default jobs, e.g., telemetry
\c :TEST_DBNAME :ROLE_SUPERUSER
TRUNCATE _timescaledb_config.bgw_job RESTART IDENTITY CASCADE;
@ -215,13 +223,15 @@ SELECT * FROM custom_log ORDER BY job_id, extra;
SELECT delete_job(:job_id_3);
CREATE TABLE conditions (
time TIMESTAMPTZ NOT NULL,
time TIMESTAMP NOT NULL,
location TEXT NOT NULL,
location2 char(10) NOT NULL,
temperature DOUBLE PRECISION NULL,
humidity DOUBLE PRECISION NULL
);
) WITH (autovacuum_enabled = FALSE);
SELECT create_hypertable('conditions', 'time', chunk_time_interval := '15 days'::interval);
ALTER TABLE conditions
SET (
timescaledb.compress,
@ -241,5 +251,24 @@ SELECT wait_for_job_to_run(:job_id_4, 1);
-- Chunk compress stats
SELECT * FROM _timescaledb_internal.compressed_chunk_stats ORDER BY chunk_name;
-- Decompress chunks before creating the cagg
SELECT decompress_chunk(c) FROM show_chunks('conditions') c;
-- Continuous Aggregate
-- Daily per-location temperature rollup over the conditions hypertable;
-- created empty (WITH NO DATA) so the background job performs the first refresh.
CREATE MATERIALIZED VIEW conditions_summary_daily
WITH (timescaledb.continuous) AS
SELECT
    location,
    time_bucket(INTERVAL '1 day', time) AS bucket,
    AVG(temperature) AS avg,
    MAX(temperature) AS max,
    MIN(temperature) AS min
FROM conditions
GROUP BY location, bucket
WITH NO DATA;
-- Refresh Continuous Aggregate by Job
SELECT add_job('custom_proc5', '1h', config := '{"type":"procedure"}'::jsonb, initial_start := now()) AS job_id_5 \gset
SELECT wait_for_job_to_run(:job_id_5, 1);
-- Stop Background Workers
SELECT _timescaledb_internal.stop_background_workers();