diff --git a/sql/main/chunk.sql b/sql/main/chunk.sql index 3fa56e217..7f44b4a70 100644 --- a/sql/main/chunk.sql +++ b/sql/main/chunk.sql @@ -60,7 +60,7 @@ $BODY$; --closes the given chunk if it is over the size limit set for the hypertable --it belongs to. CREATE OR REPLACE FUNCTION _iobeamdb_internal.close_chunk_if_needed( - chunk_row _iobeamdb_catalog.chunk + chunk_id INTEGER ) RETURNS boolean LANGUAGE PLPGSQL VOLATILE AS $BODY$ @@ -68,17 +68,19 @@ DECLARE chunk_size BIGINT; chunk_max_size BIGINT; BEGIN - chunk_size := _iobeamdb_data_api.get_chunk_size(chunk_row.id); - chunk_max_size := _iobeamdb_internal.get_chunk_max_size(chunk_row.id); + chunk_size := _iobeamdb_data_api.get_chunk_size(chunk_id); + chunk_max_size := _iobeamdb_internal.get_chunk_max_size(chunk_id); - IF chunk_row.end_time IS NOT NULL OR (NOT chunk_size >= chunk_max_size) THEN - RETURN FALSE; + IF chunk_size >= chunk_max_size THEN + --This should use the non-transactional rpc because this needs to + --commit before we can take a lock for writing on the closed chunk. + --That means this operation is not transactional with the insert + --and will not be rolled back. + PERFORM _iobeamdb_meta_api.close_chunk_end_immediate(chunk_id); + RETURN TRUE; END IF; - --This should use the non-transactional rpc because this needs to commit before we can take a lock - --for writing on the closed chunk. That means this operation is not transactional with the insert and will not be rolled back. - PERFORM _iobeamdb_meta_api.close_chunk_end_immediate(chunk_row.id); - return TRUE; + RETURN FALSE; END $BODY$; diff --git a/sql/main/insert.sql b/sql/main/insert.sql index f16fa592e..6559c78be 100644 --- a/sql/main/insert.sql +++ b/sql/main/insert.sql @@ -93,6 +93,7 @@ DECLARE point_record_query_sql TEXT; point_record RECORD; chunk_row _iobeamdb_catalog.chunk; + chunk_id INT; crn_record RECORD; hypertable_row RECORD; partition_constraint_where_clause TEXT = ''; @@ -140,15 +141,19 @@ BEGIN USING ERRCODE = 'IO501'; END IF; + --Create a temp table to collect all the chunks we insert into. We might + --need to close the chunks at the end of the transaction. + CREATE TEMP TABLE IF NOT EXISTS insert_chunks(LIKE _iobeamdb_catalog.chunk) ON COMMIT DROP; + + --We need to truncate the table if it already existed due to calling this + --function twice in a single transaction. + TRUNCATE TABLE insert_chunks; + WHILE point_record.time IS NOT NULL LOOP - --Get the chunk we should insert into - chunk_row := get_or_create_chunk(point_record.partition_id, point_record.time); - - --Check if the chunk should be closed (must be done without lock on chunk). - PERFORM _iobeamdb_internal.close_chunk_if_needed(chunk_row); - - --Get a chunk with lock - chunk_row := get_or_create_chunk(point_record.partition_id, point_record.time, TRUE); + --Get a chunk with SHARE lock + INSERT INTO insert_chunks + SELECT * FROM get_or_create_chunk(point_record.partition_id, point_record.time, TRUE) + RETURNING * INTO chunk_row; IF point_record.partitioning_column IS NOT NULL THEN --if we are inserting across more than one partition, @@ -172,17 +177,16 @@ BEGIN END IF; --Do insert on all chunk replicas - - SELECT string_agg(insert_stmt, ',') - INTO insert_sql - FROM ( + SELECT string_agg(insert_stmt, ',') + INTO insert_sql + FROM ( SELECT format('i_%s AS (INSERT INTO %I.%I (%s) SELECT * FROM selected)', row_number() OVER(), crn.schema_name, crn.table_name, column_list) insert_stmt FROM _iobeamdb_catalog.chunk_replica_node crn WHERE (crn.chunk_id = chunk_row.id) - ) AS parts; + ) AS parts; - EXECUTE format( + EXECUTE format( $$ WITH selected AS ( @@ -206,6 +210,16 @@ BEGIN USING ERRCODE = 'IO501'; END IF; END LOOP; + + --Loop through all open chunks that were inserted into, closing + --if needed. Do it in ID order to avoid deadlocks. + FOR chunk_id IN + SELECT c.id FROM insert_chunks cl + INNER JOIN _iobeamdb_catalog.chunk c ON cl.id = c.id + WHERE c.end_time IS NULL ORDER BY cl.id DESC + LOOP + PERFORM _iobeamdb_internal.close_chunk_if_needed(chunk_id); + END LOOP; END $BODY$; diff --git a/sql/meta/chunk.sql b/sql/meta/chunk.sql index a498bde1e..c4c020fba 100644 --- a/sql/meta/chunk.sql +++ b/sql/meta/chunk.sql @@ -56,10 +56,10 @@ BEGIN END $BODY$; ---creates chunk. Must be called after aquiring a lock on partition. +--creates chunk. CREATE OR REPLACE FUNCTION _iobeamdb_meta.create_chunk_unlocked( - partition_id INT, - time_point BIGINT + part_id INT, + time_point BIGINT ) RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS $BODY$ @@ -69,10 +69,14 @@ DECLARE BEGIN SELECT * INTO table_start, table_end - FROM _iobeamdb_meta.calculate_new_chunk_times(partition_id, time_point); + FROM _iobeamdb_meta.calculate_new_chunk_times(part_id, time_point); + --INSERT on chunk implies SHARE lock on partition row due to foreign key. + --If the insert conflicts, it means another transaction created the chunk + --before us, and we can safely ignore the error. INSERT INTO _iobeamdb_catalog.chunk (partition_id, start_time, end_time) - VALUES (partition_id, table_start, table_end); + VALUES (part_id, table_start, table_end) + ON CONFLICT (partition_id, start_time) DO NOTHING; END $BODY$; @@ -87,13 +91,6 @@ DECLARE chunk_row _iobeamdb_catalog.chunk; partition_row _iobeamdb_catalog.partition; BEGIN - --get lock - SELECT * - INTO partition_row - FROM _iobeamdb_catalog.partition - WHERE id = partition_id - FOR UPDATE; - --recheck: chunk_row := _iobeamdb_internal.get_chunk(partition_id, time_point); @@ -169,14 +166,7 @@ BEGIN RETURN; END IF; - --get partition lock - SELECT * - INTO partition_row - FROM _iobeamdb_catalog.partition - WHERE id = chunk_row.partition_id - FOR UPDATE; - - --PHASE 1: lock chunk row on all rows (prevents concurrent chunk insert) + --PHASE 1: lock chunk row on all nodes (prevents concurrent chunk insert) FOR node_row IN SELECT * FROM _iobeamdb_catalog.node n diff --git a/test/expected/drop_chunks.out b/test/expected/drop_chunks.out index 0959f016c..3c2c35e0f 100644 --- a/test/expected/drop_chunks.out +++ b/test/expected/drop_chunks.out @@ -92,13 +92,13 @@ WHERE h.schema_name = 'public' AND (h.table_name = 'drop_chunk_test1' OR h.table 3 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_3_data | 3 | 3 4 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_4_data | 4 | 4 5 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_5_data | 5 | 5 - 6 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_6_data | 6 | + 6 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_6_data | 6 | 6 7 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_7_data | | 1 8 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_8_data | 2 | 2 9 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_9_data | 3 | 3 10 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_10_data | 4 | 4 11 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_11_data | 5 | 5 - 12 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_12_data | 6 | + 12 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_12_data | 6 | 6 (12 rows) SELECT * FROM _iobeamdb_catalog.chunk_replica_node; @@ -160,12 +160,12 @@ WHERE h.schema_name = 'public' AND (h.table_name = 'drop_chunk_test1' OR h.table 3 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_3_data | 3 | 3 4 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_4_data | 4 | 4 5 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_5_data | 5 | 5 - 6 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_6_data | 6 | + 6 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_6_data | 6 | 6 8 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_8_data | 2 | 2 9 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_9_data | 3 | 3 10 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_10_data | 4 | 4 11 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_11_data | 5 | 5 - 12 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_12_data | 6 | + 12 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_12_data | 6 | 6 (10 rows) SELECT * FROM _iobeamdb_catalog.chunk_replica_node; @@ -222,12 +222,12 @@ WHERE h.schema_name = 'public' AND (h.table_name = 'drop_chunk_test1' OR h.table 3 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_3_data | 3 | 3 4 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_4_data | 4 | 4 5 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_5_data | 5 | 5 - 6 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_6_data | 6 | + 6 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_6_data | 6 | 6 8 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_8_data | 2 | 2 9 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_9_data | 3 | 3 10 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_10_data | 4 | 4 11 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_11_data | 5 | 5 - 12 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_12_data | 6 | + 12 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_12_data | 6 | 6 (9 rows) SELECT * FROM _iobeamdb_catalog.chunk_replica_node; diff --git a/test/expected/drop_hypertable.out b/test/expected/drop_hypertable.out index 4b5d951f9..56e227e7b 100644 --- a/test/expected/drop_hypertable.out +++ b/test/expected/drop_hypertable.out @@ -101,7 +101,7 @@ Inherits: _iobeamdb_internal._hyper_2_2_0_partition device_id | text | | extended | | Check constraints: "partition" CHECK (_iobeamdb_catalog.get_partition_for_key(device_id, 32768) >= '0'::smallint AND _iobeamdb_catalog.get_partition_for_key(device_id, 32768) <= '32767'::smallint) - "time_range" CHECK ("time" >= '3'::bigint) NOT VALID + "time_range" CHECK ("time" >= '3'::bigint AND "time" <= '3'::bigint) NOT VALID Inherits: _iobeamdb_internal._hyper_2_2_0_partition Table "_iobeamdb_internal._hyper_2_2_0_partition" diff --git a/test/expected/insert.out b/test/expected/insert.out index 6ac364625..708d7d4e8 100644 --- a/test/expected/insert.out +++ b/test/expected/insert.out @@ -88,7 +88,7 @@ SELECT * FROM _iobeamdb_catalog.chunk c ----+--------------+------------+----------+----------+----------------------+---------------+--------------------+---------------------+----+--------------+---------------+------------+--------------------+------------------------+----+-------------+--------------------+------------------------+-------------------------+--------------------+-----------------+--------------------+-----------+------------------+------------------+------------+------------------ 4 | 3 | | 1 | 4 | 3 | single | _iobeamdb_internal | _hyper_2_3_0_4_data | 3 | 3 | 2 | 0 | _iobeamdb_internal | _hyper_2_3_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000 5 | 3 | 2 | 2 | 5 | 3 | single | _iobeamdb_internal | _hyper_2_3_0_5_data | 3 | 3 | 2 | 0 | _iobeamdb_internal | _hyper_2_3_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000 - 6 | 3 | 3 | | 6 | 3 | single | _iobeamdb_internal | _hyper_2_3_0_6_data | 3 | 3 | 2 | 0 | _iobeamdb_internal | _hyper_2_3_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000 + 6 | 3 | 3 | 3 | 6 | 3 | single | _iobeamdb_internal | _hyper_2_3_0_6_data | 3 | 3 | 2 | 0 | _iobeamdb_internal | _hyper_2_3_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000 (3 rows) \d+ "_iobeamdb_internal".* @@ -349,7 +349,7 @@ Inherits: _iobeamdb_internal._hyper_2_3_0_partition device_id | text | | extended | | Check constraints: "partition" CHECK (_iobeamdb_catalog.get_partition_for_key(device_id, 32768) >= '0'::smallint AND _iobeamdb_catalog.get_partition_for_key(device_id, 32768) <= '32767'::smallint) - "time_range" CHECK ("time" >= '3'::bigint) NOT VALID + "time_range" CHECK ("time" >= '3'::bigint AND "time" <= '3'::bigint) NOT VALID Inherits: _iobeamdb_internal._hyper_2_3_0_partition Table "_iobeamdb_internal._hyper_2_3_0_partition" @@ -398,7 +398,7 @@ SELECT * FROM _iobeamdb_catalog.chunk; 3 | 2 | | 4 | 3 | | 1 5 | 3 | 2 | 2 - 6 | 3 | 3 | + 6 | 3 | 3 | 3 (6 rows) SELECT * FROM _iobeamdb_catalog.chunk_replica_node; diff --git a/test/expected/insert_single.out b/test/expected/insert_single.out index b8d1188bb..987d0fce7 100644 --- a/test/expected/insert_single.out +++ b/test/expected/insert_single.out @@ -89,7 +89,7 @@ SELECT * FROM _iobeamdb_catalog.chunk c ----+--------------+------------+----------+----------+----------------------+---------------+--------------------+---------------------+----+--------------+---------------+------------+--------------------+------------------------+----+-------------+--------------------+------------------------+-------------------------+--------------------+-----------------+--------------------+-----------+------------------+------------------+------------+------------------ 3 | 2 | | 1 | 3 | 2 | single | _iobeamdb_internal | _hyper_2_2_0_3_data | 2 | 2 | 2 | 0 | _iobeamdb_internal | _hyper_2_2_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000 4 | 2 | 2 | 2 | 4 | 2 | single | _iobeamdb_internal | _hyper_2_2_0_4_data | 2 | 2 | 2 | 0 | _iobeamdb_internal | _hyper_2_2_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000 - 5 | 2 | 3 | | 5 | 2 | single | _iobeamdb_internal | _hyper_2_2_0_5_data | 2 | 2 | 2 | 0 | _iobeamdb_internal | _hyper_2_2_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000 + 5 | 2 | 3 | 3 | 5 | 2 | single | _iobeamdb_internal | _hyper_2_2_0_5_data | 2 | 2 | 2 | 0 | _iobeamdb_internal | _hyper_2_2_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000 (3 rows) \c single diff --git a/test/expected/tablespace.out b/test/expected/tablespace.out index ef1f0bc7a..e88d8a537 100644 --- a/test/expected/tablespace.out +++ b/test/expected/tablespace.out @@ -19,7 +19,7 @@ SELECT setup_single_node(hostname => 'fakehost'); -- fakehost makes sure there i \set ECHO ALL \c single \set ON_ERROR_STOP 0 -SET client_min_messages = WARNING; +SET client_min_messages = ERROR; drop tablespace if exists tspace1; SET client_min_messages = NOTICE; \set VERBOSITY verbose diff --git a/test/sql/tablespace.sql b/test/sql/tablespace.sql index 2fb0f2fa7..32a4e0960 100644 --- a/test/sql/tablespace.sql +++ b/test/sql/tablespace.sql @@ -9,7 +9,7 @@ \set ON_ERROR_STOP 0 -SET client_min_messages = WARNING; +SET client_min_messages = ERROR; drop tablespace if exists tspace1; SET client_min_messages = NOTICE;