Add an API to invoke SQL on backend servers

This change adds a new function that will invoke the passed in command
on any specified backend servers (defaulting to all). The command
will be run asynchronously with transactional semantics (it either
succeeds on all targets or is rolled back).
This commit is contained in:
Brian Rowe 2019-01-24 19:21:23 -08:00 committed by Erik Nordström
parent 125f793307
commit db82c25d44
10 changed files with 505 additions and 14 deletions

View File

@ -3,6 +3,7 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/connection.c
${CMAKE_CURRENT_SOURCE_DIR}/connection_cache.c
${CMAKE_CURRENT_SOURCE_DIR}/dist_txn.c
${CMAKE_CURRENT_SOURCE_DIR}/dist_commands.c
${CMAKE_CURRENT_SOURCE_DIR}/txn.c
${CMAKE_CURRENT_SOURCE_DIR}/txn_store.c
${CMAKE_CURRENT_SOURCE_DIR}/txn_id.c

View File

@ -113,7 +113,7 @@ async_request_send_with_params_elevel(PGconn *conn, const char *sql, int n_param
}
AsyncRequest *
async_request_send_prepare(PGconn *conn, char *sql, int n_params)
async_request_send_prepare(PGconn *conn, const char *sql, int n_params)
{
AsyncRequest *req = palloc0(sizeof(AsyncRequest));
size_t stmt_name_len = NAMEDATALEN;
@ -379,22 +379,12 @@ PreparedStmt *
async_request_wait_prepared_statement(AsyncRequest *request)
{
AsyncResponseResult *result;
PreparedStmt *prep = palloc0(sizeof(PreparedStmt));
PreparedStmt *prep;
Assert(request->stmt_name != NULL);
result = async_request_wait_ok_result(request);
if (PQresultStatus(result->result) != PGRES_COMMAND_OK)
async_response_report_error(&result->base, ERROR);
*prep = (PreparedStmt){
.conn = result->request->conn,
.sql = result->request->sql,
.stmt_name = result->request->stmt_name,
.n_params = result->request->n_params,
};
prep = async_response_result_generate_prepared_stmt(result);
async_response_result_close(result);
return prep;
@ -620,3 +610,24 @@ prepared_stmt_close(PreparedStmt *stmt)
async_request_wait_ok_command(async_request_send(stmt->conn, sql));
}
/* Request must have been generated by async_request_send_prepare() */
PreparedStmt *
async_response_result_generate_prepared_stmt(AsyncResponseResult *result)
{
PreparedStmt *prep;
if (PQresultStatus(result->result) != PGRES_COMMAND_OK)
async_response_report_error(&result->base, ERROR);
prep = palloc0(sizeof(PreparedStmt));
*prep = (PreparedStmt){
.conn = result->request->conn,
.sql = result->request->sql,
.stmt_name = result->request->stmt_name,
.n_params = result->request->n_params,
};
return prep;
}

View File

@ -62,7 +62,8 @@ extern AsyncRequest *async_request_send_with_params_elevel(PGconn *conn, const c
async_request_send_with_params_elevel(conn, sql_statement, 0, NULL, elevel)
#define async_request_send(conn, sql_statement) \
async_request_send_with_error(conn, sql_statement, ERROR)
extern AsyncRequest *async_request_send_prepare(PGconn *conn, char *sql_statement, int n_params);
extern AsyncRequest *async_request_send_prepare(PGconn *conn, const char *sql_statement,
int n_params);
extern AsyncRequest *async_request_send_prepared_stmt(PreparedStmt *stmt,
const char *const *paramValues);
extern void async_request_attach_user_data(AsyncRequest *req, void *user_data);
@ -81,6 +82,7 @@ extern void async_response_result_close(AsyncResponseResult *res);
extern PGresult *async_response_result_get_pg_result(AsyncResponseResult *res);
extern void *async_response_result_get_user_data(AsyncResponseResult *res);
extern AsyncRequest *async_response_result_get_request(AsyncResponseResult *res);
extern PreparedStmt *async_response_result_generate_prepared_stmt(AsyncResponseResult *res);
/* Async Request Set */
extern AsyncRequestSet *async_request_set_create(void);

View File

@ -0,0 +1,208 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
#include <libpq-fe.h>
#include "remote/dist_commands.h"
#include "remote/dist_txn.h"
#include "remote/connection_cache.h"
#include "server.h"
#include "miscadmin.h"
typedef struct DistPreparedStmt
{
const char *server_name;
PreparedStmt *prepared_stmt;
} DistPreparedStmt;
typedef struct DistCmdResponse
{
const char *server;
AsyncResponseResult *result;
} DistCmdResponse;
typedef struct DistCmdResult
{
Size num_responses;
DistCmdResponse responses[FLEXIBLE_ARRAY_MEMBER];
} DistCmdResult;
static PGconn *
ts_dist_cmd_get_connection_for_server(const char *server, RemoteTxnPrepStmtOption ps_opt)
{
ForeignServer *fs = GetForeignServerByName(server, false);
UserMapping *um = GetUserMapping(GetUserId(), fs->serverid);
return remote_dist_txn_get_connection(um, ps_opt);
}
static DistCmdResult *
ts_dist_cmd_collect_responses(List *requests)
{
AsyncRequestSet *rs = async_request_set_create();
AsyncResponseResult *ar;
ListCell *lc;
DistCmdResult *results =
palloc(sizeof(DistCmdResult) + requests->length * sizeof(DistCmdResponse));
int i = 0;
foreach (lc, requests)
async_request_set_add(rs, lfirst(lc));
while ((ar = async_request_set_wait_ok_result(rs)))
{
DistCmdResponse *response = &results->responses[i];
if (PQresultStatus(async_response_result_get_pg_result(ar)) != PGRES_COMMAND_OK)
elog(ERROR, "unexpected tuple recieved while expecting a command");
response->result = ar;
response->server = pstrdup(async_response_result_get_user_data(ar));
++i;
}
Assert(i == requests->length);
results->num_responses = i;
return results;
}
DistCmdResult *
ts_dist_cmd_invoke_on_servers(const char *sql, List *server_names)
{
ListCell *lc;
List *requests = NIL;
DistCmdResult *results;
if (server_names == NIL)
elog(ERROR, "target servers must be specified for ts_dist_cmd_invoke_on_servers");
foreach (lc, server_names)
{
const char *server_name = lfirst(lc);
PGconn *connection =
ts_dist_cmd_get_connection_for_server(server_name, REMOTE_TXN_NO_PREP_STMT);
AsyncRequest *req = async_request_send(connection, sql);
async_request_attach_user_data(req, (char *) server_name);
requests = lappend(requests, req);
}
results = ts_dist_cmd_collect_responses(requests);
list_free(requests);
return results;
}
DistCmdResult *
ts_dist_cmd_invoke_on_all_servers(const char *sql)
{
return ts_dist_cmd_invoke_on_servers(sql, server_get_servername_list());
}
PGresult *
ts_dist_cmd_get_server_result(DistCmdResult *response, const char *server_name)
{
int i;
for (i = 0; i < response->num_responses; ++i)
{
DistCmdResponse *resp = &response->responses[i];
if (strcmp(server_name, resp->server) == 0)
return async_response_result_get_pg_result(resp->result);
}
return NULL;
}
void
ts_dist_cmd_close_response(DistCmdResult *response)
{
int i;
for (i = 0; i < response->num_responses; ++i)
{
DistCmdResponse *resp = &response->responses[i];
async_response_result_close(resp->result);
pfree((char *) resp->server);
}
pfree(response);
}
extern PreparedDistCmd *
ts_dist_cmd_prepare_command(const char *sql, size_t n_params, List *server_names)
{
List *result = NIL;
ListCell *lc;
AsyncRequestSet *prep_requests = async_request_set_create();
AsyncResponseResult *async_resp;
if (server_names == NIL)
elog(ERROR, "target servers must be specified for ts_dist_cmd_prepare_command");
foreach (lc, server_names)
{
const char *name = lfirst(lc);
PGconn *connection = ts_dist_cmd_get_connection_for_server(name, REMOTE_TXN_USE_PREP_STMT);
DistPreparedStmt *cmd = palloc(sizeof(DistPreparedStmt));
AsyncRequest *ar = async_request_send_prepare(connection, sql, n_params);
cmd->server_name = pstrdup(name);
async_request_attach_user_data(ar, &cmd->prepared_stmt);
result = lappend(result, cmd);
async_request_set_add(prep_requests, ar);
}
while ((async_resp = async_request_set_wait_ok_result(prep_requests)))
{
*(PreparedStmt **) async_response_result_get_user_data(async_resp) =
async_response_result_generate_prepared_stmt(async_resp);
async_response_result_close(async_resp);
}
return result;
}
PreparedDistCmd *
ts_dist_cmd_prepare_command_on_all_servers(const char *sql, size_t n_params)
{
return ts_dist_cmd_prepare_command(sql, n_params, server_get_servername_list());
}
extern DistCmdResult *
ts_dist_cmd_invoke_prepared_command(PreparedDistCmd *command, const char *const *param_values)
{
List *reqs = NIL;
ListCell *lc;
DistCmdResult *results;
foreach (lc, command)
{
DistPreparedStmt *stmt = lfirst(lc);
AsyncRequest *req = async_request_send_prepared_stmt(stmt->prepared_stmt, param_values);
async_request_attach_user_data(req, (char *) stmt->server_name);
reqs = lappend(reqs, req);
}
results = ts_dist_cmd_collect_responses(reqs);
list_free(reqs);
return results;
}
void
ts_dist_cmd_close_prepared_command(PreparedDistCmd *command)
{
ListCell *lc;
foreach (lc, command)
prepared_stmt_close(((DistPreparedStmt *) lfirst(lc))->prepared_stmt);
list_free_deep(command);
}

View File

@ -0,0 +1,32 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#ifndef TIMESCALEDB_TSL_REMOTE_DIST_COMMANDS_H
#define TIMESCALEDB_TSL_REMOTE_DIST_COMMANDS_H
#include <catalog.h>
#include <libpq-fe.h>
typedef struct DistCmdResult DistCmdResult;
typedef struct List PreparedDistCmd;
extern DistCmdResult *ts_dist_cmd_invoke_on_servers(const char *sql, List *server_names);
extern DistCmdResult *ts_dist_cmd_invoke_on_all_servers(const char *sql);
extern PGresult *ts_dist_cmd_get_server_result(DistCmdResult *response, const char *server_name);
extern void ts_dist_cmd_close_response(DistCmdResult *response);
extern PreparedDistCmd *ts_dist_cmd_prepare_command(const char *sql, size_t n_params,
List *server_names);
extern PreparedDistCmd *ts_dist_cmd_prepare_command_on_all_servers(const char *sql,
size_t n_params);
extern DistCmdResult *ts_dist_cmd_invoke_prepared_command(PreparedDistCmd *command,
const char *const *param_values);
extern void ts_dist_cmd_close_prepared_command(PreparedDistCmd *command);
#endif

View File

@ -0,0 +1,120 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\c :TEST_DBNAME :ROLE_SUPERUSER
CREATE DATABASE server1;
SELECT * FROM add_server('server1', 'localhost', 'server1', port=>current_setting('port')::integer);
server_name | host | port | database | username | server_username | created
-------------+-----------+-------+----------+------------+-----------------+---------
server1 | localhost | 15432 | server1 | super_user | super_user | t
(1 row)
CREATE DATABASE server2;
SELECT * FROM add_server('server2', 'localhost', 'server2', port=>current_setting('port')::integer);
server_name | host | port | database | username | server_username | created
-------------+-----------+-------+----------+------------+-----------------+---------
server2 | localhost | 15432 | server2 | super_user | super_user | t
(1 row)
CREATE DATABASE server3;
SELECT * FROM add_server('server3', 'localhost', 'server3', port=>current_setting('port')::integer);
server_name | host | port | database | username | server_username | created
-------------+-----------+-------+----------+------------+-----------------+---------
server3 | localhost | 15432 | server3 | super_user | super_user | t
(1 row)
\des+
List of foreign servers
Name | Owner | Foreign-data wrapper | Access privileges | Type | Version | FDW options | Description
---------+------------+----------------------+-------------------+------+---------+----------------------------------------------------+-------------
server1 | super_user | timescaledb_fdw | | | | (host 'localhost', port '15432', dbname 'server1') |
server2 | super_user | timescaledb_fdw | | | | (host 'localhost', port '15432', dbname 'server2') |
server3 | super_user | timescaledb_fdw | | | | (host 'localhost', port '15432', dbname 'server3') |
(3 rows)
CREATE FUNCTION _timescaledb_internal.invoke_distributed_commands()
RETURNS void
AS :TSL_MODULE_PATHNAME, 'tsl_invoke_distributed_commands'
LANGUAGE C STRICT;
CREATE FUNCTION _timescaledb_internal.invoke_faulty_distributed_command()
RETURNS void
AS :TSL_MODULE_PATHNAME, 'tsl_invoke_faulty_distributed_command'
LANGUAGE C STRICT;
SELECT _timescaledb_internal.invoke_distributed_commands();
INFO: server1 result: PGRES_COMMAND_OK
INFO: server2 result: PGRES_COMMAND_OK
INFO: server3 result: PGRES_COMMAND_OK
INFO: server1 result: PGRES_COMMAND_OK
INFO: server3 result: PGRES_COMMAND_OK
INFO: server1 result: PGRES_COMMAND_OK
INFO: server3 result: PGRES_COMMAND_OK
invoke_distributed_commands
-----------------------------
(1 row)
\c server1
\dt
List of relations
Schema | Name | Type | Owner
--------+------------+-------+------------
public | disttable1 | table | super_user
public | disttable2 | table | super_user
(2 rows)
SELECT * FROM disttable1;
time | device | temp
------------------------------+--------+-------
Sat Sep 18 00:00:00 1976 PDT | 47 | 103.4
(1 row)
\c server2
\dt
List of relations
Schema | Name | Type | Owner
--------+------------+-------+------------
public | disttable1 | table | super_user
(1 row)
SELECT * FROM disttable1;
time | device | temp
------+--------+------
(0 rows)
\c server3
\dt
List of relations
Schema | Name | Type | Owner
--------+------------+-------+------------
public | disttable1 | table | super_user
public | disttable2 | table | super_user
(2 rows)
SELECT * FROM disttable1;
time | device | temp
------------------------------+--------+-------
Sat Sep 18 00:00:00 1976 PDT | 47 | 103.4
(1 row)
\c single
-- Verify failed insert command gets fully rolled back
\set ON_ERROR_STOP 0
SELECT _timescaledb_internal.invoke_faulty_distributed_command();
ERROR: schema "_timescaledb_internal" does not exist at character 8
\set ON_ERROR_STOP 1
\c server1
SELECT * from disttable2;
time | device | temp
------+--------+------
(0 rows)
\c server3
SELECT * from disttable2;
time | device | temp
------+--------+------
(0 rows)
\c single
DROP DATABASE server1;
DROP DATABASE server2;
DROP DATABASE server3;

View File

@ -100,6 +100,7 @@ if (NOT ${PG_VERSION} VERSION_LESS "10")
timescaledb_fdw.sql
)
list(APPEND TEST_FILES_DEBUG
dist_commands.sql
remote_connection.sql
remote_connection_cache.sql
remote_txn.sql

View File

@ -0,0 +1,52 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
\c :TEST_DBNAME :ROLE_SUPERUSER
CREATE DATABASE server1;
SELECT * FROM add_server('server1', 'localhost', 'server1', port=>current_setting('port')::integer);
CREATE DATABASE server2;
SELECT * FROM add_server('server2', 'localhost', 'server2', port=>current_setting('port')::integer);
CREATE DATABASE server3;
SELECT * FROM add_server('server3', 'localhost', 'server3', port=>current_setting('port')::integer);
\des+
CREATE FUNCTION _timescaledb_internal.invoke_distributed_commands()
RETURNS void
AS :TSL_MODULE_PATHNAME, 'tsl_invoke_distributed_commands'
LANGUAGE C STRICT;
CREATE FUNCTION _timescaledb_internal.invoke_faulty_distributed_command()
RETURNS void
AS :TSL_MODULE_PATHNAME, 'tsl_invoke_faulty_distributed_command'
LANGUAGE C STRICT;
SELECT _timescaledb_internal.invoke_distributed_commands();
\c server1
\dt
SELECT * FROM disttable1;
\c server2
\dt
SELECT * FROM disttable1;
\c server3
\dt
SELECT * FROM disttable1;
\c single
-- Verify failed insert command gets fully rolled back
\set ON_ERROR_STOP 0
SELECT _timescaledb_internal.invoke_faulty_distributed_command();
\set ON_ERROR_STOP 1
\c server1
SELECT * from disttable2;
\c server3
SELECT * from disttable2;
\c single
DROP DATABASE server1;
DROP DATABASE server2;
DROP DATABASE server3;

View File

@ -5,6 +5,7 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/async.c
${CMAKE_CURRENT_SOURCE_DIR}/command_exec.c
${CMAKE_CURRENT_SOURCE_DIR}/txn_id.c
${CMAKE_CURRENT_SOURCE_DIR}/dist_commands.c
)
target_sources(${TSL_TESTS_LIB_NAME} PRIVATE ${SOURCES})

View File

@ -0,0 +1,63 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
#include <fmgr.h>
#include "export.h"
#include "remote/dist_commands.h"
#include "test_utils.h"
TS_FUNCTION_INFO_V1(tsl_invoke_distributed_commands);
TS_FUNCTION_INFO_V1(tsl_invoke_faulty_distributed_command);
#define LOG_PG_STATUS(RESULT, TARGET) \
elog(INFO, \
"%s result: %s", \
TARGET, \
PQresStatus(PQresultStatus(ts_dist_cmd_get_server_result(RESULT, TARGET))));
Datum
tsl_invoke_distributed_commands(PG_FUNCTION_ARGS)
{
List *servers = list_make2("server1", "server3");
DistCmdResult *results;
PreparedDistCmd *prepped_cmd;
const char *test_args[3] = { "1976-09-18 00:00:00-07", "47", "103.4" };
results = ts_dist_cmd_invoke_on_all_servers(
"CREATE TABLE public.disttable1(time timestamptz, device int, temp float);");
LOG_PG_STATUS(results, "server1");
LOG_PG_STATUS(results, "server2");
LOG_PG_STATUS(results, "server3");
ts_dist_cmd_close_response(results);
results = ts_dist_cmd_invoke_on_servers("CREATE TABLE public.disttable2(time timestamptz, "
"device int, temp float);",
servers);
TestAssertTrue(ts_dist_cmd_get_server_result(results, "server2") == NULL);
LOG_PG_STATUS(results, "server1");
LOG_PG_STATUS(results, "server3");
ts_dist_cmd_close_response(results);
prepped_cmd = ts_dist_cmd_prepare_command("INSERT INTO public.disttable1 VALUES ($1, $2, $3)",
3,
servers);
results = ts_dist_cmd_invoke_prepared_command(prepped_cmd, test_args);
LOG_PG_STATUS(results, "server1");
LOG_PG_STATUS(results, "server3");
ts_dist_cmd_close_prepared_command(prepped_cmd);
PG_RETURN_VOID();
}
Datum
tsl_invoke_faulty_distributed_command(PG_FUNCTION_ARGS)
{
ts_dist_cmd_invoke_on_servers("INSERT INTO disttable2 VALUES (CURRENT_TIMESTAMP, 42, 72.5);",
NULL);
PG_RETURN_VOID();
}