From 9089fe992a2cd038603f7e1c67dc776b6d576b44 Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Thu, 22 Dec 2016 16:56:57 -0500 Subject: [PATCH 1/2] make hypertable create transactional --- extension/sql/main/ddl.sql | 10 +++++++--- extension/sql/main/ddl_util.sql | 8 ++++++-- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/extension/sql/main/ddl.sql b/extension/sql/main/ddl.sql index 56d5ca7ee..8dd9fe53f 100644 --- a/extension/sql/main/ddl.sql +++ b/extension/sql/main/ddl.sql @@ -29,11 +29,13 @@ BEGIN INTO STRICT time_field_type FROM pg_attribute WHERE attrelid = main_table AND attname = time_field_name; + PERFORM dblink_connect('meta_conn', get_meta_server_name()); + PERFORM dblink_exec('meta_conn', 'BEGIN'); SELECT (t.r::hypertable).* INTO hypertable_row FROM dblink( - get_meta_server_name(), + 'meta_conn', format('SELECT t FROM _meta.add_hypertable(%L, %L, %L, %L, %L, %L, %L, %L, %L, %L, %L, %L) t ', schema_name, table_name, @@ -53,14 +55,14 @@ BEGIN FROM pg_attribute att WHERE attrelid = main_table AND attnum > 0 AND NOT attisdropped LOOP - PERFORM _sysinternal.create_column_from_attribute(hypertable_row.name, att_row); + PERFORM _sysinternal.create_column_from_attribute(hypertable_row.name, att_row, 'meta_conn'); END LOOP; PERFORM 1 FROM pg_index, LATERAL dblink( - get_meta_server_name(), + 'meta_conn', format('SELECT _meta.add_index(%L, %L,%L, %L, %L)', hypertable_row.name, hypertable_row.main_schema_name, @@ -70,6 +72,8 @@ BEGIN )) AS t(r TEXT) WHERE indrelid = main_table; + PERFORM dblink_exec('meta_conn', 'COMMIT'); + PERFORM dblink_disconnect('meta_conn'); RETURN hypertable_row; END $BODY$; diff --git a/extension/sql/main/ddl_util.sql b/extension/sql/main/ddl_util.sql index 77e11fbcc..8dbcae1cc 100644 --- a/extension/sql/main/ddl_util.sql +++ b/extension/sql/main/ddl_util.sql @@ -49,15 +49,19 @@ $BODY$; --create the hypertable column based on a pg_attribute (table column) entry. CREATE OR REPLACE FUNCTION _sysinternal.create_column_from_attribute( hypertable_name NAME, - att pg_attribute + att pg_attribute, + conn_name TEXT = NULL ) RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS $BODY$ DECLARE BEGIN + IF conn_name IS NULL THEN + conn_name = get_meta_server_name(); + END IF; PERFORM * FROM dblink( - get_meta_server_name(), + conn_name, format('SELECT _meta.add_field(%L, %L, %L, %L, %L, %L, %L, %L)', hypertable_name, att.attname, From 490019d20b41bdacefdeb1ea01f435e1f8a45c84 Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Fri, 23 Dec 2016 02:09:51 -0500 Subject: [PATCH 2/2] Use C parse-tree modifications to transform queries on main tables to that on replica tables --- extension/sql/tests/regression/ddl.sql | 7 +- .../sql/tests/regression/expected/ddl.out | 24 +- extension/src/iobeamdb.c | 260 +++++++----------- extension/src/iobeamdb.h | 14 +- scripts/start-pg-docker.sh | 2 +- 5 files changed, 137 insertions(+), 170 deletions(-) diff --git a/extension/sql/tests/regression/ddl.sql b/extension/sql/tests/regression/ddl.sql index 7e8a75bf8..2e59568a1 100644 --- a/extension/sql/tests/regression/ddl.sql +++ b/extension/sql/tests/regression/ddl.sql @@ -44,8 +44,8 @@ SELECT * FROM insert_data('public."Hypertable_1"', 'copy_t'); COMMIT; ---TODO: make following work. ---SELECT * FROM PUBLIC."Hypertable_1"; +SELECT * FROM PUBLIC."Hypertable_1"; +EXPLAIN SELECT * FROM PUBLIC."Hypertable_1"; \d+ PUBLIC."Hypertable_1" \d+ "_sys_1_"."_hyper_1_root" @@ -93,3 +93,6 @@ SELECT * FROM _sys_1_._hyper_1_0_1_distinct_data; \d+ PUBLIC."Hypertable_1" \d+ "_sys_1_"."_hyper_1_root" \d+ _sys_1_._hyper_1_1_0_1_data + +SELECT * FROM PUBLIC."Hypertable_1"; + diff --git a/extension/sql/tests/regression/expected/ddl.out b/extension/sql/tests/regression/expected/ddl.out index 512aa9d1f..f33b548ab 100644 --- a/extension/sql/tests/regression/expected/ddl.out +++ b/extension/sql/tests/regression/expected/ddl.out @@ -106,8 +106,22 @@ FROM insert_data('public."Hypertable_1"', 'copy_t'); (1 row) COMMIT; ---TODO: make following work. ---SELECT * FROM PUBLIC."Hypertable_1"; +SELECT * FROM PUBLIC."Hypertable_1"; + time | Device_id | temp_c | humidity | sensor_1 | sensor_2 | sensor_3 | sensor_4 +---------------------+-----------+--------+----------+----------+----------+----------+---------- + 1257894000000000000 | dev1 | 30 | 70 | 1 | 2 | 3 | 100 +(1 row) + +EXPLAIN SELECT * FROM PUBLIC."Hypertable_1"; + QUERY PLAN +--------------------------------------------------------------------------------- + Append (cost=0.00..27.00 rows=702 width=204) + -> Seq Scan on _hyper_1_0_replica (cost=0.00..0.00 rows=1 width=204) + -> Seq Scan on _hyper_1_1_0_partition (cost=0.00..0.00 rows=1 width=204) + -> Seq Scan on _hyper_1_2_0_partition (cost=0.00..13.50 rows=350 width=204) + -> Seq Scan on _hyper_1_1_0_1_data (cost=0.00..13.50 rows=350 width=204) +(5 rows) + \d+ PUBLIC."Hypertable_1" Table "public.Hypertable_1" Column | Type | Modifiers | Storage | Stats target | Description @@ -325,3 +339,9 @@ Check constraints: "partition" CHECK (get_partition_for_key("Device_id", 32768) >= '0'::smallint AND get_partition_for_key("Device_id", 32768) <= '16383'::smallint) Inherits: _sys_1_._hyper_1_1_0_partition +SELECT * FROM PUBLIC."Hypertable_1"; + time | Device_id | humidity | sensor_1 | sensor_2_renamed | sensor_3_renamed | temp_f | sensor_3 | sensor_4 +---------------------+-----------+----------+----------+------------------+------------------+--------+----------+---------- + 1257894000000000000 | dev1 | 70 | 1 | 2 | 3 | 31 | 131 | 131 +(1 row) + diff --git a/extension/src/iobeamdb.c b/extension/src/iobeamdb.c index b4a0760ba..77e4452e6 100644 --- a/extension/src/iobeamdb.c +++ b/extension/src/iobeamdb.c @@ -1,8 +1,10 @@ #include "postgres.h" #include "catalog/namespace.h" +#include "catalog/pg_namespace.h" #include "optimizer/planner.h" #include "nodes/nodes.h" #include "nodes/print.h" +#include "nodes/nodeFuncs.h" #include "parser/parsetree.h" #include "utils/lsyscache.h" #include "executor/spi.h" @@ -23,21 +25,22 @@ static planner_hook_type prev_planner_hook = NULL; static bool isLoaded = false; PlannedStmt *iobeamdb_planner(Query *parse, int cursorOptions, ParamListInfo boundParams); -void +void _PG_init(void) { - elog(INFO, "iobeamdb loaded"); - prev_planner_hook = planner_hook; - planner_hook = iobeamdb_planner; + elog(INFO, "iobeamdb loaded"); + prev_planner_hook = planner_hook; + planner_hook = iobeamdb_planner; } -void +void _PG_fini(void) { - planner_hook = prev_planner_hook; + planner_hook = prev_planner_hook; } -bool IobeamLoaded(void) +bool +IobeamLoaded(void) { if (!isLoaded) { @@ -50,178 +53,117 @@ bool IobeamLoaded(void) return isLoaded; } -char* -cmdToString(CmdType cmdType) -{ - switch (cmdType) { - case CMD_SELECT: return("select"); - case CMD_UPDATE: return("update"); - case CMD_INSERT: return("insert"); - case CMD_DELETE: return("delete"); - case CMD_UTILITY: return("utility"); - case CMD_UNKNOWN: - default: return("unknown"); - } - - return("unknown"); -} - -PlannedStmt * +PlannedStmt * iobeamdb_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) { - PlannedStmt *rv = NULL; + PlannedStmt *rv = NULL; - // elog(INFO, "iobeamdb got called"); - - if (rv != NULL) - return rv; - - if (prev_planner_hook != NULL) { - /* Call any earlier hooks */ - elog(LOG, " calling prev planner-hook"); - rv = (prev_planner_hook)(parse, cursorOptions, boundParams); - } else { - /* Call the standard planner */ - //elog(LOG, " calling standard_planner"); - rv = standard_planner(parse, cursorOptions, boundParams); - } - - if(!IobeamLoaded()) \ + if(IobeamLoaded()) { - return rv; - } else - { - /* - here we have the plan and can start to mess around with it. - */ - - Oid singleFromTable = get_single_from_oid(parse); - - - if (rv->commandType == CMD_SELECT && singleFromTable!= InvalidOid) - { - char *hypertableName = get_hypertable_name(singleFromTable); - if (hypertableName != NULL) - { - Query *optimizedQuery; - char *optimizedSql = get_optimized_query_sql(parse, hypertableName); - pprint(parse); - - optimizedQuery = re_parse_optimized(optimizedSql); - pprint(optimizedQuery); - rv = standard_planner(optimizedQuery, cursorOptions, NULL); - } - } + /* replace call to main table with call to the replica table */ + change_table_name_walker((Node *) parse, NULL); } + + if (prev_planner_hook != NULL) { + /* Call any earlier hooks */ + elog(LOG, " calling prev planner-hook"); + rv = (prev_planner_hook)(parse, cursorOptions, boundParams); + } else { + /* Call the standard planner */ + //elog(LOG, " calling standard_planner"); + rv = standard_planner(parse, cursorOptions, boundParams); + } + return rv; } -Query *re_parse_optimized(char * sql) { - /* - * Parse the SQL string into a list of raw parse trees. - */ - List *raw_parsetree_list = pg_parse_query(sql); - Node *parsetree; - List *querytree_list; - Query * parse; - - if(list_length(raw_parsetree_list) != 1) { - elog(ERROR, "Expected one parsetree"); +/* + * Change all main tables to one of the replicas in the parse tree. + * + */ +bool +change_table_name_walker(Node *node, void *context) +{ + if (node == NULL) + { + return false; } - parsetree = (Node *) linitial(raw_parsetree_list); - querytree_list = pg_analyze_and_rewrite(parsetree, - sql, - NULL, - 0); - - if(list_length(querytree_list) != 1) { - elog(ERROR, "Expected one querytree"); + if (IsA(node, RangeTblEntry)) + { + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node; + if (rangeTableEntry->rtekind == RTE_RELATION) + { + Oid replicaOid; + replicaOid = get_replica_oid(rangeTableEntry->relid); + if (replicaOid != InvalidOid) { + rangeTableEntry->relid = replicaOid; + } + } + return false; } - parse = (Query *) linitial(querytree_list); - return parse; + if (IsA(node, Query)) + { + return query_tree_walker((Query *) node, change_table_name_walker, + context, QTW_EXAMINE_RTES); + } + + return expression_tree_walker(node, change_table_name_walker, context); } -char* get_optimized_query_sql(Query *parse, char *hypertableName) { - StringInfo codeGenSql = makeStringInfo(); - appendStringInfo(codeGenSql, "SELECT ioql_exec_query_record_sql(new_ioql_query(namespace_name => '%s'))", hypertableName); - char *resultSql = NULL; +/* + * + * Use the default_replica_node to look up the oid for a replica table from the oid of the main table. + * TODO: make this use a cache instead of a db lookup every time. + * + * */ +Oid +get_replica_oid(Oid mainRelationOid) +{ + Oid namespace = get_rel_namespace(mainRelationOid); + //TODO: cache this + Oid hypertable_meta = get_relname_relid("hypertable", get_namespace_oid("public", false)); + char *tableName = get_rel_name(mainRelationOid); + char *schemaName = get_namespace_name(namespace); + StringInfo sql = makeStringInfo(); + int ret; + Oid replicaOid = InvalidOid; + + /* prevents infinite recursion, don't check hypertable meta tables */ + if ( + hypertable_meta == InvalidOid + || namespace == PG_CATALOG_NAMESPACE + || mainRelationOid == hypertable_meta + || mainRelationOid == get_relname_relid("hypertable_replica", get_namespace_oid("public", false)) + || mainRelationOid == get_relname_relid("default_replica_node", get_namespace_oid("public", false)) + ) + { + return InvalidOid; + } + + appendStringInfo(sql, REPLICA_OID_QUERY, schemaName, tableName); SPI_connect(); - if (SPI_exec(codeGenSql->data, 1) <= 0) { + + ret = SPI_exec(sql->data, 1); + + if (ret <= 0) + { elog(ERROR, "Got an SPI error"); } - if(SPI_processed != 1) { - elog(ERROR, "Didn't get 1 row in code gen"); - SPI_finish(); - return NULL; - } - resultSql = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1); - elog(LOG, "The optimized query is %s", resultSql); - SPI_finish(); - return resultSql; -} - - -Oid get_single_from_oid(Query *parse) { - /* one from entry which is to a table that is a hypertable */ - if (list_length(parse->jointree->fromlist)==1) + if(SPI_processed == 1) { - Node *jtnode = (Node *) linitial(parse->jointree->fromlist); - if (IsA(jtnode, RangeTblRef)) - { - int varno = ((RangeTblRef *) jtnode)->rtindex; - RangeTblEntry *rte = rt_fetch(varno, parse->rtable); - if (rte->rtekind == RTE_RELATION) - { - Oid relationOid = rte->relid; - return relationOid; - } - } + bool isnull; + Datum res; + + TupleDesc tupdesc = SPI_tuptable->tupdesc; + HeapTuple tuple = SPI_tuptable->vals[0]; + res = SPI_getbinval(tuple, tupdesc, 1, &isnull); + replicaOid = DatumGetObjectId(res); } - return InvalidOid; + SPI_finish(); + return replicaOid; } - -char * get_hypertable_name(Oid relationOid) { - Oid namespace = get_rel_namespace(relationOid); - //todo cache this - Oid hypertable_meta = get_relname_relid("hypertable", get_namespace_oid("public", false)); - char *tableName = get_rel_name(relationOid); - char *schemaName = get_namespace_name(namespace); - StringInfo sql = makeStringInfo(); - int ret; - uint64 results; - char *name = NULL; - - /* prevents infinite recursion, don't check hypertable meta table */ - if (hypertable_meta == relationOid) { - return NULL; - } - - appendStringInfo(sql, HYPERTABLE_NAME_QUERY, schemaName, tableName); - - - SPI_connect(); - - ret = SPI_exec(sql->data, 1); - - - if (ret <= 0) { - elog(ERROR, "Got an SPI error"); - } - - results = SPI_processed; - - - if(results == 1) { - TupleDesc tupdesc = SPI_tuptable->tupdesc; - HeapTuple tuple = SPI_tuptable->vals[0]; - name = SPI_getvalue(tuple, tupdesc, 1); - elog(LOG, "Is a hypertable query for %s %s.%s", name, schemaName, tableName); - } - SPI_finish(); - return name; -} diff --git a/extension/src/iobeamdb.h b/extension/src/iobeamdb.h index 8490e7714..7ffb10e70 100644 --- a/extension/src/iobeamdb.h +++ b/extension/src/iobeamdb.h @@ -3,20 +3,22 @@ #define HYPERTABLE_NAME_QUERY "SELECT name FROM hypertable WHERE root_schema_name = '%s' AND root_table_name = '%s'" +#define REPLICA_OID_QUERY "SELECT format('%%I.%%I', hr.schema_name, hr.table_name)::regclass::oid \ + FROM public.hypertable h \ + INNER JOIN public.default_replica_node drn ON (drn.hypertable_name = h.name AND drn.database_name = current_database()) \ + INNER JOIN public.hypertable_replica hr ON (hr.replica_id = drn.replica_id AND hr.hypertable_name = drn.hypertable_name) \ + WHERE main_schema_name = '%s' AND main_table_name = '%s'" + #include "postgres.h" #include "optimizer/planner.h" #include "nodes/nodes.h" - void _PG_init(void); void _PG_fini(void); bool IobeamLoaded(void); -char* cmdToString(CmdType cmdType); -Oid get_single_from_oid(Query *parse); -char* get_hypertable_name(Oid relationOid); -char* get_optimized_query_sql(Query *parse, char *hypertableName); -Query *re_parse_optimized(char * sql); +bool change_table_name_walker(Node *node, void *context); +Oid get_replica_oid(Oid mainRelationOid); #endif /* IOBEAMDB_H */ diff --git a/scripts/start-pg-docker.sh b/scripts/start-pg-docker.sh index d3e5b5bc6..d7bca658e 100755 --- a/scripts/start-pg-docker.sh +++ b/scripts/start-pg-docker.sh @@ -8,7 +8,7 @@ docker run -d --name iobeamdb -p 5432:5432 -e POSTGRES_DB=test -m 4g \ $IMAGE_NAME postgres \ -csynchronous_commit=off -cwal_writer_delay=1000 \ -cmax_locks_per_transaction=1000 \ - -cshared_preload_libraries=pg_stat_statements \ + -cshared_preload_libraries=pg_stat_statements,iobeamdb \ -cvacuum_cost_delay=20 -cautovacuum_max_workers=1 \ -clog_autovacuum_min_duration=1000 -cshared_buffers=1GB \ -ceffective_cache_size=3GB -cwork_mem=52428kB \