Make move_chunk use AN txns on DN

We used to run transactions in autocommit mode on DN while running the
chunk copy/move activity. This meant that any failures on the access
node were de-coupled from the activity on the DN. This can make future
cleanup messy since we wouldn't know what's failed/succeeded on the
data nodes.

We now drive the entire activity via transactions started on the access
node.
This commit is contained in:
Nikhil 2021-06-29 11:54:14 +05:30 committed by Dmitry Simonenko
parent 38c1781748
commit 478404def5

View File

@ -400,7 +400,7 @@ chunk_copy_stage_create_publication(ChunkCopy *cc)
NameStr(cc->chunk->fd.table_name)));
/* Create the publication in autocommit mode */
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), false);
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
}
static void
@ -415,7 +415,7 @@ chunk_copy_stage_create_replication_slot(ChunkCopy *cc)
cmd = psprintf("SELECT pg_create_logical_replication_slot('%s', 'pgoutput')",
NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), false);
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
}
static void
@ -432,7 +432,7 @@ chunk_copy_stage_create_subscription(ChunkCopy *cc)
NameStr(cc->fd.operation_id),
connection_string,
NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), false);
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
}
static void
@ -442,29 +442,52 @@ chunk_copy_stage_sync_start(ChunkCopy *cc)
/* Start data transfer on the destination node */
cmd = psprintf("ALTER SUBSCRIPTION %s ENABLE", NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), false);
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
}
static void
chunk_copy_stage_sync(ChunkCopy *cc)
{
const char *cmd;
char *cmd;
/*
* Transaction blocks run in REPEATABLE READ mode in the connection pool.
* However this wait_subscription_sync procedure needs to refresh the subcription
* sync status data and hence needs a READ COMMITTED transaction isolation
* level for that.
*/
cmd = psprintf("SET transaction_isolation TO 'READ COMMITTED'");
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
pfree(cmd);
/* Wait until data transfer finishes in its own transaction */
cmd = psprintf("CALL _timescaledb_internal.wait_subscription_sync(%s, %s)",
quote_literal_cstr(NameStr(cc->chunk->fd.schema_name)),
quote_literal_cstr(NameStr(cc->chunk->fd.table_name)));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), false);
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
pfree(cmd);
}
static void
chunk_copy_stage_drop_subscription(ChunkCopy *cc)
{
const char *cmd;
char *cmd;
/* Stop data transfer on the destination node */
cmd = psprintf("ALTER SUBSCRIPTION %s DISABLE", NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
pfree(cmd);
/* Disassociate the subscription from the replication slot first */
cmd = psprintf("ALTER SUBSCRIPTION %s SET (slot_name = NONE)", NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
pfree(cmd);
/* Drop the subscription now */
cmd = psprintf("DROP SUBSCRIPTION %s", NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), false);
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
pfree(cmd);
}
static void
@ -473,7 +496,7 @@ chunk_copy_stage_drop_publication(ChunkCopy *cc)
const char *cmd;
cmd = psprintf("DROP PUBLICATION %s", NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), false);
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
}
static void
@ -556,7 +579,11 @@ chunk_copy_execute(ChunkCopy *cc)
{
const ChunkCopyStage *stage;
/* Execute each copy stage in a separate transaction */
/*
* Execute each copy stage in a separate transaction. The below will employ
* 2PC by default. This can be later optimized to use 1PC since only one
* datanode is involved in most of the stages.
*/
for (stage = &chunk_copy_stages[0]; stage->name != NULL; stage++)
{
StartTransactionCommand();