mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-17 02:53:51 +08:00
Update connection cache on role changes
The access node maintains a cache of connections to its data nodes. Each entry in the cache is a connection for a user and remote data node pair. Currently, a cache entry is invalidated when a foreign server object representing a data node is changed (e.g., the port could have been updated). The connection will remain in the cache for the duration of the current command, but will be remade with the updated parameters the next time it is fetched from the cache. This change invalidates a connection cache entry if the connection's role/user changes and drops an entry if the role is dropped. One reason for invalidating a connection is that a role rename invalidates the certificate the connection is using in case client certificate authentication is used. Thus, connections that have been authenticated with a certificate that is no longer valid will be remade. In some cases, this extra invalidation leads to purging connections when not strictly necessary. However, this is not a big problem in practice since role objects don't change often.
This commit is contained in:
parent
b449803878
commit
0098829cf5
@ -101,11 +101,20 @@ cache_invalidate_callback(Datum arg, Oid relid)
|
||||
}
|
||||
}
|
||||
|
||||
/* Registration for given cache ids happens at */
|
||||
/* Registration for given cache ids happens in non-TSL code when the extension
|
||||
* is created.
|
||||
*
|
||||
* Cache entries get invalidated when either the foreign server entry or the
|
||||
* role entry in the PostgreSQL catalog changes.
|
||||
*
|
||||
* When the foreign server entry changes, connection paramaters might have
|
||||
* changed. When the role entry changes, the certificate used for client
|
||||
* authentication with backend data nodes might no longer be valid.
|
||||
*/
|
||||
static void
|
||||
cache_invalidate_syscache_callback(Datum arg, int cacheid, uint32 hashvalue)
|
||||
{
|
||||
Assert(cacheid == FOREIGNSERVEROID);
|
||||
Assert(cacheid == FOREIGNSERVEROID || cacheid == AUTHOID);
|
||||
ts_cm_functions->cache_syscache_invalidate(arg, cacheid, hashvalue);
|
||||
}
|
||||
|
||||
@ -179,6 +188,9 @@ _cache_invalidate_init(void)
|
||||
CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
|
||||
cache_invalidate_syscache_callback,
|
||||
PointerGetDatum(NULL));
|
||||
CacheRegisterSyscacheCallback(AUTHOID,
|
||||
cache_invalidate_syscache_callback,
|
||||
PointerGetDatum(NULL));
|
||||
}
|
||||
|
||||
void
|
||||
|
@ -10,6 +10,7 @@
|
||||
#include <utils/builtins.h>
|
||||
#include <utils/syscache.h>
|
||||
#include <utils/guc.h>
|
||||
#include <utils/acl.h>
|
||||
#include <fmgr.h>
|
||||
#include <funcapi.h>
|
||||
#include <miscadmin.h>
|
||||
@ -26,7 +27,8 @@ typedef struct ConnectionCacheEntry
|
||||
{
|
||||
TSConnectionId id;
|
||||
TSConnection *conn;
|
||||
int32 hashvalue; /* Hash of server OID for cache invalidation */
|
||||
int32 foreign_server_hashvalue; /* Hash of server OID for cache invalidation */
|
||||
int32 role_hashvalue; /* Hash of role OID for cache invalidation */
|
||||
bool invalidated;
|
||||
} ConnectionCacheEntry;
|
||||
|
||||
@ -154,9 +156,11 @@ connection_cache_create_entry(Cache *cache, CacheQuery *query)
|
||||
* at the end of the transaction */
|
||||
remote_connection_set_autoclose(entry->conn, false);
|
||||
|
||||
/* Set the hash value of the foreign server for cache invalidation
|
||||
* purposes */
|
||||
entry->hashvalue = GetSysCacheHashValue1(FOREIGNSERVEROID, ObjectIdGetDatum(id->server_id));
|
||||
/* Set the hash values of the foreign server and role for cache
|
||||
* invalidation purposes */
|
||||
entry->foreign_server_hashvalue =
|
||||
GetSysCacheHashValue1(FOREIGNSERVEROID, ObjectIdGetDatum(id->server_id));
|
||||
entry->role_hashvalue = GetSysCacheHashValue1(AUTHOID, ObjectIdGetDatum(id->user_id));
|
||||
entry->invalidated = false;
|
||||
|
||||
return entry;
|
||||
@ -249,14 +253,14 @@ remote_connection_cache_invalidate_callback(Datum arg, int cacheid, uint32 hashv
|
||||
HASH_SEQ_STATUS scan;
|
||||
ConnectionCacheEntry *entry;
|
||||
|
||||
Assert(cacheid == FOREIGNSERVEROID);
|
||||
|
||||
Assert(cacheid == FOREIGNSERVEROID || cacheid == AUTHOID);
|
||||
hash_seq_init(&scan, connection_cache->htab);
|
||||
|
||||
while ((entry = hash_seq_search(&scan)) != NULL)
|
||||
{
|
||||
/* hashvalue == 0 means cache reset, so invalidate entire cache */
|
||||
if (hashvalue == 0 || entry->hashvalue == hashvalue)
|
||||
if (hashvalue == 0 || (cacheid == AUTHOID && hashvalue == entry->role_hashvalue) ||
|
||||
(cacheid == FOREIGNSERVEROID && entry->foreign_server_hashvalue == hashvalue))
|
||||
entry->invalidated = true;
|
||||
}
|
||||
}
|
||||
@ -324,6 +328,30 @@ remote_connection_cache_dropped_db_callback(const char *dbname)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Remove and close connections that belong to roles that are dropped.
|
||||
*
|
||||
* Immediately purging such connections should be safe since the DROP command
|
||||
* must be executed by different user than the one being dropped.
|
||||
*/
|
||||
void
|
||||
remote_connection_cache_dropped_role_callback(const char *rolename)
|
||||
{
|
||||
HASH_SEQ_STATUS scan;
|
||||
ConnectionCacheEntry *entry;
|
||||
Oid roleid = get_role_oid(rolename, true);
|
||||
|
||||
if (!OidIsValid(roleid))
|
||||
return;
|
||||
|
||||
hash_seq_init(&scan, connection_cache->htab);
|
||||
|
||||
while ((entry = hash_seq_search(&scan)) != NULL)
|
||||
{
|
||||
if (entry->id.user_id == roleid)
|
||||
remote_connection_cache_remove(entry->id);
|
||||
}
|
||||
}
|
||||
/*
|
||||
* Functions and data structures for printing the connection cache.
|
||||
*/
|
||||
@ -375,9 +403,15 @@ create_tuple_from_conn_entry(const ConnectionCacheEntry *entry, const TupleDesc
|
||||
bool nulls[Natts_show_conn] = { false };
|
||||
PGconn *pgconn = remote_connection_get_pg_conn(entry->conn);
|
||||
NameData conn_node_name, conn_user_name, conn_db;
|
||||
const char *username = GetUserNameFromId(entry->id.user_id, true);
|
||||
|
||||
namestrcpy(&conn_node_name, remote_connection_node_name(entry->conn));
|
||||
namestrcpy(&conn_user_name, GetUserNameFromId(entry->id.user_id, false));
|
||||
|
||||
if (NULL == username)
|
||||
pg_snprintf(conn_user_name.data, NAMEDATALEN, "%u", entry->id.user_id);
|
||||
else
|
||||
namestrcpy(&conn_user_name, username);
|
||||
|
||||
namestrcpy(&conn_db, PQdb(pgconn));
|
||||
|
||||
values[AttrNumberGetAttrOffset(Anum_show_conn_node_name)] = NameGetDatum(&conn_node_name);
|
||||
|
@ -24,6 +24,7 @@ extern bool remote_connection_cache_remove(TSConnectionId id);
|
||||
|
||||
extern void remote_connection_cache_invalidate_callback(Datum arg, int cacheid, uint32 hashvalue);
|
||||
extern void remote_connection_cache_dropped_db_callback(const char *dbname);
|
||||
extern void remote_connection_cache_dropped_role_callback(const char *rolename);
|
||||
extern Datum remote_connection_cache_show(PG_FUNCTION_ARGS);
|
||||
extern void _remote_connection_cache_init(void);
|
||||
extern void _remote_connection_cache_fini(void);
|
||||
|
@ -22,6 +22,7 @@
|
||||
#include "event_trigger.h"
|
||||
#include "remote/dist_commands.h"
|
||||
#include "remote/dist_ddl.h"
|
||||
#include "remote/connection_cache.h"
|
||||
#include "dist_util.h"
|
||||
|
||||
/* DDL Query execution type */
|
||||
@ -243,14 +244,32 @@ dist_ddl_preprocess(ProcessUtilityArgs *args)
|
||||
* For DROP TABLE and DROP SCHEMA operations hypertable_list will be
|
||||
* empty. Wait for sql_drop events.
|
||||
*/
|
||||
if (tag == T_DropStmt)
|
||||
switch (tag)
|
||||
{
|
||||
DropStmt *stmt = castNode(DropStmt, args->parsetree);
|
||||
case T_DropStmt:
|
||||
{
|
||||
DropStmt *stmt = castNode(DropStmt, args->parsetree);
|
||||
|
||||
if (stmt->removeType == OBJECT_TABLE || stmt->removeType == OBJECT_SCHEMA)
|
||||
set_dist_exec_type(DIST_DDL_EXEC_ON_END);
|
||||
if (stmt->removeType == OBJECT_TABLE || stmt->removeType == OBJECT_SCHEMA)
|
||||
set_dist_exec_type(DIST_DDL_EXEC_ON_END);
|
||||
|
||||
break;
|
||||
}
|
||||
case T_DropRoleStmt:
|
||||
{
|
||||
DropRoleStmt *stmt = castNode(DropRoleStmt, args->parsetree);
|
||||
ListCell *lc;
|
||||
|
||||
foreach (lc, stmt->roles)
|
||||
{
|
||||
RoleSpec *rolspec = lfirst(lc);
|
||||
remote_connection_cache_dropped_role_callback(rolspec->rolename);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -28,5 +28,46 @@ SELECT _timescaledb_internal.test_remote_connection_cache();
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Test that connection cache entries for a role gets invalidated when
|
||||
-- we rename the role
|
||||
GRANT USAGE ON FOREIGN SERVER loopback_1, loopback_2 TO :ROLE_1;
|
||||
SET ROLE :ROLE_1;
|
||||
CREATE TABLE testtable (time timestamptz, location int, temp float);
|
||||
SELECT * FROM create_distributed_hypertable('testtable', 'time', 'location');
|
||||
NOTICE: adding not-null constraint to column "time"
|
||||
hypertable_id | schema_name | table_name | created
|
||||
---------------+-------------+------------+---------
|
||||
1 | public | testtable | t
|
||||
(1 row)
|
||||
|
||||
INSERT INTO testtable VALUES ('2021-09-19', 1, 13.2);
|
||||
-- Should show valid connections for ROLE_1
|
||||
SELECT node_name, user_name, invalidated
|
||||
FROM _timescaledb_internal.show_connection_cache()
|
||||
WHERE user_name=:'ROLE_1'
|
||||
ORDER BY 1,2;
|
||||
node_name | user_name | invalidated
|
||||
------------+-------------+-------------
|
||||
loopback_1 | test_role_1 | f
|
||||
loopback_2 | test_role_1 | f
|
||||
(2 rows)
|
||||
|
||||
RESET ROLE;
|
||||
BEGIN;
|
||||
-- Renaming the role should invalidate the connection cache entries
|
||||
-- for ROLE_1/bob. The connections will be recreated on next cache
|
||||
-- fetch.
|
||||
ALTER ROLE :ROLE_1 RENAME TO bob;
|
||||
SELECT node_name, user_name, invalidated
|
||||
FROM _timescaledb_internal.show_connection_cache()
|
||||
WHERE user_name='bob'
|
||||
ORDER BY 1,2;
|
||||
node_name | user_name | invalidated
|
||||
------------+-----------+-------------
|
||||
loopback_1 | bob | t
|
||||
loopback_2 | bob | t
|
||||
(2 rows)
|
||||
|
||||
ROLLBACK;
|
||||
DROP DATABASE :DN_DBNAME_1;
|
||||
DROP DATABASE :DN_DBNAME_2;
|
||||
|
@ -30,5 +30,32 @@ $d$;
|
||||
|
||||
SELECT _timescaledb_internal.test_remote_connection_cache();
|
||||
|
||||
-- Test that connection cache entries for a role gets invalidated when
|
||||
-- we rename the role
|
||||
GRANT USAGE ON FOREIGN SERVER loopback_1, loopback_2 TO :ROLE_1;
|
||||
SET ROLE :ROLE_1;
|
||||
|
||||
CREATE TABLE testtable (time timestamptz, location int, temp float);
|
||||
SELECT * FROM create_distributed_hypertable('testtable', 'time', 'location');
|
||||
INSERT INTO testtable VALUES ('2021-09-19', 1, 13.2);
|
||||
|
||||
-- Should show valid connections for ROLE_1
|
||||
SELECT node_name, user_name, invalidated
|
||||
FROM _timescaledb_internal.show_connection_cache()
|
||||
WHERE user_name=:'ROLE_1'
|
||||
ORDER BY 1,2;
|
||||
RESET ROLE;
|
||||
BEGIN;
|
||||
|
||||
-- Renaming the role should invalidate the connection cache entries
|
||||
-- for ROLE_1/bob. The connections will be recreated on next cache
|
||||
-- fetch.
|
||||
ALTER ROLE :ROLE_1 RENAME TO bob;
|
||||
SELECT node_name, user_name, invalidated
|
||||
FROM _timescaledb_internal.show_connection_cache()
|
||||
WHERE user_name='bob'
|
||||
ORDER BY 1,2;
|
||||
ROLLBACK;
|
||||
|
||||
DROP DATABASE :DN_DBNAME_1;
|
||||
DROP DATABASE :DN_DBNAME_2;
|
||||
|
68
tsl/test/t/003_connections.pl
Normal file
68
tsl/test/t/003_connections.pl
Normal file
@ -0,0 +1,68 @@
|
||||
# This file and its contents are licensed under the Timescale License.
|
||||
# Please see the included NOTICE for copyright information and
|
||||
# LICENSE-TIMESCALE for a copy of the license.
|
||||
|
||||
# test a simple multi node cluster creation and basic operations
|
||||
use strict;
|
||||
use warnings;
|
||||
use AccessNode;
|
||||
use DataNode;
|
||||
use TestLib;
|
||||
use Test::More tests => 1;
|
||||
|
||||
#Initialize all the multi-node instances
|
||||
my $an = AccessNode->create('an');
|
||||
my $dn1 = DataNode->create('dn1');
|
||||
my $dn2 = DataNode->create('dn2');
|
||||
|
||||
$an->add_data_node($dn1);
|
||||
$an->add_data_node($dn2);
|
||||
|
||||
$dn1->append_conf('postgresql.conf', 'log_connections=true');
|
||||
$dn2->append_conf('postgresql.conf', 'log_connections=true');
|
||||
|
||||
my $output = $an->safe_psql(
|
||||
'postgres',
|
||||
qq[
|
||||
CREATE ROLE alice;
|
||||
CALL distributed_exec('CREATE ROLE alice LOGIN');
|
||||
GRANT USAGE ON FOREIGN SERVER dn1,dn2 TO alice;
|
||||
SET ROLE alice;
|
||||
CREATE TABLE conditions (time timestamptz, location int, temp float);
|
||||
SELECT create_distributed_hypertable('conditions', 'time', 'location');
|
||||
INSERT INTO conditions VALUES ('2021-09-20 00:00:00+02', 1, 23.4);
|
||||
]);
|
||||
|
||||
|
||||
my ($cmdret, $stdout, $stderr) = $an->psql(
|
||||
'postgres',
|
||||
qq[
|
||||
SET ROLE alice;
|
||||
SELECT time AT TIME ZONE 'America/New_York', location, temp FROM conditions;
|
||||
SELECT node_name, user_name, invalidated
|
||||
FROM _timescaledb_internal.show_connection_cache()
|
||||
WHERE user_name='alice';
|
||||
RESET ROLE;
|
||||
DROP TABLE conditions;
|
||||
REVOKE USAGE ON FOREIGN SERVER dn1,dn2 FROM alice;
|
||||
DROP ROLE ALICE;
|
||||
SELECT node_name, user_name, invalidated
|
||||
FROM _timescaledb_internal.show_connection_cache()
|
||||
WHERE user_name='alice';
|
||||
|
||||
]);
|
||||
|
||||
# Expected output:
|
||||
#
|
||||
# * First row is from the SELECT query that creates the connections
|
||||
# * for alice.
|
||||
#
|
||||
# * Second row is the output from the connection cache after SELECT
|
||||
# and prior to dropping alice. The entry has invalidated=false, so
|
||||
# the entry is still valid.
|
||||
#
|
||||
# * There is no row for the third connection cache query since the
|
||||
# * connections for alice have been purged.
|
||||
is( $stdout,
|
||||
q[2021-09-19 18:00:00|1|23.4
|
||||
dn1|alice|f]);
|
@ -1,4 +1,4 @@
|
||||
set(PROVE_TEST_FILES 001_simple_multinode.pl)
|
||||
set(PROVE_TEST_FILES 001_simple_multinode.pl 003_connections.pl)
|
||||
set(PROVE_DEBUG_TEST_FILES 002_chunk_copy_move.pl)
|
||||
|
||||
if(CMAKE_BUILD_TYPE MATCHES Debug)
|
||||
|
Loading…
x
Reference in New Issue
Block a user