mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-17 02:53:51 +08:00
Multinode-related APIs now raise errors when called on any PostgreSQL version below 11, as these versions do not have the required features to support multinode or have different behavior. Raising errors at runtime on affected APIs is preferred over excluding these functions altogether. Having a different user-facing SQL API would severely complicate the upgrade process for the extension. A new CMake check has been added to disable multinode features on unsupported PostgreSQL versions. It also generates a macro in `config.h` that can be used in code to check for multinode support.
446 lines
12 KiB
C
446 lines
12 KiB
C
/*
|
|
* This file and its contents are licensed under the Timescale License.
|
|
* Please see the included NOTICE for copyright information and
|
|
* LICENSE-TIMESCALE for a copy of the license.
|
|
*/
|
|
#include <postgres.h>
|
|
#include <utils/jsonb.h>
|
|
#include <utils/lsyscache.h>
|
|
#include <utils/builtins.h>
|
|
#include <catalog/pg_type.h>
|
|
#include <fmgr.h>
|
|
#include <funcapi.h>
|
|
#include <miscadmin.h>
|
|
|
|
#include <catalog.h>
|
|
#include <compat.h>
|
|
#include <chunk.h>
|
|
#include <chunk_server.h>
|
|
#include <errors.h>
|
|
#include <hypercube.h>
|
|
#include <hypertable.h>
|
|
#include <hypertable_cache.h>
|
|
|
|
#include "remote/async.h"
|
|
#include "remote/dist_txn.h"
|
|
#include "chunk_api.h"
|
|
|
|
/*
|
|
* Convert a hypercube to a JSONB value.
|
|
*
|
|
* For instance, a two-dimensional hypercube, with dimensions "time" and
|
|
* "device", might look as follows:
|
|
*
|
|
* {"time": [1514419200000000, 1515024000000000],
|
|
* "device": [-9223372036854775808, 1073741823]}
|
|
*/
|
|
static JsonbValue *
|
|
hypercube_to_jsonb_value(Hypercube *hc, Hyperspace *hs, JsonbParseState **ps)
|
|
{
|
|
int i;
|
|
|
|
Assert(hs->num_dimensions == hc->num_slices);
|
|
|
|
pushJsonbValue(ps, WJB_BEGIN_OBJECT, NULL);
|
|
|
|
for (i = 0; i < hc->num_slices; i++)
|
|
{
|
|
JsonbValue k, v;
|
|
char *dim_name = NameStr(hs->dimensions[i].fd.column_name);
|
|
Datum range_start =
|
|
DirectFunctionCall1(int8_numeric, Int64GetDatum(hc->slices[i]->fd.range_start));
|
|
Datum range_end =
|
|
DirectFunctionCall1(int8_numeric, Int64GetDatum(hc->slices[i]->fd.range_end));
|
|
|
|
Assert(hs->dimensions[i].fd.id == hc->slices[i]->fd.dimension_id);
|
|
|
|
k.type = jbvString;
|
|
k.val.string.len = strlen(dim_name);
|
|
k.val.string.val = dim_name;
|
|
|
|
pushJsonbValue(ps, WJB_KEY, &k);
|
|
pushJsonbValue(ps, WJB_BEGIN_ARRAY, NULL);
|
|
|
|
v.type = jbvNumeric;
|
|
v.val.numeric = DatumGetNumeric(range_start);
|
|
pushJsonbValue(ps, WJB_ELEM, &v);
|
|
v.val.numeric = DatumGetNumeric(range_end);
|
|
pushJsonbValue(ps, WJB_ELEM, &v);
|
|
|
|
pushJsonbValue(ps, WJB_END_ARRAY, NULL);
|
|
}
|
|
|
|
return pushJsonbValue(ps, WJB_END_OBJECT, NULL);
|
|
}
|
|
|
|
/*
|
|
* Create a hypercube from a JSONB object.
|
|
*
|
|
* Takes a JSONB object with a hypercube's dimensional constraints and outputs
|
|
* a Hypercube. The JSONB is the same format as output by
|
|
* hypercube_to_jsonb_value() above, i.e.:
|
|
*
|
|
* {"time": [1514419200000000, 1515024000000000],
|
|
* "device": [-9223372036854775808, 1073741823]}
|
|
*/
|
|
static Hypercube *
|
|
hypercube_from_jsonb(Jsonb *json, Hyperspace *hs, const char **parse_error)
|
|
{
|
|
JsonbIterator *it;
|
|
JsonbIteratorToken type;
|
|
JsonbValue v;
|
|
Hypercube *hc = NULL;
|
|
const char *err = NULL;
|
|
|
|
it = JsonbIteratorInit(&json->root);
|
|
|
|
type = JsonbIteratorNext(&it, &v, false);
|
|
|
|
if (type != WJB_BEGIN_OBJECT)
|
|
{
|
|
err = "invalid JSON format";
|
|
goto out_err;
|
|
}
|
|
|
|
if (v.val.object.nPairs != hs->num_dimensions)
|
|
{
|
|
err = "invalid number of hypercube dimensions";
|
|
goto out_err;
|
|
}
|
|
|
|
hc = ts_hypercube_alloc(hs->num_dimensions);
|
|
|
|
while ((type = JsonbIteratorNext(&it, &v, false)))
|
|
{
|
|
int i;
|
|
Dimension *dim;
|
|
DimensionSlice *slice;
|
|
int64 range[2];
|
|
const char *name;
|
|
|
|
if (type == WJB_END_OBJECT)
|
|
break;
|
|
|
|
if (type != WJB_KEY)
|
|
{
|
|
err = "invalid JSON format";
|
|
goto out_err;
|
|
}
|
|
|
|
name = pnstrdup(v.val.string.val, v.val.string.len);
|
|
dim = ts_hyperspace_get_dimension_by_name(hs, DIMENSION_TYPE_ANY, name);
|
|
|
|
if (NULL == dim)
|
|
{
|
|
err = psprintf("dimension \"%s\" does not exist in hypertable", name);
|
|
goto out_err;
|
|
}
|
|
|
|
type = JsonbIteratorNext(&it, &v, false);
|
|
|
|
if (type != WJB_BEGIN_ARRAY)
|
|
{
|
|
err = "invalid JSON format";
|
|
goto out_err;
|
|
}
|
|
|
|
if (v.val.array.nElems != 2)
|
|
{
|
|
err = psprintf("unexpected number of dimensional bounds for dimension \"%s\"", name);
|
|
goto out_err;
|
|
}
|
|
|
|
for (i = 0; i < 2; i++)
|
|
{
|
|
type = JsonbIteratorNext(&it, &v, false);
|
|
|
|
if (type != WJB_ELEM)
|
|
{
|
|
err = "invalid JSON format";
|
|
goto out_err;
|
|
}
|
|
|
|
if (v.type != jbvNumeric)
|
|
{
|
|
err = psprintf("constraint for dimension \"%s\" is not numeric", name);
|
|
goto out_err;
|
|
}
|
|
|
|
range[i] =
|
|
DatumGetInt64(DirectFunctionCall1(numeric_int8, NumericGetDatum(v.val.numeric)));
|
|
}
|
|
|
|
type = JsonbIteratorNext(&it, &v, false);
|
|
|
|
if (type != WJB_END_ARRAY)
|
|
{
|
|
err = "invalid JSON format";
|
|
goto out_err;
|
|
}
|
|
|
|
slice = ts_dimension_slice_create(dim->fd.id, range[0], range[1]);
|
|
ts_hypercube_add_slice(hc, slice);
|
|
}
|
|
|
|
out_err:
|
|
if (NULL != parse_error)
|
|
*parse_error = err;
|
|
|
|
if (NULL != err)
|
|
return NULL;
|
|
|
|
return hc;
|
|
}
|
|
|
|
/*
 * Attribute numbers (1-based) of the columns in the tuple that describes a
 * chunk, as filled in by chunk_form_tuple(). Convert to a zero-based array
 * index with AttrNumberGetAttrOffset().
 */
enum Anum_create_chunk
{
	Anum_create_chunk_id = 1,
	Anum_create_chunk_hypertable_id = 2,
	Anum_create_chunk_schema_name = 3,
	Anum_create_chunk_table_name = 4,
	Anum_create_chunk_relkind = 5,
	Anum_create_chunk_slices = 6,
	Anum_create_chunk_created = 7,
	/* Sentinel: one past the last attribute number */
	_Anum_create_chunk_max,
};

/* Number of attributes in the chunk tuple */
#define Natts_create_chunk (_Anum_create_chunk_max - 1)
|
|
|
|
static HeapTuple
|
|
chunk_form_tuple(Chunk *chunk, Hypertable *ht, TupleDesc tupdesc, bool created)
|
|
{
|
|
Datum values[Natts_create_chunk];
|
|
bool nulls[Natts_create_chunk] = { false };
|
|
JsonbParseState *ps = NULL;
|
|
JsonbValue *jv = hypercube_to_jsonb_value(chunk->cube, ht->space, &ps);
|
|
|
|
if (NULL == jv)
|
|
return NULL;
|
|
|
|
values[AttrNumberGetAttrOffset(Anum_create_chunk_id)] = Int32GetDatum(chunk->fd.id);
|
|
values[AttrNumberGetAttrOffset(Anum_create_chunk_hypertable_id)] =
|
|
Int32GetDatum(chunk->fd.hypertable_id);
|
|
values[AttrNumberGetAttrOffset(Anum_create_chunk_schema_name)] =
|
|
NameGetDatum(&chunk->fd.schema_name);
|
|
values[AttrNumberGetAttrOffset(Anum_create_chunk_table_name)] =
|
|
NameGetDatum(&chunk->fd.table_name);
|
|
values[AttrNumberGetAttrOffset(Anum_create_chunk_relkind)] = CharGetDatum(chunk->relkind);
|
|
values[AttrNumberGetAttrOffset(Anum_create_chunk_slices)] =
|
|
JsonbPGetDatum(JsonbValueToJsonb(jv));
|
|
values[AttrNumberGetAttrOffset(Anum_create_chunk_created)] = BoolGetDatum(created);
|
|
|
|
return heap_form_tuple(tupdesc, values, nulls);
|
|
}
|
|
|
|
Datum
|
|
chunk_show(PG_FUNCTION_ARGS)
|
|
{
|
|
Oid chunk_relid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
|
|
Chunk *chunk = ts_chunk_get_by_relid(chunk_relid, true);
|
|
Cache *hcache = ts_hypertable_cache_pin();
|
|
Hypertable *ht =
|
|
ts_hypertable_cache_get_entry(hcache, chunk->hypertable_relid, CACHE_FLAG_NONE);
|
|
TupleDesc tupdesc;
|
|
HeapTuple tuple;
|
|
|
|
Assert(NULL != chunk);
|
|
Assert(NULL != ht);
|
|
|
|
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
errmsg("function returning record called in context "
|
|
"that cannot accept type record")));
|
|
|
|
/*
|
|
* We use the create_chunk tuple for show_chunk, because they only differ
|
|
* in the created column at the end. That column will not be included here
|
|
* since it is not part of the tuple descriptor.
|
|
*/
|
|
tuple = chunk_form_tuple(chunk, ht, tupdesc, false);
|
|
|
|
ts_cache_release(hcache);
|
|
|
|
if (NULL == tuple)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_TS_INTERNAL_ERROR), errmsg("could not create tuple from chunk")));
|
|
|
|
PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
|
|
}
|
|
|
|
Datum
|
|
chunk_create(PG_FUNCTION_ARGS)
|
|
{
|
|
Oid hypertable_relid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
|
|
Jsonb *slices = PG_ARGISNULL(1) ? NULL : PG_GETARG_JSONB_P(1);
|
|
const char *schema_name = PG_ARGISNULL(2) ? NULL : PG_GETARG_CSTRING(2);
|
|
const char *table_name = PG_ARGISNULL(3) ? NULL : PG_GETARG_CSTRING(3);
|
|
Cache *hcache = ts_hypertable_cache_pin();
|
|
Hypertable *ht = ts_hypertable_cache_get_entry(hcache, hypertable_relid, CACHE_FLAG_NONE);
|
|
Hypercube *hc;
|
|
Chunk *chunk;
|
|
TupleDesc tupdesc;
|
|
HeapTuple tuple;
|
|
bool created;
|
|
const char *parse_err;
|
|
|
|
Assert(NULL != ht);
|
|
|
|
if (NULL == slices)
|
|
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid slices")));
|
|
|
|
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
errmsg("function returning record called in context "
|
|
"that cannot accept type record")));
|
|
|
|
hc = hypercube_from_jsonb(slices, ht->space, &parse_err);
|
|
|
|
if (NULL == hc)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
|
errmsg("invalid hypercube for hypertable \"%s\"", get_rel_name(hypertable_relid)),
|
|
errdetail("%s", parse_err)));
|
|
|
|
chunk = ts_chunk_find_or_create_without_cuts(ht, hc, schema_name, table_name, &created);
|
|
|
|
Assert(NULL != chunk);
|
|
|
|
tuple = chunk_form_tuple(chunk, ht, tupdesc, created);
|
|
|
|
ts_cache_release(hcache);
|
|
|
|
if (NULL == tuple)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_TS_INTERNAL_ERROR), errmsg("could not create tuple from chunk")));
|
|
|
|
PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
|
|
}
|
|
|
|
#if PG_VERSION_SUPPORTS_MULTINODE
|
|
#define CREATE_CHUNK_FUNCTION_NAME "create_chunk"
|
|
#define CHUNK_CREATE_STMT \
|
|
"SELECT * FROM " INTERNAL_SCHEMA_NAME "." CREATE_CHUNK_FUNCTION_NAME "($1, $2, $3, $4)"
|
|
|
|
#define ESTIMATE_JSON_STR_SIZE(num_dims) (60 * (num_dims))
|
|
|
|
static Oid create_chunk_argtypes[4] = { REGCLASSOID, JSONBOID, NAMEOID, NAMEOID };
|
|
|
|
/*
|
|
* Fill in / get the TupleDesc for the result type of the create_chunk()
|
|
* function.
|
|
*/
|
|
static void
|
|
get_create_chunk_result_type(TupleDesc *tupdesc)
|
|
{
|
|
Oid funcoid = ts_get_function_oid(CREATE_CHUNK_FUNCTION_NAME,
|
|
INTERNAL_SCHEMA_NAME,
|
|
4,
|
|
create_chunk_argtypes);
|
|
|
|
if (get_func_result_type(funcoid, NULL, tupdesc) != TYPEFUNC_COMPOSITE)
|
|
ereport(ERROR,
|
|
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
|
errmsg("function returning record called in context "
|
|
"that cannot accept type record")));
|
|
}
|
|
|
|
static void
|
|
get_result_datums(Datum *values, bool *nulls, unsigned int numvals, AttInMetadata *attinmeta,
|
|
PGresult *res)
|
|
{
|
|
unsigned int i;
|
|
|
|
memset(nulls, 0, sizeof(bool) * numvals);
|
|
|
|
for (i = 0; i < numvals; i++)
|
|
{
|
|
if (PQgetisnull(res, 0, i))
|
|
nulls[i] = true;
|
|
else
|
|
values[i] = InputFunctionCall(&attinmeta->attinfuncs[i],
|
|
PQgetvalue(res, 0, i),
|
|
attinmeta->attioparams[i],
|
|
attinmeta->atttypmods[i]);
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Create a replica of a chunk on all its assigned servers.
|
|
*/
|
|
void
|
|
chunk_api_create_on_servers(Chunk *chunk, Hypertable *ht)
|
|
{
|
|
AsyncRequestSet *reqset = async_request_set_create();
|
|
JsonbParseState *ps = NULL;
|
|
JsonbValue *jv = hypercube_to_jsonb_value(chunk->cube, ht->space, &ps);
|
|
Jsonb *hcjson = JsonbValueToJsonb(jv);
|
|
const char *params[4] = {
|
|
quote_qualified_identifier(NameStr(ht->fd.schema_name), NameStr(ht->fd.table_name)),
|
|
JsonbToCString(NULL, &hcjson->root, ESTIMATE_JSON_STR_SIZE(ht->space->num_dimensions)),
|
|
NameStr(chunk->fd.schema_name),
|
|
NameStr(chunk->fd.table_name),
|
|
};
|
|
AsyncResponseResult *res;
|
|
ListCell *lc;
|
|
TupleDesc tupdesc;
|
|
AttInMetadata *attinmeta;
|
|
|
|
get_create_chunk_result_type(&tupdesc);
|
|
attinmeta = TupleDescGetAttInMetadata(tupdesc);
|
|
|
|
foreach (lc, chunk->servers)
|
|
{
|
|
ChunkServer *cs = lfirst(lc);
|
|
UserMapping *um = GetUserMapping(GetUserId(), cs->foreign_server_oid);
|
|
PGconn *conn = remote_dist_txn_get_connection(um, REMOTE_TXN_NO_PREP_STMT);
|
|
AsyncRequest *req;
|
|
|
|
req = async_request_send_with_params(conn, CHUNK_CREATE_STMT, 4, params);
|
|
|
|
async_request_attach_user_data(req, cs);
|
|
async_request_set_add(reqset, req);
|
|
}
|
|
|
|
while ((res = async_request_set_wait_ok_result(reqset)) != NULL)
|
|
{
|
|
PGresult *pgres = async_response_result_get_pg_result(res);
|
|
ChunkServer *cs = async_response_result_get_user_data(res);
|
|
Datum *values = palloc0(sizeof(Datum) * tupdesc->natts);
|
|
bool *nulls = palloc0(sizeof(bool) * tupdesc->natts);
|
|
const char *schema_name, *table_name;
|
|
bool created;
|
|
|
|
get_result_datums(values, nulls, tupdesc->natts, attinmeta, pgres);
|
|
|
|
created = DatumGetBool(values[AttrNumberGetAttrOffset(Anum_create_chunk_created)]);
|
|
|
|
/*
|
|
* Sanity check the result. Use error rather than an assert since this
|
|
* is the result of a remote call to a server that could potentially
|
|
* run a different version of the remote function than we'd expect.
|
|
*/
|
|
if (!created)
|
|
elog(ERROR, "chunk creation failed on server \"%s\"", NameStr(cs->fd.server_name));
|
|
|
|
if (nulls[AttrNumberGetAttrOffset(Anum_create_chunk_id)] ||
|
|
nulls[AttrNumberGetAttrOffset(Anum_create_chunk_schema_name)] ||
|
|
nulls[AttrNumberGetAttrOffset(Anum_create_chunk_table_name)])
|
|
elog(ERROR, "unexpected chunk creation result on remote server");
|
|
|
|
schema_name =
|
|
DatumGetCString(values[AttrNumberGetAttrOffset(Anum_create_chunk_schema_name)]);
|
|
table_name = DatumGetCString(values[AttrNumberGetAttrOffset(Anum_create_chunk_table_name)]);
|
|
|
|
if (namestrcmp(&chunk->fd.schema_name, schema_name) != 0 ||
|
|
namestrcmp(&chunk->fd.table_name, table_name) != 0)
|
|
elog(ERROR, "remote chunk has mismatching schema or table name");
|
|
|
|
cs->fd.server_chunk_id =
|
|
DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_create_chunk_id)]);
|
|
}
|
|
}
|
|
|
|
#endif /* PG_VERSION_SUPPORTS_MULTINODE */
|