diff --git a/.gitignore b/.gitignore index 00cee43d0..395ab02c7 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ /src/*.so /.vscode/ /iobeamdb.so -*.bak \ No newline at end of file +*.bak +typedef.list diff --git a/Makefile b/Makefile index 3eabe6717..44d0e65f1 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,18 @@ EXT_SQL_FILE = sql/$(EXTENSION)--$(EXT_VERSION).sql DATA = $(EXT_SQL_FILE) MODULE_big = $(EXTENSION) -SRCS = src/iobeamdb.c src/murmur3.c src/pgmurmur3.c src/utils.c +SRCS = \ + src/iobeamdb.c \ + src/murmur3.c \ + src/pgmurmur3.c \ + src/utils.c \ + src/metadata_queries.c \ + src/cache.c \ + src/cache_invalidate.c \ + src/hypertable_cache.c \ + src/chunk_cache.c \ + src/insert.c + OBJS = $(SRCS:.c=.o) MKFILE_PATH := $(abspath $(MAKEFILE_LIST)) @@ -58,5 +69,12 @@ package: clean $(EXT_SQL_FILE) $(install_sh) -m 644 $(EXTENSION).control 'package/extension/' $(install_sh) -m 644 $(EXT_SQL_FILE) 'package/extension/' +typedef.list: clean $(OBJS) + ./generate_typedef.sh + +pgindent: typedef.list + pgindent --typedef=typedef.list + + .PHONY: check-sql-files all diff --git a/generate_typedef.sh b/generate_typedef.sh new file mode 100755 index 000000000..f54a0cdcc --- /dev/null +++ b/generate_typedef.sh @@ -0,0 +1,8 @@ +#!/bin/bash +gobjdump -W src/*.o |egrep -A3 DW_TAG_typedef |perl -e 'while (<>) { chomp; @flds = split;next unless (1 < @flds);\ + next if $flds[0] ne "DW_AT_name" && $flds[1] ne "DW_AT_name";\ + next if $flds[-1] =~ /^DW_FORM_str/;\ + print $flds[-1],"\n"; }' |sort |uniq > typedef.list.local +wget -q -O - "http://www.pgbuildfarm.org/cgi-bin/typedefs.pl?branch=HEAD" |\ + cat - typedef.list.local | sort | uniq > typedef.list +rm typedef.list.local diff --git a/sql/common/cache.sql b/sql/common/cache.sql new file mode 100644 index 000000000..079b6b837 --- /dev/null +++ b/sql/common/cache.sql @@ -0,0 +1,25 @@ +-- cache invalidation proxy table +CREATE TABLE _iobeamdb_cache.cache_inval_hypertable(); +CREATE TABLE _iobeamdb_cache.cache_inval_chunk(); + +--not actually strictly needed but good for sanity as all tables should be dumped. 
+SELECT pg_catalog.pg_extension_config_dump('_iobeamdb_cache.cache_inval_hypertable', ''); +SELECT pg_catalog.pg_extension_config_dump('_iobeamdb_cache.cache_inval_chunk', ''); + +CREATE OR REPLACE FUNCTION _iobeamdb_cache.invalidate_relcache_trigger() + RETURNS TRIGGER AS '$libdir/iobeamdb', 'invalidate_relcache_trigger' LANGUAGE C; + +CREATE TRIGGER "0_cache_inval" AFTER INSERT OR UPDATE OR DELETE OR TRUNCATE ON _iobeamdb_catalog.hypertable +FOR EACH STATEMENT EXECUTE PROCEDURE _iobeamdb_cache.invalidate_relcache_trigger('cache_inval_hypertable'); + +CREATE TRIGGER "0_cache_inval" AFTER INSERT OR UPDATE OR DELETE OR TRUNCATE ON _iobeamdb_catalog.partition_epoch +FOR EACH STATEMENT EXECUTE PROCEDURE _iobeamdb_cache.invalidate_relcache_trigger('cache_inval_hypertable'); + +CREATE TRIGGER "0_cache_inval" AFTER INSERT OR UPDATE OR DELETE OR TRUNCATE ON _iobeamdb_catalog.partition +FOR EACH STATEMENT EXECUTE PROCEDURE _iobeamdb_cache.invalidate_relcache_trigger('cache_inval_hypertable'); + +CREATE TRIGGER "0_cache_inval" AFTER INSERT OR UPDATE OR DELETE OR TRUNCATE ON _iobeamdb_catalog.chunk +FOR EACH STATEMENT EXECUTE PROCEDURE _iobeamdb_cache.invalidate_relcache_trigger('cache_inval_chunk'); + +CREATE TRIGGER "0_cache_inval_1" AFTER INSERT OR UPDATE OR DELETE OR TRUNCATE ON _iobeamdb_catalog.chunk_replica_node +FOR EACH STATEMENT EXECUTE PROCEDURE _iobeamdb_cache.invalidate_relcache_trigger('cache_inval_chunk'); diff --git a/sql/common/cluster_setup_functions.sql b/sql/common/cluster_setup_functions.sql index 1f313c3dd..d932aa9ab 100644 --- a/sql/common/cluster_setup_functions.sql +++ b/sql/common/cluster_setup_functions.sql @@ -1,7 +1,4 @@ -CREATE SCHEMA IF NOT EXISTS _iobeamdb_internal; -CREATE SCHEMA IF NOT EXISTS _iobeamdb_meta; -CREATE SCHEMA IF NOT EXISTS _iobeamdb_meta_api; -CREATE SCHEMA IF NOT EXISTS _iobeamdb_data_api; + CREATE OR REPLACE FUNCTION _iobeamdb_internal.create_user_mapping( cluster_user_row _iobeamdb_catalog.cluster_user, diff --git a/sql/common/schemas.sql b/sql/common/schemas.sql new file mode 100644 index 000000000..1c63e0b98 --- /dev/null +++ b/sql/common/schemas.sql @@ -0,0 +1,6 @@ +CREATE SCHEMA IF NOT EXISTS _iobeamdb_catalog; +CREATE SCHEMA IF NOT EXISTS _iobeamdb_internal; +CREATE SCHEMA IF NOT EXISTS _iobeamdb_meta; +CREATE SCHEMA IF NOT EXISTS _iobeamdb_meta_api; +CREATE SCHEMA IF NOT EXISTS _iobeamdb_data_api; +CREATE SCHEMA IF NOT EXISTS _iobeamdb_cache; diff --git a/sql/common/tables.sql b/sql/common/tables.sql index 4754aa165..cc5c41dcc 100644 --- a/sql/common/tables.sql +++ b/sql/common/tables.sql @@ -134,6 +134,7 @@ CREATE TABLE IF NOT EXISTS _iobeamdb_catalog.partition_epoch ( CHECK (num_partitions <= partitioning_mod), CHECK ((partitioning_func_schema IS NULL AND partitioning_func IS NULL) OR (partitioning_func_schema IS NOT NULL AND partitioning_func IS NOT NULL)) ); +CREATE INDEX ON _iobeamdb_catalog.partition_epoch(hypertable_id, start_time, end_time); SELECT pg_catalog.pg_extension_config_dump('_iobeamdb_catalog.partition_epoch', ''); SELECT pg_catalog.pg_extension_config_dump(pg_get_serial_sequence('_iobeamdb_catalog.partition_epoch','id'), ''); diff --git a/sql/common/types.sql b/sql/common/types.sql index 88f79ad35..7862f5dfb 100644 --- a/sql/common/types.sql +++ b/sql/common/types.sql @@ -1,4 +1,3 @@ -CREATE SCHEMA IF NOT EXISTS _iobeamdb_catalog; CREATE TYPE _iobeamdb_catalog.chunk_placement_type AS ENUM ('RANDOM', 'STICKY'); diff --git a/sql/load_order.txt b/sql/load_order.txt index e53ac64f7..c32bec8bd 100644 --- a/sql/load_order.txt +++ 
b/sql/load_order.txt @@ -1,5 +1,7 @@ +sql/common/schemas.sql sql/common/types.sql sql/common/tables.sql +sql/common/cache.sql sql/common/cluster_setup_functions.sql sql/common/errors.sql sql/common/chunk.sql diff --git a/sql/main/hypertable_triggers.sql b/sql/main/hypertable_triggers.sql index ec0ece215..3ba299268 100644 --- a/sql/main/hypertable_triggers.sql +++ b/sql/main/hypertable_triggers.sql @@ -11,7 +11,7 @@ BEGIN IF TG_OP = 'UPDATE' THEN RAISE EXCEPTION 'UPDATE ONLY not supported on hypertables' USING ERRCODE = 'IO101'; - ELSIF TG_OP = 'DELETE' AND current_setting('io.ignore_delete_in_trigger', true) <> 'true' THEN + ELSIF TG_OP = 'DELETE' AND current_setting('io.ignore_delete_in_trigger', true) IS DISTINCT FROM 'true' THEN RAISE EXCEPTION 'DELETE ONLY not currently supported on hypertables' USING ERRCODE = 'IO101'; END IF; @@ -62,8 +62,6 @@ BEGIN CREATE TRIGGER insert_trigger AFTER INSERT ON %I.%I FOR EACH STATEMENT EXECUTE PROCEDURE _iobeamdb_internal.on_modify_main_table(); $$, NEW.schema_name, NEW.table_name); - - RETURN NEW; END IF; diff --git a/sql/main/insert.sql b/sql/main/insert.sql index 6559c78be..10645bd69 100644 --- a/sql/main/insert.sql +++ b/sql/main/insert.sql @@ -1,236 +1,11 @@ -- This file contains functions that aid in inserting data into a hypertable. --- Get a comma-separated list of columns in a hypertable. -CREATE OR REPLACE FUNCTION _iobeamdb_internal.get_column_list( - hypertable_id INTEGER -) - RETURNS TEXT LANGUAGE SQL STABLE AS -$BODY$ -SELECT array_to_string(_iobeamdb_internal.get_quoted_column_names(hypertable_id), ', ') -$BODY$; +CREATE OR REPLACE FUNCTION _iobeamdb_internal.insert_trigger_on_copy_table_c() + RETURNS TRIGGER AS '$libdir/iobeamdb', 'insert_trigger_on_copy_table_c' LANGUAGE C; --- Gets the partition ID of a given epoch and data row. --- --- epoch - The epoch whose partition ID we want --- copy_record - Record/row from a table --- copy_table_name - Name of the relation to cast the record to. -CREATE OR REPLACE FUNCTION _iobeamdb_internal.get_partition_for_epoch_row( - epoch _iobeamdb_catalog.partition_epoch, - copy_record anyelement, - copy_table_name TEXT -) - RETURNS _iobeamdb_catalog.partition LANGUAGE PLPGSQL STABLE AS -$BODY$ -DECLARE - partition_row _iobeamdb_catalog.partition; -BEGIN - IF epoch.partitioning_func IS NULL THEN - SELECT p.* - FROM _iobeamdb_catalog.partition p - WHERE p.epoch_id = epoch.id - INTO STRICT partition_row; - ELSE - EXECUTE format( - $$ - SELECT p.* - FROM _iobeamdb_catalog.partition p - WHERE p.epoch_id = %L AND - %I.%I((SELECT row.%I FROM (SELECT (%L::%s).*) as row)::TEXT, %L) - BETWEEN p.keyspace_start AND p.keyspace_end - $$, - epoch.id, epoch.partitioning_func_schema, epoch.partitioning_func, - epoch.partitioning_column, - copy_record, copy_table_name, epoch.partitioning_mod) - INTO STRICT partition_row; - END IF; - RETURN partition_row; -END -$BODY$; --- Gets the value of the time column from a given row. 
--- --- column_name - Name of time column to fetch --- column_type - Type of the time record --- copy_record - Record/row from a table --- copy_table_name - Name of the relation to cast the record to -CREATE OR REPLACE FUNCTION _iobeamdb_internal.get_time_column_from_record( - column_name NAME, - column_type REGTYPE, - copy_record anyelement, - copy_table_name TEXT -) - RETURNS bigint LANGUAGE PLPGSQL STABLE AS -$BODY$ -DECLARE - t bigint; -BEGIN - EXECUTE format( - $$ - SELECT %s FROM (SELECT (%L::%s).*) as row LIMIT 1 - $$, _iobeamdb_internal.extract_time_sql(format('row.%I', column_name), column_type), copy_record, copy_table_name) - INTO STRICT t; - RETURN t; -END -$BODY$; --- Inserts rows from a (temporary) table into correct hypertable child tables. --- --- In typical use case, the copy_table_oid is the OID of a hypertable's main --- table. This allows users to use normal SQL INSERT calls on the main table, --- and a trigger that executes after the statement will call this function to --- place the data appropriately. --- --- hypertable_id - ID of the hypertable the data belongs to --- copy_table_oid - OID of the table to fetch rows from -CREATE OR REPLACE FUNCTION _iobeamdb_internal.insert_data( - hypertable_id INTEGER, - copy_table_oid REGCLASS -) - RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS -$BODY$ -DECLARE - point_record_query_sql TEXT; - point_record RECORD; - chunk_row _iobeamdb_catalog.chunk; - chunk_id INT; - crn_record RECORD; - hypertable_row RECORD; - partition_constraint_where_clause TEXT = ''; - column_list TEXT = ''; - insert_sql TEXT =''; -BEGIN - --This guard protects against calling insert_data() twice in the same transaction, - --which might otherwise cause a deadlock in case the second insert_data() involves a chunk - --that was inserted into in the first call to insert_data(). - --This is a temporary safe guard that should ideally be removed once chunk management - --has been refactored and improved to avoid such deadlocks. - --NOTE: In its current form, this safe guard unfortunately prohibits transactions - --involving INSERTs on two different hypertables. - IF current_setting('io.insert_data_guard', true) = 'on' THEN - RAISE EXCEPTION 'insert_data() can only be called once per transaction'; - END IF; - PERFORM set_config('io.insert_data_guard', 'on', true); - PERFORM set_config('io.ignore_delete_in_trigger', 'true', true); - SELECT * INTO hypertable_row FROM _iobeamdb_catalog.hypertable h WHERE h.id = hypertable_id; - column_list := _iobeamdb_internal.get_column_list(hypertable_id); - point_record_query_sql := format( - $$ - SELECT %3$s AS time, - p.id AS partition_id, p.keyspace_start, p.keyspace_end, - pe.partitioning_func_schema, pe.partitioning_func, pe.partitioning_column, pe.partitioning_mod - FROM (SELECT * FROM ONLY %1$s LIMIT 1) ct - LEFT JOIN _iobeamdb_catalog.partition_epoch pe ON ( - pe.hypertable_id = %2$L AND - (pe.start_time <= %3$s OR pe.start_time IS NULL) AND - (pe.end_time >= %3$s OR pe.end_time IS NULL) - ) - LEFT JOIN _iobeamdb_internal.get_partition_for_epoch_row(pe, ct::%1$s, '%1$s') AS p ON(true) - $$, copy_table_oid, hypertable_id, - _iobeamdb_internal.extract_time_sql(format('ct.%I', hypertable_row.time_column_name), hypertable_row.time_column_type)); - - --can be inserting empty set so not strict. 
- EXECUTE point_record_query_sql - INTO point_record; - - IF point_record.time IS NOT NULL AND point_record.partition_id IS NULL THEN - RAISE EXCEPTION 'Should never happen: could not find partition for insert' - USING ERRCODE = 'IO501'; - END IF; - - --Create a temp table to collect all the chunks we insert into. We might - --need to close the chunks at the end of the transaction. - CREATE TEMP TABLE IF NOT EXISTS insert_chunks(LIKE _iobeamdb_catalog.chunk) ON COMMIT DROP; - - --We need to truncate the table if it already existed due to calling this - --function twice in a single transaction. - TRUNCATE TABLE insert_chunks; - - WHILE point_record.time IS NOT NULL LOOP - --Get a chunk with SHARE lock - INSERT INTO insert_chunks - SELECT * FROM get_or_create_chunk(point_record.partition_id, point_record.time, TRUE) - RETURNING * INTO chunk_row; - - IF point_record.partitioning_column IS NOT NULL THEN - --if we are inserting across more than one partition, - --construct a WHERE clause constraint that SELECTs only - --values from copy table that match the current partition - partition_constraint_where_clause := format( - $$ - WHERE (%1$I >= %2$s OR %2$s IS NULL) AND (%1$I <= %3$s OR %3$s IS NULL) AND - (%4$I.%5$I(%6$I::TEXT, %7$L) BETWEEN %8$L AND %9$L) - $$, - hypertable_row.time_column_name, - _iobeamdb_internal.time_literal_sql(chunk_row.start_time, hypertable_row.time_column_type), - _iobeamdb_internal.time_literal_sql(chunk_row.end_time, hypertable_row.time_column_type), - point_record.partitioning_func_schema, - point_record.partitioning_func, - point_record.partitioning_column, - point_record.partitioning_mod, - point_record.keyspace_start, - point_record.keyspace_end - ); - END IF; - - --Do insert on all chunk replicas - SELECT string_agg(insert_stmt, ',') - INTO insert_sql - FROM ( - SELECT format('i_%s AS (INSERT INTO %I.%I (%s) SELECT * FROM selected)', - row_number() OVER(), crn.schema_name, crn.table_name, column_list) insert_stmt - FROM _iobeamdb_catalog.chunk_replica_node crn - WHERE (crn.chunk_id = chunk_row.id) - ) AS parts; - - EXECUTE format( - $$ - WITH selected AS - ( - DELETE FROM ONLY %1$s %2$s - RETURNING %4$s - ), - %3$s - SELECT 1 - $$, - copy_table_oid, - partition_constraint_where_clause, - insert_sql, - column_list - ); - - EXECUTE point_record_query_sql - INTO point_record; - - IF point_record.time IS NOT NULL AND point_record.partition_id IS NULL THEN - RAISE EXCEPTION 'Should never happen: could not find partition for insert' - USING ERRCODE = 'IO501'; - END IF; - END LOOP; - - --Loop through all open chunks that were inserted into, closing - --if needed. Do it in ID order to avoid deadlocks. 
- FOR chunk_id IN - SELECT c.id FROM insert_chunks cl - INNER JOIN _iobeamdb_catalog.chunk c ON cl.id = c.id - WHERE c.end_time IS NULL ORDER BY cl.id DESC - LOOP - PERFORM _iobeamdb_internal.close_chunk_if_needed(chunk_id); - END LOOP; -END -$BODY$; - -CREATE OR REPLACE FUNCTION _iobeamdb_internal.insert_trigger_on_copy_table() - RETURNS TRIGGER LANGUAGE PLPGSQL AS -$BODY$ -BEGIN - EXECUTE format( - $$ - SELECT _iobeamdb_internal.insert_data(%1$L, %2$L) - $$, TG_ARGV[0], TG_RELID); - RETURN NEW; -END -$BODY$; diff --git a/sql/main/partitioning.sql b/sql/main/partitioning.sql index 284652f71..dae6d5645 100644 --- a/sql/main/partitioning.sql +++ b/sql/main/partitioning.sql @@ -1,7 +1,2 @@ -CREATE OR REPLACE FUNCTION _iobeamdb_catalog.get_partition_for_key( - key TEXT, - mod_factor INT -) - RETURNS SMALLINT LANGUAGE SQL IMMUTABLE STRICT AS $$ -SELECT ((_iobeamdb_internal.murmur3_hash_string(key, 1 :: INT4) & x'7fffffff' :: INTEGER) % mod_factor) :: SMALLINT; -$$; +CREATE OR REPLACE FUNCTION _iobeamdb_catalog.get_partition_for_key(text, int) RETURNS smallint + AS '$libdir/iobeamdb', 'get_partition_for_key' LANGUAGE C IMMUTABLE STRICT; diff --git a/src/cache.c b/src/cache.c new file mode 100644 index 000000000..3edb8ff73 --- /dev/null +++ b/src/cache.c @@ -0,0 +1,80 @@ +#include "cache.h" + +void +cache_init(Cache *cache) +{ + if (cache->htab != NULL) + { + elog(ERROR, "Cache %s is already initialized", cache->name); + return; + } + + if (cache->hctl.hcxt == NULL) + { + cache->hctl.hcxt = AllocSetContextCreate(CacheMemoryContext, + cache->name, + ALLOCSET_DEFAULT_SIZES); + } + + cache->htab = hash_create(cache->name, cache->numelements, + &cache->hctl, cache->flags); +} + +void +cache_invalidate(Cache *cache) +{ + if (cache->htab == NULL) + return; + + if (cache->pre_invalidate_hook != NULL) + cache->pre_invalidate_hook(cache); + + hash_destroy(cache->htab); + cache->htab = NULL; + MemoryContextDelete(cache->hctl.hcxt); + cache->hctl.hcxt = NULL; + + if (cache->post_invalidate_hook != NULL) + cache->post_invalidate_hook(cache); +} + +MemoryContext +cache_memory_ctx(Cache *cache) +{ + return cache->hctl.hcxt; +} + +MemoryContext +cache_switch_to_memory_context(Cache *cache) +{ + return MemoryContextSwitchTo(cache->hctl.hcxt); +} + +void * +cache_fetch(Cache *cache, CacheQueryCtx *ctx) +{ + bool found; + + if (cache->htab == NULL) + { + elog(ERROR, "Hash %s not initialized", cache->name); + } + + ctx->entry = hash_search(cache->htab, cache->get_key(ctx), HASH_ENTER, &found); + + if (!found && cache->create_entry != NULL) + { + MemoryContext old = cache_switch_to_memory_context(cache); + + ctx->entry = cache->create_entry(cache, ctx); + MemoryContextSwitchTo(old); + } + else if (found && cache->update_entry != NULL) + { + /* Switch memory context here? 
*/ + /* MemoryContext old = cache_switch_to_memory_context(cache); */ + ctx->entry = cache->update_entry(cache, ctx); + /* MemoryContextSwitchTo(old); */ + } + return ctx->entry; +} diff --git a/src/cache.h b/src/cache.h new file mode 100644 index 000000000..90b53ea4f --- /dev/null +++ b/src/cache.h @@ -0,0 +1,35 @@ +#ifndef _IOBEAMDB_CACHE_H_ +#define _IOBEAMDB_CACHE_H_ + +#include +#include +#include + +typedef struct CacheQueryCtx +{ + void *entry; + void *private[0]; +} CacheQueryCtx; + +typedef struct Cache +{ + HASHCTL hctl; + HTAB *htab; + const char *name; + long numelements; + int flags; + void *(*get_key) (struct CacheQueryCtx *); + void *(*create_entry) (struct Cache *, CacheQueryCtx *); + void *(*update_entry) (struct Cache *, CacheQueryCtx *); + void (*pre_invalidate_hook) (struct Cache *); + void (*post_invalidate_hook) (struct Cache *); +} Cache; + +extern void cache_init(Cache *cache); +extern void cache_invalidate(Cache *cache); +extern void *cache_fetch(Cache *cache, CacheQueryCtx *ctx); + +extern MemoryContext cache_memory_ctx(Cache *cache); +extern MemoryContext cache_switch_to_memory_context(Cache *cache); + +#endif /* _IOBEAMDB_CACHE_H_ */ diff --git a/src/cache_invalidate.c b/src/cache_invalidate.c new file mode 100644 index 000000000..2e975cef5 --- /dev/null +++ b/src/cache_invalidate.c @@ -0,0 +1,115 @@ +/* + * Notes on the way caching works: Since out caches are stored in per-process + * (per-backend memory), we have to have a way to propagate invalidation + * messages to all backends, for that we use the Postgres relcache + * mechanism. Relcache is a cache that keeps internal info about + * relations(tables). Postgres has a mechanism for registering for relcache + * invalidation events that are propagated to all backends: + * CacheRegisterRelcacheCallback(). We register inval_cache_callback() with + * this mechanism and route all invalidation messages through it to the correct + * cache invalidation functions. + * + * The plan for our caches is to use (abuse) this mechanism to serve as a + * notify to invalidate our caches. Thus, we create proxy tables for each + * cache we use and attach the invalidate_relcache_trigger trigger to all + * tables whose changes should invalidate the cache. This trigger will + * invalidate the relcache for the proxy table specified as the first argument + * to the trigger (see cache.sql). + */ + +#include +#include +#include +#include +#include +#include +#include + +#include "iobeamdb.h" +#include "hypertable_cache.h" +#include "chunk_cache.h" + +#define CACHE_INVAL_PROXY_SCHEMA "_iobeamdb_cache" +#define CACHE_INVAL_PROXY_SCHEMA_OID get_namespace_oid(CACHE_INVAL_PROXY_SCHEMA, false) + +void _cache_invalidate_init(void); +void _cache_invalidate_fini(void); +void _cache_invalidate_extload(void); +Datum invalidate_relcache_trigger(PG_FUNCTION_ARGS); + +PG_FUNCTION_INFO_V1(invalidate_relcache_trigger); + + +static Oid hypertable_cache_inval_proxy_oid = InvalidOid; +static Oid chunk_cache_inval_proxy_oid = InvalidOid; + + +/* + * This function is called when any relcache is invalidated. + * Should route the invalidation to the correct cache. + */ +static void +inval_cache_callback(Datum arg, Oid relid) +{ + /* TODO: look at IsTransactionState should this be necessary? 
*/ + if (!IobeamLoaded()) + return; + + if (relid == InvalidOid || relid == hypertable_cache_inval_proxy_oid) + invalidate_hypertable_cache_callback(); + + if (relid == InvalidOid || relid == chunk_cache_inval_proxy_oid) + invalidate_chunk_cache_callback(); +} + +/* + * This trigger causes the relcache for the cache_inval_proxy table (passed in + * as arg 0) to be invalidated. It should be called to invalidate the caches + * associated with a proxy table (usually each cache has it's own proxy table) + * This function is attached to the right tables in common/cache.sql + * + */ +Datum +invalidate_relcache_trigger(PG_FUNCTION_ARGS) +{ + TriggerData *trigdata = (TriggerData *) fcinfo->context; + char *inval_proxy_name_arg; + Oid proxy_oid; + + if (!CALLED_AS_TRIGGER(fcinfo)) + elog(ERROR, "not called by trigger manager"); + + /* arg 0 = relid of the cache_inval_proxy table */ + inval_proxy_name_arg = trigdata->tg_trigger->tgargs[0]; + proxy_oid = get_relname_relid(inval_proxy_name_arg, get_namespace_oid(CACHE_INVAL_PROXY_SCHEMA, false)); + CacheInvalidateRelcacheByRelid(proxy_oid); + + /* tuple to return to executor */ + if (TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) + return PointerGetDatum(trigdata->tg_newtuple); + else + return PointerGetDatum(trigdata->tg_trigtuple); +} + + +void +_cache_invalidate_init(void) +{ + CacheRegisterRelcacheCallback(inval_cache_callback, PointerGetDatum(NULL)); +} + +void +_cache_invalidate_fini(void) +{ + /* No way to unregister relcache callback */ + hypertable_cache_inval_proxy_oid = InvalidOid; + chunk_cache_inval_proxy_oid = InvalidOid; +} + + +void +_cache_invalidate_extload(void) +{ + hypertable_cache_inval_proxy_oid = get_relname_relid(HYPERTABLE_CACHE_INVAL_PROXY_TABLE, CACHE_INVAL_PROXY_SCHEMA_OID); + chunk_cache_inval_proxy_oid = get_relname_relid(HYPERTABLE_CACHE_INVAL_PROXY_TABLE, CACHE_INVAL_PROXY_SCHEMA_OID); +} diff --git a/src/chunk_cache.c b/src/chunk_cache.c new file mode 100644 index 000000000..e0c4dcfbe --- /dev/null +++ b/src/chunk_cache.c @@ -0,0 +1,241 @@ +#include +#include +#include + +#include "chunk_cache.h" +#include "cache.h" +#include "hypertable_cache.h" +#include "utils.h" +#include "metadata_queries.h" + + +/* + * Chunk Insert Plan Cache: + * + * Hashtable of chunk_id => chunk_insert_plan_htable_entry. + * + * This cache stores plans for the execution of the command for moving stuff + * from the copy table to the tables associated with the chunk. + * + * Retrieval: each chunk has one associated plan. If the chunk's start/end time + * changes then the old plan is freed and a new plan is regenerated + * + * NOTE: chunks themselves do not have a cache since they need to be locked for + * each insert anyway... 
+ * + */ +typedef struct chunk_insert_plan_htable_entry +{ + int32 chunk_id; + int64 start_time; + int64 end_time; + SPIPlanPtr move_from_copyt_plan; +} chunk_insert_plan_htable_entry; + +typedef struct ChunkCacheQueryCtx +{ + CacheQueryCtx cctx; + hypertable_cache_entry *hci; + epoch_and_partitions_set *pe_entry; + partition_info *part; + int32 chunk_id; + int64 chunk_start_time; + int64 chunk_end_time; +} ChunkCacheQueryCtx; + +static void * +chunk_insert_plan_cache_get_key(CacheQueryCtx *ctx) +{ + return &((ChunkCacheQueryCtx *) ctx)->chunk_id; +} + +static void *chunk_insert_plan_cache_create_entry(Cache *cache, CacheQueryCtx *ctx); +static void *chunk_insert_plan_cache_update_entry(Cache *cache, CacheQueryCtx *ctx); + +static void chunk_insert_plan_cache_pre_invalidate(Cache *cache); +static char *get_copy_table_insert_sql(ChunkCacheQueryCtx *ctx); + +static Cache chunk_insert_plan_cache = { + .hctl = { + .keysize = sizeof(int32), + .entrysize = sizeof(chunk_insert_plan_htable_entry), + .hcxt = NULL, + }, + .htab = NULL, + .name = CHUNK_CACHE_INVAL_PROXY_TABLE, + .numelements = 16, + .flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS, + .get_key = chunk_insert_plan_cache_get_key, + .create_entry = chunk_insert_plan_cache_create_entry, + .update_entry = chunk_insert_plan_cache_update_entry, + .pre_invalidate_hook = chunk_insert_plan_cache_pre_invalidate, + .post_invalidate_hook = cache_init, +}; + +static void +chunk_insert_plan_cache_pre_invalidate(Cache *cache) +{ + chunk_insert_plan_htable_entry *entry; + HASH_SEQ_STATUS scan; + + hash_seq_init(&scan, cache->htab); + + while ((entry = hash_seq_search(&scan))) + { + SPI_freeplan(entry->move_from_copyt_plan); + } +} + +static void * +chunk_insert_plan_cache_create_entry(Cache *cache, CacheQueryCtx *ctx) +{ + ChunkCacheQueryCtx *cctx = (ChunkCacheQueryCtx *) ctx; + chunk_insert_plan_htable_entry *pe = ctx->entry; + char *insert_sql; + + insert_sql = get_copy_table_insert_sql(cctx); + pe->chunk_id = cctx->chunk_id; + pe->start_time = cctx->chunk_start_time; + pe->end_time = cctx->chunk_end_time; + pe->move_from_copyt_plan = prepare_plan(insert_sql, 0, NULL); + + return pe; +} + +static void * +chunk_insert_plan_cache_update_entry(Cache *cache, CacheQueryCtx *ctx) +{ + ChunkCacheQueryCtx *cctx = (ChunkCacheQueryCtx *) ctx; + chunk_insert_plan_htable_entry *pe = ctx->entry; + char *insert_sql; + + if (pe->start_time == cctx->chunk_start_time && + pe->end_time == cctx->chunk_end_time) + return pe; + + insert_sql = get_copy_table_insert_sql(cctx); + SPI_freeplan(pe->move_from_copyt_plan); + pe->move_from_copyt_plan = prepare_plan(insert_sql, 0, NULL); + + return pe; +} + +void +invalidate_chunk_cache_callback(void) +{ + CACHE1_elog(WARNING, "DESTROY chunk_insert plan cache"); + cache_invalidate(&chunk_insert_plan_cache); +} + +static chunk_insert_plan_htable_entry * +get_chunk_insert_plan_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry, + partition_info *part, int32 chunk_id, int64 chunk_start_time, + int64 chunk_end_time) +{ + ChunkCacheQueryCtx ctx = { + .hci = hci, + .pe_entry = pe_entry, + .part = part, + .chunk_id = chunk_id, + .chunk_start_time = chunk_start_time, + .chunk_end_time = chunk_end_time, + }; + + return cache_fetch(&chunk_insert_plan_cache, &ctx.cctx); +} + +chunk_cache_entry * +get_chunk_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry, + partition_info *part, int64 time_pt, bool lock) +{ + chunk_insert_plan_htable_entry *move_plan; + + chunk_row *chunk = 
fetch_chunk_row(NULL, part->id, time_pt, lock); + chunk_cache_entry *entry = palloc(sizeof(chunk_cache_entry)); + + entry->chunk = chunk; + entry->id = chunk->id; + move_plan = get_chunk_insert_plan_cache_entry(hci, pe_entry, part, chunk->id, + chunk->start_time, chunk->end_time); + entry->move_from_copyt_plan = move_plan->move_from_copyt_plan; + return entry; +} + +static char * +get_copy_table_insert_sql(ChunkCacheQueryCtx *ctx) +{ + StringInfo where_clause = makeStringInfo(); + StringInfo insert_clauses = makeStringInfo(); + StringInfo sql_insert = makeStringInfo(); + ListCell *cell; + int i; + crn_set *crn = fetch_crn_set(NULL, ctx->chunk_id); + + appendStringInfo(where_clause, "WHERE TRUE"); + + if (ctx->pe_entry->partitioning_func != NULL) + { + appendStringInfo(where_clause, " AND (%s.%s(%s::TEXT, %d) BETWEEN %d AND %d)", + quote_identifier(ctx->pe_entry->partitioning_func_schema->data), + quote_identifier(ctx->pe_entry->partitioning_func->data), + quote_identifier(ctx->pe_entry->partitioning_column->data), + ctx->pe_entry->partitioning_mod, + ctx->part->keyspace_start, + ctx->part->keyspace_end); + } + + + if (ctx->chunk_start_time != OPEN_START_TIME) + { + appendStringInfo(where_clause, " AND (%1$s >= %2$s) ", + quote_identifier(ctx->hci->info->time_column_name.data), + internal_time_to_column_literal_sql(ctx->chunk_start_time, + ctx->hci->info->time_column_type)); + } + + if (ctx->chunk_end_time != OPEN_END_TIME) + { + appendStringInfo(where_clause, " AND (%1$s <= %2$s) ", + quote_identifier(ctx->hci->info->time_column_name.data), + internal_time_to_column_literal_sql(ctx->chunk_end_time, + ctx->hci->info->time_column_type)); + } + + i = 0; + foreach(cell, crn->tables) + { + crn_row *tab = lfirst(cell); + + i = i + 1; + appendStringInfo(insert_clauses, "i_%d AS (INSERT INTO %s.%s SELECT * FROM selected)", + i, + quote_identifier(tab->schema_name.data), + quote_identifier(tab->table_name.data) + ); + } + pfree(crn); + crn = NULL; + + appendStringInfo(sql_insert, "\ + WITH selected AS ( DELETE FROM ONLY %1$s %2$s RETURNING * ), \ + %3$s \ + SELECT 1", copy_table_name(ctx->hci->id), + where_clause->data, + insert_clauses->data); + + return sql_insert->data; +} + +void +_chunk_cache_init(void) +{ + CreateCacheMemoryContext(); + cache_init(&chunk_insert_plan_cache); +} + +void +_chunk_cache_fini(void) +{ + chunk_insert_plan_cache.post_invalidate_hook = NULL; + cache_invalidate(&chunk_insert_plan_cache); +} diff --git a/src/chunk_cache.h b/src/chunk_cache.h new file mode 100644 index 000000000..b7a648544 --- /dev/null +++ b/src/chunk_cache.h @@ -0,0 +1,31 @@ +#ifndef IOBEAMDB_CHUNK_CACHE_H +#define IOBEAMDB_CHUNK_CACHE_H + +#include +#include + +#define CHUNK_CACHE_INVAL_PROXY_TABLE "cache_inval_chunk" +#define CHUNK_CACHE_INVAL_PROXY_OID \ + get_relname_relid(CHUNK_CACHE_INVAL_PROXY_TABLE, CACHE_INVAL_PROXY_SCHEMA_OID) + +typedef struct hypertable_cache_entry hypertable_cache_entry; +typedef struct epoch_and_partitions_set epoch_and_partitions_set; +typedef struct partition_info partition_info; +typedef struct chunk_row chunk_row; + +typedef struct chunk_cache_entry +{ + int32 id; + chunk_row *chunk; + SPIPlanPtr move_from_copyt_plan; +} chunk_cache_entry; + +extern chunk_cache_entry *get_chunk_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry, + partition_info *part, int64 time_pt, bool lock); + +extern void invalidate_chunk_cache_callback(void); + +extern void _chunk_cache_init(void); +extern void _chunk_cache_fini(void); + +#endif /* IOBEAMDB_CHUNK_CACHE_H 
*/ diff --git a/src/compat-endian.h b/src/compat-endian.h index 0923c3f84..d2a5c70b8 100644 --- a/src/compat-endian.h +++ b/src/compat-endian.h @@ -11,7 +11,7 @@ #elif defined(__APPLE__) -// Mac OS X / Darwin +/* Mac OS X / Darwin */ #include #define bswap_32(x) OSSwapInt32(x) #define bswap_64(x) OSSwapInt64(x) @@ -27,7 +27,11 @@ #ifdef WORDS_BIGENDIAN -static inline uint16_t compat_bswap16(uint16_t v) { return (v << 8) | (v >> 8); } +static inline uint16_t +compat_bswap16(uint16_t v) +{ + return (v << 8) | (v >> 8); +} #define bswap_16(v) compat_bswap16(v) #define htobe16(x) ((uint16_t)(x)) @@ -44,7 +48,7 @@ static inline uint16_t compat_bswap16(uint16_t v) { return (v << 8) | (v >> 8); #define le32toh(x) bswap_32(x) #define le64toh(x) bswap_64(x) -#else /* !WORDS_BIGENDIAN */ +#else /* !WORDS_BIGENDIAN */ #define htobe16(x) bswap_16(x) #define htobe32(x) bswap_32(x) @@ -60,9 +64,8 @@ static inline uint16_t compat_bswap16(uint16_t v) { return (v << 8) | (v >> 8); #define le32toh(x) ((uint32_t)(x)) #define le64toh(x) ((uint64_t)(x)) -#endif /* !WORDS_BIGENDIAN */ +#endif /* !WORDS_BIGENDIAN */ -#endif /* _NEED_ENDIAN_COMPAT */ +#endif /* _NEED_ENDIAN_COMPAT */ #endif - diff --git a/src/errors.h b/src/errors.h new file mode 100644 index 000000000..7a154712f --- /dev/null +++ b/src/errors.h @@ -0,0 +1,36 @@ +/* Defines error codes used +-- PREFIX IO +*/ + +/* +-- IO000 - GROUP: query errors +-- IO001 - hypertable does not exist +-- IO002 - column does not exist +*/ +#define ERRCODE_IO_QUERY_ERRORS MAKE_SQLSTATE('I','O','0','0','0') +#define ERRCODE_IO_HYPERTABLE_NOT_EXIST MAKE_SQLSTATE('I','O','0','0','1') +#define ERRCODE_IO_COLUMN_NOT_EXIST MAKE_SQLSTATE('I','O','0','0','2') + +/* +--IO100 - GROUP: DDL errors +--IO101 - operation not supported +--IO102 - bad hypertable definition +--IO110 - hypertable already exists +--I0120 - node already exists +--I0130 - user already exists +*/ +#define ERRCODE_IO_DDL_ERRORS MAKE_SQLSTATE('I','O','1','0','0') +#define ERRCODE_IO_OPERATION_NOT_SUPPORTED MAKE_SQLSTATE('I','O','1','0','1') +#define ERRCODE_IO_BAD_HYPERTABLE_DEFINITION MAKE_SQLSTATE('I','O','1','0','2') +#define ERRCODE_IO_HYPERTABLE_EXISTS MAKE_SQLSTATE('I','O','1','1','0') +#define ERRCODE_IO_NODE_EXISTS MAKE_SQLSTATE('I','O','1','2','0') +#define ERRCODE_IO_USER_EXISTS MAKE_SQLSTATE('I','O','1','3','0') + +/* +--IO500 - GROUP: internal error +--IO501 - unexpected state/event +--IO502 - communication/remote error +*/ +#define ERRCODE_IO_INTERNAL_ERROR MAKE_SQLSTATE('I','O','5','0','0') +#define ERRCODE_IO_UNEXPECTED MAKE_SQLSTATE('I','O','5','0','1') +#define ERRCODE_IO_COMMUNICATION_ERROR MAKE_SQLSTATE('I','O','5','0','2') diff --git a/src/hypertable_cache.c b/src/hypertable_cache.c new file mode 100644 index 000000000..56cdf61bf --- /dev/null +++ b/src/hypertable_cache.c @@ -0,0 +1,217 @@ + +#include +#include + +#include "hypertable_cache.h" +#include "cache.h" +#include "metadata_queries.h" +#include "utils.h" + +static void hypertable_cache_pre_invalidate(Cache *cache); +static void *hypertable_cache_create_entry(Cache *cache, CacheQueryCtx *ctx); + +typedef struct HypertableCacheQueryCtx +{ + CacheQueryCtx cctx; + int32 hypertable_id; +} HypertableCacheQueryCtx; + +static void * +hypertable_cache_get_key(CacheQueryCtx *ctx) +{ + return &((HypertableCacheQueryCtx *) ctx)->hypertable_id; +} + +static Cache hypertable_cache = { + .hctl = { + .keysize = sizeof(int32), + .entrysize = sizeof(hypertable_cache_entry), + .hcxt = NULL, + }, + .htab = NULL, + .name = 
HYPERTABLE_CACHE_INVAL_PROXY_TABLE, + .numelements = 16, + .flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS, + .get_key = hypertable_cache_get_key, + .create_entry = hypertable_cache_create_entry, + .pre_invalidate_hook = hypertable_cache_pre_invalidate, + .post_invalidate_hook = cache_init, +}; + +static void +hypertable_cache_pre_invalidate(Cache *cache) +{ + hypertable_cache_entry *entry; + HASH_SEQ_STATUS scan; + + hash_seq_init(&scan, cache->htab); + + while ((entry = hash_seq_search(&scan))) + { + SPI_freeplan(entry->info->get_one_tuple_copyt_plan); + } +} + +static void * +hypertable_cache_create_entry(Cache *cache, CacheQueryCtx *ctx) +{ + HypertableCacheQueryCtx *hctx = (HypertableCacheQueryCtx *) ctx; + hypertable_cache_entry *he = ctx->entry; + + he->info = fetch_hypertable_info(NULL, hctx->hypertable_id); + he->num_epochs = 0; + + return he; +} + +void +invalidate_hypertable_cache_callback(void) +{ + CACHE1_elog(WARNING, "DESTROY hypertable_cache"); + cache_invalidate(&hypertable_cache); +} + + +/* Get hypertable cache entry. If the entry is not in the cache, add it. */ +hypertable_cache_entry * +get_hypertable_cache_entry(int32 hypertable_id) +{ + HypertableCacheQueryCtx ctx = { + .hypertable_id = hypertable_id, + }; + + return cache_fetch(&hypertable_cache, &ctx.cctx); +} + +/* function to compare epochs */ +static int +cmp_epochs(const void *time_pt_pointer, const void *test) +{ + /* note reverse order; assume oldest stuff last */ + int64 *time_pt = (int64 *) time_pt_pointer; + epoch_and_partitions_set **entry = (epoch_and_partitions_set **) test; + + if ((*entry)->start_time <= *time_pt && (*entry)->end_time >= *time_pt) + { + return 0; + } + + if (*time_pt < (*entry)->start_time) + { + return 1; + } + return -1; +} + +epoch_and_partitions_set * +get_partition_epoch_cache_entry(hypertable_cache_entry *hce, int64 time_pt, Oid relid) +{ + MemoryContext old; + epoch_and_partitions_set *entry, + **cache_entry; + int j; + + /* fastpath: check latest entry */ + if (hce->num_epochs > 0) + { + entry = hce->epochs[0]; + + if (entry->start_time <= time_pt && entry->end_time >= time_pt) + { + return entry; + } + } + + cache_entry = bsearch(&time_pt, hce->epochs, hce->num_epochs, + sizeof(epoch_and_partitions_set *), cmp_epochs); + + if (cache_entry != NULL) + { + return (*cache_entry); + } + + old = cache_switch_to_memory_context(&hypertable_cache); + entry = fetch_epoch_and_partitions_set(NULL, hce->info->id, time_pt, relid); + + /* check if full */ + if (hce->num_epochs == MAX_EPOCHS_PER_HYPERTABLE_CACHE_ENTRY) + { + /* remove last */ + free_epoch(hce->epochs[MAX_EPOCHS_PER_HYPERTABLE_CACHE_ENTRY - 1]); + hce->epochs[MAX_EPOCHS_PER_HYPERTABLE_CACHE_ENTRY - 1] = NULL; + hce->num_epochs--; + } + + /* ordered insert */ + for (j = hce->num_epochs - 1; j >= 0 && cmp_epochs(&time_pt, hce->epochs + j) < 0; j--) + { + hce->epochs[j + 1] = hce->epochs[j]; + } + hce->epochs[j + 1] = entry; + hce->num_epochs++; + + MemoryContextSwitchTo(old); + + return entry; +} + +/* function to compare partitions */ +static int +cmp_partitions(const void *keyspace_pt_arg, const void *test) +{ + /* note in keyspace asc; assume oldest stuff last */ + int64 keyspace_pt = *((int16 *) keyspace_pt_arg); + partition_info *part = *((partition_info **) test); + + if (part->keyspace_start <= keyspace_pt && part->keyspace_end >= keyspace_pt) + { + return 0; + } + + if (keyspace_pt > part->keyspace_end) + { + return 1; + } + return -1; +} + + +partition_info * +get_partition_info(epoch_and_partitions_set *epoch, int16 
keyspace_pt) +{ + partition_info **part; + + if (keyspace_pt < 0) + { + if (epoch->num_partitions > 1) + { + elog(ERROR, "Found many partitions(%d) for an unpartitioned epoch", + epoch->num_partitions); + } + return epoch->partitions[0]; + } + + part = bsearch(&keyspace_pt, epoch->partitions, epoch->num_partitions, + sizeof(partition_info *), cmp_partitions); + + if (part == NULL) + { + elog(ERROR, "could not find partition for epoch"); + } + + return *part; +} + +void +_hypertable_cache_init(void) +{ + CreateCacheMemoryContext(); + cache_init(&hypertable_cache); +} + +void +_hypertable_cache_fini(void) +{ + hypertable_cache.post_invalidate_hook = NULL; + cache_invalidate(&hypertable_cache); +} diff --git a/src/hypertable_cache.h b/src/hypertable_cache.h new file mode 100644 index 000000000..73bbabc1e --- /dev/null +++ b/src/hypertable_cache.h @@ -0,0 +1,37 @@ +#ifndef IOBEAMDB_HYPERTABLE_CACHE_H +#define IOBEAMDB_HYPERTABLE_CACHE_H + +#include + +typedef struct hypertable_basic_info hypertable_basic_info; +typedef struct epoch_and_partitions_set epoch_and_partitions_set; +typedef struct partition_info partition_info; + +#define HYPERTABLE_CACHE_INVAL_PROXY_TABLE "cache_inval_hypertable" +#define HYPERTABLE_CACHE_INVAL_PROXY_OID \ + get_relname_relid(HYPERTABLE_CACHE_INVAL_PROXY_TABLE, CACHE_INVAL_PROXY_SCHEMA_OID) + +#define MAX_EPOCHS_PER_HYPERTABLE_CACHE_ENTRY 20 + +typedef struct hypertable_cache_entry +{ + int32 id; + hypertable_basic_info *info; + int num_epochs; + /* Array of epoch_and_partitions_set*. Order by start_time */ + epoch_and_partitions_set *epochs[MAX_EPOCHS_PER_HYPERTABLE_CACHE_ENTRY]; +} hypertable_cache_entry; + +extern hypertable_cache_entry *get_hypertable_cache_entry(int32 hypertable_id); + +extern epoch_and_partitions_set *get_partition_epoch_cache_entry(hypertable_cache_entry *hce, + int64 time_pt, Oid relid); + +extern partition_info *get_partition_info(epoch_and_partitions_set *epoch, int16 keyspace_pt); + +extern void invalidate_hypertable_cache_callback(void); + +extern void _hypertable_cache_init(void); +extern void _hypertable_cache_fini(void); + +#endif /* IOBEAMDB_HYPERTABLE_CACHE_H */ diff --git a/src/insert.c b/src/insert.c new file mode 100644 index 000000000..b4a5d3510 --- /dev/null +++ b/src/insert.c @@ -0,0 +1,325 @@ +#include "postgres.h" +#include "funcapi.h" +#include "access/htup_details.h" +#include "catalog/namespace.h" +#include "catalog/pg_namespace.h" +#include "catalog/pg_type.h" +#include "catalog/pg_trigger.h" +#include "catalog/pg_class.h" +#include "optimizer/planner.h" +#include "optimizer/clauses.h" +#include "nodes/nodes.h" +#include "nodes/print.h" +#include "nodes/nodeFuncs.h" +#include "nodes/makefuncs.h" +#include "parser/parsetree.h" +#include "utils/lsyscache.h" +#include "utils/builtins.h" +#include "utils/memutils.h" +#include "utils/rel.h" +#include "utils/int8.h" +#include "executor/spi.h" +#include "commands/extension.h" +#include "commands/tablecmds.h" +#include "commands/trigger.h" +#include "tcop/tcopprot.h" +#include "tcop/utility.h" +#include "deps/dblink.h" + +#include "access/xact.h" +#include "parser/parse_oper.h" +#include "parser/parse_func.h" + +#include "fmgr.h" + +#include "insert.h" +#include "cache.h" +#include "hypertable_cache.h" +#include "chunk_cache.h" +#include "errors.h" +#include "utils.h" +#include "metadata_queries.h" + +#define INSERT_TRIGGER_COPY_TABLE_FN "insert_trigger_on_copy_table_c" +#define INSERT_TRIGGER_COPY_TABLE_NAME "insert_trigger" + +/* private funcs */ +static int 
tuple_fnumber(TupleDesc tupdesc, const char *fname); +static HeapTuple get_one_tuple_from_copy_table(hypertable_cache_entry *hci); + +/* + * Inserts rows from the temporary copy table into correct hypertable child tables. + * hypertable_id - ID of the hypertable the data belongs to + */ + +static FmgrInfo * +get_close_if_needed_fn() +{ + static FmgrInfo *single = NULL; + + if (single == NULL) + { + MemoryContext old; + + old = MemoryContextSwitchTo(TopMemoryContext); + single = create_fmgr("_iobeamdb_internal", "close_chunk_if_needed", 1); + MemoryContextSwitchTo(old); + } + return single; +} + +PG_FUNCTION_INFO_V1(insert_trigger_on_copy_table_c); + +Datum +insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS) +{ + TriggerData *trigdata = (TriggerData *) fcinfo->context; + + /* arg 0 = hypertable id */ + char *hypertable_id_arg = trigdata->tg_trigger->tgargs[0]; + + HeapTuple firstrow; + hypertable_cache_entry *hci; + int time_fnum, + i, + num_chunks; + bool isnull; + + List *chunk_id_list = NIL; + ListCell *chunk_id_cell; + int *chunk_id_array; + + /* + * --This guard protects against calling insert_data() twice in the same + * transaction, --which might otherwise cause a deadlock in case the + * second insert_data() involves a chunk --that was inserted into in the + * first call to insert_data(). --This is a temporary safe guard that + * should ideally be removed once chunk management --has been refactored + * and improved to avoid such deadlocks. --NOTE: In its current form, this + * safe guard unfortunately prohibits transactions --involving INSERTs on + * two different hypertables. + */ + char *insert_guard = GetConfigOptionByName("io.insert_data_guard", NULL, true); + + if (insert_guard != NULL && strcmp(insert_guard, "on") == 0) + { + ereport(ERROR, + (errcode(ERRCODE_IO_OPERATION_NOT_SUPPORTED), + errmsg("insert_data() can only be called once per transaction"))); + } + /* set the guard locally (for this transaction) */ + set_config_option("io.insert_data_guard", "on", PGC_USERSET, PGC_S_SESSION, + GUC_ACTION_LOCAL, true, 0, false); + + /* + * get the hypertable cache; use the time column name to figure out the + * column fnum for time field + */ + hci = get_hypertable_cache_entry(atoi(hypertable_id_arg)); + time_fnum = tuple_fnumber(trigdata->tg_relation->rd_att, NameStr(hci->info->time_column_name)); + + /* get one row in a loop until the copy table is empty. 
*/ + while ((firstrow = get_one_tuple_from_copy_table(hci))) + { + Datum time_datum = heap_getattr(firstrow, time_fnum, trigdata->tg_relation->rd_att, &isnull); + int64 time_internal; + epoch_and_partitions_set *pe_entry; + partition_info *part = NULL; + chunk_cache_entry *chunk; + int ret; + + if (isnull) + { + elog(ERROR, "Time column is null"); + } + + time_internal = time_value_to_internal(time_datum, hci->info->time_column_type); + pe_entry = get_partition_epoch_cache_entry(hci, time_internal, trigdata->tg_relation->rd_id); + + if (pe_entry->partitioning_func != NULL) + { + /* + * get the keyspace point by running the partitioning func on the + * row's partitioning value; + */ + Datum part_value_datum = heap_getattr(firstrow, pe_entry->partitioning_column_attrnumber, + trigdata->tg_relation->rd_att, &isnull); + Datum part_value_text_datum = FunctionCall1(pe_entry->partitioning_column_text_func_fmgr, part_value_datum); + char *partition_val = DatumGetCString(part_value_text_datum); + + Datum keyspace_datum = FunctionCall2(pe_entry->partition_func_fmgr, CStringGetTextDatum(partition_val), Int32GetDatum(pe_entry->partitioning_mod)); + + int16 keyspace_pt = DatumGetInt16(keyspace_datum); + + /* get the partition using the keyspace value of the row. */ + part = get_partition_info(pe_entry, keyspace_pt); + } + else + { + /* Not Partitioning: get the first and only partition */ + part = get_partition_info(pe_entry, -1); + } + + chunk = get_chunk_cache_entry(hci, pe_entry, part, time_internal, true); + if (chunk->chunk->end_time == OPEN_END_TIME) + { + chunk_id_list = lappend_int(chunk_id_list, chunk->id); + } + if (SPI_connect() < 0) + { + elog(ERROR, "Got an SPI connect error"); + } + ret = SPI_execute_plan(chunk->move_from_copyt_plan, NULL, NULL, false, 1); + if (ret <= 0) + { + elog(ERROR, "Got an SPI error %d", ret); + } + SPI_finish(); + + } + + /* build chunk id array */ + num_chunks = list_length(chunk_id_list); + chunk_id_array = palloc(sizeof(int) * num_chunks); + i = 0; + foreach(chunk_id_cell, chunk_id_list) + { + int chunk_id = lfirst_int(chunk_id_cell); + + chunk_id_array[i++] = chunk_id; + } + /* sort by chunk_id to avoid deadlocks */ + qsort(chunk_id_array, num_chunks, sizeof(int), int_cmp); + + /* close chunks */ + for (i = 0; i < num_chunks; i++) + { + /* TODO: running this on every insert is really expensive */ + /* Keep some kind of cache of result an run this heuristically. */ + int32 chunk_id = chunk_id_array[i]; + + FunctionCall1(get_close_if_needed_fn(), Int32GetDatum(chunk_id)); + } + return PointerGetDatum(NULL); +} + + + + +/* Creates a temp table for INSERT and COPY commands. This table + * stores the data until it is distributed to the appropriate chunks. + * The copy table uses ON COMMIT DELETE ROWS and inherits from the root table. 
+ * */ +Oid +create_copy_table(int32 hypertable_id, Oid root_oid) +{ + /* + * Inserting into a hypertable transformed into inserting into a "copy" + * temporary table that has a trigger which calls insert_data afterwords + */ + Oid copy_table_relid; + ObjectAddress copyTableRelationAddr; + StringInfo temp_table_name = makeStringInfo(); + StringInfo hypertable_id_arg = makeStringInfo(); + RangeVar *parent, + *rel; + CreateStmt *create; + CreateTrigStmt *createTrig; + + appendStringInfo(temp_table_name, "_copy_temp_%d", hypertable_id); + appendStringInfo(hypertable_id_arg, "%d", hypertable_id); + + parent = makeRangeVarFromRelid(root_oid); + + rel = makeRangeVar("pg_temp", copy_table_name(hypertable_id), -1); + rel->relpersistence = RELPERSISTENCE_TEMP; + + RangeVarGetAndCheckCreationNamespace(rel, NoLock, ©_table_relid); + + if (OidIsValid(copy_table_relid)) + { + return copy_table_relid; + } + + create = makeNode(CreateStmt); + + /* + * Create the target relation by faking up a CREATE TABLE parsetree and + * passing it to DefineRelation. + */ + create->relation = rel; + create->tableElts = NIL; + create->inhRelations = list_make1(parent); + create->ofTypename = NULL; + create->constraints = NIL; + create->options = NIL; + create->oncommit = ONCOMMIT_DELETE_ROWS; + create->tablespacename = NULL; + create->if_not_exists = false; + + copyTableRelationAddr = DefineRelation(create, RELKIND_RELATION, InvalidOid, NULL); + + createTrig = makeNode(CreateTrigStmt); + createTrig->trigname = INSERT_TRIGGER_COPY_TABLE_NAME; + createTrig->relation = rel; + createTrig->funcname = list_make2(makeString(IOBEAMDB_INTERNAL_SCHEMA), makeString(INSERT_TRIGGER_COPY_TABLE_FN)); + createTrig->args = list_make1(makeString(hypertable_id_arg->data)); + createTrig->row = false; + createTrig->timing = TRIGGER_TYPE_AFTER; + createTrig->events = TRIGGER_TYPE_INSERT; + createTrig->columns = NIL; + createTrig->whenClause = NULL; + createTrig->isconstraint = FALSE; + createTrig->deferrable = FALSE; + createTrig->initdeferred = FALSE; + createTrig->constrrel = NULL; + + CreateTrigger(createTrig, NULL, copyTableRelationAddr.objectId, 0, 0, 0, false); + + /* make trigger visible */ + CommandCounterIncrement(); + + return copyTableRelationAddr.objectId; +} + + +static int +tuple_fnumber(TupleDesc tupdesc, const char *fname) +{ + int res; + + for (res = 0; res < tupdesc->natts; res++) + { + if (namestrcmp(&tupdesc->attrs[res]->attname, fname) == 0) + return res + 1; + } + + elog(ERROR, "field not found: %s", fname); +} + + +static HeapTuple +get_one_tuple_from_copy_table(hypertable_cache_entry *hci) +{ + HeapTuple res; + int ret; + + if (SPI_connect() < 0) + { + elog(ERROR, "Got an SPI connect error"); + } + ret = SPI_execute_plan(hci->info->get_one_tuple_copyt_plan, NULL, NULL, false, 1); + if (ret <= 0) + { + elog(ERROR, "Got an SPI error %d", ret); + } + if (SPI_processed != 1) + { + SPI_finish(); + return NULL; + } + res = SPI_copytuple(SPI_tuptable->vals[0]); + SPI_finish(); + + return res; +} diff --git a/src/insert.h b/src/insert.h new file mode 100644 index 000000000..3113e5bc3 --- /dev/null +++ b/src/insert.h @@ -0,0 +1,10 @@ +#ifndef IOBEAMDB_INSERT_H +#define IOBEAMDB_INSERT_H + +#include "fmgr.h" +/* exported pg functions */ +extern Datum insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS); + +extern Oid create_copy_table(int32 hypertable_id, Oid root_oid); + +#endif /* IOBEAMDB_INSERT_H */ diff --git a/src/iobeamdb.c b/src/iobeamdb.c index 9b10a7684..b862839ba 100644 --- a/src/iobeamdb.c +++ b/src/iobeamdb.c @@ -2,6 
+2,8 @@ #include #include "postgres.h" +#include "funcapi.h" +#include "access/htup_details.h" #include "catalog/namespace.h" #include "catalog/pg_namespace.h" #include "catalog/pg_type.h" @@ -17,6 +19,8 @@ #include "utils/lsyscache.h" #include "utils/builtins.h" #include "utils/memutils.h" +#include "utils/rel.h" +#include "utils/int8.h" #include "executor/spi.h" #include "commands/extension.h" #include "commands/tablecmds.h" @@ -33,10 +37,27 @@ #include "iobeamdb.h" +#include "insert.h" +#include "cache.h" +#include "errors.h" +#include "utils.h" #ifdef PG_MODULE_MAGIC PG_MODULE_MAGIC; #endif +#define HYPERTABLE_INFO_QUERY "\ + SELECT format('%I.%I', hr.schema_name, hr.table_name)::regclass::oid, \ + pe.partitioning_column, pe.partitioning_func_schema, pe.partitioning_func, pe.partitioning_mod, \ + format('%I.%I', h.root_schema_name, h.root_table_name)::regclass::oid, \ + h.id \ + FROM _iobeamdb_catalog.hypertable h \ + INNER JOIN _iobeamdb_catalog.default_replica_node drn ON (drn.hypertable_id = h.id AND drn.database_name = current_database()) \ + INNER JOIN _iobeamdb_catalog.hypertable_replica hr ON (hr.replica_id = drn.replica_id AND hr.hypertable_id = drn.hypertable_id) \ + INNER JOIN _iobeamdb_catalog.partition_epoch pe ON (pe.hypertable_id = h.id) \ + WHERE h.schema_name = $1 AND h.table_name = $2" + +void _PG_init(void); +void _PG_fini(void); /* Postgres hook interface */ static planner_hook_type prev_planner_hook = NULL; @@ -90,8 +111,6 @@ static partitioning_info * get_partitioning_info_for_partition_column_var(Var *var_expr, Query *parse, List * hypertable_info_list); static Expr * create_partition_func_equals_const(Var *var_expr, Const *const_expr, Name partitioning_func_schema, Name partitioning_func, int32 partitioning_mod); -Oid create_copy_table(hypertable_info *hinfo); -RangeVar* makeRangeVarFromRelid(Oid relid); SPIPlanPtr get_hypertable_info_plan(void); @@ -108,12 +127,24 @@ void prev_ProcessUtility(Node *parsetree, DestReceiver *dest, char *completionTag); +extern void _hypertable_cache_init(void); +extern void _hypertable_cache_fini(void); +extern void _chunk_cache_init(void); +extern void _chunk_cache_fini(void); + +extern void _cache_invalidate_init(void); +extern void _cache_invalidate_fini(void); +extern void _cache_invalidate_extload(void); void _PG_init(void) { elog(INFO, "iobeamdb loaded"); + _hypertable_cache_init(); + _chunk_cache_init(); + _cache_invalidate_init(); + prev_planner_hook = planner_hook; planner_hook = iobeamdb_planner; @@ -121,6 +152,17 @@ _PG_init(void) ProcessUtility_hook = iobeamdb_ProcessUtility; RegisterXactCallback(io_xact_callback, NULL); + +} + +void +_PG_fini(void) +{ + planner_hook = prev_planner_hook; + ProcessUtility_hook = prev_ProcessUtility_hook; + _cache_invalidate_fini(); + _hypertable_cache_fini(); + _chunk_cache_fini(); } SPIPlanPtr get_hypertable_info_plan() @@ -149,29 +191,70 @@ SPIPlanPtr get_hypertable_info_plan() return hypertable_info_plan; } - -void -_PG_fini(void) -{ - planner_hook = prev_planner_hook; - ProcessUtility_hook = prev_ProcessUtility_hook; -} - bool IobeamLoaded(void) { if (!isLoaded) { + if(!IsTransactionState()) + { + return false; + } Oid id = get_extension_oid("iobeamdb", true); - if (id != InvalidOid) + if (id != InvalidOid && !(creating_extension && id == CurrentExtensionObject)) { isLoaded = true; + _cache_invalidate_extload(); } } return isLoaded; } + +/* + * Change all main tables to one of the replicas in the parse tree. 
+ * + */ +static bool +change_table_name_walker(Node *node, void *context) +{ + if (node == NULL) + { + return false; + } + + if (IsA(node, RangeTblEntry)) + { + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node; + change_table_name_context* ctx = (change_table_name_context *)context; + if (rangeTableEntry->rtekind == RTE_RELATION && rangeTableEntry->inh) + { + hypertable_info* hinfo = get_hypertable_info(rangeTableEntry->relid); + if (hinfo != NULL) + { + ctx->hypertable_info = lappend(ctx->hypertable_info, hinfo); + rangeTableEntry->relid = hinfo->replica_oid; + } + } else if (rangeTableEntry->rtekind == RTE_RELATION && ctx->parse->commandType == CMD_INSERT){ + hypertable_info* hinfo = get_hypertable_info(rangeTableEntry->relid); + if (hinfo != NULL) + { + rangeTableEntry->relid = create_copy_table(hinfo->hypertable_id, hinfo->root_oid); + } + } + return false; + } + + if (IsA(node, Query)) + { + return query_tree_walker((Query *) node, change_table_name_walker, + context, QTW_EXAMINE_RTES); + } + + return expression_tree_walker(node, change_table_name_walker, context); +} + PlannedStmt * iobeamdb_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) { @@ -212,49 +295,6 @@ iobeamdb_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) return rv; } -/* - * Change all main tables to one of the replicas in the parse tree. - * - */ -bool -change_table_name_walker(Node *node, void *context) -{ - if (node == NULL) - { - return false; - } - - if (IsA(node, RangeTblEntry)) - { - RangeTblEntry *rangeTableEntry = (RangeTblEntry *) node; - change_table_name_context* ctx = (change_table_name_context *)context; - if (rangeTableEntry->rtekind == RTE_RELATION && rangeTableEntry->inh) - { - hypertable_info* hinfo = get_hypertable_info(rangeTableEntry->relid); - if (hinfo != NULL) - { - ctx->hypertable_info = lappend(ctx->hypertable_info, hinfo); - rangeTableEntry->relid = hinfo->replica_oid; - } - } else if (rangeTableEntry->rtekind == RTE_RELATION && ctx->parse->commandType == CMD_INSERT){ - hypertable_info* hinfo = get_hypertable_info(rangeTableEntry->relid); - if (hinfo != NULL) - { - rangeTableEntry->relid = create_copy_table(hinfo); - } - } - return false; - } - - if (IsA(node, Query)) - { - return query_tree_walker((Query *) node, change_table_name_walker, - context, QTW_EXAMINE_RTES); - } - - return expression_tree_walker(node, change_table_name_walker, context); -} - /* * * Use the default_replica_node to look up the hypertable_info for a replica table from the oid of the main table. @@ -366,88 +406,15 @@ get_hypertable_info(Oid mainRelationOid) return NULL; } -/* Make a RangeVar from a regclass Oid */ -RangeVar * -makeRangeVarFromRelid(Oid relid) { - Oid namespace = get_rel_namespace(relid); - char *tableName = get_rel_name(relid); - char *schemaName = get_namespace_name(namespace); - return makeRangeVar(schemaName, tableName,-1); -} -/* Creates a temp table for INSERT and COPY commands. This table - * stores the data until it is distributed to the appropriate chunks. - * The copy table uses ON COMMIT DELETE ROWS and inherits from the root table. 
- * */ -Oid create_copy_table(hypertable_info *hinfo) { - /* Inserting into a hypertable transformed into inserting - * into a "copy" temporary table that has a trigger - * which calls insert_data afterwords - * */ - Oid copy_table_relid; - ObjectAddress copyTableRelationAddr; +char * copy_table_name(int32 hypertable_id) { StringInfo temp_table_name = makeStringInfo(); - StringInfo hypertable_id_arg = makeStringInfo(); - RangeVar *parent, *rel; - CreateStmt *create; - CreateTrigStmt *createTrig; - - appendStringInfo(temp_table_name, "_copy_temp_%d", hinfo->hypertable_id); - appendStringInfo(hypertable_id_arg, "%d", hinfo->hypertable_id); - - parent = makeRangeVarFromRelid(hinfo->root_oid); - - rel = makeRangeVar("pg_temp", temp_table_name->data,-1); - rel->relpersistence = RELPERSISTENCE_TEMP; - - RangeVarGetAndCheckCreationNamespace(rel, NoLock, ©_table_relid); - - if (OidIsValid(copy_table_relid)) { - return copy_table_relid; - } - - create = makeNode(CreateStmt); - - /* - * Create the target relation by faking up a CREATE TABLE parsetree and - * passing it to DefineRelation. - */ - create->relation = rel; - create->tableElts = NIL; - create->inhRelations = list_make1(parent); - create->ofTypename = NULL; - create->constraints = NIL; - create->options = NIL; - create->oncommit = ONCOMMIT_DELETE_ROWS; - create->tablespacename = NULL; - create->if_not_exists = false; - - copyTableRelationAddr = DefineRelation(create, RELKIND_RELATION, InvalidOid, NULL); - - createTrig = makeNode(CreateTrigStmt); - createTrig->trigname = "insert_trigger"; - createTrig->relation = rel; - createTrig->funcname = list_make2(makeString("_iobeamdb_internal"), makeString("insert_trigger_on_copy_table")); - createTrig->args = list_make1(makeString(hypertable_id_arg->data)); - createTrig->row = false; - createTrig->timing = TRIGGER_TYPE_AFTER; - createTrig->events = TRIGGER_TYPE_INSERT; - createTrig->columns = NIL; - createTrig->whenClause = NULL; - createTrig->isconstraint = FALSE; - createTrig->deferrable = FALSE; - createTrig->initdeferred = FALSE; - createTrig->constrrel = NULL; - - CreateTrigger(createTrig, NULL, copyTableRelationAddr.objectId, 0,0,0, false); - - /* make trigger visible*/ - CommandCounterIncrement(); - - return copyTableRelationAddr.objectId; + appendStringInfo(temp_table_name, "_copy_temp_%d", hypertable_id); + return temp_table_name->data; } + /* * This function does a transformation that allows postgres's native constraint exclusion to exclude space partititions when * the query contains equivalence qualifiers on the space partition key. 
@@ -753,7 +720,6 @@ void iobeamdb_ProcessUtility(Node *parsetree, DestReceiver *dest, char *completionTag) { - if (!IobeamLoaded()){ prev_ProcessUtility(parsetree, queryString, context, params, dest, completionTag); return; @@ -767,7 +733,7 @@ void iobeamdb_ProcessUtility(Node *parsetree, hypertable_info* hinfo = get_hypertable_info(relId); if (hinfo != NULL) { - copystmt->relation = makeRangeVarFromRelid(create_copy_table(hinfo)); + copystmt->relation = makeRangeVarFromRelid(create_copy_table(hinfo->hypertable_id, hinfo->root_oid)); } } prev_ProcessUtility((Node *)copystmt, queryString, context, params, dest, completionTag); @@ -792,3 +758,5 @@ void iobeamdb_ProcessUtility(Node *parsetree, prev_ProcessUtility(parsetree, queryString, context, params, dest, completionTag); } + + diff --git a/src/iobeamdb.h b/src/iobeamdb.h index 2ea1ed8d0..d67192c54 100644 --- a/src/iobeamdb.h +++ b/src/iobeamdb.h @@ -2,26 +2,16 @@ #ifndef IOBEAMDB_H #define IOBEAMDB_H -#define HYPERTABLE_INFO_QUERY "\ - SELECT format('%I.%I', hr.schema_name, hr.table_name)::regclass::oid, \ - pe.partitioning_column, pe.partitioning_func_schema, pe.partitioning_func, pe.partitioning_mod, \ - format('%I.%I', h.root_schema_name, h.root_table_name)::regclass::oid, \ - h.id \ - FROM _iobeamdb_catalog.hypertable h \ - INNER JOIN _iobeamdb_catalog.default_replica_node drn ON (drn.hypertable_id = h.id AND drn.database_name = current_database()) \ - INNER JOIN _iobeamdb_catalog.hypertable_replica hr ON (hr.replica_id = drn.replica_id AND hr.hypertable_id = drn.hypertable_id) \ - INNER JOIN _iobeamdb_catalog.partition_epoch pe ON (pe.hypertable_id = h.id) \ - WHERE h.schema_name = $1 AND h.table_name = $2" +#include -#include "postgres.h" -#include "optimizer/planner.h" -#include "nodes/nodes.h" +#define IOBEAMDB_CATALOG_SCHEMA "_iobeamdb_catalog" +#define IOBEAMDB_INTERNAL_SCHEMA "_iobeamdb_internal" +#define IOBEAMDB_HYPERTABLE_TABLE "hypertable" -void _PG_init(void); -void _PG_fini(void); +typedef struct Node Node; bool IobeamLoaded(void); -bool change_table_name_walker(Node *node, void *context); +char *copy_table_name(int32 hypertable_id); #endif /* IOBEAMDB_H */ diff --git a/src/metadata_queries.c b/src/metadata_queries.c new file mode 100644 index 000000000..abfb45eaa --- /dev/null +++ b/src/metadata_queries.c @@ -0,0 +1,487 @@ +#include "metadata_queries.h" +#include "utils.h" +#include "catalog/pg_type.h" +#include "commands/trigger.h" +#include "utils/inval.h" +#include "utils/memutils.h" +#include "utils/catcache.h" +#include "utils/builtins.h" +#include "executor/spi.h" +#include "access/xact.h" + +#define DEFINE_PLAN(fnname, query, num, args) \ + static SPIPlanPtr fnname() {\ + static SPIPlanPtr plan = NULL; \ + if(plan != NULL) \ + return plan; \ + plan = prepare_plan(query, num, args); \ + return plan; \ + } + +/* Utility function to prepare an SPI plan */ +SPIPlanPtr +prepare_plan(const char *src, int nargs, Oid *argtypes) +{ + SPIPlanPtr plan; + + if (SPI_connect() < 0) + { + elog(ERROR, "Could not connect for prepare"); + } + plan = SPI_prepare(src, nargs, argtypes); + if (NULL == plan) + { + elog(ERROR, "Could not prepare plan"); + } + if (SPI_keepplan(plan) != 0) + { + elog(ERROR, "Could not keep plan"); + } + if (SPI_finish() < 0) + { + elog(ERROR, "Could not finish for prepare"); + } + + return plan; +} + +/* + * + * Functions for fetching info from the db. 
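+ * Each query below goes through DEFINE_PLAN, which memoizes the prepared SPI
+ * plan in a function-local static (via prepare_plan/SPI_keepplan), so every
+ * backend prepares each query at most once.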
+ *
+ */
+
+#define HYPERTABLE_QUERY_ARGS (Oid[]) { INT4OID }
+#define HYPERTABLE_QUERY "SELECT id, time_column_name, time_column_type FROM _iobeamdb_catalog.hypertable h WHERE h.id = $1"
+DEFINE_PLAN(get_hypertable_plan, HYPERTABLE_QUERY, 1, HYPERTABLE_QUERY_ARGS)
+
+hypertable_basic_info *
+fetch_hypertable_info(hypertable_basic_info *entry, int32 hypertable_id)
+{
+	SPIPlanPtr plan = get_hypertable_plan();
+	Datum args[1] = {Int32GetDatum(hypertable_id)};
+	int ret;
+	bool is_null;
+	TupleDesc tupdesc;
+	HeapTuple tuple;
+	Name time_column_name;
+	int sql_len = NAMEDATALEN * 2 + 100;
+	char get_one_tuple_copyt_sql[sql_len];
+
+	if (entry == NULL)
+	{
+		entry = palloc(sizeof(hypertable_basic_info));
+	}
+	CACHE2_elog(WARNING, "Looking up hypertable info: %d", hypertable_id);
+
+	if (SPI_connect() < 0)
+	{
+		elog(ERROR, "Got an SPI connect error");
+	}
+	ret = SPI_execute_plan(plan, args, NULL, true, 2);
+	if (ret <= 0)
+	{
+		elog(ERROR, "Got an SPI error %d", ret);
+	}
+	if (SPI_processed != 1)
+	{
+		elog(ERROR, "Expected 1 row but got %lu", SPI_processed);
+	}
+
+	tupdesc = SPI_tuptable->tupdesc;
+	tuple = SPI_tuptable->vals[0];
+
+	entry->id = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 1, &is_null));
+
+	time_column_name = DatumGetName(SPI_getbinval(tuple, tupdesc, 2, &is_null));
+	memcpy(entry->time_column_name.data, time_column_name, NAMEDATALEN);
+
+	entry->time_column_type = DatumGetObjectId(SPI_getbinval(tuple, tupdesc, 3, &is_null));
+
+	SPI_finish();
+	snprintf(get_one_tuple_copyt_sql, sql_len, "SELECT * FROM %s LIMIT 1", copy_table_name(entry->id));
+	entry->get_one_tuple_copyt_plan = prepare_plan(get_one_tuple_copyt_sql, 0, NULL);
+	return entry;
+}
+
+
+#define EPOCH_AND_PARTITION_ARGS (Oid[]) { INT4OID, INT8OID }
+#define EPOCH_AND_PARTITION_QUERY "SELECT pe.id as epoch_id, hypertable_id, start_time, end_time, \
+    partitioning_func_schema, partitioning_func, partitioning_mod, partitioning_column, \
+    p.id as partition_id, keyspace_start, keyspace_end \
+    FROM _iobeamdb_catalog.partition_epoch pe\
+    INNER JOIN _iobeamdb_catalog.partition p ON (p.epoch_id = pe.id) \
+    WHERE pe.hypertable_id = $1 AND \
+    (pe.start_time <= $2 OR pe.start_time IS NULL) AND \
+    (pe.end_time >= $2 OR pe.end_time IS NULL) \
+    ORDER BY p.keyspace_start ASC"
+
+DEFINE_PLAN(get_epoch_and_partition_plan, EPOCH_AND_PARTITION_QUERY, 2, EPOCH_AND_PARTITION_ARGS)
+
+epoch_and_partitions_set *
+fetch_epoch_and_partitions_set(epoch_and_partitions_set *entry, int32 hypertable_id, int64 time_pt, Oid relid)
+{
+	MemoryContext orig_ctx = CurrentMemoryContext;
+	SPIPlanPtr plan = get_epoch_and_partition_plan();
+	Datum args[2] = {Int32GetDatum(hypertable_id), Int64GetDatum(time_pt)};
+	int ret,
+		j;
+	int64 time_ret;
+	int32 mod_ret;
+	Name name_ret;
+	bool is_null;
+	TupleDesc tupdesc;
+	HeapTuple tuple;
+
+	if (entry == NULL)
+	{
+		entry = palloc(sizeof(epoch_and_partitions_set));
+	}
+	CACHE3_elog(WARNING, "Looking up cache partition_epoch %d %lu", hypertable_id, time_pt);
+
+	if (SPI_connect() < 0)
+	{
+		elog(ERROR, "Got an SPI connect error");
+	}
+	ret = SPI_execute_plan(plan, args, NULL, true, 0);
+	if (ret <= 0)
+	{
+		elog(ERROR, "Got an SPI error %d", ret);
+	}
+	if (SPI_processed < 1)
+	{
+		elog(ERROR, "Could not find partition epoch");
+	}
+
+	tupdesc = SPI_tuptable->tupdesc;
+	tuple = SPI_tuptable->vals[0];
+
+	entry->id = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 1, &is_null));
+	entry->hypertable_id = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 2, &is_null));
+
+	time_ret =
DatumGetInt64(SPI_getbinval(tuple, tupdesc, 3, &is_null)); + if (is_null) + { + entry->start_time = OPEN_START_TIME; + } + else + { + entry->start_time = time_ret; + } + + time_ret = DatumGetInt64(SPI_getbinval(tuple, tupdesc, 4, &is_null)); + if (is_null) + { + entry->end_time = OPEN_END_TIME; + } + else + { + entry->end_time = time_ret; + } + + name_ret = DatumGetName(SPI_getbinval(tuple, tupdesc, 5, &is_null)); + if (is_null) + { + entry->partitioning_func_schema = NULL; + } + else + { + entry->partitioning_func_schema = SPI_palloc(sizeof(NameData)); + memcpy(entry->partitioning_func_schema, name_ret, sizeof(NameData)); + } + + name_ret = DatumGetName(SPI_getbinval(tuple, tupdesc, 6, &is_null)); + if (is_null) + { + entry->partitioning_func = NULL; + } + else + { + entry->partitioning_func = SPI_palloc(sizeof(NameData)); + memcpy(entry->partitioning_func, name_ret, sizeof(NameData)); + } + + mod_ret = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 7, &is_null)); + if (is_null) + { + entry->partitioning_mod = -1; + } + else + { + entry->partitioning_mod = mod_ret; + } + + + name_ret = DatumGetName(SPI_getbinval(tuple, tupdesc, 8, &is_null)); + if (is_null) + { + entry->partitioning_column = NULL; + entry->partitioning_column_attrnumber = InvalidAttrNumber; + entry->partitioning_column_text_func = InvalidOid; + } + else + { + Oid typeoid; + Oid textfn; + bool isVarlena; + FmgrInfo *textfn_finfo = SPI_palloc(sizeof(FmgrInfo)); + + entry->partitioning_column = SPI_palloc(sizeof(NameData)); + memcpy(entry->partitioning_column, name_ret, sizeof(NameData)); + entry->partitioning_column_attrnumber = get_attnum(relid, entry->partitioning_column->data); + typeoid = get_atttype(relid, entry->partitioning_column_attrnumber); + getTypeOutputInfo(typeoid, &textfn, &isVarlena); + entry->partitioning_column_text_func = textfn; + fmgr_info_cxt(textfn, textfn_finfo, orig_ctx); + entry->partitioning_column_text_func_fmgr = textfn_finfo; + } + + if (entry->partitioning_func != NULL) + { + FmgrInfo *finfo = SPI_palloc(sizeof(FmgrInfo)); + FuncCandidateList part_func = FuncnameGetCandidates(list_make2(makeString(entry->partitioning_func_schema->data), + makeString(entry->partitioning_func->data)), + 2, NULL, false, false, false); + + if (part_func == NULL) + { + elog(ERROR, "couldn't find the partitioning function"); + } + if (part_func->next != NULL) + { + elog(ERROR, "multiple partitioning functions found"); + } + + fmgr_info_cxt(part_func->oid, finfo, orig_ctx); + entry->partition_func_fmgr = finfo; + } + else + { + + entry->partition_func_fmgr = NULL; + } + + entry->num_partitions = SPI_processed; + entry->partitions = SPI_palloc(sizeof(partition_info *) * entry->num_partitions); + for (j = 0; j < entry->num_partitions; j++) + { + HeapTuple tuple = SPI_tuptable->vals[j]; + + entry->partitions[j] = SPI_palloc(sizeof(partition_info)); + + entry->partitions[j]->id = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 9, &is_null)); + entry->partitions[j]->keyspace_start = DatumGetInt16(SPI_getbinval(tuple, tupdesc, 10, &is_null)); + entry->partitions[j]->keyspace_end = DatumGetInt16(SPI_getbinval(tuple, tupdesc, 11, &is_null)); + } + + SPI_finish(); + return entry; +} + +void +free_epoch(epoch_and_partitions_set *epoch) +{ + int j; + + for (j = 0; j < epoch->num_partitions; j++) + { + partition_info *part = epoch->partitions[j]; + + pfree(part); + } + pfree(epoch); +} + +#define CRN_QUERY_ARGS (Oid[]) { INT4OID } +#define CRN_QUERY "SELECT schema_name, table_name \ + FROM _iobeamdb_catalog.chunk_replica_node AS crn \ 
+                    WHERE crn.chunk_id = $1"
+
+DEFINE_PLAN(get_crn_plan, CRN_QUERY, 1, CRN_QUERY_ARGS)
+
+crn_set *
+fetch_crn_set(crn_set *entry, int32 chunk_id)
+{
+	SPIPlanPtr plan = get_crn_plan();
+	Datum args[1] = {Int32GetDatum(chunk_id)};
+	int ret,
+		total_rows,
+		j;
+	bool is_null;
+	TupleDesc tupdesc;
+	crn_row **tab_array;
+
+	if (entry == NULL)
+		entry = palloc(sizeof(crn_set));
+	CACHE2_elog(WARNING, "Looking up crn %d", chunk_id);
+
+	entry->chunk_id = chunk_id;
+
+	if (SPI_connect() < 0)
+	{
+		elog(ERROR, "Got an SPI connect error");
+	}
+	ret = SPI_execute_plan(plan, args, NULL, false, 0);
+	if (ret <= 0)
+	{
+		elog(ERROR, "Got an SPI error %d", ret);
+	}
+	if (SPI_processed < 1)
+	{
+		elog(ERROR, "Could not find chunk replica nodes for chunk %d", chunk_id);
+	}
+
+
+	tupdesc = SPI_tuptable->tupdesc;
+
+	total_rows = SPI_processed;
+	tab_array = SPI_palloc(total_rows * sizeof(crn_row *));
+
+	for (j = 0; j < total_rows; j++)
+	{
+		HeapTuple tuple = SPI_tuptable->vals[j];
+		crn_row *tab = SPI_palloc(sizeof(crn_row));
+
+		Name name = DatumGetName(SPI_getbinval(tuple, tupdesc, 1, &is_null));
+
+		memcpy(tab->schema_name.data, name, NAMEDATALEN);
+
+		name = DatumGetName(SPI_getbinval(tuple, tupdesc, 2, &is_null));
+		memcpy(tab->table_name.data, name, NAMEDATALEN);
+
+		tab_array[j] = tab;
+	}
+	SPI_finish();
+
+	entry->tables = NIL;
+	for (j = 0; j < total_rows; j++)
+	{
+		entry->tables = lappend(entry->tables, tab_array[j]);
+	}
+	pfree(tab_array);
+	return entry;
+}
+
+
+/*
+ * Retrieving chunks:
+ *
+ * Locked chunk retrieval has to occur on every row, so there is a fast path and
+ * a slow path. The fast path retrieves and locks the chunk only if it already
+ * exists locally; it is faster because it issues plain SQL rather than calling
+ * a plpgsql function. This was found to make a performance difference in tests.
+ *
+ * The slow path calls get_or_create_chunk(), and is used only when the fast
+ * path returns no rows.
+ *
+ */
+#define CHUNK_QUERY_ARGS (Oid[]) {INT4OID, INT8OID, BOOLOID}
+#define CHUNK_QUERY "SELECT id, partition_id, start_time, end_time \
+    FROM get_or_create_chunk($1, $2, $3)"
+
+#define CHUNK_QUERY_LOCKED_FASTPATH_ARGS (Oid[]) {INT4OID, INT8OID}
+#define CHUNK_QUERY_LOCKED_FASTPATH "SELECT id, partition_id, start_time, end_time \
+    FROM _iobeamdb_catalog.chunk c \
+    WHERE c.partition_id = $1 AND \
+    (c.start_time <= $2 OR c.start_time IS NULL) AND \
+    (c.end_time >= $2 OR c.end_time IS NULL) \
+    FOR SHARE;"
+
+
+/* plan for getting a chunk via get_or_create_chunk(). */
+DEFINE_PLAN(get_chunk_plan, CHUNK_QUERY, 3, CHUNK_QUERY_ARGS)
+/*
+ * Plan for getting a locked chunk. It uses a faster query but only succeeds if there is already a local chunk to lock.
+ * Thus, it may return 0 rows, in which case the caller should fall back to chunk_plan.
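+ *
+ * A typical caller therefore invokes fetch_chunk_row(entry, partition_id, time_pt, true)
+ * below: the FOR SHARE fastpath finds and locks an already-existing local chunk, and
+ * only a miss falls through to the get_or_create_chunk() plan.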
+ */
+DEFINE_PLAN(get_chunk_plan_locked_fastpath, CHUNK_QUERY_LOCKED_FASTPATH, 2, CHUNK_QUERY_LOCKED_FASTPATH_ARGS)
+
+
+chunk_row *
+fetch_chunk_row(chunk_row *entry, int32 partition_id, int64 time_pt, bool lock)
+{
+	HeapTuple tuple = NULL;
+	TupleDesc tupdesc = NULL;
+	SPIPlanPtr plan_locked_fastpath = get_chunk_plan_locked_fastpath();
+	SPIPlanPtr plan_slowpath = get_chunk_plan();
+	int64 time_ret;
+	bool is_null;
+
+
+	if (entry == NULL)
+		entry = palloc(sizeof(chunk_row));
+
+	if (SPI_connect() < 0)
+	{
+		elog(ERROR, "Got an SPI connect error");
+	}
+
+	if (lock)
+	{
+		/* try the fastpath */
+		int ret;
+
+		Datum args[2] = {Int32GetDatum(partition_id), Int64GetDatum(time_pt)};
+
+		ret = SPI_execute_plan(plan_locked_fastpath, args, NULL, false, 2);
+		if (ret <= 0)
+		{
+			elog(ERROR, "Got an SPI error %d", ret);
+		}
+
+		if (SPI_processed > 1)
+		{
+			elog(ERROR, "Expected at most 1 row but got %lu", SPI_processed);
+		}
+
+		if (SPI_processed == 1)
+		{
+			tupdesc = SPI_tuptable->tupdesc;
+			tuple = SPI_tuptable->vals[0];
+		}
+	}
+
+	if (tuple == NULL)
+	{
+		/* the fastpath was not applicable or returned 0 rows */
+		int ret;
+		Datum args[3] = {Int32GetDatum(partition_id), Int64GetDatum(time_pt), BoolGetDatum(lock)};
+
+		ret = SPI_execute_plan(plan_slowpath, args, NULL, false, 2);
+		if (ret <= 0)
+		{
+			elog(ERROR, "Got an SPI error %d", ret);
+		}
+		if (SPI_processed != 1)
+		{
+			elog(ERROR, "Expected 1 row but got %lu", SPI_processed);
+		}
+
+		tupdesc = SPI_tuptable->tupdesc;
+		tuple = SPI_tuptable->vals[0];
+	}
+	entry->id = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 1, &is_null));
+	entry->partition_id = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 2, &is_null));
+
+	time_ret = DatumGetInt64(SPI_getbinval(tuple, tupdesc, 3, &is_null));
+	if (is_null)
+	{
+		entry->start_time = OPEN_START_TIME;
+	}
+	else
+	{
+		entry->start_time = time_ret;
+	}
+
+	time_ret = DatumGetInt64(SPI_getbinval(tuple, tupdesc, 4, &is_null));
+	if (is_null)
+	{
+		entry->end_time = OPEN_END_TIME;
+	}
+	else
+	{
+		entry->end_time = time_ret;
+	}
+
+	SPI_finish();
+	return entry;
+}
diff --git a/src/metadata_queries.h b/src/metadata_queries.h
new file mode 100644
index 000000000..b038db3e9
--- /dev/null
+++ b/src/metadata_queries.h
@@ -0,0 +1,82 @@
+#ifndef IOBEAMDB_METADATA_QUERIES_H
+#define IOBEAMDB_METADATA_QUERIES_H
+
+#include "iobeamdb.h"
+#include "utils/lsyscache.h"
+#include "catalog/namespace.h"
+#include "executor/spi.h"
+#include "fmgr.h"
+
+#define OPEN_START_TIME -1
+#define OPEN_END_TIME PG_INT64_MAX
+
+typedef struct hypertable_basic_info
+{
+	int32 id;
+	NameData time_column_name;
+	Oid time_column_type;
+	SPIPlanPtr get_one_tuple_copyt_plan;
+} hypertable_basic_info;
+
+typedef struct partition_info
+{
+	int32 id;
+	int16 keyspace_start;
+	int16 keyspace_end;
+} partition_info;
+
+typedef struct epoch_and_partitions_set
+{
+	int32 id;
+	int32 hypertable_id;
+	int64 start_time;
+	int64 end_time;
+	Name partitioning_func_schema;
+	Name partitioning_func;
+	int32 partitioning_mod;
+	Name partitioning_column;
+	AttrNumber partitioning_column_attrnumber;
+	Oid partitioning_column_text_func;
+	FmgrInfo *partitioning_column_text_func_fmgr;
+	FmgrInfo *partition_func_fmgr;
+	int num_partitions;
+	partition_info **partitions;
+} epoch_and_partitions_set;
+
+typedef struct chunk_row
+{
+	int32 id;
+	int32 partition_id;
+	int64 start_time;
+	int64 end_time;
+} chunk_row;
+
+typedef struct crn_row
+{
+	NameData schema_name;
+	NameData table_name;
+} crn_row;
+
+typedef struct crn_set
+{
+	int32 chunk_id;
+	List *tables;
+} crn_set;
+
+/*
utility func */ +extern SPIPlanPtr prepare_plan(const char *src, int nargs, Oid *argtypes); + + +/* db access func */ +extern epoch_and_partitions_set *fetch_epoch_and_partitions_set(epoch_and_partitions_set *entry, + int32 hypertable_id, int64 time_pt, Oid relid); + +extern void free_epoch(epoch_and_partitions_set *epoch); + +extern hypertable_basic_info *fetch_hypertable_info(hypertable_basic_info *entry, int32 hypertable_id); + +extern chunk_row *fetch_chunk_row(chunk_row *entry, int32 partition_id, int64 time_pt, bool lock); + +extern crn_set *fetch_crn_set(crn_set *entry, int32 chunk_id); + +#endif /* IOBEAMDB_METADATA_QUERIES_H */ diff --git a/src/pgmurmur3.c b/src/pgmurmur3.c index 3f0c39bb3..9922843f8 100644 --- a/src/pgmurmur3.c +++ b/src/pgmurmur3.c @@ -29,3 +29,31 @@ pg_murmur3_hash_string(PG_FUNCTION_ARGS) PG_RETURN_INT32(io[0]); } + +/* _iobeamdb_catalog.get_partition_for_key(key TEXT, mod_factor INT) RETURNS SMALLINT */ +PG_FUNCTION_INFO_V1(get_partition_for_key); + +Datum +get_partition_for_key(PG_FUNCTION_ARGS) +{ +// SELECT ((_iobeamdb_internal.murmur3_hash_string(key, 1 :: INT4) & x'7fffffff' :: INTEGER) % mod_factor) :: SMALLINT INTO ret; + struct varlena *data; + int32 mod; + Datum hash_d; + int32 hash_i; + int16 res; + /* request aligned data on weird architectures */ +#ifdef HLIB_UNALIGNED_READ_OK + data = PG_GETARG_VARLENA_PP(0); +#else + data = PG_GETARG_VARLENA_P(0); +#endif + mod = PG_GETARG_INT32(1); + + hash_d = DirectFunctionCall2(pg_murmur3_hash_string, PointerGetDatum(data), Int32GetDatum(1)); + hash_i = DatumGetInt32(hash_d); + + res = (int16) ((hash_i & 0x7fffffff) % mod); + + PG_RETURN_INT16(res); +} diff --git a/src/utils.c b/src/utils.c index a6421487b..0acef8dca 100644 --- a/src/utils.c +++ b/src/utils.c @@ -4,11 +4,13 @@ #include #include #include +#include +#include -Datum pg_timestamp_to_microseconds(PG_FUNCTION_ARGS); -Datum pg_microseconds_to_timestamp(PG_FUNCTION_ARGS); -Datum pg_timestamp_to_unix_microseconds(PG_FUNCTION_ARGS); -Datum pg_unix_microseconds_to_timestamp(PG_FUNCTION_ARGS); +#include "utils.h" +#include "nodes/nodes.h" +#include "nodes/makefuncs.h" +#include "utils/lsyscache.h" PG_FUNCTION_INFO_V1(pg_timestamp_to_microseconds); @@ -19,7 +21,7 @@ Datum pg_timestamp_to_microseconds(PG_FUNCTION_ARGS) { TimestampTz timestamp = PG_GETARG_TIMESTAMPTZ(0); - int64 microseconds; + int64 microseconds; if (!IS_VALID_TIMESTAMP(timestamp)) ereport(ERROR, @@ -31,7 +33,8 @@ pg_timestamp_to_microseconds(PG_FUNCTION_ARGS) #else if (1) { - int64 seconds = (int64)timestamp; + int64 seconds = (int64) timestamp; + microseconds = (seconds * USECS_PER_SEC) + ((timestamp - seconds) * USECS_PER_SEC); } #endif @@ -46,7 +49,7 @@ PG_FUNCTION_INFO_V1(pg_microseconds_to_timestamp); Datum pg_microseconds_to_timestamp(PG_FUNCTION_ARGS) { - int64 microseconds = PG_GETARG_INT64(0); + int64 microseconds = PG_GETARG_INT64(0); TimestampTz timestamp; #ifdef HAVE_INT64_TIMESTAMP @@ -72,8 +75,8 @@ Datum pg_timestamp_to_unix_microseconds(PG_FUNCTION_ARGS) { TimestampTz timestamp = PG_GETARG_TIMESTAMPTZ(0); - int64 epoch_diff_microseconds = (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY; - int64 microseconds; + int64 epoch_diff_microseconds = (POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY; + int64 microseconds; if (timestamp < MIN_TIMESTAMP) ereport(ERROR, @@ -90,7 +93,8 @@ pg_timestamp_to_unix_microseconds(PG_FUNCTION_ARGS) #else if (1) { - int64 seconds = (int64)timestamp; + int64 seconds = (int64) timestamp; + microseconds = (seconds * USECS_PER_SEC) 
+		((timestamp - seconds) * USECS_PER_SEC) + epoch_diff_microseconds;
 	}
 #endif
@@ -105,16 +109,16 @@ PG_FUNCTION_INFO_V1(pg_unix_microseconds_to_timestamp);
 Datum
 pg_unix_microseconds_to_timestamp(PG_FUNCTION_ARGS)
 {
-	int64 microseconds = PG_GETARG_INT64(0);
+	int64		microseconds = PG_GETARG_INT64(0);
 	TimestampTz timestamp;

 	/*
-	Test that the UNIX us timestamp is within bounds.
-	Note that an int64 at UNIX epoch and microsecond precision cannot represent
-	the upper limit of the supported date range (Julian end date), so INT64_MAX
-	is the natural upper bound for this function.
-	*/
-	if (microseconds < ((int64)USECS_PER_DAY * (DATETIME_MIN_JULIAN - UNIX_EPOCH_JDATE)))
+	 * Test that the UNIX us timestamp is within bounds. Note that an int64 at
+	 * UNIX epoch and microsecond precision cannot represent the upper limit
+	 * of the supported date range (Julian end date), so INT64_MAX is the
+	 * natural upper bound for this function.
+	 */
+	if (microseconds < ((int64) USECS_PER_DAY * (DATETIME_MIN_JULIAN - UNIX_EPOCH_JDATE)))
 		ereport(ERROR,
 				(errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
 				 errmsg("timestamp out of range")));
@@ -123,10 +127,111 @@ pg_unix_microseconds_to_timestamp(PG_FUNCTION_ARGS)
 	timestamp = microseconds - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * USECS_PER_DAY);
 #else
 	/* Shift the epoch using integer arithmetic to reduce precision errors */
-	timestamp = microseconds / USECS_PER_SEC; /* seconds */
-	microseconds = microseconds - ((int64)timestamp * USECS_PER_SEC);
-	timestamp = (float8)((int64)seconds - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY))
-	 + (float8)microseconds / USECS_PER_SEC;
+	timestamp = microseconds / USECS_PER_SEC;	/* seconds */
+	microseconds = microseconds - ((int64) timestamp * USECS_PER_SEC);
+	timestamp = (float8) ((int64) timestamp - ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY))
+		+ (float8) microseconds / USECS_PER_SEC;
 #endif
 	PG_RETURN_TIMESTAMPTZ(timestamp);
 }
+
+
+/* Convert a time value of the given type into the internal int64 time representation */
+int64
+time_value_to_internal(Datum time_val, Oid type)
+{
+	if (type == INT8OID)
+	{
+		return DatumGetInt64(time_val);
+	}
+	if (type == INT4OID)
+	{
+		return (int64) DatumGetInt32(time_val);
+	}
+	if (type == INT2OID)
+	{
+		return (int64) DatumGetInt16(time_val);
+	}
+	if (type == TIMESTAMPOID)
+	{
+		Datum tz = DirectFunctionCall1(timestamp_timestamptz, time_val);
+		Datum res = DirectFunctionCall1(pg_timestamp_to_unix_microseconds, tz);
+
+		return DatumGetInt64(res);
+	}
+	if (type == TIMESTAMPTZOID)
+	{
+		Datum res = DirectFunctionCall1(pg_timestamp_to_unix_microseconds, time_val);
+
+		return DatumGetInt64(res);
+	}
+
+	elog(ERROR, "unknown time type oid '%d'", type);
+}
+
+char *
+internal_time_to_column_literal_sql(int64 internal_time, Oid type)
+{
+	char *sql = palloc(100 * sizeof(char));
+
+	/* ok to waste a little space */
+	if (type == INT8OID || type == INT4OID || type == INT2OID)
+	{
+		snprintf(sql, 100, "%ld", internal_time);
+		return sql;
+	}
+	/* TODO: avoid these function calls */
+	if (type == TIMESTAMPOID)
+	{
+		snprintf(sql, 100, "_iobeamdb_internal.to_timestamp(%ld)::TIMESTAMP", internal_time);
+		return sql;
+	}
+	if (type == TIMESTAMPTZOID)
+	{
+		snprintf(sql, 100, "_iobeamdb_internal.to_timestamp(%ld)", internal_time);
+		return sql;
+	}
+	elog(ERROR, "unknown time type oid '%d'", type);
+}
+
+/* Make a RangeVar from a regclass Oid */
+RangeVar *
+makeRangeVarFromRelid(Oid relid)
+{
+	Oid namespace = get_rel_namespace(relid);
+	char *tableName = get_rel_name(relid);
+	char *schemaName = get_namespace_name(namespace);
+
+	return
makeRangeVar(schemaName, tableName, -1); +} + +int +int_cmp(const void *a, const void *b) +{ + const int *ia = (const int *) a; + const int *ib = (const int *) b; + + return *ia - *ib; +} + +FmgrInfo * +create_fmgr(char *schema, char *function_name, int num_args) +{ + FmgrInfo *finfo = palloc(sizeof(FmgrInfo)); + FuncCandidateList func_list = FuncnameGetCandidates(list_make2(makeString(schema), + makeString(function_name)), + num_args, NULL, false, false, false); + + if (func_list == NULL) + { + elog(ERROR, "couldn't find the function %s.%s", schema, function_name); + } + if (func_list->next != NULL) + { + elog(ERROR, "multiple functions found"); + } + + fmgr_info(func_list->oid, finfo); + + return finfo; +} diff --git a/src/utils.h b/src/utils.h new file mode 100644 index 000000000..02fc490b3 --- /dev/null +++ b/src/utils.h @@ -0,0 +1,39 @@ +#ifndef IOBEAMDB_UTILS_H +#define IOBEAMDB_UTILS_H + +#include "fmgr.h" +#include "nodes/primnodes.h" + +extern Datum pg_timestamp_to_microseconds(PG_FUNCTION_ARGS); +extern Datum pg_microseconds_to_timestamp(PG_FUNCTION_ARGS); +extern Datum pg_timestamp_to_unix_microseconds(PG_FUNCTION_ARGS); +extern Datum pg_unix_microseconds_to_timestamp(PG_FUNCTION_ARGS); + +/* + * Convert a column value into the internal time representation. + */ +extern int64 time_value_to_internal(Datum time_val, Oid type); +extern char *internal_time_to_column_literal_sql(int64 internal_time, Oid type); + +#if 0 +#define CACHE1_elog(a,b) elog(a,b) +#define CACHE2_elog(a,b,c) elog(a,b,c) +#define CACHE3_elog(a,b,c,d) elog(a,b,c,d) +#define CACHE4_elog(a,b,c,d,e) elog(a,b,c,d,e) +#define CACHE5_elog(a,b,c,d,e,f) elog(a,b,c,d,e,f) +#define CACHE6_elog(a,b,c,d,e,f,g) elog(a,b,c,d,e,f,g) +#else +#define CACHE1_elog(a,b) +#define CACHE2_elog(a,b,c) +#define CACHE3_elog(a,b,c,d) +#define CACHE4_elog(a,b,c,d,e) +#define CACHE5_elog(a,b,c,d,e,f) +#define CACHE6_elog(a,b,c,d,e,f,g) +#endif + + +extern FmgrInfo *create_fmgr(char *schema, char *function_name, int num_args); +extern RangeVar *makeRangeVarFromRelid(Oid relid); +extern int int_cmp(const void *a, const void *b); + +#endif /* IOBEAMDB_UTILS_H */ diff --git a/test/expected/sql_query.out b/test/expected/sql_query.out index 06bcd3065..ee28942b6 100644 --- a/test/expected/sql_query.out +++ b/test/expected/sql_query.out @@ -98,20 +98,20 @@ EXPLAIN (verbose ON, costs off) SELECT * FROM PUBLIC."testNs" WHERE device_id = Append -> Seq Scan on "testNs"._hyper_1_0_replica Output: _hyper_1_0_replica."timeCustom", _hyper_1_0_replica.device_id, _hyper_1_0_replica.series_0, _hyper_1_0_replica.series_1, _hyper_1_0_replica.series_2, _hyper_1_0_replica.series_bool - Filter: ((_hyper_1_0_replica.device_id = 'dev20'::text) AND ((((_iobeamdb_internal.murmur3_hash_string(_hyper_1_0_replica.device_id, 1) & 2147483647) % 32768))::smallint = '28646'::smallint)) + Filter: ((_hyper_1_0_replica.device_id = 'dev20'::text) AND (_iobeamdb_catalog.get_partition_for_key(_hyper_1_0_replica.device_id, 32768) = '28646'::smallint)) -> Seq Scan on "testNs"._hyper_1_1_0_partition Output: _hyper_1_1_0_partition."timeCustom", _hyper_1_1_0_partition.device_id, _hyper_1_1_0_partition.series_0, _hyper_1_1_0_partition.series_1, _hyper_1_1_0_partition.series_2, _hyper_1_1_0_partition.series_bool - Filter: ((_hyper_1_1_0_partition.device_id = 'dev20'::text) AND ((((_iobeamdb_internal.murmur3_hash_string(_hyper_1_1_0_partition.device_id, 1) & 2147483647) % 32768))::smallint = '28646'::smallint)) + Filter: ((_hyper_1_1_0_partition.device_id = 'dev20'::text) AND 
(_iobeamdb_catalog.get_partition_for_key(_hyper_1_1_0_partition.device_id, 32768) = '28646'::smallint)) -> Bitmap Heap Scan on "testNs"._hyper_1_1_0_1_data Output: _hyper_1_1_0_1_data."timeCustom", _hyper_1_1_0_1_data.device_id, _hyper_1_1_0_1_data.series_0, _hyper_1_1_0_1_data.series_1, _hyper_1_1_0_1_data.series_2, _hyper_1_1_0_1_data.series_bool Recheck Cond: (_hyper_1_1_0_1_data.device_id = 'dev20'::text) - Filter: ((((_iobeamdb_internal.murmur3_hash_string(_hyper_1_1_0_1_data.device_id, 1) & 2147483647) % 32768))::smallint = '28646'::smallint) + Filter: (_iobeamdb_catalog.get_partition_for_key(_hyper_1_1_0_1_data.device_id, 32768) = '28646'::smallint) -> Bitmap Index Scan on "1-testNs_device_id_timeCustom_idx" Index Cond: (_hyper_1_1_0_1_data.device_id = 'dev20'::text) -> Bitmap Heap Scan on "testNs"._hyper_1_1_0_2_data Output: _hyper_1_1_0_2_data."timeCustom", _hyper_1_1_0_2_data.device_id, _hyper_1_1_0_2_data.series_0, _hyper_1_1_0_2_data.series_1, _hyper_1_1_0_2_data.series_2, _hyper_1_1_0_2_data.series_bool Recheck Cond: (_hyper_1_1_0_2_data.device_id = 'dev20'::text) - Filter: ((((_iobeamdb_internal.murmur3_hash_string(_hyper_1_1_0_2_data.device_id, 1) & 2147483647) % 32768))::smallint = '28646'::smallint) + Filter: (_iobeamdb_catalog.get_partition_for_key(_hyper_1_1_0_2_data.device_id, 32768) = '28646'::smallint) -> Bitmap Index Scan on "6-testNs_device_id_timeCustom_idx" Index Cond: (_hyper_1_1_0_2_data.device_id = 'dev20'::text) (19 rows) @@ -122,20 +122,20 @@ EXPLAIN (verbose ON, costs off) SELECT * FROM PUBLIC."testNs" WHERE device_id = Append -> Seq Scan on "testNs"._hyper_1_0_replica Output: _hyper_1_0_replica."timeCustom", _hyper_1_0_replica.device_id, _hyper_1_0_replica.series_0, _hyper_1_0_replica.series_1, _hyper_1_0_replica.series_2, _hyper_1_0_replica.series_bool - Filter: ((_hyper_1_0_replica.device_id = 'dev20'::text) AND ((((_iobeamdb_internal.murmur3_hash_string(_hyper_1_0_replica.device_id, 1) & 2147483647) % 32768))::smallint = '28646'::smallint)) + Filter: ((_hyper_1_0_replica.device_id = 'dev20'::text) AND (_iobeamdb_catalog.get_partition_for_key(_hyper_1_0_replica.device_id, 32768) = '28646'::smallint)) -> Seq Scan on "testNs"._hyper_1_1_0_partition Output: _hyper_1_1_0_partition."timeCustom", _hyper_1_1_0_partition.device_id, _hyper_1_1_0_partition.series_0, _hyper_1_1_0_partition.series_1, _hyper_1_1_0_partition.series_2, _hyper_1_1_0_partition.series_bool - Filter: ((_hyper_1_1_0_partition.device_id = 'dev20'::text) AND ((((_iobeamdb_internal.murmur3_hash_string(_hyper_1_1_0_partition.device_id, 1) & 2147483647) % 32768))::smallint = '28646'::smallint)) + Filter: ((_hyper_1_1_0_partition.device_id = 'dev20'::text) AND (_iobeamdb_catalog.get_partition_for_key(_hyper_1_1_0_partition.device_id, 32768) = '28646'::smallint)) -> Bitmap Heap Scan on "testNs"._hyper_1_1_0_1_data Output: _hyper_1_1_0_1_data."timeCustom", _hyper_1_1_0_1_data.device_id, _hyper_1_1_0_1_data.series_0, _hyper_1_1_0_1_data.series_1, _hyper_1_1_0_1_data.series_2, _hyper_1_1_0_1_data.series_bool Recheck Cond: (_hyper_1_1_0_1_data.device_id = 'dev20'::text) - Filter: ((((_iobeamdb_internal.murmur3_hash_string(_hyper_1_1_0_1_data.device_id, 1) & 2147483647) % 32768))::smallint = '28646'::smallint) + Filter: (_iobeamdb_catalog.get_partition_for_key(_hyper_1_1_0_1_data.device_id, 32768) = '28646'::smallint) -> Bitmap Index Scan on "1-testNs_device_id_timeCustom_idx" Index Cond: (_hyper_1_1_0_1_data.device_id = 'dev20'::text) -> Bitmap Heap Scan on "testNs"._hyper_1_1_0_2_data Output: 
_hyper_1_1_0_2_data."timeCustom", _hyper_1_1_0_2_data.device_id, _hyper_1_1_0_2_data.series_0, _hyper_1_1_0_2_data.series_1, _hyper_1_1_0_2_data.series_2, _hyper_1_1_0_2_data.series_bool Recheck Cond: (_hyper_1_1_0_2_data.device_id = 'dev20'::text) - Filter: ((((_iobeamdb_internal.murmur3_hash_string(_hyper_1_1_0_2_data.device_id, 1) & 2147483647) % 32768))::smallint = '28646'::smallint) + Filter: (_iobeamdb_catalog.get_partition_for_key(_hyper_1_1_0_2_data.device_id, 32768) = '28646'::smallint) -> Bitmap Index Scan on "6-testNs_device_id_timeCustom_idx" Index Cond: (_hyper_1_1_0_2_data.device_id = 'dev20'::text) (19 rows) @@ -146,20 +146,20 @@ EXPLAIN (verbose ON, costs off) SELECT * FROM PUBLIC."testNs" WHERE 'dev'||'20' Append -> Seq Scan on "testNs"._hyper_1_0_replica Output: _hyper_1_0_replica."timeCustom", _hyper_1_0_replica.device_id, _hyper_1_0_replica.series_0, _hyper_1_0_replica.series_1, _hyper_1_0_replica.series_2, _hyper_1_0_replica.series_bool - Filter: (('dev20'::text = _hyper_1_0_replica.device_id) AND ((((_iobeamdb_internal.murmur3_hash_string(_hyper_1_0_replica.device_id, 1) & 2147483647) % 32768))::smallint = '28646'::smallint)) + Filter: (('dev20'::text = _hyper_1_0_replica.device_id) AND (_iobeamdb_catalog.get_partition_for_key(_hyper_1_0_replica.device_id, 32768) = '28646'::smallint)) -> Seq Scan on "testNs"._hyper_1_1_0_partition Output: _hyper_1_1_0_partition."timeCustom", _hyper_1_1_0_partition.device_id, _hyper_1_1_0_partition.series_0, _hyper_1_1_0_partition.series_1, _hyper_1_1_0_partition.series_2, _hyper_1_1_0_partition.series_bool - Filter: (('dev20'::text = _hyper_1_1_0_partition.device_id) AND ((((_iobeamdb_internal.murmur3_hash_string(_hyper_1_1_0_partition.device_id, 1) & 2147483647) % 32768))::smallint = '28646'::smallint)) + Filter: (('dev20'::text = _hyper_1_1_0_partition.device_id) AND (_iobeamdb_catalog.get_partition_for_key(_hyper_1_1_0_partition.device_id, 32768) = '28646'::smallint)) -> Bitmap Heap Scan on "testNs"._hyper_1_1_0_1_data Output: _hyper_1_1_0_1_data."timeCustom", _hyper_1_1_0_1_data.device_id, _hyper_1_1_0_1_data.series_0, _hyper_1_1_0_1_data.series_1, _hyper_1_1_0_1_data.series_2, _hyper_1_1_0_1_data.series_bool Recheck Cond: ('dev20'::text = _hyper_1_1_0_1_data.device_id) - Filter: ((((_iobeamdb_internal.murmur3_hash_string(_hyper_1_1_0_1_data.device_id, 1) & 2147483647) % 32768))::smallint = '28646'::smallint) + Filter: (_iobeamdb_catalog.get_partition_for_key(_hyper_1_1_0_1_data.device_id, 32768) = '28646'::smallint) -> Bitmap Index Scan on "1-testNs_device_id_timeCustom_idx" Index Cond: ('dev20'::text = _hyper_1_1_0_1_data.device_id) -> Bitmap Heap Scan on "testNs"._hyper_1_1_0_2_data Output: _hyper_1_1_0_2_data."timeCustom", _hyper_1_1_0_2_data.device_id, _hyper_1_1_0_2_data.series_0, _hyper_1_1_0_2_data.series_1, _hyper_1_1_0_2_data.series_2, _hyper_1_1_0_2_data.series_bool Recheck Cond: ('dev20'::text = _hyper_1_1_0_2_data.device_id) - Filter: ((((_iobeamdb_internal.murmur3_hash_string(_hyper_1_1_0_2_data.device_id, 1) & 2147483647) % 32768))::smallint = '28646'::smallint) + Filter: (_iobeamdb_catalog.get_partition_for_key(_hyper_1_1_0_2_data.device_id, 32768) = '28646'::smallint) -> Bitmap Index Scan on "6-testNs_device_id_timeCustom_idx" Index Cond: ('dev20'::text = _hyper_1_1_0_2_data.device_id) (19 rows)