Use SPI transaction manipulation functions

The procedures `copy_chunk` and `move_chunk` internally manipulate the
transaction state using plumbing-level commands such as
`StartTransactionCommand` and `CommitTransactionCommand`. Since these
affect the internal state of the SPI execution context, it generates
warnings when used inside a PL/SQL procedures.

This commit fixes this by switching to using SPI-level commands and
connecting and finishing the SPI properly.

Fixes #3495
This commit is contained in:
Mats Kindahl 2021-08-23 10:35:19 +02:00 committed by Mats Kindahl
parent 4e0954e87f
commit e3b9b2bcbe
4 changed files with 102 additions and 11 deletions

View File

@ -25,6 +25,7 @@
#include <funcapi.h> #include <funcapi.h>
#include <miscadmin.h> #include <miscadmin.h>
#include <fmgr.h> #include <fmgr.h>
#include <executor/spi.h>
#if USE_ASSERT_CHECKING #if USE_ASSERT_CHECKING
#include <funcapi.h> #include <funcapi.h>
@ -349,9 +350,9 @@ chunk_copy_setup(ChunkCopy *cc, Oid chunk_relid, const char *src_node, const cha
ts_cache_release(hcache); ts_cache_release(hcache);
MemoryContextSwitchTo(old); MemoryContextSwitchTo(old);
/* Commit to get out of starting transaction */ /* Commit to get out of starting transaction. This will also pop active
PopActiveSnapshot(); * snapshots. */
CommitTransactionCommand(); SPI_commit();
} }
static void static void
@ -361,7 +362,7 @@ chunk_copy_finish(ChunkCopy *cc)
MemoryContextDelete(cc->mcxt); MemoryContextDelete(cc->mcxt);
/* Start a transaction for the final outer transaction */ /* Start a transaction for the final outer transaction */
StartTransactionCommand(); SPI_start_transaction();
} }
static void static void
@ -783,7 +784,7 @@ chunk_copy_execute(ChunkCopy *cc)
*/ */
for (stage = &chunk_copy_stages[0]; stage->name != NULL; stage++) for (stage = &chunk_copy_stages[0]; stage->name != NULL; stage++)
{ {
StartTransactionCommand(); SPI_start_transaction();
cc->stage = stage; cc->stage = stage;
cc->stage->function(cc); cc->stage->function(cc);
@ -793,7 +794,7 @@ chunk_copy_execute(ChunkCopy *cc)
DEBUG_ERROR_INJECTION(stage->name); DEBUG_ERROR_INJECTION(stage->name);
CommitTransactionCommand(); SPI_commit();
} }
} }
@ -912,7 +913,7 @@ chunk_copy_cleanup_internal(ChunkCopy *cc, int stage_idx)
/* Cleanup each copy stage in a separate transaction */ /* Cleanup each copy stage in a separate transaction */
do do
{ {
StartTransactionCommand(); SPI_start_transaction();
cc->stage = &chunk_copy_stages[stage_idx]; cc->stage = &chunk_copy_stages[stage_idx];
if (cc->stage->function_cleanup) if (cc->stage->function_cleanup)
@ -924,7 +925,7 @@ chunk_copy_cleanup_internal(ChunkCopy *cc, int stage_idx)
else else
first = false; first = false;
CommitTransactionCommand(); SPI_commit();
} while (--stage_idx >= 0); } while (--stage_idx >= 0);
} }
@ -974,9 +975,9 @@ chunk_copy_cleanup(const char *operation_id)
errmsg("stage '%s' not found for copy chunk cleanup", errmsg("stage '%s' not found for copy chunk cleanup",
NameStr(cc->fd.completed_stage)))); NameStr(cc->fd.completed_stage))));
/* Commit to get out of starting transaction */ /* Commit to get out of starting transaction, this will also pop active
PopActiveSnapshot(); * snapshots. */
CommitTransactionCommand(); SPI_commit();
/* Run the corresponding cleanup steps to roll back the activity. */ /* Run the corresponding cleanup steps to roll back the activity. */
PG_TRY(); PG_TRY();

View File

@ -51,6 +51,8 @@
#include <utils/snapmgr.h> #include <utils/snapmgr.h>
#include <utils/syscache.h> #include <utils/syscache.h>
#include <utils/tuplesort.h> #include <utils/tuplesort.h>
#include <executor/spi.h>
#include <utils/snapmgr.h>
#include "compat.h" #include "compat.h"
#if PG13_LT #if PG13_LT
@ -210,6 +212,9 @@ tsl_copy_or_move_chunk_proc(FunctionCallInfo fcinfo, bool delete_on_src_node)
Oid chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); Oid chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
const char *src_node_name = PG_ARGISNULL(1) ? NULL : NameStr(*PG_GETARG_NAME(1)); const char *src_node_name = PG_ARGISNULL(1) ? NULL : NameStr(*PG_GETARG_NAME(1));
const char *dst_node_name = PG_ARGISNULL(2) ? NULL : NameStr(*PG_GETARG_NAME(2)); const char *dst_node_name = PG_ARGISNULL(2) ? NULL : NameStr(*PG_GETARG_NAME(2));
int rc;
bool nonatomic = fcinfo->context && IsA(fcinfo->context, CallContext) &&
!castNode(CallContext, fcinfo->context)->atomic;
TS_PREVENT_FUNC_IF_READ_ONLY(); TS_PREVENT_FUNC_IF_READ_ONLY();
@ -224,8 +229,14 @@ tsl_copy_or_move_chunk_proc(FunctionCallInfo fcinfo, bool delete_on_src_node)
if (!OidIsValid(chunk_id)) if (!OidIsValid(chunk_id))
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid chunk"))); ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid chunk")));
if ((rc = SPI_connect_ext(nonatomic ? SPI_OPT_NONATOMIC : 0)) != SPI_OK_CONNECT)
elog(ERROR, "SPI_connect failed: %s", SPI_result_code_string(rc));
/* perform the actual distributed chunk move after a few sanity checks */ /* perform the actual distributed chunk move after a few sanity checks */
chunk_copy(chunk_id, src_node_name, dst_node_name, delete_on_src_node); chunk_copy(chunk_id, src_node_name, dst_node_name, delete_on_src_node);
if ((rc = SPI_finish()) != SPI_OK_FINISH)
elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(rc));
} }
Datum Datum
@ -248,6 +259,9 @@ Datum
tsl_copy_chunk_cleanup_proc(PG_FUNCTION_ARGS) tsl_copy_chunk_cleanup_proc(PG_FUNCTION_ARGS)
{ {
const char *operation_id = PG_ARGISNULL(0) ? NULL : NameStr(*PG_GETARG_NAME(0)); const char *operation_id = PG_ARGISNULL(0) ? NULL : NameStr(*PG_GETARG_NAME(0));
int rc;
bool nonatomic = fcinfo->context && IsA(fcinfo->context, CallContext) &&
!castNode(CallContext, fcinfo->context)->atomic;
TS_PREVENT_FUNC_IF_READ_ONLY(); TS_PREVENT_FUNC_IF_READ_ONLY();
@ -259,9 +273,15 @@ tsl_copy_chunk_cleanup_proc(PG_FUNCTION_ARGS)
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid chunk copy operation id"))); errmsg("invalid chunk copy operation id")));
if ((rc = SPI_connect_ext(nonatomic ? SPI_OPT_NONATOMIC : 0)) != SPI_OK_CONNECT)
elog(ERROR, "SPI_connect failed: %s", SPI_result_code_string(rc));
/* perform the cleanup/repair depending on the stage */ /* perform the cleanup/repair depending on the stage */
chunk_copy_cleanup(operation_id); chunk_copy_cleanup(operation_id);
if ((rc = SPI_finish()) != SPI_OK_FINISH)
elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(rc));
PG_RETURN_VOID(); PG_RETURN_VOID();
} }

View File

@ -1641,6 +1641,46 @@ SELECT sum(device) FROM dist_test;
846 846
(1 row) (1 row)
-- Check that they can be called from inside a procedure without
-- generating warnings or error messages (#3495).
CREATE OR REPLACE PROCEDURE copy_wrapper(regclass, text, text)
AS $$
BEGIN
CALL timescaledb_experimental.copy_chunk($1, $2, $3);
END
$$
LANGUAGE PLPGSQL;
CREATE OR REPLACE PROCEDURE move_wrapper(regclass, text, text)
AS $$
BEGIN
CALL timescaledb_experimental.move_chunk($1, $2, $3);
END
$$
LANGUAGE PLPGSQL;
SELECT chunk_name, replica_nodes, non_replica_nodes
FROM timescaledb_experimental.chunk_replication_status;
chunk_name | replica_nodes | non_replica_nodes
------------------------+---------------------------+---------------------------
_dist_hyper_9_12_chunk | {data_node_1,data_node_3} | {data_node_2}
_dist_hyper_9_13_chunk | {data_node_2} | {data_node_1,data_node_3}
_dist_hyper_9_14_chunk | {data_node_3} | {data_node_1,data_node_2}
_dist_hyper_9_15_chunk | {data_node_1} | {data_node_2,data_node_3}
(4 rows)
CALL copy_wrapper('_timescaledb_internal._dist_hyper_9_14_chunk', 'data_node_3', 'data_node_2');
CALL move_wrapper('_timescaledb_internal._dist_hyper_9_13_chunk', 'data_node_2', 'data_node_1');
SELECT chunk_name, replica_nodes, non_replica_nodes
FROM timescaledb_experimental.chunk_replication_status;
chunk_name | replica_nodes | non_replica_nodes
------------------------+---------------------------+---------------------------
_dist_hyper_9_12_chunk | {data_node_1,data_node_3} | {data_node_2}
_dist_hyper_9_13_chunk | {data_node_1} | {data_node_2,data_node_3}
_dist_hyper_9_14_chunk | {data_node_3,data_node_2} | {data_node_1}
_dist_hyper_9_15_chunk | {data_node_1} | {data_node_2,data_node_3}
(4 rows)
DROP PROCEDURE copy_wrapper;
DROP PROCEDURE move_wrapper;
RESET ROLE; RESET ROLE;
DROP DATABASE :DN_DBNAME_1; DROP DATABASE :DN_DBNAME_1;
DROP DATABASE :DN_DBNAME_2; DROP DATABASE :DN_DBNAME_2;

View File

@ -800,6 +800,36 @@ SELECT * FROM test.remote_exec(NULL, $$ SELECT * from show_chunks('dist_test');
SELECT * FROM test.remote_exec(ARRAY['data_node_3'], $$ SELECT sum(device) FROM _timescaledb_internal._dist_hyper_9_12_chunk; $$); SELECT * FROM test.remote_exec(ARRAY['data_node_3'], $$ SELECT sum(device) FROM _timescaledb_internal._dist_hyper_9_12_chunk; $$);
SELECT sum(device) FROM dist_test; SELECT sum(device) FROM dist_test;
-- Check that they can be called from inside a procedure without
-- generating warnings or error messages (#3495).
CREATE OR REPLACE PROCEDURE copy_wrapper(regclass, text, text)
AS $$
BEGIN
CALL timescaledb_experimental.copy_chunk($1, $2, $3);
END
$$
LANGUAGE PLPGSQL;
CREATE OR REPLACE PROCEDURE move_wrapper(regclass, text, text)
AS $$
BEGIN
CALL timescaledb_experimental.move_chunk($1, $2, $3);
END
$$
LANGUAGE PLPGSQL;
SELECT chunk_name, replica_nodes, non_replica_nodes
FROM timescaledb_experimental.chunk_replication_status;
CALL copy_wrapper('_timescaledb_internal._dist_hyper_9_14_chunk', 'data_node_3', 'data_node_2');
CALL move_wrapper('_timescaledb_internal._dist_hyper_9_13_chunk', 'data_node_2', 'data_node_1');
SELECT chunk_name, replica_nodes, non_replica_nodes
FROM timescaledb_experimental.chunk_replication_status;
DROP PROCEDURE copy_wrapper;
DROP PROCEDURE move_wrapper;
RESET ROLE; RESET ROLE;
DROP DATABASE :DN_DBNAME_1; DROP DATABASE :DN_DBNAME_1;
DROP DATABASE :DN_DBNAME_2; DROP DATABASE :DN_DBNAME_2;