From 661b7cdc94c96faaed381fef9b5923a551e06df7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Erik=20Nordstro=CC=88m?= Date: Mon, 23 Jan 2017 14:04:33 +0100 Subject: [PATCH] Remove kafka-related code --- extension/sql/load_order.txt | 4 - extension/sql/main/kafka_offset_functions.sql | 64 --------- .../sql/main/kafka_offset_node_trigger.sql | 67 --------- extension/sql/main/kafka_offset_table.sql | 20 --- extension/sql/main/setup_kafka.sql | 21 --- extension/sql/setup/setup_kafka.sh | 29 ---- extension/sql/setup/sql/load_kafka.sql | 4 - .../regression/expected/drop_hypertable.out | 10 +- .../sql/tests/regression/expected/kafka.out | 132 ------------------ extension/sql/tests/regression/kafka.sql | 52 ------- extension/sql/tests/unit/run.sh | 1 - .../tests/kafka_offset_functions_test.sql | 63 --------- 12 files changed, 2 insertions(+), 465 deletions(-) delete mode 100644 extension/sql/main/kafka_offset_functions.sql delete mode 100644 extension/sql/main/kafka_offset_node_trigger.sql delete mode 100644 extension/sql/main/kafka_offset_table.sql delete mode 100644 extension/sql/main/setup_kafka.sql delete mode 100755 extension/sql/setup/setup_kafka.sh delete mode 100644 extension/sql/setup/sql/load_kafka.sql delete mode 100644 extension/sql/tests/regression/expected/kafka.out delete mode 100644 extension/sql/tests/regression/kafka.sql delete mode 100644 extension/sql/tests/unit/tests/kafka_offset_functions_test.sql diff --git a/extension/sql/load_order.txt b/extension/sql/load_order.txt index 0003b72b4..d11a1e0c8 100644 --- a/extension/sql/load_order.txt +++ b/extension/sql/load_order.txt @@ -57,7 +57,3 @@ sql/main/ioql_optimized_nonagg.sql sql/main/ioql_sqlgen_cluster_nonagg.sql sql/main/ioql_optimized.sql sql/main/setup_main.sql -sql/main/kafka_offset_table.sql -sql/main/kafka_offset_node_trigger.sql -sql/main/kafka_offset_functions.sql -sql/main/setup_kafka.sql diff --git a/extension/sql/main/kafka_offset_functions.sql b/extension/sql/main/kafka_offset_functions.sql deleted file mode 100644 index 2d1cb3c51..000000000 --- a/extension/sql/main/kafka_offset_functions.sql +++ /dev/null @@ -1,64 +0,0 @@ -CREATE OR REPLACE FUNCTION kafka_get_start_and_next_offset( - topic TEXT, - partition_number SMALLINT, - default_start_offset INTEGER, - OUT start_offset INTEGER, - OUT next_offset INTEGER -) -LANGUAGE PLPGSQL VOLATILE AS -$BODY$ -DECLARE -BEGIN - SELECT clust.next_offset - INTO next_offset - FROM _iobeamdb_catalog.kafka_offset_cluster clust - WHERE clust.topic = kafka_get_start_and_next_offset.topic AND - clust.partition_number = kafka_get_start_and_next_offset.partition_number - ORDER BY next_offset DESC - LIMIT 1; - - IF next_offset IS NULL THEN - next_offset := default_start_offset; - ELSE - SELECT clust.start_offset - INTO start_offset - FROM _iobeamdb_catalog.kafka_offset_cluster clust - WHERE clust.topic = kafka_get_start_and_next_offset.topic AND - clust.partition_number = kafka_get_start_and_next_offset.partition_number AND - clust.next_offset = kafka_get_start_and_next_offset.next_offset AND - clust.database_name = current_database(); - IF start_offset IS NOT NULL THEN - RETURN; - END IF; - END IF; - - start_offset := next_offset; - INSERT INTO _iobeamdb_catalog.kafka_offset_local (topic, partition_number, start_offset, next_offset, database_name) - VALUES (topic, partition_number, start_offset, next_offset, current_database()); - RETURN; -END -$BODY$; - -CREATE OR REPLACE FUNCTION kafka_set_next_offset( - topic TEXT, - partition_number SMALLINT, - start_offset INTEGER, - next_offset INTEGER -) - RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS -$BODY$ -DECLARE - affected INTEGER; -BEGIN - UPDATE _iobeamdb_catalog.kafka_offset_local AS loc - SET next_offset = kafka_set_next_offset.next_offset - WHERE loc.topic = kafka_set_next_offset.topic AND - loc.partition_number = kafka_set_next_offset.partition_number AND - loc.start_offset = kafka_set_next_offset.start_offset; - GET DIAGNOSTICS affected = ROW_COUNT; - IF affected <> 1 THEN - RAISE EXCEPTION 'Rows affected not = 1. Affected: %', affected - USING ERRCODE = 'IO501'; - END IF; -END -$BODY$; diff --git a/extension/sql/main/kafka_offset_node_trigger.sql b/extension/sql/main/kafka_offset_node_trigger.sql deleted file mode 100644 index 141e9f968..000000000 --- a/extension/sql/main/kafka_offset_node_trigger.sql +++ /dev/null @@ -1,67 +0,0 @@ -CREATE OR REPLACE FUNCTION get_kafka_offset_remote_table_name( - remote_node _iobeamdb_catalog.node -) - RETURNS NAME LANGUAGE SQL IMMUTABLE AS -$BODY$ -SELECT format('kafka_offset_remote_%s', remote_node.database_name) :: NAME; -$BODY$; - - -CREATE OR REPLACE FUNCTION _sysinternal.on_create_node_insert_kafka_offset_node() - RETURNS TRIGGER LANGUAGE PLPGSQL AS -$BODY$ -BEGIN - IF TG_OP <> 'INSERT' THEN - RAISE EXCEPTION 'Only inserts supported on node table' - USING ERRCODE = 'IO101'; - END IF; - - INSERT INTO _iobeamdb_catalog.kafka_offset_node (database_name, local_table_name, remote_table_name) - VALUES (NEW.database_name, 'kafka_offset_local', get_kafka_offset_remote_table_name(NEW)); - - RETURN NEW; -END -$BODY$ -SET SEARCH_PATH = 'public'; - -CREATE OR REPLACE FUNCTION create_remote_kafka_offset_table( - remote_node _iobeamdb_catalog.node, - remote_table_name NAME, - local_table_name NAME -) - RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS -$BODY$ -BEGIN - EXECUTE format( - $$ - CREATE FOREIGN TABLE IF NOT EXISTS _iobeamdb_catalog.%1$I () - INHERITS(_iobeamdb_catalog.kafka_offset_cluster) SERVER %2$I OPTIONS (schema_name '_iobeamdb_catalog', table_name '%3$I') - $$, - remote_table_name, remote_node.server_name, local_table_name); -END -$BODY$; - - -CREATE OR REPLACE FUNCTION _sysinternal.on_create_kafka_offset_node() - RETURNS TRIGGER LANGUAGE PLPGSQL AS -$BODY$ -DECLARE - remote_node _iobeamdb_catalog.node; -BEGIN - IF TG_OP <> 'INSERT' THEN - RAISE EXCEPTION 'Only inserts supported on kafka offset table' - USING ERRCODE = 'IO101'; - END IF; - - IF NEW.database_name <> current_database() THEN - SELECT * - INTO STRICT remote_node - FROM _iobeamdb_catalog.node n - WHERE n.database_name = NEW.database_name; - - PERFORM create_remote_kafka_offset_table(remote_node, NEW.remote_table_name, NEW.local_table_name); - END IF; - RETURN NEW; -END -$BODY$ -SET SEARCH_PATH = 'public'; diff --git a/extension/sql/main/kafka_offset_table.sql b/extension/sql/main/kafka_offset_table.sql deleted file mode 100644 index 84683673e..000000000 --- a/extension/sql/main/kafka_offset_table.sql +++ /dev/null @@ -1,20 +0,0 @@ -CREATE TABLE IF NOT EXISTS _iobeamdb_catalog.kafka_offset_cluster ( - topic TEXT NOT NULL, - partition_number SMALLINT NOT NULL, - start_offset INTEGER NOT NULL, - next_offset INTEGER, - database_name NAME REFERENCES _iobeamdb_catalog.node (database_name) NOT NULL, - PRIMARY KEY (topic, partition_number, start_offset, database_name) -); - -CREATE TABLE IF NOT EXISTS _iobeamdb_catalog.kafka_offset_local ( - PRIMARY KEY (topic, partition_number, start_offset, database_name) -) - INHERITS (_iobeamdb_catalog.kafka_offset_cluster); - -CREATE TABLE IF NOT EXISTS _iobeamdb_catalog.kafka_offset_node ( - database_name NAME REFERENCES _iobeamdb_catalog.node (database_name) NOT NULL, - local_table_name NAME, - remote_table_name NAME UNIQUE, - PRIMARY KEY (database_name) -); diff --git a/extension/sql/main/setup_kafka.sql b/extension/sql/main/setup_kafka.sql deleted file mode 100644 index 1aaf1395c..000000000 --- a/extension/sql/main/setup_kafka.sql +++ /dev/null @@ -1,21 +0,0 @@ --- Initializes kafka-related triggers on data nodes -CREATE OR REPLACE FUNCTION setup_kafka() - RETURNS void LANGUAGE PLPGSQL AS -$BODY$ -DECLARE -BEGIN - - DROP TRIGGER IF EXISTS trigger_on_create_node_insert_kafka_offset_node - ON _iobeamdb_catalog.node; - CREATE TRIGGER trigger_on_create_node_insert_kafka_offset_node AFTER INSERT OR UPDATE OR DELETE ON _iobeamdb_catalog.node - FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_node_insert_kafka_offset_node(); - - DROP TRIGGER IF EXISTS trigger_on_create_kafka_offset_node - ON _iobeamdb_catalog.kafka_offset_node; - CREATE TRIGGER trigger_on_create_kafka_offset_node AFTER INSERT OR UPDATE OR DELETE ON _iobeamdb_catalog.kafka_offset_node - FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_kafka_offset_node(); - -END -$BODY$ -SET client_min_messages = WARNING --suppress if trigger does not exist on drop notices -; diff --git a/extension/sql/setup/setup_kafka.sh b/extension/sql/setup/setup_kafka.sh deleted file mode 100755 index 924a45b03..000000000 --- a/extension/sql/setup/setup_kafka.sh +++ /dev/null @@ -1,29 +0,0 @@ -#!/bin/bash - -# To avoid pw writing, add localhost:5432:*:postgres:test to ~/.pgpass -set -u -set -e - -PWD=`pwd` -DIR=`dirname $0` - -POSTGRES_HOST=${POSTGRES_HOST:-localhost} -POSTGRES_USER=${POSTGRES_USER:-postgres} - -if [ "$#" -ne 1 ]; then - echo "usage: $0 nodename" - exit 1 -fi - -INSTALL_DB=$1 - -echo "Connecting to $POSTGRES_HOST as user $POSTGRES_USER and with db $INSTALL_DB" - -cd $DIR -psql -U $POSTGRES_USER -h $POSTGRES_HOST -v ON_ERROR_STOP=1 < 'topic'::text, - partition_number => 0::SMALLINT, - start_offset => DEFAULT_START_OFFSET, - next_offset => DEFAULT_START_OFFSET + 1 - ); - - SELECT start_offset, next_offset FROM kafka_get_start_and_next_offset('topic'::text, 0::SMALLINT, DEFAULT_START_OFFSET) - INTO start_offset_var, next_offset_var; - - IF start_offset_var != DEFAULT_START_OFFSET THEN - SELECT assert.fail('Bad start offset after update.') INTO message; - RETURN message; - END IF; - - IF next_offset_var != DEFAULT_START_OFFSET + 1 THEN - SELECT assert.fail('Bad next offset after update.') INTO message; - RETURN message; - END IF; - - BEGIN - PERFORM kafka_set_next_offset( - topic => 'newtopic'::text, - partition_number => 0::SMALLINT, - start_offset => DEFAULT_START_OFFSET, - next_offset => DEFAULT_START_OFFSET + 1 - ); - EXCEPTION - WHEN sqlstate 'IO501' THEN - RAISE NOTICE 'right exception thrown'; - END; - - SELECT assert.ok('End of test.') INTO message; - RETURN message; -END -$$ -LANGUAGE plpgsql;