Separate out subspace_store and add it to the hypertable object as well

This commit is contained in:
Matvey Arye 2017-06-17 09:17:09 -04:00 committed by Erik Nordström
parent c8124b8b95
commit fc68baa8cc
15 changed files with 252 additions and 145 deletions

View File

@ -40,6 +40,7 @@ SRCS = \
src/insert_chunk_state.c \
src/insert_statement_state.c \
src/agg_bookend.c \
src/subspace_store.c \
src/guc.c
OBJS = $(SRCS:.c=.o)

View File

@ -1,5 +1,5 @@
-- get a chunk if it exists
CREATE OR REPLACE FUNCTION _timescaledb_internal.chunk_get(
CREATE OR REPLACE FUNCTION _timescaledb_internal.chunk_get_2_dim(
time_dimension_id INTEGER,
time_value BIGINT,
space_dimension_id INTEGER,
@ -25,6 +25,40 @@ WHERE ds.dimension_id = space_dimension_id and ds.range_start <= space_dimensio
)
$BODY$;
CREATE OR REPLACE FUNCTION _timescaledb_internal.chunk_get_1_dim(
time_dimension_id INTEGER,
time_value BIGINT
)
RETURNS _timescaledb_catalog.chunk LANGUAGE SQL STABLE AS
$BODY$
SELECT *
FROM _timescaledb_catalog.chunk
WHERE
id = (
SELECT cc.chunk_id
FROM _timescaledb_catalog.dimension_slice ds
INNER JOIN _timescaledb_catalog.chunk_constraint cc ON (ds.id = cc.dimension_slice_id)
WHERE ds.dimension_id = time_dimension_id and ds.range_start <= time_value and ds.range_end > time_value
)
$BODY$;
CREATE OR REPLACE FUNCTION _timescaledb_internal.chunk_get(
time_dimension_id INTEGER,
time_value BIGINT,
space_dimension_id INTEGER,
space_value BIGINT
)
RETURNS _timescaledb_catalog.chunk LANGUAGE SQL STABLE AS
$BODY$
SELECT
CASE WHEN space_dimension_id IS NOT NULL AND space_dimension_id <> 0 THEN
_timescaledb_internal.chunk_get_2_dim(time_dimension_id, time_value, space_dimension_id, space_value)
ELSE
_timescaledb_internal.chunk_get_1_dim(time_dimension_id, time_value)
END;
$BODY$;
--todo: unit test
CREATE OR REPLACE FUNCTION _timescaledb_internal.dimension_calculate_default_range(
dimension_id INTEGER,
@ -197,16 +231,19 @@ BEGIN
INTO time_start, time_end
FROM _timescaledb_internal.chunk_calculate_new_ranges(time_dimension_id, time_value, space_dimension_id, space_value, true);
SELECT *
INTO space_start, space_end
FROM _timescaledb_internal.chunk_calculate_new_ranges(space_dimension_id, space_value, time_dimension_id, time_value, false);
--do not use RETURNING here (ON CONFLICT DO NOTHING)
INSERT INTO _timescaledb_catalog.dimension_slice (dimension_id, range_start, range_end)
VALUES(time_dimension_id, time_start, time_end) ON CONFLICT DO NOTHING;
INSERT INTO _timescaledb_catalog.dimension_slice (dimension_id, range_start, range_end)
VALUES(space_dimension_id, space_start, space_end) ON CONFLICT DO NOTHING;
IF space_dimension_id IS NOT NULL AND space_dimension_id > 0 THEN
SELECT *
INTO space_start, space_end
FROM _timescaledb_internal.chunk_calculate_new_ranges(space_dimension_id, space_value, time_dimension_id, time_value, false);
INSERT INTO _timescaledb_catalog.dimension_slice (dimension_id, range_start, range_end)
VALUES(space_dimension_id, space_start, space_end) ON CONFLICT DO NOTHING;
END IF;
WITH chunk AS (
INSERT INTO _timescaledb_catalog.chunk (id, hypertable_id, schema_name, table_name)

View File

@ -15,7 +15,8 @@ GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA _timescaledb_catalog TO PUBLIC;
-- MUST DOCUMENT TODO: remove these permissions. Have c-based workaround.
-- Everything below this line is suspect.
GRANT INSERT ON TABLE
_timescaledb_catalog.hypertable, _timescaledb_catalog.chunk
_timescaledb_catalog.hypertable, _timescaledb_catalog.chunk,
_timescaledb_catalog.dimension, _timescaledb_catalog.dimension_slice, _timescaledb_catalog.chunk_constraint
TO PUBLIC;
-- needed for inserts to hypertable

View File

@ -67,8 +67,6 @@ chunk_constraint_scan(Chunk *chunk)
.scandirection = ForwardScanDirection,
};
/* Perform an index scan for slice matching the dimension's ID and which
* encloses the coordinate */
ScanKeyInit(&scankey[0], Anum_chunk_constraint_chunk_id_dimension_id_idx_chunk_id,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(chunk->fd.id));

View File

@ -2,6 +2,8 @@
#define TIMESCALEDB_DIMENSION_H
#include <postgres.h>
#include <access/attnum.h>
#include <access/htup_details.h>
#include "catalog.h"

View File

@ -39,7 +39,7 @@ hypercube_free(Hypercube *hc)
{
int i;
for (i = 0; i < hc->num_dimensions; i++)
for (i = 0; i < hc->num_slices; i++)
pfree(hc->slices[i]);
pfree(hc);
@ -113,6 +113,26 @@ dimension_slice_scan_by_id(int32 dimension_slice_id)
return slice;
}
static int
cmp_slices_by_dimension_id(const void *left, const void *right)
{
const DimensionSlice *left_slice = *((DimensionSlice **) left);
const DimensionSlice *right_slice = *((DimensionSlice **) right);
if (left_slice->fd.dimension_id == right_slice->fd.dimension_id)
return 0;
if (left_slice->fd.dimension_id < right_slice->fd.dimension_id)
return -1;
return 1;
}
static void
hypercube_slice_sort(Hypercube *hc)
{
qsort(hc->slices, hc->num_slices, sizeof(DimensionSlice *), cmp_slices_by_dimension_id);
}
Hypercube *
hypercube_from_constraints(ChunkConstraint constraints[], int16 num_constraints)
{
@ -125,7 +145,9 @@ hypercube_from_constraints(ChunkConstraint constraints[], int16 num_constraints)
Assert(slice != NULL);
hc->slices[hc->num_slices++] = slice;
}
Assert(hc->num_slices == hc->num_dimensions);
hypercube_slice_sort(hc);
return hc;
}

View File

@ -22,15 +22,12 @@ typedef struct DimensionSlice
*/
typedef struct Hypercube
{
int16 num_dimensions;
int16 num_slices;
int16 num_dimensions; /* capacity of slices[] */
int16 num_slices; /* actual number of slices (should equal num_dimensions after create) */
/* Open slices are stored before closed slices */
DimensionSlice *slices[0];
} Hypercube;
#define HYPERCUBE_NUM_SLICES(hc) \
((hc)->num_open_slices + (hc)->num_closed_slices)
#define HYPERCUBE_SIZE(num_dimensions) \
(sizeof(Hypercube) + sizeof(DimensionSlice *) * num_dimensions)

View File

@ -5,6 +5,7 @@
#include "hypertable.h"
#include "dimension.h"
#include "chunk.h"
Hypertable *
hypertable_from_tuple(HeapTuple tuple)
@ -16,6 +17,8 @@ hypertable_from_tuple(HeapTuple tuple)
memcpy(&h->fd, GETSTRUCT(tuple), sizeof(FormData_hypertable));
namespace_oid = get_namespace_oid(NameStr(h->fd.schema_name), false);
h->main_table_relid = get_relname_relid(NameStr(h->fd.table_name), namespace_oid);
h->space = dimension_scan(h->fd.id, h->main_table_relid);
h->chunk_cache = subspace_store_init(h->space->num_closed_dimensions + h->space->num_open_dimensions);
return h;
}
@ -26,6 +29,7 @@ hypertable_get_open_dimension(Hypertable *h)
if (h->space->num_open_dimensions == 0)
return NULL;
Assert(h->space->num_open_dimensions == 1);
return h->space->open_dimensions[0];
}
@ -34,6 +38,27 @@ hypertable_get_closed_dimension(Hypertable *h)
{
if (h->space->num_closed_dimensions == 0)
return NULL;
Assert(h->space->num_closed_dimensions == 1);
return h->space->closed_dimensions[0];
}
Chunk *hypertable_get_chunk(Hypertable *h, Point *point)
{
Chunk * chunk = subspace_store_get(h->chunk_cache, point);
if (NULL == chunk)
{
Hypercube *hc;
chunk = chunk_get_or_create(h->space, point);
if (NULL == chunk)
elog(ERROR, "No chunk found or created");
chunk_constraint_scan(chunk);
hc = hypercube_from_constraints(chunk->constraints, chunk->num_constraints);
subspace_store_add(h->chunk_cache, hc, chunk, pfree);
}
return chunk;
}

View File

@ -4,6 +4,7 @@
#include <postgres.h>
#include "catalog.h"
#include "subspace_store.h"
typedef struct PartitionEpoch PartitionEpoch;
typedef struct Hyperspace Hyperspace;
@ -15,10 +16,8 @@ typedef struct Hypertable
{
FormData_hypertable fd;
Oid main_table_relid;
int num_epochs;
/* Array of PartitionEpoch. Order by start_time */
PartitionEpoch *epochs[MAX_EPOCHS_PER_HYPERTABLE];
Hyperspace *space;
SubspaceStore *chunk_cache;
} Hypertable;
typedef struct HeapTupleData *HeapTuple;
@ -27,4 +26,8 @@ extern Hypertable *hypertable_from_tuple(HeapTuple tuple);
extern Dimension *hypertable_get_open_dimension(Hypertable *h);
extern Dimension *hypertable_get_closed_dimension(Hypertable *h);
extern Chunk *hypertable_get_chunk(Hypertable *h, Point *point);
#endif /* TIMESCALEDB_HYPERTABLE_H */

View File

@ -78,8 +78,6 @@ insert_main_table_trigger(PG_FUNCTION_ARGS)
/* Calculate the tuple's point in the N-dimensional hyperspace */
point = hyperspace_calculate_point(ht->space, tuple, tupdesc);
elog(NOTICE, "Point is %s", point_to_string(point));
/* Find or create the insert state matching the point */
cstate = insert_statement_state_get_insert_chunk_state(insert_statement_state,
ht->space, point);

View File

@ -13,110 +13,6 @@
#include "hypertable.h"
#include "chunk_constraint.h"
/*
* TODO
*/
typedef struct InsertStateCache {
int16 num_dimensions;
DimensionAxis *origin; //origin of the tree
} InsertStateCache;
static DimensionAxis *
insert_state_cache_dimension_create()
{
/* TODO remove type from axis */
return dimension_axis_create(DIMENSION_TYPE_OPEN, 10);
}
static void
insert_state_cache_init(InsertStateCache *cache, int16 num_dimensions)
{
cache->origin = insert_state_cache_dimension_create();
cache->num_dimensions = num_dimensions;
}
static void
insert_state_cache_free_internal_node(void * node)
{
dimension_axis_free((DimensionAxis *)node);
}
static void insert_state_cache_add(InsertStateCache *cache, Hypercube *hc,
void *end_store, void (*end_store_free)(void *))
{
DimensionAxis *axis = cache->origin;
DimensionSlice *last = NULL;
int i;
Assert(hc->num_slices == cache->num_dimensions);
for (i = 0; i < hc->num_slices; i++)
{
DimensionSlice *target = hc->slices[i];
DimensionSlice *match;
Assert(target->storage == NULL);
if (axis == NULL)
{
last->storage = insert_state_cache_dimension_create();
last->storage_free = insert_state_cache_free_internal_node;
axis = last->storage;
}
match = dimension_axis_find_slice(axis, target->fd.range_start);
if (match == NULL)
{
dimension_axis_add_slice_sort(&axis, target);
match = target;
}
last = match;
axis = last->storage; /* Internal nodes point to the next Dimension's Axis */
}
last->storage = end_store; /* at the end we store the object */
last->storage_free = end_store_free;
}
static void *
insert_state_cache_get(InsertStateCache *cache, Point *target)
{
int16 i;
DimensionAxis *axis = cache->origin;
DimensionSlice *match = NULL;
Assert(target->cardinality == cache->num_dimensions);
for (i = 0; i < target->cardinality; i++)
{
match = dimension_axis_find_slice(axis, target->coordinates[i]);
if (NULL == match)
return NULL;
axis = match->storage;
}
return match->storage;
}
static bool
insert_state_cache_match_first(InsertStateCache *cache, Point *target)
{
Assert(target->cardinality == cache->num_dimensions);
return (dimension_axis_find_slice(cache->origin, target->coordinates[0]) != NULL);
}
static void
insert_state_cache_free(InsertStateCache *cache)
{
dimension_axis_free(cache->origin);
pfree(cache);
}
InsertStatementState *
insert_statement_state_new(Oid relid)
{
@ -161,7 +57,7 @@ insert_statement_state_destroy(InsertStatementState *state)
insert_chunk_state_destroy(state->cstates[i]);
}
insert_state_cache_free(state->cache);
subspace_store_free(state->cache);
cache_release(state->chunk_cache);
cache_release(state->hypertable_cache);
@ -207,21 +103,17 @@ insert_statement_state_get_insert_chunk_state(InsertStatementState *state, Hyper
if (NULL == state->cache)
{
state->cache = palloc(sizeof(InsertStateCache));
insert_state_cache_init(state->cache, point->cardinality);
state->cache = subspace_store_init(point->cardinality);
}
ics = insert_state_cache_get(state->cache, point);
ics = subspace_store_get(state->cache, point);
if (NULL == ics)
{
Chunk * new_chunk;
Hypercube *hc;
elog(WARNING, "LOOKUP");
/* NOTE: assumes 1 or 2 dims */
new_chunk = chunk_get_or_create(hs, point);
new_chunk = hypertable_get_chunk(state->hypertable, point);
if (NULL == new_chunk)
elog(ERROR, "No chunk found or created");
@ -229,7 +121,7 @@ insert_statement_state_get_insert_chunk_state(InsertStatementState *state, Hyper
ics = insert_chunk_state_new(new_chunk);
chunk_constraint_scan(new_chunk);
hc = hypercube_from_constraints(new_chunk->constraints, new_chunk->num_constraints);
insert_state_cache_add(state->cache, hc, ics, destroy_ics);
subspace_store_add(state->cache, hc, ics, destroy_ics);
}
return ics;

View File

@ -5,6 +5,7 @@
#include "insert_chunk_state.h"
#include "hypertable_cache.h"
#include "cache.h"
#include "subspace_store.h"
typedef struct Hyperspace Hyperspace;
@ -24,7 +25,7 @@ typedef struct
int num_partitions;
int num_open_dimensions;
DimensionSlice *open_dimensions_slices[0];
InsertStateCache *cache;
SubspaceStore *cache;
} InsertStatementState;
InsertStatementState *insert_statement_state_new(Oid);

View File

@ -23,6 +23,7 @@
#include "extension.h"
#include "utils.h"
#include "guc.h"
#include "dimension.h"
void _planner_init(void);
void _planner_fini(void);
@ -120,15 +121,11 @@ get_partitioning_info_for_partition_column_var(Var *var_expr, Query *parse, Cach
if (rte->relid == hentry->main_table_relid)
{
/* get latest partition epoch: TODO scan all pe */
// FIXME: commented out to compile
/*PartitionEpoch *eps = hypertable_cache_get_partition_epoch(hcache, hentry, OPEN_END_TIME - 1, rte->relid);
if (eps->partitioning != NULL &&
strncmp(eps->partitioning->column, varname, NAMEDATALEN) == 0)
Dimension *closed_dim = hypertable_get_closed_dimension(hentry);
if (closed_dim != NULL)
{
return eps->partitioning;
}*/
return closed_dim->partitioning;
}
}
return NULL;
}

114
src/subspace_store.c Normal file
View File

@ -0,0 +1,114 @@
#include <postgres.h>
#include "dimension.h"
#include "dimension_slice.h"
#include "subspace_store.h"
typedef struct SubspaceStore {
int16 num_dimensions;
DimensionAxis *origin; //origin of the tree
} SubspaceStore;
static DimensionAxis *
subspace_store_dimension_create()
{
/* TODO remove type from axis */
return dimension_axis_create(DIMENSION_TYPE_OPEN, 10);
}
SubspaceStore *
subspace_store_init(int16 num_dimensions)
{
SubspaceStore *sst = palloc(sizeof(SubspaceStore));
sst->origin = subspace_store_dimension_create();
sst->num_dimensions = num_dimensions;
return sst;
}
static void
subspace_store_free_internal_node(void * node)
{
dimension_axis_free((DimensionAxis *)node);
}
void subspace_store_add(SubspaceStore *cache, Hypercube *hc,
void *end_store, void (*end_store_free)(void *))
{
DimensionAxis *axis = cache->origin;
DimensionSlice *last = NULL;
int i;
Assert(hc->num_slices == cache->num_dimensions);
for (i = 0; i < hc->num_slices; i++)
{
DimensionSlice *target = hc->slices[i];
DimensionSlice *match;
Assert(target->storage == NULL);
if (axis == NULL)
{
last->storage = subspace_store_dimension_create();
last->storage_free = subspace_store_free_internal_node;
axis = last->storage;
}
if(axis->num_slices > 0)
{
Assert(axis->slices[0]->fd.dimension_id = target->fd.dimension_id);
}
match = dimension_axis_find_slice(axis, target->fd.range_start);
if (match == NULL)
{
dimension_axis_add_slice_sort(&axis, target);
match = target;
}
last = match;
axis = last->storage; /* Internal nodes point to the next Dimension's Axis */
}
Assert(last->storage == NULL);
last->storage = end_store; /* at the end we store the object */
last->storage_free = end_store_free;
}
void *
subspace_store_get(SubspaceStore *cache, Point *target)
{
int16 i;
DimensionAxis *axis = cache->origin;
DimensionSlice *match = NULL;
Assert(target->cardinality == cache->num_dimensions);
for (i = 0; i < target->cardinality; i++)
{
match = dimension_axis_find_slice(axis, target->coordinates[i]);
if (NULL == match)
return NULL;
axis = match->storage;
}
return match->storage;
}
static bool
subspace_store_match_first(SubspaceStore *cache, Point *target)
{
Assert(target->cardinality == cache->num_dimensions);
return (dimension_axis_find_slice(cache->origin, target->coordinates[0]) != NULL);
}
void
subspace_store_free(SubspaceStore *cache)
{
dimension_axis_free(cache->origin);
pfree(cache);
}

19
src/subspace_store.h Normal file
View File

@ -0,0 +1,19 @@
#ifndef TIMESCALEDB_SUBSPACE_STORE_H
#define TIMESCALEDB_SUBSPACE_STORE_H
#include <postgres.h>
#include "dimension.h"
#include "dimension_slice.h"
typedef struct SubspaceStore SubspaceStore;
extern SubspaceStore *subspace_store_init(int16 num_dimensions);
extern void subspace_store_add(SubspaceStore *cache, Hypercube *hc,
void *end_store, void (*end_store_free)(void *));
extern void *subspace_store_get(SubspaceStore *cache, Point *target);
extern void subspace_store_free(SubspaceStore *cache);
#endif /* TIMESCALEDB_SUBSPACE_STORE_H */