mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-14 17:43:34 +08:00
Improve transaction check in CAgg refresh
Procedures that use multiple transactions cannot be run in a transaction block (from a function, from dynamic SQL) or in a subtransaction (from a procedure block with an EXCEPTION clause). Such procedures use PreventInTransactionBlock function to check whether they can be run. Though currently such checks are incompete, because PreventInTransactionBlock requires isTopLevel argument to throw a consistent error when the call originates from a function. This isTopLevel flag (that is a bit poorly named - see below) is not readily available inside C procedures. The source of truth for it - ProcessUtilityContext parameter is passed to ProcessUtility hooks, but is not included with the function calls. There is an undocumented SPI_inside_nonatomic_context function, that would have been sufficient for isTopLevel flag, but it currently returns false when SPI connection is absent (that is a valid scenario when C procedures are called from top-lelev SQL instead of PLPG procedures or DO blocks) so it cannot be used. To work around this the value of ProcessUtilityContext parameter is saved when TS ProcessUtility hook is entered and can be accessed from C procedures using new ts_process_utility_is_context_nonatomic function. The result is called "non-atomic" instead of "top-level" because the way how isTopLevel flag is determined from the ProcessUtilityContext value in standard_ProcessUtility is insufficient for C procedures - it excludes PROCESS_UTILITY_QUERY_NONATOMIC value (used when called from PLPG procedure without an EXCEPTION clause) that is a valid use case for C procedures with transactions. See details in the description of ExecuteCallStmt function. It is expected that calls to C procedures are done with CALL and always pass though the ProcessUtility hook. The ProcessUtilityContext parameter is set to PROCESS_UTILITY_TOPLEVEL value by default. In unlikely case when a C procedure is called without passing through ProcessUtility hook and the call is done in atomic context, then PreventInTransactionBlock checks will pass, but SPI_commit will fail when checking that all current active snapshots are portal-owned snapshots (the same behaviour that was observed before this change). In atomic context there will be an additional snapshot set in _SPI_execute_plan, see the snapshot handling invariants description in that function. Closes #6533.
This commit is contained in:
parent
60ecd46052
commit
85437c0b2f
2
.unreleased/pr_7566
Normal file
2
.unreleased/pr_7566
Normal file
@ -0,0 +1,2 @@
|
||||
Fixes: #7566 Improve transaction check in CAgg refresh
|
||||
Thanks: @staticlibs for sending PR to improve transaction check in CAgg refresh
|
@ -93,6 +93,7 @@ void _process_utility_fini(void);
|
||||
static ProcessUtility_hook_type prev_ProcessUtility_hook;
|
||||
|
||||
static bool expect_chunk_modification = false;
|
||||
static ProcessUtilityContext last_process_utility_context = PROCESS_UTILITY_TOPLEVEL;
|
||||
static DDLResult process_altertable_set_options(AlterTableCmd *cmd, Hypertable *ht);
|
||||
static DDLResult process_altertable_reset_options(AlterTableCmd *cmd, Hypertable *ht);
|
||||
|
||||
@ -111,6 +112,13 @@ prev_ProcessUtility(ProcessUtilityArgs *args)
|
||||
args->queryEnv,
|
||||
args->dest,
|
||||
args->completion_tag);
|
||||
|
||||
/*
|
||||
* Reset the last_process_utility_context value that is saved at the
|
||||
* entrance of the TS ProcessUtility hook and can be used for transaction
|
||||
* checks inside refresh_cagg and other procedures.
|
||||
*/
|
||||
ts_process_utility_context_reset();
|
||||
}
|
||||
|
||||
static void
|
||||
@ -5048,6 +5056,8 @@ timescaledb_ddl_command_start(PlannedStmt *pstmt, const char *query_string, bool
|
||||
QueryEnvironment *queryEnv, DestReceiver *dest,
|
||||
QueryCompletion *completion_tag)
|
||||
{
|
||||
last_process_utility_context = context;
|
||||
|
||||
ProcessUtilityArgs args = { .query_string = query_string,
|
||||
.context = context,
|
||||
.params = params,
|
||||
@ -5173,6 +5183,19 @@ ts_process_utility_set_expect_chunk_modification(bool expect)
|
||||
expect_chunk_modification = expect;
|
||||
}
|
||||
|
||||
bool
|
||||
ts_process_utility_is_context_nonatomic(void)
|
||||
{
|
||||
ProcessUtilityContext context = last_process_utility_context;
|
||||
return context == PROCESS_UTILITY_TOPLEVEL || context == PROCESS_UTILITY_QUERY_NONATOMIC;
|
||||
}
|
||||
|
||||
void
|
||||
ts_process_utility_context_reset(void)
|
||||
{
|
||||
last_process_utility_context = PROCESS_UTILITY_TOPLEVEL;
|
||||
}
|
||||
|
||||
static void
|
||||
process_utility_xact_abort(XactEvent event, void *arg)
|
||||
{
|
||||
|
@ -36,3 +36,56 @@ typedef enum
|
||||
typedef DDLResult (*ts_process_utility_handler_t)(ProcessUtilityArgs *args);
|
||||
|
||||
extern void ts_process_utility_set_expect_chunk_modification(bool expect);
|
||||
|
||||
/*
|
||||
* Procedures that use multiple transactions cannot be run in a transaction
|
||||
* block (from a function, from dynamic SQL) or in a subtransaction (from a
|
||||
* procedure block with an EXCEPTION clause). Such procedures use
|
||||
* PreventInTransactionBlock function to check whether they can be run.
|
||||
*
|
||||
* Though currently such checks are incomplete, because
|
||||
* PreventInTransactionBlock requires isTopLevel argument to throw a
|
||||
* consistent error when the call originates from a function. This
|
||||
* isTopLevel flag (that is a bit poorly named - see below) is not readily
|
||||
* available inside C procedures. The source of truth for it -
|
||||
* ProcessUtilityContext parameter is passed to ProcessUtility hooks, but
|
||||
* is not included with the function calls. There is an undocumented
|
||||
* SPI_inside_nonatomic_context function, that would have been sufficient
|
||||
* for isTopLevel flag, but it currently returns false when SPI connection
|
||||
* is absent (that is a valid scenario when C procedures are called from
|
||||
* top-lelev SQL instead of PLPG procedures or DO blocks) so it cannot be
|
||||
* used.
|
||||
*
|
||||
* To work around this the value of ProcessUtilityContext parameter is
|
||||
* saved when TS ProcessUtility hook is entered and can be accessed from
|
||||
* C procedures using new ts_process_utility_is_context_nonatomic function.
|
||||
* The result is called "non-atomic" instead of "top-level" because the way
|
||||
* how isTopLevel flag is determined from the ProcessUtilityContext value
|
||||
* in standard_ProcessUtility is insufficient for C procedures - it
|
||||
* excludes PROCESS_UTILITY_QUERY_NONATOMIC value (used when called from
|
||||
* PLPG procedure without an EXCEPTION clause) that is a valid use case for
|
||||
* C procedures with transactions. See details in the description of
|
||||
* ExecuteCallStmt function.
|
||||
*
|
||||
* It is expected that calls to C procedures are done with CALL and always
|
||||
* pass though the ProcessUtility hook. The ProcessUtilityContext
|
||||
* parameter is set to PROCESS_UTILITY_TOPLEVEL value by default. In
|
||||
* unlikely case when a C procedure is called without passing through
|
||||
* ProcessUtility hook and the call is done in atomic context, then
|
||||
* PreventInTransactionBlock checks will pass, but SPI_commit will fail
|
||||
* when checking that all current active snapshots are portal-owned
|
||||
* snapshots (the same behaviour that was observed before this change).
|
||||
* In atomic context there will be an additional snapshot set in
|
||||
* _SPI_execute_plan, see the snapshot handling invariants description
|
||||
* in that function.
|
||||
*/
|
||||
extern TSDLLEXPORT bool ts_process_utility_is_context_nonatomic(void);
|
||||
|
||||
/*
|
||||
* Currently in TS ProcessUtility hook the saved ProcessUtilityContext
|
||||
* value is reset back to PROCESS_UTILITY_TOPLEVEL on normal exit but
|
||||
* is NOT reset in case of ereport exit. C procedures can call this
|
||||
* function to reset the saved value before doing the checks that can
|
||||
* result in ereport exit.
|
||||
*/
|
||||
extern TSDLLEXPORT void ts_process_utility_context_reset(void);
|
||||
|
@ -31,6 +31,7 @@
|
||||
#include "invalidation.h"
|
||||
#include "invalidation_threshold.h"
|
||||
#include "materialize.h"
|
||||
#include "process_utility.h"
|
||||
#include "refresh.h"
|
||||
|
||||
#define CAGG_REFRESH_LOG_LEVEL (callctx == CAGG_REFRESH_POLICY ? LOG : DEBUG1)
|
||||
@ -768,6 +769,24 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
|
||||
int32 mat_id = cagg->data.mat_hypertable_id;
|
||||
InternalTimeRange refresh_window = *refresh_window_arg;
|
||||
int64 invalidation_threshold;
|
||||
bool nonatomic = ts_process_utility_is_context_nonatomic();
|
||||
|
||||
/* Reset the saved ProcessUtilityContext value promptly before
|
||||
* calling Prevent* checks so the potential unsupported (atomic)
|
||||
* value won't linger there in case of ereport exit.
|
||||
*/
|
||||
ts_process_utility_context_reset();
|
||||
|
||||
PreventCommandIfReadOnly(REFRESH_FUNCTION_NAME);
|
||||
|
||||
/* Prevent running refresh if we're in a transaction block since a refresh
|
||||
* can run two transactions and might take a long time to release locks if
|
||||
* there's a lot to materialize. Strictly, it is optional to prohibit
|
||||
* transaction blocks since there will be only one transaction if the
|
||||
* invalidation threshold needs no update. However, materialization might
|
||||
* still take a long time and it is probably best for consistency to always
|
||||
* prevent transaction blocks. */
|
||||
PreventInTransactionBlock(nonatomic, REFRESH_FUNCTION_NAME);
|
||||
|
||||
/* Connect to SPI manager due to the underlying SPI calls */
|
||||
int rc = SPI_connect_ext(SPI_OPT_NONATOMIC);
|
||||
@ -784,17 +803,6 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
|
||||
get_relkind_objtype(get_rel_relkind(cagg->relid)),
|
||||
get_rel_name(cagg->relid));
|
||||
|
||||
PreventCommandIfReadOnly(REFRESH_FUNCTION_NAME);
|
||||
|
||||
/* Prevent running refresh if we're in a transaction block since a refresh
|
||||
* can run two transactions and might take a long time to release locks if
|
||||
* there's a lot to materialize. Strictly, it is optional to prohibit
|
||||
* transaction blocks since there will be only one transaction if the
|
||||
* invalidation threshold needs no update. However, materialization might
|
||||
* still take a long time and it is probably best for consistency to always
|
||||
* prevent transaction blocks. */
|
||||
PreventInTransactionBlock(true, REFRESH_FUNCTION_NAME);
|
||||
|
||||
/* No bucketing when open ended */
|
||||
if (!(start_isnull && end_isnull))
|
||||
{
|
||||
|
@ -535,3 +535,68 @@ SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp
|
||||
FROM conditions
|
||||
GROUP BY 1,2 WITH NO DATA;
|
||||
COMMIT;
|
||||
-- refresh_continuous_aggregate can run two transactions, thus it cannot be
|
||||
-- called in a transaction block (from a function, from dynamic SQL) or in a
|
||||
-- subtransaction (from a procedure block with an EXCEPTION clause). Though it
|
||||
-- does NOT require a top level context and can be called from a procedure
|
||||
-- block without an EXCEPTION clause.
|
||||
-- DO block
|
||||
DO $$
|
||||
BEGIN
|
||||
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
|
||||
END; $$;
|
||||
psql:include/cagg_refresh_common.sql:347: NOTICE: continuous aggregate "daily_temp" is already up-to-date
|
||||
-- Procedure without subtransaction
|
||||
CREATE OR REPLACE PROCEDURE refresh_cagg_proc_normal()
|
||||
LANGUAGE PLPGSQL AS
|
||||
$$
|
||||
BEGIN
|
||||
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
|
||||
END; $$;
|
||||
CALL refresh_cagg_proc_normal();
|
||||
psql:include/cagg_refresh_common.sql:357: NOTICE: continuous aggregate "daily_temp" is already up-to-date
|
||||
\set ON_ERROR_STOP 0
|
||||
-- Procedure with subtransaction
|
||||
CREATE OR REPLACE PROCEDURE refresh_cagg_proc_subtransaction()
|
||||
LANGUAGE PLPGSQL AS
|
||||
$$
|
||||
DECLARE
|
||||
errmsg TEXT;
|
||||
BEGIN
|
||||
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
GET STACKED DIAGNOSTICS errmsg = MESSAGE_TEXT;
|
||||
RAISE EXCEPTION '%', errmsg;
|
||||
END; $$;
|
||||
CALL refresh_cagg_proc_subtransaction();
|
||||
psql:include/cagg_refresh_common.sql:374: ERROR: refresh_continuous_aggregate() cannot run inside a transaction block
|
||||
-- Function
|
||||
CREATE OR REPLACE FUNCTION refresh_cagg_fun()
|
||||
RETURNS INT LANGUAGE PLPGSQL AS
|
||||
$$
|
||||
BEGIN
|
||||
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
|
||||
RETURN 1;
|
||||
END; $$;
|
||||
SELECT * from refresh_cagg_fun();
|
||||
psql:include/cagg_refresh_common.sql:385: ERROR: refresh_continuous_aggregate() cannot be executed from a function
|
||||
-- Dynamic SQL
|
||||
DO $$
|
||||
BEGIN
|
||||
EXECUTE $inner$
|
||||
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
|
||||
$inner$;
|
||||
END; $$;
|
||||
psql:include/cagg_refresh_common.sql:393: ERROR: refresh_continuous_aggregate() cannot be executed from a function
|
||||
-- Trigger
|
||||
CREATE TABLE refresh_cagg_trigger_table(a int);
|
||||
CREATE FUNCTION refresh_cagg_trigger_fun()
|
||||
RETURNS TRIGGER LANGUAGE PLPGSQL AS $$
|
||||
BEGIN
|
||||
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
|
||||
END; $$;
|
||||
CREATE TRIGGER refresh_cagg_trigger AFTER INSERT ON refresh_cagg_trigger_table
|
||||
EXECUTE FUNCTION refresh_cagg_trigger_fun();
|
||||
INSERT INTO refresh_cagg_trigger_table VALUES(1);
|
||||
psql:include/cagg_refresh_common.sql:407: ERROR: refresh_continuous_aggregate() cannot be executed from a function
|
||||
\set ON_ERROR_STOP 1
|
||||
|
@ -536,6 +536,71 @@ SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp
|
||||
FROM conditions
|
||||
GROUP BY 1,2 WITH NO DATA;
|
||||
COMMIT;
|
||||
-- refresh_continuous_aggregate can run two transactions, thus it cannot be
|
||||
-- called in a transaction block (from a function, from dynamic SQL) or in a
|
||||
-- subtransaction (from a procedure block with an EXCEPTION clause). Though it
|
||||
-- does NOT require a top level context and can be called from a procedure
|
||||
-- block without an EXCEPTION clause.
|
||||
-- DO block
|
||||
DO $$
|
||||
BEGIN
|
||||
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
|
||||
END; $$;
|
||||
psql:include/cagg_refresh_common.sql:347: NOTICE: continuous aggregate "daily_temp" is already up-to-date
|
||||
-- Procedure without subtransaction
|
||||
CREATE OR REPLACE PROCEDURE refresh_cagg_proc_normal()
|
||||
LANGUAGE PLPGSQL AS
|
||||
$$
|
||||
BEGIN
|
||||
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
|
||||
END; $$;
|
||||
CALL refresh_cagg_proc_normal();
|
||||
psql:include/cagg_refresh_common.sql:357: NOTICE: continuous aggregate "daily_temp" is already up-to-date
|
||||
\set ON_ERROR_STOP 0
|
||||
-- Procedure with subtransaction
|
||||
CREATE OR REPLACE PROCEDURE refresh_cagg_proc_subtransaction()
|
||||
LANGUAGE PLPGSQL AS
|
||||
$$
|
||||
DECLARE
|
||||
errmsg TEXT;
|
||||
BEGIN
|
||||
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
GET STACKED DIAGNOSTICS errmsg = MESSAGE_TEXT;
|
||||
RAISE EXCEPTION '%', errmsg;
|
||||
END; $$;
|
||||
CALL refresh_cagg_proc_subtransaction();
|
||||
psql:include/cagg_refresh_common.sql:374: ERROR: refresh_continuous_aggregate() cannot run inside a transaction block
|
||||
-- Function
|
||||
CREATE OR REPLACE FUNCTION refresh_cagg_fun()
|
||||
RETURNS INT LANGUAGE PLPGSQL AS
|
||||
$$
|
||||
BEGIN
|
||||
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
|
||||
RETURN 1;
|
||||
END; $$;
|
||||
SELECT * from refresh_cagg_fun();
|
||||
psql:include/cagg_refresh_common.sql:385: ERROR: refresh_continuous_aggregate() cannot be executed from a function
|
||||
-- Dynamic SQL
|
||||
DO $$
|
||||
BEGIN
|
||||
EXECUTE $inner$
|
||||
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
|
||||
$inner$;
|
||||
END; $$;
|
||||
psql:include/cagg_refresh_common.sql:393: ERROR: refresh_continuous_aggregate() cannot be executed from a function
|
||||
-- Trigger
|
||||
CREATE TABLE refresh_cagg_trigger_table(a int);
|
||||
CREATE FUNCTION refresh_cagg_trigger_fun()
|
||||
RETURNS TRIGGER LANGUAGE PLPGSQL AS $$
|
||||
BEGIN
|
||||
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
|
||||
END; $$;
|
||||
CREATE TRIGGER refresh_cagg_trigger AFTER INSERT ON refresh_cagg_trigger_table
|
||||
EXECUTE FUNCTION refresh_cagg_trigger_fun();
|
||||
INSERT INTO refresh_cagg_trigger_table VALUES(1);
|
||||
psql:include/cagg_refresh_common.sql:407: ERROR: refresh_continuous_aggregate() cannot be executed from a function
|
||||
\set ON_ERROR_STOP 1
|
||||
-- Additional tests for MERGE refresh
|
||||
DROP TABLE conditions CASCADE;
|
||||
NOTICE: drop cascades to 10 other objects
|
||||
|
@ -333,3 +333,77 @@ SELECT time_bucket('7 days', time) AS day, device, avg(temp) AS avg_temp
|
||||
FROM conditions
|
||||
GROUP BY 1,2 WITH NO DATA;
|
||||
COMMIT;
|
||||
|
||||
-- refresh_continuous_aggregate can run two transactions, thus it cannot be
|
||||
-- called in a transaction block (from a function, from dynamic SQL) or in a
|
||||
-- subtransaction (from a procedure block with an EXCEPTION clause). Though it
|
||||
-- does NOT require a top level context and can be called from a procedure
|
||||
-- block without an EXCEPTION clause.
|
||||
|
||||
-- DO block
|
||||
DO $$
|
||||
BEGIN
|
||||
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
|
||||
END; $$;
|
||||
|
||||
-- Procedure without subtransaction
|
||||
CREATE OR REPLACE PROCEDURE refresh_cagg_proc_normal()
|
||||
LANGUAGE PLPGSQL AS
|
||||
$$
|
||||
BEGIN
|
||||
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
|
||||
END; $$;
|
||||
|
||||
CALL refresh_cagg_proc_normal();
|
||||
|
||||
\set ON_ERROR_STOP 0
|
||||
|
||||
-- Procedure with subtransaction
|
||||
CREATE OR REPLACE PROCEDURE refresh_cagg_proc_subtransaction()
|
||||
LANGUAGE PLPGSQL AS
|
||||
$$
|
||||
DECLARE
|
||||
errmsg TEXT;
|
||||
BEGIN
|
||||
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
GET STACKED DIAGNOSTICS errmsg = MESSAGE_TEXT;
|
||||
RAISE EXCEPTION '%', errmsg;
|
||||
END; $$;
|
||||
|
||||
CALL refresh_cagg_proc_subtransaction();
|
||||
|
||||
-- Function
|
||||
CREATE OR REPLACE FUNCTION refresh_cagg_fun()
|
||||
RETURNS INT LANGUAGE PLPGSQL AS
|
||||
$$
|
||||
BEGIN
|
||||
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
|
||||
RETURN 1;
|
||||
END; $$;
|
||||
|
||||
SELECT * from refresh_cagg_fun();
|
||||
|
||||
-- Dynamic SQL
|
||||
DO $$
|
||||
BEGIN
|
||||
EXECUTE $inner$
|
||||
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
|
||||
$inner$;
|
||||
END; $$;
|
||||
|
||||
-- Trigger
|
||||
CREATE TABLE refresh_cagg_trigger_table(a int);
|
||||
|
||||
CREATE FUNCTION refresh_cagg_trigger_fun()
|
||||
RETURNS TRIGGER LANGUAGE PLPGSQL AS $$
|
||||
BEGIN
|
||||
CALL refresh_continuous_aggregate('daily_temp', '2020-05-03 00:00 UTC', '2020-05-04 00:00 UTC');
|
||||
END; $$;
|
||||
|
||||
CREATE TRIGGER refresh_cagg_trigger AFTER INSERT ON refresh_cagg_trigger_table
|
||||
EXECUTE FUNCTION refresh_cagg_trigger_fun();
|
||||
|
||||
INSERT INTO refresh_cagg_trigger_table VALUES(1);
|
||||
|
||||
\set ON_ERROR_STOP 1
|
||||
|
Loading…
x
Reference in New Issue
Block a user