Allow partitioning in only time dimension

This allows creating hypertables as follows:
```
CREATE TABLE only_1dim(time timestamp, temp float);
SELECT create_hypertable('only_1dim', 'time');
INSERT INTO only_1dim VALUES('2017-01-12T08:11:03', 23.4);
```

It is implemented by making the specification of a partitioning column
optional, and when NULL the number of partitions will be set to 1.
This commit is contained in:
Erik Nordström 2017-01-24 12:53:26 +01:00
parent 4a6e707bdd
commit d87a6ee5a0
12 changed files with 193 additions and 87 deletions

View File

@ -152,9 +152,9 @@ CREATE TABLE IF NOT EXISTS _iobeamdb_catalog.partition_epoch (
hypertable_id INTEGER NOT NULL REFERENCES _iobeamdb_catalog.hypertable(id) ON DELETE CASCADE,
start_time BIGINT NULL CHECK (start_time > 0),
end_time BIGINT NULL CHECK (end_time > 0),
partitioning_func NAME NOT NULL, --function name of a function of the form func(data_value, partitioning_mod) -> [0, partitioning_mod)
partitioning_func NAME NULL, --function name of a function of the form func(data_value, partitioning_mod) -> [0, partitioning_mod)
partitioning_mod INT NOT NULL CHECK (partitioning_mod < 65536),
partitioning_column NAME NOT NULL,
partitioning_column NAME NULL,
UNIQUE (hypertable_id, start_time),
UNIQUE (hypertable_id, end_time),
CHECK (start_time < end_time)

View File

@ -12,7 +12,7 @@
CREATE OR REPLACE FUNCTION create_hypertable(
main_table REGCLASS,
time_column_name NAME,
partitioning_column NAME,
partitioning_column NAME = NULL,
replication_factor SMALLINT = 1,
number_partitions SMALLINT = NULL,
associated_schema_name NAME = NULL,
@ -46,19 +46,21 @@ BEGIN
RAISE EXCEPTION 'column "%" does not exist', time_column_name
USING ERRCODE = 'IO102';
END;
IF time_column_type NOT IN ('BIGINT', 'INTEGER', 'SMALLINT', 'TIMESTAMP', 'TIMESTAMPTZ') THEN
RAISE EXCEPTION 'illegal type for time column "%": %', time_column_name, time_column_type
USING ERRCODE = 'IO102';
END IF;
PERFORM atttypid
FROM pg_attribute
WHERE attrelid = main_table AND attname = partitioning_column;
IF partitioning_column IS NOT NULL THEN
PERFORM atttypid
FROM pg_attribute
WHERE attrelid = main_table AND attname = partitioning_column;
IF NOT FOUND THEN
RAISE EXCEPTION 'column "%" does not exist', partitioning_column
USING ERRCODE = 'IO102';
IF NOT FOUND THEN
RAISE EXCEPTION 'column "%" does not exist', partitioning_column
USING ERRCODE = 'IO102';
END IF;
END IF;
EXECUTE format('SELECT TRUE FROM %s LIMIT 1', main_table) INTO main_table_has_items;

View File

@ -24,19 +24,25 @@ $BODY$
DECLARE
partition_row _iobeamdb_catalog.partition;
BEGIN
EXECUTE format(
$$
SELECT p.*
FROM _iobeamdb_catalog.partition p
WHERE p.epoch_id = %L AND
%s((SELECT row.%I FROM (SELECT (%L::%s).*) as row), %L)
BETWEEN p.keyspace_start AND p.keyspace_end
$$,
epoch.id, epoch.partitioning_func,
epoch.partitioning_column,
copy_record, copy_table_name, epoch.partitioning_mod)
INTO STRICT partition_row;
IF epoch.partitioning_func IS NULL THEN
SELECT p.*
FROM _iobeamdb_catalog.partition p
WHERE p.epoch_id = epoch.id
INTO STRICT partition_row;
ELSE
EXECUTE format(
$$
SELECT p.*
FROM _iobeamdb_catalog.partition p
WHERE p.epoch_id = %L AND
%s((SELECT row.%I FROM (SELECT (%L::%s).*) as row)::TEXT, %L)
BETWEEN p.keyspace_start AND p.keyspace_end
$$,
epoch.id, epoch.partitioning_func,
epoch.partitioning_column,
copy_record, copy_table_name, epoch.partitioning_mod)
INTO STRICT partition_row;
END IF;
RETURN partition_row;
END
$BODY$;
@ -184,30 +190,45 @@ BEGIN
END LOOP;
PERFORM set_config('io.ignore_delete_in_trigger', 'true', true);
EXECUTE format(
$$
WITH selected AS
(
DELETE FROM ONLY %2$s
WHERE (%7$I >= %3$s OR %3$s IS NULL) AND (%7$I <= %4$s OR %4$s IS NULL) AND
(%8$s(%9$I, %10$L) BETWEEN %11$L AND %12$L)
RETURNING *
)%5$s
INSERT INTO %1$s (%6$s) SELECT %6$s FROM selected;
$$,
format('%I.%I', crn_record.schema_name, crn_record.table_name) :: REGCLASS,
copy_table_oid,
_iobeamdb_internal.time_literal_sql(chunk_row.start_time, point_record.time_column_type),
_iobeamdb_internal.time_literal_sql(chunk_row.end_time, point_record.time_column_type),
distinct_clauses,
_iobeamdb_internal.get_column_list(hypertable_id),
point_record.time_column_name,
point_record.partitioning_func,
point_record.partitioning_column,
point_record.partitioning_mod,
point_record.keyspace_start,
point_record.keyspace_end
);
DECLARE
partition_constraint_where_clause TEXT = '';
BEGIN
IF point_record.partitioning_column IS NOT NULL THEN
--if we are inserting across more than one partition,
--construct a WHERE clause constraint that SELECTs only
--values from copy table that match the current partition
partition_constraint_where_clause := format(
$$
WHERE (%1$I >= %2$s OR %2$s IS NULL) AND (%1$I <= %3$s OR %3$s IS NULL) AND
(%4$s(%5$I::TEXT, %6$L) BETWEEN %7$L AND %8$L)
$$,
point_record.time_column_name,
_iobeamdb_internal.time_literal_sql(chunk_row.start_time, point_record.time_column_type),
_iobeamdb_internal.time_literal_sql(chunk_row.end_time, point_record.time_column_type),
point_record.partitioning_func,
point_record.partitioning_column,
point_record.partitioning_mod,
point_record.keyspace_start,
point_record.keyspace_end
);
END IF;
EXECUTE format(
$$
WITH selected AS
(
DELETE FROM ONLY %1$s %2$s
RETURNING *
)%3$s
INSERT INTO %4$s (%5$s) SELECT %5$s FROM selected;
$$,
copy_table_oid,
partition_constraint_where_clause,
distinct_clauses,
format('%I.%I', crn_record.schema_name, crn_record.table_name) :: REGCLASS,
_iobeamdb_internal.get_column_list(hypertable_id)
);
END;
END LOOP;
EXECUTE point_record_query_sql

View File

@ -60,15 +60,21 @@ $BODY$
DECLARE
partition_row _iobeamdb_catalog.partition;
BEGIN
EXECUTE format(
$$
SELECT p.*
FROM _iobeamdb_catalog.partition p
WHERE p.epoch_id = %L AND
%s(%L, %L) BETWEEN p.keyspace_start AND p.keyspace_end
$$, epoch.id, epoch.partitioning_func, key_value, epoch.partitioning_mod)
INTO STRICT partition_row;
IF epoch.partitioning_column IS NULL THEN
SELECT p.*
FROM _iobeamdb_catalog.partition p
WHERE p.epoch_id = epoch.id
INTO STRICT partition_row;
ELSE
EXECUTE format(
$$
SELECT p.*
FROM _iobeamdb_catalog.partition p
WHERE p.epoch_id = %L AND
%s(%L::TEXT, %L) BETWEEN p.keyspace_start AND p.keyspace_end
$$, epoch.id, epoch.partitioning_func, key_value, epoch.partitioning_mod)
INTO STRICT partition_row;
END IF;
RETURN partition_row;
END
$BODY$;

View File

@ -231,15 +231,16 @@ BEGIN
FROM _iobeamdb_catalog.partition_epoch pe
WHERE pe.id = epoch_id;
EXECUTE format(
$$
ALTER TABLE %1$I.%2$I
ADD CONSTRAINT partition CHECK(%3$s(%4$I::text, %5$L) BETWEEN %6$L AND %7$L)
$$,
schema_name, table_name,
epoch_row.partitioning_func, epoch_row.partitioning_column,
epoch_row.partitioning_mod, keyspace_start, keyspace_end);
IF epoch_row.partitioning_column IS NOT NULL THEN
EXECUTE format(
$$
ALTER TABLE %1$I.%2$I
ADD CONSTRAINT partition CHECK(%3$s(%4$I::text, %5$L) BETWEEN %6$L AND %7$L)
$$,
schema_name, table_name,
epoch_row.partitioning_func, epoch_row.partitioning_column,
epoch_row.partitioning_mod, keyspace_start, keyspace_end);
END IF;
END
$BODY$;

View File

@ -74,16 +74,17 @@ $BODY$;
CREATE OR REPLACE FUNCTION add_partition_epoch(
hypertable_id INTEGER,
keyspace_start SMALLINT [],
partitioning_column NAME
partitioning_column NAME,
partitioning_func TEXT
)
RETURNS VOID LANGUAGE SQL VOLATILE AS
$BODY$
WITH epoch AS (
INSERT INTO _iobeamdb_catalog.partition_epoch (hypertable_id, start_time, end_time, partitioning_func, partitioning_mod, partitioning_column)
VALUES (hypertable_id, NULL, NULL, 'get_partition_for_key', 32768, partitioning_column)
RETURNING id
)
INSERT INTO _iobeamdb_catalog.partition (epoch_id, keyspace_start, keyspace_end)
WITH epoch AS (
INSERT INTO _iobeamdb_catalog.partition_epoch (hypertable_id, start_time, end_time, partitioning_func, partitioning_mod, partitioning_column)
VALUES (hypertable_id, NULL, NULL, partitioning_func, 32768, partitioning_column)
RETURNING id
)
INSERT INTO _iobeamdb_catalog.partition (epoch_id, keyspace_start, keyspace_end)
SELECT
epoch.id,
lag(start, 1, 0)
@ -95,7 +96,8 @@ $BODY$;
CREATE OR REPLACE FUNCTION add_equi_partition_epoch(
hypertable_id INTEGER,
number_partitions SMALLINT,
partitioning_column NAME
partitioning_column NAME,
partitioning_func TEXT
)
RETURNS VOID LANGUAGE SQL VOLATILE AS
$BODY$
@ -103,6 +105,7 @@ SELECT add_partition_epoch(
hypertable_id,
(SELECT ARRAY(SELECT start * 32768 / (number_partitions)
FROM generate_series(1, number_partitions - 1) AS start) :: SMALLINT []),
partitioning_column
partitioning_column,
partitioning_func
)
$BODY$;

View File

@ -19,8 +19,9 @@ CREATE OR REPLACE FUNCTION _iobeamdb_meta.create_hypertable(
RETURNS _iobeamdb_catalog.hypertable LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
id INTEGER;
hypertable_row _iobeamdb_catalog.hypertable;
id INTEGER;
hypertable_row _iobeamdb_catalog.hypertable;
partitioning_func TEXT = 'get_partition_for_key';
BEGIN
id := nextval('_iobeamdb_catalog.default_hypertable_seq');
@ -33,7 +34,15 @@ BEGIN
associated_table_prefix = format('_hyper_%s', id);
END IF;
IF number_partitions IS NULL THEN
IF partitioning_column IS NULL THEN
IF number_partitions IS NULL THEN
number_partitions := 1;
partitioning_func := NULL;
ELSIF number_partitions <> 1 THEN
RAISE EXCEPTION 'The number of partitions must be 1 without a partitioning column'
USING ERRCODE ='IO101';
END IF;
ELSIF number_partitions IS NULL THEN
SELECT COUNT(*)
INTO number_partitions
FROM _iobeamdb_catalog.node;
@ -63,7 +72,7 @@ BEGIN
RETURNING * INTO hypertable_row;
IF number_partitions != 0 THEN
PERFORM add_equi_partition_epoch(hypertable_row.id, number_partitions, partitioning_column);
PERFORM add_equi_partition_epoch(hypertable_row.id, number_partitions, partitioning_column, partitioning_func);
END IF;
RETURN hypertable_row;
END

View File

@ -22,3 +22,9 @@ select * from create_hypertable('test_schema.test_table', 'time', 'device_id');
\C test2
\dt "test_schema".*
--test partitioning in only time dimension
create table test_schema.test_1dim(time timestamp, temp float);
select create_hypertable('test_schema.test_1dim', 'time');
\dt "test_schema".*

View File

@ -80,3 +80,20 @@ select * from create_hypertable('test_schema.test_table', 'time', 'device_id');
test_schema | test_table | table | postgres
(1 row)
--test partitioning in only time dimension
create table test_schema.test_1dim(time timestamp, temp float);
select create_hypertable('test_schema.test_1dim', 'time');
test2
create_hypertable
-------------------
(1 row)
\dt "test_schema".*
List of relations
Schema | Name | Type | Owner
-------------+------------+-------+----------
test_schema | test_1dim | table | postgres
test_schema | test_table | table | postgres
(2 rows)

View File

@ -333,3 +333,22 @@ SELECT * FROM "testNs";
1257987600000000000 | dev1 | 1.5 | 2 | |
(10 rows)
--test that we can insert data into a 1-dimensional table (only time partitioning)
CREATE TABLE "1dim"(time timestamp, temp float);
SELECT create_hypertable('"1dim"', 'time');
create_hypertable
-------------------
(1 row)
INSERT INTO "1dim" VALUES('2017-01-20T09:00:01', 22.5);
INSERT INTO "1dim" VALUES('2017-01-20T09:00:21', 21.2);
INSERT INTO "1dim" VALUES('2017-01-20T09:00:47', 25.1);
SELECT * FROM "1dim";
time | temp
---------------------+------
2017-01-20 09:00:01 | 22.5
2017-01-20 09:00:21 | 21.2
2017-01-20 09:00:47 | 25.1
(3 rows)

View File

@ -8,3 +8,11 @@
SELECT *
FROM "testNs"._hyper_1_0_distinct;
SELECT * FROM "testNs";
--test that we can insert data into a 1-dimensional table (only time partitioning)
CREATE TABLE "1dim"(time timestamp, temp float);
SELECT create_hypertable('"1dim"', 'time');
INSERT INTO "1dim" VALUES('2017-01-20T09:00:01', 22.5);
INSERT INTO "1dim" VALUES('2017-01-20T09:00:21', 21.2);
INSERT INTO "1dim" VALUES('2017-01-20T09:00:47', 25.1);
SELECT * FROM "1dim";

View File

@ -241,18 +241,30 @@ get_hypertable_info(Oid mainRelationOid)
for (j = 0; j < total_rows; j++)
{
HeapTuple tuple = SPI_tuptable->vals[j];
Name partitioning_column = DatumGetName(SPI_getbinval(tuple, tupdesc, 2, &isnull));
Name partitioning_func = DatumGetName(SPI_getbinval(tuple, tupdesc, 3, &isnull));
int32 partitioning_mod = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 4, &isnull));
Name partitioning_func, partitioning_column;
int32 partitioning_mod;
partitioning_info* info = (partitioning_info *) SPI_palloc(sizeof(partitioning_info));
info->partitioning_column = SPI_palloc(sizeof(NameData));
memcpy(info->partitioning_column, partitioning_column, sizeof(NameData));
partitioning_column = DatumGetName(SPI_getbinval(tuple, tupdesc, 2, &isnull));
info->partitioning_func = SPI_palloc(sizeof(NameData));
memcpy(info->partitioning_func, partitioning_func, sizeof(NameData));
if (!isnull) {
info->partitioning_column = SPI_palloc(sizeof(NameData));
memcpy(info->partitioning_column, partitioning_column, sizeof(NameData));
}
info->partitioning_mod = partitioning_mod;
partitioning_func = DatumGetName(SPI_getbinval(tuple, tupdesc, 3, &isnull));
if (!isnull) {
info->partitioning_func = SPI_palloc(sizeof(NameData));
memcpy(info->partitioning_func, partitioning_func, sizeof(NameData));
}
partitioning_mod = DatumGetInt32(SPI_getbinval(tuple, tupdesc, 4, &isnull));
if (!isnull) {
info->partitioning_mod = partitioning_mod;
}
partitioning_info_array[j] = info;
}
@ -346,7 +358,9 @@ add_partitioning_func_qual_mutator(Node *node, add_partitioning_func_qual_contex
context->parse,
context->hypertable_info_list);
if (pi != NULL) {
if (pi != NULL
&& pi->partitioning_column != NULL
&& pi->partitioning_func != NULL) {
/* The var is a partitioning column */
Expr * partitioning_clause = create_partition_func_equals_const(var_expr, const_expr,
pi->partitioning_func,