Support non-transactional distibuted_exec

Before this commit, executions of `distributed_exec` was always
transactional and this could not be disabled. This cause problems when
executing statements that cannot be executed inside a transaction, such
as `CREATE DATABASE`.

With this commit, we introduce a parameter `transactional` to
`distributed_exec` that allow non-transactional executions on data
nodes. The parameter defaults to `TRUE`.  Generates an error if
`distributed_exec` is non-transactional and inside a transaction.

It also changes `distributed_exec` to be a procedure, which better
matches it's usage (it returns `VOID`).

Closes #1660
This commit is contained in:
Mats Kindahl 2020-01-28 11:22:33 +01:00 committed by Mats Kindahl
parent 967a10afcb
commit 85428bc383
17 changed files with 310 additions and 225 deletions

View File

@ -187,8 +187,11 @@ AS '@MODULE_PATHNAME@', 'ts_data_node_detach' LANGUAGE C VOLATILE;
-- Execute query on a specified list of data nodes. By default node_list is NULL, which means
-- to execute the query on every data node
CREATE OR REPLACE FUNCTION distributed_exec(query TEXT, node_list name[] = NULL) RETURNS VOID
AS '@MODULE_PATHNAME@', 'ts_distributed_exec' LANGUAGE C VOLATILE;
CREATE PROCEDURE distributed_exec(
query TEXT,
node_list name[] = NULL,
transactional BOOLEAN = TRUE)
AS '@MODULE_PATHNAME@', 'ts_distributed_exec' LANGUAGE C;
-- Sets new replication factor for distributed hypertable
CREATE OR REPLACE FUNCTION set_replication_factor(

View File

@ -1,2 +1,3 @@
DROP FUNCTION IF EXISTS detach_data_node(name,regclass,boolean,boolean);
DROP FUNCTION IF EXISTS distributed_exec;

View File

@ -194,6 +194,8 @@
#endif
#define FC_FN_OID(fcinfo) ((fcinfo)->flinfo->fn_oid)
/* convenience setters */
#define FC_SET_ARG(fcinfo, n, val) \
do \

View File

@ -9,6 +9,8 @@
#include <catalog/namespace.h>
#include <funcapi.h>
#include <libpq-fe.h>
#include <utils/lsyscache.h>
#include <utils/syscache.h>
#include "dist_commands.h"
#include "dist_txn.h"
@ -432,13 +434,16 @@ ts_dist_cmd_exec(PG_FUNCTION_ARGS)
{
const char *query = PG_ARGISNULL(0) ? NULL : TextDatumGetCString(PG_GETARG_DATUM(0));
ArrayType *data_nodes = PG_ARGISNULL(1) ? NULL : PG_GETARG_ARRAYTYPE_P(1);
bool transactional = PG_ARGISNULL(2) ? true : PG_GETARG_BOOL(2);
DistCmdResult *result;
List *data_node_list;
const char *search_path;
if (!transactional)
PreventInTransactionBlock(true, get_func_name(FC_FN_OID(fcinfo)));
if (NULL == query)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid command string")));
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("empty command string")));
if (dist_util_membership() != DIST_MEMBER_ACCESS_NODE)
ereport(ERROR,
@ -454,8 +459,7 @@ ts_dist_cmd_exec(PG_FUNCTION_ARGS)
result = ts_dist_cmd_invoke_on_data_nodes_using_search_path(query,
search_path,
data_node_list,
true);
transactional);
if (result)
ts_dist_cmd_close_response(result);

View File

@ -254,12 +254,7 @@ INSERT INTO disttable VALUES ('2018-01-01 05:00:00-8', 1, 23.4, 'green'),
('2018-01-01 06:00:00-8', 4, 22.3, NULL),
('2018-01-01 06:00:00-8', 1, 21.1, 'green');
-- Make sure we get deterministic behavior across all nodes
SELECT distributed_exec($$ SELECT setseed(1); $$);
distributed_exec
------------------
(1 row)
CALL distributed_exec($$ SELECT setseed(1); $$);
-- No stats on the local table
SELECT * FROM _timescaledb_internal.get_chunk_relstats('disttable');
chunk_id | hypertable_id | num_pages | num_tuples | num_allvisible
@ -292,12 +287,7 @@ ORDER BY 1,2,3;
(0 rows)
-- Run ANALYZE on data node 1
SELECT * FROM distributed_exec('ANALYZE disttable', '{ "data_node_1" }');
distributed_exec
------------------
(1 row)
CALL distributed_exec('ANALYZE disttable', '{ "data_node_1" }');
-- Stats should now be refreshed after running get_chunk_{col,rel}stats
SELECT relname, reltuples, relpages, relallvisible FROM pg_class WHERE relname IN
(SELECT (_timescaledb_internal.show_chunk(show_chunks)).table_name

View File

@ -254,12 +254,7 @@ INSERT INTO disttable VALUES ('2018-01-01 05:00:00-8', 1, 23.4, 'green'),
('2018-01-01 06:00:00-8', 4, 22.3, NULL),
('2018-01-01 06:00:00-8', 1, 21.1, 'green');
-- Make sure we get deterministic behavior across all nodes
SELECT distributed_exec($$ SELECT setseed(1); $$);
distributed_exec
------------------
(1 row)
CALL distributed_exec($$ SELECT setseed(1); $$);
-- No stats on the local table
SELECT * FROM _timescaledb_internal.get_chunk_relstats('disttable');
chunk_id | hypertable_id | num_pages | num_tuples | num_allvisible
@ -292,12 +287,7 @@ ORDER BY 1,2,3;
(0 rows)
-- Run ANALYZE on data node 1
SELECT * FROM distributed_exec('ANALYZE disttable', '{ "data_node_1" }');
distributed_exec
------------------
(1 row)
CALL distributed_exec('ANALYZE disttable', '{ "data_node_1" }');
-- Stats should now be refreshed after running get_chunk_{col,rel}stats
SELECT relname, reltuples, relpages, relallvisible FROM pg_class WHERE relname IN
(SELECT (_timescaledb_internal.show_chunk(show_chunks)).table_name

View File

@ -219,25 +219,10 @@ t
-- Test distributed_exec()
-- Make sure dist session is properly set
SELECT * FROM distributed_exec('DO $$ BEGIN ASSERT(SELECT is_frontend_session()) = true; END; $$;');
distributed_exec
------------------
(1 row)
CALL distributed_exec('DO $$ BEGIN ASSERT(SELECT is_frontend_session()) = true; END; $$;');
-- Test creating and dropping a table
SELECT * FROM distributed_exec('CREATE TABLE dist_test (id int)');
distributed_exec
------------------
(1 row)
SELECT * FROM distributed_exec('INSERT INTO dist_test values (7)');
distributed_exec
------------------
(1 row)
CALL distributed_exec('CREATE TABLE dist_test (id int)');
CALL distributed_exec('INSERT INTO dist_test values (7)');
SELECT * FROM test.remote_exec(NULL, $$ SELECT * from dist_test; $$);
NOTICE: [data_node_1]: SELECT * from dist_test
NOTICE: [data_node_1]:
@ -268,14 +253,9 @@ id
(1 row)
SELECT * FROM distributed_exec('DROP TABLE dist_test');
distributed_exec
------------------
(1 row)
CALL distributed_exec('DROP TABLE dist_test');
\set ON_ERROR_STOP 0
SELECT * FROM distributed_exec('INSERT INTO dist_test VALUES (8)', '{data_node_1}');
CALL distributed_exec('INSERT INTO dist_test VALUES (8)', '{data_node_1}');
ERROR: [data_node_1]: relation "dist_test" does not exist
\set ON_ERROR_STOP 1
-- Test creating and dropping a role
@ -283,7 +263,7 @@ CREATE ROLE dist_test_role;
-- Expect this to be an error, since data nodes are created on the same instance
\set ON_ERROR_STOP 0
SELECT test.execute_sql_and_filter_data_node_name_on_error($$
SELECT * FROM distributed_exec('CREATE ROLE dist_test_role');
CALL distributed_exec('CREATE ROLE dist_test_role');
$$);
ERROR: [data_node_x]: role "dist_test_role" already exists
\set ON_ERROR_STOP 1
@ -320,17 +300,35 @@ t
DROP ROLE DIST_TEST_ROLE;
\set ON_ERROR_STOP 0
SELECT test.execute_sql_and_filter_data_node_name_on_error($$
SELECT * FROM distributed_exec('DROP ROLE dist_test_role');
CALL distributed_exec('DROP ROLE dist_test_role');
$$);
ERROR: [data_node_x]: role "dist_test_role" does not exist
\set ON_ERROR_STOP 1
-- Do not allow to run distributed_exec() on a data nodes
\c data_node_1
\set ON_ERROR_STOP 0
SELECT * FROM distributed_exec('SELECT 1');
CALL distributed_exec('SELECT 1');
ERROR: function must be run on the access node only
\set ON_ERROR_STOP 1
\c :TEST_DBNAME :ROLE_SUPERUSER
SELECT * FROM delete_data_node('data_node_1');
delete_data_node
------------------
t
(1 row)
SELECT * FROM delete_data_node('data_node_2');
delete_data_node
------------------
t
(1 row)
SELECT * FROM delete_data_node('data_node_3');
delete_data_node
------------------
t
(1 row)
DROP DATABASE data_node_1;
DROP DATABASE data_node_2;
DROP DATABASE data_node_3;
@ -344,3 +342,143 @@ ERROR: data node "myserver" is not a TimescaleDB server
\set ON_ERROR_STOP 1
DROP SERVER myserver;
DROP EXTENSION postgres_fdw;
-- Test that transactional behaviour is the default and that it can be
-- disabled.
--
-- In this case, we only execute it on one data node since we are
-- creating a database and multiple creations of the database would
-- clash when executed on the same instace.
--
-- We prefix the database names with the test file to be able to
-- parallelize the test. Not possible right now because there are
-- other databases above that prevents this.
\c :TEST_DBNAME :ROLE_SUPERUSER
SELECT * FROM add_data_node('dist_commands_1', host => 'localhost',
database => 'dist_commands_1');
node_name | host | port | database | node_created | database_created | extension_created
-----------------+-----------+-------+-----------------+--------------+------------------+-------------------
dist_commands_1 | localhost | 55432 | dist_commands_1 | t | t | t
(1 row)
SELECT * FROM add_data_node('dist_commands_2', host => 'localhost',
database => 'dist_commands_2');
node_name | host | port | database | node_created | database_created | extension_created
-----------------+-----------+-------+-----------------+--------------+------------------+-------------------
dist_commands_2 | localhost | 55432 | dist_commands_2 | t | t | t
(1 row)
GRANT USAGE ON FOREIGN SERVER dist_commands_1, dist_commands_2 TO PUBLIC;
\set ON_ERROR_STOP 0
CALL distributed_exec('CREATE DATABASE dist_commands_magic',
node_list => '{dist_commands_1}');
ERROR: [dist_commands_1]: CREATE DATABASE cannot run inside a transaction block
\set ON_ERROR_STOP 1
CALL distributed_exec('CREATE DATABASE dist_commands_magic',
node_list => '{dist_commands_1}', transactional => FALSE);
DROP DATABASE dist_commands_magic;
-- Test that distributed_exec honor the 2PC behaviour when starting a
-- transaction locally. It should also give an error if attempting to
-- execute non-transactionally inside a local transaction.
-- To test that distributed_exec honors transactions, we create a
-- table on both data nodes, and then tweak one of the tables so that
-- we get a duplicate key when updating the table on both data
-- nodes. This should then abort the transaction on all data nodes.
\c :TEST_DBNAME :ROLE_1
CALL distributed_exec($$
CREATE TABLE my_table (key INT, value TEXT, PRIMARY KEY (key));
$$);
\c dist_commands_1
INSERT INTO my_table VALUES (1, 'foo');
\c :TEST_DBNAME :ROLE_1
\set ON_ERROR_STOP 0
BEGIN;
CALL distributed_exec($$ INSERT INTO my_table VALUES (1, 'bar') $$);
ERROR: [dist_commands_1]: duplicate key value violates unique constraint "my_table_pkey"
COMMIT;
\set ON_ERROR_STOP 1
-- No changes should be there
SELECT * FROM test.remote_exec(NULL, $$ SELECT * FROM my_table; $$);
NOTICE: [dist_commands_1]: SELECT * FROM my_table
NOTICE: [dist_commands_1]:
key|value
---+-----
1|foo
(1 row)
NOTICE: [dist_commands_2]: SELECT * FROM my_table
NOTICE: [dist_commands_2]:
key|value
---+-----
(0 rows)
remote_exec
-------------
(1 row)
-- This should work.
BEGIN;
CALL distributed_exec($$ INSERT INTO my_table VALUES (2, 'bar'); $$);
COMMIT;
-- We should see changes
SELECT * FROM test.remote_exec(NULL, $$ SELECT * FROM my_table; $$);
NOTICE: [dist_commands_1]: SELECT * FROM my_table
NOTICE: [dist_commands_1]:
key|value
---+-----
1|foo
2|bar
(2 rows)
NOTICE: [dist_commands_2]: SELECT * FROM my_table
NOTICE: [dist_commands_2]:
key|value
---+-----
2|bar
(1 row)
remote_exec
-------------
(1 row)
-- This should fail since we are inside a transaction and asking for
-- transactional execution on the remote nodes. Non-transactional
-- execution should be outside transactions.
\set ON_ERROR_STOP 0
BEGIN;
CALL distributed_exec(
$$ INSERT INTO my_table VALUES (3, 'baz') $$,
transactional => FALSE
);
ERROR: distributed_exec cannot run inside a transaction block
COMMIT;
\set ON_ERROR_STOP 1
-- We should see no changes
SELECT * FROM test.remote_exec(NULL, $$ SELECT * FROM my_table; $$);
NOTICE: [dist_commands_1]: SELECT * FROM my_table
NOTICE: [dist_commands_1]:
key|value
---+-----
1|foo
2|bar
(2 rows)
NOTICE: [dist_commands_2]: SELECT * FROM my_table
NOTICE: [dist_commands_2]:
key|value
---+-----
2|bar
(1 row)
remote_exec
-------------
(1 row)

View File

@ -36,18 +36,8 @@ SELECT * FROM add_data_node('data_node_3', host => 'localhost',
GRANT USAGE ON FOREIGN SERVER data_node_1, data_node_2, data_node_3 TO PUBLIC;
-- Presence of non-distributed hypertables on data nodes should not cause issues
SELECT distributed_exec('CREATE TABLE local(time timestamptz, measure int)', '{ "data_node_1", "data_node_3" }');
distributed_exec
------------------
(1 row)
SELECT distributed_exec($$ SELECT create_hypertable('local', 'time') $$, '{ "data_node_1", "data_node_3" }');
distributed_exec
------------------
(1 row)
CALL distributed_exec('CREATE TABLE local(time timestamptz, measure int)', '{ "data_node_1", "data_node_3" }');
CALL distributed_exec($$ SELECT create_hypertable('local', 'time') $$, '{ "data_node_1", "data_node_3" }');
-- Import testsupport.sql file to data nodes
\unset ECHO
-- This SCHEMA will not be created on data nodes

View File

@ -3048,14 +3048,9 @@ SELECT * FROM set_number_partitions('disttable', 3);
(1 row)
CREATE OR REPLACE FUNCTION dummy_now() RETURNS BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 2::BIGINT';
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
CREATE OR REPLACE FUNCTION dummy_now() RETURNS BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 2::BIGINT'
$$);
distributed_exec
------------------
(1 row)
SELECT * FROM set_integer_now_func('disttable', 'dummy_now');
set_integer_now_func
----------------------
@ -3131,14 +3126,9 @@ NOTICE: adding not-null constraint to column "time"
(1 row)
-- Ensure that table is created on the data nodes without a tablespace
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
SELECT * FROM show_tablespaces('disttable2');
$$);
distributed_exec
------------------
(1 row)
INSERT INTO disttable2 VALUES ('2017-01-01 06:01', 1, 1.1);
SELECT * FROM show_chunks('disttable2');
show_chunks
@ -3173,38 +3163,18 @@ SELECT * FROM show_tablespaces('disttable2');
(0 rows)
-- Ensure tablespace API works for data nodes
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
SELECT attach_tablespace('tablespace2', 'disttable2');
$$);
distributed_exec
------------------
(1 row)
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
SELECT detach_tablespace('tablespace2', 'disttable2');
$$);
distributed_exec
------------------
(1 row)
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
SELECT attach_tablespace('tablespace2', 'disttable2');
$$);
distributed_exec
------------------
(1 row)
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
SELECT detach_tablespaces('disttable2');
$$);
distributed_exec
------------------
(1 row)
DROP TABLE disttable2;
CREATE TABLE disttable2(time timestamptz, device int, temp float) TABLESPACE tablespace1;
SELECT create_hypertable('disttable2', 'time', chunk_time_interval => 1000000::bigint, replication_factor => 1);
@ -3215,14 +3185,9 @@ NOTICE: adding not-null constraint to column "time"
(1 row)
-- Ensure that table is created on the data nodes without a tablespace
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
SELECT * FROM show_tablespaces('disttable2');
$$);
distributed_exec
------------------
(1 row)
INSERT INTO disttable2 VALUES ('2017-01-01 06:01', 1, 1.1);
SELECT * FROM show_chunks('disttable2');
show_chunks
@ -3376,25 +3341,15 @@ CREATE TABLE devices (
device_id INTEGER PRIMARY KEY,
device_name VARCHAR(10)
);
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
CREATE TABLE devices(device_id INTEGER PRIMARY KEY, device_name VARCHAR(10))
$$);
distributed_exec
------------------
(1 row)
INSERT INTO devices VALUES
(1, 'A001'), (2, 'B015'), (3, 'D821'), (4, 'C561'), (5, 'D765');
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
INSERT INTO devices VALUES
(1, 'A001'), (2, 'B015'), (3, 'D821'), (4, 'C561'), (5, 'D765')
$$);
distributed_exec
------------------
(1 row)
CREATE TABLE hyper (
time TIMESTAMPTZ NOT NULL,
device INTEGER REFERENCES devices(device_id),
@ -3710,14 +3665,9 @@ SELECT * FROM set_replication_factor('hyper', replication_factor => 4);
ERROR: too big replication factor for hypertable "hyper"
\set ON_ERROR_STOP 1
DROP TABLE hyper;
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
DROP TABLE devices;
$$);
distributed_exec
------------------
(1 row)
DROP TABLE devices;
-- Test storage options are distributed to data nodes
--

View File

@ -3029,14 +3029,9 @@ SELECT * FROM set_number_partitions('disttable', 3);
(1 row)
CREATE OR REPLACE FUNCTION dummy_now() RETURNS BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 2::BIGINT';
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
CREATE OR REPLACE FUNCTION dummy_now() RETURNS BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 2::BIGINT'
$$);
distributed_exec
------------------
(1 row)
SELECT * FROM set_integer_now_func('disttable', 'dummy_now');
set_integer_now_func
----------------------
@ -3112,14 +3107,9 @@ NOTICE: adding not-null constraint to column "time"
(1 row)
-- Ensure that table is created on the data nodes without a tablespace
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
SELECT * FROM show_tablespaces('disttable2');
$$);
distributed_exec
------------------
(1 row)
INSERT INTO disttable2 VALUES ('2017-01-01 06:01', 1, 1.1);
SELECT * FROM show_chunks('disttable2');
show_chunks
@ -3154,38 +3144,18 @@ SELECT * FROM show_tablespaces('disttable2');
(0 rows)
-- Ensure tablespace API works for data nodes
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
SELECT attach_tablespace('tablespace2', 'disttable2');
$$);
distributed_exec
------------------
(1 row)
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
SELECT detach_tablespace('tablespace2', 'disttable2');
$$);
distributed_exec
------------------
(1 row)
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
SELECT attach_tablespace('tablespace2', 'disttable2');
$$);
distributed_exec
------------------
(1 row)
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
SELECT detach_tablespaces('disttable2');
$$);
distributed_exec
------------------
(1 row)
DROP TABLE disttable2;
CREATE TABLE disttable2(time timestamptz, device int, temp float) TABLESPACE tablespace1;
SELECT create_hypertable('disttable2', 'time', chunk_time_interval => 1000000::bigint, replication_factor => 1);
@ -3196,14 +3166,9 @@ NOTICE: adding not-null constraint to column "time"
(1 row)
-- Ensure that table is created on the data nodes without a tablespace
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
SELECT * FROM show_tablespaces('disttable2');
$$);
distributed_exec
------------------
(1 row)
INSERT INTO disttable2 VALUES ('2017-01-01 06:01', 1, 1.1);
SELECT * FROM show_chunks('disttable2');
show_chunks
@ -3356,25 +3321,15 @@ CREATE TABLE devices (
device_id INTEGER PRIMARY KEY,
device_name VARCHAR(10)
);
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
CREATE TABLE devices(device_id INTEGER PRIMARY KEY, device_name VARCHAR(10))
$$);
distributed_exec
------------------
(1 row)
INSERT INTO devices VALUES
(1, 'A001'), (2, 'B015'), (3, 'D821'), (4, 'C561'), (5, 'D765');
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
INSERT INTO devices VALUES
(1, 'A001'), (2, 'B015'), (3, 'D821'), (4, 'C561'), (5, 'D765')
$$);
distributed_exec
------------------
(1 row)
CREATE TABLE hyper (
time TIMESTAMPTZ NOT NULL,
device INTEGER REFERENCES devices(device_id),
@ -3690,14 +3645,9 @@ SELECT * FROM set_replication_factor('hyper', replication_factor => 4);
ERROR: too big replication factor for hypertable "hyper"
\set ON_ERROR_STOP 1
DROP TABLE hyper;
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
DROP TABLE devices;
$$);
distributed_exec
------------------
(1 row)
DROP TABLE devices;
-- Test storage options are distributed to data nodes
--

View File

@ -145,12 +145,7 @@ CREATE AGGREGATE custom_sum(int4) (
STYPE = int8
);
-- Set seed on all data nodes for ANALYZE to sample consistently
SELECT distributed_exec($$ SELECT setseed(1); $$);
distributed_exec
------------------
(1 row)
CALL distributed_exec($$ SELECT setseed(1); $$);
ANALYZE reference;
ANALYZE hyper;
ANALYZE hyper1d;

View File

@ -145,12 +145,7 @@ CREATE AGGREGATE custom_sum(int4) (
STYPE = int8
);
-- Set seed on all data nodes for ANALYZE to sample consistently
SELECT distributed_exec($$ SELECT setseed(1); $$);
distributed_exec
------------------
(1 row)
CALL distributed_exec($$ SELECT setseed(1); $$);
ANALYZE reference;
ANALYZE hyper;
ANALYZE hyper1d;

View File

@ -134,7 +134,7 @@ INSERT INTO disttable VALUES ('2018-01-01 05:00:00-8', 1, 23.4, 'green'),
('2018-01-01 06:00:00-8', 1, 21.1, 'green');
-- Make sure we get deterministic behavior across all nodes
SELECT distributed_exec($$ SELECT setseed(1); $$);
CALL distributed_exec($$ SELECT setseed(1); $$);
-- No stats on the local table
SELECT * FROM _timescaledb_internal.get_chunk_relstats('disttable');
@ -150,7 +150,7 @@ SELECT * FROM pg_stats WHERE tablename IN
ORDER BY 1,2,3;
-- Run ANALYZE on data node 1
SELECT * FROM distributed_exec('ANALYZE disttable', '{ "data_node_1" }');
CALL distributed_exec('ANALYZE disttable', '{ "data_node_1" }');
-- Stats should now be refreshed after running get_chunk_{col,rel}stats
SELECT relname, reltuples, relpages, relallvisible FROM pg_class WHERE relname IN

View File

@ -114,15 +114,15 @@ SELECT * FROM test.remote_exec(NULL, $$ SELECT is_frontend_session(); $$);
-- Test distributed_exec()
-- Make sure dist session is properly set
SELECT * FROM distributed_exec('DO $$ BEGIN ASSERT(SELECT is_frontend_session()) = true; END; $$;');
CALL distributed_exec('DO $$ BEGIN ASSERT(SELECT is_frontend_session()) = true; END; $$;');
-- Test creating and dropping a table
SELECT * FROM distributed_exec('CREATE TABLE dist_test (id int)');
SELECT * FROM distributed_exec('INSERT INTO dist_test values (7)');
CALL distributed_exec('CREATE TABLE dist_test (id int)');
CALL distributed_exec('INSERT INTO dist_test values (7)');
SELECT * FROM test.remote_exec(NULL, $$ SELECT * from dist_test; $$);
SELECT * FROM distributed_exec('DROP TABLE dist_test');
CALL distributed_exec('DROP TABLE dist_test');
\set ON_ERROR_STOP 0
SELECT * FROM distributed_exec('INSERT INTO dist_test VALUES (8)', '{data_node_1}');
CALL distributed_exec('INSERT INTO dist_test VALUES (8)', '{data_node_1}');
\set ON_ERROR_STOP 1
-- Test creating and dropping a role
@ -130,24 +130,27 @@ CREATE ROLE dist_test_role;
-- Expect this to be an error, since data nodes are created on the same instance
\set ON_ERROR_STOP 0
SELECT test.execute_sql_and_filter_data_node_name_on_error($$
SELECT * FROM distributed_exec('CREATE ROLE dist_test_role');
CALL distributed_exec('CREATE ROLE dist_test_role');
$$);
\set ON_ERROR_STOP 1
SELECT * FROM test.remote_exec(NULL, $$ SELECT true from pg_catalog.pg_roles WHERE rolname = 'dist_test_role'; $$);
DROP ROLE DIST_TEST_ROLE;
\set ON_ERROR_STOP 0
SELECT test.execute_sql_and_filter_data_node_name_on_error($$
SELECT * FROM distributed_exec('DROP ROLE dist_test_role');
CALL distributed_exec('DROP ROLE dist_test_role');
$$);
\set ON_ERROR_STOP 1
-- Do not allow to run distributed_exec() on a data nodes
\c data_node_1
\set ON_ERROR_STOP 0
SELECT * FROM distributed_exec('SELECT 1');
CALL distributed_exec('SELECT 1');
\set ON_ERROR_STOP 1
\c :TEST_DBNAME :ROLE_SUPERUSER
SELECT * FROM delete_data_node('data_node_1');
SELECT * FROM delete_data_node('data_node_2');
SELECT * FROM delete_data_node('data_node_3');
DROP DATABASE data_node_1;
DROP DATABASE data_node_2;
DROP DATABASE data_node_3;
@ -161,3 +164,77 @@ SELECT * FROM test.remote_exec('{myserver}', $$ SELECT 1; $$);
\set ON_ERROR_STOP 1
DROP SERVER myserver;
DROP EXTENSION postgres_fdw;
-- Test that transactional behaviour is the default and that it can be
-- disabled.
--
-- In this case, we only execute it on one data node since we are
-- creating a database and multiple creations of the database would
-- clash when executed on the same instace.
--
-- We prefix the database names with the test file to be able to
-- parallelize the test. Not possible right now because there are
-- other databases above that prevents this.
\c :TEST_DBNAME :ROLE_SUPERUSER
SELECT * FROM add_data_node('dist_commands_1', host => 'localhost',
database => 'dist_commands_1');
SELECT * FROM add_data_node('dist_commands_2', host => 'localhost',
database => 'dist_commands_2');
GRANT USAGE ON FOREIGN SERVER dist_commands_1, dist_commands_2 TO PUBLIC;
\set ON_ERROR_STOP 0
CALL distributed_exec('CREATE DATABASE dist_commands_magic',
node_list => '{dist_commands_1}');
\set ON_ERROR_STOP 1
CALL distributed_exec('CREATE DATABASE dist_commands_magic',
node_list => '{dist_commands_1}', transactional => FALSE);
DROP DATABASE dist_commands_magic;
-- Test that distributed_exec honor the 2PC behaviour when starting a
-- transaction locally. It should also give an error if attempting to
-- execute non-transactionally inside a local transaction.
-- To test that distributed_exec honors transactions, we create a
-- table on both data nodes, and then tweak one of the tables so that
-- we get a duplicate key when updating the table on both data
-- nodes. This should then abort the transaction on all data nodes.
\c :TEST_DBNAME :ROLE_1
CALL distributed_exec($$
CREATE TABLE my_table (key INT, value TEXT, PRIMARY KEY (key));
$$);
\c dist_commands_1
INSERT INTO my_table VALUES (1, 'foo');
\c :TEST_DBNAME :ROLE_1
\set ON_ERROR_STOP 0
BEGIN;
CALL distributed_exec($$ INSERT INTO my_table VALUES (1, 'bar') $$);
COMMIT;
\set ON_ERROR_STOP 1
-- No changes should be there
SELECT * FROM test.remote_exec(NULL, $$ SELECT * FROM my_table; $$);
-- This should work.
BEGIN;
CALL distributed_exec($$ INSERT INTO my_table VALUES (2, 'bar'); $$);
COMMIT;
-- We should see changes
SELECT * FROM test.remote_exec(NULL, $$ SELECT * FROM my_table; $$);
-- This should fail since we are inside a transaction and asking for
-- transactional execution on the remote nodes. Non-transactional
-- execution should be outside transactions.
\set ON_ERROR_STOP 0
BEGIN;
CALL distributed_exec(
$$ INSERT INTO my_table VALUES (3, 'baz') $$,
transactional => FALSE
);
COMMIT;
\set ON_ERROR_STOP 1
-- We should see no changes
SELECT * FROM test.remote_exec(NULL, $$ SELECT * FROM my_table; $$);

View File

@ -29,8 +29,8 @@ SELECT * FROM add_data_node('data_node_3', host => 'localhost',
GRANT USAGE ON FOREIGN SERVER data_node_1, data_node_2, data_node_3 TO PUBLIC;
-- Presence of non-distributed hypertables on data nodes should not cause issues
SELECT distributed_exec('CREATE TABLE local(time timestamptz, measure int)', '{ "data_node_1", "data_node_3" }');
SELECT distributed_exec($$ SELECT create_hypertable('local', 'time') $$, '{ "data_node_1", "data_node_3" }');
CALL distributed_exec('CREATE TABLE local(time timestamptz, measure int)', '{ "data_node_1", "data_node_3" }');
CALL distributed_exec($$ SELECT create_hypertable('local', 'time') $$, '{ "data_node_1", "data_node_3" }');
-- Import testsupport.sql file to data nodes
\unset ECHO

View File

@ -879,7 +879,7 @@ SELECT * FROM set_chunk_time_interval('disttable', 2000000000::bigint);
SELECT * FROM set_number_partitions('disttable', 3);
CREATE OR REPLACE FUNCTION dummy_now() RETURNS BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 2::BIGINT';
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
CREATE OR REPLACE FUNCTION dummy_now() RETURNS BIGINT LANGUAGE SQL IMMUTABLE as 'SELECT 2::BIGINT'
$$);
@ -906,7 +906,7 @@ CREATE TABLE disttable2(time timestamptz, device int, temp float) TABLESPACE tab
SELECT create_distributed_hypertable('disttable2', 'time', chunk_time_interval => 1000000::bigint);
-- Ensure that table is created on the data nodes without a tablespace
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
SELECT * FROM show_tablespaces('disttable2');
$$);
@ -927,16 +927,16 @@ SELECT detach_tablespaces('disttable2');
SELECT * FROM show_tablespaces('disttable2');
-- Ensure tablespace API works for data nodes
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
SELECT attach_tablespace('tablespace2', 'disttable2');
$$);
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
SELECT detach_tablespace('tablespace2', 'disttable2');
$$);
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
SELECT attach_tablespace('tablespace2', 'disttable2');
$$);
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
SELECT detach_tablespaces('disttable2');
$$);
DROP TABLE disttable2;
@ -945,7 +945,7 @@ CREATE TABLE disttable2(time timestamptz, device int, temp float) TABLESPACE tab
SELECT create_hypertable('disttable2', 'time', chunk_time_interval => 1000000::bigint, replication_factor => 1);
-- Ensure that table is created on the data nodes without a tablespace
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
SELECT * FROM show_tablespaces('disttable2');
$$);
@ -1036,14 +1036,14 @@ CREATE TABLE devices (
device_name VARCHAR(10)
);
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
CREATE TABLE devices(device_id INTEGER PRIMARY KEY, device_name VARCHAR(10))
$$);
INSERT INTO devices VALUES
(1, 'A001'), (2, 'B015'), (3, 'D821'), (4, 'C561'), (5, 'D765');
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
INSERT INTO devices VALUES
(1, 'A001'), (2, 'B015'), (3, 'D821'), (4, 'C561'), (5, 'D765')
$$);
@ -1134,7 +1134,7 @@ SELECT * FROM set_replication_factor('hyper', replication_factor => 4);
\set ON_ERROR_STOP 1
DROP TABLE hyper;
SELECT * FROM distributed_exec($$
CALL distributed_exec($$
DROP TABLE devices;
$$);
DROP TABLE devices;

View File

@ -78,7 +78,7 @@ CREATE AGGREGATE custom_sum(int4) (
);
-- Set seed on all data nodes for ANALYZE to sample consistently
SELECT distributed_exec($$ SELECT setseed(1); $$);
CALL distributed_exec($$ SELECT setseed(1); $$);
ANALYZE reference;
ANALYZE hyper;
ANALYZE hyper1d;