starting refactor of clustering and naming logic

This fix allows more flexible placement of tables on a node; better
and more flexible logic for remote placement of chunks on nodes.
This commit is contained in:
Matvey Arye 2016-11-15 15:11:31 -05:00
parent e0c6ee8943
commit 42ee7c8586
31 changed files with 1347 additions and 671 deletions

View File

@ -2,7 +2,9 @@ CREATE TABLE IF NOT EXISTS node (
database_name NAME PRIMARY KEY NOT NULL,
schema_name NAME UNIQUE NOT NULL, --public schema of remote
server_name NAME UNIQUE NOT NULL,
hostname TEXT NOT NULL
hostname TEXT NOT NULL,
active BOOLEAN NOT NULL DEFAULT TRUE,
id SERIAL UNIQUE -- id for node. used in naming
);
CREATE TABLE IF NOT EXISTS cluster_user (
@ -10,39 +12,121 @@ CREATE TABLE IF NOT EXISTS cluster_user (
password TEXT --not any more of a security hole than usual since stored in pg_user_mapping anyway
);
CREATE TABLE IF NOT EXISTS namespace (
name NAME PRIMARY KEY NOT NULL,
schema_name NAME UNIQUE NOT NULL,
cluster_table_name NAME NOT NULL,
cluster_distinct_table_name NAME NOT NULL
--The hypertable is an abstraction that represents a replicated table that is partition on 2 dimensions.
--One of the dimensions is time, the other is arbitrary.
--This abstraction also tracks the distinct value set of any columns marked as `distinct`.
CREATE TABLE IF NOT EXISTS hypertable (
name NAME NOT NULL PRIMARY KEY CHECK(name NOT LIKE '\_%'),
main_schema_name NAME NOT NULL,
main_table_name NAME NOT NULL,
associated_schema_name NAME NOT NULL,
associated_table_prefix NAME NOT NULL,
root_schema_name NAME NOT NULL,
root_table_name NAME NOT NULL,
distinct_schema_name NAME NOT NULL,
distinct_table_name NAME NOT NULL,
replication_factor SMALLINT NOT NULL CHECK(replication_factor > 0),
UNIQUE (main_schema_name, main_table_name),
UNIQUE (associated_schema_name, associated_table_prefix),
UNIQUE (root_schema_name, root_table_name)
);
CREATE TABLE IF NOT EXISTS namespace_node (
namespace_name NAME REFERENCES namespace (name) NOT NULL,
database_name NAME REFERENCES node (database_name) NOT NULL,
master_table_name NAME NOT NULL,
remote_table_name NAME NOT NULL,
distinct_local_table_name NAME NOT NULL,
distinct_remote_table_name NAME NOT NULL,
PRIMARY KEY (namespace_name, database_name)
CREATE TABLE IF NOT EXISTS hypertable_replica (
hypertable_name NAME NOT NULL REFERENCES hypertable (name),
replica_id SMALLINT NOT NULL CHECK(replica_id >= 0),
schema_name NAME NOT NULL,
table_name NAME NOT NULL,
distinct_schema_name NAME NOT NULL,
distinct_table_name NAME NOT NULL,
PRIMARY KEY (hypertable_name, replica_id),
UNIQUE (schema_name, table_name)
);
CREATE UNIQUE INDEX IF NOT EXISTS unique_remote_table_name_per_namespace
ON namespace_node (namespace_name, remote_table_name);
CREATE UNIQUE INDEX IF NOT EXISTS unique_distinct_remote_table_name_per_namespace
ON namespace_node (namespace_name, distinct_remote_table_name);
--there should be one distinct_replica_node for each node with a chunk from that replica
--so there can be multiple rows for one hypertable-replica on different nodes.
--that way writes are local. Optimized reads are also local for many queries.
--But, some read queries are cross-node.
CREATE TABLE IF NOT EXISTS distinct_replica_node (
hypertable_name NAME NOT NULL,
replica_id SMALLINT NOT NULL,
database_name NAME NOT NULL REFERENCES node(database_name),
schema_name NAME NOT NULL,
table_name NAME NOT NULL,
PRIMARY KEY (hypertable_name, replica_id, database_name),
UNIQUE (schema_name, table_name),
FOREIGN KEY (hypertable_name, replica_id) REFERENCES hypertable_replica(hypertable_name, replica_id)
);
CREATE TABLE IF NOT EXISTS partition_epoch (
id SERIAL PRIMARY KEY,
hypertable_name NAME NOT NULL REFERENCES hypertable (name),
start_time BIGINT CHECK(start_time > 0),
end_time BIGINT CHECK(end_time > 0),
partitioning_func NAME NOT NULL,
partitioning_mod INT NOT NULL CHECK(partitioning_mod < 65536),
partitioning_field NAME NOT NULL,
UNIQUE(hypertable_name, start_time),
UNIQUE(hypertable_name, end_time),
CHECK(start_time < end_time)
);
CREATE TABLE IF NOT EXISTS partition (
id SERIAL PRIMARY KEY,
epoch_id INT NOT NULL REFERENCES partition_epoch(id),
keyspace_start SMALLINT NOT NULL CHECK(keyspace_start >= 0), --start inclusive
keyspace_end SMALLINT NOT NULL CHECK(keyspace_end > 0), --end inclusive; compatible with between operator
UNIQUE(epoch_id, keyspace_start),
CHECK(keyspace_end > keyspace_start)
);
--todo: trigger to verify partition_epoch hypertable name matches this hypertable_name
CREATE TABLE IF NOT EXISTS partition_replica (
id SERIAL PRIMARY KEY,
partition_id INT NOT NULL REFERENCES partition(id),
hypertable_name NAME NOT NULL,
replica_id SMALLINT NOT NULL,
schema_name NAME NOT NULL,
table_name NAME NOT NULL,
UNIQUE(schema_name, table_name),
FOREIGN KEY (hypertable_name, replica_id) REFERENCES hypertable_replica(hypertable_name, replica_id)
);
CREATE TABLE IF NOT EXISTS chunk (
id SERIAL PRIMARY KEY,
partition_id INT NOT NULL REFERENCES partition(id),
start_time BIGINT CHECK(start_time > 0),
end_time BIGINT CHECK(end_time > 0),
UNIQUE(partition_id, start_time),
UNIQUE(partition_id, end_time),
CHECK(start_time < end_time)
);
--mapping between chunks, partition_replica, and node
CREATE TABLE IF NOT EXISTS chunk_replica_node (
chunk_id INT NOT NULL REFERENCES chunk(id),
partition_replica_id INT NOT NULL REFERENCES partition_replica(id),
database_name NAME NOT NULL REFERENCES node(database_name),
schema_name NAME NOT NULL,
table_name NAME NOT NULL,
PRIMARY KEY (chunk_id, partition_replica_id), --a single chunk, replica tuple
UNIQUE (chunk_id, database_name), --no two chunk replicas on same node
UNIQUE (schema_name, table_name)
);
CREATE TABLE IF NOT EXISTS field (
namespace_name NAME NOT NULL REFERENCES namespace (name),
hypertable_name NAME NOT NULL REFERENCES hypertable (name),
name NAME NOT NULL,
data_type REGTYPE NOT NULL CHECK (data_type IN
('double precision' :: REGTYPE, 'text' :: REGTYPE, 'boolean' :: REGTYPE, 'bigint' :: REGTYPE)),
is_partitioning BOOLEAN DEFAULT FALSE,
is_distinct BOOLEAN DEFAULT FALSE,
index_types field_index_type [],
PRIMARY KEY (namespace_name, name)
data_type REGTYPE NOT NULL,
is_partitioning BOOLEAN NOT NULL DEFAULT FALSE,
is_distinct BOOLEAN NOT NULL DEFAULT FALSE,
index_types field_index_type [] NOT NULL,
PRIMARY KEY (hypertable_name, name)
);
CREATE UNIQUE INDEX IF NOT EXISTS one_partition_field
ON field (namespace_name)
ON field (hypertable_name)
WHERE is_partitioning;

51
sql/main/chunk.sql Normal file
View File

@ -0,0 +1,51 @@
CREATE OR REPLACE FUNCTION lock_for_chunk_close(
chunk_id INTEGER
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
BEGIN
--take an update lock on the chunk row
--this conflicts, by design, with the lock taken when inserting on the node getting the insert command (not the node with the chunk table)
PERFORM *
FROM chunk c
WHERE c.id = chunk_id
FOR UPDATE;
END
$BODY$;
CREATE OR REPLACE FUNCTION max_time_for_chunk_close(
schema_name NAME,
table_name NAME
)
RETURNS BIGINT LANGUAGE PLPGSQL STABLE AS
$BODY$
DECLARE
max_time BIGINT;
BEGIN
EXECUTE format(
$$
SELECT max("time")
FROM %I.%I
$$,
schema_name, table_name)
INTO max_time;
RETURN max_time;
END
$BODY$;
CREATE OR REPLACE FUNCTION set_end_time_for_chunk_close(
chunk_id INTEGER,
max_time BIGINT
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
BEGIN
UPDATE chunk SET end_time = max_time WHERE id = chunk_id;
END
$BODY$;

View File

@ -0,0 +1,27 @@
CREATE OR REPLACE FUNCTION _sysinternal.create_chunk_replica_node_index(
schema_name NAME,
table_name NAME,
field_name NAME,
index_type field_index_type
)
RETURNS VOID LANGUAGE PLPGSQL AS
$BODY$
DECLARE
index_name NAME;
prefix BIGINT;
BEGIN
prefix = nextval('chunk_replica_node_index_name_prefix');
IF index_type = 'TIME-VALUE' THEN
index_name := format('%s-time-%s', prefix, field_name);
ELSIF index_type = 'VALUE-TIME' THEN
index_name := format('%s-%s-time', prefix, field_name);
ELSE
RAISE EXCEPTION 'Unknown index type %', index_type
USING ERRCODE = 'IO103';
END IF;
INSERT INTO chunk_replica_node_index (schema_name, table_name, field_name, index_name, index_type) VALUES
(schema_name, table_name, field_name, index_name, index_type)
ON CONFLICT DO NOTHING;
END
$BODY$;

View File

@ -0,0 +1,41 @@
CREATE OR REPLACE FUNCTION _sysinternal.on_create_chunk_replica_node_index()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
BEGIN
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts supported on % table', TG_TABLE_NAME
USING ERRCODE = 'IO101';
END IF;
IF NEW.index_type = 'TIME-VALUE' THEN
EXECUTE format(
$$
CREATE INDEX %1$I ON %2$I.%3$I
USING BTREE(time DESC NULLS LAST, %4$I)
WHERE %4$I IS NOT NULL
$$,
NEW.index_name, NEW.schema_name, NEW.table_name, NEW.field_name);
ELSIF NEW.index_type = 'VALUE-TIME' THEN
EXECUTE format(
$$
CREATE INDEX %1$I ON %2$I.%3$I
USING BTREE(%4$I, time DESC NULLS LAST)
WHERE %4$I IS NOT NULL
$$,
NEW.index_name, NEW.schema_name, NEW.table_name, NEW.field_name);
ELSE
RAISE EXCEPTION 'Unknown index type %', index_type
USING ERRCODE = 'IO103';
END IF;
RETURN NEW;
END
$BODY$
SET SEARCH_PATH = 'public';
BEGIN;
DROP TRIGGER IF EXISTS trigger_on_create_chunk_replica_node_index
ON chunk_replica_node_index;
CREATE TRIGGER trigger_on_create_chunk_replica_node_index AFTER INSERT OR UPDATE OR DELETE ON chunk_replica_node_index
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_chunk_replica_node_index();
COMMIT;

View File

@ -0,0 +1,51 @@
CREATE OR REPLACE FUNCTION _sysinternal.on_create_chunk_replica_node()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
partition_replica_row partition_replica;
chunk_row chunk;
field_row field;
BEGIN
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts supported on % table', TG_TABLE_NAME
USING ERRCODE = 'IO101';
END IF;
SELECT *
INTO STRICT partition_replica_row
FROM partition_replica AS p
WHERE p.id = NEW.partition_replica_id;
SELECT *
INTO STRICT chunk_row
FROM chunk AS c
WHERE c.id = NEW.chunk_id;
IF NEW.database_name = current_database() THEN
PERFORM _sysinternal.create_local_data_table(NEW.schema_name, NEW.table_name,
partition_replica_row.schema_name, partition_replica_row.table_name);
FOR field_row IN SELECT f.*
FROM field AS f
WHERE f.hypertable_name = partition_replica_row.hypertable_name LOOP
PERFORM _sysinternal.create_chunk_replica_node_index(NEW.schema_name, NEW.table_name, field_row.name, index_type)
FROM unnest(field_row.index_types) AS index_type;
END LOOP;
ELSE
PERFORM _sysinternal.create_remote_table(NEW.schema_name, NEW.table_name,
partition_replica_row.schema_name, partition_replica_row.table_name, NEW.database_name);
END IF;
PERFORM _sysinternal.set_time_constraint(NEW.schema_name, NEW.table_name, chunk_row.start_time, chunk_row.end_time);
RETURN NEW;
END
$BODY$
SET SEARCH_PATH = 'public';
BEGIN;
DROP TRIGGER IF EXISTS trigger_on_create_chunk_replica_node
ON chunk_replica_node;
CREATE TRIGGER trigger_on_create_chunk_replica_node AFTER INSERT OR UPDATE OR DELETE ON chunk_replica_node
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_chunk_replica_node();
COMMIT;

View File

@ -0,0 +1,27 @@
CREATE OR REPLACE FUNCTION _sysinternal.on_create_chunk()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
BEGIN
IF TG_OP <> 'INSERT' AND TG_OP <> 'UPDATE' THEN
RAISE EXCEPTION 'Only inserts and updates supported on % table', TG_TABLE_NAME
USING ERRCODE = 'IO101';
END IF;
IF TG_OP = 'UPDATE' THEN
PERFORM _sysinternal.set_time_constraint(crn.schema_name, crn.table_name, NEW.start_time, NEW.end_time)
FROM chunk_replica_node crn
WHERE crn.chunk_id = NEW.id;
END IF;
RETURN NEW;
END
$BODY$
SET SEARCH_PATH = 'public';
BEGIN;
DROP TRIGGER IF EXISTS trigger_on_create_chunk
ON chunk;
CREATE TRIGGER trigger_on_create_chunk AFTER INSERT OR UPDATE OR DELETE ON chunk
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_chunk();
COMMIT;

View File

@ -1,119 +0,0 @@
CREATE OR REPLACE FUNCTION _sysinternal.create_data_table_index(
table_oid REGCLASS,
field_name NAME,
index_type field_index_type
)
RETURNS VOID LANGUAGE PLPGSQL AS
$BODY$
DECLARE
index_name NAME;
prefix BIGINT;
BEGIN
prefix = nextval('data_table_index_name_prefix');
IF index_type = 'TIME-VALUE' THEN
index_name := format('%s-time-%s', prefix, field_name);
ELSIF index_type = 'VALUE-TIME' THEN
index_name := format('%s-%s-time', prefix, field_name);
ELSE
RAISE EXCEPTION 'Unknown index type %', index_type
USING ERRCODE = 'IO103';
END IF;
INSERT INTO data_table_index (table_oid, field_name, index_name, index_type) VALUES
(table_oid, field_name, index_name, index_type)
ON CONFLICT DO NOTHING;
END
$BODY$;
CREATE OR REPLACE FUNCTION _sysinternal.on_create_data_table()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
field_row field;
BEGIN
IF TG_OP = 'UPDATE' THEN
IF (
(OLD.start_time IS NULL AND new.start_time IS NOT NULL)
OR
(OLD.end_time IS NULL AND new.end_time IS NOT NULL)
)
AND (
OLD.table_oid = NEW.table_oid AND
OLD.namespace_name = NEW.namespace_name AND
OLD.partition_number = NEW.partition_number AND
OLD.total_partitions = NEW.total_partitions AND
OLD.partitioning_field = NEW.partitioning_field
) THEN
RETURN NEW;
ELSE
RAISE EXCEPTION 'This type of update not allowed on data_table'
USING ERRCODE = 'IO101';
END IF;
END IF;
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts and updates supported on data_table table'
USING ERRCODE = 'IO101';
END IF;
FOR field_row IN SELECT f.*
FROM field AS f
WHERE f.namespace_name = NEW.namespace_name LOOP
PERFORM _sysinternal.create_data_table_index(NEW.table_oid, field_row.name, index_type)
FROM unnest(field_row.index_types) AS index_type;
END LOOP;
RETURN NEW;
END
$BODY$
SET SEARCH_PATH = 'public';
BEGIN;
DROP TRIGGER IF EXISTS trigger_on_create_data_table
ON data_table;
CREATE TRIGGER trigger_on_create_data_table AFTER INSERT OR UPDATE OR DELETE ON data_table
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_data_table();
COMMIT;
CREATE OR REPLACE FUNCTION _sysinternal.on_create_data_table_index()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
BEGIN
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts supported on namespace table'
USING ERRCODE = 'IO101';
END IF;
IF NEW.index_type = 'TIME-VALUE' THEN
EXECUTE format(
$$
CREATE INDEX %1$I ON %2$s
USING BTREE(time DESC NULLS LAST, %3$I)
WHERE %3$I IS NOT NULL
$$,
NEW.index_name, NEW.table_oid, NEW.field_name);
ELSIF NEW.index_type = 'VALUE-TIME' THEN
EXECUTE format(
$$
CREATE INDEX %1$I ON %2$s
USING BTREE(%3$I, time DESC NULLS LAST)
WHERE %3$I IS NOT NULL
$$,
NEW.index_name, NEW.table_oid, NEW.field_name);
ELSE
RAISE EXCEPTION 'Unknown index type %', index_type
USING ERRCODE = 'IO103';
END IF;
RETURN NEW;
END
$BODY$
SET SEARCH_PATH = 'public';
BEGIN;
DROP TRIGGER IF EXISTS trigger_on_create_data_table_index
ON data_table_index;
CREATE TRIGGER trigger_on_create_data_table_index AFTER INSERT OR UPDATE OR DELETE ON data_table_index
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_data_table_index();
COMMIT;

View File

@ -0,0 +1,36 @@
CREATE OR REPLACE FUNCTION _sysinternal.on_create_distinct_replica_node()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
hypertable_replica_row hypertable_replica;
BEGIN
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts supported on % table', TG_TABLE_NAME
USING ERRCODE = 'IO101';
END IF;
SELECT *
INTO STRICT hypertable_replica_row
FROM hypertable_replica AS h
WHERE h.hypertable_name = NEW.hypertable_name AND
h.replica_id = NEW.replica_id;
IF NEW.database_name = current_database() THEN
PERFORM _sysinternal.create_local_distinct_table(NEW.schema_name, NEW.table_name,
hypertable_replica_row.distinct_schema_name, hypertable_replica_row.distinct_table_name);
ELSE
PERFORM _sysinternal.create_remote_table(NEW.schema_name, NEW.table_name,
hypertable_replica_row.distinct_schema_name, hypertable_replica_row.distinct_table_name, NEW.database_name);
END IF;
RETURN NEW;
END
$BODY$
SET SEARCH_PATH = 'public';
BEGIN;
DROP TRIGGER IF EXISTS trigger_on_create_distinct_replica_node
ON distinct_replica_node;
CREATE TRIGGER trigger_on_create_distinct_replica_node AFTER INSERT OR UPDATE OR DELETE ON distinct_replica_node
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_distinct_replica_node();
COMMIT;

View File

@ -1,5 +1,5 @@
CREATE OR REPLACE FUNCTION create_field_indexes_on_data_tables(
namespace_name NAME,
CREATE OR REPLACE FUNCTION _sysinternal.create_chunk_replica_node_indexes_for_field(
hypertable_name NAME,
field_name NAME,
index_types field_index_type []
)
@ -7,16 +7,33 @@ CREATE OR REPLACE FUNCTION create_field_indexes_on_data_tables(
$BODY$
DECLARE
BEGIN
PERFORM _sysinternal.create_data_table_index(dt.table_oid, field_name, index_type)
FROM data_table dt
PERFORM _sysinternal.create_chunk_replica_node_index(crn.schema_name, crn.table_name, field_name, index_type)
FROM chunk_replica_node crn
INNER JOIN partition_replica pr ON (pr.id = crn.partition_replica_id)
CROSS JOIN unnest(index_types) AS index_type
WHERE dt.namespace_name = create_field_indexes_on_data_tables.namespace_name;
WHERE pr.hypertable_name = create_chunk_replica_node_indexes_for_field.hypertable_name;
END
$BODY$;
CREATE OR REPLACE FUNCTION create_field_on_cluster_table(
CREATE OR REPLACE FUNCTION _sysinternal.create_partition_constraint_for_field(
hypertable_name NAME,
field_name NAME
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
BEGIN
PERFORM _sysinternal.add_partition_constraint(pr.schema_name, pr.table_name, p.keyspace_start, p.keyspace_end, p.epoch_id)
FROM partition_epoch pe
INNER JOIN partition p ON (p.epoch_id = pe.id)
INNER JOIN partition_replica pr ON (pr.partition_id = p.id)
WHERE pe.hypertable_name = create_partition_constraint_for_field.hypertable_name
AND pe.partitioning_field = create_partition_constraint_for_field.field_name;
END
$BODY$;
CREATE OR REPLACE FUNCTION _sysinternal.create_field_on_root_table(
schema_name NAME,
cluster_table_name NAME,
table_name NAME,
field NAME,
data_type_oid REGTYPE
)
@ -28,7 +45,7 @@ BEGIN
$$
ALTER TABLE %1$I.%2$I ADD COLUMN %3$I %4$s DEFAULT NULL
$$,
schema_name, cluster_table_name, field, data_type_oid);
schema_name, table_name, field, data_type_oid);
END
$BODY$;
@ -38,23 +55,23 @@ CREATE OR REPLACE FUNCTION _sysinternal.on_create_field()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
namespace_row namespace;
node_row node;
hypertable_row hypertable;
BEGIN
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts supported on namespace table'
RAISE EXCEPTION 'Only inserts supported on % table', TG_TABLE_NAME
USING ERRCODE = 'IO101';
END IF;
SELECT *
INTO STRICT namespace_row
FROM namespace AS ns
WHERE ns.name = NEW.namespace_name;
INTO STRICT hypertable_row
FROM hypertable AS h
WHERE h.name = NEW.hypertable_name;
PERFORM create_field_on_cluster_table(namespace_row.schema_name, namespace_row.cluster_table_name, NEW.name,
PERFORM _sysinternal.create_field_on_root_table(hypertable_row.root_schema_name, hypertable_row.root_table_name, NEW.name,
NEW.data_type);
PERFORM create_field_indexes_on_data_tables(NEW.namespace_name, NEW.name, NEW.index_types);
PERFORM _sysinternal.create_partition_constraint_for_field(NEW.hypertable_name, NEW.name);
PERFORM _sysinternal.create_chunk_replica_node_indexes_for_field(NEW.hypertable_name, NEW.name, NEW.index_types);
RETURN NEW;
END
$BODY$

View File

@ -0,0 +1,33 @@
CREATE OR REPLACE FUNCTION _sysinternal.on_create_hypertable_replica()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
hypertable_row hypertable;
BEGIN
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts supported on hypertable_replica table'
USING ERRCODE = 'IO101';
END IF;
SELECT *
INTO STRICT hypertable_row
FROM hypertable AS h
WHERE h.name = NEW.hypertable_name;
PERFORM _sysinternal.create_replica_table(NEW.schema_name, NEW.table_name,
hypertable_row.root_schema_name, hypertable_row.root_table_name);
PERFORM _sysinternal.create_replica_table(NEW.distinct_schema_name, NEW.distinct_table_name,
hypertable_row.distinct_schema_name, hypertable_row.distinct_table_name);
RETURN NEW;
END
$BODY$
SET SEARCH_PATH = 'public';
BEGIN;
DROP TRIGGER IF EXISTS trigger_on_create_hypertable_replica
ON hypertable_replica;
CREATE TRIGGER trigger_on_create_hypertable_replica AFTER INSERT OR UPDATE OR DELETE ON hypertable_replica
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_hypertable_replica();
COMMIT;

View File

@ -0,0 +1,33 @@
CREATE OR REPLACE FUNCTION _sysinternal.on_create_hypertable()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
remote_node node;
BEGIN
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts supported on namespace table'
USING ERRCODE = 'IO101';
END IF;
PERFORM _sysinternal.create_schema(NEW.main_schema_name);
PERFORM _sysinternal.create_schema(NEW.associated_schema_name);
PERFORM _sysinternal.create_schema(NEW.root_schema_name);
PERFORM _sysinternal.create_schema(NEW.distinct_schema_name);
PERFORM _sysinternal.create_root_table(NEW.root_schema_name, NEW.root_table_name);
PERFORM _sysinternal.create_root_distinct_table(NEW.distinct_schema_name, NEW.distinct_table_name);
RETURN NEW;
END
$BODY$
SET SEARCH_PATH = 'public';
BEGIN;
DROP TRIGGER IF EXISTS trigger_on_create_hypertable
ON hypertable;
CREATE TRIGGER trigger_on_create_hypertable AFTER INSERT OR UPDATE OR DELETE ON hypertable
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_hypertable();
COMMIT;

View File

@ -1,5 +1,5 @@
CREATE OR REPLACE FUNCTION get_fields_from_json(
namespace_name NAME
hypertable_name NAME
)
RETURNS TEXT [] LANGUAGE SQL STABLE AS
$BODY$
@ -10,27 +10,27 @@ SELECT ARRAY(
f.name AS field_name,
f.data_type
FROM field AS f
WHERE f.namespace_name = get_fields_from_json.namespace_name
WHERE f.hypertable_name = get_fields_from_json.hypertable_name
ORDER BY f.name
) AS info
);
$BODY$;
CREATE OR REPLACE FUNCTION get_field_list(
namespace_name NAME
hypertable_name NAME
)
RETURNS TEXT LANGUAGE SQL STABLE AS
$BODY$
SELECT array_to_string(get_quoted_field_names(namespace_name), ', ')
SELECT array_to_string(get_quoted_field_names(hypertable_name), ', ')
$BODY$;
CREATE OR REPLACE FUNCTION get_field_from_json_list(
namespace_name NAME
hypertable_name NAME
)
RETURNS TEXT LANGUAGE SQL STABLE AS
$BODY$
SELECT array_to_string(get_fields_from_json(namespace_name), ', ')
SELECT array_to_string(get_fields_from_json(hypertable_name), ', ')
$BODY$;
@ -46,7 +46,7 @@ BEGIN
EXECUTE format(
$$
CREATE TEMP TABLE "%s" (
namespace_name name NOT NULL,
hypertable_name name NOT NULL,
time BIGINT NOT NULL,
value jsonb
) ON COMMIT DROP
@ -65,28 +65,28 @@ CREATE OR REPLACE FUNCTION insert_data_one_partition(
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
data_table_row data_table;
chunk_replica_node_row chunk_replica;
distinct_table_oid REGCLASS;
time_point BIGINT;
namespace_point NAME;
hypertable_point NAME;
BEGIN
time_point := 1;
EXECUTE format(
$$
SELECT "time", namespace_name FROM %s ORDER BY namespace_name LIMIT 1
SELECT "time", hypertable_name FROM %s ORDER BY hypertable_name LIMIT 1
$$,
copy_table_oid)
INTO time_point, namespace_point;
INTO time_point, hypertable_point;
WHILE time_point IS NOT NULL LOOP
SELECT *
INTO distinct_table_oid
FROM get_distinct_local_table_oid(namespace_point);
FROM get_distinct_local_table_oid(hypertable_point);
SELECT *
INTO data_table_row
FROM get_or_create_data_table(time_point, namespace_point,
FROM get_or_create_data_table(time_point, hypertable_point,
partition_number, total_partitions);
BEGIN
@ -96,13 +96,13 @@ BEGIN
(
DELETE FROM %2$s
WHERE ("time" >= %3$L OR %3$L IS NULL) and ("time" <= %4$L OR %4$L IS NULL)
AND namespace_name = %5$L
AND hypertable_name = %5$L
RETURNING *
),
distinct_field AS (
SELECT name
FROM field
WHERE namespace_name = %5$L AND is_distinct = TRUE
WHERE hypertable_name = %5$L AND is_distinct = TRUE
),
insert_distinct AS (
INSERT INTO %6$s as distinct_table
@ -120,17 +120,17 @@ BEGIN
)
INSERT INTO %1$s (time, %7$s) SELECT time, %8$s FROM selected;
$$, data_table_row.table_oid, copy_table_oid, data_table_row.start_time,
data_table_row.end_time, namespace_point, distinct_table_oid,
get_field_list(namespace_point),
get_field_from_json_list(namespace_point))
data_table_row.end_time, hypertable_point, distinct_table_oid,
get_field_list(hypertable_point),
get_field_from_json_list(hypertable_point))
USING data_table_row;
EXECUTE format(
$$
SELECT "time", namespace_name FROM %s ORDER BY namespace_name LIMIT 1
SELECT "time", hypertable_name FROM %s ORDER BY hypertable_name LIMIT 1
$$,
copy_table_oid)
INTO time_point, namespace_point;
INTO time_point, hypertable_point;
EXCEPTION WHEN deadlock_detected THEN
--do nothing, rerun loop (deadlock can be caused by concurrent updates to distinct table)

View File

@ -1,201 +0,0 @@
CREATE OR REPLACE FUNCTION create_schema(
schema_name NAME
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
BEGIN
EXECUTE format(
$$
CREATE SCHEMA %I
$$, schema_name);
END
$BODY$;
CREATE OR REPLACE FUNCTION create_cluster_table(
schema_name NAME,
table_name NAME
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
BEGIN
EXECUTE format(
$$
CREATE TABLE IF NOT EXISTS %I.%I (
time BIGINT NOT NULL
)
$$, schema_name, table_name);
END
$BODY$;
CREATE OR REPLACE FUNCTION create_cluster_distinct_table(
schema_name NAME,
table_name NAME
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
BEGIN
EXECUTE format(
$$
CREATE TABLE IF NOT EXISTS %1$I.%2$I (
field TEXT,
value TEXT,
last_time_approx BIGINT,
PRIMARY KEY(field, value)
);
$$, schema_name, table_name);
END
$BODY$;
CREATE OR REPLACE FUNCTION create_local_distinct_table(
schema_name NAME,
table_name NAME,
cluster_table_name NAME
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
BEGIN
EXECUTE format(
$$
CREATE TABLE IF NOT EXISTS %1$I.%2$I (PRIMARY KEY(field, value))
INHERITS(%1$I.%3$I);
$$, schema_name, table_name, cluster_table_name);
END
$BODY$;
CREATE OR REPLACE FUNCTION create_remote_distinct_table(
schema_name NAME,
local_table_name NAME,
remote_table_name NAME,
cluster_table_name NAME,
server_name NAME
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
BEGIN
EXECUTE format(
$$
CREATE FOREIGN TABLE IF NOT EXISTS %1$I.%2$I () INHERITS(%1$I.%3$I) SERVER %4$I
OPTIONS (schema_name %1$L, table_name %5$L)
$$,
schema_name, remote_table_name, cluster_table_name, server_name, local_table_name);
END
$BODY$;
CREATE OR REPLACE FUNCTION create_master_table(
schema_name NAME,
table_name NAME,
cluster_table_name NAME
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
BEGIN
EXECUTE format(
$$
CREATE TABLE IF NOT EXISTS %1$I.%2$I () INHERITS(%1$I.%3$I)
$$,
schema_name, table_name, cluster_table_name);
END
$BODY$;
CREATE OR REPLACE FUNCTION create_remote_table(
schema_name NAME,
remote_table_name NAME,
master_table_name NAME,
cluster_table_name NAME,
server_name NAME
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
BEGIN
EXECUTE format(
$$
CREATE FOREIGN TABLE IF NOT EXISTS %1$I.%2$I () INHERITS(%1$I.%3$I) SERVER %4$I OPTIONS (schema_name %1$L, table_name %5$L)
$$,
schema_name, remote_table_name, cluster_table_name, server_name, master_table_name);
END
$BODY$;
CREATE OR REPLACE FUNCTION _sysinternal.on_create_namespace()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
remote_node node;
BEGIN
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts supported on namespace table'
USING ERRCODE = 'IO101';
END IF;
IF NEW.schema_name = 'public' THEN
RAISE EXCEPTION 'public is not an allowed namespace name'
USING ERRCODE = 'IO104';
END IF;
IF position('_' IN NEW.schema_name) = 1 THEN
RAISE EXCEPTION 'Namespace cannot begin with an underscore (_)'
USING ERRCODE = 'IO104';
END IF;
PERFORM create_schema(NEW.schema_name);
PERFORM create_cluster_table(NEW.schema_name, NEW.cluster_table_name);
PERFORM create_cluster_distinct_table(NEW.schema_name, NEW.cluster_distinct_table_name);
RETURN NEW;
END
$BODY$
SET SEARCH_PATH = 'public';
BEGIN;
DROP TRIGGER IF EXISTS trigger_on_create_namespace
ON namespace;
CREATE TRIGGER trigger_on_create_namespace AFTER INSERT OR UPDATE OR DELETE ON namespace
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_namespace();
COMMIT;
CREATE OR REPLACE FUNCTION _sysinternal.on_create_namespace_node()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
namespace_row namespace;
node_row node;
BEGIN
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts supported on namespace table'
USING ERRCODE = 'IO101';
END IF;
SELECT *
INTO STRICT namespace_row
FROM namespace AS ns
WHERE ns.name = NEW.namespace_name;
IF NEW.database_name = current_database() THEN
PERFORM create_master_table(namespace_row.schema_name, NEW.master_table_name, namespace_row.cluster_table_name);
PERFORM create_local_distinct_table(namespace_row.schema_name, NEW.distinct_local_table_name,
namespace_row.cluster_distinct_table_name);
ELSE
SELECT *
INTO STRICT node_row
FROM node AS n
WHERE n.database_name = NEW.database_name;
PERFORM create_remote_table(namespace_row.schema_name, NEW.remote_table_name, NEW.master_table_name,
namespace_row.cluster_table_name,
node_row.server_name);
PERFORM create_remote_distinct_table(namespace_row.schema_name, NEW.distinct_local_table_name,
NEW.distinct_remote_table_name,
namespace_row.cluster_distinct_table_name,
node_row.server_name);
END IF;
RETURN NEW;
END
$BODY$
SET SEARCH_PATH = 'public';
BEGIN;
DROP TRIGGER IF EXISTS trigger_on_create_namespace_node
ON namespace_node;
CREATE TRIGGER trigger_on_create_namespace_node AFTER INSERT OR UPDATE OR DELETE ON namespace_node
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_namespace_node();
COMMIT;

View File

@ -0,0 +1,31 @@
CREATE OR REPLACE FUNCTION _sysinternal.on_create_partition_replica_table()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
BEGIN
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts supported on % table', TG_TABLE_NAME
USING ERRCODE = 'IO101';
END IF;
PERFORM _sysinternal.create_data_partition_table(
NEW.schema_name, NEW.table_name,
hr.schema_name, hr.table_name,
p.keyspace_start, p.keyspace_end,
p.epoch_id)
FROM hypertable_replica hr
CROSS JOIN partition p
WHERE hr.hypertable_name = NEW.hypertable_name AND
hr.replica_id = NEW.replica_id AND
p.id = NEW.partition_id;
RETURN NEW;
END
$BODY$
SET SEARCH_PATH = 'public';
BEGIN;
DROP TRIGGER IF EXISTS trigger_on_create_partition_replica_table
ON partition_replica;
CREATE TRIGGER trigger_on_create_partition_replica_table AFTER INSERT OR UPDATE OR DELETE ON partition_replica
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_partition_replica_table();
COMMIT;

View File

@ -1,38 +0,0 @@
CREATE OR REPLACE FUNCTION _sysinternal.on_create_partition_table()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
namespace_node_row namespace_node;
BEGIN
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts supported on partition_table table'
USING ERRCODE = 'IO101';
END IF;
SELECT *
INTO STRICT namespace_node_row
FROM namespace_node n
WHERE n.namespace_name = NEW.namespace_name AND
n.database_name = current_database();
EXECUTE format(
$$
CREATE TABLE IF NOT EXISTS %1$I.%2$I (
CONSTRAINT partition CHECK(get_partition_for_key(%4$I::text, %5$L) = %6$L)
) INHERITS(%1$I.%3$I)
$$,
get_schema_name(NEW.namespace_name), NEW.table_name, namespace_node_row.master_table_name,
NEW.partitioning_field,
NEW.total_partitions, NEW.partition_number
);
RETURN NEW;
END
$BODY$
SET SEARCH_PATH = 'public';
BEGIN;
DROP TRIGGER IF EXISTS trigger_on_create_partition_table
ON partition_table;
CREATE TRIGGER trigger_on_create_partition_table AFTER INSERT OR UPDATE OR DELETE ON partition_table
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_partition_table();
COMMIT;

View File

@ -1,6 +1,6 @@
CREATE OR REPLACE FUNCTION get_partition_for_key(key TEXT, num_nodes SMALLINT)
CREATE OR REPLACE FUNCTION get_partition_for_key(key TEXT, mod_factor INT)
RETURNS SMALLINT LANGUAGE SQL IMMUTABLE STRICT AS $$
SELECT ((public.hash_string(key, 'murmur3' :: TEXT, 1 :: INT4) & x'7fffffff' :: INTEGER) % num_nodes)::SMALLINT;
SELECT ((public.hash_string(key, 'murmur3' :: TEXT, 1 :: INT4) & x'7fffffff' :: INTEGER) % mod_factor)::SMALLINT;
$$;

View File

@ -1,64 +1,44 @@
CREATE OR REPLACE FUNCTION get_partitioning_field_name(
namespace_name NAME
)
RETURNS NAME LANGUAGE SQL STABLE AS
$BODY$
SELECT name
FROM field f
WHERE f.namespace_name = get_partitioning_field_name.namespace_name AND f.is_partitioning;
$BODY$;
CREATE OR REPLACE FUNCTION get_schema_name(
namespace_name NAME
)
RETURNS NAME LANGUAGE SQL STABLE AS
$BODY$
SELECT schema_name
FROM namespace AS n
WHERE n.name = namespace_name;
$BODY$;
CREATE OR REPLACE FUNCTION get_distinct_local_table_oid(
namespace_name NAME
hypertable_name NAME,
replica_id SMALLINT
)
RETURNS REGCLASS LANGUAGE SQL STABLE AS
$BODY$
SELECT format('%I.%I', n.schema_name, nn.distinct_local_table_name) :: REGCLASS
FROM namespace AS n
INNER JOIN namespace_node AS nn ON (n.name = nn.namespace_name)
WHERE n.name = get_distinct_local_table_oid.namespace_name AND
nn.database_name = current_database();
SELECT format('%I.%I', drn.schema_name, drn.table_name) :: REGCLASS
FROM distinct_replica_node AS drn
WHERE drn.hypertable_name = get_distinct_local_table_oid.hypertable_name AND
drn.replica_id = get_distinct_local_table_oid.replica_id AND
drn.database_name = current_database();
$BODY$;
CREATE OR REPLACE FUNCTION get_field_names(
namespace_name NAME
hypertable_name NAME
)
RETURNS NAME [] LANGUAGE SQL STABLE AS
$BODY$
SELECT ARRAY(
SELECT name
FROM field f
WHERE f.namespace_name = get_field_names.namespace_name
WHERE f.hypertable_name = get_field_names.hypertable_name
ORDER BY name
);
$BODY$;
CREATE OR REPLACE FUNCTION get_quoted_field_names(
namespace_name NAME
hypertable_name NAME
)
RETURNS TEXT [] LANGUAGE SQL STABLE AS
$BODY$
SELECT ARRAY(
SELECT format('%I', name)
FROM field f
WHERE f.namespace_name = get_quoted_field_names.namespace_name
WHERE f.hypertable_name = get_quoted_field_names.hypertable_name
ORDER BY name
);
$BODY$;
CREATE OR REPLACE FUNCTION get_field_names_and_types(
namespace_name NAME,
hypertable_name NAME,
field_names NAME []
)
RETURNS TABLE(field NAME, data_type REGTYPE) LANGUAGE PLPGSQL STABLE AS
@ -72,7 +52,7 @@ BEGIN
FROM field f
INNER JOIN unnest(field_names) WITH ORDINALITY
AS x(field_name, ordering) ON f.name = x.field_name
WHERE f.namespace_name = get_field_names_and_types.namespace_name
WHERE f.hypertable_name = get_field_names_and_types.hypertable_name
ORDER BY x.ordering;
GET DIAGNOSTICS rows_returned = ROW_COUNT;
IF rows_returned != cardinality(field_names) THEN
@ -85,10 +65,10 @@ BEGIN
WHERE NOT EXISTS(
SELECT 1
FROM field f
WHERE f.namespace_name = get_field_names_and_types.namespace_name AND
WHERE f.hypertable_name = get_field_names_and_types.hypertable_name AND
f.name = field_name
);
RAISE 'Missing field "%" in namespace "%"', missing_field, namespace_name
RAISE 'Missing field "%" in namespace "%"', missing_field, hypertable_name
USING ERRCODE = 'IO002';
END;
END IF;
@ -96,7 +76,7 @@ END
$BODY$;
CREATE OR REPLACE FUNCTION get_field_type(
namespace_name NAME,
hypertable_name NAME,
field_name NAME
)
RETURNS REGTYPE LANGUAGE PLPGSQL STABLE AS
@ -107,22 +87,14 @@ BEGIN
SELECT f.data_type
INTO data_type
FROM field f
WHERE f.name = get_field_type.field_name AND f.namespace_name = get_field_type.namespace_name;
WHERE f.name = get_field_type.field_name AND f.hypertable_name = get_field_type.hypertable_name;
IF NOT FOUND THEN
RAISE 'Missing field "%" in namespace "%"', field_name, namespace_name
RAISE 'Missing field "%" in namespace "%"', field_name, hypertable_name
USING ERRCODE = 'IO002';
END IF;
RETURN data_type;
END
$BODY$;
CREATE OR REPLACE FUNCTION get_cluster_table(
namespace_name NAME
)
RETURNS REGCLASS LANGUAGE SQL STABLE AS
$BODY$
SELECT format('%I.%I', schema_name, cluster_table_name) :: REGCLASS
FROM namespace AS n
WHERE n.name = namespace_name
$BODY$;

239
sql/main/table_creation.sql Normal file
View File

@ -0,0 +1,239 @@
CREATE OR REPLACE FUNCTION _sysinternal.create_schema(
schema_name NAME
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
BEGIN
EXECUTE format(
$$
CREATE SCHEMA IF NOT EXISTS %I
$$, schema_name);
END
$BODY$;
CREATE OR REPLACE FUNCTION _sysinternal.create_root_table(
schema_name NAME,
table_name NAME
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
BEGIN
EXECUTE format(
$$
CREATE TABLE IF NOT EXISTS %I.%I (
time BIGINT NOT NULL
)
$$, schema_name, table_name);
END
$BODY$;
CREATE OR REPLACE FUNCTION _sysinternal.create_root_distinct_table(
schema_name NAME,
table_name NAME
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
BEGIN
EXECUTE format(
$$
CREATE TABLE IF NOT EXISTS %1$I.%2$I (
field TEXT,
value TEXT,
last_time_approx BIGINT,
PRIMARY KEY(field, value)
);
$$, schema_name, table_name);
END
$BODY$;
CREATE OR REPLACE FUNCTION _sysinternal.create_local_distinct_table(
schema_name NAME,
table_name NAME,
replica_schema_name NAME,
replica_table_name NAME
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
BEGIN
EXECUTE format(
$$
CREATE TABLE IF NOT EXISTS %1$I.%2$I (PRIMARY KEY(field, value))
INHERITS(%3$I.%4$I);
$$, schema_name, table_name, replica_schema_name, replica_table_name);
END
$BODY$;
CREATE OR REPLACE FUNCTION _sysinternal.create_remote_table(
schema_name NAME,
table_name NAME,
parent_schema_name NAME,
parent_table_name NAME,
database_name NAME
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
node_row node;
BEGIN
SELECT * INTO STRICT node_row
FROM node n
WHERE n.database_name = create_remote_table.database_name;
EXECUTE format(
$$
CREATE FOREIGN TABLE IF NOT EXISTS %1$I.%2$I () INHERITS(%3$I.%4$I) SERVER %5$I
OPTIONS (schema_name %1$L, table_name %2$L)
$$,
schema_name, table_name, parent_schema_name, parent_table_name, node_row.server_name);
END
$BODY$;
CREATE OR REPLACE FUNCTION _sysinternal.create_local_data_table(
schema_name NAME,
table_name NAME,
parent_schema_name NAME,
parent_table_name NAME
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
BEGIN
EXECUTE format(
$$
CREATE TABLE IF NOT EXISTS %1$I.%2$I () INHERITS(%3$I.%4$I);
$$,
schema_name, table_name, parent_schema_name, parent_table_name);
END
$BODY$;
CREATE OR REPLACE FUNCTION _sysinternal.create_replica_table(
schema_name NAME,
table_name NAME,
parent_schema_name NAME,
parent_table_name NAME
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
BEGIN
EXECUTE format(
$$
CREATE TABLE IF NOT EXISTS %1$I.%2$I () INHERITS(%3$I.%4$I)
$$,
schema_name, table_name, parent_schema_name, parent_table_name);
END
$BODY$;
CREATE OR REPLACE FUNCTION _sysinternal.create_data_partition_table(
schema_name NAME,
table_name NAME,
parent_schema_name NAME,
parent_table_name NAME,
keyspace_start SMALLINT,
keyspace_end SMALLINT,
epoch_id INT
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
epoch_row partition_epoch;
field_exists BOOLEAN;
BEGIN
SELECT * INTO STRICT epoch_row
FROM partition_epoch pe
WHERE pe.id = epoch_id;
EXECUTE format(
$$
CREATE TABLE IF NOT EXISTS %1$I.%2$I (
) INHERITS(%3$I.%4$I)
$$,
schema_name, table_name, parent_schema_name, parent_table_name);
SELECT COUNT(*) > 0
INTO field_exists
FROM field f
WHERE f.hypertable_name = epoch_row.hypertable_name
AND f.name = epoch_row.partitioning_field;
IF field_exists THEN
PERFORM _sysinternal.add_partition_constraint(schema_name, table_name, keyspace_start, keyspace_end, epoch_id);
END IF;
END
$BODY$;
CREATE SEQUENCE IF NOT EXISTS pidx_index_name_seq;
CREATE OR REPLACE FUNCTION _sysinternal.add_partition_constraint(
schema_name NAME,
table_name NAME,
keyspace_start SMALLINT,
keyspace_end SMALLINT,
epoch_id INT
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
epoch_row partition_epoch;
BEGIN
SELECT * INTO STRICT epoch_row
FROM partition_epoch pe
WHERE pe.id = epoch_id;
EXECUTE format(
$$
ALTER TABLE %1$I.%2$I
ADD CONSTRAINT partition CHECK(%3$s(%4$I::text, %5$L) BETWEEN %6$L AND %7$L)
$$,
schema_name, table_name,
epoch_row.partitioning_func, epoch_row.partitioning_field,
epoch_row.partitioning_mod, keyspace_start, keyspace_end);
EXECUTE FORMAT(
$$
CREATE INDEX %3$I ON %1$I.%2$I ("time" DESC NULLS LAST, %4$I)
$$,
schema_name, table_name,
format('%s_pidx', nextval('pidx_index_name_seq')),
epoch_row.partitioning_field);
END
$BODY$;
CREATE OR REPLACE FUNCTION _sysinternal.set_time_constraint(
schema_name NAME,
table_name NAME,
start_time BIGINT,
end_time BIGINT
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
BEGIN
EXECUTE FORMAT(
$$
ALTER TABLE %I.%I DROP CONSTRAINT IF EXISTS time_range
$$,
schema_name, table_name);
IF start_time IS NOT NULL AND end_time IS NOT NULL THEN
EXECUTE FORMAT(
$$
ALTER TABLE %I.%I ADD CONSTRAINT time_range CHECK(time >= %L AND time <= %L)
$$,
schema_name, table_name, start_time, end_time);
ELSIF start_time IS NOT NULL THEN
EXECUTE FORMAT(
$$
ALTER TABLE %I.%I ADD CONSTRAINT time_range CHECK(time >= %L)
$$,
schema_name, table_name, start_time);
ELSIF end_time IS NOT NULL THEN
EXECUTE FORMAT(
$$
ALTER TABLE %I.%I ADD CONSTRAINT time_range CHECK(time <= %L)
$$,
schema_name, table_name, end_time);
END IF;
END
$BODY$;

View File

@ -1,39 +1,13 @@
CREATE TABLE IF NOT EXISTS partition_table (
namespace_name NAME REFERENCES namespace (name) NOT NULL,
partition_number SMALLINT NOT NULL CHECK (partition_number >= 0),
total_partitions SMALLINT NOT NULL CHECK (total_partitions > 0),
partitioning_field NAME NOT NULL,
CREATE SEQUENCE IF NOT EXISTS chunk_replica_node_index_name_prefix;
CREATE TABLE IF NOT EXISTS chunk_replica_node_index (
schema_name NAME NOT NULL,
table_name NAME NOT NULL,
PRIMARY KEY (namespace_name, partition_number, total_partitions), --do not allow multiple partitioning fields, for now
UNIQUE (namespace_name, partition_number, total_partitions, partitioning_field),
UNIQUE (namespace_name, table_name),
CHECK (partition_number < total_partitions)
);
CREATE TABLE IF NOT EXISTS data_table (
table_oid REGCLASS PRIMARY KEY NOT NULL,
namespace_name NAME REFERENCES namespace (name) NOT NULL,
partition_number SMALLINT NOT NULL CHECK (partition_number >= 0),
total_partitions SMALLINT NOT NULL CHECK (total_partitions > 0),
partitioning_field NAME NOT NULL,
start_time BIGINT,
end_time BIGINT,
FOREIGN KEY (namespace_name, partition_number, total_partitions, partitioning_field)
REFERENCES partition_table (namespace_name, partition_number, total_partitions, partitioning_field),
UNIQUE (namespace_name, partition_number, total_partitions, start_time, end_time),
CHECK (start_time IS NOT NULL OR end_time IS NOT NULL),
CHECK (partition_number < total_partitions)
);
--TODO: any constrants for when total_partitions change?
CREATE SEQUENCE IF NOT EXISTS data_table_index_name_prefix;
CREATE TABLE IF NOT EXISTS data_table_index (
table_oid REGCLASS NOT NULL REFERENCES data_table (table_oid) ON DELETE CASCADE,
field_name NAME NOT NULL,
index_name NAME NOT NULL, --not regclass since regclass create problems with database backup/restore (indexes created after data load)
index_type field_index_type NOT NULL,
PRIMARY KEY (table_oid, field_name, index_name)
PRIMARY KEY (schema_name, table_name, field_name, index_name),
FOREIGN KEY (schema_name, table_name) REFERENCES chunk_replica_node(schema_name, table_name)
);

199
sql/meta/chunk.sql Normal file
View File

@ -0,0 +1,199 @@
--calculate new times for a new chunks for appropriate time values
--tables are created open-ended in one or two direction (either start_time or end_time is NULL)
--that way, tables grow in some time dir.
--Tables are always created adjacent to existing tables. So, tables
--will never be disjoint in terms of time. Therefore you will have:
-- <---current open ended start_time table --| existing closed tables | -- current open ended end_time table --->
--Should not be called directly. Requires a lock on partition (prevents simultaneous inserts)
CREATE OR REPLACE FUNCTION _sysinternal.calculate_new_chunk_times(
partition_id INT,
"time" BIGINT,
OUT table_start BIGINT,
OUT table_end BIGINT
)
LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
partition_epoch_row partition_epoch;
chunk_row chunk;
BEGIN
SELECT pe.* INTO partition_epoch_row
FROM partition p
INNER JOIN partition_epoch pe ON (p.epoch_id = pe.id)
WHERE p.id = partition_id FOR SHARE;
table_start := partition_epoch_row.start_time;
table_end := partition_epoch_row.end_time;
SELECT *
INTO chunk_row
FROM chunk AS c
WHERE c.end_time < calculate_new_chunk_times."time" AND
c.partition_id = calculate_new_chunk_times.partition_id
ORDER BY c.end_time DESC
LIMIT 1
FOR SHARE;
IF FOUND THEN
--there is a table that ends before this point;
table_start := chunk_row.end_time + 1;
ELSE
SELECT *
INTO chunk_row
FROM chunk AS c
WHERE c.start_time > calculate_new_chunk_times."time" AND
c.partition_id = calculate_new_chunk_times.partition_id
ORDER BY c.start_time DESC
LIMIT 1
FOR SHARE;
IF FOUND THEN
--there is a table that ends before this point
table_end = chunk_row.start_time - 1;
END IF;
END IF;
END
$BODY$;
CREATE OR REPLACE FUNCTION _sysinternal.get_chunk(
partition_id INT,
time_point BIGINT
)
RETURNS chunk LANGUAGE SQL VOLATILE AS
$BODY$
SELECT *
FROM chunk c
WHERE c.partition_id = get_chunk.partition_id AND
(c.start_time <= time_point OR c.start_time IS NULL) AND
(c.end_time >= time_point OR c.end_time IS NULL)
FOR SHARE;
$BODY$;
--creates chunk. Must be called after aquiring a lock on partition.
CREATE OR REPLACE FUNCTION _sysinternal.create_chunk(
partition_id INT,
time_point BIGINT
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
table_start BIGINT;
table_end BIGINT;
BEGIN
SELECT *
INTO table_start, table_end
FROM _sysinternal.calculate_new_chunk_times(partition_id, time_point);
INSERT INTO chunk(partition_id, start_time, end_time)
VALUES(partition_id, table_start, table_end);
END
$BODY$;
--gets or creates chunk. If creating chunk takes a lock on the corresponding partition.
--This prevents concurrently creating chunks on same partitions.
CREATE OR REPLACE FUNCTION get_or_create_chunk(
partition_id INT,
time_point BIGINT
)
RETURNS chunk LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
chunk_row chunk;
partition_row partition;
BEGIN
chunk_row := _sysinternal.get_chunk(partition_id, time_point);
--uses double-checked locking
IF chunk_row IS NULL THEN
--get lock
SELECT * INTO partition_row
FROM partition
WHERE id = partition_id FOR UPDATE;
--recheck:
chunk_row := _sysinternal.get_chunk(partition_id, time_point);
IF chunk_row IS NULL THEN
PERFORM _sysinternal.create_chunk(partition_id, time_point);
END IF;
chunk_row := _sysinternal.get_chunk(partition_id, time_point);
IF chunk_row IS NULL THEN --recheck
RAISE EXCEPTION 'Should never happen'
USING ERRCODE = 'IO501';
END IF;
END IF;
RETURN chunk_row;
END
$BODY$;
CREATE OR REPLACE FUNCTION close_chunk_end(
chunk_id INT
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
crn_node_row RECORD;
node_row node;
max_time_replica BIGINT;
max_time BIGINT = 0;
table_end BIGINT;
chunk_row chunk;
BEGIN
SELECT *
INTO chunk_row
FROM chunk c
WHERE c.id = chunk_id
FOR UPDATE;
--PHASE 1: lock chunk row on all rows (prevents concurrent chunk insert)
FOR node_row IN
SELECT *
FROM node n
LOOP
PERFORM dblink_connect(node_row.server_name, node_row.server_name);
PERFORM dblink_exec(node_row.server_name, 'BEGIN');
PERFORM 1 FROM
dblink(node_row.server_name, format('SELECT * FROM lock_for_chunk_close(%L)', chunk_id)) AS t(x text);
END LOOP;
--PHASE 2: get max time for chunk
FOR crn_node_row IN
SELECT crn.*, n.*
FROM chunk_replica_node crn
INNER JOIN node n ON (n.database_name = crn.database_name)
WHERE crn.chunk_id = close_chunk_end.chunk_id
LOOP
SELECT t.max_time
INTO max_time_replica
FROM dblink(crn_node_row.server_name, format('SELECT * FROM max_time_for_chunk_close(%L, %L)', crn_node_row.schema_name, crn_node_row.table_name)) AS t(max_time BIGINT);
IF max_time = 0 THEN
max_time := max_time_replica;
ELSIF max_time <> max_time_replica THEN
RAISE EXCEPTION 'Should never happen: max_time % not equal max_time_replica %', max_time, max_time_replica
USING ERRCODE = 'IO501';
END IF;
END LOOP;
--TODO: is this right?
table_end := ((coalesce(max_time, chunk_row.start_time, 0) :: BIGINT / (1e9 * 60 * 60 * 24) + 1) :: BIGINT) * (1e9 * 60 * 60 * 24) :: BIGINT - 1;
--set time locally
UPDATE chunk SET end_time = table_end WHERE id = chunk_id;
--PHASE 3: set max_time remotely
FOR node_row IN
SELECT *
FROM node n
LOOP
PERFORM 1
FROM dblink(node_row.server_name, format('SELECT * FROM set_end_time_for_chunk_close(%L, %L)', chunk_id, table_end)) AS t(x text);
PERFORM dblink_exec(node_row.server_name, 'COMMIT');
PERFORM dblink_disconnect(node_row.server_name);
END LOOP;
END
$BODY$;

View File

@ -0,0 +1,31 @@
CREATE OR REPLACE FUNCTION _sysinternal.on_create_chunk_replica_node_meta()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
BEGIN
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts supported on % table', TG_TABLE_NAME
USING ERRCODE = 'IO101';
END IF;
INSERT INTO distinct_replica_node(hypertable_name, replica_id, database_name, schema_name, table_name)
SELECT pr.hypertable_name, pr.replica_id, NEW.database_name,
h.associated_schema_name,
format('%s_%s_%s_distinct_data', h.associated_table_prefix, pr.replica_id, n.id)
FROM partition_replica pr
INNER JOIN hypertable h ON (h.name = pr.hypertable_name)
INNER JOIN node n ON (n.database_name = NEW.database_name)
WHERE pr.id = NEW.partition_replica_id
ON CONFLICT DO NOTHING;
RETURN NEW;
END
$BODY$
SET SEARCH_PATH = 'public';
BEGIN;
DROP TRIGGER IF EXISTS trigger_on_create_chunk_replica_node
ON chunk_replica_node;
CREATE TRIGGER trigger_on_create_chunk_replica_node AFTER INSERT OR UPDATE OR DELETE ON chunk_replica_node
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_chunk_replica_node_meta();
COMMIT;

View File

@ -0,0 +1,68 @@
CREATE OR REPLACE FUNCTION _sysinternal.on_create_chunk()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
field_row field;
schema_name NAME;
BEGIN
IF TG_OP = 'UPDATE' THEN
IF (
(OLD.start_time IS NULL AND new.start_time IS NOT NULL)
OR
(OLD.end_time IS NULL AND new.end_time IS NOT NULL)
)
AND (
OLD.id = NEW.id AND
OLD.partition_id = NEW.partition_id
) THEN
NULL;
ELSE
RAISE EXCEPTION 'This type of update not allowed on % table', TG_TABLE_NAME
USING ERRCODE = 'IO101';
END IF;
ELSIF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts and updates supported on % table', TG_TABLE_NAME
USING ERRCODE = 'IO101';
END IF;
--sync data on insert
IF TG_OP = 'INSERT' THEN
FOR schema_name IN
SELECT n.schema_name
FROM node AS n
LOOP
EXECUTE format(
$$
INSERT INTO %I.%I SELECT $1.*
$$,
schema_name,
TG_TABLE_NAME
)
USING NEW;
END LOOP;
--do not sync data on update. synced by close_chunk logic.
--TODO: random node picking broken (should make sure replicas are on different nodes). also stickiness.
INSERT INTO chunk_replica_node(chunk_id,partition_replica_id, database_name, schema_name, table_name)
SELECT NEW.id,
pr.id,
(SELECT database_name FROM node ORDER BY random() LIMIT 1),
pr.schema_name,
format('%s_%s_%s_%s_data', h.associated_table_prefix, pr.id, pr.replica_id, NEW.id)
FROM partition_replica pr
INNER JOIN hypertable h ON (h.name = pr.hypertable_name)
WHERE pr.partition_id = NEW.partition_id;
END IF;
RETURN NEW;
END
$BODY$;
BEGIN;
DROP TRIGGER IF EXISTS trigger_on_create_chunk
ON chunk;
CREATE TRIGGER trigger_on_create_chunk AFTER INSERT OR UPDATE OR DELETE ON chunk
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_chunk();
COMMIT;

View File

@ -29,32 +29,92 @@ END
$BODY$;
CREATE OR REPLACE FUNCTION add_namespace(
namespace_name NAME
CREATE SEQUENCE IF NOT EXISTS default_hypertable_seq;
CREATE OR REPLACE FUNCTION add_hypertable(
hypertable_name NAME,
partitioning_field NAME,
main_schema_name NAME = 'public',
associated_schema_name NAME = NULL,
associated_table_prefix NAME = NULL,
number_partitions SMALLINT = NULL,
replication_factor SMALLINT = 1
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
id INTEGER;
BEGIN
id := nextval('default_hypertable_seq');
IF associated_schema_name IS NULL THEN
associated_schema_name = format('_sys_%s_%s', id, hypertable_name);
END IF;
IF associated_table_prefix IS NULL THEN
associated_table_prefix = format('_hyper_%s', id);
END IF;
IF number_partitions IS NULL THEN
SELECT COUNT(*) INTO number_partitions
FROM node;
END IF;
INSERT INTO hypertable (
name,
main_schema_name, main_table_name,
associated_schema_name, associated_table_prefix,
root_schema_name, root_table_name,
distinct_schema_name, distinct_table_name,
replication_factor)
VALUES (
hypertable_name,
main_schema_name, hypertable_name,
associated_schema_name, associated_table_prefix,
associated_schema_name, format('%s_root',associated_table_prefix),
associated_schema_name, format('%s_distinct',associated_table_prefix),
replication_factor)
ON CONFLICT DO NOTHING;
IF number_partitions != 0 THEN
PERFORM add_equi_partition_epoch(hypertable_name, number_partitions, partitioning_field);
END IF;
END
$BODY$;
CREATE OR REPLACE FUNCTION add_partition_epoch(
hypertable_name NAME,
keyspace_start SMALLINT[],
partitioning_field NAME
)
RETURNS VOID LANGUAGE SQL VOLATILE AS
$BODY$
INSERT INTO namespace (name, schema_name, cluster_table_name, cluster_distinct_table_name)
VALUES (namespace_name, get_schema_name(namespace_name), get_cluster_table_name(namespace_name),
get_cluster_distinct_table_name(namespace_name))
ON CONFLICT DO NOTHING;
INSERT INTO namespace_node (namespace_name, database_name, master_table_name, remote_table_name,
distinct_local_table_name, distinct_remote_table_name)
SELECT
namespace_name,
n.database_name,
get_master_table_name(namespace_name),
get_remote_table_name(namespace_name, n),
get_local_distinct_table_name(namespace_name),
get_remote_distinct_table_name(namespace_name, n)
FROM node AS n
ON CONFLICT DO NOTHING;
WITH epoch AS (
INSERT INTO partition_epoch (hypertable_name, start_time, end_time, partitioning_func, partitioning_mod, partitioning_field)
VALUES (hypertable_name, NULL, NULL, 'get_partition_for_key', 32768, partitioning_field) RETURNING id
)
INSERT INTO partition(epoch_id, keyspace_start, keyspace_end)
SELECT epoch.id, lag(start,1,0) OVER (), start-1
FROM unnest(keyspace_start::int[] || (32768)::INT) start, epoch
$BODY$;
CREATE OR REPLACE FUNCTION add_equi_partition_epoch(
hypertable_name NAME,
number_partitions SMALLINT,
partitioning_field NAME
)
RETURNS VOID LANGUAGE SQL VOLATILE AS
$BODY$
SELECT add_partition_epoch(
hypertable_name,
(SELECT ARRAY(SELECT start * 32768/(number_partitions) from generate_series(1, number_partitions-1) as start)::SMALLINT[]),
partitioning_field
)
$BODY$;
CREATE OR REPLACE FUNCTION add_field(
namespace_name NAME,
hypertable_name NAME,
field_name NAME,
data_type REGTYPE,
is_partitioning BOOLEAN,
@ -63,8 +123,8 @@ CREATE OR REPLACE FUNCTION add_field(
)
RETURNS VOID LANGUAGE SQL VOLATILE AS
$BODY$
INSERT INTO field (namespace_name, name, data_type, is_partitioning, is_distinct, index_types)
VALUES (namespace_name, field_name, data_type, is_partitioning, is_distinct, idx_types)
INSERT INTO field (hypertable_name, name, data_type, is_partitioning, is_distinct, index_types)
VALUES (hypertable_name, field_name, data_type, is_partitioning, is_distinct, idx_types)
ON CONFLICT DO NOTHING;
$BODY$;

View File

@ -1,33 +0,0 @@
CREATE OR REPLACE FUNCTION _sysinternal.sync_field()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
schema_name NAME;
BEGIN
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts supported on field table'
USING ERRCODE = 'IO101';
END IF;
FOR schema_name IN
SELECT n.schema_name
FROM node AS n
LOOP
EXECUTE format(
$$
INSERT INTO %I.field SELECT $1.*
$$,
schema_name
)
USING NEW;
END LOOP;
RETURN NEW;
END
$BODY$;
BEGIN;
DROP TRIGGER IF EXISTS trigger_sync_field
ON field;
CREATE TRIGGER trigger_sync_field AFTER INSERT OR UPDATE OR DELETE ON field
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.sync_field();
COMMIT;

View File

@ -0,0 +1,27 @@
CREATE OR REPLACE FUNCTION _sysinternal.on_create_hypertable()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
BEGIN
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts supported on hypertable name '
USING ERRCODE = 'IO101';
END IF;
INSERT INTO hypertable_replica
SELECT NEW.name, replica_id,
NEW.associated_schema_name, format('%s_%s', NEW.associated_table_prefix, replica_id),
NEW.associated_schema_name, format('%s_%s_distinct', NEW.associated_table_prefix, replica_id)
FROM generate_series(0, NEW.replication_factor-1) AS replica_id
ON CONFLICT DO NOTHING;
RETURN NEW;
END
$BODY$;
BEGIN;
DROP TRIGGER IF EXISTS trigger_create_hypertable ON hypertable;
CREATE TRIGGER trigger_create_hypertable AFTER INSERT OR UPDATE OR DELETE ON hypertable
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_hypertable();
COMMIT;

View File

@ -1,67 +0,0 @@
CREATE OR REPLACE FUNCTION _sysinternal.sync_namespace()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
schema_name NAME;
BEGIN
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts supported on namespace table'
USING ERRCODE = 'IO101';
END IF;
FOR schema_name IN
SELECT n.schema_name
FROM node AS n
LOOP
EXECUTE format(
$$
INSERT INTO %I.namespace SELECT $1.*
$$,
schema_name
)
USING NEW;
END LOOP;
RETURN NEW;
END
$BODY$;
BEGIN;
DROP TRIGGER IF EXISTS trigger_sync_namespace
ON namespace;
CREATE TRIGGER trigger_sync_namespace AFTER INSERT OR UPDATE OR DELETE ON namespace
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.sync_namespace();
COMMIT;
CREATE OR REPLACE FUNCTION _sysinternal.sync_namespace_node()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
schema_name NAME;
BEGIN
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts supported on namespace_node table'
USING ERRCODE = 'IO101';
END IF;
FOR schema_name IN
SELECT n.schema_name
FROM node AS n
LOOP
EXECUTE format(
$$
INSERT INTO %I.namespace_node SELECT $1.*
$$,
schema_name
)
USING NEW;
END LOOP;
RETURN NEW;
END
$BODY$;
BEGIN;
DROP TRIGGER IF EXISTS trigger_sync_namespace_node
ON namespace_node;
CREATE TRIGGER trigger_sync_namespace_node AFTER INSERT OR UPDATE OR DELETE ON namespace_node
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.sync_namespace_node();
COMMIT;

View File

@ -0,0 +1,35 @@
CREATE OR REPLACE FUNCTION _sysinternal.on_create_partition()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
BEGIN
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts supported on % name ', TG_TABLE_NAME
USING ERRCODE = 'IO101';
END IF;
INSERT INTO partition_replica(partition_id, hypertable_name, replica_id, schema_name, table_name)
SELECT
NEW.id,
hypertable_name,
replica_id,
h.associated_schema_name,
format('%s_%s_%s_partition', h.associated_table_prefix, NEW.id, replica_id)
FROM hypertable_replica hr
INNER JOIN hypertable h ON (h.name = hr.hypertable_name)
WHERE hypertable_name = (
SELECT hypertable_name
FROM partition_epoch
WHERE id = NEW.epoch_id
);
RETURN NEW;
END
$BODY$;
BEGIN;
DROP TRIGGER IF EXISTS trigger_create_partition ON partition;
CREATE TRIGGER trigger_create_partition AFTER INSERT OR UPDATE OR DELETE ON partition
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_partition();
COMMIT;

View File

@ -0,0 +1,51 @@
CREATE OR REPLACE FUNCTION _sysinternal.sync_only_insert()
RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
schema_name NAME;
BEGIN
IF TG_OP <> 'INSERT' THEN
RAISE EXCEPTION 'Only inserts supported on % table', TG_TABLE_NAME
USING ERRCODE = 'IO101';
END IF;
FOR schema_name IN
SELECT n.schema_name
FROM node AS n
LOOP
EXECUTE format(
$$
INSERT INTO %I.%I SELECT $1.*
$$,
schema_name,
TG_TABLE_NAME
)
USING NEW;
END LOOP;
RETURN NEW;
END
$BODY$;
DO
$BODY$
DECLARE
table_name NAME;
BEGIN
FOREACH table_name IN ARRAY ARRAY['cluster_user', 'hypertable', 'hypertable_replica',
'distinct_replica_node', 'partition_epoch', 'partition','partition_replica',
'chunk_replica_node', 'field']::NAME[] LOOP
EXECUTE format(
$$
DROP TRIGGER IF EXISTS trigger_0_sync_%1$s ON %1$s
$$,
table_name);
EXECUTE format(
$$
CREATE TRIGGER trigger_0_sync_%1$s AFTER INSERT OR UPDATE OR DELETE ON %1$s
FOR EACH ROW EXECUTE PROCEDURE _sysinternal.sync_only_insert();
$$,
table_name);
END LOOP;
END
$BODY$;

View File

@ -1,23 +1,30 @@
\ir ../../main/table_creation.sql
\ir ../../main/cluster_user_triggers.sql
\ir ../../main/node_triggers.sql
\ir ../../main/namespace_triggers.sql
\ir ../../main/hypertable_triggers.sql
\ir ../../main/hypertable_replica_triggers.sql
\ir ../../main/distinct_replica_node_triggers.sql
\ir ../../main/field_triggers.sql
\ir ../../main/partitioning.sql
\ir ../../main/schema_info.sql
\ir ../../main/names.sql
\ir ../../main/tables.sql
\ir ../../main/data_table_triggers.sql
\ir ../../main/partition_table_triggers.sql
\ir ../../main/data_table_constructors.sql
\ir ../../main/insert.sql
\ir ../../main/partition_replica_triggers.sql
\ir ../../main/chunk_replica_node_index.sql
\ir ../../main/chunk_replica_node_index_triggers.sql
\ir ../../main/chunk_replica_node_triggers.sql
\ir ../../main/chunk_triggers.sql
\ir ../../main/chunk.sql
--\ir ../../main/data_table_constructors.sql
--\ir ../../main/insert.sql
\ir ../../main/ioql.sql
\ir ../../main/ioql_result_schema_nonagg.sql
\ir ../../main/ioql_result_schema_agg.sql
\ir ../../main/ioql_schema_info.sql
\ir ../../main/ioql_sql_gen.sql
\ir ../../main/ioql_remote_calls.sql
\ir ../../main/ioql_optimized_agg.sql
\ir ../../main/ioql_distinct_queries.sql
\ir ../../main/ioql_optimized_nonagg.sql
\ir ../../main/ioql_optimized.sql
--\ir ../../main/ioql.sql
--\ir ../../main/ioql_result_schema_nonagg.sql
--\ir ../../main/ioql_result_schema_agg.sql
--\ir ../../main/ioql_schema_info.sql
--\ir ../../main/ioql_sql_gen.sql
--\ir ../../main/ioql_remote_calls.sql
--\ir ../../main/ioql_optimized_agg.sql
--\ir ../../main/ioql_distinct_queries.sql
--\ir ../../main/ioql_optimized_nonagg.sql
--\ir ../../main/ioql_optimized.sql

View File

@ -2,5 +2,9 @@
\ir ../../meta/cluster.sql
\ir ../../meta/node_triggers.sql
\ir ../../meta/cluster_user_triggers.sql
\ir ../../meta/namespace_triggers.sql
\ir ../../meta/field_triggers.sql
\ir ../../meta/chunk_triggers.sql
\ir ../../meta/chunk_replica_node_triggers.sql
\ir ../../meta/chunk.sql
\ir ../../meta/hypertable_triggers.sql
\ir ../../meta/partition_triggers.sql
\ir ../../meta/sync_triggers.sql

View File

@ -9,58 +9,94 @@ SELECT add_cluster_user('postgres', NULL);
SELECT add_node('Test1' :: NAME, 'localhost');
SELECT add_node('test2' :: NAME, 'localhost');
SELECT add_namespace('testNs' :: NAME);
SELECT add_hypertable('testNs' :: NAME, 'Device_id');
SELECT *
FROM partition_replica;
SELECT add_field('testNs' :: NAME, 'Device_id', 'text', TRUE, TRUE, ARRAY['TIME-VALUE'] :: field_index_type []);
SELECT add_field('testNs' :: NAME, 'temp', 'double precision', FALSE, FALSE, ARRAY['VALUE-TIME'] :: field_index_type []);
SELECT add_field('testNs' :: NAME, 'occupied', 'boolean', FALSE, FALSE, ARRAY[] :: field_index_type []);
SELECT add_field('testNs' :: NAME, 'latitude', 'bigint', FALSE, FALSE, ARRAY[] :: field_index_type []);
SELECT add_field('testNs' :: NAME, 'really_long_field_goes_on_and_on_and_on_and_on_and_on_and_on_and_on_and_on', 'bigint', FALSE, FALSE, ARRAY['TIME-VALUE','VALUE-TIME'] :: field_index_type []);
SELECT * FROM get_or_create_chunk(1,1257894000000000000::bigint);
SELECT *
FROM node;
SELECT *
FROM namespace;
FROM hypertable;
SELECT *
FROM namespace_node;
FROM hypertable_replica;
SELECT *
FROM distinct_replica_node;
SELECT *
FROM partition_epoch;
SELECT *
FROM partition;
SELECT *
FROM partition_replica;
SELECT *
FROM chunk;
SELECT *
FROM chunk_replica_node;
SELECT *
FROM field;
\echo *********************************************************************************************************ß
\c Test1
SELECT *
FROM node;
SELECT *
FROM namespace;
FROM hypertable;
SELECT *
FROM namespace_node;
FROM hypertable_replica;
SELECT *
FROM distinct_replica_node;
SELECT *
FROM partition_epoch;
SELECT *
FROM partition;
SELECT *
FROM partition_replica;
SELECT *
FROM chunk;
SELECT *
FROM chunk_replica_node;
SELECT *
FROM field;
\dt "testNs".*
\det "testNs".*
\d+ "testNs".distinct
\d+ "testNs".cluster
\d+ "_sys_1_testNs".*
--\d+ "_sys_1_testNs"."_sys_1_testNs_1_0_partition"
--\d+ "_sys_1_testNs"."_sys_1_testNs_2_0_partition"
--\det "_sys_1_testNs".*
--\d+ "testNs".distinct
--test idempotence
\c meta
SELECT add_namespace('testNs' :: NAME);
SELECT add_hypertable('testNs' :: NAME, 'Device_id');
SELECT add_field('testNs' :: NAME, 'Device_id', 'text', TRUE, TRUE, ARRAY['TIME-VALUE'] :: field_index_type []);
SELECT add_field('testNs' :: NAME, 'temp', 'double precision', FALSE, FALSE, ARRAY['VALUE-TIME'] :: field_index_type []);
SELECT add_field('testNs' :: NAME, 'occupied', 'boolean', FALSE, FALSE, ARRAY[] :: field_index_type []);
SELECT add_field('testNs' :: NAME, 'latitude', 'bigint', FALSE, FALSE, ARRAY[] :: field_index_type []);
SELECT add_field('testNs' :: NAME, 'really_long_field_goes_on_and_on_and_on_and_on_and_on_and_on_and_on_and_on', 'bigint', FALSE, FALSE, ARRAY['TIME-VALUE','VALUE-TIME'] :: field_index_type []);
SELECT * FROM get_or_create_chunk(1,1257894000000000000::bigint);
\c Test1
\d+ "testNs".cluster
\d+ "_sys_1_testNs".*
SELECT get_or_create_data_table((1477075243*1e9)::bigint, 'testNs'::NAME, 0::SMALLINT, 10::SMALLINT);
\dt "testNs".*
\det "testNs".*
\d+ "testNs".data_0_10_1477008000
\d+ "testNs".partition_0_10
\c meta
SELECT close_chunk_end(1);
SELECT *
FROM chunk;
SELECT * FROM get_or_create_chunk(1,10::bigint);
SELECT * FROM get_or_create_chunk(1,1257894000000000000::bigint);
SELECT *
FROM chunk;
\c Test1
\d+ "_sys_1_testNs".*
SELECT close_data_table_end('"testNs".data_0_10_1477008000');
\d+ "testNs".data_0_10_1477008000
SELECT get_or_create_data_table((1477075243*1e9)::bigint, 'testNs'::NAME, 0::SMALLINT, 10::SMALLINT);
\dt "testNs".*
SELECT get_or_create_data_table(((1477075243+(60*60*25))*1e9)::bigint, 'testNs'::NAME, 0::SMALLINT, 10::SMALLINT);
\dt "testNs".*