Remove kafka-related code

Erik Nordström 2017-01-23 14:04:33 +01:00
parent 188b1b39dd
commit 661b7cdc94
12 changed files with 2 additions and 465 deletions

View File

@@ -57,7 +57,3 @@ sql/main/ioql_optimized_nonagg.sql
sql/main/ioql_sqlgen_cluster_nonagg.sql
sql/main/ioql_optimized.sql
sql/main/setup_main.sql
sql/main/kafka_offset_table.sql
sql/main/kafka_offset_node_trigger.sql
sql/main/kafka_offset_functions.sql
sql/main/setup_kafka.sql

View File

@@ -1,64 +0,0 @@
CREATE OR REPLACE FUNCTION kafka_get_start_and_next_offset(
    topic                TEXT,
    partition_number     SMALLINT,
    default_start_offset INTEGER,
    OUT start_offset     INTEGER,
    OUT next_offset      INTEGER
)
    LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
BEGIN
    SELECT clust.next_offset
    INTO next_offset
    FROM _iobeamdb_catalog.kafka_offset_cluster clust
    WHERE clust.topic = kafka_get_start_and_next_offset.topic AND
          clust.partition_number = kafka_get_start_and_next_offset.partition_number
    ORDER BY next_offset DESC
    LIMIT 1;

    IF next_offset IS NULL THEN
        next_offset := default_start_offset;
    ELSE
        SELECT clust.start_offset
        INTO start_offset
        FROM _iobeamdb_catalog.kafka_offset_cluster clust
        WHERE clust.topic = kafka_get_start_and_next_offset.topic AND
              clust.partition_number = kafka_get_start_and_next_offset.partition_number AND
              clust.next_offset = kafka_get_start_and_next_offset.next_offset AND
              clust.database_name = current_database();

        IF start_offset IS NOT NULL THEN
            RETURN;
        END IF;
    END IF;

    start_offset := next_offset;

    INSERT INTO _iobeamdb_catalog.kafka_offset_local (topic, partition_number, start_offset, next_offset, database_name)
    VALUES (topic, partition_number, start_offset, next_offset, current_database());
    RETURN;
END
$BODY$;

CREATE OR REPLACE FUNCTION kafka_set_next_offset(
    topic            TEXT,
    partition_number SMALLINT,
    start_offset     INTEGER,
    next_offset      INTEGER
)
    RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
    affected INTEGER;
BEGIN
    UPDATE _iobeamdb_catalog.kafka_offset_local AS loc
    SET next_offset = kafka_set_next_offset.next_offset
    WHERE loc.topic = kafka_set_next_offset.topic AND
          loc.partition_number = kafka_set_next_offset.partition_number AND
          loc.start_offset = kafka_set_next_offset.start_offset;

    GET DIAGNOSTICS affected = ROW_COUNT;

    IF affected <> 1 THEN
        RAISE EXCEPTION 'Rows affected not = 1. Affected: %', affected
        USING ERRCODE = 'IO501';
    END IF;
END
$BODY$;
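
For context on what is being removed: together, these two functions implemented a per-partition offset bookmark for Kafka ingest. A consumer first asked where to resume, ingested a batch, then advanced the bookmark. A minimal sketch of that call pattern, mirroring the regression test further down (topic name and offset values are illustrative):

-- Claim the current (start, next) offsets for partition 0 of 'topic',
-- defaulting to offset 0 when the cluster has no record yet.
SELECT * FROM kafka_get_start_and_next_offset('topic', 0 :: SMALLINT, 0);
-- ... consume messages at offsets 0..99 from Kafka ...
-- Advance the bookmark; raises SQLSTATE 'IO501' unless exactly one
-- (topic, partition_number, start_offset) row matches.
SELECT kafka_set_next_offset('topic', 0 :: SMALLINT, 0, 100);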

View File

@@ -1,67 +0,0 @@
CREATE OR REPLACE FUNCTION get_kafka_offset_remote_table_name(
    remote_node _iobeamdb_catalog.node
)
    RETURNS NAME LANGUAGE SQL IMMUTABLE AS
$BODY$
    SELECT format('kafka_offset_remote_%s', remote_node.database_name) :: NAME;
$BODY$;

CREATE OR REPLACE FUNCTION _sysinternal.on_create_node_insert_kafka_offset_node()
    RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
BEGIN
    IF TG_OP <> 'INSERT' THEN
        RAISE EXCEPTION 'Only inserts supported on node table'
        USING ERRCODE = 'IO101';
    END IF;

    INSERT INTO _iobeamdb_catalog.kafka_offset_node (database_name, local_table_name, remote_table_name)
    VALUES (NEW.database_name, 'kafka_offset_local', get_kafka_offset_remote_table_name(NEW));

    RETURN NEW;
END
$BODY$
SET SEARCH_PATH = 'public';

CREATE OR REPLACE FUNCTION create_remote_kafka_offset_table(
    remote_node       _iobeamdb_catalog.node,
    remote_table_name NAME,
    local_table_name  NAME
)
    RETURNS VOID LANGUAGE PLPGSQL VOLATILE AS
$BODY$
BEGIN
    EXECUTE format(
        $$
            CREATE FOREIGN TABLE IF NOT EXISTS _iobeamdb_catalog.%1$I ()
            INHERITS (_iobeamdb_catalog.kafka_offset_cluster) SERVER %2$I
            OPTIONS (schema_name '_iobeamdb_catalog', table_name '%3$I')
        $$,
        remote_table_name, remote_node.server_name, local_table_name);
END
$BODY$;

CREATE OR REPLACE FUNCTION _sysinternal.on_create_kafka_offset_node()
    RETURNS TRIGGER LANGUAGE PLPGSQL AS
$BODY$
DECLARE
    remote_node _iobeamdb_catalog.node;
BEGIN
    IF TG_OP <> 'INSERT' THEN
        RAISE EXCEPTION 'Only inserts supported on kafka offset table'
        USING ERRCODE = 'IO101';
    END IF;

    IF NEW.database_name <> current_database() THEN
        SELECT *
        INTO STRICT remote_node
        FROM _iobeamdb_catalog.node n
        WHERE n.database_name = NEW.database_name;

        PERFORM create_remote_kafka_offset_table(remote_node, NEW.remote_table_name, NEW.local_table_name);
    END IF;

    RETURN NEW;
END
$BODY$
SET SEARCH_PATH = 'public';
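
The trigger above is the cluster glue being removed: a row in kafka_offset_node for a peer database caused a postgres_fdw foreign table to be created locally. Roughly what the EXECUTE format(...) expanded to for one peer, assuming a node whose database_name and server_name are both 'test2' (names borrowed from the regression test; the real values come from the _iobeamdb_catalog.node row):

-- Illustrative expansion only; not part of the removed code.
CREATE FOREIGN TABLE IF NOT EXISTS _iobeamdb_catalog.kafka_offset_remote_test2 ()
INHERITS (_iobeamdb_catalog.kafka_offset_cluster) SERVER test2
OPTIONS (schema_name '_iobeamdb_catalog', table_name 'kafka_offset_local');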

View File

@@ -1,20 +0,0 @@
CREATE TABLE IF NOT EXISTS _iobeamdb_catalog.kafka_offset_cluster (
    topic            TEXT     NOT NULL,
    partition_number SMALLINT NOT NULL,
    start_offset     INTEGER  NOT NULL,
    next_offset      INTEGER,
    database_name    NAME REFERENCES _iobeamdb_catalog.node (database_name) NOT NULL,
    PRIMARY KEY (topic, partition_number, start_offset, database_name)
);

CREATE TABLE IF NOT EXISTS _iobeamdb_catalog.kafka_offset_local (
    PRIMARY KEY (topic, partition_number, start_offset, database_name)
)
    INHERITS (_iobeamdb_catalog.kafka_offset_cluster);

CREATE TABLE IF NOT EXISTS _iobeamdb_catalog.kafka_offset_node (
    database_name     NAME REFERENCES _iobeamdb_catalog.node (database_name) NOT NULL,
    local_table_name  NAME,
    remote_table_name NAME UNIQUE,
    PRIMARY KEY (database_name)
);
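
This inheritance layout is what made offsets cluster-visible: kafka_offset_local holds the current node's rows, each peer contributes a kafka_offset_remote_* foreign table, and all of them inherit kafka_offset_cluster, so kafka_get_start_and_next_offset can scan the parent to see every node's bookkeeping. A sketch of the cluster-wide read this enabled (query is illustrative, not part of the removed code):

-- Scanning the parent also scans kafka_offset_local and every
-- kafka_offset_remote_* foreign table attached to it.
SELECT topic, partition_number, start_offset, next_offset, database_name
FROM _iobeamdb_catalog.kafka_offset_cluster
ORDER BY topic, partition_number, next_offset DESC;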

View File

@@ -1,21 +0,0 @@
-- Initializes kafka-related triggers on data nodes
CREATE OR REPLACE FUNCTION setup_kafka()
    RETURNS void LANGUAGE PLPGSQL AS
$BODY$
DECLARE
BEGIN
    DROP TRIGGER IF EXISTS trigger_on_create_node_insert_kafka_offset_node
    ON _iobeamdb_catalog.node;

    CREATE TRIGGER trigger_on_create_node_insert_kafka_offset_node AFTER INSERT OR UPDATE OR DELETE ON _iobeamdb_catalog.node
    FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_node_insert_kafka_offset_node();

    DROP TRIGGER IF EXISTS trigger_on_create_kafka_offset_node
    ON _iobeamdb_catalog.kafka_offset_node;

    CREATE TRIGGER trigger_on_create_kafka_offset_node AFTER INSERT OR UPDATE OR DELETE ON _iobeamdb_catalog.kafka_offset_node
    FOR EACH ROW EXECUTE PROCEDURE _sysinternal.on_create_kafka_offset_node();
END
$BODY$
SET client_min_messages = WARNING -- suppress "trigger does not exist, skipping" notices on DROP
;
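
Once setup_kafka() had attached these triggers on a node, ordinary cluster management drove the rest. A hedged sketch of the chain, reusing calls that appear in the regression test below (node name is illustrative):

SELECT setup_kafka();
-- An insert into _iobeamdb_catalog.node now fires
-- trigger_on_create_node_insert_kafka_offset_node, which registers the node
-- in kafka_offset_node; that insert fires trigger_on_create_kafka_offset_node,
-- which creates a kafka_offset_remote_<database> foreign table for any
-- database other than the current one.
SELECT add_node('Test1' :: NAME, 'localhost');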

View File

@@ -1,29 +0,0 @@
#!/bin/bash
# To avoid typing a password, add localhost:5432:*:postgres:test to ~/.pgpass
set -u
set -e
PWD=`pwd`
DIR=`dirname $0`
POSTGRES_HOST=${POSTGRES_HOST:-localhost}
POSTGRES_USER=${POSTGRES_USER:-postgres}
if [ "$#" -ne 1 ]; then
echo "usage: $0 nodename"
exit 1
fi
INSTALL_DB=$1
echo "Connecting to $POSTGRES_HOST as user $POSTGRES_USER and with db $INSTALL_DB"
cd $DIR
psql -U $POSTGRES_USER -h $POSTGRES_HOST -v ON_ERROR_STOP=1 <<EOF
\c $INSTALL_DB
CREATE EXTENSION IF NOT EXISTS iobeamdb CASCADE;
select setup_kafka();
EOF
cd $PWD

View File

@@ -1,4 +0,0 @@
\ir ../../main/kafka_offset_table.sql
\ir ../../main/kafka_offset_node_trigger.sql
\ir ../../main/kafka_offset_functions.sql
\ir ../../main/setup_kafka.sql

View File

@@ -374,15 +374,12 @@ SELECT * FROM _iobeamdb_catalog.hypertable;
 _iobeamdb_catalog | hypertable_column    | table | postgres
 _iobeamdb_catalog | hypertable_index     | table | postgres
 _iobeamdb_catalog | hypertable_replica   | table | postgres
 _iobeamdb_catalog | kafka_offset_cluster | table | postgres
 _iobeamdb_catalog | kafka_offset_local   | table | postgres
 _iobeamdb_catalog | kafka_offset_node    | table | postgres
 _iobeamdb_catalog | meta                 | table | postgres
 _iobeamdb_catalog | node                 | table | postgres
 _iobeamdb_catalog | partition            | table | postgres
 _iobeamdb_catalog | partition_epoch      | table | postgres
 _iobeamdb_catalog | partition_replica    | table | postgres
(21 rows)
(18 rows)
\dt+ "_sysinternal".*
List of relations
@@ -431,15 +428,12 @@ SELECT * FROM _iobeamdb_catalog.hypertable;
 _iobeamdb_catalog | hypertable_column    | table | postgres
 _iobeamdb_catalog | hypertable_index     | table | postgres
 _iobeamdb_catalog | hypertable_replica   | table | postgres
 _iobeamdb_catalog | kafka_offset_cluster | table | postgres
 _iobeamdb_catalog | kafka_offset_local   | table | postgres
 _iobeamdb_catalog | kafka_offset_node    | table | postgres
 _iobeamdb_catalog | meta                 | table | postgres
 _iobeamdb_catalog | node                 | table | postgres
 _iobeamdb_catalog | partition            | table | postgres
 _iobeamdb_catalog | partition_epoch      | table | postgres
 _iobeamdb_catalog | partition_replica    | table | postgres
(21 rows)
(18 rows)
\dt+ "_sysinternal".*
List of relations

View File

@@ -1,132 +0,0 @@
psql:include/create_clustered_db.sql:12: NOTICE: installing required extension "dblink"
psql:include/create_clustered_db.sql:12: NOTICE: installing required extension "postgres_fdw"
psql:include/create_clustered_db.sql:12: NOTICE: installing required extension "hstore"
psql:include/create_clustered_db.sql:16: NOTICE: installing required extension "dblink"
psql:include/create_clustered_db.sql:16: NOTICE: installing required extension "postgres_fdw"
psql:include/create_clustered_db.sql:16: NOTICE: installing required extension "hstore"
psql:include/create_clustered_db.sql:20: NOTICE: installing required extension "dblink"
psql:include/create_clustered_db.sql:20: NOTICE: installing required extension "postgres_fdw"
psql:include/create_clustered_db.sql:20: NOTICE: installing required extension "hstore"
 setup_kafka 
-------------
 
(1 row)

 setup_kafka 
-------------
 
(1 row)

\c meta
SELECT add_cluster_user('postgres', NULL);
 add_cluster_user 
------------------
 
(1 row)

SELECT add_node('Test1' :: NAME, 'localhost');
 add_node 
----------
 
(1 row)

SELECT add_node('test2' :: NAME, 'localhost');
 add_node 
----------
 
(1 row)

\c Test1
\dt public.*
        List of relations
 Schema | Name | Type | Owner 
--------+------+------+-------
(0 rows)

\det+ public.*
              List of foreign tables
 Schema | Table | Server | FDW Options | Description 
--------+-------+--------+-------------+-------------
(0 rows)

SELECT *
FROM kafka_get_start_and_next_offset('topic', 0 :: SMALLINT, 0);
 start_offset | next_offset 
--------------+-------------
            0 |           0
(1 row)

SELECT *
FROM kafka_get_start_and_next_offset('topic', 0 :: SMALLINT, 0);
 start_offset | next_offset 
--------------+-------------
            0 |           0
(1 row)

SELECT *
FROM kafka_set_next_offset('topic', 0 :: SMALLINT, 0, 100);
 kafka_set_next_offset 
-----------------------
 
(1 row)

SELECT *
FROM kafka_get_start_and_next_offset('topic', 0 :: SMALLINT, 0);
 start_offset | next_offset 
--------------+-------------
            0 |         100
(1 row)

SELECT *
FROM kafka_get_start_and_next_offset('topic', 1 :: SMALLINT, 25);
 start_offset | next_offset 
--------------+-------------
           25 |          25
(1 row)

SELECT *
FROM kafka_get_start_and_next_offset('topic2', 0 :: SMALLINT, 29);
 start_offset | next_offset 
--------------+-------------
           29 |          29
(1 row)

\c test2
SELECT *
FROM kafka_get_start_and_next_offset('topic', 0 :: SMALLINT, 0);
 start_offset | next_offset 
--------------+-------------
          100 |         100
(1 row)

\set ON_ERROR_STOP 0
SET client_min_messages TO FATAL;
SELECT *
FROM kafka_set_next_offset('topic', 0 :: SMALLINT, 0, 101);
SET client_min_messages TO NOTICE;
\set ON_ERROR_STOP 1
SELECT *
FROM kafka_set_next_offset('topic', 0 :: SMALLINT, 100, 101);
 kafka_set_next_offset 
-----------------------
 
(1 row)

\c Test1
SELECT *
FROM kafka_get_start_and_next_offset('topic', 0 :: SMALLINT, 0);
 start_offset | next_offset 
--------------+-------------
          101 |         101
(1 row)

SELECT *
FROM _iobeamdb_catalog.kafka_offset_local;
 topic  | partition_number | start_offset | next_offset | database_name 
--------+------------------+--------------+-------------+---------------
 topic  |                0 |            0 |         100 | Test1
 topic  |                1 |           25 |          25 | Test1
 topic2 |                0 |           29 |          29 | Test1
 topic  |                0 |          101 |         101 | Test1
(4 rows)

View File

@@ -1,52 +0,0 @@
\set ON_ERROR_STOP 1
\o /dev/null
\ir include/create_clustered_db.sql
\o
\c Test1
select setup_kafka();
\c test2
select setup_kafka();
\set ECHO ALL
\c meta
SELECT add_cluster_user('postgres', NULL);
SELECT add_node('Test1' :: NAME, 'localhost');
SELECT add_node('test2' :: NAME, 'localhost');
\c Test1
\dt public.*
\det+ public.*
SELECT *
FROM kafka_get_start_and_next_offset('topic', 0 :: SMALLINT, 0);
SELECT *
FROM kafka_get_start_and_next_offset('topic', 0 :: SMALLINT, 0);
SELECT *
FROM kafka_set_next_offset('topic', 0 :: SMALLINT, 0, 100);
SELECT *
FROM kafka_get_start_and_next_offset('topic', 0 :: SMALLINT, 0);
SELECT *
FROM kafka_get_start_and_next_offset('topic', 1 :: SMALLINT, 25);
SELECT *
FROM kafka_get_start_and_next_offset('topic2', 0 :: SMALLINT, 29);
\c test2
SELECT *
FROM kafka_get_start_and_next_offset('topic', 0 :: SMALLINT, 0);
\set ON_ERROR_STOP 0
SET client_min_messages TO FATAL;
SELECT *
FROM kafka_set_next_offset('topic', 0 :: SMALLINT, 0, 101);
SET client_min_messages TO NOTICE;
\set ON_ERROR_STOP 1
SELECT *
FROM kafka_set_next_offset('topic', 0 :: SMALLINT, 100, 101);
\c Test1
SELECT *
FROM kafka_get_start_and_next_offset('topic', 0 :: SMALLINT, 0);
SELECT *
FROM _iobeamdb_catalog.kafka_offset_local;

View File

@@ -24,7 +24,6 @@ if [ $RESET_POSTGRES_DB == "true" ]; then
for node in $NODES; do
    $DB_SETUPDIR/setup_node.sh $node
    $DB_SETUPDIR/setup_kafka.sh $node
    $DB_SETUPDIR/add_node.sh $node $POSTGRES_HOST
done

View File

@@ -1,63 +0,0 @@
DROP FUNCTION IF EXISTS unit_tests.kafka_get_start_and_next_offset_test();

CREATE FUNCTION unit_tests.kafka_get_start_and_next_offset_test()
    RETURNS test_result
AS
$$
DECLARE
    message              test_result;
    start_offset_var     INTEGER;
    next_offset_var      INTEGER;
    DEFAULT_START_OFFSET INTEGER;
BEGIN
    DEFAULT_START_OFFSET := 42;

    SELECT start_offset, next_offset
    FROM kafka_get_start_and_next_offset('topic' :: TEXT, 0 :: SMALLINT, DEFAULT_START_OFFSET)
    INTO start_offset_var, next_offset_var;

    IF start_offset_var != DEFAULT_START_OFFSET THEN
        SELECT assert.fail('Bad default start offset.') INTO message;
        RETURN message;
    END IF;

    IF next_offset_var != DEFAULT_START_OFFSET THEN
        SELECT assert.fail('Bad initial next_offset.') INTO message;
        RETURN message;
    END IF;

    PERFORM kafka_set_next_offset(
        topic => 'topic' :: TEXT,
        partition_number => 0 :: SMALLINT,
        start_offset => DEFAULT_START_OFFSET,
        next_offset => DEFAULT_START_OFFSET + 1
    );

    SELECT start_offset, next_offset
    FROM kafka_get_start_and_next_offset('topic' :: TEXT, 0 :: SMALLINT, DEFAULT_START_OFFSET)
    INTO start_offset_var, next_offset_var;

    IF start_offset_var != DEFAULT_START_OFFSET THEN
        SELECT assert.fail('Bad start offset after update.') INTO message;
        RETURN message;
    END IF;

    IF next_offset_var != DEFAULT_START_OFFSET + 1 THEN
        SELECT assert.fail('Bad next offset after update.') INTO message;
        RETURN message;
    END IF;

    -- Setting an offset for a topic with no matching row should raise IO501.
    BEGIN
        PERFORM kafka_set_next_offset(
            topic => 'newtopic' :: TEXT,
            partition_number => 0 :: SMALLINT,
            start_offset => DEFAULT_START_OFFSET,
            next_offset => DEFAULT_START_OFFSET + 1
        );
    EXCEPTION
        WHEN sqlstate 'IO501' THEN
            RAISE NOTICE 'right exception thrown';
    END;

    SELECT assert.ok('End of test.') INTO message;
    RETURN message;
END
$$
LANGUAGE plpgsql;