Ensure superuser perms during copy/move chunk

There is a security loophole in current core Postgres, due to which
it's possible for a non-superuser to gain superuser access by attaching
dependencies like expression indexes, triggers, etc. before logical
replication commences.

To avoid this, we now ensure that the chunk objects that get created
for the subscription are done so as a superuser. This avoids malicious
dependencies by regular users.
This commit is contained in:
Nikhil Sontakke 2023-03-21 16:30:21 +05:30 committed by Nikhil
parent 38fcd1b76b
commit 7e43f45ccb
2 changed files with 58 additions and 0 deletions

View File

@ -23,6 +23,7 @@ accidentally triggering the load of a previous DB version.**
* #5428 Use consistent snapshots when scanning metadata
* #5442 Decompression may have lost DEFAULT values
* #5446 Add checks for malloc failure in libpq calls
* #5470 Ensure superuser perms during copy/move chunk
**Thanks**
* @nikolaps for reporting an issue with the COPY fetcher

View File

@ -405,6 +405,41 @@ chunk_copy_stage_init_cleanup(ChunkCopy *cc)
chunk_copy_operation_delete_by_id(NameStr(cc->fd.operation_id));
}
/*
* Depending on "to_htowner" boolean change ownership of the chunk on the target node_name to
* "_timescaledb_internal" catalog owner or to the appropriate hypertable owner
*
* The "compressed" boolean specifies if we change ownership of the regular chunk or the
* corresponding compressed chunk.
*/
static void
chunk_copy_alter_chunk_owner(const ChunkCopy *cc, const char *node_name, const bool compressed,
const bool to_htowner)
{
Oid uid;
char *user_name;
char *alter_user_cmd;
if (to_htowner)
uid = ts_rel_get_owner(cc->chunk->hypertable_relid);
else
uid = ts_catalog_database_info_get()->owner_uid;
user_name = GetUserNameFromId(uid, false);
if (compressed)
alter_user_cmd = psprintf("ALTER TABLE %s OWNER TO %s",
quote_qualified_identifier(INTERNAL_SCHEMA_NAME,
NameStr(cc->fd.compressed_chunk_name)),
quote_identifier(user_name));
else
alter_user_cmd = psprintf("ALTER TABLE %s OWNER TO %s",
quote_qualified_identifier(NameStr(cc->chunk->fd.schema_name),
NameStr(cc->chunk->fd.table_name)),
quote_identifier(user_name));
ts_dist_cmd_run_on_data_nodes(alter_user_cmd, list_make1((void *) node_name), true);
}
static void
chunk_copy_stage_create_empty_chunk(ChunkCopy *cc)
{
@ -418,6 +453,13 @@ chunk_copy_stage_create_empty_chunk(ChunkCopy *cc)
chunk_api_call_create_empty_chunk_table(ht, cc->chunk, NameStr(cc->fd.dest_node_name));
/*
* Switch the empty chunk ownership to catalog owner for the lifetime of this operation.
* This entire copy/move operation is run as superuser (including the remote connection
* to the destination datanode), so the ALTER OWNER command will work without issues.
*/
chunk_copy_alter_chunk_owner(cc, NameStr(cc->fd.dest_node_name), false, false);
ts_cache_release(hcache);
}
@ -544,11 +586,19 @@ chunk_copy_create_dest_empty_compressed_chunk(ChunkCopy *cc)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_EXCEPTION), errmsg("%s", PQresultErrorMessage(res))));
ts_dist_cmd_close_response(dist_res);
/*
* Switch the empty compressed chunk ownership to catalog owner for the lifetime of this
* operation. This entire copy/move operation is run as superuser (including the remote
* connection to the destination datanode), so the ALTER OWNER command will work without issues.
*/
chunk_copy_alter_chunk_owner(cc, NameStr(cc->fd.dest_node_name), true, false);
}
static void
chunk_copy_stage_create_empty_compressed_chunk(ChunkCopy *cc)
{
DEBUG_WAITPOINT("chunk_copy_after_empty_chunk");
if (!ts_chunk_is_compressed(cc->chunk))
return;
@ -884,6 +934,9 @@ chunk_copy_stage_attach_chunk(ChunkCopy *cc)
/* Check that the hypertable is already attached to this data node */
data_node_hypertable_get_by_node_name(ht, cc->dst_server->servername, true);
/* Change ownership back to the proper relowner before attaching it finally */
chunk_copy_alter_chunk_owner(cc, NameStr(cc->fd.dest_node_name), false, true);
chunk_data_node = palloc0(sizeof(ChunkDataNode));
chunk_data_node->fd.chunk_id = chunk->fd.id;
@ -918,6 +971,10 @@ chunk_copy_stage_attach_compressed_chunk(ChunkCopy *cc)
if (!ts_chunk_is_compressed(cc->chunk))
return;
/* Change ownership of the compressed chunk back to the proper relowner before attaching it
* finally */
chunk_copy_alter_chunk_owner(cc, NameStr(cc->fd.dest_node_name), true, true);
chunk_name = psprintf("%s.%s",
quote_identifier(cc->chunk->fd.schema_name.data),
quote_identifier(cc->chunk->fd.table_name.data));