diff --git a/src/guc.c b/src/guc.c index f67ad67bc..16821a5a4 100644 --- a/src/guc.c +++ b/src/guc.c @@ -56,6 +56,7 @@ char *ts_last_tune_version = NULL; char *ts_telemetry_cloud = NULL; TSDLLEXPORT bool ts_guc_enable_2pc; TSDLLEXPORT int ts_guc_max_insert_batch_size = 1000; +TSDLLEXPORT bool ts_guc_enable_connection_binary_data; #ifdef TS_DEBUG bool ts_shutdown_bgw = false; @@ -235,6 +236,17 @@ _guc_init(void) NULL, NULL); + DefineCustomBoolVariable("timescaledb.enable_connection_binary_data", + "Enable binary format for connection", + "Enable binary format for data exchanged between nodes in the cluster", + &ts_guc_enable_connection_binary_data, + true, + PGC_USERSET, + 0, + NULL, + NULL, + NULL); + DefineCustomIntVariable("timescaledb.max_open_chunks_per_insert", "Maximum open chunks per insert", "Maximum number of open chunk tables per insert", diff --git a/src/guc.h b/src/guc.h index f4eb00dc6..14071f291 100644 --- a/src/guc.h +++ b/src/guc.h @@ -32,6 +32,7 @@ extern char *ts_last_tune_version; extern char *ts_telemetry_cloud; extern TSDLLEXPORT bool ts_guc_enable_2pc; extern TSDLLEXPORT int ts_guc_max_insert_batch_size; +extern TSDLLEXPORT bool ts_guc_enable_connection_binary_data; #ifdef TS_DEBUG extern bool ts_shutdown_bgw; diff --git a/tsl/src/fdw/timescaledb_fdw.c b/tsl/src/fdw/timescaledb_fdw.c index d6960a19f..e4f722fc2 100644 --- a/tsl/src/fdw/timescaledb_fdw.c +++ b/tsl/src/fdw/timescaledb_fdw.c @@ -55,6 +55,7 @@ #include #include #include +#include #include #include @@ -190,16 +191,14 @@ typedef struct TsFdwModifyState bool has_returning; /* is there a RETURNING clause? */ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ - /* info about parameters for prepared statement */ AttrNumber ctid_attno; /* attnum of input resjunk ctid column */ - int p_nums; /* number of parameters to transmit */ - FmgrInfo *p_flinfo; /* output conversion functions for them */ /* working memory context */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ bool prepared; int num_servers; + StmtParams *stmt_params; /* prepared statement paremeters */ TsFdwServerState servers[FLEXIBLE_ARRAY_MEMBER]; } TsFdwModifyState; @@ -1436,9 +1435,6 @@ create_foreign_modify(EState *estate, RangeTblEntry *rte, ResultRelInfo *rri, Cm TsFdwModifyState *fmstate; Relation rel = rri->ri_RelationDesc; TupleDesc tupdesc = RelationGetDescr(rel); - AttrNumber n_params; - Oid typefnoid; - bool isvarlena; ListCell *lc; int i = 0; int num_servers = usermappings == NIL ? 1 : list_length(usermappings); @@ -1500,11 +1496,6 @@ create_foreign_modify(EState *estate, RangeTblEntry *rte, ResultRelInfo *rri, Cm if (fmstate->has_returning) fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); - /* Prepare for output conversion of parameters used in prepared stmt. */ - n_params = list_length(fmstate->target_attrs) + 1; - fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params); - fmstate->p_nums = 0; - if (operation == CMD_UPDATE || operation == CMD_DELETE) { Assert(subplan != NULL); @@ -1513,30 +1504,12 @@ create_foreign_modify(EState *estate, RangeTblEntry *rte, ResultRelInfo *rri, Cm fmstate->ctid_attno = ExecFindJunkAttributeInTlist(subplan->targetlist, "ctid"); if (!AttributeNumberIsValid(fmstate->ctid_attno)) elog(ERROR, "could not find junk ctid column"); - - /* First transmittable parameter will be ctid */ - getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena); - fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); - fmstate->p_nums++; } - if (operation == CMD_INSERT || operation == CMD_UPDATE) - { - /* Set up for remaining transmittable parameters */ - foreach (lc, fmstate->target_attrs) - { - int attnum = lfirst_int(lc); - Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); - - Assert(!attr->attisdropped); - - getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena); - fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); - fmstate->p_nums++; - } - } - - Assert(fmstate->p_nums <= n_params); + fmstate->stmt_params = stmt_params_create(fmstate->target_attrs, + operation == CMD_UPDATE || operation == CMD_DELETE, + tupdesc, + 1); return fmstate; } @@ -2350,66 +2323,6 @@ begin_foreign_modify(ModifyTableState *mtstate, ResultRelInfo *rri, List *fdw_pr rri->ri_FdwState = fmstate; } -/* - * convert_prep_stmt_params - * Create array of text strings representing parameter values - * - * tupleid is ctid to send, or NULL if none - * slot is slot to get remaining parameters from, or NULL if none - * - * Data is constructed in temp_cxt; caller should reset that after use. - */ -static const char ** -convert_prep_stmt_params(TsFdwModifyState *fmstate, ItemPointer tupleid, TupleTableSlot *slot) -{ - const char **p_values; - int pindex = 0; - MemoryContext oldcontext; - - oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt); - - p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums); - - /* 1st parameter should be ctid, if it's in use */ - if (tupleid != NULL) - { - /* don't need set_transmission_modes for TID output */ - p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], PointerGetDatum(tupleid)); - pindex++; - } - - /* get following parameters from slot */ - if (slot != NULL && fmstate->target_attrs != NIL) - { - int nestlevel; - ListCell *lc; - - nestlevel = set_transmission_modes(); - - foreach (lc, fmstate->target_attrs) - { - int attnum = lfirst_int(lc); - Datum value; - bool isnull; - - value = slot_getattr(slot, attnum, &isnull); - if (isnull) - p_values[pindex] = NULL; - else - p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], value); - pindex++; - } - - reset_transmission_modes(nestlevel); - } - - Assert(pindex == fmstate->p_nums); - - MemoryContextSwitchTo(oldcontext); - - return p_values; -} - /* * store_returning_result * Store the result of a RETURNING clause @@ -2450,7 +2363,9 @@ prepare_foreign_modify_server(TsFdwModifyState *fmstate, TsFdwServerState *fdw_s Assert(NULL == fdw_server->p_stmt); - req = async_request_send_prepare(fdw_server->conn, fmstate->query, fmstate->p_nums); + req = async_request_send_prepare(fdw_server->conn, + fmstate->query, + stmt_params_num_params(fmstate->stmt_params)); Assert(NULL != req); @@ -2485,24 +2400,24 @@ exec_foreign_insert(EState *estate, ResultRelInfo *rri, TupleTableSlot *slot, TupleTableSlot *planSlot) { TsFdwModifyState *fmstate = (TsFdwModifyState *) rri->ri_FdwState; + StmtParams *params = fmstate->stmt_params; AsyncRequestSet *reqset; AsyncResponseResult *rsp; - const char **p_values; int n_rows = -1; int i; - /* Convert parameters needed by prepared statement to text form */ - p_values = convert_prep_stmt_params(fmstate, NULL, slot); - if (!fmstate->prepared) prepare_foreign_modify(fmstate); reqset = async_request_set_create(); + stmt_params_convert_values(params, slot, NULL); + for (i = 0; i < fmstate->num_servers; i++) { TsFdwServerState *fdw_server = &fmstate->servers[i]; - AsyncRequest *req = async_request_send_prepared_stmt(fdw_server->p_stmt, p_values); + AsyncRequest *req = + async_request_send_prepared_stmt_with_params(fdw_server->p_stmt, params); Assert(NULL != req); @@ -2536,6 +2451,7 @@ exec_foreign_insert(EState *estate, ResultRelInfo *rri, TupleTableSlot *slot, /* And clean up */ async_response_result_close(rsp); + stmt_params_reset(params); } /* @@ -2566,11 +2482,11 @@ exec_foreign_update_or_delete(EState *estate, ResultRelInfo *rri, TupleTableSlot TupleTableSlot *plan_slot, ModifyCommand cmd) { TsFdwModifyState *fmstate = (TsFdwModifyState *) rri->ri_FdwState; + StmtParams *params = fmstate->stmt_params; AsyncRequestSet *reqset; AsyncResponseResult *rsp; Datum datum; bool is_null; - const char **p_values; int n_rows = -1; int i; @@ -2585,16 +2501,16 @@ exec_foreign_update_or_delete(EState *estate, ResultRelInfo *rri, TupleTableSlot if (is_null) elog(ERROR, "ctid is NULL"); - /* Convert parameters needed by prepared statement to text form */ - p_values = convert_prep_stmt_params(fmstate, - (ItemPointer) DatumGetPointer(datum), - (cmd == UPDATE_CMD ? slot : NULL)); + stmt_params_convert_values(params, + (cmd == UPDATE_CMD ? slot : NULL), + (ItemPointer) DatumGetPointer(datum)); reqset = async_request_set_create(); for (i = 0; i < fmstate->num_servers; i++) { TsFdwServerState *fdw_server = &fmstate->servers[i]; - AsyncRequest *req = async_request_send_prepared_stmt(fdw_server->p_stmt, p_values); + AsyncRequest *req = + async_request_send_prepared_stmt_with_params(fdw_server->p_stmt, params); Assert(NULL != req); @@ -2639,6 +2555,7 @@ exec_foreign_update_or_delete(EState *estate, ResultRelInfo *rri, TupleTableSlot * inserts */ pfree(reqset); + stmt_params_reset(params); MemoryContextReset(fmstate->temp_cxt); @@ -2684,6 +2601,8 @@ finish_foreign_modify(TsFdwModifyState *fmstate) fdw_server->conn = NULL; } + + stmt_params_free(fmstate->stmt_params); } static void diff --git a/tsl/src/remote/CMakeLists.txt b/tsl/src/remote/CMakeLists.txt index 252e696cd..bc5abf9e7 100644 --- a/tsl/src/remote/CMakeLists.txt +++ b/tsl/src/remote/CMakeLists.txt @@ -8,6 +8,7 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/txn_store.c ${CMAKE_CURRENT_SOURCE_DIR}/txn_id.c ${CMAKE_CURRENT_SOURCE_DIR}/txn_resolve.c + ${CMAKE_CURRENT_SOURCE_DIR}/stmt_params.c ) target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES}) target_include_directories(${TSL_LIBRARY_NAME} PRIVATE ${PG_INCLUDEDIR}) diff --git a/tsl/src/remote/async.c b/tsl/src/remote/async.c index 37ff9cacc..3750e8620 100644 --- a/tsl/src/remote/async.c +++ b/tsl/src/remote/async.c @@ -10,9 +10,21 @@ #include #include #include +#include +#include +#include +#include +#include "compat.h" +#if PG12_GE +#include +#else +#include +#endif #include "async.h" #include "connection.h" +#include "fdw/utils.h" +#include "utils.h" #define MAX_ASYNC_TIMEOUT_MS 60000 @@ -156,8 +168,9 @@ async_request_send_prepare(PGconn *conn, const char *sql, int n_params) return req; } -AsyncRequest * -async_request_send_prepared_stmt(PreparedStmt *stmt, const char *const *param_values) +static AsyncRequest * +async_request_send_prepared_stmt_with_formats(PreparedStmt *stmt, const char *const *param_values, + const int *param_lengths, const int *param_formats) { AsyncRequest *req = palloc0(sizeof(AsyncRequest)); @@ -171,8 +184,8 @@ async_request_send_prepared_stmt(PreparedStmt *stmt, const char *const *param_va stmt->stmt_name, stmt->n_params, param_values, - NULL, - NULL, + param_lengths, + param_formats, 0)) { /* @@ -185,6 +198,21 @@ async_request_send_prepared_stmt(PreparedStmt *stmt, const char *const *param_va return req; } +extern AsyncRequest * +async_request_send_prepared_stmt(PreparedStmt *stmt, const char *const *paramValues) +{ + return async_request_send_prepared_stmt_with_formats(stmt, paramValues, NULL, NULL); +} + +AsyncRequest * +async_request_send_prepared_stmt_with_params(PreparedStmt *stmt, StmtParams *params) +{ + return async_request_send_prepared_stmt_with_formats(stmt, + stmt_params_values(params), + stmt_params_lengths(params), + stmt_params_formats(params)); +} + /* Set user data. Often it is useful to attach data with a request so that it can later be fetched from the response. */ void diff --git a/tsl/src/remote/async.h b/tsl/src/remote/async.h index ecbaee0d8..9cead3927 100644 --- a/tsl/src/remote/async.h +++ b/tsl/src/remote/async.h @@ -10,6 +10,8 @@ #include #include +#include "stmt_params.h" + #define DEFAULT_TIMEOUT_MS (SECS_PER_HOUR * 1000) typedef struct AsyncRequest AsyncRequest; @@ -66,6 +68,8 @@ extern AsyncRequest *async_request_send_prepare(PGconn *conn, const char *sql_st int n_params); extern AsyncRequest *async_request_send_prepared_stmt(PreparedStmt *stmt, const char *const *paramValues); +extern AsyncRequest *async_request_send_prepared_stmt_with_params(PreparedStmt *stmt, + StmtParams *params); extern void async_request_attach_user_data(AsyncRequest *req, void *user_data); extern AsyncResponseResult *async_request_wait_ok_result(AsyncRequest *request); extern AsyncResponseResult *async_request_wait_any_result(AsyncRequest *request); diff --git a/tsl/src/remote/stmt_params.c b/tsl/src/remote/stmt_params.c new file mode 100644 index 000000000..2d151b7c3 --- /dev/null +++ b/tsl/src/remote/stmt_params.c @@ -0,0 +1,279 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "compat.h" +#if PG12_GE +#include +#else +#include +#endif + +#include "guc.h" +#include "fdw/utils.h" +#include "stmt_params.h" + +typedef struct StmtParams +{ + FmgrInfo *conv_funcs; + const char **values; + int *formats; + int *lengths; + int num_params; + int num_tuples; + int converted_tuples; + bool ctid; + List *target_attr_nums; + MemoryContext mctx; /* where we allocate param values */ + MemoryContext tmp_ctx; /* used for converting values */ +} StmtParams; + +/** + * Returns either type binary or text output function (text output is secondary) + **/ +static Oid +get_type_output_func(Oid type, bool *is_binary, bool force_text) +{ + HeapTuple type_tuple; + Form_pg_type pg_type; + Oid output_func; + + type_tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type)); + if (!HeapTupleIsValid(type_tuple)) + elog(ERROR, "cache lookup failed for type %u", type); + pg_type = (Form_pg_type) GETSTRUCT(type_tuple); + + if (!pg_type->typisdefined) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("type %s is only a shell", format_type_be(type)))); + if (OidIsValid(pg_type->typsend) && !force_text) + { + output_func = pg_type->typsend; + *is_binary = true; + } + else + { + output_func = pg_type->typoutput; + *is_binary = false; + } + ReleaseSysCache(type_tuple); + if (!OidIsValid(output_func)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_FUNCTION), + errmsg("no binary or text output function available for type %s", + format_type_be(type)))); + return output_func; +} + +/* + * ctid should be set to true if we're going to send it + * num_tuples is used for batching + * mctx memory context where we'll allocate StmtParams with all the values + */ +StmtParams * +stmt_params_create(List *target_attr_nums, bool ctid, TupleDesc tuple_desc, int num_tuples) +{ + StmtParams *params; + ListCell *lc; + Oid typefnoid; + bool isbinary; + int idx = 0; + int tup_cnt; + MemoryContext old; + MemoryContext new; + MemoryContext tmp_ctx; + + new = AllocSetContextCreate(CurrentMemoryContext, + "stmt params mem context", + ALLOCSET_DEFAULT_SIZES); + old = MemoryContextSwitchTo(new); + tmp_ctx = AllocSetContextCreate(new, "stmt params conversion", ALLOCSET_DEFAULT_SIZES); + + params = palloc(sizeof(StmtParams)); + + Assert(num_tuples > 0); + params->num_params = ctid ? list_length(target_attr_nums) + 1 : list_length(target_attr_nums); + params->conv_funcs = palloc(sizeof(FmgrInfo) * params->num_params); + params->formats = palloc(sizeof(int) * params->num_params * num_tuples); + params->lengths = palloc(sizeof(int) * params->num_params * num_tuples); + params->values = palloc(sizeof(char *) * params->num_params * num_tuples); + params->ctid = ctid; + params->target_attr_nums = target_attr_nums; + params->num_tuples = num_tuples; + params->converted_tuples = 0; + params->mctx = new; + params->tmp_ctx = tmp_ctx; + + if (params->ctid) + { + typefnoid = get_type_output_func(TIDOID, &isbinary, !ts_guc_enable_connection_binary_data); + fmgr_info(typefnoid, ¶ms->conv_funcs[idx]); + params->formats[idx] = 1; + idx++; + } + + foreach (lc, target_attr_nums) + { + int attr_num = lfirst_int(lc); + Form_pg_attribute attr = TupleDescAttr(tuple_desc, AttrNumberGetAttrOffset(attr_num)); + Assert(!attr->attisdropped); + + typefnoid = + get_type_output_func(attr->atttypid, &isbinary, !ts_guc_enable_connection_binary_data); + params->formats[idx] = isbinary ? FORMAT_BINARY : FORMAT_TEXT; + + fmgr_info(typefnoid, ¶ms->conv_funcs[idx++]); + } + + Assert(params->num_params == idx); + + for (tup_cnt = 1; tup_cnt < params->num_tuples; tup_cnt++) + memcpy(params->formats + tup_cnt * params->num_params, + params->formats, + sizeof(int) * params->num_params); + + MemoryContextSwitchTo(old); + return params; +} + +static bool +all_values_in_binary_format(int *formats, int num_params) +{ + int i; + + for (i = 0; i < num_params; i++) + if (formats[i] != FORMAT_BINARY) + return false; + return true; +} + +/* + * tupleid is ctid. If ctid was set to true tupleid has to be provided + */ +void +stmt_params_convert_values(StmtParams *params, TupleTableSlot *slot, ItemPointer tupleid) +{ + MemoryContext old; + int idx; + ListCell *lc; + int nest_level; + bool all_binary; + + Assert(params->num_params > 0); + idx = params->converted_tuples * params->num_params; + + Assert(params->converted_tuples < params->num_tuples); + + old = MemoryContextSwitchTo(params->tmp_ctx); + + if (tupleid != NULL) + { + bytea *output_bytes; + Assert(params->ctid); + output_bytes = SendFunctionCall(¶ms->conv_funcs[idx], PointerGetDatum(tupleid)); + params->values[idx] = VARDATA(output_bytes); + params->lengths[idx] = (int) VARSIZE(output_bytes) - VARHDRSZ; + idx++; + } + else if (params->ctid) + elog(ERROR, "was configured to use ctid, but tupleid is NULL"); + + all_binary = all_values_in_binary_format(params->formats, params->num_params); + if (!all_binary) + nest_level = set_transmission_modes(); + + foreach (lc, params->target_attr_nums) + { + int attr_num = lfirst_int(lc); + Datum value; + bool isnull; + + value = slot_getattr(slot, attr_num, &isnull); + + if (isnull) + params->values[idx] = NULL; + else if (params->formats[idx] == 0) + /* text */ + params->values[idx] = OutputFunctionCall(¶ms->conv_funcs[idx], value); + else if (params->formats[idx] == 1) + { + /* binary */ + bytea *output_bytes = SendFunctionCall(¶ms->conv_funcs[idx], value); + params->values[idx] = VARDATA(output_bytes); + params->lengths[idx] = VARSIZE(output_bytes) - VARHDRSZ; + } + else + elog(ERROR, "unexpected parameter format: %d", params->formats[idx]); + idx++; + } + + params->converted_tuples++; + + if (!all_binary) + reset_transmission_modes(nest_level); + + MemoryContextSwitchTo(old); +} + +void +stmt_params_reset(StmtParams *params) +{ + MemoryContextReset(params->tmp_ctx); + params->converted_tuples = 0; +} + +/* + * Free params memory context and child context we've used for converting values to binary or text + */ +void +stmt_params_free(StmtParams *params) +{ + MemoryContextDelete(params->mctx); +} + +const int * +stmt_params_formats(StmtParams *stmt_params) +{ + return stmt_params->formats; +} + +const int * +stmt_params_lengths(StmtParams *stmt_params) +{ + return stmt_params->lengths; +} + +const char *const * +stmt_params_values(StmtParams *stmt_params) +{ + return stmt_params->values; +} + +const int +stmt_params_num_params(StmtParams *stmt_params) +{ + return stmt_params->num_params; +} + +const int +stmt_params_total_values(StmtParams *stmt_params) +{ + return stmt_params->converted_tuples * stmt_params->num_params; +} + +const int +stmt_params_converted_tuples(StmtParams *stmt_params) +{ + return stmt_params->converted_tuples; +} diff --git a/tsl/src/remote/stmt_params.h b/tsl/src/remote/stmt_params.h new file mode 100644 index 000000000..e5ebf002a --- /dev/null +++ b/tsl/src/remote/stmt_params.h @@ -0,0 +1,33 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#ifndef TIMESCALEDB_TSL_REMOTE_STMT_PARAMS_H +#define TIMESCALEDB_TSL_REMOTE_STMT_PARAMS_H + +#include +#include +#include +#include + +#define FORMAT_TEXT 0 +#define FORMAT_BINARY 1 + +typedef struct StmtParams StmtParams; + +extern StmtParams *stmt_params_create(List *target_attr_nums, bool ctid, TupleDesc tuple_desc, + int num_tuples); +extern void stmt_params_convert_values(StmtParams *params, TupleTableSlot *slot, + ItemPointer tupleid); + +extern const int *stmt_params_formats(StmtParams *stmt_params); +extern const int *stmt_params_lengths(StmtParams *stmt_params); +extern const char *const *stmt_params_values(StmtParams *stmt_params); +extern const int stmt_params_num_params(StmtParams *stmt_params); +extern void stmt_params_reset(StmtParams *params); +extern void stmt_params_free(StmtParams *params); +extern const int stmt_params_total_values(StmtParams *stmt_params); +extern const int stmt_params_converted_tuples(StmtParams *stmt_params); + +#endif diff --git a/tsl/test/expected/hypertable_distributed-11.out b/tsl/test/expected/hypertable_distributed-11.out index 03d657d70..c709d8c29 100644 --- a/tsl/test/expected/hypertable_distributed-11.out +++ b/tsl/test/expected/hypertable_distributed-11.out @@ -1624,3 +1624,26 @@ SELECT count(*) FROM twodim; 15 (1 row) +\c :TEST_DBNAME :ROLE_SUPERUSER; +SET ROLE :ROLE_DEFAULT_CLUSTER_USER; +-- Distributed table with custom type that has no binary output +CREATE TABLE disttable_with_ct(time timestamptz, txn_id rxid, val float, info text); +SELECT * FROM create_hypertable('disttable_with_ct', 'time', replication_factor => 1); +NOTICE: adding not-null constraint to column "time" + hypertable_id | schema_name | table_name | created +---------------+-------------+-------------------+--------- + 8 | public | disttable_with_ct | t +(1 row) + +-- Insert data with custom type +INSERT INTO disttable_with_ct VALUES + ('2019-01-01 01:01', 'ts-1-10-20', 1.1, 'a'), + ('2019-01-01 01:02', 'ts-1-11-20', 2.0, repeat('abc', 1000000)); -- TOAST +-- Test queries on distributed table with custom type +SELECT time,txn_id, val, substring(info for 20) FROM disttable_with_ct; + time | txn_id | val | substring +------------------------------+------------+-----+---------------------- + Tue Jan 01 01:01:00 2019 PST | ts-1-10-20 | 1.1 | a + Tue Jan 01 01:02:00 2019 PST | ts-1-11-20 | 2 | abcabcabcabcabcabcab +(2 rows) + diff --git a/tsl/test/expected/hypertable_distributed-12.out b/tsl/test/expected/hypertable_distributed-12.out index cd7739148..da1768e5f 100644 --- a/tsl/test/expected/hypertable_distributed-12.out +++ b/tsl/test/expected/hypertable_distributed-12.out @@ -1618,3 +1618,26 @@ SELECT count(*) FROM twodim; 15 (1 row) +\c :TEST_DBNAME :ROLE_SUPERUSER; +SET ROLE :ROLE_DEFAULT_CLUSTER_USER; +-- Distributed table with custom type that has no binary output +CREATE TABLE disttable_with_ct(time timestamptz, txn_id rxid, val float, info text); +SELECT * FROM create_hypertable('disttable_with_ct', 'time', replication_factor => 1); +NOTICE: adding not-null constraint to column "time" + hypertable_id | schema_name | table_name | created +---------------+-------------+-------------------+--------- + 8 | public | disttable_with_ct | t +(1 row) + +-- Insert data with custom type +INSERT INTO disttable_with_ct VALUES + ('2019-01-01 01:01', 'ts-1-10-20', 1.1, 'a'), + ('2019-01-01 01:02', 'ts-1-11-20', 2.0, repeat('abc', 1000000)); -- TOAST +-- Test queries on distributed table with custom type +SELECT time,txn_id, val, substring(info for 20) FROM disttable_with_ct; + time | txn_id | val | substring +------------------------------+------------+-----+---------------------- + Tue Jan 01 01:01:00 2019 PST | ts-1-10-20 | 1.1 | a + Tue Jan 01 01:02:00 2019 PST | ts-1-11-20 | 2 | abcabcabcabcabcabcab +(2 rows) + diff --git a/tsl/test/expected/remote_stmt_params.out b/tsl/test/expected/remote_stmt_params.out new file mode 100644 index 000000000..6a41ce17a --- /dev/null +++ b/tsl/test/expected/remote_stmt_params.out @@ -0,0 +1,22 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +\c :TEST_DBNAME :ROLE_SUPERUSER +CREATE OR REPLACE FUNCTION _timescaledb_internal.test_stmt_params_format(binary BOOL) +RETURNS VOID +AS :TSL_MODULE_PATHNAME, 'tsl_test_stmt_params_format' +LANGUAGE C STRICT; +-- by default we use binary format +SELECT _timescaledb_internal.test_stmt_params_format(true); + test_stmt_params_format +------------------------- + +(1 row) + +SET timescaledb.enable_connection_binary_data = false; +SELECT _timescaledb_internal.test_stmt_params_format(false); + test_stmt_params_format +------------------------- + +(1 row) + diff --git a/tsl/test/sql/CMakeLists.txt b/tsl/test/sql/CMakeLists.txt index 15f43dafe..a66fd1b2d 100644 --- a/tsl/test/sql/CMakeLists.txt +++ b/tsl/test/sql/CMakeLists.txt @@ -90,6 +90,7 @@ if (PG_VERSION_SUPPORTS_MULTINODE) dist_commands.sql remote_connection.sql remote_connection_cache.sql + remote_stmt_params.sql remote_txn.sql remote_txn_id.sql remote_txn_resolve.sql diff --git a/tsl/test/sql/hypertable_distributed.sql.in b/tsl/test/sql/hypertable_distributed.sql.in index 32fd35a95..2768a20d9 100644 --- a/tsl/test/sql/hypertable_distributed.sql.in +++ b/tsl/test/sql/hypertable_distributed.sql.in @@ -611,3 +611,18 @@ SELECT count(*) FROM twodim; SELECT * FROM twodim ORDER BY time; SELECT count(*) FROM twodim; + +\c :TEST_DBNAME :ROLE_SUPERUSER; +SET ROLE :ROLE_DEFAULT_CLUSTER_USER; + +-- Distributed table with custom type that has no binary output +CREATE TABLE disttable_with_ct(time timestamptz, txn_id rxid, val float, info text); +SELECT * FROM create_hypertable('disttable_with_ct', 'time', replication_factor => 1); + +-- Insert data with custom type +INSERT INTO disttable_with_ct VALUES + ('2019-01-01 01:01', 'ts-1-10-20', 1.1, 'a'), + ('2019-01-01 01:02', 'ts-1-11-20', 2.0, repeat('abc', 1000000)); -- TOAST + +-- Test queries on distributed table with custom type +SELECT time,txn_id, val, substring(info for 20) FROM disttable_with_ct; \ No newline at end of file diff --git a/tsl/test/sql/remote_stmt_params.sql b/tsl/test/sql/remote_stmt_params.sql new file mode 100644 index 000000000..c5be2c769 --- /dev/null +++ b/tsl/test/sql/remote_stmt_params.sql @@ -0,0 +1,17 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. + +\c :TEST_DBNAME :ROLE_SUPERUSER + +CREATE OR REPLACE FUNCTION _timescaledb_internal.test_stmt_params_format(binary BOOL) +RETURNS VOID +AS :TSL_MODULE_PATHNAME, 'tsl_test_stmt_params_format' +LANGUAGE C STRICT; + +-- by default we use binary format +SELECT _timescaledb_internal.test_stmt_params_format(true); + +SET timescaledb.enable_connection_binary_data = false; + +SELECT _timescaledb_internal.test_stmt_params_format(false); diff --git a/tsl/test/src/remote/CMakeLists.txt b/tsl/test/src/remote/CMakeLists.txt index 90caa04c8..f97366088 100644 --- a/tsl/test/src/remote/CMakeLists.txt +++ b/tsl/test/src/remote/CMakeLists.txt @@ -8,6 +8,7 @@ set(SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/dist_commands.c ${CMAKE_CURRENT_SOURCE_DIR}/txn_persistent_record.c ${CMAKE_CURRENT_SOURCE_DIR}/txn_resolve.c + ${CMAKE_CURRENT_SOURCE_DIR}/stmt_params.c ) target_sources(${TSL_TESTS_LIB_NAME} PRIVATE ${SOURCES}) diff --git a/tsl/test/src/remote/stmt_params.c b/tsl/test/src/remote/stmt_params.c new file mode 100644 index 000000000..b3ab85419 --- /dev/null +++ b/tsl/test/src/remote/stmt_params.c @@ -0,0 +1,51 @@ +/* + * This file and its contents are licensed under the Timescale License. + * Please see the included NOTICE for copyright information and + * LICENSE-TIMESCALE for a copy of the license. + */ +#include +#include + +#include "export.h" +#include "compat.h" +#include "remote/stmt_params.h" +#include "test_utils.h" + +TS_FUNCTION_INFO_V1(tsl_test_stmt_params_format); + +Datum +tsl_test_stmt_params_format(PG_FUNCTION_ARGS) +{ + StmtParams *params; + const int *formats; + int i; + TupleDesc tuple_desc; + List *target_attr_nums; + bool binary = PG_GETARG_BOOL(0); + Form_pg_attribute *attrs = palloc(sizeof(Form_pg_attribute) * 2); + attrs[0] = &(FormData_pg_attribute){ + .atttypid = INT4OID, + }; + attrs[1] = &(FormData_pg_attribute){ + .atttypid = BOOLOID, + }; + +#if PG12_GE + tuple_desc = CreateTupleDesc(2, attrs); +#else + tuple_desc = CreateTupleDesc(2, false, attrs); +#endif + target_attr_nums = list_make2_int(1, 2); + + params = stmt_params_create(target_attr_nums, false, tuple_desc, 2); + formats = stmt_params_formats(params); + TestAssertTrue(stmt_params_num_params(params) == 2); + if (binary) + for (i = 0; i < 4; i++) + TestAssertTrue(formats[i] == FORMAT_BINARY); + else + for (i = 0; i < 4; i++) + TestAssertTrue(formats[i] == FORMAT_TEXT); + + PG_RETURN_VOID(); +}