diff --git a/tsl/src/chunk_copy.c b/tsl/src/chunk_copy.c index 7592d1359..5f377cd80 100644 --- a/tsl/src/chunk_copy.c +++ b/tsl/src/chunk_copy.c @@ -25,6 +25,7 @@ #include #include #include +#include #if USE_ASSERT_CHECKING #include @@ -349,9 +350,9 @@ chunk_copy_setup(ChunkCopy *cc, Oid chunk_relid, const char *src_node, const cha ts_cache_release(hcache); MemoryContextSwitchTo(old); - /* Commit to get out of starting transaction */ - PopActiveSnapshot(); - CommitTransactionCommand(); + /* Commit to get out of starting transaction. This will also pop active + * snapshots. */ + SPI_commit(); } static void @@ -361,7 +362,7 @@ chunk_copy_finish(ChunkCopy *cc) MemoryContextDelete(cc->mcxt); /* Start a transaction for the final outer transaction */ - StartTransactionCommand(); + SPI_start_transaction(); } static void @@ -783,7 +784,7 @@ chunk_copy_execute(ChunkCopy *cc) */ for (stage = &chunk_copy_stages[0]; stage->name != NULL; stage++) { - StartTransactionCommand(); + SPI_start_transaction(); cc->stage = stage; cc->stage->function(cc); @@ -793,7 +794,7 @@ chunk_copy_execute(ChunkCopy *cc) DEBUG_ERROR_INJECTION(stage->name); - CommitTransactionCommand(); + SPI_commit(); } } @@ -912,7 +913,7 @@ chunk_copy_cleanup_internal(ChunkCopy *cc, int stage_idx) /* Cleanup each copy stage in a separate transaction */ do { - StartTransactionCommand(); + SPI_start_transaction(); cc->stage = &chunk_copy_stages[stage_idx]; if (cc->stage->function_cleanup) @@ -924,7 +925,7 @@ chunk_copy_cleanup_internal(ChunkCopy *cc, int stage_idx) else first = false; - CommitTransactionCommand(); + SPI_commit(); } while (--stage_idx >= 0); } @@ -974,9 +975,9 @@ chunk_copy_cleanup(const char *operation_id) errmsg("stage '%s' not found for copy chunk cleanup", NameStr(cc->fd.completed_stage)))); - /* Commit to get out of starting transaction */ - PopActiveSnapshot(); - CommitTransactionCommand(); + /* Commit to get out of starting transaction, this will also pop active + * snapshots. */ + SPI_commit(); /* Run the corresponding cleanup steps to roll back the activity. */ PG_TRY(); diff --git a/tsl/src/reorder.c b/tsl/src/reorder.c index bb8c3f2f2..a8dd9496a 100644 --- a/tsl/src/reorder.c +++ b/tsl/src/reorder.c @@ -51,6 +51,8 @@ #include #include #include +#include +#include #include "compat.h" #if PG13_LT @@ -210,6 +212,9 @@ tsl_copy_or_move_chunk_proc(FunctionCallInfo fcinfo, bool delete_on_src_node) Oid chunk_id = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0); const char *src_node_name = PG_ARGISNULL(1) ? NULL : NameStr(*PG_GETARG_NAME(1)); const char *dst_node_name = PG_ARGISNULL(2) ? NULL : NameStr(*PG_GETARG_NAME(2)); + int rc; + bool nonatomic = fcinfo->context && IsA(fcinfo->context, CallContext) && + !castNode(CallContext, fcinfo->context)->atomic; TS_PREVENT_FUNC_IF_READ_ONLY(); @@ -224,8 +229,14 @@ tsl_copy_or_move_chunk_proc(FunctionCallInfo fcinfo, bool delete_on_src_node) if (!OidIsValid(chunk_id)) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid chunk"))); + if ((rc = SPI_connect_ext(nonatomic ? SPI_OPT_NONATOMIC : 0)) != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed: %s", SPI_result_code_string(rc)); + /* perform the actual distributed chunk move after a few sanity checks */ chunk_copy(chunk_id, src_node_name, dst_node_name, delete_on_src_node); + + if ((rc = SPI_finish()) != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(rc)); } Datum @@ -248,6 +259,9 @@ Datum tsl_copy_chunk_cleanup_proc(PG_FUNCTION_ARGS) { const char *operation_id = PG_ARGISNULL(0) ? NULL : NameStr(*PG_GETARG_NAME(0)); + int rc; + bool nonatomic = fcinfo->context && IsA(fcinfo->context, CallContext) && + !castNode(CallContext, fcinfo->context)->atomic; TS_PREVENT_FUNC_IF_READ_ONLY(); @@ -259,9 +273,15 @@ tsl_copy_chunk_cleanup_proc(PG_FUNCTION_ARGS) (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid chunk copy operation id"))); + if ((rc = SPI_connect_ext(nonatomic ? SPI_OPT_NONATOMIC : 0)) != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed: %s", SPI_result_code_string(rc)); + /* perform the cleanup/repair depending on the stage */ chunk_copy_cleanup(operation_id); + if ((rc = SPI_finish()) != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed: %s", SPI_result_code_string(rc)); + PG_RETURN_VOID(); } diff --git a/tsl/test/expected/data_node.out b/tsl/test/expected/data_node.out index 90bdeabee..ee142bca8 100644 --- a/tsl/test/expected/data_node.out +++ b/tsl/test/expected/data_node.out @@ -1641,6 +1641,46 @@ SELECT sum(device) FROM dist_test; 846 (1 row) +-- Check that they can be called from inside a procedure without +-- generating warnings or error messages (#3495). +CREATE OR REPLACE PROCEDURE copy_wrapper(regclass, text, text) +AS $$ +BEGIN + CALL timescaledb_experimental.copy_chunk($1, $2, $3); +END +$$ +LANGUAGE PLPGSQL; +CREATE OR REPLACE PROCEDURE move_wrapper(regclass, text, text) +AS $$ +BEGIN + CALL timescaledb_experimental.move_chunk($1, $2, $3); +END +$$ +LANGUAGE PLPGSQL; +SELECT chunk_name, replica_nodes, non_replica_nodes +FROM timescaledb_experimental.chunk_replication_status; + chunk_name | replica_nodes | non_replica_nodes +------------------------+---------------------------+--------------------------- + _dist_hyper_9_12_chunk | {data_node_1,data_node_3} | {data_node_2} + _dist_hyper_9_13_chunk | {data_node_2} | {data_node_1,data_node_3} + _dist_hyper_9_14_chunk | {data_node_3} | {data_node_1,data_node_2} + _dist_hyper_9_15_chunk | {data_node_1} | {data_node_2,data_node_3} +(4 rows) + +CALL copy_wrapper('_timescaledb_internal._dist_hyper_9_14_chunk', 'data_node_3', 'data_node_2'); +CALL move_wrapper('_timescaledb_internal._dist_hyper_9_13_chunk', 'data_node_2', 'data_node_1'); +SELECT chunk_name, replica_nodes, non_replica_nodes +FROM timescaledb_experimental.chunk_replication_status; + chunk_name | replica_nodes | non_replica_nodes +------------------------+---------------------------+--------------------------- + _dist_hyper_9_12_chunk | {data_node_1,data_node_3} | {data_node_2} + _dist_hyper_9_13_chunk | {data_node_1} | {data_node_2,data_node_3} + _dist_hyper_9_14_chunk | {data_node_3,data_node_2} | {data_node_1} + _dist_hyper_9_15_chunk | {data_node_1} | {data_node_2,data_node_3} +(4 rows) + +DROP PROCEDURE copy_wrapper; +DROP PROCEDURE move_wrapper; RESET ROLE; DROP DATABASE :DN_DBNAME_1; DROP DATABASE :DN_DBNAME_2; diff --git a/tsl/test/sql/data_node.sql b/tsl/test/sql/data_node.sql index 4d17a81fb..41aa86be7 100644 --- a/tsl/test/sql/data_node.sql +++ b/tsl/test/sql/data_node.sql @@ -800,6 +800,36 @@ SELECT * FROM test.remote_exec(NULL, $$ SELECT * from show_chunks('dist_test'); SELECT * FROM test.remote_exec(ARRAY['data_node_3'], $$ SELECT sum(device) FROM _timescaledb_internal._dist_hyper_9_12_chunk; $$); SELECT sum(device) FROM dist_test; +-- Check that they can be called from inside a procedure without +-- generating warnings or error messages (#3495). +CREATE OR REPLACE PROCEDURE copy_wrapper(regclass, text, text) +AS $$ +BEGIN + CALL timescaledb_experimental.copy_chunk($1, $2, $3); +END +$$ +LANGUAGE PLPGSQL; + +CREATE OR REPLACE PROCEDURE move_wrapper(regclass, text, text) +AS $$ +BEGIN + CALL timescaledb_experimental.move_chunk($1, $2, $3); +END +$$ +LANGUAGE PLPGSQL; + +SELECT chunk_name, replica_nodes, non_replica_nodes +FROM timescaledb_experimental.chunk_replication_status; + +CALL copy_wrapper('_timescaledb_internal._dist_hyper_9_14_chunk', 'data_node_3', 'data_node_2'); +CALL move_wrapper('_timescaledb_internal._dist_hyper_9_13_chunk', 'data_node_2', 'data_node_1'); + +SELECT chunk_name, replica_nodes, non_replica_nodes +FROM timescaledb_experimental.chunk_replication_status; + +DROP PROCEDURE copy_wrapper; +DROP PROCEDURE move_wrapper; + RESET ROLE; DROP DATABASE :DN_DBNAME_1; DROP DATABASE :DN_DBNAME_2;