Generalize deparsing of remote function calls

Certain functions invoked on an access node need to be passed on to
data nodes to ensure any mutations happen also on those
nodes. Examples of such functions are `drop_chunks`, `add_dimension`,
`set_chunk_time_interval`, etc. So far, the approach has been to
deparse these "manually" on a case-by-case basis.

This change implements a generalized deparsing function that deparses
the function based on the function call info (`FunctionCallInfo`) that
holds the information about any invoked function that can be used to
deparse the function call.

The `drop_chunks` function has been updated to use this generalized
deparsing functionality when it is invoking remote nodes.
This commit is contained in:
Erik Nordström 2019-09-11 09:55:14 +02:00 committed by Erik Nordström
parent 55d205b09b
commit 7f3bc09eb6
21 changed files with 664 additions and 254 deletions

View File

@ -1,6 +1,7 @@
-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.
CREATE OR REPLACE FUNCTION _timescaledb_internal.set_dist_id(dist_id UUID) RETURNS BOOL
AS '@MODULE_PATHNAME@', 'ts_dist_set_id' LANGUAGE C VOLATILE STRICT;

View File

@ -3378,6 +3378,33 @@ list_return_srf(FunctionCallInfo fcinfo)
SRF_RETURN_DONE(funcctx);
}
static void
drop_remote_chunks(FunctionCallInfo fcinfo, const Name schema_name, const Name table_name,
List *data_node_oids)
{
/* Wildcard drops not supported on distributed hypertables */
if (schema_name == NULL && table_name == NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use wildcard to drop chunks on distributed hypertables"),
errhint("Drop chunks on each distributed hypertable individually.")));
/* The schema name must be present when dropping remote chunks because the
* search path on the connection is always set to pg_catalog. Thus, the
* data node will not be able to resolve the same hypertables without the
* schema. */
if (schema_name == NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("schema required when dropping chunks on distributed hypertables")));
/* Make sure the schema is set in case it was not given on the command line */
FC_ARG(fcinfo, 2) = NameGetDatum(schema_name);
FC_NULL(fcinfo, 2) = false;
ts_cm_functions->func_call_on_data_nodes(fcinfo, data_node_oids);
}
Datum
ts_chunk_drop_chunks(PG_FUNCTION_ARGS)
{
@ -3461,7 +3488,6 @@ ts_chunk_drop_chunks(PG_FUNCTION_ARGS)
if (NULL == schema_name && list_length(ht_oids) == 1)
{
Oid nspid = get_rel_namespace(linitial_oid(ht_oids));
schema_name =
DatumGetName(DirectFunctionCall1(namein, CStringGetDatum(get_namespace_name(nspid))));
}
@ -3540,16 +3566,7 @@ ts_chunk_drop_chunks(PG_FUNCTION_ARGS)
}
if (data_node_oids != NIL)
ts_cm_functions->drop_chunks_on_data_nodes(table_name,
schema_name,
older_than_datum,
newer_than_datum,
older_than_type,
newer_than_type,
cascade,
cascades_to_materializations,
verbose,
data_node_oids);
drop_remote_chunks(fcinfo, schema_name, table_name, data_node_oids);
/* store data for multi function call */
funcctx->max_calls = list_length(dc_names);

View File

@ -561,10 +561,7 @@ data_node_set_block_new_chunks_default(PG_FUNCTION_ARGS, bool block)
}
static void
empty_drop_chunks_on_data_nodes(Name table_name, Name schema_name, Datum older_than_datum,
Datum newer_than_datum, Oid older_than_type, Oid newer_than_type,
bool cascade, bool cascades_to_materializations, bool verbose,
List *data_node_oids)
func_call_on_data_nodes_default(FunctionCallInfo finfo, List *data_node_oids)
{
error_no_default_fn_community();
pg_unreachable();
@ -659,7 +656,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.remove_from_distributed_db = error_no_default_fn_bool_void_community,
.remote_hypertable_info = error_no_default_fn_pg_community,
.validate_as_data_node = error_no_default_fn_community,
.drop_chunks_on_data_nodes = empty_drop_chunks_on_data_nodes,
.func_call_on_data_nodes = func_call_on_data_nodes_default,
};
TSDLLEXPORT CrossModuleFunctions *ts_cm_functions = &ts_cm_functions_default;

View File

@ -140,11 +140,7 @@ typedef struct CrossModuleFunctions
bool (*remove_from_distributed_db)(void);
PGFunction remote_hypertable_info;
void (*validate_as_data_node)(void);
void (*drop_chunks_on_data_nodes)(Name table_name, Name schema_name, Datum older_than_datum,
Datum newer_than_datum, Oid older_than_type,
Oid newer_than_type, bool cascade,
bool cascades_to_materializations, bool verbose,
List *data_node_oids);
void (*func_call_on_data_nodes)(FunctionCallInfo fcinfo, List *data_node_oids);
PGFunction distributed_exec;
} CrossModuleFunctions;

View File

@ -14,6 +14,7 @@
#include <utils/builtins.h>
#include <hypertable_cache.h>
#include <utils/snapmgr.h>
#include <nodes/primnodes.h>
#include <continuous_agg.h>
#include "bgw/timer.h"
@ -206,9 +207,9 @@ execute_drop_chunks_policy(int32 job_id)
Hypertable *hypertable;
Cache *hcache;
Dimension *open_dim;
List *data_node_oids = NIL;
Datum older_than;
Datum older_than_type;
int num_dropped;
if (!IsTransactionOrTransactionBlock())
{
@ -232,34 +233,31 @@ execute_drop_chunks_policy(int32 job_id)
older_than = ts_interval_subtract_from_now(&args->older_than, open_dim);
older_than_type = ts_dimension_get_partition_type(open_dim);
ts_chunk_do_drop_chunks(table_relid,
#if PG_VERSION_SUPPORTS_MULTINODE
/* Invoke drop chunks via fmgr so that the call can be deparsed and sent
* also to remote data nodes. */
num_dropped = chunk_invoke_drop_chunks(&hypertable->fd.schema_name,
&hypertable->fd.table_name,
older_than,
InvalidOid,
older_than_type,
args->cascade,
args->cascade_to_materializations);
#else
num_dropped = list_length(ts_chunk_do_drop_chunks(table_relid,
older_than,
(Datum) 0,
older_than_type,
InvalidOid,
args->cascade,
args->cascade_to_materializations,
LOG,
true /*user_supplied_table_name */,
&data_node_oids);
#if PG_VERSION_SUPPORTS_MULTINODE
if (data_node_oids != NIL)
chunk_drop_remote_chunks(&hypertable->fd.table_name,
&hypertable->fd.schema_name,
older_than,
InvalidOid,
older_than_type,
InvalidOid,
args->cascade,
args->cascade_to_materializations,
false,
data_node_oids);
NULL));
#endif
ts_cache_release(hcache);
elog(LOG, "job %d completed dropping chunks", job_id);
elog(LOG, "job %d completed dropping %d chunks", job_id, num_dropped);
if (started)
{

View File

@ -3,18 +3,33 @@
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#include <postgres.h>
#include <foreign/foreign.h>
#include <catalog/pg_foreign_server.h>
#include <catalog/pg_foreign_table.h>
#include <catalog/dependency.h>
#include <catalog/namespace.h>
#include <access/htup_details.h>
#include <access/xact.h>
#include <nodes/makefuncs.h>
#include <utils/syscache.h>
#include <utils/inval.h>
#include <utils/tuplestore.h>
#include <utils/palloc.h>
#include <utils/memutils.h>
#include <executor/executor.h>
#include <funcapi.h>
#include <miscadmin.h>
#include <fmgr.h>
#if USE_ASSERT_CHECKING
#include <funcapi.h>
#endif
#include <compat.h>
#include <chunk_data_node.h>
#include <extension.h>
#include <errors.h>
#include "chunk.h"
@ -161,40 +176,102 @@ chunk_set_default_data_node(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(chunk_set_foreign_server(chunk, server));
}
void
chunk_drop_remote_chunks(Name table_name, Name schema_name, Datum older_than_datum,
Datum newer_than_datum, Oid older_than_type, Oid newer_than_type,
bool cascade, bool cascades_to_materializations, bool verbose,
List *data_node_oids)
/* Should match definition in ddl_api.sql */
#define DROP_CHUNKS_FUNCNAME "drop_chunks"
#define DROP_CHUNKS_NARGS 7
/*
* Invoke drop_chunks via fmgr so that the call can be deparsed and sent to
* remote data nodes.
*
* Given that drop_chunks is an SRF, and has pseudo parameter types, we need
* to provide a FuncExpr with type information for the deparser.
*
* Returns the number of dropped chunks.
*/
int
chunk_invoke_drop_chunks(Name schema_name, Name table_name, Datum older_than, Datum older_than_type,
bool cascade, bool cascade_to_materializations)
{
List *data_node_names;
const char *sql_cmd;
EState *estate;
ExprContext *econtext;
FuncCandidateList funclist;
FuncExpr *fexpr;
List *args = NIL;
int i, num_results = 0;
SetExprState *state;
Oid restype;
Const *argarr[DROP_CHUNKS_NARGS] = {
makeConst(older_than_type,
-1,
InvalidOid,
get_typlen(older_than_type),
older_than,
false,
get_typbyval(older_than_type)),
makeConst(NAMEOID,
-1,
InvalidOid,
sizeof(NameData),
NameGetDatum(table_name),
false,
false),
makeConst(NAMEOID,
-1,
InvalidOid,
sizeof(NameData),
NameGetDatum(schema_name),
false,
false),
castNode(Const, makeBoolConst(cascade, false)),
makeNullConst(INT8OID, -1, InvalidOid),
castNode(Const, makeBoolConst(false, true)),
castNode(Const, makeBoolConst(cascade_to_materializations, false))
};
if (table_name == NULL && schema_name == NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot use wildcard to drop chunks on distributed hypertables"),
errhint("Drop chunks on each distributed hypertable individually.")));
funclist = FuncnameGetCandidates(list_make2(makeString(ts_extension_schema_name()),
makeString(DROP_CHUNKS_FUNCNAME)),
DROP_CHUNKS_NARGS,
NIL,
false,
false,
false);
/* The schema name must be present when dropping remote chunks because the
* search path on the connection is always set to pg_catalog. Thus, the
* data node will not be able to resolve the same hypertables without the
* schema. */
if (schema_name == NULL)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("schema required when dropping chunks on distributed hypertables")));
if (funclist->next != NULL)
elog(ERROR, "could not find drop_chunks function");
data_node_names = data_node_oids_to_node_name_list(data_node_oids, ACL_USAGE);
sql_cmd = deparse_drop_chunks_func(table_name,
schema_name,
older_than_datum,
newer_than_datum,
older_than_type,
newer_than_type,
cascade,
cascades_to_materializations,
verbose);
/* Prepare the function expr with argument list */
get_func_result_type(funclist->oid, &restype, NULL);
ts_dist_cmd_run_on_data_nodes(sql_cmd, data_node_names);
for (i = 0; i < DROP_CHUNKS_NARGS; i++)
args = lappend(args, argarr[i]);
fexpr =
makeFuncExpr(funclist->oid, restype, args, InvalidOid, InvalidOid, COERCE_EXPLICIT_CALL);
fexpr->funcretset = true;
/* Execute the SRF */
estate = CreateExecutorState();
econtext = CreateExprContext(estate);
state = ExecInitFunctionResultSet(&fexpr->xpr, econtext, NULL);
while (true)
{
ExprDoneCond isdone;
bool isnull;
ExecMakeFunctionResultSet(state, econtext, estate->es_query_cxt, &isnull, &isdone);
if (isdone == ExprEndResult)
break;
if (!isnull)
num_results++;
}
/* Cleanup */
FreeExprContext(econtext, false);
FreeExecutorState(estate);
return num_results;
}

View File

@ -7,15 +7,14 @@
#define TIMESCALEDB_TSL_CHUNK_H
#include <postgres.h>
#include <fmgr.h>
#include <chunk.h>
extern void chunk_update_foreign_server_if_needed(int32 chunk_id, Oid existing_server_id);
extern Datum chunk_set_default_data_node(PG_FUNCTION_ARGS);
extern void chunk_drop_remote_chunks(Name table_name, Name schema_name, Datum older_than_datum,
Datum newer_than_datum, Oid older_than_type,
Oid newer_than_type, bool cascade,
bool cascades_to_materializations, bool verbose,
List *data_node_oids);
extern int chunk_invoke_drop_chunks(Name schema_name, Name table_name, Datum older_than,
Datum older_than_type, bool cascade,
bool cascade_to_materializations);
#endif /* TIMESCALEDB_TSL_CHUNK_H */

View File

@ -1294,3 +1294,27 @@ data_node_oids_to_node_name_list(List *data_node_oids, AclMode mode)
return node_names;
}
void
data_node_name_list_check_acl(List *data_node_names, AclMode mode)
{
AclResult aclresult;
Oid curuserid;
ListCell *lc;
if (data_node_names == NIL)
return;
curuserid = GetUserId();
foreach (lc, data_node_names)
{
ForeignServer *server = GetForeignServerByName(lfirst(lc), false);
/* Must have permissions on the server object */
aclresult = pg_foreign_server_aclcheck(server->serverid, curuserid, mode);
if (aclresult != ACLCHECK_OK)
aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, server->servername);
}
}

View File

@ -31,6 +31,7 @@ extern List *data_node_get_node_name_list(void);
extern List *data_node_array_to_node_name_list_with_aclcheck(ArrayType *noderarr, AclMode mode);
extern List *data_node_array_to_node_name_list(ArrayType *nodearr);
extern List *data_node_oids_to_node_name_list(List *data_node_oids, AclMode mode);
extern void data_node_name_list_check_acl(List *data_node_names, AclMode mode);
extern Datum data_node_ping(PG_FUNCTION_ARGS);
extern Datum data_node_set_chunk_default_data_node(PG_FUNCTION_ARGS);

View File

@ -9,27 +9,30 @@
#include <utils/builtins.h>
#include <utils/lsyscache.h>
#include <utils/relcache.h>
#include <catalog/indexing.h>
#include <utils/ruleutils.h>
#include <utils/syscache.h>
#include <commands/tablespace.h>
#include <catalog/pg_class.h>
#include <utils/rel.h>
#include <access/relscan.h>
#include <utils/fmgroids.h>
#include <utils/lsyscache.h>
#include <utils/rel.h>
#include <commands/tablespace.h>
#include <access/relscan.h>
#include <catalog/pg_class.h>
#include <catalog/indexing.h>
#include <catalog/pg_constraint.h>
#include <catalog/pg_index.h>
#include <catalog/pg_proc.h>
#include <catalog/namespace.h>
#include <nodes/pg_list.h>
#include <funcapi.h>
#include <fmgr.h>
#include "export.h"
#include "compat.h"
#include "constraint.h"
#include "trigger.h"
#include "utils.h"
#include "deparse.h"
#include <constraint.h>
#include <extension.h>
#include <utils.h>
#include <export.h>
#include <compat.h>
#include <trigger.h>
#include "deparse.h"
/*
* Deparse a table into a set of SQL commands that can be used to recreate it.
@ -493,128 +496,215 @@ deparse_get_distributed_hypertable_create_command(Hypertable *ht)
return result;
}
typedef struct StringInfoWithSeparator
{
StringInfo buff;
char *sep;
bool empty;
} StringInfoWithSeparator;
#define DEFAULT_SCALAR_RESULT_NAME "*"
static StringInfoWithSeparator *
init_string_info_with_sep(const char *sep)
static void
deparse_result_type(StringInfo sql, FunctionCallInfo fcinfo)
{
StringInfoWithSeparator *str = palloc(sizeof(StringInfoWithSeparator));
str->buff = makeStringInfo();
str->sep = pstrdup(sep);
str->empty = true;
return str;
TupleDesc tupdesc;
char *scalarname;
Oid resulttypeid;
int i;
switch (get_call_result_type(fcinfo, &resulttypeid, &tupdesc))
{
case TYPEFUNC_SCALAR:
/* scalar result type */
Assert(NULL == tupdesc);
Assert(OidIsValid(resulttypeid));
/* Check if the function has a named OUT parameter */
scalarname = get_func_result_name(fcinfo->flinfo->fn_oid);
/* If there is no named OUT parameter, use the default name */
if (NULL != scalarname)
{
appendStringInfoString(sql, scalarname);
pfree(scalarname);
}
else
appendStringInfoString(sql, DEFAULT_SCALAR_RESULT_NAME);
break;
case TYPEFUNC_COMPOSITE:
/* determinable rowtype result */
Assert(NULL != tupdesc);
for (i = 0; i < tupdesc->natts; i++)
{
if (!tupdesc->attrs[i].attisdropped)
{
appendStringInfoString(sql, NameStr(tupdesc->attrs[i].attname));
if (i < (tupdesc->natts - 1))
appendStringInfoChar(sql, ',');
}
}
break;
case TYPEFUNC_RECORD:
/* indeterminate rowtype result */
elog(NOTICE, "record return type");
break;
case TYPEFUNC_COMPOSITE_DOMAIN:
/* domain over determinable rowtype result */
case TYPEFUNC_OTHER:
elog(ERROR, "unsupported result type for deparsing");
break;
}
}
/*
* String append with a separator. After first run a separator will be set to `,`, so
* consecutive runs will result in a proper separator being added.
* Deparse a function call.
*
* Turn a function call back into a string. In theory, we could just call
* deparse_expression() (ruleutils.c) on the original function expression (as
* given by fcinfo->flinfo->fn_expr), but we'd like to support deparsing also
* when the expression is not available (e.g., when invoking by OID from C
* code). Further, deparse_expression() doesn't explicitly give the parameter
* names, which is important in order to maintain forward-compatibility with
* the remote version of the function in case it has reordered the parameters.
*/
static void pg_attribute_printf(2, 3)
string_info_with_separator_append(StringInfoWithSeparator *str, const char *fmt, ...)
const char *
deparse_func_call(FunctionCallInfo fcinfo)
{
va_list fmt_args;
int needed_bytes;
HeapTuple ftup;
Form_pg_proc procform;
StringInfoData sql;
const char *funcnamespace;
OverrideSearchPath search_path = {
.schemas = NIL,
.addCatalog = false,
.addTemp = false,
};
Oid funcid = fcinfo->flinfo->fn_oid;
int PG_USED_FOR_ASSERTS_ONLY numargs;
Oid *argtypes;
char **argnames;
char *argmodes;
int i;
if (!str->empty)
appendStringInfo(str->buff, "%s", ", ");
while (true)
{
va_start(fmt_args, fmt);
needed_bytes = appendStringInfoVA(str->buff, fmt, fmt_args);
va_end(fmt_args);
initStringInfo(&sql);
appendStringInfoString(&sql, "SELECT ");
deparse_result_type(&sql, fcinfo);
if (needed_bytes == 0)
/* First fetch the function's pg_proc row to inspect its rettype */
ftup = SearchSysCache1(PROCOID, ObjectIdGetDatum(funcid));
if (!HeapTupleIsValid(ftup))
elog(ERROR, "cache lookup failed for function %u", funcid);
procform = (Form_pg_proc) GETSTRUCT(ftup);
funcnamespace = get_namespace_name(procform->pronamespace);
numargs = get_func_arg_info(ftup, &argtypes, &argnames, &argmodes);
Assert(numargs >= fcinfo->nargs);
appendStringInfo(&sql,
" FROM %s(",
quote_qualified_identifier(funcnamespace, NameStr(procform->proname)));
ReleaseSysCache(ftup);
/* Temporarily set a NULL search path. This makes identifier types (e.g.,
* regclass / tables) be fully qualified, which is needed since the search
* path on a remote node is not guaranteed to be the same. */
PushOverrideSearchPath(&search_path);
for (i = 0; i < fcinfo->nargs; i++)
{
str->empty = false;
const char *argvalstr = "NULL";
bool add_type_cast = false;
switch (argtypes[i])
{
case ANYOID:
case ANYELEMENTOID:
/* For pseudo types, try to resolve the "real" argument type
* from the function expression, if present */
if (NULL != fcinfo->flinfo && NULL != fcinfo->flinfo->fn_expr)
{
Oid expr_argtype = get_fn_expr_argtype(fcinfo->flinfo, i);
/* Function parameters that aren't typed need type casts,
* but only add a cast if the expr contained a "real" type
* and not an unknown or pseudo type. */
if (OidIsValid(expr_argtype) && expr_argtype != UNKNOWNOID &&
expr_argtype != argtypes[i])
add_type_cast = true;
argtypes[i] = expr_argtype;
}
break;
default:
break;
}
enlargeStringInfo(str->buff, needed_bytes);
}
}
static const char *
get_typname(Oid type_oid)
if (!FC_NULL(fcinfo, i))
{
Form_pg_type form;
char *typname;
HeapTuple tp;
bool isvarlena;
Oid outfuncid;
tp = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type_oid));
if (!HeapTupleIsValid(tp))
elog(ERROR, "cache lookup failed for type %u", type_oid);
form = (Form_pg_type) GETSTRUCT(tp);
typname = pstrdup(NameStr(form->typname));
ReleaseSysCache(tp);
if (!OidIsValid(argtypes[i]))
elog(ERROR, "invalid type for argument %d", i);
return typname;
getTypeOutputInfo(argtypes[i], &outfuncid, &isvarlena);
Assert(OidIsValid(outfuncid));
argvalstr = quote_literal_cstr(OidOutputFunctionCall(outfuncid, FC_ARG(fcinfo, i)));
}
appendStringInfo(&sql, "%s => %s", argnames[i], argvalstr);
if (add_type_cast)
appendStringInfo(&sql, "::%s", format_type_be(argtypes[i]));
if (i < (fcinfo->nargs - 1))
appendStringInfoChar(&sql, ',');
}
PopOverrideSearchPath();
if (NULL != argtypes)
pfree(argtypes);
if (NULL != argnames)
pfree(argnames);
if (NULL != argmodes)
pfree(argmodes);
appendStringInfoChar(&sql, ')');
return sql.data;
}
/*
* Deparse a function by OID.
*
* The function arguments should be given as datums in the vararg list and
* need to be specified in the order given by the (OID) function's signature.
*/
const char *
deparse_drop_chunks_func(Name table_name, Name schema_name, Datum older_than_datum,
Datum newer_than_datum, Oid older_than_type, Oid newer_than_type,
bool cascade, bool cascades_to_materializations, bool verbose)
deparse_oid_function_call_coll(Oid funcid, Oid collation, unsigned int num_args, ...)
{
Oid out_fn;
bool type_is_varlena;
char *older_than_str = NULL;
char *newer_than_str = NULL;
StringInfoWithSeparator *cmd = init_string_info_with_sep(", ");
FunctionCallInfo fcinfo = palloc(SizeForFunctionCallInfo(num_args));
FmgrInfo flinfo;
const char *result;
va_list args;
unsigned int i;
appendStringInfo(cmd->buff,
"SELECT * FROM %s.drop_chunks(",
quote_identifier(ts_extension_schema_name()));
fmgr_info(funcid, &flinfo);
InitFunctionCallInfoData(*fcinfo, &flinfo, num_args, collation, NULL, NULL);
va_start(args, num_args);
if (older_than_type != InvalidOid)
for (i = 0; i < num_args; i++)
{
const char *type_name = get_typname(older_than_type);
getTypeOutputInfo(older_than_type, &out_fn, &type_is_varlena);
older_than_str = OidOutputFunctionCall(out_fn, older_than_datum);
string_info_with_separator_append(cmd,
"older_than => %s::%s",
quote_literal_cstr(older_than_str),
type_name);
FC_ARG(fcinfo, i) = va_arg(args, Datum);
FC_NULL(fcinfo, i) = false;
}
else
string_info_with_separator_append(cmd, "older_than => NULL");
if (newer_than_type != InvalidOid)
{
const char *type_name = get_typname(newer_than_type);
getTypeOutputInfo(newer_than_type, &out_fn, &type_is_varlena);
newer_than_str = OidOutputFunctionCall(out_fn, newer_than_datum);
string_info_with_separator_append(cmd,
"newer_than => %s::%s",
quote_literal_cstr(newer_than_str),
type_name);
}
else
string_info_with_separator_append(cmd, "newer_than => NULL");
if (table_name != NULL)
string_info_with_separator_append(cmd,
"table_name => %s",
quote_literal_cstr(NameStr(*table_name)));
else
string_info_with_separator_append(cmd, "table_name => NULL");
if (schema_name != NULL)
string_info_with_separator_append(cmd,
"schema_name => %s",
quote_literal_cstr(NameStr(*schema_name)));
else
string_info_with_separator_append(cmd, "schema_name => NULL");
string_info_with_separator_append(cmd, "cascade => '%s'", cascade ? "true" : "false");
string_info_with_separator_append(cmd,
"cascade_to_materializations => '%s'",
cascades_to_materializations ? "true" : "false");
string_info_with_separator_append(cmd, "verbose => '%s'", verbose ? "true" : "false");
appendStringInfo(cmd->buff, ");");
return cmd->buff->data;
va_end(args);
result = deparse_func_call(fcinfo);
/* Check for null result, since caller is clearly not expecting one */
if (fcinfo->isnull)
elog(ERROR, "function %u returned NULL", flinfo.fn_oid);
return result;
}

View File

@ -44,9 +44,7 @@ const char *deparse_get_tabledef_commands_concat(Oid relid);
DeparsedHypertableCommands *deparse_get_distributed_hypertable_create_command(Hypertable *ht);
const char *deparse_drop_chunks_func(Name table_name, Name schema_name, Datum older_than_datum,
Datum newer_than_datum, Oid older_than_type,
Oid newer_than_type, bool cascade,
bool cascades_to_materializations, bool verbose);
const char *deparse_func_call(FunctionCallInfo finfo);
const char *deparse_oid_function_call_coll(Oid funcid, Oid collation, unsigned int num_args, ...);
#endif

View File

@ -15,26 +15,27 @@
#include <access/xact.h>
#include <cache.h>
#include <nodes/pg_list.h>
#include <foreign/foreign.h>
#include <libpq-fe.h>
#include <hypertable_data_node.h>
#include <dimension.h>
#include <errors.h>
#include "errors.h"
#include "fdw/fdw.h"
#include "hypertable.h"
#include "dimension.h"
#include "errors.h"
#include "hypertable.h"
#include "license.h"
#include "utils.h"
#include "hypertable_cache.h"
#if PG_VERSION_SUPPORTS_MULTINODE
#include <foreign/foreign.h>
#include <libpq-fe.h>
#include "hypertable_data_node.h"
#include "fdw/fdw.h"
#include "data_node.h"
#include "deparse.h"
#include "remote/dist_commands.h"
#include "compat.h"
#include "hypertable_data_node.h"
#include <extension.h>
#include "extension.h"
#endif /* PG11_GE */
Datum
hypertable_valid_ts_interval(PG_FUNCTION_ARGS)

View File

@ -7,11 +7,10 @@
#define TIMESCALEDB_TSL_HYPERTABLE_H
#include <hypertable.h>
#include "catalog.h"
#include "dimension.h"
#include "interval.h"
#include "config.h"
#include "catalog.h"
extern Datum hypertable_valid_ts_interval(PG_FUNCTION_ARGS);

View File

@ -33,6 +33,7 @@
#include "continuous_aggs/options.h"
#include "nodes/decompress_chunk/planner.h"
#include "compat.h"
#include "hypertable.h"
#include "hypertable.h"
#include "compression/create.h"
@ -164,6 +165,13 @@ error_data_node_set_block_new_chunks_not_supported(PG_FUNCTION_ARGS, bool block)
pg_unreachable();
}
static void
error_func_call_on_data_nodes_not_supported(FunctionCallInfo fcinfo, List *data_nodes)
{
error_not_supported();
pg_unreachable();
}
#endif /* PG_VERSION_SUPPORTS_MULTINODE */
/*
@ -258,6 +266,7 @@ CrossModuleFunctions tsl_cm_functions = {
.remote_hypertable_info = error_not_supported_default_fn,
.validate_as_data_node = NULL,
.distributed_exec = error_not_supported_default_fn,
.func_call_on_data_nodes = error_func_call_on_data_nodes_not_supported,
#else
.add_data_node = data_node_add,
.delete_data_node = data_node_delete,
@ -288,8 +297,8 @@ CrossModuleFunctions tsl_cm_functions = {
.remove_from_distributed_db = dist_util_remove_from_db,
.remote_hypertable_info = dist_util_remote_hypertable_info,
.validate_as_data_node = validate_data_node_settings,
.drop_chunks_on_data_nodes = chunk_drop_remote_chunks,
.distributed_exec = ts_dist_cmd_exec,
.func_call_on_data_nodes = ts_dist_cmd_func_call_on_data_nodes,
#endif
.cache_syscache_invalidate = cache_syscache_invalidate,
};

View File

@ -17,6 +17,7 @@
#include "dist_util.h"
#include "miscadmin.h"
#include "errors.h"
#include "deparse.h"
typedef struct DistPreparedStmt
{
@ -63,21 +64,40 @@ ts_dist_cmd_collect_responses(List *requests)
return results;
}
/*
* Invoke a SQL statement (command) on the given data nodes.
*
* The list of data nodes can either be a list of data node names, or foreign
* server OIDs.
*/
DistCmdResult *
ts_dist_cmd_invoke_on_data_nodes(const char *sql, List *node_names)
ts_dist_cmd_invoke_on_data_nodes(const char *sql, List *data_nodes)
{
ListCell *lc;
List *requests = NIL;
DistCmdResult *results;
if (node_names == NIL)
if (data_nodes == NIL)
elog(ERROR, "target data nodes must be specified for ts_dist_cmd_invoke_on_data_nodes");
foreach (lc, node_names)
switch (nodeTag(data_nodes))
{
case T_OidList:
data_nodes = data_node_oids_to_node_name_list(data_nodes, ACL_USAGE);
break;
case T_List:
/* Already in the format we want. Just check permissions. */
data_node_name_list_check_acl(data_nodes, ACL_USAGE);
break;
default:
elog(ERROR, "invalid list type %u", nodeTag(data_nodes));
break;
}
foreach (lc, data_nodes)
{
const char *node_name = lfirst(lc);
TSConnection *connection = data_node_get_connection(node_name, REMOTE_TXN_NO_PREP_STMT);
AsyncRequest *req = async_request_send(connection, sql);
async_request_attach_user_data(req, (char *) node_name);
@ -127,6 +147,40 @@ ts_dist_cmd_invoke_on_all_data_nodes(const char *sql)
return ts_dist_cmd_invoke_on_data_nodes(sql, data_node_get_node_name_list());
}
/*
* Relay a function call to data nodes.
*
* A NIL list of data nodes means invoke on ALL data nodes.
*/
DistCmdResult *
ts_dist_cmd_invoke_func_call_on_data_nodes(FunctionCallInfo fcinfo, List *data_nodes)
{
if (NIL == data_nodes)
data_nodes = data_node_get_node_name_list();
return ts_dist_cmd_invoke_on_data_nodes(deparse_func_call(fcinfo), data_nodes);
}
DistCmdResult *
ts_dist_cmd_invoke_func_call_on_all_data_nodes(FunctionCallInfo fcinfo)
{
return ts_dist_cmd_invoke_on_data_nodes(deparse_func_call(fcinfo),
data_node_get_node_name_list());
}
/*
* Relay a function call to data nodes.
*
* This version throws away the result.
*/
void
ts_dist_cmd_func_call_on_data_nodes(FunctionCallInfo fcinfo, List *data_nodes)
{
DistCmdResult *result = ts_dist_cmd_invoke_func_call_on_data_nodes(fcinfo, data_nodes);
ts_dist_cmd_close_response(result);
}
PGresult *
ts_dist_cmd_get_data_node_result(DistCmdResult *response, const char *node_name)
{

View File

@ -17,7 +17,10 @@ extern DistCmdResult *ts_dist_cmd_invoke_on_data_nodes_using_search_path(const c
const char *search_path,
List *node_names);
extern DistCmdResult *ts_dist_cmd_invoke_on_all_data_nodes(const char *sql);
extern DistCmdResult *ts_dist_cmd_invoke_func_call_on_all_data_nodes(FunctionCallInfo fcinfo);
extern DistCmdResult *ts_dist_cmd_invoke_func_call_on_data_nodes(FunctionCallInfo fcinfo,
List *data_nodes);
extern void ts_dist_cmd_func_call_on_data_nodes(FunctionCallInfo fcinfo, List *data_nodes);
extern PGresult *ts_dist_cmd_get_data_node_result(DistCmdResult *response, const char *node_name);
extern void ts_dist_cmd_close_response(DistCmdResult *response);

View File

@ -10,27 +10,49 @@
(1 row)
-- test drop_chunks function deparsing
SELECT * FROM tsl_test_deparse_drop_chunks('2019-01-01'::timestamptz, 'test_table', 'test_schema', cascade => false, verbose => true);
SELECT * FROM tsl_test_deparse_drop_chunks('2019-01-01'::timestamptz, 'Test_table', 'test_schema', cascade => false, verbose => true);
tsl_test_deparse_drop_chunks
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SELECT * FROM public.drop_chunks(older_than => 'Tue Jan 01 00:00:00 2019 PST'::timestamptz, newer_than => NULL, table_name => 'test_table', schema_name => 'test_schema', cascade => 'false', cascade_to_materializations => 'false', verbose => 'true');
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SELECT * FROM public.drop_chunks(older_than => 'Tue Jan 01 00:00:00 2019 PST'::timestamp with time zone,table_name => 'Test_table',schema_name => 'test_schema',cascade => 'f',newer_than => NULL,verbose => 't',cascade_to_materializations => NULL)
(1 row)
SELECT * FROM tsl_test_deparse_drop_chunks(interval '1 day', table_name => 'weird nAme\\#^.', cascade_to_materializations => true, cascade => true);
tsl_test_deparse_drop_chunks
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SELECT * FROM public.drop_chunks(older_than => '@ 1 day'::interval, newer_than => NULL, table_name => E'weird nAme\\\\#^.', schema_name => NULL, cascade => 'true', cascade_to_materializations => 'true', verbose => 'false');
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SELECT * FROM public.drop_chunks(older_than => '@ 1 day'::interval,table_name => E'weird nAme\\\\#^.',schema_name => NULL,cascade => 't',newer_than => NULL,verbose => 'f',cascade_to_materializations => 't')
(1 row)
SELECT * FROM tsl_test_deparse_drop_chunks(newer_than => 12345);
tsl_test_deparse_drop_chunks
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SELECT * FROM public.drop_chunks(older_than => NULL, newer_than => '12345'::int4, table_name => NULL, schema_name => NULL, cascade => 'false', cascade_to_materializations => 'false', verbose => 'false');
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SELECT * FROM public.drop_chunks(older_than => NULL,table_name => NULL,schema_name => NULL,cascade => 'f',newer_than => '12345'::integer,verbose => 'f',cascade_to_materializations => NULL)
(1 row)
SELECT * FROM tsl_test_deparse_drop_chunks(older_than => interval '2 years', newer_than => '2015-01-01'::timestamp);
tsl_test_deparse_drop_chunks
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
SELECT * FROM public.drop_chunks(older_than => '@ 2 years'::interval, newer_than => 'Thu Jan 01 00:00:00 2015'::timestamp, table_name => NULL, schema_name => NULL, cascade => 'false', cascade_to_materializations => 'false', verbose => 'false');
SELECT * FROM public.drop_chunks(older_than => '@ 2 years'::interval,table_name => NULL,schema_name => NULL,cascade => 'f',newer_than => 'Thu Jan 01 00:00:00 2015'::timestamp without time zone,verbose => 'f',cascade_to_materializations => NULL)
(1 row)
-- test generalized deparsing function
SELECT * FROM tsl_test_deparse_scalar_func(schema_name => 'Foo', table_name => 'bar', option => false, "time" => timestamp '2019-09-10 11:08', message => 'This is a test message.');
NOTICE: Deparsed: SELECT * FROM public.tsl_test_deparse_scalar_func(schema_name => 'Foo',table_name => 'bar',time => 'Tue Sep 10 11:08:00 2019 PDT',message => 'This is a test message.',not_set => 't',option => 'f')
tsl_test_deparse_scalar_func
------------------------------
t
(1 row)
SELECT * FROM tsl_test_deparse_named_scalar_func(schema_name => 'Foo', table_name => 'bar', option => false, "time" => timestamp '2019-09-10 11:08', message => 'This is a test message.');
NOTICE: Deparsed: SELECT option FROM public.tsl_test_deparse_named_scalar_func(schema_name => 'Foo',table_name => 'bar',time => 'Tue Sep 10 11:08:00 2019 PDT',message => 'This is a test message.',not_set => 't',option => 'f')
option
--------
t
(1 row)
SELECT * FROM tsl_test_deparse_composite_func(schema_name => 'Foo', table_name => 'bar', option => false, "time" => timestamp '2019-09-10 11:08', message => 'This is a test message.');
NOTICE: Deparsed: SELECT success,message FROM public.tsl_test_deparse_composite_func(schema_name => 'Foo',table_name => 'bar',time => 'Tue Sep 10 11:08:00 2019 PDT',message => 'This is a test message.',not_set => NULL,option => 'f')
success | message
---------+---------
|
(1 row)

View File

@ -16,6 +16,33 @@ CREATE OR REPLACE FUNCTION tsl_test_deparse_drop_chunks(older_than "any" = NULL,
verbose BOOLEAN = FALSE,
cascade_to_materializations BOOLEAN = NULL) RETURNS TEXT
AS :TSL_MODULE_PATHNAME, 'tsl_test_deparse_drop_chunks' LANGUAGE C VOLATILE;
CREATE OR REPLACE FUNCTION tsl_test_deparse_scalar_func(
schema_name NAME = NULL,
table_name NAME = NULL,
"time" TIMESTAMPTZ = NOW(),
message TEXT = NULL,
not_set BOOLEAN = TRUE,
option BOOLEAN = FALSE
) RETURNS BOOLEAN
AS :TSL_MODULE_PATHNAME, 'tsl_test_deparse_func' LANGUAGE C VOLATILE;
CREATE OR REPLACE FUNCTION tsl_test_deparse_named_scalar_func(
schema_name NAME = NULL,
table_name NAME = NULL,
"time" TIMESTAMPTZ = NOW(),
message TEXT = NULL,
not_set BOOLEAN = TRUE,
INOUT option BOOLEAN = FALSE
)
AS :TSL_MODULE_PATHNAME, 'tsl_test_deparse_func' LANGUAGE C VOLATILE;
CREATE OR REPLACE FUNCTION tsl_test_deparse_composite_func(
schema_name NAME = NULL,
table_name NAME = NULL,
"time" TIMESTAMPTZ = NOW(),
message TEXT = NULL,
not_set BOOLEAN = NULL,
option BOOLEAN = FALSE
) RETURNS TABLE (success BOOLEAN, message TEXT)
AS :TSL_MODULE_PATHNAME, 'tsl_test_deparse_func' LANGUAGE C VOLATILE;
SET ROLE :ROLE_DEFAULT_PERM_USER;
\set ON_ERROR_STOP 0
CREATE TEMP TABLE fail_table1(x INT);

View File

@ -31,7 +31,14 @@ SELECT 'TABLE DEPARSE TEST DONE';
\set ECHO all
-- test drop_chunks function deparsing
SELECT * FROM tsl_test_deparse_drop_chunks('2019-01-01'::timestamptz, 'test_table', 'test_schema', cascade => false, verbose => true);
SELECT * FROM tsl_test_deparse_drop_chunks('2019-01-01'::timestamptz, 'Test_table', 'test_schema', cascade => false, verbose => true);
SELECT * FROM tsl_test_deparse_drop_chunks(interval '1 day', table_name => 'weird nAme\\#^.', cascade_to_materializations => true, cascade => true);
SELECT * FROM tsl_test_deparse_drop_chunks(newer_than => 12345);
SELECT * FROM tsl_test_deparse_drop_chunks(older_than => interval '2 years', newer_than => '2015-01-01'::timestamp);
-- test generalized deparsing function
SELECT * FROM tsl_test_deparse_scalar_func(schema_name => 'Foo', table_name => 'bar', option => false, "time" => timestamp '2019-09-10 11:08', message => 'This is a test message.');
SELECT * FROM tsl_test_deparse_named_scalar_func(schema_name => 'Foo', table_name => 'bar', option => false, "time" => timestamp '2019-09-10 11:08', message => 'This is a test message.');
SELECT * FROM tsl_test_deparse_composite_func(schema_name => 'Foo', table_name => 'bar', option => false, "time" => timestamp '2019-09-10 11:08', message => 'This is a test message.');

View File

@ -16,4 +16,34 @@ CREATE OR REPLACE FUNCTION tsl_test_deparse_drop_chunks(older_than "any" = NULL,
cascade_to_materializations BOOLEAN = NULL) RETURNS TEXT
AS :TSL_MODULE_PATHNAME, 'tsl_test_deparse_drop_chunks' LANGUAGE C VOLATILE;
CREATE OR REPLACE FUNCTION tsl_test_deparse_scalar_func(
schema_name NAME = NULL,
table_name NAME = NULL,
"time" TIMESTAMPTZ = NOW(),
message TEXT = NULL,
not_set BOOLEAN = TRUE,
option BOOLEAN = FALSE
) RETURNS BOOLEAN
AS :TSL_MODULE_PATHNAME, 'tsl_test_deparse_func' LANGUAGE C VOLATILE;
CREATE OR REPLACE FUNCTION tsl_test_deparse_named_scalar_func(
schema_name NAME = NULL,
table_name NAME = NULL,
"time" TIMESTAMPTZ = NOW(),
message TEXT = NULL,
not_set BOOLEAN = TRUE,
INOUT option BOOLEAN = FALSE
)
AS :TSL_MODULE_PATHNAME, 'tsl_test_deparse_func' LANGUAGE C VOLATILE;
CREATE OR REPLACE FUNCTION tsl_test_deparse_composite_func(
schema_name NAME = NULL,
table_name NAME = NULL,
"time" TIMESTAMPTZ = NOW(),
message TEXT = NULL,
not_set BOOLEAN = NULL,
option BOOLEAN = FALSE
) RETURNS TABLE (success BOOLEAN, message TEXT)
AS :TSL_MODULE_PATHNAME, 'tsl_test_deparse_func' LANGUAGE C VOLATILE;
SET ROLE :ROLE_DEFAULT_PERM_USER;

View File

@ -5,11 +5,18 @@
*/
#include <postgres.h>
#include <utils/builtins.h>
#include <access/htup.h>
#include <access/htup_details.h>
#include <catalog/pg_type.h>
#include <funcapi.h>
#include <utils.h>
#include <compat.h>
#include <extension.h>
#include "export.h"
#include "deparse.h"
TS_FUNCTION_INFO_V1(tsl_test_get_tabledef);
TS_FUNCTION_INFO_V1(tsl_test_deparse_drop_chunks);
Datum
tsl_test_get_tabledef(PG_FUNCTION_ARGS)
@ -19,29 +26,82 @@ tsl_test_get_tabledef(PG_FUNCTION_ARGS)
PG_RETURN_TEXT_P(cstring_to_text(cmd));
}
TS_FUNCTION_INFO_V1(tsl_test_deparse_drop_chunks);
Datum
tsl_test_deparse_drop_chunks(PG_FUNCTION_ARGS)
{
Name table_name = PG_ARGISNULL(1) ? NULL : PG_GETARG_NAME(1);
Name schema_name = PG_ARGISNULL(2) ? NULL : PG_GETARG_NAME(2);
Datum older_than_datum = PG_GETARG_DATUM(0);
Datum newer_than_datum = PG_GETARG_DATUM(4);
Oid older_than_type = PG_ARGISNULL(0) ? InvalidOid : get_fn_expr_argtype(fcinfo->flinfo, 0);
Oid newer_than_type = PG_ARGISNULL(4) ? InvalidOid : get_fn_expr_argtype(fcinfo->flinfo, 4);
bool cascade = PG_GETARG_BOOL(3);
bool verbose = PG_ARGISNULL(5) ? false : PG_GETARG_BOOL(5);
bool cascades_to_materializations = PG_ARGISNULL(6) ? false : PG_GETARG_BOOL(6);
FmgrInfo flinfo;
FunctionCallInfo fcinfo2 = palloc(SizeForFunctionCallInfo(fcinfo->nargs));
Oid argtypes[7] = { ANYOID, NAMEOID, NAMEOID, BOOLOID, ANYOID, BOOLOID, BOOLOID };
Oid funcid = ts_get_function_oid("drop_chunks", ts_extension_schema_name(), 7, argtypes);
const char *sql_cmd;
int i;
sql_cmd = deparse_drop_chunks_func(table_name,
schema_name,
older_than_datum,
newer_than_datum,
older_than_type,
newer_than_type,
cascade,
cascades_to_materializations,
verbose);
fmgr_info(funcid, &flinfo);
InitFunctionCallInfoData(*fcinfo2,
&flinfo,
fcinfo->nargs,
fcinfo->fncollation,
fcinfo->context,
fcinfo->resultinfo);
/* Copy over the arguments into the new function call data */
for (i = 0; i < fcinfo->nargs; i++)
{
FC_ARG(fcinfo2, i) = FC_ARG(fcinfo, i);
FC_NULL(fcinfo2, i) = FC_NULL(fcinfo, i);
}
/* Use the expression from this function so that the deparse function can
* result the data types of args with ANY type */
fcinfo2->flinfo->fn_expr = fcinfo->flinfo->fn_expr;
sql_cmd = deparse_func_call(fcinfo2);
PG_RETURN_TEXT_P(cstring_to_text(sql_cmd));
}
TS_FUNCTION_INFO_V1(tsl_test_deparse_func);
Datum
tsl_test_deparse_func(PG_FUNCTION_ARGS)
{
TupleDesc tupdesc;
Oid resulttypeid;
const char *deparsed = deparse_func_call(fcinfo);
Datum retval;
elog(NOTICE, "Deparsed: %s", deparsed);
switch (get_call_result_type(fcinfo, &resulttypeid, &tupdesc))
{
case TYPEFUNC_SCALAR:
retval = BoolGetDatum(true);
break;
case TYPEFUNC_COMPOSITE:
{
Datum *values = palloc(tupdesc->natts * sizeof(Datum));
bool *nulls = palloc(tupdesc->natts * sizeof(bool));
HeapTuple tup;
int i;
for (i = 0; i < tupdesc->natts; i++)
nulls[i] = true;
tup = heap_form_tuple(tupdesc, values, nulls);
pfree(values);
pfree(nulls);
retval = HeapTupleGetDatum(tup);
break;
}
case TYPEFUNC_RECORD:
/* indeterminate rowtype result */
case TYPEFUNC_COMPOSITE_DOMAIN:
/* domain over determinable rowtype result */
case TYPEFUNC_OTHER:
elog(ERROR, "unsupported result type for deparsing");
break;
}
PG_RETURN_DATUM(retval);
}