Convert insert_data to only deal with one hypertable batch at a time. Remove unused insert_data_one_partition.

Rob Kiefer 2016-12-21 17:45:26 -05:00
parent 0405a3ebb4
commit e3d9ce6d89
15 changed files with 102 additions and 215 deletions
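
For callers, the visible change is the signature: insert_data now takes the target hypertable name as its first argument, and rows in the copy table no longer carry a hypertable_name column. A minimal usage sketch assembled from the test scripts in this diff (copy_t, testNs, and the data file path are the tests' own names):

BEGIN;
SELECT * FROM create_temp_copy_table('copy_t');
\COPY copy_t FROM 'data/ds1_dev1_1.tsv';
-- old call was: SELECT * FROM insert_data('copy_t');
SELECT * FROM insert_data('testNs', 'copy_t');
COMMIT;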


@@ -44,7 +44,6 @@ BEGIN
EXECUTE format(
$$
CREATE TEMP TABLE "%s" (
hypertable_name name NOT NULL,
value jsonb
) ON COMMIT DROP
$$, table_name);
@@ -53,100 +52,10 @@ BEGIN
END
$BODY$;
--creates fields from project_series, not namespaces
CREATE OR REPLACE FUNCTION insert_data_one_partition(
copy_table_oid REGCLASS,
partition_id INT
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
crn_record RECORD;
distinct_table_oid REGCLASS;
time_point BIGINT;
hypertable_point NAME;
BEGIN
time_point := 1;
EXECUTE format(
$$
SELECT "time", hypertable_name FROM %s ORDER BY hypertable_name LIMIT 1
$$,
copy_table_oid)
INTO time_point, hypertable_point;
WHILE time_point IS NOT NULL LOOP
FOR crn_record IN
SELECT
crn.database_name,
crn.schema_name,
crn.table_name,
c.start_time,
c.end_time,
pr.hypertable_name,
pr.replica_id
FROM get_or_create_chunk(insert_data_one_partition.partition_id, time_point) c
INNER JOIN chunk_replica_node crn ON (crn.chunk_id = c.id)
INNER JOIN partition_replica pr ON (pr.id = crn.partition_replica_id)
LOOP
SELECT *
INTO distinct_table_oid
FROM get_distinct_table_oid(hypertable_point, crn_record.replica_id, crn_record.database_name);
BEGIN
LOOP
EXECUTE format(
$$
WITH selected AS
(
DELETE FROM %2$s
WHERE ("time" >= %3$L OR %3$L IS NULL) and ("time" <= %4$L OR %4$L IS NULL)
AND hypertable_name = %5$L
RETURNING *
),
distinct_field AS (
SELECT name
FROM field
WHERE hypertable_name = %5$L AND is_distinct = TRUE
),
insert_distinct AS (
INSERT INTO %6$s as distinct_table
SELECT distinct_field.name, value->>distinct_field.name
FROM distinct_field
CROSS JOIN selected
WHERE value ? distinct_field.name
GROUP BY distinct_field.name, (value->>distinct_field.name)
ON CONFLICT
DO NOTHING
)
INSERT INTO %1$s (%7$s) SELECT %8$s FROM selected;
$$,
format('%I.%I', crn_record.schema_name, crn_record.table_name) :: REGCLASS,
copy_table_oid, crn_record.start_time, crn_record.end_time,
hypertable_point, distinct_table_oid,
get_field_list(hypertable_point),
get_field_from_json_list(hypertable_point));
EXIT;
END LOOP;
EXCEPTION WHEN deadlock_detected THEN
--do nothing, rerun loop (deadlock can be caused by concurrent updates to distinct table)
--TODO: try to get rid of this by ordering the insert
END;
END LOOP;
EXECUTE format(
$$
SELECT "time", hypertable_name FROM %s ORDER BY hypertable_name LIMIT 1
$$,
copy_table_oid)
INTO time_point, hypertable_point;
END LOOP;
END
$BODY$;
--creates fields from project_series, not namespaces
-- Inserts rows from a temporary table into correct hypertable child tables.
CREATE OR REPLACE FUNCTION insert_data(
copy_table_oid REGCLASS
hypertable_name NAME,
copy_table_oid REGCLASS
)
RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
@@ -154,26 +63,24 @@ DECLARE
crn_record RECORD;
distinct_table_oid REGCLASS;
time_point BIGINT;
hypertable_point NAME;
time_field_name_point NAME;
time_field_name_point NAME;
partition_id INT;
BEGIN
time_point := 1;
EXECUTE format(
$$
SELECT value->>h.time_field_name, ct.hypertable_name, h.time_field_name, p.id
FROM %s ct
LEFT JOIN hypertable h ON (h.NAME = ct.hypertable_name)
SELECT value->>h.time_field_name, h.time_field_name, p.id
FROM %1$s ct
LEFT JOIN hypertable h ON (h.NAME = %2$L)
LEFT JOIN partition_epoch pe ON (
pe.hypertable_name = ct.hypertable_name AND
(pe.start_time <= (value->>h.time_field_name)::bigint OR pe.start_time IS NULL) AND
pe.hypertable_name = %2$L AND
(pe.start_time <= (value->>h.time_field_name)::bigint OR pe.start_time IS NULL) AND
(pe.end_time >= (value->>h.time_field_name)::bigint OR pe.end_time IS NULL)
)
LEFT JOIN get_partition_for_epoch(pe, value->>pe.partitioning_field) AS p ON(true)
ORDER BY hypertable_name
LIMIT 1
$$, copy_table_oid)
INTO time_point, hypertable_point, time_field_name_point, partition_id;
$$, copy_table_oid, hypertable_name)
INTO time_point, time_field_name_point, partition_id;
IF time_point IS NOT NULL AND partition_id IS NULL THEN
RAISE EXCEPTION 'Should never happen: could not find partition for insert'
USING ERRCODE = 'IO501';
@@ -195,67 +102,55 @@ BEGIN
LOOP
SELECT *
INTO distinct_table_oid
FROM get_distinct_table_oid(hypertable_point, crn_record.replica_id, crn_record.database_name);
FROM get_distinct_table_oid(hypertable_name, crn_record.replica_id, crn_record.database_name);
BEGIN
LOOP
EXECUTE format(
$$
WITH selected AS
(
DELETE FROM %2$s
WHERE ((value->>%9$L)::bigint >= %3$L OR %3$L IS NULL) and ((value->>%9$L)::bigint <= %4$L OR %4$L IS NULL)
AND hypertable_name = %5$L
RETURNING *
),
distinct_field AS (
SELECT name
FROM field
WHERE hypertable_name = %5$L AND is_distinct = TRUE
),
insert_distinct AS (
INSERT INTO %6$s as distinct_table
SELECT distinct_field.name, value->>distinct_field.name
FROM distinct_field
CROSS JOIN selected
WHERE value ? distinct_field.name
GROUP BY distinct_field.name, (value->>distinct_field.name)
ON CONFLICT
DO NOTHING
)
INSERT INTO %1$s (%7$s) SELECT %8$s FROM selected;
$$,
format('%I.%I', crn_record.schema_name, crn_record.table_name) :: REGCLASS,
copy_table_oid, crn_record.start_time, crn_record.end_time,
hypertable_point, distinct_table_oid,
get_field_list(hypertable_point),
get_field_from_json_list(hypertable_point),
time_field_name_point
);
EXIT;
END LOOP;
EXCEPTION WHEN deadlock_detected THEN
--do nothing, rerun loop (deadlock can be caused by concurrent updates to distinct table)
--TODO: try to get rid of this by ordering the insert
END;
EXECUTE format(
$$
WITH selected AS
(
DELETE FROM %2$s
WHERE ((value->>%8$L)::bigint >= %3$L OR %3$L IS NULL) and ((value->>%8$L)::bigint <= %4$L OR %4$L IS NULL)
RETURNING *
),
distinct_field AS (
SELECT name
FROM field
WHERE is_distinct = TRUE
),
insert_distinct AS (
INSERT INTO %5$s as distinct_table
SELECT distinct_field.name, value->>distinct_field.name
FROM distinct_field
CROSS JOIN selected
WHERE value ? distinct_field.name
GROUP BY distinct_field.name, (value->>distinct_field.name)
ON CONFLICT
DO NOTHING
)
INSERT INTO %1$s (%6$s) SELECT %7$s FROM selected;
$$,
format('%I.%I', crn_record.schema_name, crn_record.table_name) :: REGCLASS,
copy_table_oid, crn_record.start_time, crn_record.end_time,
distinct_table_oid,
get_field_list(hypertable_name),
get_field_from_json_list(hypertable_name),
time_field_name_point);
END LOOP;
EXECUTE format(
$$
SELECT value->>h.time_field_name, ct.hypertable_name, h.time_field_name, p.id
FROM %s ct
LEFT JOIN hypertable h ON (h.NAME = ct.hypertable_name)
LEFT JOIN partition_epoch pe ON (
pe.hypertable_name = ct.hypertable_name AND
(pe.start_time <= (value->>h.time_field_name)::bigint OR pe.start_time IS NULL) AND
(pe.end_time >= (value->>h.time_field_name)::bigint OR pe.end_time IS NULL)
)
LEFT JOIN get_partition_for_epoch(pe, value->>pe.partitioning_field) AS p ON(true)
ORDER BY hypertable_name
LIMIT 1
$$, copy_table_oid)
INTO time_point, hypertable_point, time_field_name_point, partition_id;
EXECUTE format(
$$
SELECT value->>h.time_field_name, h.time_field_name, p.id
FROM %1$s ct
LEFT JOIN hypertable h ON (h.NAME = %2$L)
LEFT JOIN partition_epoch pe ON (
pe.hypertable_name = %2$L AND
(pe.start_time <= (value->>h.time_field_name)::bigint OR pe.start_time IS NULL) AND
(pe.end_time >= (value->>h.time_field_name)::bigint OR pe.end_time IS NULL)
)
LEFT JOIN get_partition_for_epoch(pe, value->>pe.partitioning_field) AS p ON(true)
LIMIT 1
$$, copy_table_oid, hypertable_name)
INTO time_point, time_field_name_point, partition_id;
IF time_point IS NOT NULL AND partition_id IS NULL THEN
RAISE EXCEPTION 'Should never happen: could not find partition for insert'
USING ERRCODE = 'IO501';
@@ -263,5 +158,3 @@ BEGIN
END LOOP;
END
$BODY$;
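
Taken with the first hunk above, the temp table produced by create_temp_copy_table is reduced to the jsonb payload alone; the target hypertable is now identified once per insert_data call rather than once per row. Roughly, the table that callers \COPY into now looks like this (the name is whatever the caller passes, e.g. copy_t in the tests):

CREATE TEMP TABLE "copy_t" (
    value jsonb
) ON COMMIT DROP;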


@@ -1,5 +1,5 @@
testNs {"device_id":"dev1","series_0":1.5,"series_1":1,"series_2":2,"series_bool":true,"time":1257894000000000000}
testNs {"device_id":"dev1","series_0":1.5,"series_1":2,"time":1257894000000000000}
testNs {"device_id":"dev1","series_0":2.5,"series_1":3,"time":1257894000000001000}
testNs {"device_id":"dev1","series_0":3.5,"series_1":4,"time":1257894001000000000}
testNs {"device_id":"dev1","series_0":4.5,"series_1":5,"series_bool":false,"time":1257897600000000000}
{"device_id":"dev1","series_0":1.5,"series_1":1,"series_2":2,"series_bool":true,"time":1257894000000000000}
{"device_id":"dev1","series_0":1.5,"series_1":2,"time":1257894000000000000}
{"device_id":"dev1","series_0":2.5,"series_1":3,"time":1257894000000001000}
{"device_id":"dev1","series_0":3.5,"series_1":4,"time":1257894001000000000}
{"device_id":"dev1","series_0":4.5,"series_1":5,"series_bool":false,"time":1257897600000000000}


@@ -1,3 +1,3 @@
testNs {"device_id":"dev1","series_0":1.5,"series_1":1,"series_3":3,"series_4":4,"time":1257987600000000000}
testNs {"device_id":"dev1","series_0":1.5,"series_1":2,"time":1257987600000000000}
testNs {"device_id":"dev1","series_0":2.5,"series_1":3,"series_4":4,"time":1257894002000000000}
{"device_id":"dev1","series_0":1.5,"series_1":1,"series_3":3,"series_4":4,"time":1257987600000000000}
{"device_id":"dev1","series_0":1.5,"series_1":2,"time":1257987600000000000}
{"device_id":"dev1","series_0":2.5,"series_1":3,"series_4":4,"time":1257894002000000000}


@@ -1,2 +1,2 @@
testNs {"series_1":1,"series_5":5,"series_6":6,"device_id":"dev2","series_0":1.5,"time":1257894000000000000}
testNs {"series_1":2,"series_6":6.1,"device_id":"dev2","series_0":1.5,"time":1257894000000000000}
{"series_1":1,"series_5":5,"series_6":6,"device_id":"dev2","series_0":1.5,"time":1257894000000000000}
{"series_1":2,"series_6":6.1,"device_id":"dev2","series_0":1.5,"time":1257894000000000000}


@@ -1 +1 @@
public."Hypertable_1" {"time": 1257894000000000000, "temp_c": 30, "humidity": 70, "sensor_1": 1, "sensor_2": "2", "sensor_3": 3, "sensor_4": 100, "Device_id": "dev1"}
{"time": 1257894000000000000, "temp_c": 30, "humidity": 70, "sensor_1": 1, "sensor_2": "2", "sensor_3": 3, "sensor_4": 100, "Device_id": "dev1"}


@@ -41,7 +41,7 @@ FROM create_temp_copy_table('copy_t');
\COPY copy_t FROM 'data/ds2_ddl_1.tsv';
SELECT * FROM copy_t;
SELECT *
FROM insert_data('copy_t');
FROM insert_data('public."Hypertable_1"', 'copy_t');
COMMIT;
--TODO: make following work.
@@ -53,7 +53,7 @@ COMMIT;
SELECT * FROM PUBLIC.default_replica_node;
\c test2
\c test2
\d+ PUBLIC."Hypertable_1"
\d+ "_sys_1_"."_hyper_1_root"
@@ -73,7 +73,7 @@ ALTER TABLE PUBLIC."Hypertable_1" RENAME COLUMN sensor_3 TO sensor_3_renamed;
DROP INDEX "ind_sensor_1";
--expect error cases
\set ON_ERROR_STOP 0
\set ON_ERROR_STOP 0
ALTER TABLE PUBLIC."Hypertable_1" ALTER COLUMN sensor_2_renamed SET DATA TYPE int;
ALTER INDEX "ind_humidity" RENAME TO "ind_humdity2";
\set ON_ERROR_STOP 1
@@ -89,9 +89,7 @@ ALTER TABLE PUBLIC."Hypertable_1" ADD COLUMN sensor_4 BIGINT NOT NULL DEFAULT 13
SELECT * FROM _sys_1_._hyper_1_0_1_distinct_data;
\c Test1
\c Test1
\d+ PUBLIC."Hypertable_1"
\d+ "_sys_1_"."_hyper_1_root"
\d+ _sys_1_._hyper_1_1_0_1_data


@@ -93,13 +93,13 @@ FROM create_temp_copy_table('copy_t');
\COPY copy_t FROM 'data/ds2_ddl_1.tsv';
SELECT * FROM copy_t;
hypertable_name | value
-----------------------+--------------------------------------------------------------------------------------------------------------------------------------------------
public."Hypertable_1" | {"time": 1257894000000000000, "temp_c": 30, "humidity": 70, "sensor_1": 1, "sensor_2": "2", "sensor_3": 3, "sensor_4": 100, "Device_id": "dev1"}
value
--------------------------------------------------------------------------------------------------------------------------------------------------
{"time": 1257894000000000000, "temp_c": 30, "humidity": 70, "sensor_1": 1, "sensor_2": "2", "sensor_3": 3, "sensor_4": 100, "Device_id": "dev1"}
(1 row)
SELECT *
FROM insert_data('copy_t');
FROM insert_data('public."Hypertable_1"', 'copy_t');
insert_data
-------------
@@ -168,7 +168,7 @@ SELECT * FROM PUBLIC.default_replica_node;
test2 | public."Hypertable_1" | 0
(2 rows)
\c test2
\c test2
\d+ PUBLIC."Hypertable_1"
Table "public.Hypertable_1"
Column | Type | Modifiers | Storage | Stats target | Description
@@ -225,7 +225,7 @@ ALTER TABLE PUBLIC."Hypertable_1" RENAME COLUMN sensor_2 TO sensor_2_renamed;
ALTER TABLE PUBLIC."Hypertable_1" RENAME COLUMN sensor_3 TO sensor_3_renamed;
DROP INDEX "ind_sensor_1";
--expect error cases
\set ON_ERROR_STOP 0
\set ON_ERROR_STOP 0
ALTER TABLE PUBLIC."Hypertable_1" ALTER COLUMN sensor_2_renamed SET DATA TYPE int;
ALTER INDEX "ind_humidity" RENAME TO "ind_humdity2";
\set ON_ERROR_STOP 1
@@ -272,7 +272,7 @@ SELECT * FROM _sys_1_._hyper_1_0_1_distinct_data;
Device_id | dev1
(2 rows)
\c Test1
\c Test1
\d+ PUBLIC."Hypertable_1"
Table "public.Hypertable_1"
Column | Type | Modifiers | Storage | Stats target | Description


@@ -75,7 +75,7 @@ FROM create_temp_copy_table('copy_t');
\COPY copy_t FROM 'data/ds1_dev1_1.tsv';
SELECT *
FROM insert_data('copy_t');
FROM insert_data('testNs', 'copy_t');
insert_data
-------------
@@ -101,7 +101,7 @@ FROM create_temp_copy_table('copy_t');
\COPY copy_t FROM 'data/ds1_dev1_2.tsv';
SELECT *
FROM insert_data('copy_t');
FROM insert_data('testNs', 'copy_t');
insert_data
-------------
@@ -119,7 +119,7 @@ FROM create_temp_copy_table('copy_t');
\COPY copy_t FROM 'data/ds1_dev2_1.tsv';
SELECT *
FROM insert_data('copy_t');
FROM insert_data('testNs', 'copy_t');
insert_data
-------------


@@ -75,7 +75,7 @@ FROM create_temp_copy_table('copy_t');
\COPY copy_t FROM 'data/ds1_dev1_1.tsv';
SELECT *
FROM insert_data('copy_t');
FROM insert_data('testNs', 'copy_t');
insert_data
-------------
@@ -101,7 +101,7 @@ FROM create_temp_copy_table('copy_t');
\COPY copy_t FROM 'data/ds1_dev1_2.tsv';
SELECT *
FROM insert_data('copy_t');
FROM insert_data('testNs', 'copy_t');
insert_data
-------------
@@ -119,7 +119,7 @@ FROM create_temp_copy_table('copy_t');
\COPY copy_t FROM 'data/ds1_dev2_1.tsv';
SELECT *
FROM insert_data('copy_t');
FROM insert_data('testNs', 'copy_t');
insert_data
-------------


@@ -36,7 +36,7 @@ SELECT *
FROM create_temp_copy_table('copy_t');
\COPY copy_t FROM 'data/ds1_dev1_1.tsv';
SELECT *
FROM insert_data('copy_t');
FROM insert_data('testNs', 'copy_t');
COMMIT;
SELECT close_chunk_end(c.id)
@@ -49,7 +49,7 @@ SELECT *
FROM create_temp_copy_table('copy_t');
\COPY copy_t FROM 'data/ds1_dev1_2.tsv';
SELECT *
FROM insert_data('copy_t');
FROM insert_data('testNs', 'copy_t');
COMMIT;
\c test2
@@ -58,7 +58,7 @@ SELECT *
FROM create_temp_copy_table('copy_t');
\COPY copy_t FROM 'data/ds1_dev2_1.tsv';
SELECT *
FROM insert_data('copy_t');
FROM insert_data('testNs', 'copy_t');
COMMIT;
\c Test1
@@ -70,5 +70,3 @@ SELECT *
FROM "testNs"._hyper_1_0_replica;
SELECT *
FROM "testNs"._hyper_1_0_distinct;


@@ -1,5 +1,5 @@
33_testNs {"string_2":"one","device_id":"dev1","nUm_1":1.5,"num_2":1,"bool_1":true,"string_1":"const","time":1257894000000000000}
33_testNs {"string_2":"two","device_id":"dev1","nUm_1":1.5,"num_2":2,"bool_1":false,"string_1":"const","time":1257894000000000000}
33_testNs {"string_2":"three","device_id":"dev1","nUm_1":1.5,"num_2":3,"bool_1":true,"string_1":"const","time":1257894000000001000}
33_testNs {"string_2":"four","device_id":"dev1","nUm_1":1.5,"num_2":4,"bool_1":true,"string_1":"const","time":1257894001000000000}
33_testNs {"string_2":"five","device_id":"dev1","nUm_1":1.5,"num_2":5,"bool_1":false,"string_1":"const","time":1257897600000000000}
{"string_2":"one","device_id":"dev1","nUm_1":1.5,"num_2":1,"bool_1":true,"string_1":"const","time":1257894000000000000}
{"string_2":"two","device_id":"dev1","nUm_1":1.5,"num_2":2,"bool_1":false,"string_1":"const","time":1257894000000000000}
{"string_2":"three","device_id":"dev1","nUm_1":1.5,"num_2":3,"bool_1":true,"string_1":"const","time":1257894000000001000}
{"string_2":"four","device_id":"dev1","nUm_1":1.5,"num_2":4,"bool_1":true,"string_1":"const","time":1257894001000000000}
{"string_2":"five","device_id":"dev1","nUm_1":1.5,"num_2":5,"bool_1":false,"string_1":"const","time":1257897600000000000}


@@ -1,2 +1,2 @@
33_testNs {"bool_1":true,"string_1":"const","string_2":"one","field_only_dev2":3,"device_id":"dev2","nUm_1":1.5,"num_2":9,"time":1257894000000000000}
33_testNs {"bool_1":true,"string_1":"const","string_2":"two","device_id":"dev2","nUm_1":1.5,"num_2":10,"time":1257894000000000000}
{"bool_1":true,"string_1":"const","string_2":"one","field_only_dev2":3,"device_id":"dev2","nUm_1":1.5,"num_2":9,"time":1257894000000000000}
{"bool_1":true,"string_1":"const","string_2":"two","device_id":"dev2","nUm_1":1.5,"num_2":10,"time":1257894000000000000}


@@ -1,3 +1,3 @@
33_testNs {"num_2":6,"bool_1":false,"string_1":"const","string_2":"one","field_only_ref2":"one","device_id":"dev1","nUm_1":1.5,"time":1257987600000000000}
33_testNs {"num_2":7,"bool_1":true,"string_1":"const","string_2":"two","field_only_ref2":"two","device_id":"dev1","nUm_1":1.5,"time":1257987600000000000}
33_testNs {"num_2":8,"bool_1":true,"string_1":"const","string_2":"three","device_id":"dev1","nUm_1":1.5,"time":1257894002000000000}
{"num_2":6,"bool_1":false,"string_1":"const","string_2":"one","field_only_ref2":"one","device_id":"dev1","nUm_1":1.5,"time":1257987600000000000}
{"num_2":7,"bool_1":true,"string_1":"const","string_2":"two","field_only_ref2":"two","device_id":"dev1","nUm_1":1.5,"time":1257987600000000000}
{"num_2":8,"bool_1":true,"string_1":"const","string_2":"three","device_id":"dev1","nUm_1":1.5,"time":1257894002000000000}


@@ -1,3 +1,3 @@
SELECT insert_data('test_input_data.batch1_dev1');
SELECT insert_data('test_input_data.batch1_dev2');
SELECT insert_data('test_input_data.batch2_dev1');
SELECT insert_data('33_testNs', 'test_input_data.batch1_dev1');
SELECT insert_data('33_testNs', 'test_input_data.batch1_dev2');
SELECT insert_data('33_testNs', 'test_input_data.batch2_dev1');


@@ -21,8 +21,6 @@ echo "Connecting to $POSTGRES_HOST as user $POSTGRES_USER and with db $INSTALL_D
cd $DIR
psql -U $POSTGRES_USER -h $POSTGRES_HOST -d $INSTALL_DB_MAIN -v ON_ERROR_STOP=1 <<EOF
SELECT insert_data('$1');
SELECT insert_data('33_testNs', '$1');
EOF
cd $PWD