Support VACUUM on distributed hypertables

This patch upgrades existing dist_cmd functions to support
executing commands which cannot be run inside an active transaction,
such as the VACUUM command.
This commit is contained in:
Dmitry Simonenko 2019-10-15 16:33:40 +03:00 committed by Erik Nordström
parent 2b1b1bdf87
commit a8ca09b307
15 changed files with 515 additions and 119 deletions

View File

@ -717,8 +717,13 @@ process_vacuum(ProcessUtilityArgs *args)
/* Exclude distributed hypertables from the list of relations
* to vacuum since they contain no local tuples. */
if (hypertable_is_distributed(ht) && (get_vacuum_options(stmt) & VACOPT_VACUUM))
continue;
if (hypertable_is_distributed(ht))
{
int opts = get_vacuum_options(stmt);
if (opts & VACOPT_VACUUM)
continue;
}
ctx.ht_vacuum_rel = vacuum_rel;
foreach_chunk(ht, add_chunk_to_vacuum, &ctx);

View File

@ -172,15 +172,25 @@ create_foreign_server(const char *const node_name, const char *const host, int32
}
TSConnection *
data_node_get_connection(const char *const data_node, RemoteTxnPrepStmtOption const ps_opt)
data_node_get_connection(const char *const data_node, RemoteTxnPrepStmtOption const ps_opt,
bool transactional)
{
const ForeignServer *server;
TSConnectionId id;
TSConnection *conn;
Cache *conn_cache;
Assert(data_node != NULL);
server = data_node_get_foreign_server(data_node, ACL_NO_CHECK, false);
id = remote_connection_id(server->serverid, GetUserId());
return remote_dist_txn_get_connection(id, ps_opt);
if (transactional)
return remote_dist_txn_get_connection(id, ps_opt);
conn_cache = remote_connection_cache_pin();
conn = remote_connection_cache_get_connection(conn_cache, id);
ts_cache_release(conn_cache);
return conn;
}
/* Attribute numbers for datum returned by create_data_node() */

View File

@ -19,7 +19,8 @@ extern ForeignServer *data_node_get_foreign_server(const char *node_name, AclMod
extern ForeignServer *data_node_get_foreign_server_by_oid(Oid foreign_server_oid, AclMode mode);
extern TSConnection *data_node_get_connection(const char *const data_node,
RemoteTxnPrepStmtOption const ps_opt);
RemoteTxnPrepStmtOption const ps_opt,
bool transactional);
extern Datum data_node_add(PG_FUNCTION_ARGS);
extern Datum data_node_delete(PG_FUNCTION_ARGS);

View File

@ -196,7 +196,8 @@ dist_util_remote_hypertable_info(PG_FUNCTION_ARGS)
funcctx->user_fctx =
ts_dist_cmd_invoke_on_data_nodes("SELECT * FROM "
"timescaledb_information.hypertable_size_info;",
list_make1((void *) node_name));
list_make1((void *) node_name),
true);
funcctx->attinmeta = TupleDescGetAttInMetadata(tupdesc);
MemoryContextSwitchTo(oldcontext);

View File

@ -78,7 +78,7 @@ hypertable_create_backend_tables(int32 hypertable_id, List *data_nodes)
foreach (cell, deparse_get_tabledef_commands(ht->main_table_relid))
ts_dist_cmd_run_on_data_nodes(lfirst(cell), data_nodes);
dist_res = ts_dist_cmd_invoke_on_data_nodes(commands->table_create_command, data_nodes);
dist_res = ts_dist_cmd_invoke_on_data_nodes(commands->table_create_command, data_nodes, true);
foreach (cell, data_nodes)
{
PGresult *res = ts_dist_cmd_get_data_node_result(dist_res, lfirst(cell));

View File

@ -71,7 +71,7 @@ ts_dist_cmd_collect_responses(List *requests)
* server OIDs.
*/
DistCmdResult *
ts_dist_cmd_invoke_on_data_nodes(const char *sql, List *data_nodes)
ts_dist_cmd_invoke_on_data_nodes(const char *sql, List *data_nodes, bool transactional)
{
ListCell *lc;
List *requests = NIL;
@ -97,7 +97,8 @@ ts_dist_cmd_invoke_on_data_nodes(const char *sql, List *data_nodes)
foreach (lc, data_nodes)
{
const char *node_name = lfirst(lc);
TSConnection *connection = data_node_get_connection(node_name, REMOTE_TXN_NO_PREP_STMT);
TSConnection *connection =
data_node_get_connection(node_name, REMOTE_TXN_NO_PREP_STMT, transactional);
AsyncRequest *req = async_request_send(connection, sql);
async_request_attach_user_data(req, (char *) node_name);
@ -112,7 +113,7 @@ ts_dist_cmd_invoke_on_data_nodes(const char *sql, List *data_nodes)
DistCmdResult *
ts_dist_cmd_invoke_on_data_nodes_using_search_path(const char *sql, const char *search_path,
List *node_names)
List *node_names, bool transactional)
{
DistCmdResult *set_result;
DistCmdResult *results;
@ -122,18 +123,20 @@ ts_dist_cmd_invoke_on_data_nodes_using_search_path(const char *sql, const char *
{
char *set_request = psprintf("SET search_path = %s, pg_catalog", search_path);
set_result = ts_dist_cmd_invoke_on_data_nodes(set_request, node_names);
set_result = ts_dist_cmd_invoke_on_data_nodes(set_request, node_names, transactional);
if (set_result)
ts_dist_cmd_close_response(set_result);
pfree(set_request);
}
results = ts_dist_cmd_invoke_on_data_nodes(sql, node_names);
results = ts_dist_cmd_invoke_on_data_nodes(sql, node_names, transactional);
if (set_search_path)
{
set_result = ts_dist_cmd_invoke_on_data_nodes("SET search_path = pg_catalog", node_names);
set_result = ts_dist_cmd_invoke_on_data_nodes("SET search_path = pg_catalog",
node_names,
transactional);
if (set_result)
ts_dist_cmd_close_response(set_result);
}
@ -144,7 +147,7 @@ ts_dist_cmd_invoke_on_data_nodes_using_search_path(const char *sql, const char *
/*
 * Invoke the given SQL command on all known data nodes.
 *
 * Always executes transactionally (transactional = true), since callers of
 * this convenience wrapper expect the command to be part of the distributed
 * transaction.
 */
DistCmdResult *
ts_dist_cmd_invoke_on_all_data_nodes(const char *sql)
{
	return ts_dist_cmd_invoke_on_data_nodes(sql, data_node_get_node_name_list(), true);
}
/*
@ -158,14 +161,15 @@ ts_dist_cmd_invoke_func_call_on_data_nodes(FunctionCallInfo fcinfo, List *data_n
if (NIL == data_nodes)
data_nodes = data_node_get_node_name_list();
return ts_dist_cmd_invoke_on_data_nodes(deparse_func_call(fcinfo), data_nodes);
return ts_dist_cmd_invoke_on_data_nodes(deparse_func_call(fcinfo), data_nodes, true);
}
/*
 * Invoke the function call described by `fcinfo` on all data nodes.
 *
 * The call is deparsed to SQL and executed transactionally on every known
 * data node.
 */
DistCmdResult *
ts_dist_cmd_invoke_func_call_on_all_data_nodes(FunctionCallInfo fcinfo)
{
	return ts_dist_cmd_invoke_on_data_nodes(deparse_func_call(fcinfo),
											data_node_get_node_name_list(),
											true);
}
/*
@ -226,7 +230,7 @@ ts_dist_cmd_prepare_command(const char *sql, size_t n_params, List *node_names)
foreach (lc, node_names)
{
const char *name = lfirst(lc);
TSConnection *connection = data_node_get_connection(name, REMOTE_TXN_USE_PREP_STMT);
TSConnection *connection = data_node_get_connection(name, REMOTE_TXN_USE_PREP_STMT, true);
DistPreparedStmt *cmd = palloc(sizeof(DistPreparedStmt));
AsyncRequest *ar = async_request_send_prepare(connection, sql, n_params);
@ -304,7 +308,11 @@ ts_dist_cmd_exec(PG_FUNCTION_ARGS)
data_node_list = data_node_array_to_node_name_list(data_nodes);
search_path = GetConfigOption("search_path", false, false);
result = ts_dist_cmd_invoke_on_data_nodes_using_search_path(query, search_path, data_node_list);
result = ts_dist_cmd_invoke_on_data_nodes_using_search_path(query,
search_path,
data_node_list,
true);
if (result)
ts_dist_cmd_close_response(result);

View File

@ -12,10 +12,12 @@
typedef struct DistCmdResult DistCmdResult;
typedef struct List PreparedDistCmd;
extern DistCmdResult *ts_dist_cmd_invoke_on_data_nodes(const char *sql, List *node_names);
extern DistCmdResult *ts_dist_cmd_invoke_on_data_nodes(const char *sql, List *node_names,
bool transactional);
extern DistCmdResult *ts_dist_cmd_invoke_on_data_nodes_using_search_path(const char *sql,
const char *search_path,
List *node_names);
List *node_names,
bool transactional);
extern DistCmdResult *ts_dist_cmd_invoke_on_all_data_nodes(const char *sql);
extern DistCmdResult *ts_dist_cmd_invoke_func_call_on_all_data_nodes(FunctionCallInfo fcinfo);
extern DistCmdResult *ts_dist_cmd_invoke_func_call_on_data_nodes(FunctionCallInfo fcinfo,
@ -26,7 +28,7 @@ extern PGresult *ts_dist_cmd_get_data_node_result(DistCmdResult *response, const
extern void ts_dist_cmd_close_response(DistCmdResult *response);
#define ts_dist_cmd_run_on_data_nodes(command, nodes) \
ts_dist_cmd_close_response(ts_dist_cmd_invoke_on_data_nodes(command, nodes));
ts_dist_cmd_close_response(ts_dist_cmd_invoke_on_data_nodes(command, nodes, true));
extern PreparedDistCmd *ts_dist_cmd_prepare_command(const char *sql, size_t n_params,
List *node_names);

View File

@ -288,7 +288,7 @@ create_connection_list_for_chunk(CopyConnectionState *state, Chunk *chunk)
{
ChunkDataNode *cdn = lfirst(lc);
TSConnection *connection =
data_node_get_connection(NameStr(cdn->fd.node_name), REMOTE_TXN_NO_PREP_STMT);
data_node_get_connection(NameStr(cdn->fd.node_name), REMOTE_TXN_NO_PREP_STMT, true);
start_remote_copy_on_new_connection(state, connection);
chunk_connections->connections = lappend(chunk_connections->connections, connection);

View File

@ -25,6 +25,8 @@ typedef enum
DIST_DDL_EXEC_NONE,
/* Execute on start hook */
DIST_DDL_EXEC_ON_START,
/* Execute on start hook without using a transaction */
DIST_DDL_EXEC_ON_START_NO_2PC,
/* Execute on end hook */
DIST_DDL_EXEC_ON_END
} DistDDLExecType;
@ -159,17 +161,12 @@ dist_ddl_check_session(void)
/*
 * Decide how a VACUUM/ANALYZE statement should be executed on data nodes.
 *
 * VACUUM cannot run inside a transaction block, so the command is forwarded
 * to data nodes over non-transactional ("raw") connections, i.e. without
 * two-phase commit.
 */
static DistDDLExecType
dist_ddl_process_vacuum(VacuumStmt *stmt)
{
	/* We do not support the VERBOSE flag since it would require printing
	 * the data returned from the data nodes */
	if (get_vacuum_options(stmt) & VACOPT_VERBOSE)
		dist_ddl_error_raise_unsupported();

	return DIST_DDL_EXEC_ON_START_NO_2PC;
}
static void
@ -337,7 +334,8 @@ dist_ddl_preprocess(ProcessUtilityArgs *args)
break;
case T_VacuumStmt:
dist_ddl_state.exec_type = dist_ddl_process_vacuum((VacuumStmt *) args->parsetree);
dist_ddl_state.exec_type =
dist_ddl_process_vacuum(castNode(VacuumStmt, args->parsetree));
break;
case T_TruncateStmt:
{
@ -386,7 +384,7 @@ dist_ddl_preprocess(ProcessUtilityArgs *args)
}
static void
dist_ddl_execute(void)
dist_ddl_execute(bool transactional)
{
DistCmdResult *result;
@ -397,7 +395,8 @@ dist_ddl_execute(void)
result = ts_dist_cmd_invoke_on_data_nodes_using_search_path(dist_ddl_state.query_string,
search_path,
dist_ddl_state.data_node_list);
dist_ddl_state.data_node_list,
transactional);
if (result)
ts_dist_cmd_close_response(result);
@ -431,8 +430,18 @@ dist_ddl_start(ProcessUtilityArgs *args)
dist_ddl_state.mctx = CurrentMemoryContext;
}
if (dist_ddl_state.exec_type == DIST_DDL_EXEC_ON_START)
dist_ddl_execute();
switch (dist_ddl_state.exec_type)
{
case DIST_DDL_EXEC_ON_START:
dist_ddl_execute(true);
break;
case DIST_DDL_EXEC_ON_START_NO_2PC:
dist_ddl_execute(false);
break;
case DIST_DDL_EXEC_ON_END:
case DIST_DDL_EXEC_NONE:
break;
}
}
void
@ -462,7 +471,7 @@ dist_ddl_end(EventTriggerData *command)
}
/* Execute command on remote data nodes. */
dist_ddl_execute();
dist_ddl_execute(true);
}
static bool

View File

@ -217,14 +217,105 @@ INSERT INTO disttable VALUES
('2018-07-01 06:01', 13, 1.4),
('2018-07-01 09:11', 90, 2.7),
('2018-07-01 08:01', 29, 1.5);
\set ON_ERROR_STOP 0
-- VACUUM currently not supported. A VACUUM cannot run in a
-- transaction block so we need to distribute the command to data
-- nodes using "raw" connections.
VACUUM ANALYZE disttable;
-- Test distributed VACUUM ANALYZE support. First show no statistics
SELECT * FROM test.remote_exec('{ data_node_1, data_node_2, data_node_3 }', $$
SELECT relname, relkind, reltuples, relpages
FROM pg_class cl, (SELECT show_chunks AS chunk FROM show_chunks('disttable')) ch
WHERE cl.oid = ch.chunk::regclass;
$$);
NOTICE: [data_node_1]:
SELECT relname, relkind, reltuples, relpages
FROM pg_class cl, (SELECT show_chunks AS chunk FROM show_chunks('disttable')) ch
WHERE cl.oid = ch.chunk::regclass
NOTICE: [data_node_1]:
relname |relkind|reltuples|relpages
---------------------+-------+---------+--------
_hyper_1_1_dist_chunk|r | 0| 0
_hyper_1_4_dist_chunk|r | 0| 0
(2 rows)
NOTICE: [data_node_2]:
SELECT relname, relkind, reltuples, relpages
FROM pg_class cl, (SELECT show_chunks AS chunk FROM show_chunks('disttable')) ch
WHERE cl.oid = ch.chunk::regclass
NOTICE: [data_node_2]:
relname |relkind|reltuples|relpages
---------------------+-------+---------+--------
_hyper_1_3_dist_chunk|r | 0| 0
_hyper_1_5_dist_chunk|r | 0| 0
(2 rows)
NOTICE: [data_node_3]:
SELECT relname, relkind, reltuples, relpages
FROM pg_class cl, (SELECT show_chunks AS chunk FROM show_chunks('disttable')) ch
WHERE cl.oid = ch.chunk::regclass
NOTICE: [data_node_3]:
relname |relkind|reltuples|relpages
---------------------+-------+---------+--------
_hyper_1_2_dist_chunk|r | 0| 0
_hyper_1_6_dist_chunk|r | 0| 0
(2 rows)
remote_exec
-------------
(1 row)
VACUUM (FULL, ANALYZE) disttable;
SELECT * FROM test.remote_exec('{ data_node_1, data_node_2, data_node_3 }', $$
SELECT relname, relkind, reltuples, relpages
FROM pg_class cl, (SELECT show_chunks AS chunk FROM show_chunks('disttable')) ch
WHERE cl.oid = ch.chunk::regclass;
$$);
NOTICE: [data_node_1]:
SELECT relname, relkind, reltuples, relpages
FROM pg_class cl, (SELECT show_chunks AS chunk FROM show_chunks('disttable')) ch
WHERE cl.oid = ch.chunk::regclass
NOTICE: [data_node_1]:
relname |relkind|reltuples|relpages
---------------------+-------+---------+--------
_hyper_1_1_dist_chunk|r | 2| 1
_hyper_1_4_dist_chunk|r | 1| 1
(2 rows)
NOTICE: [data_node_2]:
SELECT relname, relkind, reltuples, relpages
FROM pg_class cl, (SELECT show_chunks AS chunk FROM show_chunks('disttable')) ch
WHERE cl.oid = ch.chunk::regclass
NOTICE: [data_node_2]:
relname |relkind|reltuples|relpages
---------------------+-------+---------+--------
_hyper_1_3_dist_chunk|r | 1| 1
_hyper_1_5_dist_chunk|r | 1| 1
(2 rows)
NOTICE: [data_node_3]:
SELECT relname, relkind, reltuples, relpages
FROM pg_class cl, (SELECT show_chunks AS chunk FROM show_chunks('disttable')) ch
WHERE cl.oid = ch.chunk::regclass
NOTICE: [data_node_3]:
relname |relkind|reltuples|relpages
---------------------+-------+---------+--------
_hyper_1_2_dist_chunk|r | 1| 1
_hyper_1_6_dist_chunk|r | 2| 1
(2 rows)
remote_exec
-------------
(1 row)
VACUUM FULL disttable;
ERROR: operation not supported on distributed hypertable
VACUUM disttable;
\set ON_ERROR_STOP 0
-- VACUUM VERBOSE is not supported at the moment
VACUUM VERBOSE disttable;
ERROR: operation not supported on distributed hypertable
\set ON_ERROR_STOP 1
-- Test ANALYZE. First show no statistics
@ -397,9 +488,9 @@ SELECT node_name, "options" FROM timescaledb_information.data_node ORDER BY node
SELECT * FROM hypertable_data_node_relation_size('disttable');
node_name | num_chunks | table_size | index_size | toast_size | total_size
-------------+------------+------------+------------+------------+------------
data_node_3 | 2 | 16 kB | 96 kB | | 112 kB
data_node_1 | 2 | 16 kB | 96 kB | | 112 kB
data_node_2 | 2 | 16 kB | 96 kB | | 112 kB
data_node_3 | 2 | 80 kB | 96 kB | | 176 kB
data_node_1 | 2 | 80 kB | 96 kB | | 176 kB
data_node_2 | 2 | 80 kB | 96 kB | | 176 kB
(3 rows)
-- Show what some queries would look like on the frontend
@ -637,39 +728,44 @@ FROM disttable;
-- Test AsyncAppend when using window functions
EXPLAIN (VERBOSE, COSTS FALSE)
SELECT device, temp, avg(temp) OVER (PARTITION BY device)
FROM disttable;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
WindowAgg
Output: disttable.device, disttable.temp, avg(disttable.temp) OVER (?)
-> Custom Scan (AsyncAppend)
Output: disttable.device, disttable.temp
-> Merge Append
Sort Key: disttable_1.device
-> Custom Scan (DataNodeScan) on public.disttable disttable_1
Output: disttable_1.device, disttable_1.temp
Data node: data_node_1
Chunks: _hyper_1_1_dist_chunk, _hyper_1_4_dist_chunk
Remote SQL: SELECT device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1, 2]) ORDER BY device ASC NULLS LAST
-> Custom Scan (DataNodeScan) on public.disttable disttable_2
Output: disttable_2.device, disttable_2.temp
Data node: data_node_2
Chunks: _hyper_1_3_dist_chunk, _hyper_1_5_dist_chunk
Remote SQL: SELECT device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1, 2]) ORDER BY device ASC NULLS LAST
-> Custom Scan (DataNodeScan) on public.disttable disttable_3
Output: disttable_3.device, disttable_3.temp
Data node: data_node_3
Chunks: _hyper_1_2_dist_chunk, _hyper_1_6_dist_chunk
Remote SQL: SELECT device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1, 2]) ORDER BY device ASC NULLS LAST
(21 rows)
FROM disttable
ORDER BY device, temp;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Sort
Output: disttable.device, disttable.temp, (avg(disttable.temp) OVER (?))
Sort Key: disttable.device, disttable.temp
-> WindowAgg
Output: disttable.device, disttable.temp, avg(disttable.temp) OVER (?)
-> Custom Scan (AsyncAppend)
Output: disttable.device, disttable.temp
-> Merge Append
Sort Key: disttable_1.device
-> Custom Scan (DataNodeScan) on public.disttable disttable_1
Output: disttable_1.device, disttable_1.temp
Data node: data_node_1
Chunks: _hyper_1_1_dist_chunk, _hyper_1_4_dist_chunk
Remote SQL: SELECT device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1, 2]) ORDER BY device ASC NULLS LAST
-> Custom Scan (DataNodeScan) on public.disttable disttable_2
Output: disttable_2.device, disttable_2.temp
Data node: data_node_2
Chunks: _hyper_1_3_dist_chunk, _hyper_1_5_dist_chunk
Remote SQL: SELECT device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1, 2]) ORDER BY device ASC NULLS LAST
-> Custom Scan (DataNodeScan) on public.disttable disttable_3
Output: disttable_3.device, disttable_3.temp
Data node: data_node_3
Chunks: _hyper_1_2_dist_chunk, _hyper_1_6_dist_chunk
Remote SQL: SELECT device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1, 2]) ORDER BY device ASC NULLS LAST
(24 rows)
SELECT device, temp, avg(temp) OVER (PARTITION BY device)
FROM disttable;
FROM disttable
ORDER BY device, temp;
device | temp | avg
--------+------+------------------
1 | 1.1 | 1.23333333333333
1 | 1.2 | 1.23333333333333
1 | 1.4 | 1.23333333333333
1 | 1.1 | 1.23333333333333
2 | 1.3 | 1.3
3 | 2.1 | 2.1
13 | 1.4 | 1.4

View File

@ -217,14 +217,105 @@ INSERT INTO disttable VALUES
('2018-07-01 06:01', 13, 1.4),
('2018-07-01 09:11', 90, 2.7),
('2018-07-01 08:01', 29, 1.5);
\set ON_ERROR_STOP 0
-- VACUUM currently not supported. A VACUUM cannot run in a
-- transaction block so we need to distribute the command to data
-- nodes using "raw" connections.
VACUUM ANALYZE disttable;
-- Test distributed VACUUM ANALYZE support. First show no statistics
SELECT * FROM test.remote_exec('{ data_node_1, data_node_2, data_node_3 }', $$
SELECT relname, relkind, reltuples, relpages
FROM pg_class cl, (SELECT show_chunks AS chunk FROM show_chunks('disttable')) ch
WHERE cl.oid = ch.chunk::regclass;
$$);
NOTICE: [data_node_1]:
SELECT relname, relkind, reltuples, relpages
FROM pg_class cl, (SELECT show_chunks AS chunk FROM show_chunks('disttable')) ch
WHERE cl.oid = ch.chunk::regclass
NOTICE: [data_node_1]:
relname |relkind|reltuples|relpages
---------------------+-------+---------+--------
_hyper_1_1_dist_chunk|r | 0| 0
_hyper_1_4_dist_chunk|r | 0| 0
(2 rows)
NOTICE: [data_node_2]:
SELECT relname, relkind, reltuples, relpages
FROM pg_class cl, (SELECT show_chunks AS chunk FROM show_chunks('disttable')) ch
WHERE cl.oid = ch.chunk::regclass
NOTICE: [data_node_2]:
relname |relkind|reltuples|relpages
---------------------+-------+---------+--------
_hyper_1_3_dist_chunk|r | 0| 0
_hyper_1_5_dist_chunk|r | 0| 0
(2 rows)
NOTICE: [data_node_3]:
SELECT relname, relkind, reltuples, relpages
FROM pg_class cl, (SELECT show_chunks AS chunk FROM show_chunks('disttable')) ch
WHERE cl.oid = ch.chunk::regclass
NOTICE: [data_node_3]:
relname |relkind|reltuples|relpages
---------------------+-------+---------+--------
_hyper_1_2_dist_chunk|r | 0| 0
_hyper_1_6_dist_chunk|r | 0| 0
(2 rows)
remote_exec
-------------
(1 row)
VACUUM (FULL, ANALYZE) disttable;
SELECT * FROM test.remote_exec('{ data_node_1, data_node_2, data_node_3 }', $$
SELECT relname, relkind, reltuples, relpages
FROM pg_class cl, (SELECT show_chunks AS chunk FROM show_chunks('disttable')) ch
WHERE cl.oid = ch.chunk::regclass;
$$);
NOTICE: [data_node_1]:
SELECT relname, relkind, reltuples, relpages
FROM pg_class cl, (SELECT show_chunks AS chunk FROM show_chunks('disttable')) ch
WHERE cl.oid = ch.chunk::regclass
NOTICE: [data_node_1]:
relname |relkind|reltuples|relpages
---------------------+-------+---------+--------
_hyper_1_1_dist_chunk|r | 2| 1
_hyper_1_4_dist_chunk|r | 1| 1
(2 rows)
NOTICE: [data_node_2]:
SELECT relname, relkind, reltuples, relpages
FROM pg_class cl, (SELECT show_chunks AS chunk FROM show_chunks('disttable')) ch
WHERE cl.oid = ch.chunk::regclass
NOTICE: [data_node_2]:
relname |relkind|reltuples|relpages
---------------------+-------+---------+--------
_hyper_1_3_dist_chunk|r | 1| 1
_hyper_1_5_dist_chunk|r | 1| 1
(2 rows)
NOTICE: [data_node_3]:
SELECT relname, relkind, reltuples, relpages
FROM pg_class cl, (SELECT show_chunks AS chunk FROM show_chunks('disttable')) ch
WHERE cl.oid = ch.chunk::regclass
NOTICE: [data_node_3]:
relname |relkind|reltuples|relpages
---------------------+-------+---------+--------
_hyper_1_2_dist_chunk|r | 1| 1
_hyper_1_6_dist_chunk|r | 2| 1
(2 rows)
remote_exec
-------------
(1 row)
VACUUM FULL disttable;
ERROR: operation not supported on distributed hypertable
VACUUM disttable;
\set ON_ERROR_STOP 0
-- VACUUM VERBOSE is not supported at the moment
VACUUM VERBOSE disttable;
ERROR: operation not supported on distributed hypertable
\set ON_ERROR_STOP 1
-- Test ANALYZE. First show no statistics
@ -397,9 +488,9 @@ SELECT node_name, "options" FROM timescaledb_information.data_node ORDER BY node
SELECT * FROM hypertable_data_node_relation_size('disttable');
node_name | num_chunks | table_size | index_size | toast_size | total_size
-------------+------------+------------+------------+------------+------------
data_node_3 | 2 | 16 kB | 96 kB | | 112 kB
data_node_1 | 2 | 16 kB | 96 kB | | 112 kB
data_node_2 | 2 | 16 kB | 96 kB | | 112 kB
data_node_3 | 2 | 80 kB | 96 kB | | 176 kB
data_node_1 | 2 | 80 kB | 96 kB | | 176 kB
data_node_2 | 2 | 80 kB | 96 kB | | 176 kB
(3 rows)
-- Show what some queries would look like on the frontend
@ -637,39 +728,44 @@ FROM disttable;
-- Test AsyncAppend when using window functions
EXPLAIN (VERBOSE, COSTS FALSE)
SELECT device, temp, avg(temp) OVER (PARTITION BY device)
FROM disttable;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------
WindowAgg
Output: disttable.device, disttable.temp, avg(disttable.temp) OVER (?)
-> Custom Scan (AsyncAppend)
Output: disttable.device, disttable.temp
-> Merge Append
Sort Key: disttable_1.device
-> Custom Scan (DataNodeScan) on public.disttable disttable_1
Output: disttable_1.device, disttable_1.temp
Data node: data_node_1
Chunks: _hyper_1_1_dist_chunk, _hyper_1_4_dist_chunk
Remote SQL: SELECT device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1, 2]) ORDER BY device ASC NULLS LAST
-> Custom Scan (DataNodeScan) on public.disttable disttable_2
Output: disttable_2.device, disttable_2.temp
Data node: data_node_2
Chunks: _hyper_1_3_dist_chunk, _hyper_1_5_dist_chunk
Remote SQL: SELECT device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1, 2]) ORDER BY device ASC NULLS LAST
-> Custom Scan (DataNodeScan) on public.disttable disttable_3
Output: disttable_3.device, disttable_3.temp
Data node: data_node_3
Chunks: _hyper_1_2_dist_chunk, _hyper_1_6_dist_chunk
Remote SQL: SELECT device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1, 2]) ORDER BY device ASC NULLS LAST
(21 rows)
FROM disttable
ORDER BY device, temp;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Sort
Output: disttable.device, disttable.temp, (avg(disttable.temp) OVER (?))
Sort Key: disttable.device, disttable.temp
-> WindowAgg
Output: disttable.device, disttable.temp, avg(disttable.temp) OVER (?)
-> Custom Scan (AsyncAppend)
Output: disttable.device, disttable.temp
-> Merge Append
Sort Key: disttable_1.device
-> Custom Scan (DataNodeScan) on public.disttable disttable_1
Output: disttable_1.device, disttable_1.temp
Data node: data_node_1
Chunks: _hyper_1_1_dist_chunk, _hyper_1_4_dist_chunk
Remote SQL: SELECT device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1, 2]) ORDER BY device ASC NULLS LAST
-> Custom Scan (DataNodeScan) on public.disttable disttable_2
Output: disttable_2.device, disttable_2.temp
Data node: data_node_2
Chunks: _hyper_1_3_dist_chunk, _hyper_1_5_dist_chunk
Remote SQL: SELECT device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1, 2]) ORDER BY device ASC NULLS LAST
-> Custom Scan (DataNodeScan) on public.disttable disttable_3
Output: disttable_3.device, disttable_3.temp
Data node: data_node_3
Chunks: _hyper_1_2_dist_chunk, _hyper_1_6_dist_chunk
Remote SQL: SELECT device, temp FROM public.disttable WHERE _timescaledb_internal.chunks_in(disttable, ARRAY[1, 2]) ORDER BY device ASC NULLS LAST
(24 rows)
SELECT device, temp, avg(temp) OVER (PARTITION BY device)
FROM disttable;
FROM disttable
ORDER BY device, temp;
device | temp | avg
--------+------+------------------
1 | 1.1 | 1.23333333333333
1 | 1.2 | 1.23333333333333
1 | 1.4 | 1.23333333333333
1 | 1.1 | 1.23333333333333
2 | 1.3 | 1.3
3 | 2.1 | 2.1
13 | 1.4 | 1.4

View File

@ -15,6 +15,9 @@ RETURNS VOID
AS :TSL_MODULE_PATHNAME, 'ts_remote_exec'
LANGUAGE C;
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
CREATE OR REPLACE FUNCTION test_override_pushdown_timestamptz(new_value TIMESTAMPTZ) RETURNS VOID
AS :TSL_MODULE_PATHNAME, 'test_override_pushdown_timestamptz'
LANGUAGE C VOLATILE STRICT;
-- Cleanup from other potential tests that created these databases
SET client_min_messages TO ERROR;
DROP DATABASE IF EXISTS data_node_1;
@ -2106,6 +2109,154 @@ LIMIT 1;
Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[2, 1]) AND (("time" < '2018-06-01 00:00:00-07'::timestamp with time zone)) ORDER BY "time" ASC NULLS LAST, device ASC NULLS LAST
(21 rows)
-- contains whitelisted time expressions
SELECT test_override_pushdown_timestamptz('2018-06-01 00:00'::timestamptz);
test_override_pushdown_timestamptz
------------------------------------
(1 row)
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp)
FROM pg2dim
WHERE time < Now( ) - INTERVAL '3 days'
GROUP BY 1, 2
LIMIT 1;
QUERY PLAN
-----------------------------------------------------------------------------------------
Limit
Output: pg2dim_h1_t1."time", pg2dim_h1_t1.device, (avg(pg2dim_h1_t1.temp))
-> Append
-> HashAggregate
Output: pg2dim_h1_t1."time", pg2dim_h1_t1.device, avg(pg2dim_h1_t1.temp)
Group Key: pg2dim_h1_t1."time", pg2dim_h1_t1.device
-> Foreign Scan on public.pg2dim_h1_t1
Output: pg2dim_h1_t1."time", pg2dim_h1_t1.device, pg2dim_h1_t1.temp
Filter: (pg2dim_h1_t1."time" < (now() - '@ 3 days'::interval))
Remote SQL: SELECT "time", device, temp FROM public.pg2dim_h1_t1
-> HashAggregate
Output: pg2dim_h1_t2."time", pg2dim_h1_t2.device, avg(pg2dim_h1_t2.temp)
Group Key: pg2dim_h1_t2."time", pg2dim_h1_t2.device
-> Foreign Scan on public.pg2dim_h1_t2
Output: pg2dim_h1_t2."time", pg2dim_h1_t2.device, pg2dim_h1_t2.temp
Filter: (pg2dim_h1_t2."time" < (now() - '@ 3 days'::interval))
Remote SQL: SELECT "time", device, temp FROM public.pg2dim_h1_t2
-> HashAggregate
Output: pg2dim_h1_t3."time", pg2dim_h1_t3.device, avg(pg2dim_h1_t3.temp)
Group Key: pg2dim_h1_t3."time", pg2dim_h1_t3.device
-> Foreign Scan on public.pg2dim_h1_t3
Output: pg2dim_h1_t3."time", pg2dim_h1_t3.device, pg2dim_h1_t3.temp
Filter: (pg2dim_h1_t3."time" < (now() - '@ 3 days'::interval))
Remote SQL: SELECT "time", device, temp FROM public.pg2dim_h1_t3
-> HashAggregate
Output: pg2dim_h2_t1."time", pg2dim_h2_t1.device, avg(pg2dim_h2_t1.temp)
Group Key: pg2dim_h2_t1."time", pg2dim_h2_t1.device
-> Foreign Scan on public.pg2dim_h2_t1
Output: pg2dim_h2_t1."time", pg2dim_h2_t1.device, pg2dim_h2_t1.temp
Filter: (pg2dim_h2_t1."time" < (now() - '@ 3 days'::interval))
Remote SQL: SELECT "time", device, temp FROM public.pg2dim_h2_t1
-> HashAggregate
Output: pg2dim_h2_t2."time", pg2dim_h2_t2.device, avg(pg2dim_h2_t2.temp)
Group Key: pg2dim_h2_t2."time", pg2dim_h2_t2.device
-> Foreign Scan on public.pg2dim_h2_t2
Output: pg2dim_h2_t2."time", pg2dim_h2_t2.device, pg2dim_h2_t2.temp
Filter: (pg2dim_h2_t2."time" < (now() - '@ 3 days'::interval))
Remote SQL: SELECT "time", device, temp FROM public.pg2dim_h2_t2
-> HashAggregate
Output: pg2dim_h2_t3."time", pg2dim_h2_t3.device, avg(pg2dim_h2_t3.temp)
Group Key: pg2dim_h2_t3."time", pg2dim_h2_t3.device
-> Foreign Scan on public.pg2dim_h2_t3
Output: pg2dim_h2_t3."time", pg2dim_h2_t3.device, pg2dim_h2_t3.temp
Filter: (pg2dim_h2_t3."time" < (now() - '@ 3 days'::interval))
Remote SQL: SELECT "time", device, temp FROM public.pg2dim_h2_t3
(45 rows)
EXPLAIN (VERBOSE, COSTS OFF)
SELECT time, device, avg(temp)
FROM hyper
WHERE time < Now( ) - INTERVAL '3 days'
GROUP BY 1, 2
LIMIT 1;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit
Output: "time", device, (avg(temp))
-> Custom Scan (AsyncAppend)
Output: "time", device, (avg(temp))
-> Append
-> Custom Scan (DataNodeScan)
Output: hyper."time", hyper.device, (avg(hyper.temp))
Relations: Aggregate on (public.hyper)
Data node: data_node_1
Chunks: _hyper_1_1_dist_chunk, _hyper_1_3_dist_chunk, _hyper_1_5_dist_chunk, _hyper_1_7_dist_chunk
Remote SQL: SELECT "time", device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[1, 2, 3, 4]) AND (("time" < (('2018-06-01 00:00:00-07'::timestamptz) - '3 days'::interval))) GROUP BY 1, 2
-> Custom Scan (DataNodeScan)
Output: hyper_1."time", hyper_1.device, (avg(hyper_1.temp))
Relations: Aggregate on (public.hyper)
Data node: data_node_2
Chunks: _hyper_1_2_dist_chunk, _hyper_1_4_dist_chunk, _hyper_1_6_dist_chunk
Remote SQL: SELECT "time", device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[1, 2, 3]) AND (("time" < (('2018-06-01 00:00:00-07'::timestamptz) - '3 days'::interval))) GROUP BY 1, 2
(17 rows)
-- Verify that repeated runs of the same plan will get different timestamps
PREPARE timestamp_pushdown_test AS
SELECT time, device, avg(temp)
FROM hyper
WHERE time < now() - INTERVAL '3 days'
GROUP BY 1, 2
LIMIT 1;
EXPLAIN (VERBOSE, COSTS OFF)
EXECUTE timestamp_pushdown_test;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit
Output: "time", device, (avg(temp))
-> Custom Scan (AsyncAppend)
Output: "time", device, (avg(temp))
-> Append
-> Custom Scan (DataNodeScan)
Output: hyper."time", hyper.device, (avg(hyper.temp))
Relations: Aggregate on (public.hyper)
Data node: data_node_1
Chunks: _hyper_1_1_dist_chunk, _hyper_1_3_dist_chunk, _hyper_1_5_dist_chunk, _hyper_1_7_dist_chunk
Remote SQL: SELECT "time", device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[1, 2, 3, 4]) AND (("time" < (('2018-06-01 00:00:00-07'::timestamptz) - '3 days'::interval))) GROUP BY 1, 2
-> Custom Scan (DataNodeScan)
Output: hyper_1."time", hyper_1.device, (avg(hyper_1.temp))
Relations: Aggregate on (public.hyper)
Data node: data_node_2
Chunks: _hyper_1_2_dist_chunk, _hyper_1_4_dist_chunk, _hyper_1_6_dist_chunk
Remote SQL: SELECT "time", device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[1, 2, 3]) AND (("time" < (('2018-06-01 00:00:00-07'::timestamptz) - '3 days'::interval))) GROUP BY 1, 2
(17 rows)
SELECT test_override_pushdown_timestamptz('2019-10-15 00:00'::timestamptz);
test_override_pushdown_timestamptz
------------------------------------
(1 row)
EXPLAIN (VERBOSE, COSTS OFF)
EXECUTE timestamp_pushdown_test;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit
Output: "time", device, (avg(temp))
-> Custom Scan (AsyncAppend)
Output: "time", device, (avg(temp))
-> Append
-> Custom Scan (DataNodeScan)
Output: hyper."time", hyper.device, (avg(hyper.temp))
Relations: Aggregate on (public.hyper)
Data node: data_node_1
Chunks: _hyper_1_1_dist_chunk, _hyper_1_3_dist_chunk, _hyper_1_5_dist_chunk, _hyper_1_7_dist_chunk
Remote SQL: SELECT "time", device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[1, 2, 3, 4]) AND (("time" < (('2019-10-15 00:00:00-07'::timestamptz) - '3 days'::interval))) GROUP BY 1, 2
-> Custom Scan (DataNodeScan)
Output: hyper_1."time", hyper_1.device, (avg(hyper_1.temp))
Relations: Aggregate on (public.hyper)
Data node: data_node_2
Chunks: _hyper_1_2_dist_chunk, _hyper_1_4_dist_chunk, _hyper_1_6_dist_chunk
Remote SQL: SELECT "time", device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(hyper, ARRAY[1, 2, 3]) AND (("time" < (('2019-10-15 00:00:00-07'::timestamptz) - '3 days'::interval))) GROUP BY 1, 2
(17 rows)
-- Test one-dimensional push down
CREATE TABLE hyper1d (time timestamptz, device int, temp float);
SELECT * FROM create_distributed_hypertable('hyper1d', 'time', chunk_time_interval => '3 months'::interval);

View File

@ -121,13 +121,27 @@ INSERT INTO disttable VALUES
('2018-07-01 09:11', 90, 2.7),
('2018-07-01 08:01', 29, 1.5);
\set ON_ERROR_STOP 0
-- VACUUM currently not supported. A VACUUM cannot run in a
-- transaction block so we need to distribute the command to data
-- nodes using "raw" connections.
VACUUM ANALYZE disttable;
-- Test distributed VACUUM ANALYZE support. First show no statistics
SELECT * FROM test.remote_exec('{ data_node_1, data_node_2, data_node_3 }', $$
SELECT relname, relkind, reltuples, relpages
FROM pg_class cl, (SELECT show_chunks AS chunk FROM show_chunks('disttable')) ch
WHERE cl.oid = ch.chunk::regclass;
$$);
VACUUM (FULL, ANALYZE) disttable;
SELECT * FROM test.remote_exec('{ data_node_1, data_node_2, data_node_3 }', $$
SELECT relname, relkind, reltuples, relpages
FROM pg_class cl, (SELECT show_chunks AS chunk FROM show_chunks('disttable')) ch
WHERE cl.oid = ch.chunk::regclass;
$$);
VACUUM FULL disttable;
VACUUM disttable;
\set ON_ERROR_STOP 0
-- VACUUM VERBOSE is not supported at the moment
VACUUM VERBOSE disttable;
\set ON_ERROR_STOP 1
-- Test ANALYZE. First show no statistics
@ -234,10 +248,12 @@ FROM disttable;
-- Test AsyncAppend when using window functions
EXPLAIN (VERBOSE, COSTS FALSE)
SELECT device, temp, avg(temp) OVER (PARTITION BY device)
FROM disttable;
FROM disttable
ORDER BY device, temp;
SELECT device, temp, avg(temp) OVER (PARTITION BY device)
FROM disttable;
FROM disttable
ORDER BY device, temp;
-- Test remote explain

View File

@ -55,7 +55,8 @@ tsl_invoke_distributed_commands(PG_FUNCTION_ARGS)
results = ts_dist_cmd_invoke_on_data_nodes("CREATE TABLE public.disttable2(time timestamptz, "
"device int, temp float);",
subset_nodes);
subset_nodes,
true);
TestAssertTrue(ts_dist_cmd_get_data_node_result(results, llast(data_nodes)) == NULL);
foreach (lc, subset_nodes)

View File

@ -179,7 +179,7 @@ ts_remote_exec(PG_FUNCTION_ARGS)
foreach (lc, data_node_list)
{
const char *node_name = lfirst(lc);
TSConnection *conn = data_node_get_connection(node_name, REMOTE_TXN_USE_PREP_STMT);
TSConnection *conn = data_node_get_connection(node_name, REMOTE_TXN_USE_PREP_STMT, true);
/* Configure connection to be compatible with current options of the test env */
set_connection_settings(conn);