diff --git a/sql/common/tables.sql b/sql/common/tables.sql index ac43a71fd..9d5888c39 100644 --- a/sql/common/tables.sql +++ b/sql/common/tables.sql @@ -2,7 +2,9 @@ CREATE TABLE IF NOT EXISTS node ( database_name NAME PRIMARY KEY NOT NULL, schema_name NAME UNIQUE NOT NULL, --public schema of remote server_name NAME UNIQUE NOT NULL, - hostname TEXT NOT NULL + hostname TEXT NOT NULL, + active BOOLEAN NOT NULL DEFAULT TRUE, + id SERIAL UNIQUE -- id for node. used in naming ); CREATE TABLE IF NOT EXISTS cluster_user ( @@ -10,39 +12,121 @@ CREATE TABLE IF NOT EXISTS cluster_user ( password TEXT --not any more of a security hole than usual since stored in pg_user_mapping anyway ); -CREATE TABLE IF NOT EXISTS namespace ( - name NAME PRIMARY KEY NOT NULL, - schema_name NAME UNIQUE NOT NULL, - cluster_table_name NAME NOT NULL, - cluster_distinct_table_name NAME NOT NULL + +--The hypertable is an abstraction that represents a replicated table that is partition on 2 dimensions. +--One of the dimensions is time, the other is arbitrary. +--This abstraction also tracks the distinct value set of any columns marked as `distinct`. +CREATE TABLE IF NOT EXISTS hypertable ( + name NAME NOT NULL PRIMARY KEY CHECK(name NOT LIKE '\_%'), + main_schema_name NAME NOT NULL, + main_table_name NAME NOT NULL, + associated_schema_name NAME NOT NULL, + associated_table_prefix NAME NOT NULL, + root_schema_name NAME NOT NULL, + root_table_name NAME NOT NULL, + distinct_schema_name NAME NOT NULL, + distinct_table_name NAME NOT NULL, + replication_factor SMALLINT NOT NULL CHECK(replication_factor > 0), + UNIQUE (main_schema_name, main_table_name), + UNIQUE (associated_schema_name, associated_table_prefix), + UNIQUE (root_schema_name, root_table_name) ); -CREATE TABLE IF NOT EXISTS namespace_node ( - namespace_name NAME REFERENCES namespace (name) NOT NULL, - database_name NAME REFERENCES node (database_name) NOT NULL, - master_table_name NAME NOT NULL, - remote_table_name NAME NOT NULL, - distinct_local_table_name NAME NOT NULL, - distinct_remote_table_name NAME NOT NULL, - PRIMARY KEY (namespace_name, database_name) +CREATE TABLE IF NOT EXISTS hypertable_replica ( + hypertable_name NAME NOT NULL REFERENCES hypertable (name), + replica_id SMALLINT NOT NULL CHECK(replica_id >= 0), + schema_name NAME NOT NULL, + table_name NAME NOT NULL, + distinct_schema_name NAME NOT NULL, + distinct_table_name NAME NOT NULL, + PRIMARY KEY (hypertable_name, replica_id), + UNIQUE (schema_name, table_name) ); -CREATE UNIQUE INDEX IF NOT EXISTS unique_remote_table_name_per_namespace - ON namespace_node (namespace_name, remote_table_name); -CREATE UNIQUE INDEX IF NOT EXISTS unique_distinct_remote_table_name_per_namespace - ON namespace_node (namespace_name, distinct_remote_table_name); + +--there should be one distinct_replica_node for each node with a chunk from that replica +--so there can be multiple rows for one hypertable-replica on different nodes. +--that way writes are local. Optimized reads are also local for many queries. +--But, some read queries are cross-node. +CREATE TABLE IF NOT EXISTS distinct_replica_node ( + hypertable_name NAME NOT NULL, + replica_id SMALLINT NOT NULL, + database_name NAME NOT NULL REFERENCES node(database_name), + schema_name NAME NOT NULL, + table_name NAME NOT NULL, + PRIMARY KEY (hypertable_name, replica_id, database_name), + UNIQUE (schema_name, table_name), + FOREIGN KEY (hypertable_name, replica_id) REFERENCES hypertable_replica(hypertable_name, replica_id) +); + +CREATE TABLE IF NOT EXISTS partition_epoch ( + id SERIAL PRIMARY KEY, + hypertable_name NAME NOT NULL REFERENCES hypertable (name), + start_time BIGINT CHECK(start_time > 0), + end_time BIGINT CHECK(end_time > 0), + partitioning_func NAME NOT NULL, + partitioning_mod INT NOT NULL CHECK(partitioning_mod < 65536), + partitioning_field NAME NOT NULL, + UNIQUE(hypertable_name, start_time), + UNIQUE(hypertable_name, end_time), + CHECK(start_time < end_time) +); + +CREATE TABLE IF NOT EXISTS partition ( + id SERIAL PRIMARY KEY, + epoch_id INT NOT NULL REFERENCES partition_epoch(id), + keyspace_start SMALLINT NOT NULL CHECK(keyspace_start >= 0), --start inclusive + keyspace_end SMALLINT NOT NULL CHECK(keyspace_end > 0), --end inclusive; compatible with between operator + UNIQUE(epoch_id, keyspace_start), + CHECK(keyspace_end > keyspace_start) +); + +--todo: trigger to verify partition_epoch hypertable name matches this hypertable_name +CREATE TABLE IF NOT EXISTS partition_replica ( + id SERIAL PRIMARY KEY, + partition_id INT NOT NULL REFERENCES partition(id), + hypertable_name NAME NOT NULL, + replica_id SMALLINT NOT NULL, + schema_name NAME NOT NULL, + table_name NAME NOT NULL, + UNIQUE(schema_name, table_name), + FOREIGN KEY (hypertable_name, replica_id) REFERENCES hypertable_replica(hypertable_name, replica_id) +); + + +CREATE TABLE IF NOT EXISTS chunk ( + id SERIAL PRIMARY KEY, + partition_id INT NOT NULL REFERENCES partition(id), + start_time BIGINT CHECK(start_time > 0), + end_time BIGINT CHECK(end_time > 0), + UNIQUE(partition_id, start_time), + UNIQUE(partition_id, end_time), + CHECK(start_time < end_time) +); + + +--mapping between chunks, partition_replica, and node +CREATE TABLE IF NOT EXISTS chunk_replica_node ( + chunk_id INT NOT NULL REFERENCES chunk(id), + partition_replica_id INT NOT NULL REFERENCES partition_replica(id), + database_name NAME NOT NULL REFERENCES node(database_name), + schema_name NAME NOT NULL, + table_name NAME NOT NULL, + PRIMARY KEY (chunk_id, partition_replica_id), --a single chunk, replica tuple + UNIQUE (chunk_id, database_name), --no two chunk replicas on same node + UNIQUE (schema_name, table_name) +); CREATE TABLE IF NOT EXISTS field ( - namespace_name NAME NOT NULL REFERENCES namespace (name), - name NAME NOT NULL, - data_type REGTYPE NOT NULL CHECK (data_type IN - ('double precision' :: REGTYPE, 'text' :: REGTYPE, 'boolean' :: REGTYPE, 'bigint' :: REGTYPE)), - is_partitioning BOOLEAN DEFAULT FALSE, - is_distinct BOOLEAN DEFAULT FALSE, - index_types field_index_type [], - PRIMARY KEY (namespace_name, name) + hypertable_name NAME NOT NULL REFERENCES hypertable (name), + name NAME NOT NULL, + data_type REGTYPE NOT NULL, + is_partitioning BOOLEAN NOT NULL DEFAULT FALSE, + is_distinct BOOLEAN NOT NULL DEFAULT FALSE, + index_types field_index_type [] NOT NULL, + PRIMARY KEY (hypertable_name, name) ); CREATE UNIQUE INDEX IF NOT EXISTS one_partition_field - ON field (namespace_name) + ON field (hypertable_name) WHERE is_partitioning; diff --git a/sql/main/chunk.sql b/sql/main/chunk.sql new file mode 100644 index 000000000..3304e8b90 --- /dev/null +++ b/sql/main/chunk.sql @@ -0,0 +1,51 @@ +CREATE OR REPLACE FUNCTION lock_for_chunk_close( + chunk_id INTEGER +) + RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS +$BODY$ +DECLARE +BEGIN + --take an update lock on the chunk row + --this conflicts, by design, with the lock taken when inserting on the node getting the insert command (not the node with the chunk table) + PERFORM * + FROM chunk c + WHERE c.id = chunk_id + FOR UPDATE; +END +$BODY$; + + +CREATE OR REPLACE FUNCTION max_time_for_chunk_close( + schema_name NAME, + table_name NAME +) + RETURNS BIGINT LANGUAGE PLPGSQL STABLE AS +$BODY$ +DECLARE + max_time BIGINT; +BEGIN + EXECUTE format( + $$ + SELECT max("time") + FROM %I.%I + $$, + schema_name, table_name) + INTO max_time; + + RETURN max_time; +END +$BODY$; + +CREATE OR REPLACE FUNCTION set_end_time_for_chunk_close( + chunk_id INTEGER, + max_time BIGINT +) + RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS +$BODY$ +DECLARE +BEGIN + UPDATE chunk SET end_time = max_time WHERE id = chunk_id; +END +$BODY$; + + diff --git a/sql/main/chunk_replica_node_index.sql b/sql/main/chunk_replica_node_index.sql new file mode 100644 index 000000000..f732ae057 --- /dev/null +++ b/sql/main/chunk_replica_node_index.sql @@ -0,0 +1,27 @@ +CREATE OR REPLACE FUNCTION _sysinternal.create_chunk_replica_node_index( + schema_name NAME, + table_name NAME, + field_name NAME, + index_type field_index_type +) + RETURNS VOID LANGUAGE PLPGSQL AS +$BODY$ +DECLARE + index_name NAME; + prefix BIGINT; +BEGIN + prefix = nextval('chunk_replica_node_index_name_prefix'); + IF index_type = 'TIME-VALUE' THEN + index_name := format('%s-time-%s', prefix, field_name); + ELSIF index_type = 'VALUE-TIME' THEN + index_name := format('%s-%s-time', prefix, field_name); + ELSE + RAISE EXCEPTION 'Unknown index type %', index_type + USING ERRCODE = 'IO103'; + END IF; + + INSERT INTO chunk_replica_node_index (schema_name, table_name, field_name, index_name, index_type) VALUES + (schema_name, table_name, field_name, index_name, index_type) + ON CONFLICT DO NOTHING; +END +$BODY$; diff --git a/sql/main/chunk_replica_node_index_triggers.sql b/sql/main/chunk_replica_node_index_triggers.sql new file mode 100644 index 000000000..a4fefcd88 --- /dev/null +++ b/sql/main/chunk_replica_node_index_triggers.sql @@ -0,0 +1,41 @@ +CREATE OR REPLACE FUNCTION _sysinternal.on_create_chunk_replica_node_index() + RETURNS TRIGGER LANGUAGE PLPGSQL AS +$BODY$ +DECLARE +BEGIN + IF TG_OP <> 'INSERT' THEN + RAISE EXCEPTION 'Only inserts supported on % table', TG_TABLE_NAME + USING ERRCODE = 'IO101'; + END IF; + + IF NEW.index_type = 'TIME-VALUE' THEN + EXECUTE format( + $$ + CREATE INDEX %1$I ON %2$I.%3$I + USING BTREE(time DESC NULLS LAST, %4$I) + WHERE %4$I IS NOT NULL + $$, + NEW.index_name, NEW.schema_name, NEW.table_name, NEW.field_name); + ELSIF NEW.index_type = 'VALUE-TIME' THEN + EXECUTE format( + $$ + CREATE INDEX %1$I ON %2$I.%3$I + USING BTREE(%4$I, time DESC NULLS LAST) + WHERE %4$I IS NOT NULL + $$, + NEW.index_name, NEW.schema_name, NEW.table_name, NEW.field_name); + ELSE + RAISE EXCEPTION 'Unknown index type %', index_type + USING ERRCODE = 'IO103'; + END IF; + RETURN NEW; +END +$BODY$ +SET SEARCH_PATH = 'public'; + +BEGIN; +DROP TRIGGER IF EXISTS trigger_on_create_chunk_replica_node_index +ON chunk_replica_node_index; +CREATE TRIGGER trigger_on_create_chunk_replica_node_index AFTER INSERT OR UPDATE OR DELETE ON chunk_replica_node_index +FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_chunk_replica_node_index(); +COMMIT; diff --git a/sql/main/chunk_replica_node_triggers.sql b/sql/main/chunk_replica_node_triggers.sql new file mode 100644 index 000000000..60ec19a3d --- /dev/null +++ b/sql/main/chunk_replica_node_triggers.sql @@ -0,0 +1,51 @@ +CREATE OR REPLACE FUNCTION _sysinternal.on_create_chunk_replica_node() + RETURNS TRIGGER LANGUAGE PLPGSQL AS +$BODY$ +DECLARE + partition_replica_row partition_replica; + chunk_row chunk; + field_row field; +BEGIN + IF TG_OP <> 'INSERT' THEN + RAISE EXCEPTION 'Only inserts supported on % table', TG_TABLE_NAME + USING ERRCODE = 'IO101'; + END IF; + + SELECT * + INTO STRICT partition_replica_row + FROM partition_replica AS p + WHERE p.id = NEW.partition_replica_id; + + SELECT * + INTO STRICT chunk_row + FROM chunk AS c + WHERE c.id = NEW.chunk_id; + + IF NEW.database_name = current_database() THEN + PERFORM _sysinternal.create_local_data_table(NEW.schema_name, NEW.table_name, + partition_replica_row.schema_name, partition_replica_row.table_name); + + FOR field_row IN SELECT f.* + FROM field AS f + WHERE f.hypertable_name = partition_replica_row.hypertable_name LOOP + PERFORM _sysinternal.create_chunk_replica_node_index(NEW.schema_name, NEW.table_name, field_row.name, index_type) + FROM unnest(field_row.index_types) AS index_type; + END LOOP; + ELSE + PERFORM _sysinternal.create_remote_table(NEW.schema_name, NEW.table_name, + partition_replica_row.schema_name, partition_replica_row.table_name, NEW.database_name); + END IF; + + PERFORM _sysinternal.set_time_constraint(NEW.schema_name, NEW.table_name, chunk_row.start_time, chunk_row.end_time); + + RETURN NEW; +END +$BODY$ +SET SEARCH_PATH = 'public'; + +BEGIN; +DROP TRIGGER IF EXISTS trigger_on_create_chunk_replica_node +ON chunk_replica_node; +CREATE TRIGGER trigger_on_create_chunk_replica_node AFTER INSERT OR UPDATE OR DELETE ON chunk_replica_node +FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_chunk_replica_node(); +COMMIT; diff --git a/sql/main/chunk_triggers.sql b/sql/main/chunk_triggers.sql new file mode 100644 index 000000000..77b619beb --- /dev/null +++ b/sql/main/chunk_triggers.sql @@ -0,0 +1,27 @@ +CREATE OR REPLACE FUNCTION _sysinternal.on_create_chunk() + RETURNS TRIGGER LANGUAGE PLPGSQL AS +$BODY$ +DECLARE +BEGIN + IF TG_OP <> 'INSERT' AND TG_OP <> 'UPDATE' THEN + RAISE EXCEPTION 'Only inserts and updates supported on % table', TG_TABLE_NAME + USING ERRCODE = 'IO101'; + END IF; + + IF TG_OP = 'UPDATE' THEN + PERFORM _sysinternal.set_time_constraint(crn.schema_name, crn.table_name, NEW.start_time, NEW.end_time) + FROM chunk_replica_node crn + WHERE crn.chunk_id = NEW.id; + END IF; + + RETURN NEW; +END +$BODY$ +SET SEARCH_PATH = 'public'; + +BEGIN; +DROP TRIGGER IF EXISTS trigger_on_create_chunk +ON chunk; +CREATE TRIGGER trigger_on_create_chunk AFTER INSERT OR UPDATE OR DELETE ON chunk +FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_chunk(); +COMMIT; diff --git a/sql/main/data_table_triggers.sql b/sql/main/data_table_triggers.sql deleted file mode 100644 index 0c2bddb26..000000000 --- a/sql/main/data_table_triggers.sql +++ /dev/null @@ -1,119 +0,0 @@ -CREATE OR REPLACE FUNCTION _sysinternal.create_data_table_index( - table_oid REGCLASS, - field_name NAME, - index_type field_index_type -) - RETURNS VOID LANGUAGE PLPGSQL AS -$BODY$ -DECLARE - index_name NAME; - prefix BIGINT; -BEGIN - prefix = nextval('data_table_index_name_prefix'); - IF index_type = 'TIME-VALUE' THEN - index_name := format('%s-time-%s', prefix, field_name); - ELSIF index_type = 'VALUE-TIME' THEN - index_name := format('%s-%s-time', prefix, field_name); - ELSE - RAISE EXCEPTION 'Unknown index type %', index_type - USING ERRCODE = 'IO103'; - END IF; - - INSERT INTO data_table_index (table_oid, field_name, index_name, index_type) VALUES - (table_oid, field_name, index_name, index_type) - ON CONFLICT DO NOTHING; -END -$BODY$; - -CREATE OR REPLACE FUNCTION _sysinternal.on_create_data_table() - RETURNS TRIGGER LANGUAGE PLPGSQL AS -$BODY$ -DECLARE - field_row field; -BEGIN - IF TG_OP = 'UPDATE' THEN - IF ( - (OLD.start_time IS NULL AND new.start_time IS NOT NULL) - OR - (OLD.end_time IS NULL AND new.end_time IS NOT NULL) - ) - AND ( - OLD.table_oid = NEW.table_oid AND - OLD.namespace_name = NEW.namespace_name AND - OLD.partition_number = NEW.partition_number AND - OLD.total_partitions = NEW.total_partitions AND - OLD.partitioning_field = NEW.partitioning_field - ) THEN - RETURN NEW; - ELSE - RAISE EXCEPTION 'This type of update not allowed on data_table' - USING ERRCODE = 'IO101'; - END IF; - END IF; - - IF TG_OP <> 'INSERT' THEN - RAISE EXCEPTION 'Only inserts and updates supported on data_table table' - USING ERRCODE = 'IO101'; - END IF; - - FOR field_row IN SELECT f.* - FROM field AS f - WHERE f.namespace_name = NEW.namespace_name LOOP - PERFORM _sysinternal.create_data_table_index(NEW.table_oid, field_row.name, index_type) - FROM unnest(field_row.index_types) AS index_type; - END LOOP; - - RETURN NEW; -END -$BODY$ -SET SEARCH_PATH = 'public'; - -BEGIN; -DROP TRIGGER IF EXISTS trigger_on_create_data_table -ON data_table; -CREATE TRIGGER trigger_on_create_data_table AFTER INSERT OR UPDATE OR DELETE ON data_table -FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_data_table(); -COMMIT; - - -CREATE OR REPLACE FUNCTION _sysinternal.on_create_data_table_index() - RETURNS TRIGGER LANGUAGE PLPGSQL AS -$BODY$ -DECLARE -BEGIN - IF TG_OP <> 'INSERT' THEN - RAISE EXCEPTION 'Only inserts supported on namespace table' - USING ERRCODE = 'IO101'; - END IF; - - IF NEW.index_type = 'TIME-VALUE' THEN - EXECUTE format( - $$ - CREATE INDEX %1$I ON %2$s - USING BTREE(time DESC NULLS LAST, %3$I) - WHERE %3$I IS NOT NULL - $$, - NEW.index_name, NEW.table_oid, NEW.field_name); - ELSIF NEW.index_type = 'VALUE-TIME' THEN - EXECUTE format( - $$ - CREATE INDEX %1$I ON %2$s - USING BTREE(%3$I, time DESC NULLS LAST) - WHERE %3$I IS NOT NULL - $$, - NEW.index_name, NEW.table_oid, NEW.field_name); - ELSE - RAISE EXCEPTION 'Unknown index type %', index_type - USING ERRCODE = 'IO103'; - END IF; - RETURN NEW; -END -$BODY$ -SET SEARCH_PATH = 'public'; - -BEGIN; -DROP TRIGGER IF EXISTS trigger_on_create_data_table_index -ON data_table_index; -CREATE TRIGGER trigger_on_create_data_table_index AFTER INSERT OR UPDATE OR DELETE ON data_table_index -FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_data_table_index(); -COMMIT; diff --git a/sql/main/distinct_replica_node_triggers.sql b/sql/main/distinct_replica_node_triggers.sql new file mode 100644 index 000000000..7dc9c5e9e --- /dev/null +++ b/sql/main/distinct_replica_node_triggers.sql @@ -0,0 +1,36 @@ +CREATE OR REPLACE FUNCTION _sysinternal.on_create_distinct_replica_node() + RETURNS TRIGGER LANGUAGE PLPGSQL AS +$BODY$ +DECLARE + hypertable_replica_row hypertable_replica; +BEGIN + IF TG_OP <> 'INSERT' THEN + RAISE EXCEPTION 'Only inserts supported on % table', TG_TABLE_NAME + USING ERRCODE = 'IO101'; + END IF; + + SELECT * + INTO STRICT hypertable_replica_row + FROM hypertable_replica AS h + WHERE h.hypertable_name = NEW.hypertable_name AND + h.replica_id = NEW.replica_id; + + IF NEW.database_name = current_database() THEN + PERFORM _sysinternal.create_local_distinct_table(NEW.schema_name, NEW.table_name, + hypertable_replica_row.distinct_schema_name, hypertable_replica_row.distinct_table_name); + ELSE + PERFORM _sysinternal.create_remote_table(NEW.schema_name, NEW.table_name, + hypertable_replica_row.distinct_schema_name, hypertable_replica_row.distinct_table_name, NEW.database_name); + END IF; + + RETURN NEW; +END +$BODY$ +SET SEARCH_PATH = 'public'; + +BEGIN; +DROP TRIGGER IF EXISTS trigger_on_create_distinct_replica_node +ON distinct_replica_node; +CREATE TRIGGER trigger_on_create_distinct_replica_node AFTER INSERT OR UPDATE OR DELETE ON distinct_replica_node +FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_distinct_replica_node(); +COMMIT; diff --git a/sql/main/field_triggers.sql b/sql/main/field_triggers.sql index e8af457b3..8838338ff 100644 --- a/sql/main/field_triggers.sql +++ b/sql/main/field_triggers.sql @@ -1,5 +1,5 @@ -CREATE OR REPLACE FUNCTION create_field_indexes_on_data_tables( - namespace_name NAME, +CREATE OR REPLACE FUNCTION _sysinternal.create_chunk_replica_node_indexes_for_field( + hypertable_name NAME, field_name NAME, index_types field_index_type [] ) @@ -7,18 +7,35 @@ CREATE OR REPLACE FUNCTION create_field_indexes_on_data_tables( $BODY$ DECLARE BEGIN - PERFORM _sysinternal.create_data_table_index(dt.table_oid, field_name, index_type) - FROM data_table dt + PERFORM _sysinternal.create_chunk_replica_node_index(crn.schema_name, crn.table_name, field_name, index_type) + FROM chunk_replica_node crn + INNER JOIN partition_replica pr ON (pr.id = crn.partition_replica_id) CROSS JOIN unnest(index_types) AS index_type - WHERE dt.namespace_name = create_field_indexes_on_data_tables.namespace_name; + WHERE pr.hypertable_name = create_chunk_replica_node_indexes_for_field.hypertable_name; END $BODY$; -CREATE OR REPLACE FUNCTION create_field_on_cluster_table( - schema_name NAME, - cluster_table_name NAME, - field NAME, - data_type_oid REGTYPE +CREATE OR REPLACE FUNCTION _sysinternal.create_partition_constraint_for_field( + hypertable_name NAME, + field_name NAME +) + RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS +$BODY$ +BEGIN + PERFORM _sysinternal.add_partition_constraint(pr.schema_name, pr.table_name, p.keyspace_start, p.keyspace_end, p.epoch_id) + FROM partition_epoch pe + INNER JOIN partition p ON (p.epoch_id = pe.id) + INNER JOIN partition_replica pr ON (pr.partition_id = p.id) + WHERE pe.hypertable_name = create_partition_constraint_for_field.hypertable_name + AND pe.partitioning_field = create_partition_constraint_for_field.field_name; +END +$BODY$; + +CREATE OR REPLACE FUNCTION _sysinternal.create_field_on_root_table( + schema_name NAME, + table_name NAME, + field NAME, + data_type_oid REGTYPE ) RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS $BODY$ @@ -28,7 +45,7 @@ BEGIN $$ ALTER TABLE %1$I.%2$I ADD COLUMN %3$I %4$s DEFAULT NULL $$, - schema_name, cluster_table_name, field, data_type_oid); + schema_name, table_name, field, data_type_oid); END $BODY$; @@ -38,23 +55,23 @@ CREATE OR REPLACE FUNCTION _sysinternal.on_create_field() RETURNS TRIGGER LANGUAGE PLPGSQL AS $BODY$ DECLARE - namespace_row namespace; - node_row node; + hypertable_row hypertable; BEGIN IF TG_OP <> 'INSERT' THEN - RAISE EXCEPTION 'Only inserts supported on namespace table' + RAISE EXCEPTION 'Only inserts supported on % table', TG_TABLE_NAME USING ERRCODE = 'IO101'; END IF; SELECT * - INTO STRICT namespace_row - FROM namespace AS ns - WHERE ns.name = NEW.namespace_name; + INTO STRICT hypertable_row + FROM hypertable AS h + WHERE h.name = NEW.hypertable_name; - PERFORM create_field_on_cluster_table(namespace_row.schema_name, namespace_row.cluster_table_name, NEW.name, + PERFORM _sysinternal.create_field_on_root_table(hypertable_row.root_schema_name, hypertable_row.root_table_name, NEW.name, NEW.data_type); - PERFORM create_field_indexes_on_data_tables(NEW.namespace_name, NEW.name, NEW.index_types); + PERFORM _sysinternal.create_partition_constraint_for_field(NEW.hypertable_name, NEW.name); + PERFORM _sysinternal.create_chunk_replica_node_indexes_for_field(NEW.hypertable_name, NEW.name, NEW.index_types); RETURN NEW; END $BODY$ diff --git a/sql/main/hypertable_replica_triggers.sql b/sql/main/hypertable_replica_triggers.sql new file mode 100644 index 000000000..c906d69f6 --- /dev/null +++ b/sql/main/hypertable_replica_triggers.sql @@ -0,0 +1,33 @@ +CREATE OR REPLACE FUNCTION _sysinternal.on_create_hypertable_replica() + RETURNS TRIGGER LANGUAGE PLPGSQL AS +$BODY$ +DECLARE + hypertable_row hypertable; +BEGIN + IF TG_OP <> 'INSERT' THEN + RAISE EXCEPTION 'Only inserts supported on hypertable_replica table' + USING ERRCODE = 'IO101'; + END IF; + + SELECT * + INTO STRICT hypertable_row + FROM hypertable AS h + WHERE h.name = NEW.hypertable_name; + + + PERFORM _sysinternal.create_replica_table(NEW.schema_name, NEW.table_name, + hypertable_row.root_schema_name, hypertable_row.root_table_name); + PERFORM _sysinternal.create_replica_table(NEW.distinct_schema_name, NEW.distinct_table_name, + hypertable_row.distinct_schema_name, hypertable_row.distinct_table_name); + + RETURN NEW; +END +$BODY$ +SET SEARCH_PATH = 'public'; + +BEGIN; +DROP TRIGGER IF EXISTS trigger_on_create_hypertable_replica +ON hypertable_replica; +CREATE TRIGGER trigger_on_create_hypertable_replica AFTER INSERT OR UPDATE OR DELETE ON hypertable_replica +FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_hypertable_replica(); +COMMIT; diff --git a/sql/main/hypertable_triggers.sql b/sql/main/hypertable_triggers.sql new file mode 100644 index 000000000..07b31289d --- /dev/null +++ b/sql/main/hypertable_triggers.sql @@ -0,0 +1,33 @@ + + +CREATE OR REPLACE FUNCTION _sysinternal.on_create_hypertable() + RETURNS TRIGGER LANGUAGE PLPGSQL AS +$BODY$ +DECLARE + remote_node node; +BEGIN + IF TG_OP <> 'INSERT' THEN + RAISE EXCEPTION 'Only inserts supported on namespace table' + USING ERRCODE = 'IO101'; + END IF; + + PERFORM _sysinternal.create_schema(NEW.main_schema_name); + PERFORM _sysinternal.create_schema(NEW.associated_schema_name); + PERFORM _sysinternal.create_schema(NEW.root_schema_name); + PERFORM _sysinternal.create_schema(NEW.distinct_schema_name); + + PERFORM _sysinternal.create_root_table(NEW.root_schema_name, NEW.root_table_name); + PERFORM _sysinternal.create_root_distinct_table(NEW.distinct_schema_name, NEW.distinct_table_name); + + RETURN NEW; +END +$BODY$ +SET SEARCH_PATH = 'public'; + +BEGIN; +DROP TRIGGER IF EXISTS trigger_on_create_hypertable +ON hypertable; +CREATE TRIGGER trigger_on_create_hypertable AFTER INSERT OR UPDATE OR DELETE ON hypertable +FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_hypertable(); +COMMIT; + diff --git a/sql/main/insert.sql b/sql/main/insert.sql index d2b3d622b..a0c9e8f64 100644 --- a/sql/main/insert.sql +++ b/sql/main/insert.sql @@ -1,5 +1,5 @@ CREATE OR REPLACE FUNCTION get_fields_from_json( - namespace_name NAME + hypertable_name NAME ) RETURNS TEXT [] LANGUAGE SQL STABLE AS $BODY$ @@ -10,27 +10,27 @@ SELECT ARRAY( f.name AS field_name, f.data_type FROM field AS f - WHERE f.namespace_name = get_fields_from_json.namespace_name + WHERE f.hypertable_name = get_fields_from_json.hypertable_name ORDER BY f.name ) AS info ); $BODY$; CREATE OR REPLACE FUNCTION get_field_list( - namespace_name NAME + hypertable_name NAME ) RETURNS TEXT LANGUAGE SQL STABLE AS $BODY$ -SELECT array_to_string(get_quoted_field_names(namespace_name), ', ') +SELECT array_to_string(get_quoted_field_names(hypertable_name), ', ') $BODY$; CREATE OR REPLACE FUNCTION get_field_from_json_list( - namespace_name NAME + hypertable_name NAME ) RETURNS TEXT LANGUAGE SQL STABLE AS $BODY$ -SELECT array_to_string(get_fields_from_json(namespace_name), ', ') +SELECT array_to_string(get_fields_from_json(hypertable_name), ', ') $BODY$; @@ -46,7 +46,7 @@ BEGIN EXECUTE format( $$ CREATE TEMP TABLE "%s" ( - namespace_name name NOT NULL, + hypertable_name name NOT NULL, time BIGINT NOT NULL, value jsonb ) ON COMMIT DROP @@ -65,28 +65,28 @@ CREATE OR REPLACE FUNCTION insert_data_one_partition( RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS $BODY$ DECLARE - data_table_row data_table; - distinct_table_oid REGCLASS; - time_point BIGINT; - namespace_point NAME; + chunk_replica_node_row chunk_replica; + distinct_table_oid REGCLASS; + time_point BIGINT; + hypertable_point NAME; BEGIN time_point := 1; EXECUTE format( $$ - SELECT "time", namespace_name FROM %s ORDER BY namespace_name LIMIT 1 + SELECT "time", hypertable_name FROM %s ORDER BY hypertable_name LIMIT 1 $$, copy_table_oid) - INTO time_point, namespace_point; + INTO time_point, hypertable_point; WHILE time_point IS NOT NULL LOOP SELECT * INTO distinct_table_oid - FROM get_distinct_local_table_oid(namespace_point); + FROM get_distinct_local_table_oid(hypertable_point); SELECT * INTO data_table_row - FROM get_or_create_data_table(time_point, namespace_point, + FROM get_or_create_data_table(time_point, hypertable_point, partition_number, total_partitions); BEGIN @@ -96,13 +96,13 @@ BEGIN ( DELETE FROM %2$s WHERE ("time" >= %3$L OR %3$L IS NULL) and ("time" <= %4$L OR %4$L IS NULL) - AND namespace_name = %5$L + AND hypertable_name = %5$L RETURNING * ), distinct_field AS ( SELECT name FROM field - WHERE namespace_name = %5$L AND is_distinct = TRUE + WHERE hypertable_name = %5$L AND is_distinct = TRUE ), insert_distinct AS ( INSERT INTO %6$s as distinct_table @@ -120,17 +120,17 @@ BEGIN ) INSERT INTO %1$s (time, %7$s) SELECT time, %8$s FROM selected; $$, data_table_row.table_oid, copy_table_oid, data_table_row.start_time, - data_table_row.end_time, namespace_point, distinct_table_oid, - get_field_list(namespace_point), - get_field_from_json_list(namespace_point)) + data_table_row.end_time, hypertable_point, distinct_table_oid, + get_field_list(hypertable_point), + get_field_from_json_list(hypertable_point)) USING data_table_row; EXECUTE format( $$ - SELECT "time", namespace_name FROM %s ORDER BY namespace_name LIMIT 1 + SELECT "time", hypertable_name FROM %s ORDER BY hypertable_name LIMIT 1 $$, copy_table_oid) - INTO time_point, namespace_point; + INTO time_point, hypertable_point; EXCEPTION WHEN deadlock_detected THEN --do nothing, rerun loop (deadlock can be caused by concurrent updates to distinct table) diff --git a/sql/main/namespace_triggers.sql b/sql/main/namespace_triggers.sql deleted file mode 100644 index 33b4303d9..000000000 --- a/sql/main/namespace_triggers.sql +++ /dev/null @@ -1,201 +0,0 @@ -CREATE OR REPLACE FUNCTION create_schema( - schema_name NAME -) - RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS -$BODY$ -BEGIN - EXECUTE format( - $$ - CREATE SCHEMA %I - $$, schema_name); -END -$BODY$; - -CREATE OR REPLACE FUNCTION create_cluster_table( - schema_name NAME, - table_name NAME -) - RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS -$BODY$ -BEGIN - EXECUTE format( - $$ - CREATE TABLE IF NOT EXISTS %I.%I ( - time BIGINT NOT NULL - ) - $$, schema_name, table_name); -END -$BODY$; - -CREATE OR REPLACE FUNCTION create_cluster_distinct_table( - schema_name NAME, - table_name NAME -) - RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS -$BODY$ -BEGIN - EXECUTE format( - $$ - CREATE TABLE IF NOT EXISTS %1$I.%2$I ( - field TEXT, - value TEXT, - last_time_approx BIGINT, - PRIMARY KEY(field, value) - ); - $$, schema_name, table_name); -END -$BODY$; - -CREATE OR REPLACE FUNCTION create_local_distinct_table( - schema_name NAME, - table_name NAME, - cluster_table_name NAME -) - RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS -$BODY$ -BEGIN - EXECUTE format( - $$ - CREATE TABLE IF NOT EXISTS %1$I.%2$I (PRIMARY KEY(field, value)) - INHERITS(%1$I.%3$I); - - $$, schema_name, table_name, cluster_table_name); -END -$BODY$; - -CREATE OR REPLACE FUNCTION create_remote_distinct_table( - schema_name NAME, - local_table_name NAME, - remote_table_name NAME, - cluster_table_name NAME, - server_name NAME -) - RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS -$BODY$ -BEGIN - EXECUTE format( - $$ - CREATE FOREIGN TABLE IF NOT EXISTS %1$I.%2$I () INHERITS(%1$I.%3$I) SERVER %4$I - OPTIONS (schema_name %1$L, table_name %5$L) - $$, - schema_name, remote_table_name, cluster_table_name, server_name, local_table_name); -END -$BODY$; - -CREATE OR REPLACE FUNCTION create_master_table( - schema_name NAME, - table_name NAME, - cluster_table_name NAME -) - RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS -$BODY$ -BEGIN - EXECUTE format( - $$ - CREATE TABLE IF NOT EXISTS %1$I.%2$I () INHERITS(%1$I.%3$I) - $$, - schema_name, table_name, cluster_table_name); -END -$BODY$; - -CREATE OR REPLACE FUNCTION create_remote_table( - schema_name NAME, - remote_table_name NAME, - master_table_name NAME, - cluster_table_name NAME, - server_name NAME -) - RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS -$BODY$ -BEGIN - EXECUTE format( - $$ - CREATE FOREIGN TABLE IF NOT EXISTS %1$I.%2$I () INHERITS(%1$I.%3$I) SERVER %4$I OPTIONS (schema_name %1$L, table_name %5$L) - $$, - schema_name, remote_table_name, cluster_table_name, server_name, master_table_name); -END -$BODY$; - -CREATE OR REPLACE FUNCTION _sysinternal.on_create_namespace() - RETURNS TRIGGER LANGUAGE PLPGSQL AS -$BODY$ -DECLARE - remote_node node; -BEGIN - IF TG_OP <> 'INSERT' THEN - RAISE EXCEPTION 'Only inserts supported on namespace table' - USING ERRCODE = 'IO101'; - END IF; - - IF NEW.schema_name = 'public' THEN - RAISE EXCEPTION 'public is not an allowed namespace name' - USING ERRCODE = 'IO104'; - END IF; - - IF position('_' IN NEW.schema_name) = 1 THEN - RAISE EXCEPTION 'Namespace cannot begin with an underscore (_)' - USING ERRCODE = 'IO104'; - END IF; - - PERFORM create_schema(NEW.schema_name); - PERFORM create_cluster_table(NEW.schema_name, NEW.cluster_table_name); - PERFORM create_cluster_distinct_table(NEW.schema_name, NEW.cluster_distinct_table_name); - RETURN NEW; -END -$BODY$ -SET SEARCH_PATH = 'public'; - -BEGIN; -DROP TRIGGER IF EXISTS trigger_on_create_namespace -ON namespace; -CREATE TRIGGER trigger_on_create_namespace AFTER INSERT OR UPDATE OR DELETE ON namespace -FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_namespace(); -COMMIT; - - -CREATE OR REPLACE FUNCTION _sysinternal.on_create_namespace_node() - RETURNS TRIGGER LANGUAGE PLPGSQL AS -$BODY$ -DECLARE - namespace_row namespace; - node_row node; -BEGIN - IF TG_OP <> 'INSERT' THEN - RAISE EXCEPTION 'Only inserts supported on namespace table' - USING ERRCODE = 'IO101'; - END IF; - - SELECT * - INTO STRICT namespace_row - FROM namespace AS ns - WHERE ns.name = NEW.namespace_name; - - IF NEW.database_name = current_database() THEN - PERFORM create_master_table(namespace_row.schema_name, NEW.master_table_name, namespace_row.cluster_table_name); - PERFORM create_local_distinct_table(namespace_row.schema_name, NEW.distinct_local_table_name, - namespace_row.cluster_distinct_table_name); - ELSE - SELECT * - INTO STRICT node_row - FROM node AS n - WHERE n.database_name = NEW.database_name; - - PERFORM create_remote_table(namespace_row.schema_name, NEW.remote_table_name, NEW.master_table_name, - namespace_row.cluster_table_name, - node_row.server_name); - PERFORM create_remote_distinct_table(namespace_row.schema_name, NEW.distinct_local_table_name, - NEW.distinct_remote_table_name, - namespace_row.cluster_distinct_table_name, - node_row.server_name); - END IF; - RETURN NEW; -END -$BODY$ -SET SEARCH_PATH = 'public'; - -BEGIN; -DROP TRIGGER IF EXISTS trigger_on_create_namespace_node -ON namespace_node; -CREATE TRIGGER trigger_on_create_namespace_node AFTER INSERT OR UPDATE OR DELETE ON namespace_node -FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_namespace_node(); -COMMIT; diff --git a/sql/main/partition_replica_triggers.sql b/sql/main/partition_replica_triggers.sql new file mode 100644 index 000000000..10be0cd5c --- /dev/null +++ b/sql/main/partition_replica_triggers.sql @@ -0,0 +1,31 @@ +CREATE OR REPLACE FUNCTION _sysinternal.on_create_partition_replica_table() + RETURNS TRIGGER LANGUAGE PLPGSQL AS +$BODY$ +BEGIN + IF TG_OP <> 'INSERT' THEN + RAISE EXCEPTION 'Only inserts supported on % table', TG_TABLE_NAME + USING ERRCODE = 'IO101'; + END IF; + + PERFORM _sysinternal.create_data_partition_table( + NEW.schema_name, NEW.table_name, + hr.schema_name, hr.table_name, + p.keyspace_start, p.keyspace_end, + p.epoch_id) + FROM hypertable_replica hr + CROSS JOIN partition p + WHERE hr.hypertable_name = NEW.hypertable_name AND + hr.replica_id = NEW.replica_id AND + p.id = NEW.partition_id; + + RETURN NEW; +END +$BODY$ +SET SEARCH_PATH = 'public'; + +BEGIN; +DROP TRIGGER IF EXISTS trigger_on_create_partition_replica_table +ON partition_replica; +CREATE TRIGGER trigger_on_create_partition_replica_table AFTER INSERT OR UPDATE OR DELETE ON partition_replica +FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_partition_replica_table(); +COMMIT; diff --git a/sql/main/partition_table_triggers.sql b/sql/main/partition_table_triggers.sql deleted file mode 100644 index aaabd2a5a..000000000 --- a/sql/main/partition_table_triggers.sql +++ /dev/null @@ -1,38 +0,0 @@ -CREATE OR REPLACE FUNCTION _sysinternal.on_create_partition_table() - RETURNS TRIGGER LANGUAGE PLPGSQL AS -$BODY$ -DECLARE - namespace_node_row namespace_node; -BEGIN - IF TG_OP <> 'INSERT' THEN - RAISE EXCEPTION 'Only inserts supported on partition_table table' - USING ERRCODE = 'IO101'; - END IF; - - SELECT * - INTO STRICT namespace_node_row - FROM namespace_node n - WHERE n.namespace_name = NEW.namespace_name AND - n.database_name = current_database(); - - EXECUTE format( - $$ - CREATE TABLE IF NOT EXISTS %1$I.%2$I ( - CONSTRAINT partition CHECK(get_partition_for_key(%4$I::text, %5$L) = %6$L) - ) INHERITS(%1$I.%3$I) - $$, - get_schema_name(NEW.namespace_name), NEW.table_name, namespace_node_row.master_table_name, - NEW.partitioning_field, - NEW.total_partitions, NEW.partition_number - ); - RETURN NEW; -END -$BODY$ -SET SEARCH_PATH = 'public'; - -BEGIN; -DROP TRIGGER IF EXISTS trigger_on_create_partition_table -ON partition_table; -CREATE TRIGGER trigger_on_create_partition_table AFTER INSERT OR UPDATE OR DELETE ON partition_table -FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_partition_table(); -COMMIT; diff --git a/sql/main/partitioning.sql b/sql/main/partitioning.sql index efc3b6542..f8bd050ea 100644 --- a/sql/main/partitioning.sql +++ b/sql/main/partitioning.sql @@ -1,6 +1,6 @@ -CREATE OR REPLACE FUNCTION get_partition_for_key(key TEXT, num_nodes SMALLINT) +CREATE OR REPLACE FUNCTION get_partition_for_key(key TEXT, mod_factor INT) RETURNS SMALLINT LANGUAGE SQL IMMUTABLE STRICT AS $$ -SELECT ((public.hash_string(key, 'murmur3' :: TEXT, 1 :: INT4) & x'7fffffff' :: INTEGER) % num_nodes)::SMALLINT; +SELECT ((public.hash_string(key, 'murmur3' :: TEXT, 1 :: INT4) & x'7fffffff' :: INTEGER) % mod_factor)::SMALLINT; $$; diff --git a/sql/main/schema_info.sql b/sql/main/schema_info.sql index da56e1f59..17f7fd8cb 100644 --- a/sql/main/schema_info.sql +++ b/sql/main/schema_info.sql @@ -1,64 +1,44 @@ -CREATE OR REPLACE FUNCTION get_partitioning_field_name( - namespace_name NAME -) - RETURNS NAME LANGUAGE SQL STABLE AS -$BODY$ -SELECT name -FROM field f -WHERE f.namespace_name = get_partitioning_field_name.namespace_name AND f.is_partitioning; -$BODY$; - -CREATE OR REPLACE FUNCTION get_schema_name( - namespace_name NAME -) - RETURNS NAME LANGUAGE SQL STABLE AS -$BODY$ -SELECT schema_name -FROM namespace AS n -WHERE n.name = namespace_name; -$BODY$; - - CREATE OR REPLACE FUNCTION get_distinct_local_table_oid( - namespace_name NAME + hypertable_name NAME, + replica_id SMALLINT ) RETURNS REGCLASS LANGUAGE SQL STABLE AS $BODY$ -SELECT format('%I.%I', n.schema_name, nn.distinct_local_table_name) :: REGCLASS -FROM namespace AS n -INNER JOIN namespace_node AS nn ON (n.name = nn.namespace_name) -WHERE n.name = get_distinct_local_table_oid.namespace_name AND - nn.database_name = current_database(); +SELECT format('%I.%I', drn.schema_name, drn.table_name) :: REGCLASS +FROM distinct_replica_node AS drn +WHERE drn.hypertable_name = get_distinct_local_table_oid.hypertable_name AND + drn.replica_id = get_distinct_local_table_oid.replica_id AND + drn.database_name = current_database(); $BODY$; CREATE OR REPLACE FUNCTION get_field_names( - namespace_name NAME + hypertable_name NAME ) RETURNS NAME [] LANGUAGE SQL STABLE AS $BODY$ SELECT ARRAY( SELECT name FROM field f - WHERE f.namespace_name = get_field_names.namespace_name + WHERE f.hypertable_name = get_field_names.hypertable_name ORDER BY name ); $BODY$; CREATE OR REPLACE FUNCTION get_quoted_field_names( - namespace_name NAME + hypertable_name NAME ) RETURNS TEXT [] LANGUAGE SQL STABLE AS $BODY$ SELECT ARRAY( SELECT format('%I', name) FROM field f - WHERE f.namespace_name = get_quoted_field_names.namespace_name + WHERE f.hypertable_name = get_quoted_field_names.hypertable_name ORDER BY name ); $BODY$; CREATE OR REPLACE FUNCTION get_field_names_and_types( - namespace_name NAME, + hypertable_name NAME, field_names NAME [] ) RETURNS TABLE(field NAME, data_type REGTYPE) LANGUAGE PLPGSQL STABLE AS @@ -72,7 +52,7 @@ BEGIN FROM field f INNER JOIN unnest(field_names) WITH ORDINALITY AS x(field_name, ordering) ON f.name = x.field_name - WHERE f.namespace_name = get_field_names_and_types.namespace_name + WHERE f.hypertable_name = get_field_names_and_types.hypertable_name ORDER BY x.ordering; GET DIAGNOSTICS rows_returned = ROW_COUNT; IF rows_returned != cardinality(field_names) THEN @@ -85,10 +65,10 @@ BEGIN WHERE NOT EXISTS( SELECT 1 FROM field f - WHERE f.namespace_name = get_field_names_and_types.namespace_name AND + WHERE f.hypertable_name = get_field_names_and_types.hypertable_name AND f.name = field_name ); - RAISE 'Missing field "%" in namespace "%"', missing_field, namespace_name + RAISE 'Missing field "%" in namespace "%"', missing_field, hypertable_name USING ERRCODE = 'IO002'; END; END IF; @@ -96,7 +76,7 @@ END $BODY$; CREATE OR REPLACE FUNCTION get_field_type( - namespace_name NAME, + hypertable_name NAME, field_name NAME ) RETURNS REGTYPE LANGUAGE PLPGSQL STABLE AS @@ -107,22 +87,14 @@ BEGIN SELECT f.data_type INTO data_type FROM field f - WHERE f.name = get_field_type.field_name AND f.namespace_name = get_field_type.namespace_name; + WHERE f.name = get_field_type.field_name AND f.hypertable_name = get_field_type.hypertable_name; IF NOT FOUND THEN - RAISE 'Missing field "%" in namespace "%"', field_name, namespace_name + RAISE 'Missing field "%" in namespace "%"', field_name, hypertable_name USING ERRCODE = 'IO002'; END IF; RETURN data_type; END $BODY$; -CREATE OR REPLACE FUNCTION get_cluster_table( - namespace_name NAME -) - RETURNS REGCLASS LANGUAGE SQL STABLE AS -$BODY$ -SELECT format('%I.%I', schema_name, cluster_table_name) :: REGCLASS -FROM namespace AS n -WHERE n.name = namespace_name -$BODY$; \ No newline at end of file + diff --git a/sql/main/table_creation.sql b/sql/main/table_creation.sql new file mode 100644 index 000000000..657a4654d --- /dev/null +++ b/sql/main/table_creation.sql @@ -0,0 +1,239 @@ +CREATE OR REPLACE FUNCTION _sysinternal.create_schema( + schema_name NAME +) + RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS +$BODY$ +BEGIN + EXECUTE format( + $$ + CREATE SCHEMA IF NOT EXISTS %I + $$, schema_name); +END +$BODY$; + +CREATE OR REPLACE FUNCTION _sysinternal.create_root_table( + schema_name NAME, + table_name NAME +) + RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS +$BODY$ +BEGIN + EXECUTE format( + $$ + CREATE TABLE IF NOT EXISTS %I.%I ( + time BIGINT NOT NULL + ) + $$, schema_name, table_name); +END +$BODY$; + +CREATE OR REPLACE FUNCTION _sysinternal.create_root_distinct_table( + schema_name NAME, + table_name NAME +) + RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS +$BODY$ +BEGIN + EXECUTE format( + $$ + CREATE TABLE IF NOT EXISTS %1$I.%2$I ( + field TEXT, + value TEXT, + last_time_approx BIGINT, + PRIMARY KEY(field, value) + ); + $$, schema_name, table_name); +END +$BODY$; + +CREATE OR REPLACE FUNCTION _sysinternal.create_local_distinct_table( + schema_name NAME, + table_name NAME, + replica_schema_name NAME, + replica_table_name NAME +) + RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS +$BODY$ +BEGIN + EXECUTE format( + $$ + CREATE TABLE IF NOT EXISTS %1$I.%2$I (PRIMARY KEY(field, value)) + INHERITS(%3$I.%4$I); + + $$, schema_name, table_name, replica_schema_name, replica_table_name); +END +$BODY$; + +CREATE OR REPLACE FUNCTION _sysinternal.create_remote_table( + schema_name NAME, + table_name NAME, + parent_schema_name NAME, + parent_table_name NAME, + database_name NAME +) + RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS +$BODY$ +DECLARE + node_row node; +BEGIN + SELECT * INTO STRICT node_row + FROM node n + WHERE n.database_name = create_remote_table.database_name; + + EXECUTE format( + $$ + CREATE FOREIGN TABLE IF NOT EXISTS %1$I.%2$I () INHERITS(%3$I.%4$I) SERVER %5$I + OPTIONS (schema_name %1$L, table_name %2$L) + $$, + schema_name, table_name, parent_schema_name, parent_table_name, node_row.server_name); +END +$BODY$; + +CREATE OR REPLACE FUNCTION _sysinternal.create_local_data_table( + schema_name NAME, + table_name NAME, + parent_schema_name NAME, + parent_table_name NAME +) + RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS +$BODY$ +BEGIN + EXECUTE format( + $$ + CREATE TABLE IF NOT EXISTS %1$I.%2$I () INHERITS(%3$I.%4$I); + $$, + schema_name, table_name, parent_schema_name, parent_table_name); +END +$BODY$; + +CREATE OR REPLACE FUNCTION _sysinternal.create_replica_table( + schema_name NAME, + table_name NAME, + parent_schema_name NAME, + parent_table_name NAME +) + RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS +$BODY$ +BEGIN + EXECUTE format( + $$ + CREATE TABLE IF NOT EXISTS %1$I.%2$I () INHERITS(%3$I.%4$I) + $$, + schema_name, table_name, parent_schema_name, parent_table_name); +END +$BODY$; + +CREATE OR REPLACE FUNCTION _sysinternal.create_data_partition_table( + schema_name NAME, + table_name NAME, + parent_schema_name NAME, + parent_table_name NAME, + keyspace_start SMALLINT, + keyspace_end SMALLINT, + epoch_id INT +) + RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS +$BODY$ +DECLARE + epoch_row partition_epoch; + field_exists BOOLEAN; +BEGIN + SELECT * INTO STRICT epoch_row + FROM partition_epoch pe + WHERE pe.id = epoch_id; + + EXECUTE format( + $$ + CREATE TABLE IF NOT EXISTS %1$I.%2$I ( + ) INHERITS(%3$I.%4$I) + $$, + schema_name, table_name, parent_schema_name, parent_table_name); + + SELECT COUNT(*) > 0 + INTO field_exists + FROM field f + WHERE f.hypertable_name = epoch_row.hypertable_name + AND f.name = epoch_row.partitioning_field; + + IF field_exists THEN + PERFORM _sysinternal.add_partition_constraint(schema_name, table_name, keyspace_start, keyspace_end, epoch_id); + END IF; +END +$BODY$; + +CREATE SEQUENCE IF NOT EXISTS pidx_index_name_seq; + +CREATE OR REPLACE FUNCTION _sysinternal.add_partition_constraint( + schema_name NAME, + table_name NAME, + keyspace_start SMALLINT, + keyspace_end SMALLINT, + epoch_id INT +) + RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS +$BODY$ +DECLARE + epoch_row partition_epoch; +BEGIN + SELECT * INTO STRICT epoch_row + FROM partition_epoch pe + WHERE pe.id = epoch_id; + + EXECUTE format( + $$ + ALTER TABLE %1$I.%2$I + ADD CONSTRAINT partition CHECK(%3$s(%4$I::text, %5$L) BETWEEN %6$L AND %7$L) + $$, + schema_name, table_name, + epoch_row.partitioning_func, epoch_row.partitioning_field, + epoch_row.partitioning_mod, keyspace_start, keyspace_end); + + EXECUTE FORMAT( + $$ + CREATE INDEX %3$I ON %1$I.%2$I ("time" DESC NULLS LAST, %4$I) + $$, + schema_name, table_name, + format('%s_pidx', nextval('pidx_index_name_seq')), + epoch_row.partitioning_field); +END +$BODY$; + +CREATE OR REPLACE FUNCTION _sysinternal.set_time_constraint( + schema_name NAME, + table_name NAME, + start_time BIGINT, + end_time BIGINT +) + RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS +$BODY$ +DECLARE +BEGIN + EXECUTE FORMAT( + $$ + ALTER TABLE %I.%I DROP CONSTRAINT IF EXISTS time_range + $$, + schema_name, table_name); + + IF start_time IS NOT NULL AND end_time IS NOT NULL THEN + EXECUTE FORMAT( + $$ + ALTER TABLE %I.%I ADD CONSTRAINT time_range CHECK(time >= %L AND time <= %L) + $$, + schema_name, table_name, start_time, end_time); + ELSIF start_time IS NOT NULL THEN + EXECUTE FORMAT( + $$ + ALTER TABLE %I.%I ADD CONSTRAINT time_range CHECK(time >= %L) + $$, + schema_name, table_name, start_time); + ELSIF end_time IS NOT NULL THEN + EXECUTE FORMAT( + $$ + ALTER TABLE %I.%I ADD CONSTRAINT time_range CHECK(time <= %L) + $$, + schema_name, table_name, end_time); + END IF; +END +$BODY$; + + diff --git a/sql/main/tables.sql b/sql/main/tables.sql index e184d29fd..0c50a36f7 100644 --- a/sql/main/tables.sql +++ b/sql/main/tables.sql @@ -1,39 +1,13 @@ -CREATE TABLE IF NOT EXISTS partition_table ( - namespace_name NAME REFERENCES namespace (name) NOT NULL, - partition_number SMALLINT NOT NULL CHECK (partition_number >= 0), - total_partitions SMALLINT NOT NULL CHECK (total_partitions > 0), - partitioning_field NAME NOT NULL, - table_name NAME NOT NULL, - PRIMARY KEY (namespace_name, partition_number, total_partitions), --do not allow multiple partitioning fields, for now - UNIQUE (namespace_name, partition_number, total_partitions, partitioning_field), - UNIQUE (namespace_name, table_name), - CHECK (partition_number < total_partitions) -); +CREATE SEQUENCE IF NOT EXISTS chunk_replica_node_index_name_prefix; -CREATE TABLE IF NOT EXISTS data_table ( - table_oid REGCLASS PRIMARY KEY NOT NULL, - namespace_name NAME REFERENCES namespace (name) NOT NULL, - partition_number SMALLINT NOT NULL CHECK (partition_number >= 0), - total_partitions SMALLINT NOT NULL CHECK (total_partitions > 0), - partitioning_field NAME NOT NULL, - start_time BIGINT, - end_time BIGINT, - FOREIGN KEY (namespace_name, partition_number, total_partitions, partitioning_field) - REFERENCES partition_table (namespace_name, partition_number, total_partitions, partitioning_field), - UNIQUE (namespace_name, partition_number, total_partitions, start_time, end_time), - CHECK (start_time IS NOT NULL OR end_time IS NOT NULL), - CHECK (partition_number < total_partitions) -); ---TODO: any constrants for when total_partitions change? - -CREATE SEQUENCE IF NOT EXISTS data_table_index_name_prefix; - -CREATE TABLE IF NOT EXISTS data_table_index ( - table_oid REGCLASS NOT NULL REFERENCES data_table (table_oid) ON DELETE CASCADE, - field_name NAME NOT NULL, - index_name NAME NOT NULL, --not regclass since regclass create problems with database backup/restore (indexes created after data load) - index_type field_index_type NOT NULL, - PRIMARY KEY (table_oid, field_name, index_name) +CREATE TABLE IF NOT EXISTS chunk_replica_node_index ( + schema_name NAME NOT NULL, + table_name NAME NOT NULL, + field_name NAME NOT NULL, + index_name NAME NOT NULL, --not regclass since regclass create problems with database backup/restore (indexes created after data load) + index_type field_index_type NOT NULL, + PRIMARY KEY (schema_name, table_name, field_name, index_name), + FOREIGN KEY (schema_name, table_name) REFERENCES chunk_replica_node(schema_name, table_name) ); diff --git a/sql/meta/chunk.sql b/sql/meta/chunk.sql new file mode 100644 index 000000000..99202d3f6 --- /dev/null +++ b/sql/meta/chunk.sql @@ -0,0 +1,199 @@ +--calculate new times for a new chunks for appropriate time values +--tables are created open-ended in one or two direction (either start_time or end_time is NULL) +--that way, tables grow in some time dir. +--Tables are always created adjacent to existing tables. So, tables +--will never be disjoint in terms of time. Therefore you will have: +-- <---current open ended start_time table --| existing closed tables | -- current open ended end_time table ---> +--Should not be called directly. Requires a lock on partition (prevents simultaneous inserts) +CREATE OR REPLACE FUNCTION _sysinternal.calculate_new_chunk_times( + partition_id INT, + "time" BIGINT, + OUT table_start BIGINT, + OUT table_end BIGINT +) +LANGUAGE PLPGSQL VOLATILE AS +$BODY$ +DECLARE + partition_epoch_row partition_epoch; + chunk_row chunk; +BEGIN + SELECT pe.* INTO partition_epoch_row + FROM partition p + INNER JOIN partition_epoch pe ON (p.epoch_id = pe.id) + WHERE p.id = partition_id FOR SHARE; + + table_start := partition_epoch_row.start_time; + table_end := partition_epoch_row.end_time; + + SELECT * + INTO chunk_row + FROM chunk AS c + WHERE c.end_time < calculate_new_chunk_times."time" AND + c.partition_id = calculate_new_chunk_times.partition_id + ORDER BY c.end_time DESC + LIMIT 1 + FOR SHARE; + + IF FOUND THEN + --there is a table that ends before this point; + table_start := chunk_row.end_time + 1; + ELSE + SELECT * + INTO chunk_row + FROM chunk AS c + WHERE c.start_time > calculate_new_chunk_times."time" AND + c.partition_id = calculate_new_chunk_times.partition_id + ORDER BY c.start_time DESC + LIMIT 1 + FOR SHARE; + IF FOUND THEN + --there is a table that ends before this point + table_end = chunk_row.start_time - 1; + END IF; + END IF; +END +$BODY$; + + +CREATE OR REPLACE FUNCTION _sysinternal.get_chunk( + partition_id INT, + time_point BIGINT +) + RETURNS chunk LANGUAGE SQL VOLATILE AS +$BODY$ + SELECT * + FROM chunk c + WHERE c.partition_id = get_chunk.partition_id AND + (c.start_time <= time_point OR c.start_time IS NULL) AND + (c.end_time >= time_point OR c.end_time IS NULL) + FOR SHARE; +$BODY$; + + +--creates chunk. Must be called after aquiring a lock on partition. +CREATE OR REPLACE FUNCTION _sysinternal.create_chunk( + partition_id INT, + time_point BIGINT +) + RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS +$BODY$ +DECLARE + table_start BIGINT; + table_end BIGINT; +BEGIN + SELECT * + INTO table_start, table_end + FROM _sysinternal.calculate_new_chunk_times(partition_id, time_point); + + INSERT INTO chunk(partition_id, start_time, end_time) + VALUES(partition_id, table_start, table_end); +END +$BODY$; + + +--gets or creates chunk. If creating chunk takes a lock on the corresponding partition. +--This prevents concurrently creating chunks on same partitions. +CREATE OR REPLACE FUNCTION get_or_create_chunk( + partition_id INT, + time_point BIGINT +) + RETURNS chunk LANGUAGE PLPGSQL VOLATILE AS +$BODY$ +DECLARE + chunk_row chunk; + partition_row partition; +BEGIN + chunk_row := _sysinternal.get_chunk(partition_id, time_point); + + --uses double-checked locking + IF chunk_row IS NULL THEN + --get lock + SELECT * INTO partition_row + FROM partition + WHERE id = partition_id FOR UPDATE; + --recheck: + chunk_row := _sysinternal.get_chunk(partition_id, time_point); + + IF chunk_row IS NULL THEN + PERFORM _sysinternal.create_chunk(partition_id, time_point); + END IF; + + chunk_row := _sysinternal.get_chunk(partition_id, time_point); + + IF chunk_row IS NULL THEN --recheck + RAISE EXCEPTION 'Should never happen' + USING ERRCODE = 'IO501'; + END IF; + END IF; + + RETURN chunk_row; +END +$BODY$; + +CREATE OR REPLACE FUNCTION close_chunk_end( + chunk_id INT +) + RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS +$BODY$ +DECLARE + crn_node_row RECORD; + node_row node; + max_time_replica BIGINT; + max_time BIGINT = 0; + table_end BIGINT; + chunk_row chunk; +BEGIN + SELECT * + INTO chunk_row + FROM chunk c + WHERE c.id = chunk_id + FOR UPDATE; + + --PHASE 1: lock chunk row on all rows (prevents concurrent chunk insert) + FOR node_row IN + SELECT * + FROM node n + LOOP + PERFORM dblink_connect(node_row.server_name, node_row.server_name); + PERFORM dblink_exec(node_row.server_name, 'BEGIN'); + PERFORM 1 FROM + dblink(node_row.server_name, format('SELECT * FROM lock_for_chunk_close(%L)', chunk_id)) AS t(x text); + END LOOP; + + --PHASE 2: get max time for chunk + FOR crn_node_row IN + SELECT crn.*, n.* + FROM chunk_replica_node crn + INNER JOIN node n ON (n.database_name = crn.database_name) + WHERE crn.chunk_id = close_chunk_end.chunk_id + LOOP + SELECT t.max_time + INTO max_time_replica + FROM dblink(crn_node_row.server_name, format('SELECT * FROM max_time_for_chunk_close(%L, %L)', crn_node_row.schema_name, crn_node_row.table_name)) AS t(max_time BIGINT); + + IF max_time = 0 THEN + max_time := max_time_replica; + ELSIF max_time <> max_time_replica THEN + RAISE EXCEPTION 'Should never happen: max_time % not equal max_time_replica %', max_time, max_time_replica + USING ERRCODE = 'IO501'; + END IF; + END LOOP; + + --TODO: is this right? + table_end := ((coalesce(max_time, chunk_row.start_time, 0) :: BIGINT / (1e9 * 60 * 60 * 24) + 1) :: BIGINT) * (1e9 * 60 * 60 * 24) :: BIGINT - 1; + + --set time locally + UPDATE chunk SET end_time = table_end WHERE id = chunk_id; + + --PHASE 3: set max_time remotely + FOR node_row IN + SELECT * + FROM node n + LOOP + PERFORM 1 + FROM dblink(node_row.server_name, format('SELECT * FROM set_end_time_for_chunk_close(%L, %L)', chunk_id, table_end)) AS t(x text); + PERFORM dblink_exec(node_row.server_name, 'COMMIT'); + PERFORM dblink_disconnect(node_row.server_name); + END LOOP; +END +$BODY$; diff --git a/sql/meta/chunk_replica_node_triggers.sql b/sql/meta/chunk_replica_node_triggers.sql new file mode 100644 index 000000000..92f585c45 --- /dev/null +++ b/sql/meta/chunk_replica_node_triggers.sql @@ -0,0 +1,31 @@ +CREATE OR REPLACE FUNCTION _sysinternal.on_create_chunk_replica_node_meta() + RETURNS TRIGGER LANGUAGE PLPGSQL AS +$BODY$ +DECLARE +BEGIN + IF TG_OP <> 'INSERT' THEN + RAISE EXCEPTION 'Only inserts supported on % table', TG_TABLE_NAME + USING ERRCODE = 'IO101'; + END IF; + + INSERT INTO distinct_replica_node(hypertable_name, replica_id, database_name, schema_name, table_name) + SELECT pr.hypertable_name, pr.replica_id, NEW.database_name, + h.associated_schema_name, + format('%s_%s_%s_distinct_data', h.associated_table_prefix, pr.replica_id, n.id) + FROM partition_replica pr + INNER JOIN hypertable h ON (h.name = pr.hypertable_name) + INNER JOIN node n ON (n.database_name = NEW.database_name) + WHERE pr.id = NEW.partition_replica_id + ON CONFLICT DO NOTHING; + + RETURN NEW; +END +$BODY$ +SET SEARCH_PATH = 'public'; + +BEGIN; +DROP TRIGGER IF EXISTS trigger_on_create_chunk_replica_node +ON chunk_replica_node; +CREATE TRIGGER trigger_on_create_chunk_replica_node AFTER INSERT OR UPDATE OR DELETE ON chunk_replica_node +FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_chunk_replica_node_meta(); +COMMIT; diff --git a/sql/meta/chunk_triggers.sql b/sql/meta/chunk_triggers.sql new file mode 100644 index 000000000..776878013 --- /dev/null +++ b/sql/meta/chunk_triggers.sql @@ -0,0 +1,68 @@ +CREATE OR REPLACE FUNCTION _sysinternal.on_create_chunk() + RETURNS TRIGGER LANGUAGE PLPGSQL AS +$BODY$ +DECLARE + field_row field; + schema_name NAME; +BEGIN + IF TG_OP = 'UPDATE' THEN + IF ( + (OLD.start_time IS NULL AND new.start_time IS NOT NULL) + OR + (OLD.end_time IS NULL AND new.end_time IS NOT NULL) + ) + AND ( + OLD.id = NEW.id AND + OLD.partition_id = NEW.partition_id + ) THEN + NULL; + ELSE + RAISE EXCEPTION 'This type of update not allowed on % table', TG_TABLE_NAME + USING ERRCODE = 'IO101'; + END IF; + ELSIF TG_OP <> 'INSERT' THEN + RAISE EXCEPTION 'Only inserts and updates supported on % table', TG_TABLE_NAME + USING ERRCODE = 'IO101'; + END IF; + + --sync data on insert + IF TG_OP = 'INSERT' THEN + FOR schema_name IN + SELECT n.schema_name + FROM node AS n + LOOP + EXECUTE format( + $$ + INSERT INTO %I.%I SELECT $1.* + $$, + schema_name, + TG_TABLE_NAME + ) + USING NEW; + END LOOP; + + --do not sync data on update. synced by close_chunk logic. + + --TODO: random node picking broken (should make sure replicas are on different nodes). also stickiness. + INSERT INTO chunk_replica_node(chunk_id,partition_replica_id, database_name, schema_name, table_name) + SELECT NEW.id, + pr.id, + (SELECT database_name FROM node ORDER BY random() LIMIT 1), + pr.schema_name, + format('%s_%s_%s_%s_data', h.associated_table_prefix, pr.id, pr.replica_id, NEW.id) + FROM partition_replica pr + INNER JOIN hypertable h ON (h.name = pr.hypertable_name) + WHERE pr.partition_id = NEW.partition_id; + END IF; + + RETURN NEW; +END +$BODY$; + +BEGIN; +DROP TRIGGER IF EXISTS trigger_on_create_chunk +ON chunk; +CREATE TRIGGER trigger_on_create_chunk AFTER INSERT OR UPDATE OR DELETE ON chunk +FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_chunk(); +COMMIT; + diff --git a/sql/meta/cluster.sql b/sql/meta/cluster.sql index 4e0235295..a0184a120 100644 --- a/sql/meta/cluster.sql +++ b/sql/meta/cluster.sql @@ -29,32 +29,92 @@ END $BODY$; -CREATE OR REPLACE FUNCTION add_namespace( - namespace_name NAME +CREATE SEQUENCE IF NOT EXISTS default_hypertable_seq; + +CREATE OR REPLACE FUNCTION add_hypertable( + hypertable_name NAME, + partitioning_field NAME, + main_schema_name NAME = 'public', + associated_schema_name NAME = NULL, + associated_table_prefix NAME = NULL, + number_partitions SMALLINT = NULL, + replication_factor SMALLINT = 1 +) + RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS +$BODY$ +DECLARE + id INTEGER; +BEGIN + + id := nextval('default_hypertable_seq'); + + IF associated_schema_name IS NULL THEN + associated_schema_name = format('_sys_%s_%s', id, hypertable_name); + END IF; + + IF associated_table_prefix IS NULL THEN + associated_table_prefix = format('_hyper_%s', id); + END IF; + + IF number_partitions IS NULL THEN + SELECT COUNT(*) INTO number_partitions + FROM node; + END IF; + + INSERT INTO hypertable ( + name, + main_schema_name, main_table_name, + associated_schema_name, associated_table_prefix, + root_schema_name, root_table_name, + distinct_schema_name, distinct_table_name, + replication_factor) + VALUES ( + hypertable_name, + main_schema_name, hypertable_name, + associated_schema_name, associated_table_prefix, + associated_schema_name, format('%s_root',associated_table_prefix), + associated_schema_name, format('%s_distinct',associated_table_prefix), + replication_factor) + ON CONFLICT DO NOTHING; + + IF number_partitions != 0 THEN + PERFORM add_equi_partition_epoch(hypertable_name, number_partitions, partitioning_field); + END IF; +END +$BODY$; + +CREATE OR REPLACE FUNCTION add_partition_epoch( + hypertable_name NAME, + keyspace_start SMALLINT[], + partitioning_field NAME ) RETURNS VOID LANGUAGE SQL VOLATILE AS $BODY$ -INSERT INTO namespace (name, schema_name, cluster_table_name, cluster_distinct_table_name) -VALUES (namespace_name, get_schema_name(namespace_name), get_cluster_table_name(namespace_name), - get_cluster_distinct_table_name(namespace_name)) -ON CONFLICT DO NOTHING; - -INSERT INTO namespace_node (namespace_name, database_name, master_table_name, remote_table_name, - distinct_local_table_name, distinct_remote_table_name) - SELECT - namespace_name, - n.database_name, - get_master_table_name(namespace_name), - get_remote_table_name(namespace_name, n), - get_local_distinct_table_name(namespace_name), - get_remote_distinct_table_name(namespace_name, n) - FROM node AS n -ON CONFLICT DO NOTHING; +WITH epoch AS ( + INSERT INTO partition_epoch (hypertable_name, start_time, end_time, partitioning_func, partitioning_mod, partitioning_field) + VALUES (hypertable_name, NULL, NULL, 'get_partition_for_key', 32768, partitioning_field) RETURNING id +) +INSERT INTO partition(epoch_id, keyspace_start, keyspace_end) + SELECT epoch.id, lag(start,1,0) OVER (), start-1 + FROM unnest(keyspace_start::int[] || (32768)::INT) start, epoch $BODY$; +CREATE OR REPLACE FUNCTION add_equi_partition_epoch( +hypertable_name NAME, +number_partitions SMALLINT, +partitioning_field NAME +) +RETURNS VOID LANGUAGE SQL VOLATILE AS +$BODY$ +SELECT add_partition_epoch( + hypertable_name, + (SELECT ARRAY(SELECT start * 32768/(number_partitions) from generate_series(1, number_partitions-1) as start)::SMALLINT[]), + partitioning_field +) +$BODY$; CREATE OR REPLACE FUNCTION add_field( - namespace_name NAME, + hypertable_name NAME, field_name NAME, data_type REGTYPE, is_partitioning BOOLEAN, @@ -63,8 +123,8 @@ CREATE OR REPLACE FUNCTION add_field( ) RETURNS VOID LANGUAGE SQL VOLATILE AS $BODY$ -INSERT INTO field (namespace_name, name, data_type, is_partitioning, is_distinct, index_types) -VALUES (namespace_name, field_name, data_type, is_partitioning, is_distinct, idx_types) +INSERT INTO field (hypertable_name, name, data_type, is_partitioning, is_distinct, index_types) +VALUES (hypertable_name, field_name, data_type, is_partitioning, is_distinct, idx_types) ON CONFLICT DO NOTHING; $BODY$; diff --git a/sql/meta/field_triggers.sql b/sql/meta/field_triggers.sql deleted file mode 100644 index 527885add..000000000 --- a/sql/meta/field_triggers.sql +++ /dev/null @@ -1,33 +0,0 @@ -CREATE OR REPLACE FUNCTION _sysinternal.sync_field() - RETURNS TRIGGER LANGUAGE PLPGSQL AS -$BODY$ -DECLARE - schema_name NAME; -BEGIN - IF TG_OP <> 'INSERT' THEN - RAISE EXCEPTION 'Only inserts supported on field table' - USING ERRCODE = 'IO101'; - END IF; - - FOR schema_name IN - SELECT n.schema_name - FROM node AS n - LOOP - EXECUTE format( - $$ - INSERT INTO %I.field SELECT $1.* - $$, - schema_name - ) - USING NEW; - END LOOP; - RETURN NEW; -END -$BODY$; - -BEGIN; -DROP TRIGGER IF EXISTS trigger_sync_field -ON field; -CREATE TRIGGER trigger_sync_field AFTER INSERT OR UPDATE OR DELETE ON field -FOR EACH ROW EXECUTE PROCEDURE _sysinternal.sync_field(); -COMMIT; diff --git a/sql/meta/hypertable_triggers.sql b/sql/meta/hypertable_triggers.sql new file mode 100644 index 000000000..0ae1e1068 --- /dev/null +++ b/sql/meta/hypertable_triggers.sql @@ -0,0 +1,27 @@ +CREATE OR REPLACE FUNCTION _sysinternal.on_create_hypertable() + RETURNS TRIGGER LANGUAGE PLPGSQL AS +$BODY$ +BEGIN + IF TG_OP <> 'INSERT' THEN + RAISE EXCEPTION 'Only inserts supported on hypertable name ' + USING ERRCODE = 'IO101'; + END IF; + + INSERT INTO hypertable_replica + SELECT NEW.name, replica_id, + NEW.associated_schema_name, format('%s_%s', NEW.associated_table_prefix, replica_id), + NEW.associated_schema_name, format('%s_%s_distinct', NEW.associated_table_prefix, replica_id) + FROM generate_series(0, NEW.replication_factor-1) AS replica_id + ON CONFLICT DO NOTHING; + + RETURN NEW; +END +$BODY$; + +BEGIN; +DROP TRIGGER IF EXISTS trigger_create_hypertable ON hypertable; +CREATE TRIGGER trigger_create_hypertable AFTER INSERT OR UPDATE OR DELETE ON hypertable +FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_hypertable(); +COMMIT; + + diff --git a/sql/meta/namespace_triggers.sql b/sql/meta/namespace_triggers.sql deleted file mode 100644 index 33f4d5cfe..000000000 --- a/sql/meta/namespace_triggers.sql +++ /dev/null @@ -1,67 +0,0 @@ -CREATE OR REPLACE FUNCTION _sysinternal.sync_namespace() - RETURNS TRIGGER LANGUAGE PLPGSQL AS -$BODY$ -DECLARE - schema_name NAME; -BEGIN - IF TG_OP <> 'INSERT' THEN - RAISE EXCEPTION 'Only inserts supported on namespace table' - USING ERRCODE = 'IO101'; - END IF; - - FOR schema_name IN - SELECT n.schema_name - FROM node AS n - LOOP - EXECUTE format( - $$ - INSERT INTO %I.namespace SELECT $1.* - $$, - schema_name - ) - USING NEW; - END LOOP; - RETURN NEW; -END -$BODY$; - -BEGIN; -DROP TRIGGER IF EXISTS trigger_sync_namespace -ON namespace; -CREATE TRIGGER trigger_sync_namespace AFTER INSERT OR UPDATE OR DELETE ON namespace -FOR EACH ROW EXECUTE PROCEDURE _sysinternal.sync_namespace(); -COMMIT; - -CREATE OR REPLACE FUNCTION _sysinternal.sync_namespace_node() - RETURNS TRIGGER LANGUAGE PLPGSQL AS -$BODY$ -DECLARE - schema_name NAME; -BEGIN - IF TG_OP <> 'INSERT' THEN - RAISE EXCEPTION 'Only inserts supported on namespace_node table' - USING ERRCODE = 'IO101'; - END IF; - - FOR schema_name IN - SELECT n.schema_name - FROM node AS n - LOOP - EXECUTE format( - $$ - INSERT INTO %I.namespace_node SELECT $1.* - $$, - schema_name - ) - USING NEW; - END LOOP; - RETURN NEW; -END -$BODY$; - -BEGIN; -DROP TRIGGER IF EXISTS trigger_sync_namespace_node -ON namespace_node; -CREATE TRIGGER trigger_sync_namespace_node AFTER INSERT OR UPDATE OR DELETE ON namespace_node -FOR EACH ROW EXECUTE PROCEDURE _sysinternal.sync_namespace_node(); -COMMIT; diff --git a/sql/meta/partition_triggers.sql b/sql/meta/partition_triggers.sql new file mode 100644 index 000000000..afe613b00 --- /dev/null +++ b/sql/meta/partition_triggers.sql @@ -0,0 +1,35 @@ +CREATE OR REPLACE FUNCTION _sysinternal.on_create_partition() + RETURNS TRIGGER LANGUAGE PLPGSQL AS +$BODY$ +BEGIN + IF TG_OP <> 'INSERT' THEN + RAISE EXCEPTION 'Only inserts supported on % name ', TG_TABLE_NAME + USING ERRCODE = 'IO101'; + END IF; + + INSERT INTO partition_replica(partition_id, hypertable_name, replica_id, schema_name, table_name) + SELECT + NEW.id, + hypertable_name, + replica_id, + h.associated_schema_name, + format('%s_%s_%s_partition', h.associated_table_prefix, NEW.id, replica_id) + FROM hypertable_replica hr + INNER JOIN hypertable h ON (h.name = hr.hypertable_name) + WHERE hypertable_name = ( + SELECT hypertable_name + FROM partition_epoch + WHERE id = NEW.epoch_id + ); + + RETURN NEW; +END +$BODY$; + +BEGIN; +DROP TRIGGER IF EXISTS trigger_create_partition ON partition; +CREATE TRIGGER trigger_create_partition AFTER INSERT OR UPDATE OR DELETE ON partition +FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_partition(); +COMMIT; + + diff --git a/sql/meta/sync_triggers.sql b/sql/meta/sync_triggers.sql new file mode 100644 index 000000000..98b6669e4 --- /dev/null +++ b/sql/meta/sync_triggers.sql @@ -0,0 +1,51 @@ +CREATE OR REPLACE FUNCTION _sysinternal.sync_only_insert() + RETURNS TRIGGER LANGUAGE PLPGSQL AS +$BODY$ +DECLARE + schema_name NAME; +BEGIN + IF TG_OP <> 'INSERT' THEN + RAISE EXCEPTION 'Only inserts supported on % table', TG_TABLE_NAME + USING ERRCODE = 'IO101'; + END IF; + + FOR schema_name IN + SELECT n.schema_name + FROM node AS n + LOOP + EXECUTE format( + $$ + INSERT INTO %I.%I SELECT $1.* + $$, + schema_name, + TG_TABLE_NAME + ) + USING NEW; + END LOOP; + RETURN NEW; +END +$BODY$; + +DO +$BODY$ +DECLARE + table_name NAME; +BEGIN + FOREACH table_name IN ARRAY ARRAY['cluster_user', 'hypertable', 'hypertable_replica', + 'distinct_replica_node', 'partition_epoch', 'partition','partition_replica', + 'chunk_replica_node', 'field']::NAME[] LOOP + EXECUTE format( + $$ + DROP TRIGGER IF EXISTS trigger_0_sync_%1$s ON %1$s + $$, + table_name); + EXECUTE format( + $$ + CREATE TRIGGER trigger_0_sync_%1$s AFTER INSERT OR UPDATE OR DELETE ON %1$s + FOR EACH ROW EXECUTE PROCEDURE _sysinternal.sync_only_insert(); + $$, + table_name); + END LOOP; +END +$BODY$; + diff --git a/sql/setup/sql/load_main.sql b/sql/setup/sql/load_main.sql index da8b39db6..7011186df 100644 --- a/sql/setup/sql/load_main.sql +++ b/sql/setup/sql/load_main.sql @@ -1,23 +1,30 @@ +\ir ../../main/table_creation.sql \ir ../../main/cluster_user_triggers.sql \ir ../../main/node_triggers.sql -\ir ../../main/namespace_triggers.sql +\ir ../../main/hypertable_triggers.sql +\ir ../../main/hypertable_replica_triggers.sql +\ir ../../main/distinct_replica_node_triggers.sql \ir ../../main/field_triggers.sql \ir ../../main/partitioning.sql \ir ../../main/schema_info.sql \ir ../../main/names.sql \ir ../../main/tables.sql -\ir ../../main/data_table_triggers.sql -\ir ../../main/partition_table_triggers.sql -\ir ../../main/data_table_constructors.sql -\ir ../../main/insert.sql +\ir ../../main/partition_replica_triggers.sql +\ir ../../main/chunk_replica_node_index.sql +\ir ../../main/chunk_replica_node_index_triggers.sql +\ir ../../main/chunk_replica_node_triggers.sql +\ir ../../main/chunk_triggers.sql +\ir ../../main/chunk.sql +--\ir ../../main/data_table_constructors.sql +--\ir ../../main/insert.sql -\ir ../../main/ioql.sql -\ir ../../main/ioql_result_schema_nonagg.sql -\ir ../../main/ioql_result_schema_agg.sql -\ir ../../main/ioql_schema_info.sql -\ir ../../main/ioql_sql_gen.sql -\ir ../../main/ioql_remote_calls.sql -\ir ../../main/ioql_optimized_agg.sql -\ir ../../main/ioql_distinct_queries.sql -\ir ../../main/ioql_optimized_nonagg.sql -\ir ../../main/ioql_optimized.sql +--\ir ../../main/ioql.sql +--\ir ../../main/ioql_result_schema_nonagg.sql +--\ir ../../main/ioql_result_schema_agg.sql +--\ir ../../main/ioql_schema_info.sql +--\ir ../../main/ioql_sql_gen.sql +--\ir ../../main/ioql_remote_calls.sql +--\ir ../../main/ioql_optimized_agg.sql +--\ir ../../main/ioql_distinct_queries.sql +--\ir ../../main/ioql_optimized_nonagg.sql +--\ir ../../main/ioql_optimized.sql diff --git a/sql/setup/sql/load_meta.sql b/sql/setup/sql/load_meta.sql index 7fc17a6aa..f614d689d 100644 --- a/sql/setup/sql/load_meta.sql +++ b/sql/setup/sql/load_meta.sql @@ -2,5 +2,9 @@ \ir ../../meta/cluster.sql \ir ../../meta/node_triggers.sql \ir ../../meta/cluster_user_triggers.sql -\ir ../../meta/namespace_triggers.sql -\ir ../../meta/field_triggers.sql \ No newline at end of file +\ir ../../meta/chunk_triggers.sql +\ir ../../meta/chunk_replica_node_triggers.sql +\ir ../../meta/chunk.sql +\ir ../../meta/hypertable_triggers.sql +\ir ../../meta/partition_triggers.sql +\ir ../../meta/sync_triggers.sql diff --git a/sql/tests/regression/cluster.sql b/sql/tests/regression/cluster.sql index f9ef80c6d..d4ade300e 100644 --- a/sql/tests/regression/cluster.sql +++ b/sql/tests/regression/cluster.sql @@ -9,58 +9,94 @@ SELECT add_cluster_user('postgres', NULL); SELECT add_node('Test1' :: NAME, 'localhost'); SELECT add_node('test2' :: NAME, 'localhost'); -SELECT add_namespace('testNs' :: NAME); +SELECT add_hypertable('testNs' :: NAME, 'Device_id'); +SELECT * +FROM partition_replica; + + SELECT add_field('testNs' :: NAME, 'Device_id', 'text', TRUE, TRUE, ARRAY['TIME-VALUE'] :: field_index_type []); SELECT add_field('testNs' :: NAME, 'temp', 'double precision', FALSE, FALSE, ARRAY['VALUE-TIME'] :: field_index_type []); SELECT add_field('testNs' :: NAME, 'occupied', 'boolean', FALSE, FALSE, ARRAY[] :: field_index_type []); SELECT add_field('testNs' :: NAME, 'latitude', 'bigint', FALSE, FALSE, ARRAY[] :: field_index_type []); SELECT add_field('testNs' :: NAME, 'really_long_field_goes_on_and_on_and_on_and_on_and_on_and_on_and_on_and_on', 'bigint', FALSE, FALSE, ARRAY['TIME-VALUE','VALUE-TIME'] :: field_index_type []); +SELECT * FROM get_or_create_chunk(1,1257894000000000000::bigint); + SELECT * FROM node; SELECT * -FROM namespace; +FROM hypertable; SELECT * -FROM namespace_node; +FROM hypertable_replica; +SELECT * +FROM distinct_replica_node; +SELECT * +FROM partition_epoch; +SELECT * +FROM partition; +SELECT * +FROM partition_replica; +SELECT * +FROM chunk; +SELECT * +FROM chunk_replica_node; SELECT * FROM field; +\echo *********************************************************************************************************ß \c Test1 SELECT * FROM node; SELECT * -FROM namespace; +FROM hypertable; SELECT * -FROM namespace_node; +FROM hypertable_replica; +SELECT * +FROM distinct_replica_node; +SELECT * +FROM partition_epoch; +SELECT * +FROM partition; +SELECT * +FROM partition_replica; +SELECT * +FROM chunk; +SELECT * +FROM chunk_replica_node; SELECT * FROM field; -\dt "testNs".* -\det "testNs".* -\d+ "testNs".distinct -\d+ "testNs".cluster - +\d+ "_sys_1_testNs".* +--\d+ "_sys_1_testNs"."_sys_1_testNs_1_0_partition" +--\d+ "_sys_1_testNs"."_sys_1_testNs_2_0_partition" +--\det "_sys_1_testNs".* +--\d+ "testNs".distinct --test idempotence \c meta -SELECT add_namespace('testNs' :: NAME); + +SELECT add_hypertable('testNs' :: NAME, 'Device_id'); + SELECT add_field('testNs' :: NAME, 'Device_id', 'text', TRUE, TRUE, ARRAY['TIME-VALUE'] :: field_index_type []); SELECT add_field('testNs' :: NAME, 'temp', 'double precision', FALSE, FALSE, ARRAY['VALUE-TIME'] :: field_index_type []); SELECT add_field('testNs' :: NAME, 'occupied', 'boolean', FALSE, FALSE, ARRAY[] :: field_index_type []); SELECT add_field('testNs' :: NAME, 'latitude', 'bigint', FALSE, FALSE, ARRAY[] :: field_index_type []); SELECT add_field('testNs' :: NAME, 'really_long_field_goes_on_and_on_and_on_and_on_and_on_and_on_and_on_and_on', 'bigint', FALSE, FALSE, ARRAY['TIME-VALUE','VALUE-TIME'] :: field_index_type []); + +SELECT * FROM get_or_create_chunk(1,1257894000000000000::bigint); \c Test1 -\d+ "testNs".cluster +\d+ "_sys_1_testNs".* -SELECT get_or_create_data_table((1477075243*1e9)::bigint, 'testNs'::NAME, 0::SMALLINT, 10::SMALLINT); -\dt "testNs".* -\det "testNs".* -\d+ "testNs".data_0_10_1477008000 -\d+ "testNs".partition_0_10 +\c meta +SELECT close_chunk_end(1); +SELECT * +FROM chunk; + +SELECT * FROM get_or_create_chunk(1,10::bigint); +SELECT * FROM get_or_create_chunk(1,1257894000000000000::bigint); +SELECT * +FROM chunk; + +\c Test1 +\d+ "_sys_1_testNs".* -SELECT close_data_table_end('"testNs".data_0_10_1477008000'); -\d+ "testNs".data_0_10_1477008000 -SELECT get_or_create_data_table((1477075243*1e9)::bigint, 'testNs'::NAME, 0::SMALLINT, 10::SMALLINT); -\dt "testNs".* -SELECT get_or_create_data_table(((1477075243+(60*60*25))*1e9)::bigint, 'testNs'::NAME, 0::SMALLINT, 10::SMALLINT); -\dt "testNs".*