1
0
mirror of https://github.com/timescale/timescaledb.git synced 2025-05-20 20:54:29 +08:00

Support non-superuser move chunk operations

The non-superuser needs to have REPLICATION privileges atleast. A
new function "subscription_cmd" has been added to allow running
subscription related commands on datanodes. This function implicitly
upgrades to the bootstrapped superuser and then performs subscription
creation/alteration/deletion commands. It only accepts subscriptions
related commands and errors out otherwise.
This commit is contained in:
Nikhil Sontakke 2022-05-09 18:46:10 +05:30 committed by Nikhil
parent 4988dac273
commit ddd02922c9
12 changed files with 193 additions and 37 deletions

@ -35,6 +35,10 @@ CREATE OR REPLACE PROCEDURE timescaledb_experimental.copy_chunk(
operation_id NAME = NULL)
AS '@MODULE_PATHNAME@', 'ts_copy_chunk_proc' LANGUAGE C;
CREATE OR REPLACE FUNCTION timescaledb_experimental.subscription_exec(
subscription_command TEXT
) RETURNS VOID AS '@MODULE_PATHNAME@', 'ts_subscription_exec' LANGUAGE C VOLATILE;
-- A copy_chunk or move_chunk procedure call involves multiple nodes and
-- depending on the data size can take a long time. Failures are possible
-- when this long running activity is ongoing. We need to be able to recover

@ -90,3 +90,7 @@ ALTER TABLE _timescaledb_catalog.continuous_agg
DROP PROCEDURE IF EXISTS timescaledb_experimental.move_chunk(REGCLASS, NAME, NAME);
DROP PROCEDURE IF EXISTS timescaledb_experimental.copy_chunk(REGCLASS, NAME, NAME);
CREATE OR REPLACE FUNCTION timescaledb_experimental.subscription_exec(
subscription_command TEXT
) RETURNS VOID AS '@MODULE_PATHNAME@', 'ts_subscription_exec' LANGUAGE C VOLATILE;

@ -105,3 +105,4 @@ ANALYZE _timescaledb_catalog.continuous_agg;
DROP PROCEDURE timescaledb_experimental.move_chunk(REGCLASS, NAME, NAME, NAME);
DROP PROCEDURE timescaledb_experimental.copy_chunk(REGCLASS, NAME, NAME, NAME);
DROP FUNCTION IF EXISTS timescaledb_experimental.subscription_exec(TEXT);

@ -47,6 +47,7 @@ CROSSMODULE_WRAPPER(move_chunk);
CROSSMODULE_WRAPPER(move_chunk_proc);
CROSSMODULE_WRAPPER(copy_chunk_proc);
CROSSMODULE_WRAPPER(copy_chunk_cleanup_proc);
CROSSMODULE_WRAPPER(subscription_exec);
/* partialize/finalize aggregate */
CROSSMODULE_WRAPPER(partialize_agg);
@ -374,6 +375,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.move_chunk_proc = error_no_default_fn_pg_community,
.copy_chunk_proc = error_no_default_fn_pg_community,
.copy_chunk_cleanup_proc = error_no_default_fn_pg_community,
.subscription_exec = error_no_default_fn_pg_community,
.reorder_chunk = error_no_default_fn_pg_community,
.partialize_agg = error_no_default_fn_pg_community,

@ -81,6 +81,7 @@ typedef struct CrossModuleFunctions
PGFunction move_chunk;
PGFunction move_chunk_proc;
PGFunction copy_chunk_proc;
PGFunction subscription_exec;
PGFunction copy_chunk_cleanup_proc;
void (*ddl_command_start)(ProcessUtilityArgs *args);
void (*ddl_command_end)(EventTriggerData *command);

@ -5,6 +5,7 @@
*/
#include <postgres.h>
#include <foreign/foreign.h>
#include <catalog/pg_authid.h>
#include <catalog/pg_foreign_server.h>
#include <catalog/pg_foreign_table.h>
#include <catalog/dependency.h>
@ -260,10 +261,11 @@ chunk_copy_setup(ChunkCopy *cc, Oid chunk_relid, const char *src_node, const cha
Cache *hcache;
MemoryContext old, mcxt;
if (!superuser())
if (!superuser() && !has_rolreplication(GetUserId()))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("must be superuser to copy/move chunk to data node"))));
(errmsg(
"must be superuser or replication role to copy/move chunk to data node"))));
if (dist_util_membership() != DIST_MEMBER_ACCESS_NODE)
ereport(ERROR,
@ -431,7 +433,7 @@ chunk_copy_stage_create_publication(ChunkCopy *cc)
/* Create publication on the source data node */
cmd = psprintf("CREATE PUBLICATION %s FOR TABLE %s",
NameStr(cc->fd.operation_id),
quote_identifier(NameStr(cc->fd.operation_id)),
quote_qualified_identifier(NameStr(cc->chunk->fd.schema_name),
NameStr(cc->chunk->fd.table_name)));
@ -449,7 +451,7 @@ chunk_copy_stage_create_replication_slot(ChunkCopy *cc)
* create the replication slot separately before creating the subscription
*/
cmd = psprintf("SELECT pg_create_logical_replication_slot('%s', 'pgoutput')",
NameStr(cc->fd.operation_id));
quote_identifier(NameStr(cc->fd.operation_id)));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
}
@ -463,7 +465,7 @@ chunk_copy_stage_create_replication_slot_cleanup(ChunkCopy *cc)
/* Check if the slot exists on the source data node */
cmd = psprintf("SELECT 1 FROM pg_catalog.pg_replication_slots WHERE slot_name = '%s'",
NameStr(cc->fd.operation_id));
quote_identifier(NameStr(cc->fd.operation_id)));
dist_res =
ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
res = ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.source_node_name));
@ -475,7 +477,8 @@ chunk_copy_stage_create_replication_slot_cleanup(ChunkCopy *cc)
/* Drop replication slot on the source data node only if it exists */
if (PQntuples(res) != 0)
{
cmd = psprintf("SELECT pg_drop_replication_slot('%s')", NameStr(cc->fd.operation_id));
cmd = psprintf("SELECT pg_drop_replication_slot('%s')",
quote_identifier(NameStr(cc->fd.operation_id)));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
}
@ -498,7 +501,7 @@ chunk_copy_stage_create_publication_cleanup(ChunkCopy *cc)
/* Check if the publication exists on the source data node */
cmd = psprintf("SELECT 1 FROM pg_catalog.pg_publication WHERE pubname = '%s'",
NameStr(cc->fd.operation_id));
quote_identifier(NameStr(cc->fd.operation_id)));
dist_res =
ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
res = ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.source_node_name));
@ -510,7 +513,7 @@ chunk_copy_stage_create_publication_cleanup(ChunkCopy *cc)
/* Drop publication on the source node only if it exists */
if (PQntuples(res) != 0)
{
cmd = psprintf("DROP PUBLICATION %s", NameStr(cc->fd.operation_id));
cmd = psprintf("DROP PUBLICATION %s", quote_identifier(NameStr(cc->fd.operation_id)));
/* Drop the publication */
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
@ -519,10 +522,21 @@ chunk_copy_stage_create_publication_cleanup(ChunkCopy *cc)
ts_dist_cmd_close_response(dist_res);
}
/* Execute a logical SUBSCRIPTION related command on the data node */
static void
chunk_copy_exec_subscription_command(const char *command, List *data_nodes)
{
char *cmd;
cmd = psprintf("SELECT timescaledb_experimental.subscription_exec($sql$%s$sql$)", command);
ts_dist_cmd_run_on_data_nodes(cmd, data_nodes, true);
pfree(cmd);
}
static void
chunk_copy_stage_create_subscription(ChunkCopy *cc)
{
const char *cmd;
char *cmd;
const char *connection_string;
/* Prepare connection string to the source node */
@ -530,10 +544,11 @@ chunk_copy_stage_create_subscription(ChunkCopy *cc)
cmd = psprintf("CREATE SUBSCRIPTION %s CONNECTION '%s' PUBLICATION %s"
" WITH (create_slot = false, enabled = false)",
NameStr(cc->fd.operation_id),
quote_identifier(NameStr(cc->fd.operation_id)),
connection_string,
NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
quote_identifier(NameStr(cc->fd.operation_id)));
chunk_copy_exec_subscription_command(cmd, list_make1(NameStr(cc->fd.dest_node_name)));
pfree(cmd);
}
static void
@ -545,7 +560,7 @@ chunk_copy_stage_create_subscription_cleanup(ChunkCopy *cc)
/* Check if the subscription exists on the destination data node */
cmd = psprintf("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = '%s'",
NameStr(cc->fd.operation_id));
quote_identifier(NameStr(cc->fd.operation_id)));
dist_res =
ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
res = ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.dest_node_name));
@ -557,17 +572,24 @@ chunk_copy_stage_create_subscription_cleanup(ChunkCopy *cc)
/* Cleanup only if the subscription exists */
if (PQntuples(res) != 0)
{
List *nodes = list_make1(NameStr(cc->fd.dest_node_name));
List *dest_dn_list = list_make1(NameStr(cc->fd.dest_node_name));
/* Stop data transfer on the destination node */
cmd = psprintf("ALTER SUBSCRIPTION %s DISABLE",
quote_identifier(NameStr(cc->fd.operation_id)));
chunk_copy_exec_subscription_command(cmd, dest_dn_list);
pfree(cmd);
/* Disassociate the subscription from the replication slot first */
cmd =
psprintf("ALTER SUBSCRIPTION %s SET (slot_name = NONE)", NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, nodes, true);
cmd = psprintf("ALTER SUBSCRIPTION %s SET (slot_name = NONE)",
quote_identifier(NameStr(cc->fd.operation_id)));
chunk_copy_exec_subscription_command(cmd, dest_dn_list);
pfree(cmd);
/* Drop the subscription now */
cmd = psprintf("DROP SUBSCRIPTION %s", quote_identifier(NameStr(cc->fd.operation_id)));
chunk_copy_exec_subscription_command(cmd, dest_dn_list);
pfree(cmd);
cmd = psprintf("DROP SUBSCRIPTION %s", NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, nodes, true);
}
ts_dist_cmd_close_response(dist_res);
@ -576,11 +598,12 @@ chunk_copy_stage_create_subscription_cleanup(ChunkCopy *cc)
static void
chunk_copy_stage_sync_start(ChunkCopy *cc)
{
const char *cmd;
char *cmd;
/* Start data transfer on the destination node */
cmd = psprintf("ALTER SUBSCRIPTION %s ENABLE", NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
cmd = psprintf("ALTER SUBSCRIPTION %s ENABLE", quote_identifier(NameStr(cc->fd.operation_id)));
chunk_copy_exec_subscription_command(cmd, list_make1(NameStr(cc->fd.dest_node_name)));
pfree(cmd);
}
static void
@ -592,7 +615,7 @@ chunk_copy_stage_sync_start_cleanup(ChunkCopy *cc)
/* Check if the subscription exists on the destination data node */
cmd = psprintf("SELECT 1 FROM pg_catalog.pg_subscription WHERE subname = '%s'",
NameStr(cc->fd.operation_id));
quote_identifier(NameStr(cc->fd.operation_id)));
dist_res =
ts_dist_cmd_invoke_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
res = ts_dist_cmd_get_result_by_node_name(dist_res, NameStr(cc->fd.dest_node_name));
@ -605,7 +628,8 @@ chunk_copy_stage_sync_start_cleanup(ChunkCopy *cc)
if (PQntuples(res) != 0)
{
/* Stop data transfer on the destination node */
cmd = psprintf("ALTER SUBSCRIPTION %s DISABLE", NameStr(cc->fd.operation_id));
cmd = psprintf("ALTER SUBSCRIPTION %s DISABLE",
quote_identifier(NameStr(cc->fd.operation_id)));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
}
@ -640,20 +664,22 @@ static void
chunk_copy_stage_drop_subscription(ChunkCopy *cc)
{
char *cmd;
List *dest_dn_list = list_make1(NameStr(cc->fd.dest_node_name));
/* Stop data transfer on the destination node */
cmd = psprintf("ALTER SUBSCRIPTION %s DISABLE", NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
cmd = psprintf("ALTER SUBSCRIPTION %s DISABLE", quote_identifier(NameStr(cc->fd.operation_id)));
chunk_copy_exec_subscription_command(cmd, dest_dn_list);
pfree(cmd);
/* Disassociate the subscription from the replication slot first */
cmd = psprintf("ALTER SUBSCRIPTION %s SET (slot_name = NONE)", NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
cmd = psprintf("ALTER SUBSCRIPTION %s SET (slot_name = NONE)",
quote_identifier(NameStr(cc->fd.operation_id)));
chunk_copy_exec_subscription_command(cmd, dest_dn_list);
pfree(cmd);
/* Drop the subscription now */
cmd = psprintf("DROP SUBSCRIPTION %s", NameStr(cc->fd.operation_id));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.dest_node_name)), true);
cmd = psprintf("DROP SUBSCRIPTION %s", quote_identifier(NameStr(cc->fd.operation_id)));
chunk_copy_exec_subscription_command(cmd, dest_dn_list);
pfree(cmd);
}
@ -662,10 +688,11 @@ chunk_copy_stage_drop_publication(ChunkCopy *cc)
{
char *cmd;
cmd = psprintf("SELECT pg_drop_replication_slot('%s')", NameStr(cc->fd.operation_id));
cmd = psprintf("SELECT pg_drop_replication_slot('%s')",
quote_identifier(NameStr(cc->fd.operation_id)));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
cmd = psprintf("DROP PUBLICATION %s", NameStr(cc->fd.operation_id));
cmd = psprintf("DROP PUBLICATION %s", quote_identifier(NameStr(cc->fd.operation_id)));
ts_dist_cmd_run_on_data_nodes(cmd, list_make1(NameStr(cc->fd.source_node_name)), true);
}
@ -948,10 +975,11 @@ chunk_copy_cleanup(const char *operation_id)
bool found = false;
int stage_idx;
if (!superuser())
if (!superuser() && !has_rolreplication(GetUserId()))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("must be superuser to cleanup a chunk copy operation"))));
(errmsg(
"must be superuser or replication role to cleanup a chunk copy operation"))));
if (dist_util_membership() != DIST_MEMBER_ACCESS_NODE)
ereport(ERROR,

@ -127,6 +127,7 @@ CrossModuleFunctions tsl_cm_functions = {
.move_chunk_proc = tsl_move_chunk_proc,
.copy_chunk_proc = tsl_copy_chunk_proc,
.copy_chunk_cleanup_proc = tsl_copy_chunk_cleanup_proc,
.subscription_exec = tsl_subscription_exec,
/* Continuous Aggregates */
.partialize_agg = tsl_partialize_agg,

@ -21,6 +21,7 @@
#include <access/transam.h>
#include <access/xact.h>
#include <access/xlog.h>
#include <catalog/pg_authid.h>
#include <catalog/catalog.h>
#include <catalog/dependency.h>
#include <catalog/heap.h>
@ -40,7 +41,9 @@
#include <storage/lmgr.h>
#include <storage/predicate.h>
#include <storage/smgr.h>
#include <tcop/tcopprot.h>
#include <utils/acl.h>
#include <utils/builtins.h>
#include <utils/fmgroids.h>
#include <utils/guc.h>
#include <utils/inval.h>
@ -259,6 +262,80 @@ tsl_copy_chunk_proc(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
Datum
tsl_subscription_exec(PG_FUNCTION_ARGS)
{
Oid save_userid;
int save_sec_context;
const char *subscription_cmd = PG_ARGISNULL(0) ? NULL : text_to_cstring(PG_GETARG_TEXT_P(0));
int res;
List *parsetree_list;
ListCell *parsetree_item;
/*
* Subscription command needs a superuser
* so switch to that context. But first check that the passed in user has atleast
* REPLICATION privileges to justify the use of this function
*/
if (!superuser() && !has_rolreplication(GetUserId()))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("must be superuser or replication role to use this function"))));
GetUserIdAndSecContext(&save_userid, &save_sec_context);
SetUserIdAndSecContext(BOOTSTRAP_SUPERUSERID, save_sec_context | SECURITY_LOCAL_USERID_CHANGE);
/*
* Parse the SQL string into a list of raw parse trees.
*/
parsetree_list = pg_parse_query(subscription_cmd);
/*
* Check that we have received a "SUBSCRIPTION" related command only. Anything else
* needs to error out
*/
foreach (parsetree_item, parsetree_list)
{
RawStmt *parsetree = lfirst_node(RawStmt, parsetree_item);
/* We are only interested in "CREATE/DROP SUBSCRIPTION" and "ALTER SUBSCRIPTION" stmts */
switch (nodeTag(parsetree->stmt))
{
case T_CreateSubscriptionStmt:
break;
case T_AlterSubscriptionStmt:
break;
case T_DropSubscriptionStmt:
break;
default:
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("this function only accepts SUBSCRIPTION commands")));
}
}
if (SPI_connect() != SPI_OK_CONNECT)
elog(ERROR, "could not connect to SPI");
res = SPI_execute(subscription_cmd, false /* read_only */, 0 /*count*/);
if (res < 0)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR),
(errmsg("error in subscription cmd \"%s\"", subscription_cmd))));
res = SPI_finish();
Assert(res == SPI_OK_FINISH);
/* Restore the earlier user */
SetUserIdAndSecContext(save_userid, save_sec_context);
PG_RETURN_VOID();
}
Datum
tsl_copy_chunk_cleanup_proc(PG_FUNCTION_ARGS)
{

@ -14,6 +14,7 @@ extern Datum tsl_move_chunk(PG_FUNCTION_ARGS);
extern Datum tsl_move_chunk_proc(PG_FUNCTION_ARGS);
extern Datum tsl_copy_chunk_proc(PG_FUNCTION_ARGS);
extern Datum tsl_copy_chunk_cleanup_proc(PG_FUNCTION_ARGS);
extern Datum tsl_subscription_exec(PG_FUNCTION_ARGS);
extern void reorder_chunk(Oid chunk_id, Oid index_id, bool verbose, Oid wait_id,
Oid destination_tablespace, Oid index_tablespace);

@ -133,7 +133,7 @@ ROLLBACK;
SET ROLE :ROLE_DEFAULT_PERM_USER;
\set ON_ERROR_STOP 0
CALL timescaledb_experimental.move_chunk(chunk=>'_timescaledb_internal._dist_hyper_1_1_chunk', source_node=> 'data_node_1', destination_node => 'data_node_2');
ERROR: must be superuser to copy/move chunk to data node
ERROR: must be superuser or replication role to copy/move chunk to data node
\set ON_ERROR_STOP 1
SET ROLE :ROLE_1;
-- can't run copy/move chunk on a data node

@ -209,6 +209,7 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text
timescaledb_experimental.cleanup_copy_chunk_operation(name)
timescaledb_experimental.copy_chunk(regclass,name,name,name)
timescaledb_experimental.move_chunk(regclass,name,name,name)
timescaledb_experimental.subscription_exec(text)
timescaledb_experimental.time_bucket_ng(interval,date)
timescaledb_experimental.time_bucket_ng(interval,date,date)
timescaledb_experimental.time_bucket_ng(interval,timestamp with time zone)

@ -8,7 +8,7 @@ use warnings;
use AccessNode;
use DataNode;
use TestLib;
use Test::More tests => 272;
use Test::More tests => 274;
#Initialize all the multi-node instances
my $an = AccessNode->create('an');
@ -83,9 +83,45 @@ while ($curr_index < $arrSize)
$curr_index++;
}
for my $node ($an, $dn1, $dn2)
{
$node->safe_psql('postgres', "CREATE ROLE testrole LOGIN");
}
#Error out the move if user doesn't have superuser nor replication perms
($ret, $stdout, $stderr) = $an->psql('postgres',
"SET ROLE testrole; CALL timescaledb_experimental.move_chunk(chunk=>'_timescaledb_internal._dist_hyper_1_1_chunk', source_node=> 'dn1', destination_node => 'dn2')"
);
like(
$stderr,
qr/must be superuser or replication role to copy\/move chunk to data node/,
'Expected failure due to no credentials');
#Provide REPLICATION creds to this user now
for my $node ($an, $dn1, $dn2)
{
$node->safe_psql('postgres', "ALTER ROLE testrole REPLICATION;");
}
#Check that function errors out if any non SUBSCRIPTON command is passed to it
($ret, $stdout, $stderr) = $an->psql('postgres',
"SET ROLE testrole; SELECT timescaledb_experimental.subscription_exec('DROP ROLE testrole')"
);
like(
$stderr,
qr/this function only accepts SUBSCRIPTION commands/,
'Expected failure due to wrong command to function');
#Also grant it ownership of the table and related permissions
$an->safe_psql('postgres',
"ALTER TABLE test OWNER TO testrole; GRANT ALL PRIVILEGES ON DATABASE postgres TO testrole; GRANT USAGE, SELECT ON SEQUENCE _timescaledb_catalog.chunk_copy_operation_id_seq TO testrole; GRANT USAGE ON FOREIGN SERVER dn1, dn2 TO testrole;"
);
#Move chunk _timescaledb_internal._dist_hyper_1_1_chunk to DN2 from AN
$an->safe_psql('postgres',
"CALL timescaledb_experimental.move_chunk(chunk=>'_timescaledb_internal._dist_hyper_1_1_chunk', source_node=> 'dn1', destination_node => 'dn2')"
"SET ROLE testrole; CALL timescaledb_experimental.move_chunk(chunk=>'_timescaledb_internal._dist_hyper_1_1_chunk', source_node=> 'dn1', destination_node => 'dn2')"
);
#Query datanode1 after the above move