diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c index 909561000..5c128f7a0 100644 --- a/tsl/src/continuous_aggs/refresh.c +++ b/tsl/src/continuous_aggs/refresh.c @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -505,6 +506,7 @@ continuous_agg_refresh(PG_FUNCTION_ARGS) refresh_window.end = ts_time_get_noend_or_max(refresh_window.type); continuous_agg_refresh_internal(cagg, &refresh_window, CAGG_REFRESH_WINDOW); + PG_RETURN_VOID(); } @@ -573,6 +575,11 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, int64 computed_invalidation_threshold; int64 invalidation_threshold; int64 max_bucket_width; + int rc; + + /* Connect to SPI manager due to the underlying SPI calls */ + if ((rc = SPI_connect_ext(SPI_OPT_NONATOMIC) != SPI_OK_CONNECT)) + elog(ERROR, "SPI_connect failed: %s", SPI_result_code_string(rc)); /* Like regular materialized views, require owner to refresh. */ if (!pg_class_ownercheck(cagg->relid, GetUserId())) @@ -651,29 +658,26 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg, if (refresh_window.start >= refresh_window.end) { emit_up_to_date_notice(cagg, callctx); + + if ((rc = SPI_finish()) != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(rc)); + return; } /* Process invalidations in the hypertable invalidation log */ invalidation_process_hypertable_log(cagg, refresh_window.type); - /* Start a new transaction. Note that this invalidates previous memory - * allocations (and locks). */ - PopActiveSnapshot(); - CommitTransactionCommand(); - /* See PG commit:84f5c2908dad81e8622b0406beea580e40bb03ac - * When we manage multiple transactions in a procedure, ensure that - * an active outer snapshot exists prior to executing SPI - * (see EnsurePortalSnapshotExists) - */ - StartTransactionCommand(); - PushActiveSnapshot(GetTransactionSnapshot()); + /* Commit and Start a new transaction */ + SPI_commit_and_chain(); cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_id); if (!process_cagg_invalidations_and_refresh(cagg, &refresh_window, callctx, INVALID_CHUNK_ID)) emit_up_to_date_notice(cagg, callctx); - PopActiveSnapshot(); + + if ((rc = SPI_finish()) != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(rc)); } /* diff --git a/tsl/test/expected/bgw_custom.out b/tsl/test/expected/bgw_custom.out index 2932ceada..af5f764e3 100644 --- a/tsl/test/expected/bgw_custom.out +++ b/tsl/test/expected/bgw_custom.out @@ -279,7 +279,8 @@ BEGIN LOOP SELECT total_successes, total_failures FROM _timescaledb_internal.bgw_job_stat WHERE job_id=job_param_id INTO r; IF (r.total_failures > 0) THEN - EXIT; + RAISE INFO 'wait_for_job_to_run: job execution failed'; + RETURN false; ELSEIF (r.total_successes = expected_runs) THEN RETURN true; ELSEIF (r.total_successes > expected_runs) THEN @@ -288,6 +289,7 @@ BEGIN PERFORM pg_sleep(0.1); END IF; END LOOP; + RAISE INFO 'wait_for_job_to_run: timeout after % tries', spins; RETURN false; END $BODY$; @@ -322,6 +324,12 @@ BEGIN COMMIT; END $$; +CREATE OR REPLACE PROCEDURE custom_proc5(job_id int, args jsonb) LANGUAGE PLPGSQL AS +$$ +BEGIN + CALL refresh_continuous_aggregate('conditions_summary_daily', '2021-08-01 00:00', '2021-08-31 00:00'); +END +$$; -- Remove any default jobs, e.g., telemetry \c :TEST_DBNAME :ROLE_SUPERUSER TRUNCATE _timescaledb_config.bgw_job RESTART IDENTITY CASCADE; @@ -378,6 +386,7 @@ TRUNCATE custom_log; -- Forced Exception SELECT add_job('custom_proc4', '1h', config := '{"type":"procedure"}'::jsonb, initial_start := now()) AS job_id_3 \gset SELECT wait_for_job_to_run(:job_id_3, 1); +INFO: wait_for_job_to_run: job execution failed wait_for_job_to_run --------------------- f @@ -398,12 +407,12 @@ SELECT delete_job(:job_id_3); (1 row) CREATE TABLE conditions ( - time TIMESTAMPTZ NOT NULL, + time TIMESTAMP NOT NULL, location TEXT NOT NULL, location2 char(10) NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL -); +) WITH (autovacuum_enabled = FALSE); SELECT create_hypertable('conditions', 'time', chunk_time_interval := '15 days'::interval); create_hypertable ------------------------- @@ -444,6 +453,34 @@ SELECT * FROM _timescaledb_internal.compressed_chunk_stats ORDER BY chunk_name; public | conditions | _timescaledb_internal | _hyper_1_3_chunk | Compressed | 8192 | 16384 | 8192 | 32768 | 8192 | 16384 | 8192 | 32768 (3 rows) +-- Decompress chunks before create the cagg +SELECT decompress_chunk(c) FROM show_chunks('conditions') c; + decompress_chunk +---------------------------------------- + _timescaledb_internal._hyper_1_1_chunk + _timescaledb_internal._hyper_1_2_chunk + _timescaledb_internal._hyper_1_3_chunk +(3 rows) + +-- Continuous Aggregate +CREATE MATERIALIZED VIEW conditions_summary_daily +WITH (timescaledb.continuous) AS +SELECT location, + time_bucket(INTERVAL '1 day', time) AS bucket, + AVG(temperature), + MAX(temperature), + MIN(temperature) +FROM conditions +GROUP BY location, bucket +WITH NO DATA; +-- Refresh Continous Aggregate by Job +SELECT add_job('custom_proc5', '1h', config := '{"type":"procedure"}'::jsonb, initial_start := now()) AS job_id_5 \gset +SELECT wait_for_job_to_run(:job_id_5, 1); + wait_for_job_to_run +--------------------- + t +(1 row) + -- Stop Background Workers SELECT _timescaledb_internal.stop_background_workers(); stop_background_workers diff --git a/tsl/test/sql/bgw_custom.sql b/tsl/test/sql/bgw_custom.sql index 475444075..60db64b43 100644 --- a/tsl/test/sql/bgw_custom.sql +++ b/tsl/test/sql/bgw_custom.sql @@ -70,7 +70,6 @@ CALL run_job(1004); SELECT * FROM custom_log ORDER BY job_id, extra; - \set ON_ERROR_STOP 0 -- test bad input SELECT delete_job(NULL); @@ -131,7 +130,8 @@ BEGIN LOOP SELECT total_successes, total_failures FROM _timescaledb_internal.bgw_job_stat WHERE job_id=job_param_id INTO r; IF (r.total_failures > 0) THEN - EXIT; + RAISE INFO 'wait_for_job_to_run: job execution failed'; + RETURN false; ELSEIF (r.total_successes = expected_runs) THEN RETURN true; ELSEIF (r.total_successes > expected_runs) THEN @@ -140,6 +140,7 @@ BEGIN PERFORM pg_sleep(0.1); END IF; END LOOP; + RAISE INFO 'wait_for_job_to_run: timeout after % tries', spins; RETURN false; END $BODY$; @@ -179,6 +180,13 @@ BEGIN END $$; +CREATE OR REPLACE PROCEDURE custom_proc5(job_id int, args jsonb) LANGUAGE PLPGSQL AS +$$ +BEGIN + CALL refresh_continuous_aggregate('conditions_summary_daily', '2021-08-01 00:00', '2021-08-31 00:00'); +END +$$; + -- Remove any default jobs, e.g., telemetry \c :TEST_DBNAME :ROLE_SUPERUSER TRUNCATE _timescaledb_config.bgw_job RESTART IDENTITY CASCADE; @@ -215,13 +223,15 @@ SELECT * FROM custom_log ORDER BY job_id, extra; SELECT delete_job(:job_id_3); CREATE TABLE conditions ( - time TIMESTAMPTZ NOT NULL, + time TIMESTAMP NOT NULL, location TEXT NOT NULL, location2 char(10) NOT NULL, temperature DOUBLE PRECISION NULL, humidity DOUBLE PRECISION NULL -); +) WITH (autovacuum_enabled = FALSE); + SELECT create_hypertable('conditions', 'time', chunk_time_interval := '15 days'::interval); + ALTER TABLE conditions SET ( timescaledb.compress, @@ -241,5 +251,24 @@ SELECT wait_for_job_to_run(:job_id_4, 1); -- Chunk compress stats SELECT * FROM _timescaledb_internal.compressed_chunk_stats ORDER BY chunk_name; +-- Decompress chunks before create the cagg +SELECT decompress_chunk(c) FROM show_chunks('conditions') c; + +-- Continuous Aggregate +CREATE MATERIALIZED VIEW conditions_summary_daily +WITH (timescaledb.continuous) AS +SELECT location, + time_bucket(INTERVAL '1 day', time) AS bucket, + AVG(temperature), + MAX(temperature), + MIN(temperature) +FROM conditions +GROUP BY location, bucket +WITH NO DATA; + +-- Refresh Continous Aggregate by Job +SELECT add_job('custom_proc5', '1h', config := '{"type":"procedure"}'::jsonb, initial_start := now()) AS job_id_5 \gset +SELECT wait_for_job_to_run(:job_id_5, 1); + -- Stop Background Workers SELECT _timescaledb_internal.stop_background_workers();