From 048d67af935687f7abff66dfc8089345eaf8d2fe Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Erik=20Nordstr=C3=B6m?=
Date: Fri, 6 Nov 2020 13:10:48 +0100
Subject: [PATCH] Fix subquery errors when using AsyncAppend

This change fixes errors when using sub-queries together with
AsyncAppend on distributed hypertables. Since all sub-queries are sent
to remote data nodes as separate queries on the same connection, it is
important that one query does not block the connection while another
query executes. In other words, a query must complete a request and
retrieve the result before another (sub-)query can execute.

During query execution, this means that the following invariant must
hold: an executor node cannot leave a request (or result) pending on a
connection after returning a tuple, since, for the next tuple, a
different executor node can be called to execute another sub-query on
the same connection. This would be the case if two sub-queries are,
e.g., joined with a nested loop.

AsyncAppend (and the associated fetchers) violated the above
invariant; the executor node could leave a request unfinished after
returning a tuple, leaving the connection in an unexpected state when
another AsyncAppend was called as part of a join with another
sub-query.

It turns out that only the cursor fetcher can be used in these cases
without having to fetch and buffer the entire result set of a
sub-query (with a `CURSOR`, the request can be broken up into multiple
separate `FETCH` requests that can be interleaved with other
sub-queries). Unfortunately, a query executed via a `CURSOR` does not
support parallel execution on the data node. Previously, this was
solved by defaulting to another, "row-by-row", method of fetching
data. However, row-by-row suffers from the same issue of leaving the
connection in an unfinished state unless the whole result set is read
and buffered, which could potentially blow up memory.

The following changes are made to address the situation:

* The cursor fetcher is made the default fetcher so that all queries
  can be executed without errors.
* Fixes are applied to the cursor fetcher to make sure it does not
  have pending queries after AsyncAppend returns a tuple.
* AsyncAppend is similarly tweaked to avoid leaving connections with
  pending queries.
* The ability to set the `fetch_size` (the number of tuples to fetch
  in each `FETCH` request) and other options at the foreign data
  wrapper level is added. This allows changing FDW options globally
  for all data nodes, as the previous method of setting them on each
  foreign server is currently blocked. Setting a smaller `fetch_size`
  is often necessary to trigger the erroneous behavior.

Unfortunately, these changes might lead to a regression in query
performance. With the cursor fetcher, queries cannot execute in
parallel mode on data nodes. Further, the previous code was more
aggressive about sending new fetch requests after each batch (for
performance reasons), but this is not possible when sub-query joins
are present. A future optimization could, for instance, use the
row-by-row fetcher by default and fall back to the cursor fetcher if
it detects joins between sub-queries.
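To illustrate the invariant at the connection level, here is a minimal,
standalone libpq sketch (not part of this patch; the connection string
and table names are placeholders). Two cursors share one connection and
their `FETCH` batches are interleaved; each request is fully drained
with `PQgetResult()` before the next request is sent, which is what the
fixed fetchers now guarantee:

    #include <stdio.h>
    #include <libpq-fe.h>

    /* Fetch one batch from the named cursor and drain the connection,
     * so that it is idle again before anyone else uses it. */
    static PGresult *
    fetch_batch(PGconn *conn, const char *cursor_name)
    {
        char sql[64];
        PGresult *res, *extra;

        snprintf(sql, sizeof(sql), "FETCH 100 FROM %s", cursor_name);

        if (!PQsendQuery(conn, sql))
            return NULL;

        res = PQgetResult(conn);

        /* Consume results until NULL; only then is the connection idle */
        while ((extra = PQgetResult(conn)) != NULL)
            PQclear(extra);

        return res;
    }

    int
    main(void)
    {
        PGconn *conn = PQconnectdb("dbname=postgres");

        if (PQstatus(conn) != CONNECTION_OK)
            return 1;

        PQclear(PQexec(conn, "BEGIN"));
        PQclear(PQexec(conn, "DECLARE c1 CURSOR FOR SELECT * FROM hyper"));
        PQclear(PQexec(conn, "DECLARE c2 CURSOR FOR SELECT * FROM hyper1d"));

        /* Interleave batches from two sub-queries on ONE connection */
        PQclear(fetch_batch(conn, "c1"));
        PQclear(fetch_batch(conn, "c2"));
        PQclear(fetch_batch(conn, "c1"));

        PQclear(PQexec(conn, "COMMIT"));
        PQfinish(conn);
        return 0;
    }

Row-by-row mode, in contrast, has no way to break up the result; the
entire result set of the first query must be consumed (and buffered)
before the second query can issue a request on the same connection.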
Fixes #2511
---
 src/guc.c                               |   2 +-
 tsl/src/async_append.c                  |  90 ++-
 tsl/src/async_append.h                  |   6 +-
 tsl/src/fdw/data_node_scan_exec.c       |  17 +-
 tsl/src/fdw/option.c                    |   8 +-
 tsl/src/fdw/relinfo.c                   |  38 +-
 tsl/src/fdw/scan_exec.c                 |  11 +-
 tsl/src/fdw/scan_exec.h                 |   2 +-
 tsl/src/fdw/scan_plan.c                 |   4 +-
 tsl/src/remote/async.c                  |  68 ++-
 tsl/src/remote/cursor_fetcher.c         |  47 +-
 tsl/src/remote/cursor_fetcher.h         |   2 +-
 tsl/src/remote/data_fetcher.c           |  17 +-
 tsl/src/remote/data_fetcher.h           |  20 +-
 tsl/src/remote/row_by_row_fetcher.c     |  35 +-
 tsl/src/remote/row_by_row_fetcher.h     |   2 +-
 tsl/test/expected/dist_query-11.out     | 746 ++++++++++++++++++++++++
 tsl/test/expected/dist_query-12.out     | 731 +++++++++++++++++++++++
 tsl/test/sql/data_fetcher.sql           |  10 +-
 tsl/test/sql/dist_query.sql.in          |   5 +
 tsl/test/sql/include/dist_query_run.sql |  36 ++
 21 files changed, 1762 insertions(+), 135 deletions(-)

diff --git a/src/guc.c b/src/guc.c
index 143708f69..fe48e7e16 100644
--- a/src/guc.c
+++ b/src/guc.c
@@ -284,7 +284,7 @@ _guc_init(void)
 							 "Pick data fetcher type based on type of queries you plan to run "
 							 "(rowbyrow or cursor)",
 							 (int *) &ts_guc_remote_data_fetcher,
-							 RowByRowFetcherType,
+							 CursorFetcherType,
 							 remote_data_fetchers,
 							 PGC_USERSET,
 							 0,
diff --git a/tsl/src/async_append.c b/tsl/src/async_append.c
index ef99dcb5e..84edc11dc 100644
--- a/tsl/src/async_append.c
+++ b/tsl/src/async_append.c
@@ -25,12 +25,59 @@
 #include "hypertable_cache.h"
 
 /*
- * AsyncAppend provides an asynchronous API during query execution to create cursors and
- * fetch data from data nodes more efficiently. This should make better use of our data nodes
- * and do more things in parallel. AsyncAppend is only used for plans
- * that involve distributed hypertable (a plan that involves scanning of data nodes).
- * The node is injected as a parent of Append or MergeAppend nodes.
- * Here is how the modified plan looks like.
+ * AsyncAppend provides an asynchronous API during query execution that
+ * decouples the sending of query requests from the reading of the result.
+ *
+ * Normally, an Append executes serially, i.e., it first executes the first
+ * child node, then the second, and so forth. In the case of a distributed
+ * query, that means the query on the second data node will not start
+ * executing until the first node has finished. Thus, if there are three data
+ * nodes, the remote communication will proceed as follows:
+ *
+ * 1. Send query to data node 1.
+ * 2. Get data from data node 1.
+ * 3. Send query to data node 2.
+ * 4. Get data from data node 2.
+ * 5. Send query to data node 3.
+ * 6. Get data from data node 3.
+ *
+ * Since a data node will always need some time to process a query before it
+ * is ready to send back results, this won't be very efficient.
+ *
+ * In contrast, AsyncAppend makes sure that all data node requests are sent
+ * before any data is read:
+ *
+ * 1. Send query to data node 1.
+ * 2. Send query to data node 2.
+ * 3. Send query to data node 3.
+ * 4. Get data from data node 1.
+ * 5. Get data from data node 2.
+ * 6. Get data from data node 3.
+ *
+ * With the asynchronous approach, data nodes 2 and 3 will start processing
+ * their queries while the data from data node 1 is still being read.
+ *
+ * There's a caveat with this asynchronous approach, however. Since there's
+ * only one connection to each data node (to make sure that each data node is
+ * tied to a single transaction and snapshot), it is not possible to start
+ * executing a new query on the same data node until the first query is
+ * complete (to ensure that the connection is in an idle state). This is
+ * important if a query consists of several sub-queries that are sent as
+ * separate queries to the same node. In that case, the entire result of the
+ * first sub-query must be fetched before proceeding with the next sub-query,
+ * which may blow up memory.
+ *
+ * The sub-query issue can be solved by using a CURSOR to break up a query
+ * into batches (multiple FETCH statements that fetch a fixed number of rows
+ * each time). FETCH statements for multiple CURSORs (for different
+ * sub-queries) can be interleaved, as long as they always read the full
+ * batch before returning. The downside of a CURSOR, however, is that it
+ * doesn't support parallel execution of the query on the data nodes.
+ *
+ * AsyncAppend is only used for plans that involve distributed hypertables
+ * (i.e., plans that involve scanning data nodes). The node is injected as a
+ * parent of Append or MergeAppend nodes. Here is what the modified plan
+ * looks like.
 *
 * .......
 *    |
@@ -50,9 +97,14 @@
 *   .....
 *
 *
- * Since the PostgreSQL planner treats partitioned relations in a special way (throwing away
- * existing and generating new paths), we needed to adjust plan paths at a later stage,
- * thus using upper path hooks to do that.
+ * Since the PostgreSQL planner treats partitioned relations in a special way
+ * (throwing away existing and generating new paths), we needed to adjust plan
+ * paths at a later stage, thus using upper path hooks to do that.
+ *
+ * There are ways asynchronous appends can be further improved. For instance,
+ * after sending the initial queries to all nodes, the append node should pick
+ * the child to read based on which data node returns data first instead of
+ * just picking the first child.
 *
 */
@@ -184,9 +236,15 @@ init(AsyncScanState *ass)
 }
 
 static void
-fetch_tuples(AsyncScanState *ass)
+send_fetch_request(AsyncScanState *ass)
 {
-	ass->fetch_tuples(ass);
+	ass->send_fetch_request(ass);
+}
+
+static void
+fetch_data(AsyncScanState *ass)
+{
+	ass->fetch_data(ass);
 }
 
 static TupleTableSlot *
@@ -201,16 +259,16 @@ async_append_exec(CustomScanState *node)
 
 	if (state->first_run)
 	{
-		/* Since we can't start sending requests in the BeginCustomScan stage (it's not guaranteed
-		 * that a node will execute so we might end up sending requests that never get completed) we
-		 * do it'here */
 		state->first_run = false;
 		iterate_data_nodes_and_exec(state, init);
-		iterate_data_nodes_and_exec(state, fetch_tuples);
+		iterate_data_nodes_and_exec(state, send_fetch_request);
+		/* Fetch a new data batch into all sub-nodes. This will clear the
+		 * connection for new requests (important when there are, e.g.,
+		 * subqueries that share the connection).
*/ + iterate_data_nodes_and_exec(state, fetch_data); } ResetExprContext(econtext); - slot = ExecProcNode(state->subplan_state); econtext->ecxt_scantuple = slot; diff --git a/tsl/src/async_append.h b/tsl/src/async_append.h index 2e65d6c39..c1705a723 100644 --- a/tsl/src/async_append.h +++ b/tsl/src/async_append.h @@ -30,8 +30,12 @@ typedef struct AsyncAppendPath typedef struct AsyncScanState { CustomScanState css; + /* Initialize the scan state */ void (*init)(struct AsyncScanState *state); - void (*fetch_tuples)(struct AsyncScanState *state); + /* Send a request for new data */ + void (*send_fetch_request)(struct AsyncScanState *state); + /* Fetch the actual data */ + void (*fetch_data)(struct AsyncScanState *state); } AsyncScanState; extern void async_append_add_paths(PlannerInfo *root, RelOptInfo *hyper_rel); diff --git a/tsl/src/fdw/data_node_scan_exec.c b/tsl/src/fdw/data_node_scan_exec.c index b9dfb23e5..7b3af97f9 100644 --- a/tsl/src/fdw/data_node_scan_exec.c +++ b/tsl/src/fdw/data_node_scan_exec.c @@ -155,7 +155,16 @@ static void create_fetcher(AsyncScanState *ass) { DataNodeScanState *dnss = (DataNodeScanState *) ass; - create_data_fetcher(&dnss->async_state.css.ss, &dnss->fsstate, FETCH_ASYNC); + create_data_fetcher(&dnss->async_state.css.ss, &dnss->fsstate); +} + +static void +send_fetch_request(AsyncScanState *ass) +{ + DataNodeScanState *dnss = (DataNodeScanState *) ass; + DataFetcher *fetcher = dnss->fsstate.fetcher; + + fetcher->funcs->send_fetch_request(fetcher); } static void @@ -163,7 +172,8 @@ fetch_data(AsyncScanState *ass) { DataNodeScanState *dnss = (DataNodeScanState *) ass; DataFetcher *fetcher = dnss->fsstate.fetcher; - fetcher->funcs->fetch_data_start(fetcher); + + fetcher->funcs->fetch_data(fetcher); } Node * @@ -175,6 +185,7 @@ data_node_scan_state_create(CustomScan *cscan) dnss->async_state.css.methods = &data_node_scan_state_methods; dnss->systemcol = linitial_int(list_nth(cscan->custom_private, 1)); dnss->async_state.init = create_fetcher; - dnss->async_state.fetch_tuples = fetch_data; + dnss->async_state.send_fetch_request = send_fetch_request; + dnss->async_state.fetch_data = fetch_data; return (Node *) dnss; } diff --git a/tsl/src/fdw/option.c b/tsl/src/fdw/option.c index d2cc149f3..303d8a3c1 100644 --- a/tsl/src/fdw/option.c +++ b/tsl/src/fdw/option.c @@ -26,6 +26,7 @@ #include #include #include +#include #include #include #include @@ -143,13 +144,16 @@ init_ts_fdw_options(void) /* non-libpq FDW-specific FDW options */ static const TsFdwOption non_libpq_options[] = { /* cost factors */ + { "fdw_startup_cost", ForeignDataWrapperRelationId }, { "fdw_startup_cost", ForeignServerRelationId }, + { "fdw_tuple_cost", ForeignDataWrapperRelationId }, { "fdw_tuple_cost", ForeignServerRelationId }, /* shippable extensions */ + { "extensions", ForeignDataWrapperRelationId }, { "extensions", ForeignServerRelationId }, - /* fetch_size is available on both server and table */ + /* fetch_size is available on both foreign data wrapper and server */ + { "fetch_size", ForeignDataWrapperRelationId }, { "fetch_size", ForeignServerRelationId }, - { "fetch_size", ForeignTableRelationId }, { NULL, InvalidOid } }; diff --git a/tsl/src/fdw/relinfo.c b/tsl/src/fdw/relinfo.c index 554a0df6d..f06f4c0f0 100644 --- a/tsl/src/fdw/relinfo.c +++ b/tsl/src/fdw/relinfo.c @@ -44,29 +44,37 @@ #define DEFAULT_CHUNK_LOOKBACK_WINDOW 10 /* - * Parse options from foreign server and apply them to fpinfo. + * Parse options from the foreign data wrapper and foreign server and apply + * them to fpinfo. 
The server options take precedence over the data wrapper + * ones. * * New options might also require tweaking merge_fdw_options(). */ static void -apply_server_options(TsFdwRelInfo *fpinfo) +apply_fdw_and_server_options(TsFdwRelInfo *fpinfo) { ListCell *lc; + ForeignDataWrapper *fdw = GetForeignDataWrapper(fpinfo->server->fdwid); + List *options[] = { fdw->options, fpinfo->server->options }; + int i; - foreach (lc, fpinfo->server->options) + for (i = 0; i < lengthof(options); i++) { - DefElem *def = (DefElem *) lfirst(lc); + foreach (lc, options[i]) + { + DefElem *def = (DefElem *) lfirst(lc); - if (strcmp(def->defname, "fdw_startup_cost") == 0) - fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL); - else if (strcmp(def->defname, "fdw_tuple_cost") == 0) - fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL); - else if (strcmp(def->defname, "extensions") == 0) - fpinfo->shippable_extensions = - list_concat(fpinfo->shippable_extensions, - option_extract_extension_list(defGetString(def), false)); - else if (strcmp(def->defname, "fetch_size") == 0) - fpinfo->fetch_size = strtol(defGetString(def), NULL, 10); + if (strcmp(def->defname, "fdw_startup_cost") == 0) + fpinfo->fdw_startup_cost = strtod(defGetString(def), NULL); + else if (strcmp(def->defname, "fdw_tuple_cost") == 0) + fpinfo->fdw_tuple_cost = strtod(defGetString(def), NULL); + else if (strcmp(def->defname, "extensions") == 0) + fpinfo->shippable_extensions = + list_concat(fpinfo->shippable_extensions, + option_extract_extension_list(defGetString(def), false)); + else if (strcmp(def->defname, "fetch_size") == 0) + fpinfo->fetch_size = strtol(defGetString(def), NULL, 10); + } } } @@ -414,7 +422,7 @@ fdw_relinfo_create(PlannerInfo *root, RelOptInfo *rel, Oid server_oid, Oid local fpinfo->shippable_extensions = list_make1_oid(get_extension_oid(EXTENSION_NAME, true)); fpinfo->fetch_size = DEFAULT_FDW_FETCH_SIZE; - apply_server_options(fpinfo); + apply_fdw_and_server_options(fpinfo); /* * Identify which baserestrictinfo clauses can be sent to the data diff --git a/tsl/src/fdw/scan_exec.c b/tsl/src/fdw/scan_exec.c index 5c99331fc..293657ab5 100644 --- a/tsl/src/fdw/scan_exec.c +++ b/tsl/src/fdw/scan_exec.c @@ -90,12 +90,10 @@ fill_query_params_array(ExprContext *econtext, FmgrInfo *param_flinfo, List *par } /* - * Create cursor for node's query with current parameter values. - * Operation can be blocking or non-blocking, depending on the bool arg. - * In non blocking case we just dispatch async request to create cursor + * Create data fetcher for node's query with current parameter values. 
*/ DataFetcher * -create_data_fetcher(ScanState *ss, TsFdwScanState *fsstate, FetchMode mode) +create_data_fetcher(ScanState *ss, TsFdwScanState *fsstate) { ExprContext *econtext = ss->ps.ps_ExprContext; int num_params = fsstate->num_params; @@ -135,8 +133,7 @@ create_data_fetcher(ScanState *ss, TsFdwScanState *fsstate, FetchMode mode) ss, fsstate->retrieved_attrs, fsstate->query, - params, - mode); + params); fsstate->fetcher = fetcher; MemoryContextSwitchTo(oldcontext); @@ -318,7 +315,7 @@ fdw_scan_iterate(ScanState *ss, TsFdwScanState *fsstate) DataFetcher *fetcher = fsstate->fetcher; if (NULL == fetcher) - fetcher = create_data_fetcher(ss, fsstate, FETCH_NOASYNC); + fetcher = create_data_fetcher(ss, fsstate); tuple = fetcher->funcs->get_next_tuple(fetcher); diff --git a/tsl/src/fdw/scan_exec.h b/tsl/src/fdw/scan_exec.h index 552b7b2cb..f7e70ff4c 100644 --- a/tsl/src/fdw/scan_exec.h +++ b/tsl/src/fdw/scan_exec.h @@ -48,7 +48,7 @@ extern void fdw_scan_end(TsFdwScanState *fsstate); extern void fdw_scan_explain(ScanState *ss, List *fdw_private, ExplainState *es, TsFdwScanState *fsstate); -extern DataFetcher *create_data_fetcher(ScanState *ss, TsFdwScanState *fsstate, FetchMode mode); +extern DataFetcher *create_data_fetcher(ScanState *ss, TsFdwScanState *fsstate); #ifdef TS_DEBUG diff --git a/tsl/src/fdw/scan_plan.c b/tsl/src/fdw/scan_plan.c index cd474e49e..c86981d09 100644 --- a/tsl/src/fdw/scan_plan.c +++ b/tsl/src/fdw/scan_plan.c @@ -333,7 +333,7 @@ fdw_scan_info_init(ScanInfo *scaninfo, PlannerInfo *root, RelOptInfo *rel, Path * For a join relation, FDW-specific information about the inner and outer * relations is provided using fpinfo_i and fpinfo_o. For an upper relation, * fpinfo_o provides the information for the input relation; fpinfo_i is - * expected to NULL. + * expected to be NULL. */ static void merge_fdw_options(TsFdwRelInfo *fpinfo, const TsFdwRelInfo *fpinfo_o, const TsFdwRelInfo *fpinfo_i) @@ -349,7 +349,7 @@ merge_fdw_options(TsFdwRelInfo *fpinfo, const TsFdwRelInfo *fpinfo_o, const TsFd Assert(fpinfo_i == NULL); /* - * Copy the server specific FDW options. (For a join, both relations come + * Copy the server specific FDW options. (For a join, both relations come * from the same server, so the server options should have the same value * for both relations.) */ diff --git a/tsl/src/remote/async.c b/tsl/src/remote/async.c index 9eea558c6..2c7290f2e 100644 --- a/tsl/src/remote/async.c +++ b/tsl/src/remote/async.c @@ -21,6 +21,8 @@ #else #include #endif + +#include #include "async.h" #include "connection.h" #include "utils.h" @@ -175,11 +177,11 @@ async_request_send_internal(AsyncRequest *req, int elevel) * the prepared statements we use in this module are simple enough that * the data node will make the right choices. 
 */
-	if (!PQsendPrepare(remote_connection_get_pg_conn(req->conn),
-					   req->stmt_name,
-					   req->sql,
-					   req->prep_stmt_params,
-					   NULL))
+	if (0 == PQsendPrepare(remote_connection_get_pg_conn(req->conn),
+						   req->stmt_name,
+						   req->sql,
+						   req->prep_stmt_params,
+						   NULL))
 	{
 		/*
 		 * null is fine to pass down as the res, the connection error message
@@ -561,38 +563,44 @@ get_single_response_nonblocking(AsyncRequestSet *set)
 		AsyncRequest *req = lfirst(lc);
 		PGconn *pg_conn = remote_connection_get_pg_conn(req->conn);
 
-		if (remote_connection_is_processing(req->conn) && req->state == DEFERRED)
-			return async_response_error_create("request already in process");
-
-		if (req->state == DEFERRED)
+		switch (req->state)
 		{
-			req = async_request_send_internal(req, WARNING);
+			case DEFERRED:
+				if (remote_connection_is_processing(req->conn))
+					return async_response_error_create("request already in progress");
 
-			if (req == NULL)
-				return async_response_error_create("failed to send deferred request");
-		}
+				req = async_request_send_internal(req, WARNING);
 
-		if (req->state != EXECUTING)
-			return async_response_error_create("no request currently executing");
+				if (req == NULL)
+					return async_response_error_create("failed to send deferred request");
 
-		if (!PQisBusy(pg_conn))
-		{
-			PGresult *res = PQgetResult(pg_conn);
+				Assert(req->state == EXECUTING);
+				TS_FALLTHROUGH;
+			case EXECUTING:
+				if (0 == PQisBusy(pg_conn))
+				{
+					PGresult *res = PQgetResult(pg_conn);
 
-			if (NULL == res)
-			{
-				/*
-				 * NULL return means query is complete
-				 */
-				set->requests = list_delete_ptr(set->requests, req);
-				remote_connection_set_processing(req->conn, false);
-				async_request_set_state(req, COMPLETED);
-				/* set changed so rerun function */
-				return get_single_response_nonblocking(set);
-			}
-			return &async_response_result_create(req, res)->base;
+					if (NULL == res)
+					{
+						/*
+						 * NULL return means query is complete
+						 */
+						set->requests = list_delete_ptr(set->requests, req);
+						remote_connection_set_processing(req->conn, false);
+						async_request_set_state(req, COMPLETED);
+
+						/* set changed so rerun function */
+						return get_single_response_nonblocking(set);
+					}
+					return &async_response_result_create(req, res)->base;
+				}
+				break;
+			case COMPLETED:
+				return async_response_error_create("request already completed");
 		}
 	}
+
 	return NULL;
 }
diff --git a/tsl/src/remote/cursor_fetcher.c b/tsl/src/remote/cursor_fetcher.c
index 25902eb46..5ec162e61 100644
--- a/tsl/src/remote/cursor_fetcher.c
+++ b/tsl/src/remote/cursor_fetcher.c
@@ -22,6 +22,20 @@
 
 /*
  * Cursor for fetching data from a data node.
+ *
+ * The cursor fetcher splits the query result into multiple fetches, which
+ * allows multiplexing ongoing sub-queries on the same connection without
+ * having to fetch the full result for each sub-query in one go (thus not
+ * overrunning memory).
+ *
+ * When a query consists of multiple sub-queries that fetch data from the
+ * same data nodes, and the sub-queries are joined using, e.g., a nested
+ * loop, then a CURSOR is necessary to run the sub-queries over the same
+ * connection.
+ *
+ * The downside of using a CURSOR, however, is that the plan on the remote
+ * node cannot execute in parallel.
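+ *
+ * For illustration, interleaving two sub-queries on one connection looks
+ * roughly as follows (a sketch; cursor names and the batch size are
+ * arbitrary):
+ *
+ *   DECLARE c1 CURSOR FOR <sub-query 1>;
+ *   DECLARE c2 CURSOR FOR <sub-query 2>;
+ *   FETCH 100 FROM c1;   -- always read the full batch before returning
+ *   FETCH 100 FROM c2;
+ *   FETCH 100 FROM c1;
+ *   ...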
+ * + * https://www.postgresql.org/docs/current/when-can-parallel-query-be-used.html */ typedef struct CursorFetcher { @@ -31,8 +45,7 @@ typedef struct CursorFetcher AsyncRequest *create_req; /* a request to create cursor */ } CursorFetcher; -static void cursor_fetcher_set_fetch_size(DataFetcher *df, int fetch_size); -static void cursor_fetcher_fetch_data_start(DataFetcher *df); +static void cursor_fetcher_send_fetch_request(DataFetcher *df); static int cursor_fetcher_fetch_data(DataFetcher *df); static void cursor_fetcher_set_fetch_size(DataFetcher *df, int fetch_size); static void cursor_fetcher_set_tuple_memcontext(DataFetcher *df, MemoryContext mctx); @@ -42,7 +55,7 @@ static void cursor_fetcher_rewind(DataFetcher *df); static void cursor_fetcher_close(DataFetcher *df); static DataFetcherFuncs funcs = { - .fetch_data_start = cursor_fetcher_fetch_data_start, + .send_fetch_request = cursor_fetcher_send_fetch_request, .fetch_data = cursor_fetcher_fetch_data, .set_fetch_size = cursor_fetcher_set_fetch_size, .set_tuple_mctx = cursor_fetcher_set_tuple_memcontext, @@ -119,12 +132,11 @@ cursor_fetcher_wait_until_open(DataFetcher *df) static CursorFetcher * remote_cursor_init_with_params(TSConnection *conn, Relation rel, TupleDesc tupdesc, ScanState *ss, - List *retrieved_attrs, const char *stmt, StmtParams *params, - FetchMode mode) + List *retrieved_attrs, const char *stmt, StmtParams *params) { CursorFetcher *cursor = palloc0(sizeof(CursorFetcher)); - data_fetcher_init(&cursor->state, conn, stmt, params, rel, ss, retrieved_attrs, mode); + data_fetcher_init(&cursor->state, conn, stmt, params, rel, ss, retrieved_attrs); cursor->state.type = CursorFetcherType; /* Assign a unique ID for my cursor */ cursor->id = remote_connection_get_cursor_number(); @@ -151,14 +163,13 @@ cursor_fetcher_create_for_rel(TSConnection *conn, Relation rel, List *retrieved_ NULL, retrieved_attrs, stmt, - params, - false); + params); return &cursor->state; } DataFetcher * cursor_fetcher_create_for_scan(TSConnection *conn, ScanState *ss, List *retrieved_attrs, - const char *stmt, StmtParams *params, FetchMode mode) + const char *stmt, StmtParams *params) { Scan *scan = (Scan *) ss->ps.plan; TupleDesc tupdesc; @@ -182,8 +193,7 @@ cursor_fetcher_create_for_scan(TSConnection *conn, ScanState *ss, List *retrieve tupdesc = ss->ss_ScanTupleSlot->tts_tupleDescriptor; } - cursor = - remote_cursor_init_with_params(conn, rel, tupdesc, ss, retrieved_attrs, stmt, params, mode); + cursor = remote_cursor_init_with_params(conn, rel, tupdesc, ss, retrieved_attrs, stmt, params); return &cursor->state; } @@ -212,7 +222,7 @@ cursor_fetcher_set_tuple_memcontext(DataFetcher *df, MemoryContext mctx) * Send async req to fetch data from cursor. */ static void -cursor_fetcher_fetch_data_start(DataFetcher *df) +cursor_fetcher_send_fetch_request(DataFetcher *df) { AsyncRequest *volatile req = NULL; MemoryContext oldcontext; @@ -313,7 +323,11 @@ cursor_fetcher_fetch_data_complete(CursorFetcher *cursor) tuplefactory_reset_mctx(cursor->state.tf); MemoryContextSwitchTo(cursor->state.batch_mctx); - /* Update fetch_ct_2 */ + /* Update batch count to indicate we are no longer in the first + * batch. When we are on the second batch or greater, a rewind of the + * cursor needs to refetch the first batch. If we are still in the + * first batch, however, a rewind can be done by simply resetting the + * tuple index to 0 within the batch. 
*/ if (cursor->state.batch_count < 2) cursor->state.batch_count++; @@ -343,8 +357,6 @@ cursor_fetcher_fetch_data_complete(CursorFetcher *cursor) MemoryContextSwitchTo(oldcontext); - data_fetcher_request_data_async(&cursor->state); - return numrows; } @@ -353,11 +365,14 @@ cursor_fetcher_fetch_data(DataFetcher *df) { CursorFetcher *cursor = cast_fetcher(CursorFetcher, df); + if (cursor->state.eof) + return 0; + if (!cursor->state.open) cursor_fetcher_wait_until_open(df); if (cursor->state.data_req == NULL) - cursor_fetcher_fetch_data_start(df); + cursor_fetcher_send_fetch_request(df); return cursor_fetcher_fetch_data_complete(cursor); } diff --git a/tsl/src/remote/cursor_fetcher.h b/tsl/src/remote/cursor_fetcher.h index bca77d1c5..8cccad217 100644 --- a/tsl/src/remote/cursor_fetcher.h +++ b/tsl/src/remote/cursor_fetcher.h @@ -15,6 +15,6 @@ extern DataFetcher *cursor_fetcher_create_for_rel(TSConnection *conn, Relation r StmtParams *params); extern DataFetcher *cursor_fetcher_create_for_scan(TSConnection *conn, ScanState *ss, List *retrieved_attrs, const char *stmt, - StmtParams *params, FetchMode mode); + StmtParams *params); #endif /* TIMESCALEDB_TSL_CURSOR_FETCHER_H */ diff --git a/tsl/src/remote/data_fetcher.c b/tsl/src/remote/data_fetcher.c index 1a2f86e7c..f2a171038 100644 --- a/tsl/src/remote/data_fetcher.c +++ b/tsl/src/remote/data_fetcher.c @@ -23,19 +23,19 @@ data_fetcher_create_for_rel(TSConnection *conn, Relation rel, List *retrieved_at DataFetcher * data_fetcher_create_for_scan(TSConnection *conn, ScanState *ss, List *retrieved_attrs, - const char *stmt, StmtParams *params, FetchMode mode) + const char *stmt, StmtParams *params) { if (ts_guc_remote_data_fetcher == CursorFetcherType) - return cursor_fetcher_create_for_scan(conn, ss, retrieved_attrs, stmt, params, mode); + return cursor_fetcher_create_for_scan(conn, ss, retrieved_attrs, stmt, params); else - return row_by_row_fetcher_create_for_scan(conn, ss, retrieved_attrs, stmt, params, mode); + return row_by_row_fetcher_create_for_scan(conn, ss, retrieved_attrs, stmt, params); } #define DEFAULT_FETCH_SIZE 100 void data_fetcher_init(DataFetcher *df, TSConnection *conn, const char *stmt, StmtParams *params, - Relation rel, ScanState *ss, List *retrieved_attrs, FetchMode mode) + Relation rel, ScanState *ss, List *retrieved_attrs) { Assert(df != NULL); Assert(stmt != NULL); @@ -57,7 +57,6 @@ data_fetcher_init(DataFetcher *df, TSConnection *conn, const char *stmt, StmtPar df->req_mctx = AllocSetContextCreate(CurrentMemoryContext, "async req/resp", ALLOCSET_DEFAULT_SIZES); df->fetch_size = DEFAULT_FETCH_SIZE; - df->mode = mode; } void @@ -118,14 +117,6 @@ data_fetcher_set_tuple_mctx(DataFetcher *df, MemoryContext mctx) df->tuple_mctx = mctx; } -void -data_fetcher_request_data_async(DataFetcher *df) -{ - /* for async fetcher we try requesting next batch */ - if (df->mode == FETCH_ASYNC && !df->eof) - df->funcs->fetch_data_start(df); -} - void data_fetcher_reset(DataFetcher *df) { diff --git a/tsl/src/remote/data_fetcher.h b/tsl/src/remote/data_fetcher.h index 746fcbbfa..d3cb4c556 100644 --- a/tsl/src/remote/data_fetcher.h +++ b/tsl/src/remote/data_fetcher.h @@ -16,20 +16,18 @@ #include "guc.h" #include "tuplefactory.h" -typedef enum FetchMode -{ - FETCH_ASYNC, - FETCH_NOASYNC -} FetchMode; - typedef struct DataFetcher DataFetcher; typedef struct DataFetcherFuncs { - void (*fetch_data_start)(DataFetcher *data_fetcher); + /* Send a request for new data. 
This doesn't read the data itself */ + void (*send_fetch_request)(DataFetcher *data_fetcher); + /* Read data in response to a fetch request. If no request has been sent, + * send it first. */ + int (*fetch_data)(DataFetcher *data_fetcher); + /* Set the fetch (batch) size */ void (*set_fetch_size)(DataFetcher *data_fetcher, int fetch_size); void (*set_tuple_mctx)(DataFetcher *data_fetcher, MemoryContext mctx); - int (*fetch_data)(DataFetcher *data_fetcher); HeapTuple (*get_next_tuple)(DataFetcher *data_fetcher); HeapTuple (*get_tuple)(DataFetcher *data_fetcher, int row); void (*rewind)(DataFetcher *data_fetcher); @@ -60,7 +58,6 @@ typedef struct DataFetcher bool open; bool eof; - FetchMode mode; AsyncRequest *data_req; /* a request to fetch data */ } DataFetcher; @@ -69,19 +66,18 @@ extern DataFetcher *data_fetcher_create_for_rel(TSConnection *conn, Relation rel StmtParams *params); extern DataFetcher *data_fetcher_create_for_scan(TSConnection *conn, ScanState *ss, List *retrieved_attrs, const char *stmt, - StmtParams *params, FetchMode mode); + StmtParams *params); void data_fetcher_free(DataFetcher *df); extern void data_fetcher_init(DataFetcher *df, TSConnection *conn, const char *stmt, StmtParams *params, Relation rel, ScanState *ss, - List *retrieved_attrs, FetchMode mode); + List *retrieved_attrs); extern HeapTuple data_fetcher_get_tuple(DataFetcher *df, int row); extern HeapTuple data_fetcher_get_next_tuple(DataFetcher *df); extern void data_fetcher_set_fetch_size(DataFetcher *df, int fetch_size); extern void data_fetcher_set_tuple_mctx(DataFetcher *df, MemoryContext mctx); extern void data_fetcher_validate(DataFetcher *df); -extern void data_fetcher_request_data_async(DataFetcher *df); extern void data_fetcher_reset(DataFetcher *df); #ifdef USE_ASSERT_CHECKING diff --git a/tsl/src/remote/row_by_row_fetcher.c b/tsl/src/remote/row_by_row_fetcher.c index e4861ccd1..c83d482d1 100644 --- a/tsl/src/remote/row_by_row_fetcher.c +++ b/tsl/src/remote/row_by_row_fetcher.c @@ -15,7 +15,7 @@ typedef struct RowByRowFetcher DataFetcher state; } RowByRowFetcher; -static void row_by_row_fetcher_start(DataFetcher *df); +static void row_by_row_fetcher_send_fetch_request(DataFetcher *df); static void row_by_row_fetcher_reset(RowByRowFetcher *fetcher); static int row_by_row_fetcher_fetch_data(DataFetcher *df); static void row_by_row_fetcher_set_fetch_size(DataFetcher *df, int fetch_size); @@ -26,7 +26,7 @@ static void row_by_row_fetcher_rescan(DataFetcher *df); static void row_by_row_fetcher_close(DataFetcher *df); static DataFetcherFuncs funcs = { - .fetch_data_start = row_by_row_fetcher_start, + .send_fetch_request = row_by_row_fetcher_send_fetch_request, .fetch_data = row_by_row_fetcher_fetch_data, .set_fetch_size = row_by_row_fetcher_set_fetch_size, .set_tuple_mctx = row_by_row_fetcher_set_tuple_memcontext, @@ -38,17 +38,14 @@ static DataFetcherFuncs funcs = { static RowByRowFetcher * create_row_by_row_fetcher(TSConnection *conn, const char *stmt, StmtParams *params, Relation rel, - ScanState *ss, List *retrieved_attrs, FetchMode mode) + ScanState *ss, List *retrieved_attrs) { RowByRowFetcher *fetcher = palloc0(sizeof(RowByRowFetcher)); - data_fetcher_init(&fetcher->state, conn, stmt, params, rel, ss, retrieved_attrs, mode); + data_fetcher_init(&fetcher->state, conn, stmt, params, rel, ss, retrieved_attrs); fetcher->state.type = RowByRowFetcherType; fetcher->state.funcs = &funcs; - if (fetcher->state.mode == FETCH_ASYNC) - row_by_row_fetcher_start(&fetcher->state); - return fetcher; } @@ -74,7 +71,7 
@@ row_by_row_fetcher_reset(RowByRowFetcher *fetcher) } static void -row_by_row_fetcher_start(DataFetcher *df) +row_by_row_fetcher_send_fetch_request(DataFetcher *df) { AsyncRequest *volatile req = NULL; MemoryContext oldcontext; @@ -103,8 +100,16 @@ row_by_row_fetcher_start(DataFetcher *df) FORMAT_BINARY : FORMAT_TEXT); Assert(NULL != req); + if (!async_request_set_single_row_mode(req)) - elog(ERROR, "failed to set single row mode for %s", fetcher->state.stmt); + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not set single-row mode on connection to \"%s\"", + remote_connection_node_name(fetcher->state.conn)), + errdetail("The aborted statement is: %s.", fetcher->state.stmt), + errhint( + "Row-by-row fetching of data is not supported together with sub-queries." + " Use cursor fetcher instead."))); fetcher->state.data_req = req; fetcher->state.open = true; @@ -137,6 +142,7 @@ row_by_row_fetcher_complete(RowByRowFetcher *fetcher) data_fetcher_validate(&fetcher->state); async_request_set_add(fetch_req_wrapper, fetcher->state.data_req); + /* * We'll store the tuples in the batch_mctx. First, flush the previous * batch. @@ -237,8 +243,11 @@ row_by_row_fetcher_fetch_data(DataFetcher *df) { RowByRowFetcher *fetcher = cast_fetcher(RowByRowFetcher, df); + if (fetcher->state.eof) + return 0; + if (!fetcher->state.open) - row_by_row_fetcher_start(df); + row_by_row_fetcher_send_fetch_request(df); return row_by_row_fetcher_complete(fetcher); } @@ -266,19 +275,19 @@ row_by_row_fetcher_create_for_rel(TSConnection *conn, Relation rel, List *retrie RowByRowFetcher *fetcher; Assert(rel != NULL); - fetcher = create_row_by_row_fetcher(conn, stmt, params, rel, NULL, retrieved_attrs, false); + fetcher = create_row_by_row_fetcher(conn, stmt, params, rel, NULL, retrieved_attrs); return &fetcher->state; } DataFetcher * row_by_row_fetcher_create_for_scan(TSConnection *conn, ScanState *ss, List *retrieved_attrs, - const char *stmt, StmtParams *params, FetchMode mode) + const char *stmt, StmtParams *params) { RowByRowFetcher *fetcher; Assert(ss != NULL); - fetcher = create_row_by_row_fetcher(conn, stmt, params, NULL, ss, retrieved_attrs, mode); + fetcher = create_row_by_row_fetcher(conn, stmt, params, NULL, ss, retrieved_attrs); return &fetcher->state; } diff --git a/tsl/src/remote/row_by_row_fetcher.h b/tsl/src/remote/row_by_row_fetcher.h index ef1625ca2..d362f2bda 100644 --- a/tsl/src/remote/row_by_row_fetcher.h +++ b/tsl/src/remote/row_by_row_fetcher.h @@ -15,6 +15,6 @@ extern DataFetcher *row_by_row_fetcher_create_for_rel(TSConnection *conn, Relati StmtParams *params); extern DataFetcher *row_by_row_fetcher_create_for_scan(TSConnection *conn, ScanState *ss, List *retrieved_attrs, const char *stmt, - StmtParams *params, FetchMode mode); + StmtParams *params); #endif /* TIMESCALEDB_TSL_ROW_BY_ROW_FETCHER_H */ diff --git a/tsl/test/expected/dist_query-11.out b/tsl/test/expected/dist_query-11.out index 29b69633f..b6db99c73 100644 --- a/tsl/test/expected/dist_query-11.out +++ b/tsl/test/expected/dist_query-11.out @@ -18,6 +18,10 @@ SELECT format('\! diff %s %s', :'TEST_RESULTS_UNOPTIMIZED', :'TEST_RESULTS_REFER format('\! diff %s %s', :'TEST_RESULTS_REPART_OPTIMIZED', :'TEST_RESULTS_REPART_REFERENCE') AS "DIFF_CMD_REPART", format('\! diff %s %s', :'TEST_RESULTS_1DIM', :'TEST_RESULTS_REPART_REFERENCE') AS "DIFF_CMD_1DIM" \gset +-- Use a small fetch size to make sure that result are fetched across +-- multiple fetches. 
+--ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (ADD fetch_size '500'); +SET timescaledb.remote_data_fetcher = 'rowbyrow'; SET client_min_messages TO notice; -- Load the data \ir :TEST_LOAD_NAME @@ -1025,6 +1029,135 @@ LIMIT 10 Output: join_test.device (27 rows) + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +WITH top_n AS ( + SELECT device, avg(temp) + FROM hyper + WHERE time BETWEEN '2019-01-01' AND '2019-01-01 15:00' + GROUP BY 1 + ORDER BY 2 DESC + LIMIT 10 +) +SELECT time_bucket('60s', time) AS "time", device, avg(temp) +FROM hyper INNER JOIN top_n USING (device) +WHERE time BETWEEN '2019-01-01' AND '2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, avg(hyper.temp) + Group Key: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device + CTE top_n + -> Limit + Output: device, (avg(temp)) + -> Sort + Output: device, (avg(temp)) + Sort Key: (avg(temp)) DESC + -> Custom Scan (AsyncAppend) + Output: device, (avg(temp)) + -> Append + -> Custom Scan (DataNodeScan) + Output: hyper_4.device, (avg(hyper_4.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) GROUP BY 1 ORDER BY device ASC NULLS LAST + -> Custom Scan (DataNodeScan) + Output: hyper_5.device, (avg(hyper_5.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) GROUP BY 1 ORDER BY device ASC NULLS LAST + -> GroupAggregate + Output: hyper_6.device, avg(hyper_6.temp) + Group Key: hyper_6.device + -> Custom Scan (DataNodeScan) on public.hyper hyper_6 + Output: hyper_6.device, hyper_6.temp + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY device ASC NULLS LAST + -> Sort + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, hyper.temp + Sort Key: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device + -> Hash Join + Output: time_bucket('@ 1 min'::interval, hyper."time"), hyper.device, hyper.temp + Hash Cond: (hyper.device = top_n.device) + -> Custom Scan (AsyncAppend) + Output: hyper."time", hyper.device, hyper.temp + -> Append + -> Custom Scan (DataNodeScan) on public.hyper hyper_1 + Output: hyper_1."time", hyper_1.device, hyper_1.temp + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, 
temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper hyper_2 + Output: hyper_2."time", hyper_2.device, hyper_2.temp + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper hyper_3 + Output: hyper_3."time", hyper_3.device, hyper_3.temp + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Hash + Output: top_n.device + -> CTE Scan on top_n + Output: top_n.device +(60 rows) + + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +SELECT time_bucket('60s', h1.time) AS "time", h1.device, avg(h1.temp), max(h2.temp) +FROM hyper h1 INNER JOIN hyper1d h2 ON (time_bucket('60', h1.time) = time_bucket('60', h2.time) AND h1.device = h2.device) +WHERE h1.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' AND + h2.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, h1."time")), h1.device, avg(h1.temp), max(h2.temp) + Group Key: time_bucket('@ 1 min'::interval, h1."time"), h1.device + -> Merge Join + Output: time_bucket('@ 1 min'::interval, h1."time"), h1.device, h1.temp, h2.temp + Merge Cond: ((time_bucket('@ 1 min'::interval, h1."time") = (time_bucket('@ 1 min'::interval, h2."time"))) AND (h1.device = h2.device)) + -> Custom Scan (AsyncAppend) + Output: h1."time", h1.device, h1.temp + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, h1_1."time")), h1_1.device + -> Custom Scan (DataNodeScan) on public.hyper h1_1 + Output: h1_1."time", h1_1.device, h1_1.temp, time_bucket('@ 1 min'::interval, h1_1."time") + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_2 + Output: h1_2."time", h1_2.device, h1_2.temp, time_bucket('@ 1 
min'::interval, h1_2."time") + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_3 + Output: h1_3."time", h1_3.device, h1_3.temp, time_bucket('@ 1 min'::interval, h1_3."time") + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Materialize + Output: h2.temp, h2."time", h2.device, (time_bucket('@ 1 min'::interval, h2."time")) + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, h2."time")), h2.device + -> Custom Scan (DataNodeScan) on public.hyper1d h2 + Output: h2.temp, h2."time", h2.device, time_bucket('@ 1 min'::interval, h2."time") + Data node: data_node_1 + Chunks: _dist_hyper_2_19_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST +(34 rows) + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% RUNNING TESTS on table: hyper %%% PREFIX: EXPLAIN (verbose, costs off) @@ -1892,6 +2025,135 @@ LIMIT 10 Output: join_test.device (27 rows) + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +WITH top_n AS ( + SELECT device, avg(temp) + FROM hyper + WHERE time BETWEEN '2019-01-01' AND '2019-01-01 15:00' + GROUP BY 1 + ORDER BY 2 DESC + LIMIT 10 +) +SELECT time_bucket('60s', time) AS "time", device, avg(temp) +FROM hyper INNER JOIN top_n USING (device) +WHERE time BETWEEN '2019-01-01' AND '2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, avg(hyper.temp) + Group Key: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device + CTE top_n + -> Limit + Output: device, (avg(temp)) + -> Sort + Output: device, (avg(temp)) + Sort Key: (avg(temp)) DESC + -> Custom Scan (AsyncAppend) + Output: device, (avg(temp)) + -> Append + -> Custom Scan (DataNodeScan) + Output: hyper_4.device, (avg(hyper_4.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND 
(("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) GROUP BY 1 ORDER BY device ASC NULLS LAST + -> Custom Scan (DataNodeScan) + Output: hyper_5.device, (avg(hyper_5.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) GROUP BY 1 ORDER BY device ASC NULLS LAST + -> GroupAggregate + Output: hyper_6.device, avg(hyper_6.temp) + Group Key: hyper_6.device + -> Custom Scan (DataNodeScan) on public.hyper hyper_6 + Output: hyper_6.device, hyper_6.temp + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY device ASC NULLS LAST + -> Sort + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, hyper.temp + Sort Key: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device + -> Hash Join + Output: time_bucket('@ 1 min'::interval, hyper."time"), hyper.device, hyper.temp + Hash Cond: (hyper.device = top_n.device) + -> Custom Scan (AsyncAppend) + Output: hyper."time", hyper.device, hyper.temp + -> Append + -> Custom Scan (DataNodeScan) on public.hyper hyper_1 + Output: hyper_1."time", hyper_1.device, hyper_1.temp + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper hyper_2 + Output: hyper_2."time", hyper_2.device, hyper_2.temp + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper hyper_3 + Output: hyper_3."time", hyper_3.device, hyper_3.temp + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Hash + Output: top_n.device + -> CTE Scan on top_n + Output: top_n.device +(60 rows) + + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +SELECT time_bucket('60s', h1.time) AS "time", h1.device, avg(h1.temp), max(h2.temp) +FROM hyper h1 INNER JOIN hyper1d h2 ON (time_bucket('60', h1.time) = time_bucket('60', h2.time) AND h1.device = h2.device) +WHERE h1.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' AND + h2.time BETWEEN '2019-01-01' AND 
'2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, h1."time")), h1.device, avg(h1.temp), max(h2.temp) + Group Key: time_bucket('@ 1 min'::interval, h1."time"), h1.device + -> Merge Join + Output: time_bucket('@ 1 min'::interval, h1."time"), h1.device, h1.temp, h2.temp + Merge Cond: ((time_bucket('@ 1 min'::interval, h1."time") = (time_bucket('@ 1 min'::interval, h2."time"))) AND (h1.device = h2.device)) + -> Custom Scan (AsyncAppend) + Output: h1."time", h1.device, h1.temp + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, h1_1."time")), h1_1.device + -> Custom Scan (DataNodeScan) on public.hyper h1_1 + Output: h1_1."time", h1_1.device, h1_1.temp, time_bucket('@ 1 min'::interval, h1_1."time") + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_2 + Output: h1_2."time", h1_2.device, h1_2.temp, time_bucket('@ 1 min'::interval, h1_2."time") + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_3 + Output: h1_3."time", h1_3.device, h1_3.temp, time_bucket('@ 1 min'::interval, h1_3."time") + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Materialize + Output: h2.temp, h2."time", h2.device, (time_bucket('@ 1 min'::interval, h2."time")) + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, h2."time")), h2.device + -> Custom Scan (DataNodeScan) on public.hyper1d h2 + Output: h2.temp, h2."time", h2.device, time_bucket('@ 1 min'::interval, h2."time") + Data node: data_node_1 + Chunks: _dist_hyper_2_19_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST +(34 rows) + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% 
%%% RUNNING TESTS on table: hyper %%% PREFIX: EXPLAIN (verbose, costs off) @@ -2539,6 +2801,101 @@ LIMIT 10 Output: join_test.device (27 rows) + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +WITH top_n AS ( + SELECT device, avg(temp) + FROM hyper + WHERE time BETWEEN '2019-01-01' AND '2019-01-01 15:00' AND device = 1 + GROUP BY 1 + ORDER BY 2 DESC + LIMIT 10 +) +SELECT time_bucket('60s', time) AS "time", device, avg(temp) +FROM hyper INNER JOIN top_n USING (device) +WHERE time BETWEEN '2019-01-01' AND '2019-01-01 15:00' AND device = 1 +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, avg(hyper.temp) + Group Key: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device + CTE top_n + -> Limit + Output: hyper_1.device, (avg(hyper_1.temp)) + -> Merge Append + Sort Key: (avg(hyper_1.temp)) DESC + -> Custom Scan (DataNodeScan) + Output: hyper_1.device, (avg(hyper_1.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) AND ((device = 1)) GROUP BY 1 ORDER BY avg(temp) DESC NULLS FIRST + -> Nested Loop + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, hyper.temp + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, hyper."time")) + -> Custom Scan (DataNodeScan) on public.hyper + Output: hyper."time", hyper.device, hyper.temp, time_bucket('@ 1 min'::interval, hyper."time") + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) AND ((device = 1)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST + -> CTE Scan on top_n + Output: top_n.device, top_n.avg + Filter: (top_n.device = 1) +(26 rows) + + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +SELECT time_bucket('60s', h1.time) AS "time", h1.device, avg(h1.temp), max(h2.temp) +FROM hyper h1 INNER JOIN hyper1d h2 ON (time_bucket('60', h1.time) = time_bucket('60', h2.time) AND h1.device = h2.device) +WHERE h1.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' AND + h2.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, h1."time")), h1.device, avg(h1.temp), max(h2.temp) + Group 
Key: time_bucket('@ 1 min'::interval, h1."time"), h1.device + -> Merge Join + Output: time_bucket('@ 1 min'::interval, h1."time"), h1.device, h1.temp, h2.temp + Merge Cond: ((time_bucket('@ 1 min'::interval, h1."time") = (time_bucket('@ 1 min'::interval, h2."time"))) AND (h1.device = h2.device)) + -> Custom Scan (AsyncAppend) + Output: h1."time", h1.device, h1.temp + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, h1_1."time")), h1_1.device + -> Custom Scan (DataNodeScan) on public.hyper h1_1 + Output: h1_1."time", h1_1.device, h1_1.temp, time_bucket('@ 1 min'::interval, h1_1."time") + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_2 + Output: h1_2."time", h1_2.device, h1_2.temp, time_bucket('@ 1 min'::interval, h1_2."time") + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_3 + Output: h1_3."time", h1_3.device, h1_3.temp, time_bucket('@ 1 min'::interval, h1_3."time") + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Materialize + Output: h2.temp, h2."time", h2.device, (time_bucket('@ 1 min'::interval, h2."time")) + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, h2."time")), h2.device + -> Custom Scan (DataNodeScan) on public.hyper1d h2 + Output: h2.temp, h2."time", h2.device, time_bucket('@ 1 min'::interval, h2."time") + Data node: data_node_1 + Chunks: _dist_hyper_2_19_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST +(34 rows) + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% RUNNING TESTS on table: hyper %%% PREFIX: EXPLAIN (verbose, costs off) @@ -3420,6 +3777,135 @@ LIMIT 10 Output: join_test.device (27 rows) + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +WITH top_n AS ( + SELECT device, avg(temp) + FROM hyper + WHERE time BETWEEN '2019-01-01' AND '2019-01-01 15:00' + GROUP BY 1 + ORDER BY 2 DESC + LIMIT 10 +) +SELECT time_bucket('60s', time) AS "time", device, avg(temp) +FROM hyper INNER JOIN top_n USING (device) +WHERE time BETWEEN '2019-01-01' AND '2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN 
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, avg(hyper.temp) + Group Key: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device + CTE top_n + -> Limit + Output: device, (avg(temp)) + -> Sort + Output: device, (avg(temp)) + Sort Key: (avg(temp)) DESC + -> Custom Scan (AsyncAppend) + Output: device, (avg(temp)) + -> Append + -> Custom Scan (DataNodeScan) + Output: hyper_4.device, (avg(hyper_4.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) GROUP BY 1 ORDER BY device ASC NULLS LAST + -> Custom Scan (DataNodeScan) + Output: hyper_5.device, (avg(hyper_5.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) GROUP BY 1 ORDER BY device ASC NULLS LAST + -> GroupAggregate + Output: hyper_6.device, avg(hyper_6.temp) + Group Key: hyper_6.device + -> Custom Scan (DataNodeScan) on public.hyper hyper_6 + Output: hyper_6.device, hyper_6.temp + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY device ASC NULLS LAST + -> Sort + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, hyper.temp + Sort Key: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device + -> Hash Join + Output: time_bucket('@ 1 min'::interval, hyper."time"), hyper.device, hyper.temp + Hash Cond: (hyper.device = top_n.device) + -> Custom Scan (AsyncAppend) + Output: hyper."time", hyper.device, hyper.temp + -> Append + -> Custom Scan (DataNodeScan) on public.hyper hyper_1 + Output: hyper_1."time", hyper_1.device, hyper_1.temp + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper hyper_2 + Output: hyper_2."time", hyper_2.device, hyper_2.temp + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" 
<= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper hyper_3 + Output: hyper_3."time", hyper_3.device, hyper_3.temp + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Hash + Output: top_n.device + -> CTE Scan on top_n + Output: top_n.device +(60 rows) + + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +SELECT time_bucket('60s', h1.time) AS "time", h1.device, avg(h1.temp), max(h2.temp) +FROM hyper h1 INNER JOIN hyper1d h2 ON (time_bucket('60', h1.time) = time_bucket('60', h2.time) AND h1.device = h2.device) +WHERE h1.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' AND + h2.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, h1."time")), h1.device, avg(h1.temp), max(h2.temp) + Group Key: time_bucket('@ 1 min'::interval, h1."time"), h1.device + -> Merge Join + Output: time_bucket('@ 1 min'::interval, h1."time"), h1.device, h1.temp, h2.temp + Merge Cond: ((time_bucket('@ 1 min'::interval, h1."time") = (time_bucket('@ 1 min'::interval, h2."time"))) AND (h1.device = h2.device)) + -> Custom Scan (AsyncAppend) + Output: h1."time", h1.device, h1.temp + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, h1_1."time")), h1_1.device + -> Custom Scan (DataNodeScan) on public.hyper h1_1 + Output: h1_1."time", h1_1.device, h1_1.temp, time_bucket('@ 1 min'::interval, h1_1."time") + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_2 + Output: h1_2."time", h1_2.device, h1_2.temp, time_bucket('@ 1 min'::interval, h1_2."time") + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_3 + Output: h1_3."time", h1_3.device, h1_3.temp, time_bucket('@ 1 min'::interval, h1_3."time") + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper 
WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Materialize + Output: h2.temp, h2."time", h2.device, (time_bucket('@ 1 min'::interval, h2."time")) + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, h2."time")), h2.device + -> Custom Scan (DataNodeScan) on public.hyper1d h2 + Output: h2.temp, h2."time", h2.device, time_bucket('@ 1 min'::interval, h2."time") + Data node: data_node_1 + Chunks: _dist_hyper_2_19_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST +(34 rows) + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% RUNNING TESTS on table: hyper %%% PREFIX: EXPLAIN (verbose, costs off) @@ -4309,6 +4795,136 @@ LIMIT 10 Output: join_test.device (27 rows) + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +WITH top_n AS ( + SELECT device, avg(temp) + FROM hyper + WHERE time >= '2019-01-01' + GROUP BY 1 + ORDER BY 2 DESC + LIMIT 10 +) +SELECT time_bucket('60s', time) AS "time", device, avg(temp) +FROM hyper INNER JOIN top_n USING (device) +WHERE time >= '2019-01-01' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, avg(hyper.temp) + Group Key: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device + CTE top_n + -> Limit + Output: device, (avg(temp)) + -> Sort + Output: device, (avg(temp)) + Sort Key: (avg(temp)) DESC + -> Finalize HashAggregate + Output: device, avg(temp) + Group Key: device + -> Custom Scan (AsyncAppend) + Output: device, (PARTIAL avg(temp)) + -> Append + -> Custom Scan (DataNodeScan) + Output: hyper_4.device, (PARTIAL avg(hyper_4.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_1 + Chunks: _dist_hyper_1_8_chunk, _dist_hyper_1_12_chunk, _dist_hyper_1_17_chunk, _dist_hyper_1_1_chunk, _dist_hyper_1_15_chunk, _dist_hyper_1_4_chunk, _dist_hyper_1_13_chunk + Remote SQL: SELECT device, _timescaledb_internal.partialize_agg(avg(temp)) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[3, 4, 7, 1, 6, 2, 5]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) GROUP BY 1 + -> Custom Scan (DataNodeScan) + Output: hyper_5.device, (PARTIAL avg(hyper_5.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_2 + Chunks: _dist_hyper_1_14_chunk, _dist_hyper_1_11_chunk, _dist_hyper_1_18_chunk, _dist_hyper_1_2_chunk, _dist_hyper_1_5_chunk, _dist_hyper_1_9_chunk, _dist_hyper_1_7_chunk + Remote SQL: SELECT device, _timescaledb_internal.partialize_agg(avg(temp)) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[6, 5, 7, 1, 2, 4, 3]) AND (("time" 
>= '2019-01-01 00:00:00-08'::timestamp with time zone)) GROUP BY 1 + -> Custom Scan (DataNodeScan) + Output: hyper_6.device, (PARTIAL avg(hyper_6.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_3 + Chunks: _dist_hyper_1_10_chunk, _dist_hyper_1_16_chunk, _dist_hyper_1_6_chunk, _dist_hyper_1_3_chunk + Remote SQL: SELECT device, _timescaledb_internal.partialize_agg(avg(temp)) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[3, 4, 2, 1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) GROUP BY 1 ORDER BY device ASC NULLS LAST + -> Sort + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, hyper.temp + Sort Key: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device + -> Hash Join + Output: time_bucket('@ 1 min'::interval, hyper."time"), hyper.device, hyper.temp + Hash Cond: (hyper.device = top_n.device) + -> Custom Scan (AsyncAppend) + Output: hyper."time", hyper.device, hyper.temp + -> Append + -> Custom Scan (DataNodeScan) on public.hyper hyper_1 + Output: hyper_1."time", hyper_1.device, hyper_1.temp + Data node: data_node_1 + Chunks: _dist_hyper_1_8_chunk, _dist_hyper_1_12_chunk, _dist_hyper_1_17_chunk, _dist_hyper_1_1_chunk, _dist_hyper_1_15_chunk, _dist_hyper_1_4_chunk, _dist_hyper_1_13_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[3, 4, 7, 1, 6, 2, 5]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper hyper_2 + Output: hyper_2."time", hyper_2.device, hyper_2.temp + Data node: data_node_2 + Chunks: _dist_hyper_1_14_chunk, _dist_hyper_1_11_chunk, _dist_hyper_1_18_chunk, _dist_hyper_1_2_chunk, _dist_hyper_1_5_chunk, _dist_hyper_1_9_chunk, _dist_hyper_1_7_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[6, 5, 7, 1, 2, 4, 3]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper hyper_3 + Output: hyper_3."time", hyper_3.device, hyper_3.temp + Data node: data_node_3 + Chunks: _dist_hyper_1_10_chunk, _dist_hyper_1_16_chunk, _dist_hyper_1_6_chunk, _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[3, 4, 2, 1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Hash + Output: top_n.device + -> CTE Scan on top_n + Output: top_n.device +(61 rows) + + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +SELECT time_bucket('60s', h1.time) AS "time", h1.device, avg(h1.temp), max(h2.temp) +FROM hyper h1 INNER JOIN hyper1d h2 ON (time_bucket('60', h1.time) = time_bucket('60', h2.time) AND h1.device = h2.device) +WHERE h1.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' AND + h2.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN 
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, h1."time")), h1.device, avg(h1.temp), max(h2.temp) + Group Key: time_bucket('@ 1 min'::interval, h1."time"), h1.device + -> Merge Join + Output: time_bucket('@ 1 min'::interval, h1."time"), h1.device, h1.temp, h2.temp + Merge Cond: ((time_bucket('@ 1 min'::interval, h1."time") = (time_bucket('@ 1 min'::interval, h2."time"))) AND (h1.device = h2.device)) + -> Custom Scan (AsyncAppend) + Output: h1."time", h1.device, h1.temp + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, h1_1."time")), h1_1.device + -> Custom Scan (DataNodeScan) on public.hyper h1_1 + Output: h1_1."time", h1_1.device, h1_1.temp, time_bucket('@ 1 min'::interval, h1_1."time") + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_2 + Output: h1_2."time", h1_2.device, h1_2.temp, time_bucket('@ 1 min'::interval, h1_2."time") + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_3 + Output: h1_3."time", h1_3.device, h1_3.temp, time_bucket('@ 1 min'::interval, h1_3."time") + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Materialize + Output: h2.temp, h2."time", h2.device, (time_bucket('@ 1 min'::interval, h2."time")) + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, h2."time")), h2.device + -> Custom Scan (DataNodeScan) on public.hyper1d h2 + Output: h2.temp, h2."time", h2.device, time_bucket('@ 1 min'::interval, h2."time") + Data node: data_node_1 + Chunks: _dist_hyper_2_19_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST +(34 rows) + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% RUNNING TESTS on table: hyper1d %%% PREFIX: EXPLAIN 
(verbose, costs off) @@ -5188,6 +5804,136 @@ LIMIT 10 Output: join_test.device (27 rows) + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +WITH top_n AS ( + SELECT device, avg(temp) + FROM hyper1d + WHERE time >= '2019-01-01' + GROUP BY 1 + ORDER BY 2 DESC + LIMIT 10 +) +SELECT time_bucket('60s', time) AS "time", device, avg(temp) +FROM hyper1d INNER JOIN top_n USING (device) +WHERE time >= '2019-01-01' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, hyper1d."time")), hyper1d.device, avg(hyper1d.temp) + Group Key: (time_bucket('@ 1 min'::interval, hyper1d."time")), hyper1d.device + CTE top_n + -> Limit + Output: device, (avg(temp)) + -> Sort + Output: device, (avg(temp)) + Sort Key: (avg(temp)) DESC + -> Finalize HashAggregate + Output: device, avg(temp) + Group Key: device + -> Custom Scan (AsyncAppend) + Output: device, (PARTIAL avg(temp)) + -> Append + -> Custom Scan (DataNodeScan) + Output: hyper1d_4.device, (PARTIAL avg(hyper1d_4.temp)) + Relations: Aggregate on (public.hyper1d) + Data node: data_node_1 + Chunks: _dist_hyper_2_19_chunk + Remote SQL: SELECT device, _timescaledb_internal.partialize_agg(avg(temp)) FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) GROUP BY 1 ORDER BY device ASC NULLS LAST + -> Custom Scan (DataNodeScan) + Output: hyper1d_5.device, (PARTIAL avg(hyper1d_5.temp)) + Relations: Aggregate on (public.hyper1d) + Data node: data_node_2 + Chunks: _dist_hyper_2_20_chunk + Remote SQL: SELECT device, _timescaledb_internal.partialize_agg(avg(temp)) FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) GROUP BY 1 + -> Custom Scan (DataNodeScan) + Output: hyper1d_6.device, (PARTIAL avg(hyper1d_6.temp)) + Relations: Aggregate on (public.hyper1d) + Data node: data_node_3 + Chunks: _dist_hyper_2_21_chunk + Remote SQL: SELECT device, _timescaledb_internal.partialize_agg(avg(temp)) FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[5]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) GROUP BY 1 + -> Sort + Output: (time_bucket('@ 1 min'::interval, hyper1d."time")), hyper1d.device, hyper1d.temp + Sort Key: (time_bucket('@ 1 min'::interval, hyper1d."time")), hyper1d.device + -> Hash Join + Output: time_bucket('@ 1 min'::interval, hyper1d."time"), hyper1d.device, hyper1d.temp + Hash Cond: (hyper1d.device = top_n.device) + -> Custom Scan (AsyncAppend) + Output: hyper1d."time", hyper1d.device, hyper1d.temp + -> Append + -> Custom Scan (DataNodeScan) on public.hyper1d hyper1d_1 + Output: hyper1d_1."time", hyper1d_1.device, hyper1d_1.temp + Data node: data_node_1 + Chunks: _dist_hyper_2_19_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper1d hyper1d_2 + 
Output: hyper1d_2."time", hyper1d_2.device, hyper1d_2.temp + Data node: data_node_2 + Chunks: _dist_hyper_2_20_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper1d hyper1d_3 + Output: hyper1d_3."time", hyper1d_3.device, hyper1d_3.temp + Data node: data_node_3 + Chunks: _dist_hyper_2_21_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[5]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Hash + Output: top_n.device + -> CTE Scan on top_n + Output: top_n.device +(61 rows) + + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +SELECT time_bucket('60s', h1.time) AS "time", h1.device, avg(h1.temp), max(h2.temp) +FROM hyper h1 INNER JOIN hyper1d h2 ON (time_bucket('60', h1.time) = time_bucket('60', h2.time) AND h1.device = h2.device) +WHERE h1.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' AND + h2.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, h1."time")), h1.device, avg(h1.temp), max(h2.temp) + Group Key: time_bucket('@ 1 min'::interval, h1."time"), h1.device + -> Merge Join + Output: time_bucket('@ 1 min'::interval, h1."time"), h1.device, h1.temp, h2.temp + Merge Cond: ((time_bucket('@ 1 min'::interval, h1."time") = (time_bucket('@ 1 min'::interval, h2."time"))) AND (h1.device = h2.device)) + -> Custom Scan (AsyncAppend) + Output: h1."time", h1.device, h1.temp + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, h1_1."time")), h1_1.device + -> Custom Scan (DataNodeScan) on public.hyper h1_1 + Output: h1_1."time", h1_1.device, h1_1.temp, time_bucket('@ 1 min'::interval, h1_1."time") + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_2 + Output: h1_2."time", h1_2.device, h1_2.temp, time_bucket('@ 1 min'::interval, h1_2."time") + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_3 + 
Output: h1_3."time", h1_3.device, h1_3.temp, time_bucket('@ 1 min'::interval, h1_3."time") + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Materialize + Output: h2.temp, h2."time", h2.device, (time_bucket('@ 1 min'::interval, h2."time")) + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, h2."time")), h2.device + -> Custom Scan (DataNodeScan) on public.hyper1d h2 + Output: h2.temp, h2."time", h2.device, time_bucket('@ 1 min'::interval, h2."time") + Data node: data_node_1 + Chunks: _dist_hyper_2_19_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST +(34 rows) + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% RUNNING TESTS on table: reference %%% PREFIX: diff --git a/tsl/test/expected/dist_query-12.out index dcb4be4e8..9fae2ab77 100644 --- a/tsl/test/expected/dist_query-12.out +++ b/tsl/test/expected/dist_query-12.out @@ -18,6 +18,10 @@ SELECT format('\! diff %s %s', :'TEST_RESULTS_UNOPTIMIZED', :'TEST_RESULTS_REFER format('\! diff %s %s', :'TEST_RESULTS_REPART_OPTIMIZED', :'TEST_RESULTS_REPART_REFERENCE') AS "DIFF_CMD_REPART", format('\! diff %s %s', :'TEST_RESULTS_1DIM', :'TEST_RESULTS_REPART_REFERENCE') AS "DIFF_CMD_1DIM" \gset +-- Use a small fetch size to make sure that results are fetched across
+-- multiple fetches.
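[Aside: a minimal sketch of the two settings this change introduces and that the test statements just below rely on. The fetch_size value '100' is an arbitrary example, and timescaledb.remote_data_fetcher accepts 'cursor' (the new default) or 'rowbyrow', as defined in src/guc.c above.]

-- Sketch: set fetch_size globally, for all data nodes, at the FDW level
ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (ADD fetch_size '100');
-- Sketch: select the data fetcher type for the current session
SET timescaledb.remote_data_fetcher = 'rowbyrow';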
+--ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (ADD fetch_size '500'); +SET timescaledb.remote_data_fetcher = 'rowbyrow'; SET client_min_messages TO notice; -- Load the data \ir :TEST_LOAD_NAME @@ -1025,6 +1029,133 @@ LIMIT 10 Output: join_test.device (27 rows) + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +WITH top_n AS ( + SELECT device, avg(temp) + FROM hyper + WHERE time BETWEEN '2019-01-01' AND '2019-01-01 15:00' + GROUP BY 1 + ORDER BY 2 DESC + LIMIT 10 +) +SELECT time_bucket('60s', time) AS "time", device, avg(temp) +FROM hyper INNER JOIN top_n USING (device) +WHERE time BETWEEN '2019-01-01' AND '2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, avg(hyper.temp) + Group Key: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device + -> Sort + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, hyper.temp + Sort Key: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device + -> Hash Join + Output: time_bucket('@ 1 min'::interval, hyper."time"), hyper.device, hyper.temp + Inner Unique: true + Hash Cond: (hyper.device = top_n.device) + -> Custom Scan (AsyncAppend) + Output: hyper."time", hyper.device, hyper.temp + -> Append + -> Custom Scan (DataNodeScan) on public.hyper hyper_1 + Output: hyper_1."time", hyper_1.device, hyper_1.temp + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper hyper_2 + Output: hyper_2."time", hyper_2.device, hyper_2.temp + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper hyper_3 + Output: hyper_3."time", hyper_3.device, hyper_3.temp + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Hash + Output: top_n.device + -> Subquery Scan on top_n + Output: top_n.device + -> Limit + Output: device, (avg(temp)) + -> Sort + Output: device, (avg(temp)) + Sort Key: (avg(temp)) DESC + -> Custom Scan (AsyncAppend) + Output: device, (avg(temp)) + -> Append + -> Custom Scan 
(DataNodeScan) + Output: hyper_4.device, (avg(hyper_4.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) GROUP BY 1 ORDER BY device ASC NULLS LAST + -> Custom Scan (DataNodeScan) + Output: hyper_5.device, (avg(hyper_5.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) GROUP BY 1 ORDER BY device ASC NULLS LAST + -> GroupAggregate + Output: hyper_6.device, avg(hyper_6.temp) + Group Key: hyper_6.device + -> Custom Scan (DataNodeScan) on public.hyper hyper_6 + Output: hyper_6.device, hyper_6.temp + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY device ASC NULLS LAST +(60 rows) + + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +SELECT time_bucket('60s', h1.time) AS "time", h1.device, avg(h1.temp), max(h2.temp) +FROM hyper h1 INNER JOIN hyper1d h2 ON (time_bucket('60', h1.time) = time_bucket('60', h2.time) AND h1.device = h2.device) +WHERE h1.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' AND + h2.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, h1."time")), h1.device, avg(h1.temp), max(h2.temp) + Group Key: time_bucket('@ 1 min'::interval, h1."time"), h1.device + -> Merge Join + Output: time_bucket('@ 1 min'::interval, h1."time"), h1.device, h1.temp, h2.temp + Merge Cond: ((time_bucket('@ 1 min'::interval, h1."time") = (time_bucket('@ 1 min'::interval, h2."time"))) AND (h1.device = h2.device)) + -> Custom Scan (AsyncAppend) + Output: h1."time", h1.device, h1.temp + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, h1_1."time")), h1_1.device + -> Custom Scan (DataNodeScan) on public.hyper h1_1 + Output: h1_1."time", h1_1.device, h1_1.temp, time_bucket('@ 1 min'::interval, h1_1."time") + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_2 + Output: h1_2."time", h1_2.device, h1_2.temp, time_bucket('@ 1 
min'::interval, h1_2."time") + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_3 + Output: h1_3."time", h1_3.device, h1_3.temp, time_bucket('@ 1 min'::interval, h1_3."time") + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Materialize + Output: h2.temp, h2."time", h2.device, (time_bucket('@ 1 min'::interval, h2."time")) + -> Custom Scan (DataNodeScan) on public.hyper1d h2 + Output: h2.temp, h2."time", h2.device, time_bucket('@ 1 min'::interval, h2."time") + Data node: data_node_1 + Chunks: _dist_hyper_2_19_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST +(32 rows) + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% RUNNING TESTS on table: hyper %%% PREFIX: EXPLAIN (verbose, costs off) @@ -1892,6 +2023,133 @@ LIMIT 10 Output: join_test.device (27 rows) + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +WITH top_n AS ( + SELECT device, avg(temp) + FROM hyper + WHERE time BETWEEN '2019-01-01' AND '2019-01-01 15:00' + GROUP BY 1 + ORDER BY 2 DESC + LIMIT 10 +) +SELECT time_bucket('60s', time) AS "time", device, avg(temp) +FROM hyper INNER JOIN top_n USING (device) +WHERE time BETWEEN '2019-01-01' AND '2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, avg(hyper.temp) + Group Key: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device + -> Sort + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, hyper.temp + Sort Key: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device + -> Hash Join + Output: time_bucket('@ 1 min'::interval, hyper."time"), hyper.device, hyper.temp + Inner Unique: true + Hash Cond: (hyper.device = top_n.device) + -> Custom Scan (AsyncAppend) + Output: hyper."time", hyper.device, hyper.temp + -> Append + -> Custom Scan (DataNodeScan) on public.hyper hyper_1 + Output: hyper_1."time", hyper_1.device, hyper_1.temp + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper 
WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper hyper_2 + Output: hyper_2."time", hyper_2.device, hyper_2.temp + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper hyper_3 + Output: hyper_3."time", hyper_3.device, hyper_3.temp + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Hash + Output: top_n.device + -> Subquery Scan on top_n + Output: top_n.device + -> Limit + Output: device, (avg(temp)) + -> Sort + Output: device, (avg(temp)) + Sort Key: (avg(temp)) DESC + -> Custom Scan (AsyncAppend) + Output: device, (avg(temp)) + -> Append + -> Custom Scan (DataNodeScan) + Output: hyper_4.device, (avg(hyper_4.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) GROUP BY 1 ORDER BY device ASC NULLS LAST + -> Custom Scan (DataNodeScan) + Output: hyper_5.device, (avg(hyper_5.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) GROUP BY 1 ORDER BY device ASC NULLS LAST + -> GroupAggregate + Output: hyper_6.device, avg(hyper_6.temp) + Group Key: hyper_6.device + -> Custom Scan (DataNodeScan) on public.hyper hyper_6 + Output: hyper_6.device, hyper_6.temp + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY device ASC NULLS LAST +(60 rows) + + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +SELECT time_bucket('60s', h1.time) AS "time", h1.device, avg(h1.temp), max(h2.temp) +FROM hyper h1 INNER JOIN hyper1d h2 ON (time_bucket('60', h1.time) = time_bucket('60', h2.time) AND h1.device = h2.device) +WHERE h1.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' AND + h2.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN 
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, h1."time")), h1.device, avg(h1.temp), max(h2.temp) + Group Key: time_bucket('@ 1 min'::interval, h1."time"), h1.device + -> Merge Join + Output: time_bucket('@ 1 min'::interval, h1."time"), h1.device, h1.temp, h2.temp + Merge Cond: ((time_bucket('@ 1 min'::interval, h1."time") = (time_bucket('@ 1 min'::interval, h2."time"))) AND (h1.device = h2.device)) + -> Custom Scan (AsyncAppend) + Output: h1."time", h1.device, h1.temp + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, h1_1."time")), h1_1.device + -> Custom Scan (DataNodeScan) on public.hyper h1_1 + Output: h1_1."time", h1_1.device, h1_1.temp, time_bucket('@ 1 min'::interval, h1_1."time") + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_2 + Output: h1_2."time", h1_2.device, h1_2.temp, time_bucket('@ 1 min'::interval, h1_2."time") + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_3 + Output: h1_3."time", h1_3.device, h1_3.temp, time_bucket('@ 1 min'::interval, h1_3."time") + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Materialize + Output: h2.temp, h2."time", h2.device, (time_bucket('@ 1 min'::interval, h2."time")) + -> Custom Scan (DataNodeScan) on public.hyper1d h2 + Output: h2.temp, h2."time", h2.device, time_bucket('@ 1 min'::interval, h2."time") + Data node: data_node_1 + Chunks: _dist_hyper_2_19_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST +(32 rows) + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% RUNNING TESTS on table: hyper %%% PREFIX: EXPLAIN (verbose, costs off) @@ -2511,6 +2769,96 @@ LIMIT 10 Output: join_test.device (27 rows) + 
+######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +WITH top_n AS ( + SELECT device, avg(temp) + FROM hyper + WHERE time BETWEEN '2019-01-01' AND '2019-01-01 15:00' AND device = 1 + GROUP BY 1 + ORDER BY 2 DESC + LIMIT 10 +) +SELECT time_bucket('60s', time) AS "time", device, avg(temp) +FROM hyper INNER JOIN top_n USING (device) +WHERE time BETWEEN '2019-01-01' AND '2019-01-01 15:00' AND device = 1 +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, avg(hyper.temp) + Group Key: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device + -> Nested Loop + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, hyper.temp + -> Custom Scan (DataNodeScan) on public.hyper + Output: hyper."time", hyper.device, hyper.temp, time_bucket('@ 1 min'::interval, hyper."time") + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) AND ((device = 1)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST + -> Materialize + Output: top_n.device + -> Subquery Scan on top_n + Output: top_n.device + Filter: (top_n.device = 1) + -> Limit + Output: hyper_1.device, (avg(hyper_1.temp)) + -> Custom Scan (DataNodeScan) + Output: hyper_1.device, (avg(hyper_1.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) AND ((device = 1)) GROUP BY 1 ORDER BY avg(temp) DESC NULLS FIRST +(23 rows) + + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +SELECT time_bucket('60s', h1.time) AS "time", h1.device, avg(h1.temp), max(h2.temp) +FROM hyper h1 INNER JOIN hyper1d h2 ON (time_bucket('60', h1.time) = time_bucket('60', h2.time) AND h1.device = h2.device) +WHERE h1.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' AND + h2.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, h1."time")), h1.device, avg(h1.temp), max(h2.temp) + Group Key: time_bucket('@ 1 min'::interval, h1."time"), h1.device + -> Merge Join + Output: time_bucket('@ 1 min'::interval, h1."time"), h1.device, h1.temp, h2.temp + Merge Cond: ((time_bucket('@ 1 min'::interval, h1."time") = (time_bucket('@ 1 min'::interval, h2."time"))) 
AND (h1.device = h2.device)) + -> Custom Scan (AsyncAppend) + Output: h1."time", h1.device, h1.temp + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, h1_1."time")), h1_1.device + -> Custom Scan (DataNodeScan) on public.hyper h1_1 + Output: h1_1."time", h1_1.device, h1_1.temp, time_bucket('@ 1 min'::interval, h1_1."time") + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_2 + Output: h1_2."time", h1_2.device, h1_2.temp, time_bucket('@ 1 min'::interval, h1_2."time") + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_3 + Output: h1_3."time", h1_3.device, h1_3.temp, time_bucket('@ 1 min'::interval, h1_3."time") + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Materialize + Output: h2.temp, h2."time", h2.device, (time_bucket('@ 1 min'::interval, h2."time")) + -> Custom Scan (DataNodeScan) on public.hyper1d h2 + Output: h2.temp, h2."time", h2.device, time_bucket('@ 1 min'::interval, h2."time") + Data node: data_node_1 + Chunks: _dist_hyper_2_19_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST +(32 rows) + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% RUNNING TESTS on table: hyper %%% PREFIX: EXPLAIN (verbose, costs off) @@ -3392,6 +3740,133 @@ LIMIT 10 Output: join_test.device (27 rows) + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +WITH top_n AS ( + SELECT device, avg(temp) + FROM hyper + WHERE time BETWEEN '2019-01-01' AND '2019-01-01 15:00' + GROUP BY 1 + ORDER BY 2 DESC + LIMIT 10 +) +SELECT time_bucket('60s', time) AS "time", device, avg(temp) +FROM hyper INNER JOIN top_n USING (device) +WHERE time BETWEEN '2019-01-01' AND '2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN 
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, avg(hyper.temp) + Group Key: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device + -> Sort + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, hyper.temp + Sort Key: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device + -> Hash Join + Output: time_bucket('@ 1 min'::interval, hyper."time"), hyper.device, hyper.temp + Inner Unique: true + Hash Cond: (hyper.device = top_n.device) + -> Custom Scan (AsyncAppend) + Output: hyper."time", hyper.device, hyper.temp + -> Append + -> Custom Scan (DataNodeScan) on public.hyper hyper_1 + Output: hyper_1."time", hyper_1.device, hyper_1.temp + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper hyper_2 + Output: hyper_2."time", hyper_2.device, hyper_2.temp + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper hyper_3 + Output: hyper_3."time", hyper_3.device, hyper_3.temp + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Hash + Output: top_n.device + -> Subquery Scan on top_n + Output: top_n.device + -> Limit + Output: device, (avg(temp)) + -> Sort + Output: device, (avg(temp)) + Sort Key: (avg(temp)) DESC + -> Custom Scan (AsyncAppend) + Output: device, (avg(temp)) + -> Append + -> Custom Scan (DataNodeScan) + Output: hyper_4.device, (avg(hyper_4.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT device, avg(temp) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) GROUP BY 1 ORDER BY device ASC NULLS LAST + -> Custom Scan (DataNodeScan) + Output: hyper_5.device, (avg(hyper_5.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT device, avg(temp) FROM public.hyper 
WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) GROUP BY 1 ORDER BY device ASC NULLS LAST + -> GroupAggregate + Output: hyper_6.device, avg(hyper_6.temp) + Group Key: hyper_6.device + -> Custom Scan (DataNodeScan) on public.hyper hyper_6 + Output: hyper_6.device, hyper_6.temp + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY device ASC NULLS LAST +(60 rows) + + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +SELECT time_bucket('60s', h1.time) AS "time", h1.device, avg(h1.temp), max(h2.temp) +FROM hyper h1 INNER JOIN hyper1d h2 ON (time_bucket('60', h1.time) = time_bucket('60', h2.time) AND h1.device = h2.device) +WHERE h1.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' AND + h2.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, h1."time")), h1.device, avg(h1.temp), max(h2.temp) + Group Key: time_bucket('@ 1 min'::interval, h1."time"), h1.device + -> Merge Join + Output: time_bucket('@ 1 min'::interval, h1."time"), h1.device, h1.temp, h2.temp + Merge Cond: ((time_bucket('@ 1 min'::interval, h1."time") = (time_bucket('@ 1 min'::interval, h2."time"))) AND (h1.device = h2.device)) + -> Custom Scan (AsyncAppend) + Output: h1."time", h1.device, h1.temp + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, h1_1."time")), h1_1.device + -> Custom Scan (DataNodeScan) on public.hyper h1_1 + Output: h1_1."time", h1_1.device, h1_1.temp, time_bucket('@ 1 min'::interval, h1_1."time") + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_2 + Output: h1_2."time", h1_2.device, h1_2.temp, time_bucket('@ 1 min'::interval, h1_2."time") + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_3 + Output: h1_3."time", h1_3.device, h1_3.temp, time_bucket('@ 1 min'::interval, h1_3."time") + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM 
public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Materialize + Output: h2.temp, h2."time", h2.device, (time_bucket('@ 1 min'::interval, h2."time")) + -> Custom Scan (DataNodeScan) on public.hyper1d h2 + Output: h2.temp, h2."time", h2.device, time_bucket('@ 1 min'::interval, h2."time") + Data node: data_node_1 + Chunks: _dist_hyper_2_19_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST +(32 rows) + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% RUNNING TESTS on table: hyper %%% PREFIX: EXPLAIN (verbose, costs off) @@ -4281,6 +4756,134 @@ LIMIT 10 Output: join_test.device (27 rows) + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +WITH top_n AS ( + SELECT device, avg(temp) + FROM hyper + WHERE time >= '2019-01-01' + GROUP BY 1 + ORDER BY 2 DESC + LIMIT 10 +) +SELECT time_bucket('60s', time) AS "time", device, avg(temp) +FROM hyper INNER JOIN top_n USING (device) +WHERE time >= '2019-01-01' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, avg(hyper.temp) + Group Key: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device + -> Sort + Output: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device, hyper.temp + Sort Key: (time_bucket('@ 1 min'::interval, hyper."time")), hyper.device + -> Hash Join + Output: time_bucket('@ 1 min'::interval, hyper."time"), hyper.device, hyper.temp + Inner Unique: true + Hash Cond: (hyper.device = top_n.device) + -> Custom Scan (AsyncAppend) + Output: hyper."time", hyper.device, hyper.temp + -> Append + -> Custom Scan (DataNodeScan) on public.hyper hyper_1 + Output: hyper_1."time", hyper_1.device, hyper_1.temp + Data node: data_node_1 + Chunks: _dist_hyper_1_8_chunk, _dist_hyper_1_12_chunk, _dist_hyper_1_17_chunk, _dist_hyper_1_1_chunk, _dist_hyper_1_15_chunk, _dist_hyper_1_4_chunk, _dist_hyper_1_13_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[3, 4, 7, 1, 6, 2, 5]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) + -> Custom Scan (DataNodeScan) on public.hyper hyper_2 + Output: hyper_2."time", hyper_2.device, hyper_2.temp + Data node: data_node_2 + Chunks: _dist_hyper_1_14_chunk, _dist_hyper_1_11_chunk, _dist_hyper_1_18_chunk, _dist_hyper_1_2_chunk, _dist_hyper_1_5_chunk, _dist_hyper_1_9_chunk, _dist_hyper_1_7_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[6, 5, 7, 1, 2, 4, 3]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp 
with time zone)) + -> Custom Scan (DataNodeScan) on public.hyper hyper_3 + Output: hyper_3."time", hyper_3.device, hyper_3.temp + Data node: data_node_3 + Chunks: _dist_hyper_1_10_chunk, _dist_hyper_1_16_chunk, _dist_hyper_1_6_chunk, _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[3, 4, 2, 1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Hash + Output: top_n.device + -> Subquery Scan on top_n + Output: top_n.device + -> Limit + Output: device, (avg(temp)) + -> Sort + Output: device, (avg(temp)) + Sort Key: (avg(temp)) DESC + -> Finalize HashAggregate + Output: device, avg(temp) + Group Key: device + -> Custom Scan (AsyncAppend) + Output: device, (PARTIAL avg(temp)) + -> Append + -> Custom Scan (DataNodeScan) + Output: hyper_4.device, (PARTIAL avg(hyper_4.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_1 + Chunks: _dist_hyper_1_8_chunk, _dist_hyper_1_12_chunk, _dist_hyper_1_17_chunk, _dist_hyper_1_1_chunk, _dist_hyper_1_15_chunk, _dist_hyper_1_4_chunk, _dist_hyper_1_13_chunk + Remote SQL: SELECT device, _timescaledb_internal.partialize_agg(avg(temp)) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[3, 4, 7, 1, 6, 2, 5]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) GROUP BY 1 + -> Custom Scan (DataNodeScan) + Output: hyper_5.device, (PARTIAL avg(hyper_5.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_2 + Chunks: _dist_hyper_1_14_chunk, _dist_hyper_1_11_chunk, _dist_hyper_1_18_chunk, _dist_hyper_1_2_chunk, _dist_hyper_1_5_chunk, _dist_hyper_1_9_chunk, _dist_hyper_1_7_chunk + Remote SQL: SELECT device, _timescaledb_internal.partialize_agg(avg(temp)) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[6, 5, 7, 1, 2, 4, 3]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) GROUP BY 1 + -> Custom Scan (DataNodeScan) + Output: hyper_6.device, (PARTIAL avg(hyper_6.temp)) + Relations: Aggregate on (public.hyper) + Data node: data_node_3 + Chunks: _dist_hyper_1_10_chunk, _dist_hyper_1_16_chunk, _dist_hyper_1_6_chunk, _dist_hyper_1_3_chunk + Remote SQL: SELECT device, _timescaledb_internal.partialize_agg(avg(temp)) FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[3, 4, 2, 1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) GROUP BY 1 ORDER BY device ASC NULLS LAST +(61 rows) + + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +SELECT time_bucket('60s', h1.time) AS "time", h1.device, avg(h1.temp), max(h2.temp) +FROM hyper h1 INNER JOIN hyper1d h2 ON (time_bucket('60', h1.time) = time_bucket('60', h2.time) AND h1.device = h2.device) +WHERE h1.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' AND + h2.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, h1."time")), h1.device, avg(h1.temp), max(h2.temp) + Group 
Key: time_bucket('@ 1 min'::interval, h1."time"), h1.device + -> Merge Join + Output: time_bucket('@ 1 min'::interval, h1."time"), h1.device, h1.temp, h2.temp + Merge Cond: ((time_bucket('@ 1 min'::interval, h1."time") = (time_bucket('@ 1 min'::interval, h2."time"))) AND (h1.device = h2.device)) + -> Custom Scan (AsyncAppend) + Output: h1."time", h1.device, h1.temp + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, h1_1."time")), h1_1.device + -> Custom Scan (DataNodeScan) on public.hyper h1_1 + Output: h1_1."time", h1_1.device, h1_1.temp, time_bucket('@ 1 min'::interval, h1_1."time") + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_2 + Output: h1_2."time", h1_2.device, h1_2.temp, time_bucket('@ 1 min'::interval, h1_2."time") + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_3 + Output: h1_3."time", h1_3.device, h1_3.temp, time_bucket('@ 1 min'::interval, h1_3."time") + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Materialize + Output: h2.temp, h2."time", h2.device, (time_bucket('@ 1 min'::interval, h2."time")) + -> Custom Scan (DataNodeScan) on public.hyper1d h2 + Output: h2.temp, h2."time", h2.device, time_bucket('@ 1 min'::interval, h2."time") + Data node: data_node_1 + Chunks: _dist_hyper_2_19_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST +(32 rows) + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% RUNNING TESTS on table: hyper1d %%% PREFIX: EXPLAIN (verbose, costs off) @@ -5160,6 +5763,134 @@ LIMIT 10 Output: join_test.device (27 rows) + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +WITH top_n AS ( + SELECT device, avg(temp) + FROM hyper1d + WHERE time >= '2019-01-01' + GROUP BY 1 + ORDER BY 2 DESC + LIMIT 10 +) +SELECT time_bucket('60s', time) AS "time", device, avg(temp) +FROM hyper1d INNER JOIN top_n USING (device) +WHERE time >= '2019-01-01' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN 
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, hyper1d."time")), hyper1d.device, avg(hyper1d.temp) + Group Key: (time_bucket('@ 1 min'::interval, hyper1d."time")), hyper1d.device + -> Sort + Output: (time_bucket('@ 1 min'::interval, hyper1d."time")), hyper1d.device, hyper1d.temp + Sort Key: (time_bucket('@ 1 min'::interval, hyper1d."time")), hyper1d.device + -> Hash Join + Output: time_bucket('@ 1 min'::interval, hyper1d."time"), hyper1d.device, hyper1d.temp + Inner Unique: true + Hash Cond: (hyper1d.device = top_n.device) + -> Custom Scan (AsyncAppend) + Output: hyper1d."time", hyper1d.device, hyper1d.temp + -> Append + -> Custom Scan (DataNodeScan) on public.hyper1d hyper1d_1 + Output: hyper1d_1."time", hyper1d_1.device, hyper1d_1.temp + Data node: data_node_1 + Chunks: _dist_hyper_2_19_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper1d hyper1d_2 + Output: hyper1d_2."time", hyper1d_2.device, hyper1d_2.temp + Data node: data_node_2 + Chunks: _dist_hyper_2_20_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) + -> Custom Scan (DataNodeScan) on public.hyper1d hyper1d_3 + Output: hyper1d_3."time", hyper1d_3.device, hyper1d_3.temp + Data node: data_node_3 + Chunks: _dist_hyper_2_21_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[5]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Hash + Output: top_n.device + -> Subquery Scan on top_n + Output: top_n.device + -> Limit + Output: device, (avg(temp)) + -> Sort + Output: device, (avg(temp)) + Sort Key: (avg(temp)) DESC + -> Finalize HashAggregate + Output: device, avg(temp) + Group Key: device + -> Custom Scan (AsyncAppend) + Output: device, (PARTIAL avg(temp)) + -> Append + -> Custom Scan (DataNodeScan) + Output: hyper1d_4.device, (PARTIAL avg(hyper1d_4.temp)) + Relations: Aggregate on (public.hyper1d) + Data node: data_node_1 + Chunks: _dist_hyper_2_19_chunk + Remote SQL: SELECT device, _timescaledb_internal.partialize_agg(avg(temp)) FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) GROUP BY 1 ORDER BY device ASC NULLS LAST + -> Custom Scan (DataNodeScan) + Output: hyper1d_5.device, (PARTIAL avg(hyper1d_5.temp)) + Relations: Aggregate on (public.hyper1d) + Data node: data_node_2 + Chunks: _dist_hyper_2_20_chunk + Remote SQL: SELECT device, _timescaledb_internal.partialize_agg(avg(temp)) FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) GROUP BY 1 + -> 
Custom Scan (DataNodeScan) + Output: hyper1d_6.device, (PARTIAL avg(hyper1d_6.temp)) + Relations: Aggregate on (public.hyper1d) + Data node: data_node_3 + Chunks: _dist_hyper_2_21_chunk + Remote SQL: SELECT device, _timescaledb_internal.partialize_agg(avg(temp)) FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[5]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) GROUP BY 1 +(61 rows) + + +######### CTEs/Sub-queries + +EXPLAIN (verbose, costs off) +SELECT time_bucket('60s', h1.time) AS "time", h1.device, avg(h1.temp), max(h2.temp) +FROM hyper h1 INNER JOIN hyper1d h2 ON (time_bucket('60', h1.time) = time_bucket('60', h2.time) AND h1.device = h2.device) +WHERE h1.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' AND + h2.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 + QUERY PLAN +----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + GroupAggregate + Output: (time_bucket('@ 1 min'::interval, h1."time")), h1.device, avg(h1.temp), max(h2.temp) + Group Key: time_bucket('@ 1 min'::interval, h1."time"), h1.device + -> Merge Join + Output: time_bucket('@ 1 min'::interval, h1."time"), h1.device, h1.temp, h2.temp + Merge Cond: ((time_bucket('@ 1 min'::interval, h1."time") = (time_bucket('@ 1 min'::interval, h2."time"))) AND (h1.device = h2.device)) + -> Custom Scan (AsyncAppend) + Output: h1."time", h1.device, h1.temp + -> Merge Append + Sort Key: (time_bucket('@ 1 min'::interval, h1_1."time")), h1_1.device + -> Custom Scan (DataNodeScan) on public.hyper h1_1 + Output: h1_1."time", h1_1.device, h1_1.temp, time_bucket('@ 1 min'::interval, h1_1."time") + Data node: data_node_1 + Chunks: _dist_hyper_1_1_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_2 + Output: h1_2."time", h1_2.device, h1_2.temp, time_bucket('@ 1 min'::interval, h1_2."time") + Data node: data_node_2 + Chunks: _dist_hyper_1_2_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Custom Scan (DataNodeScan) on public.hyper h1_3 + Output: h1_3."time", h1_3.device, h1_3.temp, time_bucket('@ 1 min'::interval, h1_3."time") + Data node: data_node_3 + Chunks: _dist_hyper_1_3_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper WHERE _timescaledb_internal.chunks_in(public.hyper.*, ARRAY[1]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST + -> Materialize + Output: h2.temp, 
h2."time", h2.device, (time_bucket('@ 1 min'::interval, h2."time")) + -> Custom Scan (DataNodeScan) on public.hyper1d h2 + Output: h2.temp, h2."time", h2.device, time_bucket('@ 1 min'::interval, h2."time") + Data node: data_node_1 + Chunks: _dist_hyper_2_19_chunk + Remote SQL: SELECT "time", device, temp FROM public.hyper1d WHERE _timescaledb_internal.chunks_in(public.hyper1d.*, ARRAY[8]) AND (("time" >= '2019-01-01 00:00:00-08'::timestamp with time zone)) AND (("time" <= '2019-01-01 15:00:00-08'::timestamp with time zone)) ORDER BY public.time_bucket('00:01:00'::interval, "time") ASC NULLS LAST, device ASC NULLS LAST +(32 rows) + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% RUNNING TESTS on table: reference %%% PREFIX: diff --git a/tsl/test/sql/data_fetcher.sql b/tsl/test/sql/data_fetcher.sql index 1aaeb77cc..21e895d7f 100644 --- a/tsl/test/sql/data_fetcher.sql +++ b/tsl/test/sql/data_fetcher.sql @@ -19,12 +19,20 @@ SET client_min_messages TO warning; \set ECHO errors SET client_min_messages TO error; +-- Set a smaller fetch size to ensure that the result is split into +-- mutliple batches. +ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (ADD fetch_size '100'); + -- run the queries using row by row fetcher +SET timescaledb.remote_data_fetcher = 'rowbyrow'; +\set ON_ERROR_STOP 0 \o :TEST_RESULTS_ROW_BY_ROW \ir :TEST_QUERY_NAME \o +\set ON_ERROR_STOP 1 + -- run queries using cursor fetcher -SET timescaledb.remote_data_fetcher = cursor; +SET timescaledb.remote_data_fetcher = 'cursor'; \o :TEST_RESULTS_CURSOR \ir :TEST_QUERY_NAME \o diff --git a/tsl/test/sql/dist_query.sql.in b/tsl/test/sql/dist_query.sql.in index 31928dbff..16a050a8b 100644 --- a/tsl/test/sql/dist_query.sql.in +++ b/tsl/test/sql/dist_query.sql.in @@ -21,6 +21,11 @@ SELECT format('\! diff %s %s', :'TEST_RESULTS_UNOPTIMIZED', :'TEST_RESULTS_REFER format('\! diff %s %s', :'TEST_RESULTS_1DIM', :'TEST_RESULTS_REPART_REFERENCE') AS "DIFF_CMD_1DIM" \gset + +-- Use a small fetch size to make sure that result are fetched across +-- multiple fetches. +--ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (ADD fetch_size '500'); +SET timescaledb.remote_data_fetcher = 'rowbyrow'; SET client_min_messages TO notice; -- Load the data diff --git a/tsl/test/sql/include/dist_query_run.sql b/tsl/test/sql/include/dist_query_run.sql index 2338e270e..19ce32e79 100644 --- a/tsl/test/sql/include/dist_query_run.sql +++ b/tsl/test/sql/include/dist_query_run.sql @@ -287,3 +287,39 @@ WHERE t.device = join_test.device LIMIT 10; DROP TABLE join_test; + +----------------------------------------------------------------- +-- Test CTE / sub-queries. Data from two sub-queries on the same data +-- node is joined on the access node. 
+----------------------------------------------------------------- +\set TEST_DESC '\n######### CTEs/Sub-queries\n' + +-- CTE / subquery +\qecho :TEST_DESC +:PREFIX +WITH top_n AS ( + SELECT device, avg(temp) + FROM :TABLE_NAME + WHERE :WHERE_CLAUSE + GROUP BY 1 + ORDER BY 2 DESC + LIMIT 10 +) +SELECT time_bucket('60s', time) AS "time", device, avg(temp) +FROM :TABLE_NAME INNER JOIN top_n USING (device) +WHERE :WHERE_CLAUSE +GROUP BY 1,2 +ORDER BY 1,2 +:OUTPUT_CMD + +-- Join between two distributed hypertables +\qecho :TEST_DESC +:PREFIX +SELECT time_bucket('60s', h1.time) AS "time", h1.device, avg(h1.temp), max(h2.temp) +FROM hyper h1 INNER JOIN hyper1d h2 ON (time_bucket('60', h1.time) = time_bucket('60', h2.time) AND h1.device = h2.device) +WHERE h1.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' AND + h2.time BETWEEN '2019-01-01' AND '2019-01-01 15:00' +GROUP BY 1,2 +ORDER BY 1,2 +:OUTPUT_CMD +
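The test changes above hinge on two settings: the fetch_size option, which the tests set at the foreign data wrapper level so a result set is split into multiple batches, and the timescaledb.remote_data_fetcher GUC, which selects the fetcher. The row-by-row run in data_fetcher.sql is wrapped in \set ON_ERROR_STOP 0 because that fetcher can now legitimately error out on queries that join sub-queries. A minimal configuration sketch follows; it is not part of the patch, but the ALTER FOREIGN DATA WRAPPER syntax and the GUC values are taken from the test files above, and pg_foreign_data_wrapper is the standard PostgreSQL catalog:

    -- Apply a small fetch size for all data nodes at once by setting it
    -- on the wrapper itself (use SET instead of ADD if the option has
    -- already been defined).
    ALTER FOREIGN DATA WRAPPER timescaledb_fdw OPTIONS (ADD fetch_size '100');

    -- Inspect the FDW-level options.
    SELECT fdwoptions FROM pg_foreign_data_wrapper
    WHERE fdwname = 'timescaledb_fdw';

    -- Select the fetcher for the current session.
    SET timescaledb.remote_data_fetcher = 'cursor';    -- new default
    SET timescaledb.remote_data_fetcher = 'rowbyrow';  -- may error on sub-query joins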
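The new CTE/sub-query test in dist_query_run.sql exercises the shape that used to fail: two sub-queries against the same distributed hypertable, joined on the access node, so that fetches for both sub-queries are interleaved over the same data node connections (visible in the plans above as a Hash Join over two AsyncAppend scans). A hand-runnable sketch of the same shape, with a hypothetical distributed hypertable measurements(time, device, temp) standing in for the test tables:

    -- Sub-query 1 (top_n) and sub-query 2 (the outer scan) are joined on
    -- the access node, interleaving FETCHes on shared connections.
    WITH top_n AS (
        SELECT device, avg(temp)
        FROM measurements
        GROUP BY 1
        ORDER BY 2 DESC
        LIMIT 10
    )
    SELECT time_bucket('60s', time) AS bucket, device, avg(temp)
    FROM measurements INNER JOIN top_n USING (device)
    GROUP BY 1, 2
    ORDER BY 1, 2;

With a small fetch_size, neither sub-query's result fits in one batch, so the join forces the executor to alternate between the two pending remote queries; this is the access pattern the new tests lock down.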