mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-16 10:33:27 +08:00
Binary format support for statement parameters
This change clearly separates handling of statement parameters. By default parameters are represented in binary format but if not supported it can fallback to text format. A GUC timescaledb.enable_cluster_binary_format can be used to force TEXT.
This commit is contained in:
parent
ed1b9d19f1
commit
a38f514411
12
src/guc.c
12
src/guc.c
@ -56,6 +56,7 @@ char *ts_last_tune_version = NULL;
|
||||
char *ts_telemetry_cloud = NULL;
|
||||
TSDLLEXPORT bool ts_guc_enable_2pc;
|
||||
TSDLLEXPORT int ts_guc_max_insert_batch_size = 1000;
|
||||
TSDLLEXPORT bool ts_guc_enable_connection_binary_data;
|
||||
|
||||
#ifdef TS_DEBUG
|
||||
bool ts_shutdown_bgw = false;
|
||||
@ -235,6 +236,17 @@ _guc_init(void)
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
DefineCustomBoolVariable("timescaledb.enable_connection_binary_data",
|
||||
"Enable binary format for connection",
|
||||
"Enable binary format for data exchanged between nodes in the cluster",
|
||||
&ts_guc_enable_connection_binary_data,
|
||||
true,
|
||||
PGC_USERSET,
|
||||
0,
|
||||
NULL,
|
||||
NULL,
|
||||
NULL);
|
||||
|
||||
DefineCustomIntVariable("timescaledb.max_open_chunks_per_insert",
|
||||
"Maximum open chunks per insert",
|
||||
"Maximum number of open chunk tables per insert",
|
||||
|
@ -32,6 +32,7 @@ extern char *ts_last_tune_version;
|
||||
extern char *ts_telemetry_cloud;
|
||||
extern TSDLLEXPORT bool ts_guc_enable_2pc;
|
||||
extern TSDLLEXPORT int ts_guc_max_insert_batch_size;
|
||||
extern TSDLLEXPORT bool ts_guc_enable_connection_binary_data;
|
||||
|
||||
#ifdef TS_DEBUG
|
||||
extern bool ts_shutdown_bgw;
|
||||
|
@ -55,6 +55,7 @@
|
||||
#include <server_dispatch.h>
|
||||
#include <remote/dist_txn.h>
|
||||
#include <remote/async.h>
|
||||
#include <remote/stmt_params.h>
|
||||
#include <compat.h>
|
||||
|
||||
#include <planner.h>
|
||||
@ -190,16 +191,14 @@ typedef struct TsFdwModifyState
|
||||
bool has_returning; /* is there a RETURNING clause? */
|
||||
List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
|
||||
|
||||
/* info about parameters for prepared statement */
|
||||
AttrNumber ctid_attno; /* attnum of input resjunk ctid column */
|
||||
int p_nums; /* number of parameters to transmit */
|
||||
FmgrInfo *p_flinfo; /* output conversion functions for them */
|
||||
|
||||
/* working memory context */
|
||||
MemoryContext temp_cxt; /* context for per-tuple temporary data */
|
||||
|
||||
bool prepared;
|
||||
int num_servers;
|
||||
StmtParams *stmt_params; /* prepared statement paremeters */
|
||||
TsFdwServerState servers[FLEXIBLE_ARRAY_MEMBER];
|
||||
} TsFdwModifyState;
|
||||
|
||||
@ -1436,9 +1435,6 @@ create_foreign_modify(EState *estate, RangeTblEntry *rte, ResultRelInfo *rri, Cm
|
||||
TsFdwModifyState *fmstate;
|
||||
Relation rel = rri->ri_RelationDesc;
|
||||
TupleDesc tupdesc = RelationGetDescr(rel);
|
||||
AttrNumber n_params;
|
||||
Oid typefnoid;
|
||||
bool isvarlena;
|
||||
ListCell *lc;
|
||||
int i = 0;
|
||||
int num_servers = usermappings == NIL ? 1 : list_length(usermappings);
|
||||
@ -1500,11 +1496,6 @@ create_foreign_modify(EState *estate, RangeTblEntry *rte, ResultRelInfo *rri, Cm
|
||||
if (fmstate->has_returning)
|
||||
fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
|
||||
|
||||
/* Prepare for output conversion of parameters used in prepared stmt. */
|
||||
n_params = list_length(fmstate->target_attrs) + 1;
|
||||
fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
|
||||
fmstate->p_nums = 0;
|
||||
|
||||
if (operation == CMD_UPDATE || operation == CMD_DELETE)
|
||||
{
|
||||
Assert(subplan != NULL);
|
||||
@ -1513,30 +1504,12 @@ create_foreign_modify(EState *estate, RangeTblEntry *rte, ResultRelInfo *rri, Cm
|
||||
fmstate->ctid_attno = ExecFindJunkAttributeInTlist(subplan->targetlist, "ctid");
|
||||
if (!AttributeNumberIsValid(fmstate->ctid_attno))
|
||||
elog(ERROR, "could not find junk ctid column");
|
||||
|
||||
/* First transmittable parameter will be ctid */
|
||||
getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
|
||||
fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
|
||||
fmstate->p_nums++;
|
||||
}
|
||||
|
||||
if (operation == CMD_INSERT || operation == CMD_UPDATE)
|
||||
{
|
||||
/* Set up for remaining transmittable parameters */
|
||||
foreach (lc, fmstate->target_attrs)
|
||||
{
|
||||
int attnum = lfirst_int(lc);
|
||||
Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1);
|
||||
|
||||
Assert(!attr->attisdropped);
|
||||
|
||||
getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
|
||||
fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
|
||||
fmstate->p_nums++;
|
||||
}
|
||||
}
|
||||
|
||||
Assert(fmstate->p_nums <= n_params);
|
||||
fmstate->stmt_params = stmt_params_create(fmstate->target_attrs,
|
||||
operation == CMD_UPDATE || operation == CMD_DELETE,
|
||||
tupdesc,
|
||||
1);
|
||||
|
||||
return fmstate;
|
||||
}
|
||||
@ -2350,66 +2323,6 @@ begin_foreign_modify(ModifyTableState *mtstate, ResultRelInfo *rri, List *fdw_pr
|
||||
rri->ri_FdwState = fmstate;
|
||||
}
|
||||
|
||||
/*
|
||||
* convert_prep_stmt_params
|
||||
* Create array of text strings representing parameter values
|
||||
*
|
||||
* tupleid is ctid to send, or NULL if none
|
||||
* slot is slot to get remaining parameters from, or NULL if none
|
||||
*
|
||||
* Data is constructed in temp_cxt; caller should reset that after use.
|
||||
*/
|
||||
static const char **
|
||||
convert_prep_stmt_params(TsFdwModifyState *fmstate, ItemPointer tupleid, TupleTableSlot *slot)
|
||||
{
|
||||
const char **p_values;
|
||||
int pindex = 0;
|
||||
MemoryContext oldcontext;
|
||||
|
||||
oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
|
||||
|
||||
p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
|
||||
|
||||
/* 1st parameter should be ctid, if it's in use */
|
||||
if (tupleid != NULL)
|
||||
{
|
||||
/* don't need set_transmission_modes for TID output */
|
||||
p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], PointerGetDatum(tupleid));
|
||||
pindex++;
|
||||
}
|
||||
|
||||
/* get following parameters from slot */
|
||||
if (slot != NULL && fmstate->target_attrs != NIL)
|
||||
{
|
||||
int nestlevel;
|
||||
ListCell *lc;
|
||||
|
||||
nestlevel = set_transmission_modes();
|
||||
|
||||
foreach (lc, fmstate->target_attrs)
|
||||
{
|
||||
int attnum = lfirst_int(lc);
|
||||
Datum value;
|
||||
bool isnull;
|
||||
|
||||
value = slot_getattr(slot, attnum, &isnull);
|
||||
if (isnull)
|
||||
p_values[pindex] = NULL;
|
||||
else
|
||||
p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], value);
|
||||
pindex++;
|
||||
}
|
||||
|
||||
reset_transmission_modes(nestlevel);
|
||||
}
|
||||
|
||||
Assert(pindex == fmstate->p_nums);
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
return p_values;
|
||||
}
|
||||
|
||||
/*
|
||||
* store_returning_result
|
||||
* Store the result of a RETURNING clause
|
||||
@ -2450,7 +2363,9 @@ prepare_foreign_modify_server(TsFdwModifyState *fmstate, TsFdwServerState *fdw_s
|
||||
|
||||
Assert(NULL == fdw_server->p_stmt);
|
||||
|
||||
req = async_request_send_prepare(fdw_server->conn, fmstate->query, fmstate->p_nums);
|
||||
req = async_request_send_prepare(fdw_server->conn,
|
||||
fmstate->query,
|
||||
stmt_params_num_params(fmstate->stmt_params));
|
||||
|
||||
Assert(NULL != req);
|
||||
|
||||
@ -2485,24 +2400,24 @@ exec_foreign_insert(EState *estate, ResultRelInfo *rri, TupleTableSlot *slot,
|
||||
TupleTableSlot *planSlot)
|
||||
{
|
||||
TsFdwModifyState *fmstate = (TsFdwModifyState *) rri->ri_FdwState;
|
||||
StmtParams *params = fmstate->stmt_params;
|
||||
AsyncRequestSet *reqset;
|
||||
AsyncResponseResult *rsp;
|
||||
const char **p_values;
|
||||
int n_rows = -1;
|
||||
int i;
|
||||
|
||||
/* Convert parameters needed by prepared statement to text form */
|
||||
p_values = convert_prep_stmt_params(fmstate, NULL, slot);
|
||||
|
||||
if (!fmstate->prepared)
|
||||
prepare_foreign_modify(fmstate);
|
||||
|
||||
reqset = async_request_set_create();
|
||||
|
||||
stmt_params_convert_values(params, slot, NULL);
|
||||
|
||||
for (i = 0; i < fmstate->num_servers; i++)
|
||||
{
|
||||
TsFdwServerState *fdw_server = &fmstate->servers[i];
|
||||
AsyncRequest *req = async_request_send_prepared_stmt(fdw_server->p_stmt, p_values);
|
||||
AsyncRequest *req =
|
||||
async_request_send_prepared_stmt_with_params(fdw_server->p_stmt, params);
|
||||
|
||||
Assert(NULL != req);
|
||||
|
||||
@ -2536,6 +2451,7 @@ exec_foreign_insert(EState *estate, ResultRelInfo *rri, TupleTableSlot *slot,
|
||||
|
||||
/* And clean up */
|
||||
async_response_result_close(rsp);
|
||||
stmt_params_reset(params);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -2566,11 +2482,11 @@ exec_foreign_update_or_delete(EState *estate, ResultRelInfo *rri, TupleTableSlot
|
||||
TupleTableSlot *plan_slot, ModifyCommand cmd)
|
||||
{
|
||||
TsFdwModifyState *fmstate = (TsFdwModifyState *) rri->ri_FdwState;
|
||||
StmtParams *params = fmstate->stmt_params;
|
||||
AsyncRequestSet *reqset;
|
||||
AsyncResponseResult *rsp;
|
||||
Datum datum;
|
||||
bool is_null;
|
||||
const char **p_values;
|
||||
int n_rows = -1;
|
||||
int i;
|
||||
|
||||
@ -2585,16 +2501,16 @@ exec_foreign_update_or_delete(EState *estate, ResultRelInfo *rri, TupleTableSlot
|
||||
if (is_null)
|
||||
elog(ERROR, "ctid is NULL");
|
||||
|
||||
/* Convert parameters needed by prepared statement to text form */
|
||||
p_values = convert_prep_stmt_params(fmstate,
|
||||
(ItemPointer) DatumGetPointer(datum),
|
||||
(cmd == UPDATE_CMD ? slot : NULL));
|
||||
stmt_params_convert_values(params,
|
||||
(cmd == UPDATE_CMD ? slot : NULL),
|
||||
(ItemPointer) DatumGetPointer(datum));
|
||||
reqset = async_request_set_create();
|
||||
|
||||
for (i = 0; i < fmstate->num_servers; i++)
|
||||
{
|
||||
TsFdwServerState *fdw_server = &fmstate->servers[i];
|
||||
AsyncRequest *req = async_request_send_prepared_stmt(fdw_server->p_stmt, p_values);
|
||||
AsyncRequest *req =
|
||||
async_request_send_prepared_stmt_with_params(fdw_server->p_stmt, params);
|
||||
|
||||
Assert(NULL != req);
|
||||
|
||||
@ -2639,6 +2555,7 @@ exec_foreign_update_or_delete(EState *estate, ResultRelInfo *rri, TupleTableSlot
|
||||
* inserts
|
||||
*/
|
||||
pfree(reqset);
|
||||
stmt_params_reset(params);
|
||||
|
||||
MemoryContextReset(fmstate->temp_cxt);
|
||||
|
||||
@ -2684,6 +2601,8 @@ finish_foreign_modify(TsFdwModifyState *fmstate)
|
||||
|
||||
fdw_server->conn = NULL;
|
||||
}
|
||||
|
||||
stmt_params_free(fmstate->stmt_params);
|
||||
}
|
||||
|
||||
static void
|
||||
|
@ -8,6 +8,7 @@ set(SOURCES
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/txn_store.c
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/txn_id.c
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/txn_resolve.c
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/stmt_params.c
|
||||
)
|
||||
target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})
|
||||
target_include_directories(${TSL_LIBRARY_NAME} PRIVATE ${PG_INCLUDEDIR})
|
||||
|
@ -10,9 +10,21 @@
|
||||
#include <storage/latch.h>
|
||||
#include <miscadmin.h>
|
||||
#include <pgstat.h>
|
||||
#include <fmgr.h>
|
||||
#include <utils/lsyscache.h>
|
||||
#include <catalog/pg_type.h>
|
||||
#include <access/tuptoaster.h>
|
||||
|
||||
#include "compat.h"
|
||||
#if PG12_GE
|
||||
#include <nodes/pathnodes.h>
|
||||
#else
|
||||
#include <nodes/relation.h>
|
||||
#endif
|
||||
#include "async.h"
|
||||
#include "connection.h"
|
||||
#include "fdw/utils.h"
|
||||
#include "utils.h"
|
||||
|
||||
#define MAX_ASYNC_TIMEOUT_MS 60000
|
||||
|
||||
@ -156,8 +168,9 @@ async_request_send_prepare(PGconn *conn, const char *sql, int n_params)
|
||||
return req;
|
||||
}
|
||||
|
||||
AsyncRequest *
|
||||
async_request_send_prepared_stmt(PreparedStmt *stmt, const char *const *param_values)
|
||||
static AsyncRequest *
|
||||
async_request_send_prepared_stmt_with_formats(PreparedStmt *stmt, const char *const *param_values,
|
||||
const int *param_lengths, const int *param_formats)
|
||||
{
|
||||
AsyncRequest *req = palloc0(sizeof(AsyncRequest));
|
||||
|
||||
@ -171,8 +184,8 @@ async_request_send_prepared_stmt(PreparedStmt *stmt, const char *const *param_va
|
||||
stmt->stmt_name,
|
||||
stmt->n_params,
|
||||
param_values,
|
||||
NULL,
|
||||
NULL,
|
||||
param_lengths,
|
||||
param_formats,
|
||||
0))
|
||||
{
|
||||
/*
|
||||
@ -185,6 +198,21 @@ async_request_send_prepared_stmt(PreparedStmt *stmt, const char *const *param_va
|
||||
return req;
|
||||
}
|
||||
|
||||
extern AsyncRequest *
|
||||
async_request_send_prepared_stmt(PreparedStmt *stmt, const char *const *paramValues)
|
||||
{
|
||||
return async_request_send_prepared_stmt_with_formats(stmt, paramValues, NULL, NULL);
|
||||
}
|
||||
|
||||
AsyncRequest *
|
||||
async_request_send_prepared_stmt_with_params(PreparedStmt *stmt, StmtParams *params)
|
||||
{
|
||||
return async_request_send_prepared_stmt_with_formats(stmt,
|
||||
stmt_params_values(params),
|
||||
stmt_params_lengths(params),
|
||||
stmt_params_formats(params));
|
||||
}
|
||||
|
||||
/* Set user data. Often it is useful to attach data with a request so
|
||||
that it can later be fetched from the response. */
|
||||
void
|
||||
|
@ -10,6 +10,8 @@
|
||||
#include <libpq-fe.h>
|
||||
#include <utils/timestamp.h>
|
||||
|
||||
#include "stmt_params.h"
|
||||
|
||||
#define DEFAULT_TIMEOUT_MS (SECS_PER_HOUR * 1000)
|
||||
|
||||
typedef struct AsyncRequest AsyncRequest;
|
||||
@ -66,6 +68,8 @@ extern AsyncRequest *async_request_send_prepare(PGconn *conn, const char *sql_st
|
||||
int n_params);
|
||||
extern AsyncRequest *async_request_send_prepared_stmt(PreparedStmt *stmt,
|
||||
const char *const *paramValues);
|
||||
extern AsyncRequest *async_request_send_prepared_stmt_with_params(PreparedStmt *stmt,
|
||||
StmtParams *params);
|
||||
extern void async_request_attach_user_data(AsyncRequest *req, void *user_data);
|
||||
extern AsyncResponseResult *async_request_wait_ok_result(AsyncRequest *request);
|
||||
extern AsyncResponseResult *async_request_wait_any_result(AsyncRequest *request);
|
||||
|
279
tsl/src/remote/stmt_params.c
Normal file
279
tsl/src/remote/stmt_params.c
Normal file
@ -0,0 +1,279 @@
|
||||
/*
|
||||
* This file and its contents are licensed under the Timescale License.
|
||||
* Please see the included NOTICE for copyright information and
|
||||
* LICENSE-TIMESCALE for a copy of the license.
|
||||
*/
|
||||
#include <postgres.h>
|
||||
#include <catalog/pg_type.h>
|
||||
#include <access/htup_details.h>
|
||||
#include <access/tuptoaster.h>
|
||||
#include <utils/lsyscache.h>
|
||||
#include <utils/syscache.h>
|
||||
#include <utils/builtins.h>
|
||||
#include <utils/memutils.h>
|
||||
|
||||
#include "compat.h"
|
||||
#if PG12_GE
|
||||
#include <nodes/pathnodes.h>
|
||||
#else
|
||||
#include <nodes/relation.h>
|
||||
#endif
|
||||
|
||||
#include "guc.h"
|
||||
#include "fdw/utils.h"
|
||||
#include "stmt_params.h"
|
||||
|
||||
typedef struct StmtParams
|
||||
{
|
||||
FmgrInfo *conv_funcs;
|
||||
const char **values;
|
||||
int *formats;
|
||||
int *lengths;
|
||||
int num_params;
|
||||
int num_tuples;
|
||||
int converted_tuples;
|
||||
bool ctid;
|
||||
List *target_attr_nums;
|
||||
MemoryContext mctx; /* where we allocate param values */
|
||||
MemoryContext tmp_ctx; /* used for converting values */
|
||||
} StmtParams;
|
||||
|
||||
/**
|
||||
* Returns either type binary or text output function (text output is secondary)
|
||||
**/
|
||||
static Oid
|
||||
get_type_output_func(Oid type, bool *is_binary, bool force_text)
|
||||
{
|
||||
HeapTuple type_tuple;
|
||||
Form_pg_type pg_type;
|
||||
Oid output_func;
|
||||
|
||||
type_tuple = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type));
|
||||
if (!HeapTupleIsValid(type_tuple))
|
||||
elog(ERROR, "cache lookup failed for type %u", type);
|
||||
pg_type = (Form_pg_type) GETSTRUCT(type_tuple);
|
||||
|
||||
if (!pg_type->typisdefined)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_UNDEFINED_OBJECT),
|
||||
errmsg("type %s is only a shell", format_type_be(type))));
|
||||
if (OidIsValid(pg_type->typsend) && !force_text)
|
||||
{
|
||||
output_func = pg_type->typsend;
|
||||
*is_binary = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
output_func = pg_type->typoutput;
|
||||
*is_binary = false;
|
||||
}
|
||||
ReleaseSysCache(type_tuple);
|
||||
if (!OidIsValid(output_func))
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_UNDEFINED_FUNCTION),
|
||||
errmsg("no binary or text output function available for type %s",
|
||||
format_type_be(type))));
|
||||
return output_func;
|
||||
}
|
||||
|
||||
/*
|
||||
* ctid should be set to true if we're going to send it
|
||||
* num_tuples is used for batching
|
||||
* mctx memory context where we'll allocate StmtParams with all the values
|
||||
*/
|
||||
StmtParams *
|
||||
stmt_params_create(List *target_attr_nums, bool ctid, TupleDesc tuple_desc, int num_tuples)
|
||||
{
|
||||
StmtParams *params;
|
||||
ListCell *lc;
|
||||
Oid typefnoid;
|
||||
bool isbinary;
|
||||
int idx = 0;
|
||||
int tup_cnt;
|
||||
MemoryContext old;
|
||||
MemoryContext new;
|
||||
MemoryContext tmp_ctx;
|
||||
|
||||
new = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"stmt params mem context",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
old = MemoryContextSwitchTo(new);
|
||||
tmp_ctx = AllocSetContextCreate(new, "stmt params conversion", ALLOCSET_DEFAULT_SIZES);
|
||||
|
||||
params = palloc(sizeof(StmtParams));
|
||||
|
||||
Assert(num_tuples > 0);
|
||||
params->num_params = ctid ? list_length(target_attr_nums) + 1 : list_length(target_attr_nums);
|
||||
params->conv_funcs = palloc(sizeof(FmgrInfo) * params->num_params);
|
||||
params->formats = palloc(sizeof(int) * params->num_params * num_tuples);
|
||||
params->lengths = palloc(sizeof(int) * params->num_params * num_tuples);
|
||||
params->values = palloc(sizeof(char *) * params->num_params * num_tuples);
|
||||
params->ctid = ctid;
|
||||
params->target_attr_nums = target_attr_nums;
|
||||
params->num_tuples = num_tuples;
|
||||
params->converted_tuples = 0;
|
||||
params->mctx = new;
|
||||
params->tmp_ctx = tmp_ctx;
|
||||
|
||||
if (params->ctid)
|
||||
{
|
||||
typefnoid = get_type_output_func(TIDOID, &isbinary, !ts_guc_enable_connection_binary_data);
|
||||
fmgr_info(typefnoid, ¶ms->conv_funcs[idx]);
|
||||
params->formats[idx] = 1;
|
||||
idx++;
|
||||
}
|
||||
|
||||
foreach (lc, target_attr_nums)
|
||||
{
|
||||
int attr_num = lfirst_int(lc);
|
||||
Form_pg_attribute attr = TupleDescAttr(tuple_desc, AttrNumberGetAttrOffset(attr_num));
|
||||
Assert(!attr->attisdropped);
|
||||
|
||||
typefnoid =
|
||||
get_type_output_func(attr->atttypid, &isbinary, !ts_guc_enable_connection_binary_data);
|
||||
params->formats[idx] = isbinary ? FORMAT_BINARY : FORMAT_TEXT;
|
||||
|
||||
fmgr_info(typefnoid, ¶ms->conv_funcs[idx++]);
|
||||
}
|
||||
|
||||
Assert(params->num_params == idx);
|
||||
|
||||
for (tup_cnt = 1; tup_cnt < params->num_tuples; tup_cnt++)
|
||||
memcpy(params->formats + tup_cnt * params->num_params,
|
||||
params->formats,
|
||||
sizeof(int) * params->num_params);
|
||||
|
||||
MemoryContextSwitchTo(old);
|
||||
return params;
|
||||
}
|
||||
|
||||
static bool
|
||||
all_values_in_binary_format(int *formats, int num_params)
|
||||
{
|
||||
int i;
|
||||
|
||||
for (i = 0; i < num_params; i++)
|
||||
if (formats[i] != FORMAT_BINARY)
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* tupleid is ctid. If ctid was set to true tupleid has to be provided
|
||||
*/
|
||||
void
|
||||
stmt_params_convert_values(StmtParams *params, TupleTableSlot *slot, ItemPointer tupleid)
|
||||
{
|
||||
MemoryContext old;
|
||||
int idx;
|
||||
ListCell *lc;
|
||||
int nest_level;
|
||||
bool all_binary;
|
||||
|
||||
Assert(params->num_params > 0);
|
||||
idx = params->converted_tuples * params->num_params;
|
||||
|
||||
Assert(params->converted_tuples < params->num_tuples);
|
||||
|
||||
old = MemoryContextSwitchTo(params->tmp_ctx);
|
||||
|
||||
if (tupleid != NULL)
|
||||
{
|
||||
bytea *output_bytes;
|
||||
Assert(params->ctid);
|
||||
output_bytes = SendFunctionCall(¶ms->conv_funcs[idx], PointerGetDatum(tupleid));
|
||||
params->values[idx] = VARDATA(output_bytes);
|
||||
params->lengths[idx] = (int) VARSIZE(output_bytes) - VARHDRSZ;
|
||||
idx++;
|
||||
}
|
||||
else if (params->ctid)
|
||||
elog(ERROR, "was configured to use ctid, but tupleid is NULL");
|
||||
|
||||
all_binary = all_values_in_binary_format(params->formats, params->num_params);
|
||||
if (!all_binary)
|
||||
nest_level = set_transmission_modes();
|
||||
|
||||
foreach (lc, params->target_attr_nums)
|
||||
{
|
||||
int attr_num = lfirst_int(lc);
|
||||
Datum value;
|
||||
bool isnull;
|
||||
|
||||
value = slot_getattr(slot, attr_num, &isnull);
|
||||
|
||||
if (isnull)
|
||||
params->values[idx] = NULL;
|
||||
else if (params->formats[idx] == 0)
|
||||
/* text */
|
||||
params->values[idx] = OutputFunctionCall(¶ms->conv_funcs[idx], value);
|
||||
else if (params->formats[idx] == 1)
|
||||
{
|
||||
/* binary */
|
||||
bytea *output_bytes = SendFunctionCall(¶ms->conv_funcs[idx], value);
|
||||
params->values[idx] = VARDATA(output_bytes);
|
||||
params->lengths[idx] = VARSIZE(output_bytes) - VARHDRSZ;
|
||||
}
|
||||
else
|
||||
elog(ERROR, "unexpected parameter format: %d", params->formats[idx]);
|
||||
idx++;
|
||||
}
|
||||
|
||||
params->converted_tuples++;
|
||||
|
||||
if (!all_binary)
|
||||
reset_transmission_modes(nest_level);
|
||||
|
||||
MemoryContextSwitchTo(old);
|
||||
}
|
||||
|
||||
void
|
||||
stmt_params_reset(StmtParams *params)
|
||||
{
|
||||
MemoryContextReset(params->tmp_ctx);
|
||||
params->converted_tuples = 0;
|
||||
}
|
||||
|
||||
/*
|
||||
* Free params memory context and child context we've used for converting values to binary or text
|
||||
*/
|
||||
void
|
||||
stmt_params_free(StmtParams *params)
|
||||
{
|
||||
MemoryContextDelete(params->mctx);
|
||||
}
|
||||
|
||||
const int *
|
||||
stmt_params_formats(StmtParams *stmt_params)
|
||||
{
|
||||
return stmt_params->formats;
|
||||
}
|
||||
|
||||
const int *
|
||||
stmt_params_lengths(StmtParams *stmt_params)
|
||||
{
|
||||
return stmt_params->lengths;
|
||||
}
|
||||
|
||||
const char *const *
|
||||
stmt_params_values(StmtParams *stmt_params)
|
||||
{
|
||||
return stmt_params->values;
|
||||
}
|
||||
|
||||
const int
|
||||
stmt_params_num_params(StmtParams *stmt_params)
|
||||
{
|
||||
return stmt_params->num_params;
|
||||
}
|
||||
|
||||
const int
|
||||
stmt_params_total_values(StmtParams *stmt_params)
|
||||
{
|
||||
return stmt_params->converted_tuples * stmt_params->num_params;
|
||||
}
|
||||
|
||||
const int
|
||||
stmt_params_converted_tuples(StmtParams *stmt_params)
|
||||
{
|
||||
return stmt_params->converted_tuples;
|
||||
}
|
33
tsl/src/remote/stmt_params.h
Normal file
33
tsl/src/remote/stmt_params.h
Normal file
@ -0,0 +1,33 @@
|
||||
/*
|
||||
* This file and its contents are licensed under the Timescale License.
|
||||
* Please see the included NOTICE for copyright information and
|
||||
* LICENSE-TIMESCALE for a copy of the license.
|
||||
*/
|
||||
#ifndef TIMESCALEDB_TSL_REMOTE_STMT_PARAMS_H
|
||||
#define TIMESCALEDB_TSL_REMOTE_STMT_PARAMS_H
|
||||
|
||||
#include <postgres.h>
|
||||
#include <fmgr.h>
|
||||
#include <nodes/pg_list.h>
|
||||
#include <executor/tuptable.h>
|
||||
|
||||
#define FORMAT_TEXT 0
|
||||
#define FORMAT_BINARY 1
|
||||
|
||||
typedef struct StmtParams StmtParams;
|
||||
|
||||
extern StmtParams *stmt_params_create(List *target_attr_nums, bool ctid, TupleDesc tuple_desc,
|
||||
int num_tuples);
|
||||
extern void stmt_params_convert_values(StmtParams *params, TupleTableSlot *slot,
|
||||
ItemPointer tupleid);
|
||||
|
||||
extern const int *stmt_params_formats(StmtParams *stmt_params);
|
||||
extern const int *stmt_params_lengths(StmtParams *stmt_params);
|
||||
extern const char *const *stmt_params_values(StmtParams *stmt_params);
|
||||
extern const int stmt_params_num_params(StmtParams *stmt_params);
|
||||
extern void stmt_params_reset(StmtParams *params);
|
||||
extern void stmt_params_free(StmtParams *params);
|
||||
extern const int stmt_params_total_values(StmtParams *stmt_params);
|
||||
extern const int stmt_params_converted_tuples(StmtParams *stmt_params);
|
||||
|
||||
#endif
|
@ -1624,3 +1624,26 @@ SELECT count(*) FROM twodim;
|
||||
15
|
||||
(1 row)
|
||||
|
||||
\c :TEST_DBNAME :ROLE_SUPERUSER;
|
||||
SET ROLE :ROLE_DEFAULT_CLUSTER_USER;
|
||||
-- Distributed table with custom type that has no binary output
|
||||
CREATE TABLE disttable_with_ct(time timestamptz, txn_id rxid, val float, info text);
|
||||
SELECT * FROM create_hypertable('disttable_with_ct', 'time', replication_factor => 1);
|
||||
NOTICE: adding not-null constraint to column "time"
|
||||
hypertable_id | schema_name | table_name | created
|
||||
---------------+-------------+-------------------+---------
|
||||
8 | public | disttable_with_ct | t
|
||||
(1 row)
|
||||
|
||||
-- Insert data with custom type
|
||||
INSERT INTO disttable_with_ct VALUES
|
||||
('2019-01-01 01:01', 'ts-1-10-20', 1.1, 'a'),
|
||||
('2019-01-01 01:02', 'ts-1-11-20', 2.0, repeat('abc', 1000000)); -- TOAST
|
||||
-- Test queries on distributed table with custom type
|
||||
SELECT time,txn_id, val, substring(info for 20) FROM disttable_with_ct;
|
||||
time | txn_id | val | substring
|
||||
------------------------------+------------+-----+----------------------
|
||||
Tue Jan 01 01:01:00 2019 PST | ts-1-10-20 | 1.1 | a
|
||||
Tue Jan 01 01:02:00 2019 PST | ts-1-11-20 | 2 | abcabcabcabcabcabcab
|
||||
(2 rows)
|
||||
|
||||
|
@ -1618,3 +1618,26 @@ SELECT count(*) FROM twodim;
|
||||
15
|
||||
(1 row)
|
||||
|
||||
\c :TEST_DBNAME :ROLE_SUPERUSER;
|
||||
SET ROLE :ROLE_DEFAULT_CLUSTER_USER;
|
||||
-- Distributed table with custom type that has no binary output
|
||||
CREATE TABLE disttable_with_ct(time timestamptz, txn_id rxid, val float, info text);
|
||||
SELECT * FROM create_hypertable('disttable_with_ct', 'time', replication_factor => 1);
|
||||
NOTICE: adding not-null constraint to column "time"
|
||||
hypertable_id | schema_name | table_name | created
|
||||
---------------+-------------+-------------------+---------
|
||||
8 | public | disttable_with_ct | t
|
||||
(1 row)
|
||||
|
||||
-- Insert data with custom type
|
||||
INSERT INTO disttable_with_ct VALUES
|
||||
('2019-01-01 01:01', 'ts-1-10-20', 1.1, 'a'),
|
||||
('2019-01-01 01:02', 'ts-1-11-20', 2.0, repeat('abc', 1000000)); -- TOAST
|
||||
-- Test queries on distributed table with custom type
|
||||
SELECT time,txn_id, val, substring(info for 20) FROM disttable_with_ct;
|
||||
time | txn_id | val | substring
|
||||
------------------------------+------------+-----+----------------------
|
||||
Tue Jan 01 01:01:00 2019 PST | ts-1-10-20 | 1.1 | a
|
||||
Tue Jan 01 01:02:00 2019 PST | ts-1-11-20 | 2 | abcabcabcabcabcabcab
|
||||
(2 rows)
|
||||
|
||||
|
22
tsl/test/expected/remote_stmt_params.out
Normal file
22
tsl/test/expected/remote_stmt_params.out
Normal file
@ -0,0 +1,22 @@
|
||||
-- This file and its contents are licensed under the Timescale License.
|
||||
-- Please see the included NOTICE for copyright information and
|
||||
-- LICENSE-TIMESCALE for a copy of the license.
|
||||
\c :TEST_DBNAME :ROLE_SUPERUSER
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.test_stmt_params_format(binary BOOL)
|
||||
RETURNS VOID
|
||||
AS :TSL_MODULE_PATHNAME, 'tsl_test_stmt_params_format'
|
||||
LANGUAGE C STRICT;
|
||||
-- by default we use binary format
|
||||
SELECT _timescaledb_internal.test_stmt_params_format(true);
|
||||
test_stmt_params_format
|
||||
-------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET timescaledb.enable_connection_binary_data = false;
|
||||
SELECT _timescaledb_internal.test_stmt_params_format(false);
|
||||
test_stmt_params_format
|
||||
-------------------------
|
||||
|
||||
(1 row)
|
||||
|
@ -90,6 +90,7 @@ if (PG_VERSION_SUPPORTS_MULTINODE)
|
||||
dist_commands.sql
|
||||
remote_connection.sql
|
||||
remote_connection_cache.sql
|
||||
remote_stmt_params.sql
|
||||
remote_txn.sql
|
||||
remote_txn_id.sql
|
||||
remote_txn_resolve.sql
|
||||
|
@ -611,3 +611,18 @@ SELECT count(*) FROM twodim;
|
||||
SELECT * FROM twodim
|
||||
ORDER BY time;
|
||||
SELECT count(*) FROM twodim;
|
||||
|
||||
\c :TEST_DBNAME :ROLE_SUPERUSER;
|
||||
SET ROLE :ROLE_DEFAULT_CLUSTER_USER;
|
||||
|
||||
-- Distributed table with custom type that has no binary output
|
||||
CREATE TABLE disttable_with_ct(time timestamptz, txn_id rxid, val float, info text);
|
||||
SELECT * FROM create_hypertable('disttable_with_ct', 'time', replication_factor => 1);
|
||||
|
||||
-- Insert data with custom type
|
||||
INSERT INTO disttable_with_ct VALUES
|
||||
('2019-01-01 01:01', 'ts-1-10-20', 1.1, 'a'),
|
||||
('2019-01-01 01:02', 'ts-1-11-20', 2.0, repeat('abc', 1000000)); -- TOAST
|
||||
|
||||
-- Test queries on distributed table with custom type
|
||||
SELECT time,txn_id, val, substring(info for 20) FROM disttable_with_ct;
|
17
tsl/test/sql/remote_stmt_params.sql
Normal file
17
tsl/test/sql/remote_stmt_params.sql
Normal file
@ -0,0 +1,17 @@
|
||||
-- This file and its contents are licensed under the Timescale License.
|
||||
-- Please see the included NOTICE for copyright information and
|
||||
-- LICENSE-TIMESCALE for a copy of the license.
|
||||
|
||||
\c :TEST_DBNAME :ROLE_SUPERUSER
|
||||
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.test_stmt_params_format(binary BOOL)
|
||||
RETURNS VOID
|
||||
AS :TSL_MODULE_PATHNAME, 'tsl_test_stmt_params_format'
|
||||
LANGUAGE C STRICT;
|
||||
|
||||
-- by default we use binary format
|
||||
SELECT _timescaledb_internal.test_stmt_params_format(true);
|
||||
|
||||
SET timescaledb.enable_connection_binary_data = false;
|
||||
|
||||
SELECT _timescaledb_internal.test_stmt_params_format(false);
|
@ -8,6 +8,7 @@ set(SOURCES
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/dist_commands.c
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/txn_persistent_record.c
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/txn_resolve.c
|
||||
${CMAKE_CURRENT_SOURCE_DIR}/stmt_params.c
|
||||
)
|
||||
|
||||
target_sources(${TSL_TESTS_LIB_NAME} PRIVATE ${SOURCES})
|
||||
|
51
tsl/test/src/remote/stmt_params.c
Normal file
51
tsl/test/src/remote/stmt_params.c
Normal file
@ -0,0 +1,51 @@
|
||||
/*
|
||||
* This file and its contents are licensed under the Timescale License.
|
||||
* Please see the included NOTICE for copyright information and
|
||||
* LICENSE-TIMESCALE for a copy of the license.
|
||||
*/
|
||||
#include <postgres.h>
|
||||
#include <catalog/pg_type.h>
|
||||
|
||||
#include "export.h"
|
||||
#include "compat.h"
|
||||
#include "remote/stmt_params.h"
|
||||
#include "test_utils.h"
|
||||
|
||||
TS_FUNCTION_INFO_V1(tsl_test_stmt_params_format);
|
||||
|
||||
Datum
|
||||
tsl_test_stmt_params_format(PG_FUNCTION_ARGS)
|
||||
{
|
||||
StmtParams *params;
|
||||
const int *formats;
|
||||
int i;
|
||||
TupleDesc tuple_desc;
|
||||
List *target_attr_nums;
|
||||
bool binary = PG_GETARG_BOOL(0);
|
||||
Form_pg_attribute *attrs = palloc(sizeof(Form_pg_attribute) * 2);
|
||||
attrs[0] = &(FormData_pg_attribute){
|
||||
.atttypid = INT4OID,
|
||||
};
|
||||
attrs[1] = &(FormData_pg_attribute){
|
||||
.atttypid = BOOLOID,
|
||||
};
|
||||
|
||||
#if PG12_GE
|
||||
tuple_desc = CreateTupleDesc(2, attrs);
|
||||
#else
|
||||
tuple_desc = CreateTupleDesc(2, false, attrs);
|
||||
#endif
|
||||
target_attr_nums = list_make2_int(1, 2);
|
||||
|
||||
params = stmt_params_create(target_attr_nums, false, tuple_desc, 2);
|
||||
formats = stmt_params_formats(params);
|
||||
TestAssertTrue(stmt_params_num_params(params) == 2);
|
||||
if (binary)
|
||||
for (i = 0; i < 4; i++)
|
||||
TestAssertTrue(formats[i] == FORMAT_BINARY);
|
||||
else
|
||||
for (i = 0; i < 4; i++)
|
||||
TestAssertTrue(formats[i] == FORMAT_TEXT);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user