mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-24 23:34:25 +08:00
Add function to fetch remote chunk relation stats
A new function, `get_chunk_relstats()`, allows fetching relstats (basically `pg_class.{relpages,reltuples`) from remote chunks on data nodes and writing it to the `pg_class` entry for the corresponding local chunk. The function expects either a chunk or a hypertable as input and returns the relstats for the given chunk or all chunks for the given hypertable, respectively. Importing relstats as described is useful as part of a distributed ANALYZE/VACUUM that won't require fetching all data into the access node for local sampling (like the current implemention does). In a future change, this function will be called as part of a local ANALYZE on the access node that runs ANALYZE on all data nodes followed by importing of the resulting relstats for the analyzed chunks.
This commit is contained in:
parent
3a35b984f8
commit
6a9db8a621
sql
src
tsl
@ -56,3 +56,8 @@ AS '@MODULE_PATHNAME@', 'ts_chunk_create' LANGUAGE C VOLATILE;
|
||||
-- change default data node for a chunk
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.set_chunk_default_data_node(chunk REGCLASS, node_name NAME) RETURNS BOOLEAN
|
||||
AS '@MODULE_PATHNAME@', 'ts_chunk_set_default_data_node' LANGUAGE C VOLATILE;
|
||||
|
||||
-- Get chunk stats.
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.get_chunk_relstats(relid REGCLASS)
|
||||
RETURNS TABLE(chunk_id INTEGER, hypertable_id INTEGER, num_pages INTEGER, num_tuples REAL, num_allvisible INTEGER)
|
||||
AS '@MODULE_PATHNAME@', 'ts_chunk_get_relstats' LANGUAGE C VOLATILE;
|
||||
|
@ -119,28 +119,45 @@ chunk_data_node_tuple_found(TupleInfo *ti, void *data)
|
||||
|
||||
static int
|
||||
ts_chunk_data_node_scan_by_chunk_id_and_node_internal(int32 chunk_id, const char *node_name,
|
||||
bool scan_by_remote_chunk_id,
|
||||
tuple_found_func tuple_found, void *data,
|
||||
LOCKMODE lockmode, MemoryContext mctx)
|
||||
{
|
||||
ScanKeyData scankey[2];
|
||||
int nkeys = 0;
|
||||
int attrnum_chunk_id;
|
||||
int attrnum_node_name;
|
||||
int indexid;
|
||||
|
||||
if (scan_by_remote_chunk_id)
|
||||
{
|
||||
attrnum_chunk_id = Anum_chunk_data_node_node_chunk_id_node_name_idx_chunk_id;
|
||||
attrnum_node_name = Anum_chunk_data_node_node_chunk_id_node_name_idx_node_name;
|
||||
indexid = CHUNK_DATA_NODE_NODE_CHUNK_ID_NODE_NAME_IDX;
|
||||
}
|
||||
else
|
||||
{
|
||||
attrnum_chunk_id = Anum_chunk_data_node_chunk_id_node_name_idx_chunk_id;
|
||||
attrnum_node_name = Anum_chunk_data_node_chunk_id_node_name_idx_node_name;
|
||||
indexid = CHUNK_DATA_NODE_CHUNK_ID_NODE_NAME_IDX;
|
||||
}
|
||||
|
||||
ScanKeyInit(&scankey[nkeys++],
|
||||
Anum_chunk_data_node_chunk_id_node_name_idx_chunk_id,
|
||||
attrnum_chunk_id,
|
||||
BTEqualStrategyNumber,
|
||||
F_INT4EQ,
|
||||
Int32GetDatum(chunk_id));
|
||||
|
||||
if (NULL != node_name)
|
||||
ScanKeyInit(&scankey[nkeys++],
|
||||
Anum_chunk_data_node_chunk_id_node_name_idx_node_name,
|
||||
attrnum_node_name,
|
||||
BTEqualStrategyNumber,
|
||||
F_NAMEEQ,
|
||||
DirectFunctionCall1(namein, CStringGetDatum(node_name)));
|
||||
|
||||
return chunk_data_node_scan_limit_internal(scankey,
|
||||
nkeys,
|
||||
CHUNK_DATA_NODE_CHUNK_ID_NODE_NAME_IDX,
|
||||
indexid,
|
||||
tuple_found,
|
||||
data,
|
||||
0,
|
||||
@ -177,6 +194,7 @@ ts_chunk_data_node_scan_by_chunk_id(int32 chunk_id, MemoryContext mctx)
|
||||
|
||||
ts_chunk_data_node_scan_by_chunk_id_and_node_internal(chunk_id,
|
||||
NULL,
|
||||
false,
|
||||
chunk_data_node_tuple_found,
|
||||
&chunk_data_nodes,
|
||||
AccessShareLock,
|
||||
@ -184,15 +202,16 @@ ts_chunk_data_node_scan_by_chunk_id(int32 chunk_id, MemoryContext mctx)
|
||||
return chunk_data_nodes;
|
||||
}
|
||||
|
||||
ChunkDataNode *
|
||||
ts_chunk_data_node_scan_by_chunk_id_and_node_name(int32 chunk_id, const char *node_name,
|
||||
MemoryContext mctx)
|
||||
static ChunkDataNode *
|
||||
chunk_data_node_scan_by_chunk_id_and_node_name(int32 chunk_id, const char *node_name,
|
||||
bool scan_by_remote_chunk_id, MemoryContext mctx)
|
||||
|
||||
{
|
||||
List *chunk_data_nodes = NIL;
|
||||
|
||||
ts_chunk_data_node_scan_by_chunk_id_and_node_internal(chunk_id,
|
||||
node_name,
|
||||
scan_by_remote_chunk_id,
|
||||
chunk_data_node_tuple_found,
|
||||
&chunk_data_nodes,
|
||||
AccessShareLock,
|
||||
@ -204,6 +223,22 @@ ts_chunk_data_node_scan_by_chunk_id_and_node_name(int32 chunk_id, const char *no
|
||||
return linitial(chunk_data_nodes);
|
||||
}
|
||||
|
||||
ChunkDataNode *
|
||||
ts_chunk_data_node_scan_by_chunk_id_and_node_name(int32 chunk_id, const char *node_name,
|
||||
MemoryContext mctx)
|
||||
|
||||
{
|
||||
return chunk_data_node_scan_by_chunk_id_and_node_name(chunk_id, node_name, false, mctx);
|
||||
}
|
||||
|
||||
ChunkDataNode *
|
||||
ts_chunk_data_node_scan_by_remote_chunk_id_and_node_name(int32 chunk_id, const char *node_name,
|
||||
MemoryContext mctx)
|
||||
|
||||
{
|
||||
return chunk_data_node_scan_by_chunk_id_and_node_name(chunk_id, node_name, true, mctx);
|
||||
}
|
||||
|
||||
static ScanTupleResult
|
||||
chunk_data_node_tuple_delete(TupleInfo *ti, void *data)
|
||||
{
|
||||
@ -221,6 +256,7 @@ ts_chunk_data_node_delete_by_chunk_id(int32 chunk_id)
|
||||
{
|
||||
return ts_chunk_data_node_scan_by_chunk_id_and_node_internal(chunk_id,
|
||||
NULL,
|
||||
false,
|
||||
chunk_data_node_tuple_delete,
|
||||
NULL,
|
||||
RowExclusiveLock,
|
||||
@ -232,6 +268,7 @@ ts_chunk_data_node_delete_by_chunk_id_and_node_name(int32 chunk_id, const char *
|
||||
{
|
||||
return ts_chunk_data_node_scan_by_chunk_id_and_node_internal(chunk_id,
|
||||
node_name,
|
||||
false,
|
||||
chunk_data_node_tuple_delete,
|
||||
NULL,
|
||||
RowExclusiveLock,
|
||||
|
@ -19,6 +19,9 @@ extern TSDLLEXPORT List *ts_chunk_data_node_scan_by_chunk_id(int32 chunk_id, Mem
|
||||
extern TSDLLEXPORT ChunkDataNode *
|
||||
ts_chunk_data_node_scan_by_chunk_id_and_node_name(int32 chunk_id, const char *node_name,
|
||||
MemoryContext mctx);
|
||||
extern TSDLLEXPORT ChunkDataNode *
|
||||
ts_chunk_data_node_scan_by_remote_chunk_id_and_node_name(int32 chunk_id, const char *node_name,
|
||||
MemoryContext mctx);
|
||||
extern TSDLLEXPORT void ts_chunk_data_node_insert(ChunkDataNode *node);
|
||||
extern void ts_chunk_data_node_insert_multi(List *chunk_data_nodes);
|
||||
extern int ts_chunk_data_node_delete_by_chunk_id(int32 chunk_id);
|
||||
|
@ -53,6 +53,7 @@ TS_FUNCTION_INFO_V1(ts_data_node_detach);
|
||||
TS_FUNCTION_INFO_V1(ts_data_node_block_new_chunks);
|
||||
TS_FUNCTION_INFO_V1(ts_data_node_allow_new_chunks);
|
||||
TS_FUNCTION_INFO_V1(ts_chunk_set_default_data_node);
|
||||
TS_FUNCTION_INFO_V1(ts_chunk_get_relstats);
|
||||
TS_FUNCTION_INFO_V1(ts_timescaledb_fdw_handler);
|
||||
TS_FUNCTION_INFO_V1(ts_timescaledb_fdw_validator);
|
||||
TS_FUNCTION_INFO_V1(ts_remote_txn_id_in);
|
||||
@ -172,6 +173,12 @@ ts_chunk_set_default_data_node(PG_FUNCTION_ARGS)
|
||||
PG_RETURN_DATUM(ts_cm_functions->set_chunk_default_data_node(fcinfo));
|
||||
}
|
||||
|
||||
Datum
|
||||
ts_chunk_get_relstats(PG_FUNCTION_ARGS)
|
||||
{
|
||||
PG_RETURN_DATUM(ts_cm_functions->get_chunk_relstats(fcinfo));
|
||||
}
|
||||
|
||||
Datum
|
||||
ts_timescaledb_fdw_handler(PG_FUNCTION_ARGS)
|
||||
{
|
||||
@ -644,6 +651,7 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
|
||||
.remote_hypertable_info = error_no_default_fn_pg_community,
|
||||
.validate_as_data_node = error_no_default_fn_community,
|
||||
.func_call_on_data_nodes = func_call_on_data_nodes_default,
|
||||
.get_chunk_relstats = error_no_default_fn_pg_community,
|
||||
};
|
||||
|
||||
TSDLLEXPORT CrossModuleFunctions *ts_cm_functions = &ts_cm_functions_default;
|
||||
|
@ -143,6 +143,7 @@ typedef struct CrossModuleFunctions
|
||||
void (*validate_as_data_node)(void);
|
||||
void (*func_call_on_data_nodes)(FunctionCallInfo fcinfo, List *data_node_oids);
|
||||
PGFunction distributed_exec;
|
||||
PGFunction get_chunk_relstats;
|
||||
} CrossModuleFunctions;
|
||||
|
||||
extern TSDLLEXPORT CrossModuleFunctions *ts_cm_functions;
|
||||
|
@ -6,8 +6,17 @@
|
||||
#include <postgres.h>
|
||||
#include <utils/jsonb.h>
|
||||
#include <utils/lsyscache.h>
|
||||
#include <utils/syscache.h>
|
||||
#include <utils/builtins.h>
|
||||
#include <catalog/pg_type.h>
|
||||
#include <catalog/pg_class.h>
|
||||
#include <catalog/pg_inherits.h>
|
||||
#include <access/htup.h>
|
||||
#include <access/htup_details.h>
|
||||
#include <access/visibilitymap.h>
|
||||
#include <access/xact.h>
|
||||
#include <access/multixact.h>
|
||||
#include <commands/vacuum.h>
|
||||
#include <fmgr.h>
|
||||
#include <funcapi.h>
|
||||
#include <miscadmin.h>
|
||||
@ -24,6 +33,8 @@
|
||||
#include "remote/async.h"
|
||||
#include "remote/dist_txn.h"
|
||||
#include "remote/stmt_params.h"
|
||||
#include "remote/dist_commands.h"
|
||||
#include "remote/tuplefactory.h"
|
||||
#include "chunk_api.h"
|
||||
|
||||
/*
|
||||
@ -447,3 +458,259 @@ chunk_api_create_on_data_nodes(Chunk *chunk, Hypertable *ht)
|
||||
DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_create_chunk_id)]);
|
||||
}
|
||||
}
|
||||
|
||||
enum Anum_chunk_relstats
|
||||
{
|
||||
Anum_chunk_relstats_chunk_id = 1,
|
||||
Anum_chunk_relstats_hypertable_id,
|
||||
Anum_chunk_relstats_num_pages,
|
||||
Anum_chunk_relstats_num_tuples,
|
||||
Anum_chunk_relstats_num_allvisible,
|
||||
_Anum_chunk_relstats_max,
|
||||
};
|
||||
|
||||
/*
|
||||
* Construct a tuple for the get_chunk_relstats SQL function.
|
||||
*/
|
||||
static HeapTuple
|
||||
chunk_get_single_stats_tuple(Chunk *chunk, TupleDesc tupdesc)
|
||||
{
|
||||
HeapTuple ctup;
|
||||
Form_pg_class pgcform;
|
||||
Datum values[_Anum_chunk_relstats_max];
|
||||
bool nulls[_Anum_chunk_relstats_max] = { false };
|
||||
|
||||
ctup = SearchSysCache1(RELOID, ObjectIdGetDatum(chunk->table_id));
|
||||
|
||||
if (!HeapTupleIsValid(ctup))
|
||||
elog(ERROR,
|
||||
"pg_class entry for chunk \"%s.%s\" not found",
|
||||
NameStr(chunk->fd.schema_name),
|
||||
NameStr(chunk->fd.table_name));
|
||||
|
||||
pgcform = (Form_pg_class) GETSTRUCT(ctup);
|
||||
|
||||
values[AttrNumberGetAttrOffset(Anum_chunk_relstats_chunk_id)] = Int32GetDatum(chunk->fd.id);
|
||||
values[AttrNumberGetAttrOffset(Anum_chunk_relstats_hypertable_id)] =
|
||||
Int32GetDatum(chunk->fd.hypertable_id);
|
||||
values[AttrNumberGetAttrOffset(Anum_chunk_relstats_num_pages)] =
|
||||
Int32GetDatum(pgcform->relpages);
|
||||
values[AttrNumberGetAttrOffset(Anum_chunk_relstats_num_tuples)] =
|
||||
Float4GetDatum(pgcform->reltuples);
|
||||
values[AttrNumberGetAttrOffset(Anum_chunk_relstats_num_allvisible)] =
|
||||
Int32GetDatum(pgcform->relallvisible);
|
||||
|
||||
ReleaseSysCache(ctup);
|
||||
|
||||
return heap_form_tuple(tupdesc, values, nulls);
|
||||
}
|
||||
|
||||
/*
|
||||
* Update the stats in the pg_class catalog entry for a chunk.
|
||||
*
|
||||
* Similar to code for vacuum/analyze.
|
||||
*
|
||||
* We do not update pg_class.relhasindex because vac_update_relstats() only
|
||||
* sets that field if it reverts back to false (see internal implementation).
|
||||
*/
|
||||
static void
|
||||
chunk_update_relstats(Chunk *chunk, int32 num_pages, float num_tuples, int32 num_allvisible)
|
||||
{
|
||||
Relation rel;
|
||||
|
||||
rel = try_relation_open(chunk->table_id, ShareUpdateExclusiveLock);
|
||||
|
||||
/* If a vacuum is running we might not be able to grab the lock, so just
|
||||
* raise an error and let the user try again. */
|
||||
if (NULL == rel)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
|
||||
errmsg("skipping relstats update of \"%s\" --- lock not available",
|
||||
NameStr(chunk->fd.table_name))));
|
||||
|
||||
vac_update_relstats(rel,
|
||||
num_pages,
|
||||
num_tuples,
|
||||
num_allvisible,
|
||||
true,
|
||||
InvalidTransactionId,
|
||||
InvalidMultiXactId,
|
||||
false);
|
||||
|
||||
relation_close(rel, ShareUpdateExclusiveLock);
|
||||
}
|
||||
|
||||
/*
|
||||
* Fetch chunk relation stats from remote data nodes.
|
||||
*
|
||||
* This will remotely fetch, and locally update, relation stats (relpages,
|
||||
* reltuples, relallvisible in pg_class) for all chunks in a distributed
|
||||
* hypertable. We do not fetch 'relhasindex' because there is no way to set it
|
||||
* using vac_update_relstats() unless the values reverts back to 'false' (see
|
||||
* internal implementation of PG's vac_update_relstats).
|
||||
*
|
||||
* Note that we currently fetch stats from all chunk replicas, i.e., we might
|
||||
* fetch stats for a local chunk multiple times (once for each
|
||||
* replica). Presumably, stats should be the same for all replicas, but they
|
||||
* might vary if ANALYZE didn't run recently on the data node. We currently
|
||||
* don't care, however, and the "last" chunk replica will win w.r.t. which
|
||||
* stats will take precedence. We might consider optimizing this in the
|
||||
* future.
|
||||
*/
|
||||
static void
|
||||
fetch_remote_chunk_relstats(Hypertable *ht, FunctionCallInfo fcinfo)
|
||||
{
|
||||
DistCmdResult *cmdres;
|
||||
TupleDesc tupdesc;
|
||||
TupleFactory *tf;
|
||||
Size i;
|
||||
|
||||
Assert(hypertable_is_distributed(ht));
|
||||
|
||||
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("function returning record called in context "
|
||||
"that cannot accept type record")));
|
||||
|
||||
cmdres = ts_dist_cmd_invoke_func_call_on_all_data_nodes(fcinfo);
|
||||
/* Expect TEXT response format since dist command API currently defaults
|
||||
* to requesting TEXT */
|
||||
tf = tuplefactory_create_for_tupdesc(tupdesc, true);
|
||||
|
||||
for (i = 0; /* exit when res == NULL below */; i++)
|
||||
{
|
||||
PGresult *res;
|
||||
const char *node_name;
|
||||
int row;
|
||||
|
||||
res = ts_dist_cmd_get_result_by_index(cmdres, i, &node_name);
|
||||
|
||||
if (NULL == res)
|
||||
break;
|
||||
|
||||
for (row = 0; row < PQntuples(res); row++)
|
||||
{
|
||||
Datum values[_Anum_chunk_relstats_max];
|
||||
bool nulls[_Anum_chunk_relstats_max] = { false };
|
||||
HeapTuple tuple;
|
||||
int32 chunk_id;
|
||||
ChunkDataNode *cdn;
|
||||
Chunk *chunk;
|
||||
int32 num_pages;
|
||||
float num_tuples;
|
||||
int32 num_allvisible;
|
||||
|
||||
tuple = tuplefactory_make_tuple(tf, res, row);
|
||||
heap_deform_tuple(tuple, tupdesc, values, nulls);
|
||||
chunk_id = DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_chunk_relstats_chunk_id)]);
|
||||
cdn = ts_chunk_data_node_scan_by_remote_chunk_id_and_node_name(chunk_id,
|
||||
node_name,
|
||||
CurrentMemoryContext);
|
||||
chunk = ts_chunk_get_by_id(cdn->fd.chunk_id, true);
|
||||
num_pages =
|
||||
DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_chunk_relstats_num_pages)]);
|
||||
num_tuples =
|
||||
DatumGetFloat4(values[AttrNumberGetAttrOffset(Anum_chunk_relstats_num_tuples)]);
|
||||
num_allvisible =
|
||||
DatumGetInt32(values[AttrNumberGetAttrOffset(Anum_chunk_relstats_num_allvisible)]);
|
||||
chunk_update_relstats(chunk, num_pages, num_tuples, num_allvisible);
|
||||
}
|
||||
}
|
||||
|
||||
ts_dist_cmd_close_response(cmdres);
|
||||
}
|
||||
|
||||
/*
|
||||
* Get relation stats for chunks.
|
||||
*
|
||||
* This function takes a hypertable or chunk as input (regclass). In case of a
|
||||
* hypertable, it will get the relstats for all the chunks in the hypertable,
|
||||
* otherwise only the given chunk.
|
||||
*
|
||||
* If a hypertable is distributed, the function will first refresh the local
|
||||
* chunk stats by fetching stats from remote data nodes.
|
||||
*/
|
||||
Datum
|
||||
chunk_api_get_chunk_relstats(PG_FUNCTION_ARGS)
|
||||
{
|
||||
FuncCallContext *funcctx;
|
||||
List *chunk_oids = NIL;
|
||||
|
||||
if (SRF_IS_FIRSTCALL())
|
||||
{
|
||||
Oid relid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
|
||||
MemoryContext oldcontext;
|
||||
TupleDesc tupdesc;
|
||||
Cache *hcache;
|
||||
Hypertable *ht;
|
||||
|
||||
if (!OidIsValid(relid))
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("invalid table")));
|
||||
|
||||
hcache = ts_hypertable_cache_pin();
|
||||
ht = ts_hypertable_cache_get_entry(hcache, relid, CACHE_FLAG_MISSING_OK);
|
||||
|
||||
if (NULL == ht)
|
||||
{
|
||||
Chunk *chunk = ts_chunk_get_by_relid(relid, false);
|
||||
|
||||
if (NULL == chunk)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("must be a hypertable or chunk")));
|
||||
|
||||
chunk_oids = list_make1_oid(chunk->table_id);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (hypertable_is_distributed(ht))
|
||||
{
|
||||
/* If this is a distributed hypertable, we fetch stats from
|
||||
* remote nodes */
|
||||
fetch_remote_chunk_relstats(ht, fcinfo);
|
||||
/* Make updated stats visible so that we can retreive them locally below */
|
||||
CommandCounterIncrement();
|
||||
}
|
||||
|
||||
chunk_oids = find_inheritance_children(relid, NoLock);
|
||||
}
|
||||
|
||||
ts_cache_release(hcache);
|
||||
|
||||
funcctx = SRF_FIRSTCALL_INIT();
|
||||
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
|
||||
|
||||
if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("function returning record called in context "
|
||||
"that cannot accept type record")));
|
||||
|
||||
/* Save the chunk oid list on the multi-call memory context so that it
|
||||
* survives across multiple calls to this function (until SRF is
|
||||
* done). */
|
||||
funcctx->user_fctx = list_copy(chunk_oids);
|
||||
funcctx->tuple_desc = BlessTupleDesc(tupdesc);
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
}
|
||||
|
||||
funcctx = SRF_PERCALL_SETUP();
|
||||
chunk_oids = (List *) funcctx->user_fctx;
|
||||
|
||||
if (chunk_oids == NIL)
|
||||
SRF_RETURN_DONE(funcctx);
|
||||
else
|
||||
{
|
||||
Oid relid = linitial_oid(chunk_oids);
|
||||
Chunk *chunk = ts_chunk_get_by_relid(relid, true);
|
||||
HeapTuple tuple = chunk_get_single_stats_tuple(chunk, funcctx->tuple_desc);
|
||||
MemoryContext oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
|
||||
|
||||
chunk_oids = list_delete_first(chunk_oids);
|
||||
funcctx->user_fctx = chunk_oids;
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
SRF_RETURN_NEXT(funcctx, HeapTupleGetDatum(tuple));
|
||||
}
|
||||
}
|
||||
|
@ -11,5 +11,6 @@
|
||||
extern Datum chunk_show(PG_FUNCTION_ARGS);
|
||||
extern Datum chunk_create(PG_FUNCTION_ARGS);
|
||||
extern void chunk_api_create_on_data_nodes(Chunk *chunk, Hypertable *ht);
|
||||
extern Datum chunk_api_get_chunk_relstats(PG_FUNCTION_ARGS);
|
||||
|
||||
#endif /* TIMESCALEDB_TSL_CHUNK_API_H */
|
||||
|
@ -204,7 +204,7 @@ dist_util_remote_hypertable_info(PG_FUNCTION_ARGS)
|
||||
}
|
||||
|
||||
funcctx = SRF_PERCALL_SETUP();
|
||||
result = ts_dist_cmd_get_data_node_result(funcctx->user_fctx, node_name);
|
||||
result = ts_dist_cmd_get_result_by_node_name(funcctx->user_fctx, node_name);
|
||||
|
||||
if (funcctx->call_cntr < PQntuples(result))
|
||||
{
|
||||
|
@ -160,7 +160,7 @@ create_foreign_modify(EState *estate, Relation rel, CmdType operation, Oid check
|
||||
|
||||
/* Prepare for input conversion of RETURNING results. */
|
||||
if (fmstate->has_returning)
|
||||
fmstate->att_conv_metadata = data_format_create_att_conv_in_metadata(tupdesc);
|
||||
fmstate->att_conv_metadata = data_format_create_att_conv_in_metadata(tupdesc, false);
|
||||
|
||||
if (operation == CMD_UPDATE || operation == CMD_DELETE)
|
||||
{
|
||||
|
@ -81,7 +81,7 @@ hypertable_create_backend_tables(int32 hypertable_id, List *data_nodes)
|
||||
dist_res = ts_dist_cmd_invoke_on_data_nodes(commands->table_create_command, data_nodes, true);
|
||||
foreach (cell, data_nodes)
|
||||
{
|
||||
PGresult *res = ts_dist_cmd_get_data_node_result(dist_res, lfirst(cell));
|
||||
PGresult *res = ts_dist_cmd_get_result_by_node_name(dist_res, lfirst(cell));
|
||||
|
||||
Assert(PQntuples(res) == 1);
|
||||
Assert(PQnfields(res) == AttrNumberGetAttrOffset(_Anum_create_hypertable_max));
|
||||
|
@ -261,6 +261,7 @@ CrossModuleFunctions tsl_cm_functions = {
|
||||
.validate_as_data_node = NULL,
|
||||
.distributed_exec = error_not_supported_default_fn,
|
||||
.func_call_on_data_nodes = error_func_call_on_data_nodes_not_supported,
|
||||
.get_chunk_relstats = error_not_supported_default_fn,
|
||||
#else
|
||||
.add_data_node = data_node_add,
|
||||
.delete_data_node = data_node_delete,
|
||||
@ -294,6 +295,7 @@ CrossModuleFunctions tsl_cm_functions = {
|
||||
.validate_as_data_node = validate_data_node_settings,
|
||||
.distributed_exec = ts_dist_cmd_exec,
|
||||
.func_call_on_data_nodes = ts_dist_cmd_func_call_on_data_nodes,
|
||||
.get_chunk_relstats = chunk_api_get_chunk_relstats,
|
||||
#endif
|
||||
.cache_syscache_invalidate = cache_syscache_invalidate,
|
||||
};
|
||||
|
@ -83,7 +83,7 @@ data_format_get_type_input_func(Oid type, bool *is_binary, bool force_text, Oid
|
||||
}
|
||||
|
||||
AttConvInMetadata *
|
||||
data_format_create_att_conv_in_metadata(TupleDesc tupdesc)
|
||||
data_format_create_att_conv_in_metadata(TupleDesc tupdesc, bool force_text)
|
||||
{
|
||||
AttConvInMetadata *att_conv_metadata;
|
||||
int i = 0;
|
||||
@ -103,15 +103,14 @@ data_format_create_att_conv_in_metadata(TupleDesc tupdesc)
|
||||
|
||||
if (!TupleDescAttr(tupdesc, i)->attisdropped)
|
||||
{
|
||||
funcoid =
|
||||
data_format_get_type_input_func(TupleDescAttr(tupdesc, i)->atttypid,
|
||||
&isbinary,
|
||||
!ts_guc_enable_connection_binary_data || !isbinary,
|
||||
&att_conv_metadata->ioparams[i]);
|
||||
funcoid = data_format_get_type_input_func(TupleDescAttr(tupdesc, i)->atttypid,
|
||||
&isbinary,
|
||||
force_text || !isbinary,
|
||||
&att_conv_metadata->ioparams[i]);
|
||||
if (prev == !isbinary)
|
||||
{
|
||||
i = 0; /* in/out functions has to be eiher all binary or all text (PostgreSQL
|
||||
limitation). Lets restart function discovery process */
|
||||
limitation). Let's restart function discovery process */
|
||||
prev = false;
|
||||
continue;
|
||||
}
|
||||
|
@ -22,7 +22,8 @@ typedef struct AttConvInMetadata
|
||||
bool binary; /* if we use function with binary input */
|
||||
} AttConvInMetadata;
|
||||
|
||||
extern AttConvInMetadata *data_format_create_att_conv_in_metadata(TupleDesc tupdesc);
|
||||
extern AttConvInMetadata *data_format_create_att_conv_in_metadata(TupleDesc tupdesc,
|
||||
bool force_text);
|
||||
|
||||
extern Oid data_format_get_type_output_func(Oid type, bool *is_binary, bool force_text);
|
||||
extern Oid data_format_get_type_input_func(Oid type, bool *is_binary, bool force_text,
|
||||
|
@ -186,7 +186,7 @@ ts_dist_cmd_func_call_on_data_nodes(FunctionCallInfo fcinfo, List *data_nodes)
|
||||
}
|
||||
|
||||
PGresult *
|
||||
ts_dist_cmd_get_data_node_result(DistCmdResult *response, const char *node_name)
|
||||
ts_dist_cmd_get_result_by_node_name(DistCmdResult *response, const char *node_name)
|
||||
{
|
||||
int i;
|
||||
|
||||
@ -200,6 +200,31 @@ ts_dist_cmd_get_data_node_result(DistCmdResult *response, const char *node_name)
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Get the n:th command result.
|
||||
*
|
||||
* Returns the n:th command result as given by the index, or NULL if no such
|
||||
* result.
|
||||
*
|
||||
* Optionally get the name of the node that the result was from via the
|
||||
* node_name parameter.
|
||||
*/
|
||||
PGresult *
|
||||
ts_dist_cmd_get_result_by_index(DistCmdResult *response, Size index, const char **node_name)
|
||||
{
|
||||
DistCmdResponse *rsp;
|
||||
|
||||
if (index >= response->num_responses)
|
||||
return NULL;
|
||||
|
||||
rsp = &response->responses[index];
|
||||
|
||||
if (NULL != node_name)
|
||||
*node_name = rsp->data_node;
|
||||
|
||||
return async_response_result_get_pg_result(rsp->result);
|
||||
}
|
||||
|
||||
void
|
||||
ts_dist_cmd_close_response(DistCmdResult *response)
|
||||
{
|
||||
|
@ -23,7 +23,10 @@ extern DistCmdResult *ts_dist_cmd_invoke_func_call_on_all_data_nodes(FunctionCal
|
||||
extern DistCmdResult *ts_dist_cmd_invoke_func_call_on_data_nodes(FunctionCallInfo fcinfo,
|
||||
List *data_nodes);
|
||||
extern void ts_dist_cmd_func_call_on_data_nodes(FunctionCallInfo fcinfo, List *data_nodes);
|
||||
extern PGresult *ts_dist_cmd_get_data_node_result(DistCmdResult *response, const char *node_name);
|
||||
extern PGresult *ts_dist_cmd_get_result_by_node_name(DistCmdResult *response,
|
||||
const char *node_name);
|
||||
extern PGresult *ts_dist_cmd_get_result_by_index(DistCmdResult *response, Size index,
|
||||
const char **node_name);
|
||||
|
||||
extern void ts_dist_cmd_close_response(DistCmdResult *response);
|
||||
|
||||
|
@ -29,6 +29,7 @@
|
||||
#include <utils/float.h>
|
||||
#endif
|
||||
|
||||
#include <guc.h>
|
||||
#include "utils.h"
|
||||
#include "compat.h"
|
||||
#include "remote/data_format.h"
|
||||
@ -145,7 +146,7 @@ conversion_error_callback(void *arg)
|
||||
}
|
||||
|
||||
static TupleFactory *
|
||||
tuplefactory_create(Relation rel, ScanState *ss, List *retrieved_attrs)
|
||||
tuplefactory_create_common(TupleDesc tupdesc, List *retrieved_attrs, bool force_text)
|
||||
{
|
||||
TupleFactory *tf = palloc0(sizeof(TupleFactory));
|
||||
|
||||
@ -153,22 +154,48 @@ tuplefactory_create(Relation rel, ScanState *ss, List *retrieved_attrs)
|
||||
"tuple factory temporary data",
|
||||
ALLOCSET_SMALL_SIZES);
|
||||
|
||||
if (NULL != rel)
|
||||
tf->tupdesc = RelationGetDescr(rel);
|
||||
else
|
||||
{
|
||||
Assert(ss);
|
||||
tf->tupdesc = ss->ss_ScanTupleSlot->tts_tupleDescriptor;
|
||||
}
|
||||
|
||||
tf->tupdesc = tupdesc;
|
||||
tf->retrieved_attrs = retrieved_attrs;
|
||||
tf->attconv = data_format_create_att_conv_in_metadata(tf->tupdesc);
|
||||
tf->attconv = data_format_create_att_conv_in_metadata(tf->tupdesc, force_text);
|
||||
tf->values = (Datum *) palloc0(tf->tupdesc->natts * sizeof(Datum));
|
||||
tf->nulls = (bool *) palloc(tf->tupdesc->natts * sizeof(bool));
|
||||
|
||||
/* Initialize to nulls for any columns not present in result */
|
||||
memset(tf->nulls, true, tf->tupdesc->natts * sizeof(bool));
|
||||
|
||||
return tf;
|
||||
}
|
||||
|
||||
TupleFactory *
|
||||
tuplefactory_create_for_tupdesc(TupleDesc tupdesc, bool force_text)
|
||||
{
|
||||
List *retrieved_attrs = NIL;
|
||||
int i;
|
||||
|
||||
for (i = 0; i < tupdesc->natts; i++)
|
||||
{
|
||||
if (!TupleDescAttr(tupdesc, i)->attisdropped)
|
||||
retrieved_attrs = lappend_int(retrieved_attrs, i + 1);
|
||||
}
|
||||
|
||||
return tuplefactory_create_common(tupdesc, retrieved_attrs, force_text);
|
||||
}
|
||||
|
||||
static TupleFactory *
|
||||
tuplefactory_create(Relation rel, ScanState *ss, List *retrieved_attrs)
|
||||
{
|
||||
TupleFactory *tf;
|
||||
TupleDesc tupdesc;
|
||||
|
||||
Assert(!(rel && ss) && (rel || ss));
|
||||
|
||||
if (NULL != rel)
|
||||
tupdesc = RelationGetDescr(rel);
|
||||
else
|
||||
tupdesc = ss->ss_ScanTupleSlot->tts_tupleDescriptor;
|
||||
|
||||
tf =
|
||||
tuplefactory_create_common(tupdesc, retrieved_attrs, !ts_guc_enable_connection_binary_data);
|
||||
tf->errpos.rel = rel;
|
||||
tf->errpos.cur_attno = 0;
|
||||
tf->errpos.ss = ss;
|
||||
@ -223,8 +250,11 @@ tuplefactory_make_tuple(TupleFactory *tf, PGresult *res, int row)
|
||||
buf = makeStringInfo();
|
||||
|
||||
/* Install error callback */
|
||||
tf->errcallback.previous = error_context_stack;
|
||||
error_context_stack = &tf->errcallback;
|
||||
if (tf->errcallback.callback != NULL)
|
||||
{
|
||||
tf->errcallback.previous = error_context_stack;
|
||||
error_context_stack = &tf->errcallback;
|
||||
}
|
||||
|
||||
/*
|
||||
* i indexes columns in the relation, j indexes columns in the PGresult.
|
||||
@ -315,7 +345,8 @@ tuplefactory_make_tuple(TupleFactory *tf, PGresult *res, int row)
|
||||
}
|
||||
|
||||
/* Uninstall error context callback. */
|
||||
error_context_stack = tf->errcallback.previous;
|
||||
if (tf->errcallback.callback != NULL)
|
||||
error_context_stack = tf->errcallback.previous;
|
||||
|
||||
/*
|
||||
* Check we got the expected number of columns. Note: j == 0 and
|
||||
|
@ -23,6 +23,7 @@
|
||||
|
||||
typedef struct TupleFactory TupleFactory;
|
||||
|
||||
extern TupleFactory *tuplefactory_create_for_tupdesc(TupleDesc tupdesc, bool force_text);
|
||||
extern TupleFactory *tuplefactory_create_for_rel(Relation rel, List *retrieved_attrs);
|
||||
extern TupleFactory *tuplefactory_create_for_scan(ScanState *ss, List *retrieved_attrs);
|
||||
extern HeapTuple tuplefactory_make_tuple(TupleFactory *tf, PGresult *res, int row);
|
||||
|
@ -102,3 +102,122 @@ FROM show_chunks('chunkapi');
|
||||
ChunkSchema | My_chunk_Table_name | table | default_perm_user
|
||||
(1 row)
|
||||
|
||||
-- Test getting relation stats for chunk. First get stats
|
||||
-- chunk-by-chunk. Note that the table isn't ANALYZED, so no stats
|
||||
-- present yet.
|
||||
SELECT (_timescaledb_internal.get_chunk_relstats(show_chunks)).*
|
||||
FROM show_chunks('chunkapi');
|
||||
chunk_id | hypertable_id | num_pages | num_tuples | num_allvisible
|
||||
----------+---------------+-----------+------------+----------------
|
||||
1 | 1 | 0 | 0 | 0
|
||||
2 | 1 | 0 | 0 | 0
|
||||
(2 rows)
|
||||
|
||||
-- Get the same stats but by giving the hypertable as input
|
||||
SELECT * FROM _timescaledb_internal.get_chunk_relstats('chunkapi');
|
||||
chunk_id | hypertable_id | num_pages | num_tuples | num_allvisible
|
||||
----------+---------------+-----------+------------+----------------
|
||||
1 | 1 | 0 | 0 | 0
|
||||
2 | 1 | 0 | 0 | 0
|
||||
(2 rows)
|
||||
|
||||
-- Show stats after analyze
|
||||
ANALYZE chunkapi;
|
||||
SELECT * FROM _timescaledb_internal.get_chunk_relstats('chunkapi');
|
||||
chunk_id | hypertable_id | num_pages | num_tuples | num_allvisible
|
||||
----------+---------------+-----------+------------+----------------
|
||||
1 | 1 | 1 | 1 | 0
|
||||
2 | 1 | 0 | 0 | 0
|
||||
(2 rows)
|
||||
|
||||
-- Test getting chunk stats on a distribute hypertable
|
||||
SET ROLE :ROLE_CLUSTER_SUPERUSER;
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP DATABASE IF EXISTS data_node_1;
|
||||
DROP DATABASE IF EXISTS data_node_2;
|
||||
SET client_min_messages TO NOTICE;
|
||||
SELECT * FROM add_data_node('data_node_1', host => 'localhost',
|
||||
database => 'data_node_1');
|
||||
node_name | host | port | database | node_created | database_created | extension_created
|
||||
-------------+-----------+-------+-------------+--------------+------------------+-------------------
|
||||
data_node_1 | localhost | 15432 | data_node_1 | t | t | t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM add_data_node('data_node_2', host => 'localhost',
|
||||
database => 'data_node_2');
|
||||
node_name | host | port | database | node_created | database_created | extension_created
|
||||
-------------+-----------+-------+-------------+--------------+------------------+-------------------
|
||||
data_node_2 | localhost | 15432 | data_node_2 | t | t | t
|
||||
(1 row)
|
||||
|
||||
GRANT USAGE
|
||||
ON FOREIGN SERVER data_node_1, data_node_2
|
||||
TO :ROLE_1;
|
||||
SET ROLE :ROLE_1;
|
||||
CREATE TABLE disttable (time timestamptz, device int, temp float);
|
||||
SELECT * FROM create_distributed_hypertable('disttable', 'time', 'device');
|
||||
NOTICE: adding not-null constraint to column "time"
|
||||
hypertable_id | schema_name | table_name | created
|
||||
---------------+-------------+------------+---------
|
||||
2 | public | disttable | t
|
||||
(1 row)
|
||||
|
||||
INSERT INTO disttable VALUES ('2018-01-01 05:00:00-8', 1, 23.4),
|
||||
('2018-01-01 06:00:00-8', 4, 22.3),
|
||||
('2018-01-01 06:00:00-8', 1, 21.1);
|
||||
-- No stats on the local table
|
||||
SELECT * FROM _timescaledb_internal.get_chunk_relstats('disttable');
|
||||
chunk_id | hypertable_id | num_pages | num_tuples | num_allvisible
|
||||
----------+---------------+-----------+------------+----------------
|
||||
3 | 2 | 0 | 0 | 0
|
||||
4 | 2 | 0 | 0 | 0
|
||||
(2 rows)
|
||||
|
||||
-- Run ANALYZE on data node 1
|
||||
SELECT * FROM distributed_exec('ANALYZE disttable', '{ "data_node_1" }');
|
||||
distributed_exec
|
||||
------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Stats should now be refreshed locally
|
||||
SELECT * FROM _timescaledb_internal.get_chunk_relstats('disttable');
|
||||
chunk_id | hypertable_id | num_pages | num_tuples | num_allvisible
|
||||
----------+---------------+-----------+------------+----------------
|
||||
3 | 2 | 1 | 2 | 0
|
||||
4 | 2 | 0 | 0 | 0
|
||||
(2 rows)
|
||||
|
||||
-- Run ANALYZE again, but on both nodes
|
||||
SELECT * FROM distributed_exec('ANALYZE disttable');
|
||||
distributed_exec
|
||||
------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Now expect stats from all data node chunks
|
||||
SELECT * FROM _timescaledb_internal.get_chunk_relstats('disttable');
|
||||
chunk_id | hypertable_id | num_pages | num_tuples | num_allvisible
|
||||
----------+---------------+-----------+------------+----------------
|
||||
3 | 2 | 1 | 2 | 0
|
||||
4 | 2 | 1 | 1 | 0
|
||||
(2 rows)
|
||||
|
||||
RESET ROLE;
|
||||
-- Clean up
|
||||
TRUNCATE disttable;
|
||||
SELECT * FROM delete_data_node('data_node_1', force => true);
|
||||
delete_data_node
|
||||
------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM delete_data_node('data_node_2', force => true);
|
||||
WARNING: new data for hypertable "disttable" will be under-replicated due to deleting data node "data_node_2"
|
||||
delete_data_node
|
||||
------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
DROP DATABASE data_node_1;
|
||||
DROP DATABASE data_node_2;
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -55,3 +55,64 @@ FROM show_chunks('chunkapi');
|
||||
-- Show the new chunks
|
||||
\dt public.*
|
||||
\dt "ChunkSchema".*
|
||||
|
||||
|
||||
-- Test getting relation stats for chunk. First get stats
|
||||
-- chunk-by-chunk. Note that the table isn't ANALYZED, so no stats
|
||||
-- present yet.
|
||||
SELECT (_timescaledb_internal.get_chunk_relstats(show_chunks)).*
|
||||
FROM show_chunks('chunkapi');
|
||||
|
||||
-- Get the same stats but by giving the hypertable as input
|
||||
SELECT * FROM _timescaledb_internal.get_chunk_relstats('chunkapi');
|
||||
|
||||
-- Show stats after analyze
|
||||
ANALYZE chunkapi;
|
||||
SELECT * FROM _timescaledb_internal.get_chunk_relstats('chunkapi');
|
||||
|
||||
-- Test getting chunk stats on a distribute hypertable
|
||||
SET ROLE :ROLE_CLUSTER_SUPERUSER;
|
||||
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP DATABASE IF EXISTS data_node_1;
|
||||
DROP DATABASE IF EXISTS data_node_2;
|
||||
SET client_min_messages TO NOTICE;
|
||||
|
||||
SELECT * FROM add_data_node('data_node_1', host => 'localhost',
|
||||
database => 'data_node_1');
|
||||
SELECT * FROM add_data_node('data_node_2', host => 'localhost',
|
||||
database => 'data_node_2');
|
||||
|
||||
GRANT USAGE
|
||||
ON FOREIGN SERVER data_node_1, data_node_2
|
||||
TO :ROLE_1;
|
||||
|
||||
SET ROLE :ROLE_1;
|
||||
CREATE TABLE disttable (time timestamptz, device int, temp float);
|
||||
SELECT * FROM create_distributed_hypertable('disttable', 'time', 'device');
|
||||
INSERT INTO disttable VALUES ('2018-01-01 05:00:00-8', 1, 23.4),
|
||||
('2018-01-01 06:00:00-8', 4, 22.3),
|
||||
('2018-01-01 06:00:00-8', 1, 21.1);
|
||||
|
||||
-- No stats on the local table
|
||||
SELECT * FROM _timescaledb_internal.get_chunk_relstats('disttable');
|
||||
|
||||
-- Run ANALYZE on data node 1
|
||||
SELECT * FROM distributed_exec('ANALYZE disttable', '{ "data_node_1" }');
|
||||
|
||||
-- Stats should now be refreshed locally
|
||||
SELECT * FROM _timescaledb_internal.get_chunk_relstats('disttable');
|
||||
|
||||
-- Run ANALYZE again, but on both nodes
|
||||
SELECT * FROM distributed_exec('ANALYZE disttable');
|
||||
|
||||
-- Now expect stats from all data node chunks
|
||||
SELECT * FROM _timescaledb_internal.get_chunk_relstats('disttable');
|
||||
|
||||
RESET ROLE;
|
||||
-- Clean up
|
||||
TRUNCATE disttable;
|
||||
SELECT * FROM delete_data_node('data_node_1', force => true);
|
||||
SELECT * FROM delete_data_node('data_node_2', force => true);
|
||||
DROP DATABASE data_node_1;
|
||||
DROP DATABASE data_node_2;
|
||||
|
@ -19,7 +19,7 @@ TS_FUNCTION_INFO_V1(tsl_invoke_faulty_distributed_command);
|
||||
elog(INFO, \
|
||||
"%s result: %s", \
|
||||
TARGET, \
|
||||
PQresStatus(PQresultStatus(ts_dist_cmd_get_data_node_result(RESULT, TARGET))));
|
||||
PQresStatus(PQresultStatus(ts_dist_cmd_get_result_by_node_name(RESULT, TARGET))));
|
||||
|
||||
Datum
|
||||
tsl_invoke_distributed_commands(PG_FUNCTION_ARGS)
|
||||
@ -57,7 +57,7 @@ tsl_invoke_distributed_commands(PG_FUNCTION_ARGS)
|
||||
"device int, temp float);",
|
||||
subset_nodes,
|
||||
true);
|
||||
TestAssertTrue(ts_dist_cmd_get_data_node_result(results, llast(data_nodes)) == NULL);
|
||||
TestAssertTrue(ts_dist_cmd_get_result_by_node_name(results, llast(data_nodes)) == NULL);
|
||||
|
||||
foreach (lc, subset_nodes)
|
||||
{
|
||||
|
Loading…
x
Reference in New Issue
Block a user