Merged in mat/c-based-main-table-queries (pull request #18)

Mat/c based main table queries
This commit is contained in:
Matvey Arye 2016-12-23 15:01:30 -05:00
commit ba990d4a10
7 changed files with 150 additions and 175 deletions

View File

@ -29,11 +29,13 @@ BEGIN
INTO STRICT time_field_type
FROM pg_attribute
WHERE attrelid = main_table AND attname = time_field_name;
PERFORM dblink_connect('meta_conn', get_meta_server_name());
PERFORM dblink_exec('meta_conn', 'BEGIN');
SELECT (t.r::hypertable).*
INTO hypertable_row
FROM dblink(
get_meta_server_name(),
'meta_conn',
format('SELECT t FROM _meta.add_hypertable(%L, %L, %L, %L, %L, %L, %L, %L, %L, %L, %L, %L) t ',
schema_name,
table_name,
@ -53,14 +55,14 @@ BEGIN
FROM pg_attribute att
WHERE attrelid = main_table AND attnum > 0 AND NOT attisdropped
LOOP
PERFORM _sysinternal.create_column_from_attribute(hypertable_row.name, att_row);
PERFORM _sysinternal.create_column_from_attribute(hypertable_row.name, att_row, 'meta_conn');
END LOOP;
PERFORM 1
FROM pg_index,
LATERAL dblink(
get_meta_server_name(),
'meta_conn',
format('SELECT _meta.add_index(%L, %L,%L, %L, %L)',
hypertable_row.name,
hypertable_row.main_schema_name,
@ -70,6 +72,8 @@ BEGIN
)) AS t(r TEXT)
WHERE indrelid = main_table;
PERFORM dblink_exec('meta_conn', 'COMMIT');
PERFORM dblink_disconnect('meta_conn');
RETURN hypertable_row;
END
$BODY$;

View File

@ -49,15 +49,19 @@ $BODY$;
--create the hypertable column based on a pg_attribute (table column) entry.
CREATE OR REPLACE FUNCTION _sysinternal.create_column_from_attribute(
hypertable_name NAME,
att pg_attribute
att pg_attribute,
conn_name TEXT = NULL
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
BEGIN
IF conn_name IS NULL THEN
conn_name = get_meta_server_name();
END IF;
PERFORM *
FROM dblink(
get_meta_server_name(),
conn_name,
format('SELECT _meta.add_field(%L, %L, %L, %L, %L, %L, %L, %L)',
hypertable_name,
att.attname,

View File

@ -44,8 +44,8 @@ SELECT *
FROM insert_data('public."Hypertable_1"', 'copy_t');
COMMIT;
--TODO: make following work.
--SELECT * FROM PUBLIC."Hypertable_1";
SELECT * FROM PUBLIC."Hypertable_1";
EXPLAIN SELECT * FROM PUBLIC."Hypertable_1";
\d+ PUBLIC."Hypertable_1"
\d+ "_sys_1_"."_hyper_1_root"
@ -93,3 +93,6 @@ SELECT * FROM _sys_1_._hyper_1_0_1_distinct_data;
\d+ PUBLIC."Hypertable_1"
\d+ "_sys_1_"."_hyper_1_root"
\d+ _sys_1_._hyper_1_1_0_1_data
SELECT * FROM PUBLIC."Hypertable_1";

View File

@ -106,8 +106,22 @@ FROM insert_data('public."Hypertable_1"', 'copy_t');
(1 row)
COMMIT;
--TODO: make following work.
--SELECT * FROM PUBLIC."Hypertable_1";
SELECT * FROM PUBLIC."Hypertable_1";
time | Device_id | temp_c | humidity | sensor_1 | sensor_2 | sensor_3 | sensor_4
---------------------+-----------+--------+----------+----------+----------+----------+----------
1257894000000000000 | dev1 | 30 | 70 | 1 | 2 | 3 | 100
(1 row)
EXPLAIN SELECT * FROM PUBLIC."Hypertable_1";
QUERY PLAN
---------------------------------------------------------------------------------
Append (cost=0.00..27.00 rows=702 width=204)
-> Seq Scan on _hyper_1_0_replica (cost=0.00..0.00 rows=1 width=204)
-> Seq Scan on _hyper_1_1_0_partition (cost=0.00..0.00 rows=1 width=204)
-> Seq Scan on _hyper_1_2_0_partition (cost=0.00..13.50 rows=350 width=204)
-> Seq Scan on _hyper_1_1_0_1_data (cost=0.00..13.50 rows=350 width=204)
(5 rows)
\d+ PUBLIC."Hypertable_1"
Table "public.Hypertable_1"
Column | Type | Modifiers | Storage | Stats target | Description
@ -325,3 +339,9 @@ Check constraints:
"partition" CHECK (get_partition_for_key("Device_id", 32768) >= '0'::smallint AND get_partition_for_key("Device_id", 32768) <= '16383'::smallint)
Inherits: _sys_1_._hyper_1_1_0_partition
SELECT * FROM PUBLIC."Hypertable_1";
time | Device_id | humidity | sensor_1 | sensor_2_renamed | sensor_3_renamed | temp_f | sensor_3 | sensor_4
---------------------+-----------+----------+----------+------------------+------------------+--------+----------+----------
1257894000000000000 | dev1 | 70 | 1 | 2 | 3 | 31 | 131 | 131
(1 row)

View File

@ -1,8 +1,10 @@
#include "postgres.h"
#include "catalog/namespace.h"
#include "catalog/pg_namespace.h"
#include "optimizer/planner.h"
#include "nodes/nodes.h"
#include "nodes/print.h"
#include "nodes/nodeFuncs.h"
#include "parser/parsetree.h"
#include "utils/lsyscache.h"
#include "executor/spi.h"
@ -23,21 +25,22 @@ static planner_hook_type prev_planner_hook = NULL;
static bool isLoaded = false;
PlannedStmt *iobeamdb_planner(Query *parse, int cursorOptions, ParamListInfo boundParams);
void
void
_PG_init(void)
{
elog(INFO, "iobeamdb loaded");
prev_planner_hook = planner_hook;
planner_hook = iobeamdb_planner;
elog(INFO, "iobeamdb loaded");
prev_planner_hook = planner_hook;
planner_hook = iobeamdb_planner;
}
void
void
_PG_fini(void)
{
planner_hook = prev_planner_hook;
planner_hook = prev_planner_hook;
}
bool IobeamLoaded(void)
bool
IobeamLoaded(void)
{
if (!isLoaded)
{
@ -50,178 +53,117 @@ bool IobeamLoaded(void)
return isLoaded;
}
char*
cmdToString(CmdType cmdType)
{
switch (cmdType) {
case CMD_SELECT: return("select");
case CMD_UPDATE: return("update");
case CMD_INSERT: return("insert");
case CMD_DELETE: return("delete");
case CMD_UTILITY: return("utility");
case CMD_UNKNOWN:
default: return("unknown");
}
return("unknown");
}
PlannedStmt *
PlannedStmt *
iobeamdb_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
{
PlannedStmt *rv = NULL;
PlannedStmt *rv = NULL;
// elog(INFO, "iobeamdb got called");
if (rv != NULL)
return rv;
if (prev_planner_hook != NULL) {
/* Call any earlier hooks */
elog(LOG, " calling prev planner-hook");
rv = (prev_planner_hook)(parse, cursorOptions, boundParams);
} else {
/* Call the standard planner */
//elog(LOG, " calling standard_planner");
rv = standard_planner(parse, cursorOptions, boundParams);
}
if(!IobeamLoaded()) \
if(IobeamLoaded())
{
return rv;
} else
{
/*
here we have the plan and can start to mess around with it.
*/
Oid singleFromTable = get_single_from_oid(parse);
if (rv->commandType == CMD_SELECT && singleFromTable!= InvalidOid)
{
char *hypertableName = get_hypertable_name(singleFromTable);
if (hypertableName != NULL)
{
Query *optimizedQuery;
char *optimizedSql = get_optimized_query_sql(parse, hypertableName);
pprint(parse);
optimizedQuery = re_parse_optimized(optimizedSql);
pprint(optimizedQuery);
rv = standard_planner(optimizedQuery, cursorOptions, NULL);
}
}
/* replace call to main table with call to the replica table */
change_table_name_walker((Node *) parse, NULL);
}
if (prev_planner_hook != NULL) {
/* Call any earlier hooks */
elog(LOG, " calling prev planner-hook");
rv = (prev_planner_hook)(parse, cursorOptions, boundParams);
} else {
/* Call the standard planner */
//elog(LOG, " calling standard_planner");
rv = standard_planner(parse, cursorOptions, boundParams);
}
return rv;
}
Query *re_parse_optimized(char * sql) {
/*
* Parse the SQL string into a list of raw parse trees.
*/
List *raw_parsetree_list = pg_parse_query(sql);
Node *parsetree;
List *querytree_list;
Query * parse;
if(list_length(raw_parsetree_list) != 1) {
elog(ERROR, "Expected one parsetree");
/*
* Change all main tables to one of the replicas in the parse tree.
*
*/
bool
change_table_name_walker(Node *node, void *context)
{
if (node == NULL)
{
return false;
}
parsetree = (Node *) linitial(raw_parsetree_list);
querytree_list = pg_analyze_and_rewrite(parsetree,
sql,
NULL,
0);
if(list_length(querytree_list) != 1) {
elog(ERROR, "Expected one querytree");
if (IsA(node, RangeTblEntry))
{
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node;
if (rangeTableEntry->rtekind == RTE_RELATION)
{
Oid replicaOid;
replicaOid = get_replica_oid(rangeTableEntry->relid);
if (replicaOid != InvalidOid) {
rangeTableEntry->relid = replicaOid;
}
}
return false;
}
parse = (Query *) linitial(querytree_list);
return parse;
if (IsA(node, Query))
{
return query_tree_walker((Query *) node, change_table_name_walker,
context, QTW_EXAMINE_RTES);
}
return expression_tree_walker(node, change_table_name_walker, context);
}
char* get_optimized_query_sql(Query *parse, char *hypertableName) {
StringInfo codeGenSql = makeStringInfo();
appendStringInfo(codeGenSql, "SELECT ioql_exec_query_record_sql(new_ioql_query(namespace_name => '%s'))", hypertableName);
char *resultSql = NULL;
/*
*
* Use the default_replica_node to look up the oid for a replica table from the oid of the main table.
* TODO: make this use a cache instead of a db lookup every time.
*
* */
Oid
get_replica_oid(Oid mainRelationOid)
{
Oid namespace = get_rel_namespace(mainRelationOid);
//TODO: cache this
Oid hypertable_meta = get_relname_relid("hypertable", get_namespace_oid("public", false));
char *tableName = get_rel_name(mainRelationOid);
char *schemaName = get_namespace_name(namespace);
StringInfo sql = makeStringInfo();
int ret;
Oid replicaOid = InvalidOid;
/* prevents infinite recursion, don't check hypertable meta tables */
if (
hypertable_meta == InvalidOid
|| namespace == PG_CATALOG_NAMESPACE
|| mainRelationOid == hypertable_meta
|| mainRelationOid == get_relname_relid("hypertable_replica", get_namespace_oid("public", false))
|| mainRelationOid == get_relname_relid("default_replica_node", get_namespace_oid("public", false))
)
{
return InvalidOid;
}
appendStringInfo(sql, REPLICA_OID_QUERY, schemaName, tableName);
SPI_connect();
if (SPI_exec(codeGenSql->data, 1) <= 0) {
ret = SPI_exec(sql->data, 1);
if (ret <= 0)
{
elog(ERROR, "Got an SPI error");
}
if(SPI_processed != 1) {
elog(ERROR, "Didn't get 1 row in code gen");
SPI_finish();
return NULL;
}
resultSql = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1);
elog(LOG, "The optimized query is %s", resultSql);
SPI_finish();
return resultSql;
}
Oid get_single_from_oid(Query *parse) {
/* one from entry which is to a table that is a hypertable */
if (list_length(parse->jointree->fromlist)==1)
if(SPI_processed == 1)
{
Node *jtnode = (Node *) linitial(parse->jointree->fromlist);
if (IsA(jtnode, RangeTblRef))
{
int varno = ((RangeTblRef *) jtnode)->rtindex;
RangeTblEntry *rte = rt_fetch(varno, parse->rtable);
if (rte->rtekind == RTE_RELATION)
{
Oid relationOid = rte->relid;
return relationOid;
}
}
bool isnull;
Datum res;
TupleDesc tupdesc = SPI_tuptable->tupdesc;
HeapTuple tuple = SPI_tuptable->vals[0];
res = SPI_getbinval(tuple, tupdesc, 1, &isnull);
replicaOid = DatumGetObjectId(res);
}
return InvalidOid;
SPI_finish();
return replicaOid;
}
char * get_hypertable_name(Oid relationOid) {
Oid namespace = get_rel_namespace(relationOid);
//todo cache this
Oid hypertable_meta = get_relname_relid("hypertable", get_namespace_oid("public", false));
char *tableName = get_rel_name(relationOid);
char *schemaName = get_namespace_name(namespace);
StringInfo sql = makeStringInfo();
int ret;
uint64 results;
char *name = NULL;
/* prevents infinite recursion, don't check hypertable meta table */
if (hypertable_meta == relationOid) {
return NULL;
}
appendStringInfo(sql, HYPERTABLE_NAME_QUERY, schemaName, tableName);
SPI_connect();
ret = SPI_exec(sql->data, 1);
if (ret <= 0) {
elog(ERROR, "Got an SPI error");
}
results = SPI_processed;
if(results == 1) {
TupleDesc tupdesc = SPI_tuptable->tupdesc;
HeapTuple tuple = SPI_tuptable->vals[0];
name = SPI_getvalue(tuple, tupdesc, 1);
elog(LOG, "Is a hypertable query for %s %s.%s", name, schemaName, tableName);
}
SPI_finish();
return name;
}

View File

@ -3,20 +3,22 @@
#define HYPERTABLE_NAME_QUERY "SELECT name FROM hypertable WHERE root_schema_name = '%s' AND root_table_name = '%s'"
#define REPLICA_OID_QUERY "SELECT format('%%I.%%I', hr.schema_name, hr.table_name)::regclass::oid \
FROM public.hypertable h \
INNER JOIN public.default_replica_node drn ON (drn.hypertable_name = h.name AND drn.database_name = current_database()) \
INNER JOIN public.hypertable_replica hr ON (hr.replica_id = drn.replica_id AND hr.hypertable_name = drn.hypertable_name) \
WHERE main_schema_name = '%s' AND main_table_name = '%s'"
#include "postgres.h"
#include "optimizer/planner.h"
#include "nodes/nodes.h"
void _PG_init(void);
void _PG_fini(void);
bool IobeamLoaded(void);
char* cmdToString(CmdType cmdType);
Oid get_single_from_oid(Query *parse);
char* get_hypertable_name(Oid relationOid);
char* get_optimized_query_sql(Query *parse, char *hypertableName);
Query *re_parse_optimized(char * sql);
bool change_table_name_walker(Node *node, void *context);
Oid get_replica_oid(Oid mainRelationOid);
#endif /* IOBEAMDB_H */

View File

@ -8,7 +8,7 @@ docker run -d --name iobeamdb -p 5432:5432 -e POSTGRES_DB=test -m 4g \
$IMAGE_NAME postgres \
-csynchronous_commit=off -cwal_writer_delay=1000 \
-cmax_locks_per_transaction=1000 \
-cshared_preload_libraries=pg_stat_statements \
-cshared_preload_libraries=pg_stat_statements,iobeamdb \
-cvacuum_cost_delay=20 -cautovacuum_max_workers=1 \
-clog_autovacuum_min_duration=1000 -cshared_buffers=1GB \
-ceffective_cache_size=3GB -cwork_mem=52428kB \