mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-17 11:03:36 +08:00
Add native scan for the chunk table
- The chunk table can now be scanned in the C code - Rename DimensionAxis to DimensionVec
This commit is contained in:
parent
fc68baa8cc
commit
fe51d8d7fc
@ -42,11 +42,11 @@
|
||||
-- The name and type of the time column (used to partition on time) are defined
|
||||
-- in `time_column_name` and `time_column_type`.
|
||||
CREATE TABLE IF NOT EXISTS _timescaledb_catalog.hypertable (
|
||||
id SERIAL PRIMARY KEY,
|
||||
schema_name NAME NOT NULL CHECK (schema_name != '_timescaledb_catalog'),
|
||||
table_name NAME NOT NULL,
|
||||
associated_schema_name NAME NOT NULL,
|
||||
associated_table_prefix NAME NOT NULL,
|
||||
id SERIAL PRIMARY KEY,
|
||||
schema_name NAME NOT NULL CHECK (schema_name != '_timescaledb_catalog'),
|
||||
table_name NAME NOT NULL,
|
||||
associated_schema_name NAME NOT NULL,
|
||||
associated_table_prefix NAME NOT NULL,
|
||||
UNIQUE (schema_name, table_name),
|
||||
UNIQUE (associated_schema_name, associated_table_prefix)
|
||||
);
|
||||
@ -65,7 +65,7 @@ CREATE TABLE _timescaledb_catalog.dimension (
|
||||
-- open dimensions (e.g., time)
|
||||
interval_length BIGINT NULL CHECK(interval_length IS NULL OR interval_length > 0),
|
||||
CHECK (
|
||||
(partitioning_func_schema IS NULL AND partitioning_func IS NULL) OR
|
||||
(partitioning_func_schema IS NULL AND partitioning_func IS NULL) OR
|
||||
(partitioning_func_schema IS NOT NULL AND partitioning_func IS NOT NULL)
|
||||
),
|
||||
CHECK (
|
||||
@ -77,7 +77,7 @@ SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.dimension', '')
|
||||
SELECT pg_catalog.pg_extension_config_dump(pg_get_serial_sequence('_timescaledb_catalog.dimension','id'), '');
|
||||
|
||||
|
||||
CREATE TABLE _timescaledb_catalog.dimension_slice (
|
||||
CREATE TABLE _timescaledb_catalog.dimension_slice (
|
||||
id SERIAL NOT NULL PRIMARY KEY,
|
||||
dimension_id INTEGER NOT NULL REFERENCES _timescaledb_catalog.dimension(id) ON DELETE CASCADE,
|
||||
range_start BIGINT NOT NULL CHECK (range_start >= 0),
|
||||
@ -103,9 +103,9 @@ CREATE INDEX ON _timescaledb_catalog.chunk(hypertable_id);
|
||||
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.chunk', '');
|
||||
SELECT pg_catalog.pg_extension_config_dump(pg_get_serial_sequence('_timescaledb_catalog.chunk','id'), '');
|
||||
|
||||
CREATE TABLE _timescaledb_catalog.chunk_constraint (
|
||||
dimension_slice_id INTEGER NOT NULL REFERENCES _timescaledb_catalog.dimension_slice(id) ON DELETE CASCADE,
|
||||
CREATE TABLE _timescaledb_catalog.chunk_constraint (
|
||||
chunk_id INTEGER NOT NULL REFERENCES _timescaledb_catalog.chunk(id) ON DELETE CASCADE,
|
||||
dimension_slice_id INTEGER NOT NULL REFERENCES _timescaledb_catalog.dimension_slice(id) ON DELETE CASCADE,
|
||||
PRIMARY KEY(chunk_id, dimension_slice_id)
|
||||
);
|
||||
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.chunk_constraint', '');
|
||||
|
@ -53,7 +53,7 @@ const static TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES]
|
||||
[CHUNK_CONSTRAINT] = {
|
||||
.length = _MAX_CHUNK_CONSTRAINT_INDEX,
|
||||
.names = (char *[]) {
|
||||
[CHUNK_CONSTRAINT_CHUNK_ID_DIMENSION_ID_IDX] = "chunk_constraint_pkey",
|
||||
[CHUNK_CONSTRAINT_CHUNK_ID_DIMENSION_SLICE_ID_IDX] = "chunk_constraint_pkey",
|
||||
}
|
||||
}
|
||||
};
|
||||
|
@ -251,8 +251,8 @@ enum
|
||||
|
||||
enum Anum_chunk_constraint
|
||||
{
|
||||
Anum_chunk_constraint_dimension_slice_id = 1,
|
||||
Anum_chunk_constraint_chunk_id,
|
||||
Anum_chunk_constraint_chunk_id = 1,
|
||||
Anum_chunk_constraint_dimension_slice_id,
|
||||
_Anum_chunk_constraint_max,
|
||||
};
|
||||
|
||||
@ -261,19 +261,19 @@ enum Anum_chunk_constraint
|
||||
|
||||
typedef struct FormData_chunk_constraint
|
||||
{
|
||||
int32 dimension_slice_id;
|
||||
int32 chunk_id;
|
||||
int32 dimension_slice_id;
|
||||
} FormData_chunk_constraint;
|
||||
|
||||
typedef FormData_chunk_constraint *Form_chunk_constraint;
|
||||
|
||||
enum
|
||||
{
|
||||
CHUNK_CONSTRAINT_CHUNK_ID_DIMENSION_ID_IDX = 0,
|
||||
CHUNK_CONSTRAINT_CHUNK_ID_DIMENSION_SLICE_ID_IDX = 0,
|
||||
_MAX_CHUNK_CONSTRAINT_INDEX,
|
||||
};
|
||||
|
||||
enum Anum_chunk_constraint_chunk_id_dimension_id_idx
|
||||
enum Anum_chunk_constraint_chunk_id_dimension_slice_id_idx
|
||||
{
|
||||
Anum_chunk_constraint_chunk_id_dimension_id_idx_chunk_id = 1,
|
||||
Anum_chunk_constraint_chunk_id_dimension_id_idx_dimension_slice_id,
|
||||
@ -281,7 +281,6 @@ enum Anum_chunk_constraint_chunk_id_dimension_id_idx
|
||||
};
|
||||
|
||||
|
||||
|
||||
#define MAX(a, b) \
|
||||
((long)(a) > (long)(b) ? (a) : (b))
|
||||
|
||||
|
232
src/chunk.c
232
src/chunk.c
@ -3,22 +3,27 @@
|
||||
#include <fmgr.h>
|
||||
#include <utils/builtins.h>
|
||||
#include <utils/lsyscache.h>
|
||||
#include <utils/hsearch.h>
|
||||
#include <utils/memutils.h>
|
||||
#include <access/htup_details.h>
|
||||
|
||||
#include "chunk.h"
|
||||
#include "catalog.h"
|
||||
#include "dimension.h"
|
||||
#include "dimension_slice.h"
|
||||
#include "partitioning.h"
|
||||
#include "metadata_queries.h"
|
||||
#include "scanner.h"
|
||||
|
||||
Chunk *
|
||||
chunk_create(HeapTuple tuple, int16 num_constraints)
|
||||
chunk_create_from_tuple(HeapTuple tuple, int16 num_constraints)
|
||||
{
|
||||
Chunk *chunk;
|
||||
|
||||
chunk = palloc0(CHUNK_SIZE(num_constraints));
|
||||
memcpy(&chunk->fd, GETSTRUCT(tuple), sizeof(FormData_chunk));
|
||||
chunk->num_constraints = num_constraints;
|
||||
chunk->num_constraint_slots = num_constraints;
|
||||
chunk->num_constraints = 0;
|
||||
chunk->table_id = get_relname_relid(chunk->fd.table_name.data,
|
||||
get_namespace_oid(chunk->fd.schema_name.data, false));
|
||||
|
||||
@ -26,19 +31,222 @@ chunk_create(HeapTuple tuple, int16 num_constraints)
|
||||
}
|
||||
|
||||
Chunk *
|
||||
chunk_get_or_create(Hyperspace *hs, Point *p)
|
||||
chunk_create_new(Hyperspace *hs, Point *p)
|
||||
{
|
||||
Chunk *chunk;
|
||||
|
||||
/* NOTE: Currently supports only two dimensions */
|
||||
Assert(hs->num_open_dimensions == 1 && hs->num_closed_dimensions <= 1);
|
||||
|
||||
if (hs->num_closed_dimensions == 1)
|
||||
return spi_chunk_get_or_create(hs->open_dimensions[0]->fd.id,
|
||||
p->coordinates[0],
|
||||
hs->closed_dimensions[0]->fd.id,
|
||||
p->coordinates[1],
|
||||
HYPERSPACE_NUM_DIMENSIONS(hs));
|
||||
|
||||
return spi_chunk_get_or_create(hs->open_dimensions[0]->fd.id,
|
||||
p->coordinates[0], 0, 0,
|
||||
HYPERSPACE_NUM_DIMENSIONS(hs));
|
||||
chunk = spi_chunk_create(hs->open_dimensions[0]->fd.id,
|
||||
p->coordinates[0],
|
||||
hs->closed_dimensions[0]->fd.id,
|
||||
p->coordinates[1],
|
||||
HYPERSPACE_NUM_DIMENSIONS(hs));
|
||||
else
|
||||
chunk = spi_chunk_create(hs->open_dimensions[0]->fd.id,
|
||||
p->coordinates[0], 0, 0,
|
||||
HYPERSPACE_NUM_DIMENSIONS(hs));
|
||||
Assert(chunk != NULL);
|
||||
|
||||
chunk_constraint_scan(chunk);
|
||||
|
||||
chunk->cube = hypercube_from_constraints(chunk->constraints, chunk->num_constraints);
|
||||
|
||||
return chunk;
|
||||
}
|
||||
|
||||
Chunk *
|
||||
chunk_get_or_create_new(Hyperspace *hs, Point *p)
|
||||
{
|
||||
Chunk *chunk;
|
||||
|
||||
/* NOTE: Currently supports only two dimensions */
|
||||
Assert(hs->num_open_dimensions == 1 && hs->num_closed_dimensions <= 1);
|
||||
|
||||
if (hs->num_closed_dimensions == 1)
|
||||
chunk = spi_chunk_get_or_create(hs->open_dimensions[0]->fd.id,
|
||||
p->coordinates[0],
|
||||
hs->closed_dimensions[0]->fd.id,
|
||||
p->coordinates[1],
|
||||
HYPERSPACE_NUM_DIMENSIONS(hs));
|
||||
else
|
||||
chunk = spi_chunk_get_or_create(hs->open_dimensions[0]->fd.id,
|
||||
p->coordinates[0], 0, 0,
|
||||
HYPERSPACE_NUM_DIMENSIONS(hs));
|
||||
Assert(chunk != NULL);
|
||||
|
||||
chunk_constraint_scan(chunk);
|
||||
|
||||
return chunk;
|
||||
}
|
||||
|
||||
static bool
|
||||
chunk_tuple_found(TupleInfo *ti, void *arg)
|
||||
{
|
||||
Chunk *chunk = arg;
|
||||
memcpy(&chunk->fd, GETSTRUCT(ti->tuple), sizeof(FormData_chunk));
|
||||
return false;
|
||||
}
|
||||
|
||||
static Chunk *
|
||||
chunk_scan(Chunk *chunk_stub, bool tuplock)
|
||||
{
|
||||
ScanKeyData scankey[1];
|
||||
Catalog *catalog = catalog_get();
|
||||
int num_found;
|
||||
ScannerCtx ctx = {
|
||||
.table = catalog->tables[CHUNK].id,
|
||||
.index = catalog->tables[CHUNK].index_ids[CHUNK_ID_INDEX],
|
||||
.scantype = ScannerTypeIndex,
|
||||
.nkeys = 1,
|
||||
.scankey = scankey,
|
||||
.data = chunk_stub,
|
||||
.tuple_found = chunk_tuple_found,
|
||||
.lockmode = AccessShareLock,
|
||||
.tuplock = {
|
||||
.lockmode = LockTupleShare,
|
||||
.enabled = tuplock,
|
||||
},
|
||||
.scandirection = ForwardScanDirection,
|
||||
};
|
||||
|
||||
/*
|
||||
* Perform an index scan on chunk ID.
|
||||
*/
|
||||
ScanKeyInit(&scankey[0], Anum_chunk_id, BTEqualStrategyNumber,
|
||||
F_INT4EQ, Int32GetDatum(chunk_stub->fd.id));
|
||||
|
||||
num_found = scanner_scan(&ctx);
|
||||
|
||||
if (num_found != 1)
|
||||
elog(ERROR, "No chunk found with ID %d", chunk_stub->fd.id);
|
||||
|
||||
return chunk_stub;
|
||||
}
|
||||
|
||||
bool
|
||||
chunk_add_constraint(Chunk *chunk, ChunkConstraint *constraint)
|
||||
{
|
||||
if (chunk->num_constraint_slots == chunk->num_constraints)
|
||||
return false;
|
||||
|
||||
memcpy(&chunk->constraints[chunk->num_constraints++], constraint, sizeof(ChunkConstraint));
|
||||
return true;
|
||||
}
|
||||
|
||||
bool
|
||||
chunk_add_constraint_from_tuple(Chunk *chunk, HeapTuple constraint_tuple)
|
||||
{
|
||||
if (chunk->num_constraint_slots == chunk->num_constraints)
|
||||
return false;
|
||||
|
||||
memcpy(&chunk->constraints[chunk->num_constraints++],
|
||||
GETSTRUCT(constraint_tuple), sizeof(FormData_chunk_constraint));
|
||||
return true;
|
||||
}
|
||||
|
||||
static void
|
||||
chunk_scan_ctx_init(ChunkScanState *ctx, int16 num_dimensions, MemoryContext elm_mctx)
|
||||
{
|
||||
struct HASHCTL hctl = {
|
||||
.keysize = sizeof(int32),
|
||||
.entrysize = sizeof(ChunkScanEntry),
|
||||
.hcxt = AllocSetContextCreate(CurrentMemoryContext,
|
||||
"chunk-scan",
|
||||
ALLOCSET_DEFAULT_SIZES),
|
||||
};
|
||||
|
||||
ctx->htab = hash_create("chunk-scan", 20, &hctl, HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
|
||||
ctx->elm_mctx = elm_mctx;
|
||||
ctx->num_dimensions = num_dimensions;
|
||||
ctx->num_elements = 0;
|
||||
}
|
||||
|
||||
static void
|
||||
chunk_scan_ctx_destroy(ChunkScanState *cs)
|
||||
{
|
||||
hash_destroy(cs->htab);
|
||||
}
|
||||
|
||||
static Chunk *
|
||||
chunk_scan_ctx_find_chunk(ChunkScanState *ctx)
|
||||
{
|
||||
HASH_SEQ_STATUS status;
|
||||
ChunkScanEntry *entry;
|
||||
|
||||
hash_seq_init(&status, ctx->htab);
|
||||
|
||||
for (entry = hash_seq_search(&status);
|
||||
entry != NULL;
|
||||
entry = hash_seq_search(&status))
|
||||
{
|
||||
Chunk *chunk = entry->chunk;
|
||||
|
||||
if (chunk->num_constraints == ctx->num_dimensions)
|
||||
{
|
||||
elog(NOTICE, "Found chunk %d", chunk->fd.id);
|
||||
hash_seq_term(&status);
|
||||
return chunk;
|
||||
}
|
||||
|
||||
elog(NOTICE, "Found non-matching chunk %d with %d constraints",
|
||||
chunk->fd.id, chunk->num_constraints);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
Chunk *
|
||||
chunk_find(Hyperspace *hs, Point *p)
|
||||
{
|
||||
Chunk *chunk;
|
||||
ChunkScanState scanCtx;
|
||||
int16 num_dimensions = HYPERSPACE_NUM_DIMENSIONS(hs);
|
||||
int i, j;
|
||||
|
||||
chunk_scan_ctx_init(&scanCtx, num_dimensions, CurrentMemoryContext);
|
||||
|
||||
elog(NOTICE, "##### Scanning for chunk");
|
||||
|
||||
for (i = 0; i < hs->num_open_dimensions; i++)
|
||||
{
|
||||
DimensionVec *vec;
|
||||
|
||||
elog(NOTICE, "Scanning dimension %s", NameStr(hs->open_dimensions[i]->fd.column_name));
|
||||
|
||||
vec = dimension_slice_scan(hs->open_dimensions[i]->fd.id, p->coordinates[i]);
|
||||
|
||||
elog(NOTICE, "Found %d slices", vec->num_slices);
|
||||
|
||||
for (j = 0; j < vec->num_slices; j++)
|
||||
chunk_constraint_scan_by_dimension_slice(vec->slices[j], &scanCtx);
|
||||
}
|
||||
|
||||
for (i = 0; i < hs->num_closed_dimensions; i++)
|
||||
{
|
||||
DimensionVec *vec;
|
||||
|
||||
elog(NOTICE, "Scanning dimension %s", NameStr(hs->closed_dimensions[i]->fd.column_name));
|
||||
|
||||
vec = dimension_slice_scan(hs->closed_dimensions[i]->fd.id,
|
||||
p->coordinates[hs->num_open_dimensions + i]);
|
||||
|
||||
elog(NOTICE, "Found %d slices", vec->num_slices);
|
||||
|
||||
for (j = 0; j < vec->num_slices; j++)
|
||||
chunk_constraint_scan_by_dimension_slice(vec->slices[j], &scanCtx);
|
||||
}
|
||||
|
||||
chunk = chunk_scan_ctx_find_chunk(&scanCtx);
|
||||
|
||||
chunk_scan_ctx_destroy(&scanCtx);
|
||||
|
||||
if (NULL != chunk) {
|
||||
chunk->cube = hypercube_from_constraints(chunk->constraints, chunk->num_constraints);
|
||||
chunk_scan(chunk, false);
|
||||
}
|
||||
|
||||
return chunk;
|
||||
}
|
||||
|
28
src/chunk.h
28
src/chunk.h
@ -32,14 +32,34 @@ typedef struct Chunk
|
||||
* table.
|
||||
*/
|
||||
Hypercube *cube;
|
||||
int16 num_constraint_slots;
|
||||
int16 num_constraints;
|
||||
ChunkConstraint constraints[0];
|
||||
} Chunk;
|
||||
|
||||
#define CHUNK_SIZE(num_constraints) \
|
||||
(sizeof(Chunk) + sizeof(ChunkConstraint) * num_constraints)
|
||||
#define CHUNK_SIZE(num_constraints) \
|
||||
(sizeof(Chunk) + sizeof(ChunkConstraint) * (num_constraints))
|
||||
|
||||
extern Chunk *chunk_create(HeapTuple tuple, int16 num_constraints);
|
||||
extern Chunk *chunk_get_or_create(Hyperspace *hs, Point *p);
|
||||
typedef struct ChunkScanState
|
||||
{
|
||||
HTAB *htab;
|
||||
MemoryContext elm_mctx;
|
||||
DimensionSlice *slice;
|
||||
int16 num_dimensions;
|
||||
int16 num_elements;
|
||||
} ChunkScanState;
|
||||
|
||||
typedef struct ChunkScanEntry
|
||||
{
|
||||
int32 chunk_id;
|
||||
Chunk *chunk;
|
||||
} ChunkScanEntry;
|
||||
|
||||
extern Chunk *chunk_create_from_tuple(HeapTuple tuple, int16 num_constraints);
|
||||
extern Chunk *chunk_create_new(Hyperspace *hs, Point *p);
|
||||
extern Chunk *chunk_get_or_create_new(Hyperspace *hs, Point *p);
|
||||
extern bool chunk_add_constraint(Chunk *chunk, ChunkConstraint *constraint);
|
||||
extern bool chunk_add_constraint_from_tuple(Chunk *chunk, HeapTuple constraint_tuple);
|
||||
extern Chunk *chunk_find(Hyperspace *hs, Point *p);
|
||||
|
||||
#endif /* TIMESCALEDB_CHUNK_H */
|
||||
|
@ -1,7 +1,9 @@
|
||||
#include <postgres.h>
|
||||
#include <utils/hsearch.h>
|
||||
|
||||
#include "scanner.h"
|
||||
#include "chunk_constraint.h"
|
||||
#include "dimension_slice.h"
|
||||
#include "chunk.h"
|
||||
|
||||
static inline ChunkConstraint *
|
||||
@ -29,7 +31,6 @@ chunk_constraint_from_tuple(HeapTuple tuple)
|
||||
typedef struct ChunkConstraintCtx
|
||||
{
|
||||
Chunk *chunk;
|
||||
int16 num_constraints_found;
|
||||
} ChunkConstraintCtx;
|
||||
|
||||
static bool
|
||||
@ -37,11 +38,11 @@ chunk_constraint_tuple_found(TupleInfo *ti, void *data)
|
||||
{
|
||||
ChunkConstraintCtx *ctx = data;
|
||||
|
||||
chunk_constraint_fill(&ctx->chunk->constraints[ctx->num_constraints_found++], ti->tuple);
|
||||
chunk_constraint_fill(&ctx->chunk->constraints[ctx->chunk->num_constraints++], ti->tuple);
|
||||
|
||||
if (ctx->num_constraints_found == ctx->chunk->num_constraints)
|
||||
if (ctx->chunk->num_constraint_slots == ctx->chunk->num_constraints)
|
||||
return false;
|
||||
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@ -52,12 +53,11 @@ chunk_constraint_scan(Chunk *chunk)
|
||||
ScanKeyData scankey[1];
|
||||
ChunkConstraintCtx data = {
|
||||
.chunk = chunk,
|
||||
.num_constraints_found = 0,
|
||||
};
|
||||
int num_found;
|
||||
ScannerCtx scanCtx = {
|
||||
.table = catalog->tables[CHUNK_CONSTRAINT].id,
|
||||
.index = catalog->tables[CHUNK_CONSTRAINT].index_ids[CHUNK_CONSTRAINT_CHUNK_ID_DIMENSION_ID_IDX],
|
||||
.index = catalog->tables[CHUNK_CONSTRAINT].index_ids[CHUNK_CONSTRAINT_CHUNK_ID_DIMENSION_SLICE_ID_IDX],
|
||||
.scantype = ScannerTypeIndex,
|
||||
.nkeys = 1,
|
||||
.scankey = scankey,
|
||||
@ -66,6 +66,8 @@ chunk_constraint_scan(Chunk *chunk)
|
||||
.lockmode = AccessShareLock,
|
||||
.scandirection = ForwardScanDirection,
|
||||
};
|
||||
|
||||
chunk->num_constraints = 0;
|
||||
|
||||
ScanKeyInit(&scankey[0], Anum_chunk_constraint_chunk_id_dimension_id_idx_chunk_id,
|
||||
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(chunk->fd.id));
|
||||
@ -77,3 +79,64 @@ chunk_constraint_scan(Chunk *chunk)
|
||||
|
||||
return chunk;
|
||||
}
|
||||
|
||||
static bool
|
||||
chunk_constraint_dimension_id_tuple_found(TupleInfo *ti, void *data)
|
||||
{
|
||||
ChunkScanState *ctx = data;
|
||||
ChunkConstraint constraint;
|
||||
Chunk *chunk;
|
||||
ChunkScanEntry *entry;
|
||||
bool found;
|
||||
|
||||
chunk_constraint_fill(&constraint, ti->tuple);
|
||||
|
||||
elog(NOTICE, "Finding chunk %d", constraint.fd.chunk_id);
|
||||
|
||||
entry = hash_search(ctx->htab, &constraint.fd.chunk_id, HASH_ENTER, &found);
|
||||
|
||||
if (!found)
|
||||
{
|
||||
chunk = MemoryContextAlloc(ctx->elm_mctx, CHUNK_SIZE(ctx->num_dimensions));
|
||||
chunk->fd.id = constraint.fd.chunk_id;
|
||||
chunk->num_constraint_slots = ctx->num_dimensions;
|
||||
entry->chunk = chunk;
|
||||
} else {
|
||||
chunk = entry->chunk;
|
||||
}
|
||||
|
||||
chunk_add_constraint(chunk, &constraint);
|
||||
elog(NOTICE, "Added constraint (%d,%d) for chunk %d num_constraints=%d",
|
||||
constraint.fd.chunk_id, constraint.fd.dimension_slice_id, chunk->fd.id,
|
||||
chunk->num_constraints);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
int
|
||||
chunk_constraint_scan_by_dimension_slice(DimensionSlice *slice, ChunkScanState *ctx)
|
||||
{
|
||||
Catalog *catalog = catalog_get();
|
||||
ScanKeyData scankey[1];
|
||||
int num_found;
|
||||
ScannerCtx scanCtx = {
|
||||
.table = catalog->tables[CHUNK_CONSTRAINT].id,
|
||||
.index = catalog->tables[CHUNK_CONSTRAINT].index_ids[CHUNK_CONSTRAINT_CHUNK_ID_DIMENSION_SLICE_ID_IDX],
|
||||
.scantype = ScannerTypeIndex,
|
||||
.nkeys = 1,
|
||||
.scankey = scankey,
|
||||
.data = ctx,
|
||||
.tuple_found = chunk_constraint_dimension_id_tuple_found,
|
||||
.lockmode = AccessShareLock,
|
||||
.scandirection = ForwardScanDirection,
|
||||
};
|
||||
|
||||
ctx->slice = slice;
|
||||
|
||||
ScanKeyInit(&scankey[0], Anum_chunk_constraint_chunk_id_dimension_id_idx_dimension_slice_id,
|
||||
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(slice->fd.id));
|
||||
|
||||
num_found = scanner_scan(&scanCtx);
|
||||
|
||||
return num_found;
|
||||
}
|
||||
|
@ -12,9 +12,19 @@ typedef struct ChunkConstraint
|
||||
FormData_chunk_constraint fd;
|
||||
} ChunkConstraint;
|
||||
|
||||
|
||||
typedef struct ChunkConstraintVec
|
||||
{
|
||||
int16 num_constraints;
|
||||
ChunkConstraint constraints[0];
|
||||
} ChunkConstraintVec;
|
||||
|
||||
typedef struct Chunk Chunk;
|
||||
typedef struct DimensionSlice DimensionSlice;
|
||||
typedef struct ChunkScanState ChunkScanState;
|
||||
|
||||
extern Chunk *chunk_constraint_scan(Chunk *chunk);
|
||||
extern int chunk_constraint_scan_by_dimension_slice(DimensionSlice *slice, ChunkScanState *ctx);
|
||||
|
||||
#endif /* TIMESCALEDB_CHUNK_CONSTRAINT_H */
|
||||
|
||||
|
@ -1,7 +1,6 @@
|
||||
#include <postgres.h>
|
||||
#include <access/relscan.h>
|
||||
#include <utils/lsyscache.h>
|
||||
#include <inttypes.h>
|
||||
|
||||
#include "catalog.h"
|
||||
#include "dimension.h"
|
||||
@ -108,7 +107,7 @@ point_to_string(Point *p)
|
||||
buf[0] = '(';
|
||||
|
||||
for (i = 0; i < p->cardinality; i++)
|
||||
j += snprintf(buf + j, 100, "%" PRId64 ",", p->coordinates[i]);
|
||||
j += snprintf(buf + j, 100, "" INT64_FORMAT ",", p->coordinates[i]);
|
||||
|
||||
buf[j-1] = ')';
|
||||
|
||||
|
@ -9,6 +9,8 @@
|
||||
#include "dimension.h"
|
||||
#include "chunk_constraint.h"
|
||||
|
||||
static DimensionVec *dimension_vec_expand(DimensionVec *vec, int32 new_size);
|
||||
|
||||
static inline DimensionSlice *
|
||||
dimension_slice_from_form_data(Form_dimension_slice fd)
|
||||
{
|
||||
@ -46,18 +48,21 @@ hypercube_free(Hypercube *hc)
|
||||
}
|
||||
|
||||
static bool
|
||||
dimension_slice_tuple_found(TupleInfo *ti, void *data)
|
||||
dimension_vec_tuple_found(TupleInfo *ti, void *data)
|
||||
{
|
||||
DimensionSlice **ds = data;
|
||||
*ds = dimension_slice_from_tuple(ti->tuple);
|
||||
return false;
|
||||
DimensionVec **vecptr = data;
|
||||
DimensionSlice *slice = dimension_slice_from_tuple(ti->tuple);
|
||||
elog(NOTICE, "Found dimension slice [" INT64_FORMAT ", " INT64_FORMAT ")",
|
||||
slice->fd.range_start, slice->fd.range_end);
|
||||
dimension_vec_add_slice(vecptr, slice);
|
||||
return true;
|
||||
}
|
||||
|
||||
DimensionSlice *
|
||||
DimensionVec *
|
||||
dimension_slice_scan(int32 dimension_id, int64 coordinate)
|
||||
{
|
||||
Catalog *catalog = catalog_get();
|
||||
DimensionSlice *slice = NULL;
|
||||
DimensionVec *vec = dimension_vec_create(DIMENSION_VEC_DEFAULT_SIZE);
|
||||
ScanKeyData scankey[3];
|
||||
ScannerCtx scanCtx = {
|
||||
.table = catalog->tables[DIMENSION_SLICE].id,
|
||||
@ -65,27 +70,35 @@ dimension_slice_scan(int32 dimension_id, int64 coordinate)
|
||||
.scantype = ScannerTypeIndex,
|
||||
.nkeys = 3,
|
||||
.scankey = scankey,
|
||||
.data = &slice,
|
||||
.tuple_found = dimension_slice_tuple_found,
|
||||
.data = &vec,
|
||||
.tuple_found = dimension_vec_tuple_found,
|
||||
.lockmode = AccessShareLock,
|
||||
.scandirection = ForwardScanDirection,
|
||||
};
|
||||
|
||||
elog(NOTICE, "Scanning dimension %d for coordinate " INT64_FORMAT "",
|
||||
dimension_id, coordinate);
|
||||
|
||||
/* Perform an index scan for slice matching the dimension's ID and which
|
||||
* encloses the coordinate */
|
||||
/* FIXME MAT: I don't think this is right BTGreaterEqualStrategyNumber searches for rows >= target
|
||||
* I think. Also I don't think BTLessStrategyNumber can be used with a forward scan
|
||||
* */
|
||||
ScanKeyInit(&scankey[0], Anum_dimension_slice_dimension_id_range_start_range_end_idx_dimension_id,
|
||||
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(dimension_id));
|
||||
ScanKeyInit(&scankey[1], Anum_dimension_slice_dimension_id_range_start_range_end_idx_range_start,
|
||||
BTGreaterEqualStrategyNumber, F_INT8GE, Int64GetDatum(coordinate));
|
||||
BTLessEqualStrategyNumber, F_INT8LE, Int64GetDatum(coordinate));
|
||||
ScanKeyInit(&scankey[2], Anum_dimension_slice_dimension_id_range_start_range_end_idx_range_end,
|
||||
BTLessStrategyNumber, F_INT8LT, Int64GetDatum(coordinate));
|
||||
BTGreaterStrategyNumber, F_INT8GT, Int64GetDatum(coordinate));
|
||||
|
||||
scanner_scan(&scanCtx);
|
||||
|
||||
return slice;
|
||||
return vec;
|
||||
}
|
||||
|
||||
static bool
|
||||
dimension_slice_tuple_found(TupleInfo *ti, void *data)
|
||||
{
|
||||
DimensionSlice **slice = data;
|
||||
*slice = dimension_slice_from_tuple(ti->tuple);
|
||||
return false;
|
||||
}
|
||||
|
||||
static DimensionSlice *
|
||||
@ -138,7 +151,7 @@ hypercube_from_constraints(ChunkConstraint constraints[], int16 num_constraints)
|
||||
{
|
||||
Hypercube *hc = hypercube_alloc(num_constraints);
|
||||
int i;
|
||||
|
||||
|
||||
for (i = 0; i < num_constraints; i++)
|
||||
{
|
||||
DimensionSlice *slice = dimension_slice_scan_by_id(constraints[i].fd.dimension_slice_id);
|
||||
@ -146,136 +159,10 @@ hypercube_from_constraints(ChunkConstraint constraints[], int16 num_constraints)
|
||||
hc->slices[hc->num_slices++] = slice;
|
||||
}
|
||||
|
||||
Assert(hc->num_slices == hc->num_dimensions);
|
||||
hypercube_slice_sort(hc);
|
||||
return hc;
|
||||
}
|
||||
|
||||
static inline bool
|
||||
scan_dimensions(Hypercube *hc, Dimension *dimensions[], int16 num_dimensions, int64 point[])
|
||||
{
|
||||
int i;
|
||||
|
||||
for (i = 0; i < num_dimensions; i++)
|
||||
{
|
||||
Dimension *d = dimensions[i];
|
||||
DimensionSlice *slice = dimension_slice_scan(d->fd.id, point[i]);
|
||||
|
||||
if (slice == NULL)
|
||||
return false;
|
||||
|
||||
hc->slices[hc->num_slices++] = slice;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
Hypercube *
|
||||
dimension_slice_point_scan(Hyperspace *space, int64 point[])
|
||||
{
|
||||
Hypercube *cube = hypercube_alloc(HYPERSPACE_NUM_DIMENSIONS(space));
|
||||
|
||||
if (!scan_dimensions(cube, space->open_dimensions, space->num_open_dimensions, point)) {
|
||||
hypercube_free(cube);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (!scan_dimensions(cube, space->closed_dimensions, space->num_closed_dimensions, point)) {
|
||||
hypercube_free(cube);
|
||||
return NULL;
|
||||
}
|
||||
return cube;
|
||||
}
|
||||
|
||||
typedef struct PointScanCtx
|
||||
{
|
||||
Hyperspace *hs;
|
||||
Hypercube *hc;
|
||||
int64 *point;
|
||||
} PointScanCtx;
|
||||
|
||||
static inline
|
||||
DimensionSlice *match_dimension_slice(Form_dimension_slice slice, int64 point[],
|
||||
Dimension *dimensions[], int16 num_dimensions)
|
||||
{
|
||||
int i;
|
||||
|
||||
for (i = 0; i < num_dimensions; i++)
|
||||
{
|
||||
int32 dimension_id = dimensions[i]->fd.id;
|
||||
int64 coordinate = point[i];
|
||||
|
||||
if (slice->dimension_id == dimension_id && point_coordinate_is_in_slice(slice, coordinate))
|
||||
return dimension_slice_from_form_data(slice);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static bool
|
||||
point_filter(TupleInfo *ti, void *data)
|
||||
{
|
||||
PointScanCtx *ctx = data;
|
||||
Hyperspace *hs = ctx->hs;
|
||||
Hypercube *hc = ctx->hc;
|
||||
DimensionSlice *slice;
|
||||
|
||||
/* Match open dimension */
|
||||
slice = match_dimension_slice((Form_dimension_slice) GETSTRUCT(ti->tuple), ctx->point,
|
||||
hs->open_dimensions, hs->num_open_dimensions);
|
||||
|
||||
if (slice != NULL)
|
||||
{
|
||||
hc->slices[hc->num_slices++] = slice;
|
||||
return HYPERSPACE_NUM_DIMENSIONS(hs) == hc->num_slices;
|
||||
}
|
||||
|
||||
/* Match closed dimension */
|
||||
slice = match_dimension_slice((Form_dimension_slice) GETSTRUCT(ti->tuple), ctx->point,
|
||||
hs->closed_dimensions, hs->num_closed_dimensions);
|
||||
|
||||
if (slice != NULL)
|
||||
{
|
||||
hc->slices[hc->num_slices++] = slice;
|
||||
return HYPERSPACE_NUM_DIMENSIONS(hs) == hc->num_slices;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/*
|
||||
* Given a N-dimensional point, scan for the hypercube that encloses it.
|
||||
*
|
||||
* NOTE: This assumes non-overlapping slices.
|
||||
*/
|
||||
Hypercube *
|
||||
dimension_slice_point_scan_heap(Hyperspace *space, int64 point[])
|
||||
{
|
||||
Catalog *catalog = catalog_get();
|
||||
Hypercube *cube = hypercube_alloc(HYPERSPACE_NUM_DIMENSIONS(space));
|
||||
PointScanCtx ctx = {
|
||||
.hs = space,
|
||||
.hc = cube,
|
||||
.point = point,
|
||||
};
|
||||
ScannerCtx scanCtx = {
|
||||
.table = catalog->tables[DIMENSION_SLICE].id,
|
||||
.scantype = ScannerTypeHeap,
|
||||
.nkeys = 0,
|
||||
.data = &ctx,
|
||||
.filter = point_filter,
|
||||
.tuple_found = dimension_slice_tuple_found,
|
||||
.lockmode = AccessShareLock,
|
||||
.scandirection = ForwardScanDirection,
|
||||
};
|
||||
|
||||
cube->num_slices = 0;
|
||||
|
||||
scanner_scan(&scanCtx);
|
||||
|
||||
return cube;
|
||||
}
|
||||
|
||||
void dimension_slice_free(DimensionSlice *slice)
|
||||
void dimension_slice_free(DimensionSlice *slice)
|
||||
{
|
||||
if (slice->storage_free != NULL)
|
||||
slice->storage_free(slice->storage);
|
||||
@ -320,66 +207,69 @@ cmp_coordinate_and_slice(const void *left, const void *right)
|
||||
return 0;
|
||||
}
|
||||
|
||||
static DimensionAxis *
|
||||
dimension_axis_expand(DimensionAxis *axis, int32 new_size)
|
||||
static DimensionVec *
|
||||
dimension_vec_expand(DimensionVec *vec, int32 new_size)
|
||||
{
|
||||
if (axis != NULL && axis->num_slots >= new_size)
|
||||
return axis;
|
||||
if (vec != NULL && vec->num_slots >= new_size)
|
||||
return vec;
|
||||
|
||||
if (axis == NULL)
|
||||
{
|
||||
axis = palloc(sizeof(DimensionAxis) + sizeof(DimensionSlice *) * new_size);
|
||||
}
|
||||
if (NULL == vec)
|
||||
vec = palloc(DIMENSION_VEC_SIZE(new_size));
|
||||
else
|
||||
{
|
||||
axis = repalloc(axis, sizeof(DimensionAxis) + sizeof(DimensionSlice *) * new_size);
|
||||
}
|
||||
axis->num_slots = new_size;
|
||||
return axis;
|
||||
vec = repalloc(vec, DIMENSION_VEC_SIZE(new_size));
|
||||
|
||||
vec->num_slots = new_size;
|
||||
|
||||
return vec;
|
||||
}
|
||||
|
||||
DimensionAxis *
|
||||
dimension_axis_create(DimensionType type, int32 num_slices)
|
||||
DimensionVec *
|
||||
dimension_vec_create(int32 initial_num_slices)
|
||||
{
|
||||
DimensionAxis *axis = dimension_axis_expand(NULL, num_slices);
|
||||
axis->type = type;
|
||||
axis->num_slices = 0;
|
||||
return axis;
|
||||
DimensionVec *vec = dimension_vec_expand(NULL, initial_num_slices);
|
||||
vec->num_slots = initial_num_slices;
|
||||
vec->num_slices = 0;
|
||||
return vec;
|
||||
}
|
||||
|
||||
int32
|
||||
dimension_axis_add_slice(DimensionAxis **axis, DimensionSlice *slice)
|
||||
DimensionVec *
|
||||
dimension_vec_add_slice(DimensionVec **vecptr, DimensionSlice *slice)
|
||||
{
|
||||
if ((*axis)->num_slices + 1 > (*axis)->num_slots)
|
||||
*axis = dimension_axis_expand(*axis, (*axis)->num_slots + 10);
|
||||
DimensionVec *vec = *vecptr;
|
||||
|
||||
(*axis)->slices[(*axis)->num_slices++] = slice;
|
||||
if (vec->num_slices + 1 > vec->num_slots)
|
||||
*vecptr = vec = dimension_vec_expand(vec, vec->num_slots + 10);
|
||||
|
||||
return (*axis)->num_slices;
|
||||
vec->slices[vec->num_slices++] = slice;
|
||||
|
||||
return vec;
|
||||
}
|
||||
|
||||
int32
|
||||
dimension_axis_add_slice_sort(DimensionAxis **axis, DimensionSlice *slice)
|
||||
DimensionVec *
|
||||
dimension_vec_add_slice_sort(DimensionVec **vecptr, DimensionSlice *slice)
|
||||
{
|
||||
dimension_axis_add_slice(axis, slice);
|
||||
qsort((*axis)->slices, (*axis)->num_slices, sizeof(DimensionSlice *), cmp_slices);
|
||||
return (*axis)->num_slices;
|
||||
DimensionVec *vec = *vecptr;
|
||||
*vecptr = vec = dimension_vec_add_slice(vecptr, slice);
|
||||
qsort(vec->slices, vec->num_slices, sizeof(DimensionSlice *), cmp_slices);
|
||||
return vec;
|
||||
}
|
||||
|
||||
DimensionSlice *
|
||||
dimension_axis_find_slice(DimensionAxis *axis, int64 coordinate)
|
||||
dimension_vec_find_slice(DimensionVec *vec, int64 coordinate)
|
||||
{
|
||||
DimensionSlice ** res = bsearch(&coordinate, axis->slices, axis->num_slices, sizeof(DimensionSlice *), cmp_coordinate_and_slice);
|
||||
DimensionSlice **res = bsearch(&coordinate, vec->slices, vec->num_slices,
|
||||
sizeof(DimensionSlice *), cmp_coordinate_and_slice);
|
||||
if (res == NULL)
|
||||
return NULL;
|
||||
|
||||
return *res;
|
||||
}
|
||||
|
||||
void dimension_axis_free(DimensionAxis *axis)
|
||||
void dimension_vec_free(DimensionVec *vec)
|
||||
{
|
||||
int i;
|
||||
|
||||
for (i = 0; i < axis->num_slices; i++)
|
||||
dimension_slice_free(axis->slices[i]);
|
||||
pfree(axis);
|
||||
|
||||
for (i = 0; i < vec->num_slices; i++)
|
||||
dimension_slice_free(vec->slices[i]);
|
||||
pfree(vec);
|
||||
}
|
||||
|
@ -28,30 +28,36 @@ typedef struct Hypercube
|
||||
DimensionSlice *slices[0];
|
||||
} Hypercube;
|
||||
|
||||
#define HYPERCUBE_SIZE(num_dimensions) \
|
||||
#define HYPERCUBE_NUM_SLICES(hc) \
|
||||
((hc)->num_open_slices + (hc)->num_closed_slices)
|
||||
|
||||
#define HYPERCUBE_SIZE(num_dimensions) \
|
||||
(sizeof(Hypercube) + sizeof(DimensionSlice *) * num_dimensions)
|
||||
|
||||
/*
|
||||
* DimensionAxis is a collection of all slices (ranges) along one dimension for
|
||||
* a time range.
|
||||
* DimensionVec is a collection of slices (ranges) along one dimension for a
|
||||
* time range.
|
||||
*/
|
||||
typedef struct DimensionAxis
|
||||
typedef struct DimensionVec
|
||||
{
|
||||
DimensionType type;
|
||||
int32 num_slots; /* The allocated num slots in slices array */
|
||||
int32 num_slices; /* The current number of slices in slices array */
|
||||
DimensionSlice *slices[0];
|
||||
} DimensionAxis;
|
||||
} DimensionVec;
|
||||
|
||||
extern DimensionSlice *dimension_slice_scan(int32 dimension_id, int64 coordinate);
|
||||
#define DIMENSION_VEC_SIZE(num_slices) \
|
||||
(sizeof(DimensionVec) + sizeof(DimensionSlice *) * num_slices)
|
||||
|
||||
#define DIMENSION_VEC_DEFAULT_SIZE 10
|
||||
|
||||
extern DimensionVec *dimension_slice_scan(int32 dimension_id, int64 coordinate);
|
||||
extern Hypercube *dimension_slice_point_scan(Hyperspace *space, int64 point[]);
|
||||
extern Hypercube *dimension_slice_point_scan_heap(Hyperspace *space, int64 point[]);
|
||||
extern void dimension_slice_free(DimensionSlice *slice);
|
||||
extern DimensionAxis *dimension_axis_create(DimensionType type, int32 num_slices);
|
||||
extern int32 dimension_axis_add_slice(DimensionAxis **axis, DimensionSlice *slice);
|
||||
extern int32 dimension_axis_add_slice_sort(DimensionAxis **axis, DimensionSlice *slice);
|
||||
extern DimensionSlice *dimension_axis_find_slice(DimensionAxis *axis, int64 coordinate);
|
||||
extern void dimension_axis_free(DimensionAxis *axis);
|
||||
extern DimensionVec *dimension_vec_create(int32 initial_num_slices);
|
||||
extern DimensionVec *dimension_vec_add_slice(DimensionVec **vec, DimensionSlice *slice);
|
||||
extern DimensionVec *dimension_vec_add_slice_sort(DimensionVec **vec, DimensionSlice *slice);
|
||||
extern DimensionSlice *dimension_vec_find_slice(DimensionVec *vec, int64 coordinate);
|
||||
extern void dimension_vec_free(DimensionVec *vec);
|
||||
extern Hypercube *hypercube_from_constraints(ChunkConstraint constraints[], int16 num_constraints);
|
||||
|
||||
#endif /* TIMESCALEDB_DIMENSION_SLICE_H */
|
||||
|
@ -12,12 +12,12 @@ hypertable_from_tuple(HeapTuple tuple)
|
||||
{
|
||||
Hypertable *h;
|
||||
Oid namespace_oid;
|
||||
|
||||
|
||||
h = palloc0(sizeof(Hypertable));
|
||||
memcpy(&h->fd, GETSTRUCT(tuple), sizeof(FormData_hypertable));
|
||||
namespace_oid = get_namespace_oid(NameStr(h->fd.schema_name), false);
|
||||
h->main_table_relid = get_relname_relid(NameStr(h->fd.table_name), namespace_oid);
|
||||
h->space = dimension_scan(h->fd.id, h->main_table_relid);
|
||||
h->space = dimension_scan(h->fd.id, h->main_table_relid);
|
||||
h->chunk_cache = subspace_store_init(h->space->num_closed_dimensions + h->space->num_open_dimensions);
|
||||
|
||||
return h;
|
||||
@ -27,7 +27,7 @@ Dimension *
|
||||
hypertable_get_open_dimension(Hypertable *h)
|
||||
{
|
||||
if (h->space->num_open_dimensions == 0)
|
||||
return NULL;
|
||||
return NULL;
|
||||
|
||||
Assert(h->space->num_open_dimensions == 1);
|
||||
return h->space->open_dimensions[0];
|
||||
@ -38,26 +38,32 @@ hypertable_get_closed_dimension(Hypertable *h)
|
||||
{
|
||||
if (h->space->num_closed_dimensions == 0)
|
||||
return NULL;
|
||||
|
||||
|
||||
Assert(h->space->num_closed_dimensions == 1);
|
||||
return h->space->closed_dimensions[0];
|
||||
}
|
||||
|
||||
Chunk *hypertable_get_chunk(Hypertable *h, Point *point)
|
||||
Chunk *hypertable_get_chunk(Hypertable *h, Point *point)
|
||||
{
|
||||
Chunk * chunk = subspace_store_get(h->chunk_cache, point);
|
||||
Chunk *chunk = subspace_store_get(h->chunk_cache, point);
|
||||
|
||||
if (NULL == chunk)
|
||||
if (NULL == chunk)
|
||||
{
|
||||
Hypercube *hc;
|
||||
chunk = chunk_get_or_create(h->space, point);
|
||||
|
||||
if (NULL == chunk)
|
||||
elog(ERROR, "No chunk found or created");
|
||||
|
||||
chunk_constraint_scan(chunk);
|
||||
chunk = chunk_find(h->space, point);
|
||||
|
||||
if (NULL == chunk)
|
||||
{
|
||||
chunk = chunk_create_new(h->space, point);
|
||||
elog(NOTICE, "Created new chunk %d", chunk->fd.id);
|
||||
}
|
||||
|
||||
Assert(NULL != chunk);
|
||||
|
||||
hc = hypercube_from_constraints(chunk->constraints, chunk->num_constraints);
|
||||
subspace_store_add(h->chunk_cache, hc, chunk, pfree);
|
||||
chunk->cube = hc;
|
||||
subspace_store_add(h->chunk_cache, hc, chunk, pfree);
|
||||
}
|
||||
|
||||
return chunk;
|
||||
|
@ -28,6 +28,4 @@ extern Dimension *hypertable_get_closed_dimension(Hypertable *h);
|
||||
|
||||
extern Chunk *hypertable_get_chunk(Hypertable *h, Point *point);
|
||||
|
||||
|
||||
|
||||
#endif /* TIMESCALEDB_HYPERTABLE_H */
|
||||
|
@ -85,7 +85,7 @@ insert_statement_state_is_valid_for_point(InsertStatementState *state, Point *p)
|
||||
return true;
|
||||
}
|
||||
|
||||
static void destroy_ics(void *ics_ptr)
|
||||
static void destroy_ics(void *ics_ptr)
|
||||
{
|
||||
InsertChunkState *ics = ics_ptr;
|
||||
insert_chunk_state_destroy(ics);
|
||||
@ -100,29 +100,26 @@ insert_statement_state_get_insert_chunk_state(InsertStatementState *state, Hyper
|
||||
{
|
||||
|
||||
InsertChunkState *ics;
|
||||
|
||||
|
||||
if (NULL == state->cache)
|
||||
{
|
||||
state->cache = subspace_store_init(point->cardinality);
|
||||
}
|
||||
|
||||
ics = subspace_store_get(state->cache, point);
|
||||
|
||||
if (NULL == ics)
|
||||
if (NULL == ics)
|
||||
{
|
||||
Chunk * new_chunk;
|
||||
Hypercube *hc;
|
||||
Chunk *new_chunk;
|
||||
|
||||
new_chunk = hypertable_get_chunk(state->hypertable, point);
|
||||
|
||||
if (NULL == new_chunk)
|
||||
elog(ERROR, "No chunk found or created");
|
||||
|
||||
|
||||
dimension_slice_scan(hs->open_dimensions[0]->fd.id, point->coordinates[0]);
|
||||
|
||||
ics = insert_chunk_state_new(new_chunk);
|
||||
chunk_constraint_scan(new_chunk);
|
||||
hc = hypercube_from_constraints(new_chunk->constraints, new_chunk->num_constraints);
|
||||
subspace_store_add(state->cache, hc, ics, destroy_ics);
|
||||
subspace_store_add(state->cache, new_chunk->cube, ics, destroy_ics);
|
||||
}
|
||||
|
||||
|
||||
return ics;
|
||||
}
|
||||
|
@ -13,6 +13,7 @@ typedef struct DimensionSlice DimensionSlice;
|
||||
typedef struct DimensionAxis DimensionAxis;
|
||||
typedef struct Point Point;
|
||||
typedef struct InsertStateCache InsertStateCache;
|
||||
|
||||
/* State used for every tuple in an insert statement */
|
||||
typedef struct
|
||||
{
|
||||
@ -32,7 +33,4 @@ InsertStatementState *insert_statement_state_new(Oid);
|
||||
void insert_statement_state_destroy(InsertStatementState *);
|
||||
InsertChunkState *insert_statement_state_get_insert_chunk_state(InsertStatementState *cache, Hyperspace *hs, Point *point);
|
||||
|
||||
|
||||
|
||||
|
||||
#endif /* TIMESCALEDB_INSERT_STATEMENT_STATE_H */
|
||||
|
@ -67,6 +67,14 @@ prepare_plan(const char *src, int nargs, Oid *argtypes)
|
||||
/* plan for getting a chunk via get_or_create_chunk(). */
|
||||
DEFINE_PLAN(get_chunk_plan, CHUNK_QUERY, 4, CHUNK_QUERY_ARGS)
|
||||
|
||||
|
||||
#define CHUNK_CREATE_ARGS (Oid[]) {INT4OID, INT8OID, INT4OID, INT8OID}
|
||||
#define CHUNK_CREATE "SELECT * \
|
||||
FROM _timescaledb_internal.chunk_create($1, $2, $3, $4)"
|
||||
|
||||
/* plan for creating a chunk via create_chunk(). */
|
||||
DEFINE_PLAN(create_chunk_plan, CHUNK_CREATE, 4, CHUNK_CREATE_ARGS)
|
||||
|
||||
static HeapTuple
|
||||
chunk_tuple_create_spi_connected(int32 time_dimension_id, int64 time_value,
|
||||
int32 space_dimension_id, int64 space_value,
|
||||
@ -119,7 +127,35 @@ spi_chunk_get_or_create(int32 time_dimension_id, int64 time_value,
|
||||
&desc, plan);
|
||||
|
||||
old = MemoryContextSwitchTo(top);
|
||||
chunk = chunk_create(tuple, num_constraints);
|
||||
chunk = chunk_create_from_tuple(tuple, num_constraints);
|
||||
MemoryContextSwitchTo(old);
|
||||
|
||||
SPI_finish();
|
||||
|
||||
return chunk;
|
||||
}
|
||||
|
||||
|
||||
Chunk *
|
||||
spi_chunk_create(int32 time_dimension_id, int64 time_value,
|
||||
int32 space_dimension_id, int64 space_value,
|
||||
int16 num_constraints)
|
||||
{
|
||||
HeapTuple tuple;
|
||||
TupleDesc desc;
|
||||
Chunk *chunk;
|
||||
MemoryContext old, top = CurrentMemoryContext;
|
||||
SPIPlanPtr plan = create_chunk_plan();
|
||||
|
||||
if (SPI_connect() < 0)
|
||||
elog(ERROR, "Got an SPI connect error");
|
||||
|
||||
tuple = chunk_tuple_create_spi_connected(time_dimension_id, time_value,
|
||||
space_dimension_id, space_value,
|
||||
&desc, plan);
|
||||
|
||||
old = MemoryContextSwitchTo(top);
|
||||
chunk = chunk_create_from_tuple(tuple, num_constraints);
|
||||
MemoryContextSwitchTo(old);
|
||||
|
||||
SPI_finish();
|
||||
|
@ -9,4 +9,8 @@ extern Chunk *spi_chunk_get_or_create(int32 time_dimension_id, int64 time_value,
|
||||
int32 space_dimension_id, int64 space_value,
|
||||
int16 num_constraints);
|
||||
|
||||
extern Chunk *spi_chunk_create(int32 time_dimension_id, int64 time_value,
|
||||
int32 space_dimension_id, int64 space_value,
|
||||
int16 num_constraints);
|
||||
|
||||
#endif /* TIMESCALEDB_METADATA_QUERIES_H */
|
||||
|
@ -5,22 +5,20 @@
|
||||
#include "subspace_store.h"
|
||||
|
||||
typedef struct SubspaceStore {
|
||||
int16 num_dimensions;
|
||||
DimensionAxis *origin; //origin of the tree
|
||||
int16 num_dimensions;
|
||||
DimensionVec *origin; /* origin of the tree */
|
||||
} SubspaceStore;
|
||||
|
||||
static DimensionAxis *
|
||||
static DimensionVec *
|
||||
subspace_store_dimension_create()
|
||||
{
|
||||
/* TODO remove type from axis */
|
||||
return dimension_axis_create(DIMENSION_TYPE_OPEN, 10);
|
||||
return dimension_vec_create(10);
|
||||
}
|
||||
|
||||
SubspaceStore *
|
||||
subspace_store_init(int16 num_dimensions)
|
||||
{
|
||||
SubspaceStore *sst = palloc(sizeof(SubspaceStore));
|
||||
|
||||
sst->origin = subspace_store_dimension_create();
|
||||
sst->num_dimensions = num_dimensions;
|
||||
return sst;
|
||||
@ -29,13 +27,13 @@ subspace_store_init(int16 num_dimensions)
|
||||
static void
|
||||
subspace_store_free_internal_node(void * node)
|
||||
{
|
||||
dimension_axis_free((DimensionAxis *)node);
|
||||
dimension_vec_free((DimensionVec *)node);
|
||||
}
|
||||
|
||||
void subspace_store_add(SubspaceStore *cache, Hypercube *hc,
|
||||
void *end_store, void (*end_store_free)(void *))
|
||||
void *end_store, void (*end_store_free)(void *))
|
||||
{
|
||||
DimensionAxis *axis = cache->origin;
|
||||
DimensionVec *vec = cache->origin;
|
||||
DimensionSlice *last = NULL;
|
||||
int i;
|
||||
|
||||
@ -48,27 +46,28 @@ void subspace_store_add(SubspaceStore *cache, Hypercube *hc,
|
||||
|
||||
Assert(target->storage == NULL);
|
||||
|
||||
if (axis == NULL)
|
||||
if (vec == NULL)
|
||||
{
|
||||
last->storage = subspace_store_dimension_create();
|
||||
last->storage_free = subspace_store_free_internal_node;
|
||||
axis = last->storage;
|
||||
}
|
||||
if(axis->num_slices > 0)
|
||||
{
|
||||
Assert(axis->slices[0]->fd.dimension_id = target->fd.dimension_id);
|
||||
vec = last->storage;
|
||||
}
|
||||
|
||||
match = dimension_axis_find_slice(axis, target->fd.range_start);
|
||||
if (vec->num_slices > 0)
|
||||
{
|
||||
Assert(vec->slices[0]->fd.dimension_id == target->fd.dimension_id);
|
||||
}
|
||||
|
||||
match = dimension_vec_find_slice(vec, target->fd.range_start);
|
||||
|
||||
if (match == NULL)
|
||||
{
|
||||
dimension_axis_add_slice_sort(&axis, target);
|
||||
dimension_vec_add_slice_sort(&vec, target);
|
||||
match = target;
|
||||
}
|
||||
|
||||
last = match;
|
||||
axis = last->storage; /* Internal nodes point to the next Dimension's Axis */
|
||||
vec = last->storage; /* internal nodes point to the next dimension's vector */
|
||||
}
|
||||
|
||||
Assert(last->storage == NULL);
|
||||
@ -79,20 +78,20 @@ void subspace_store_add(SubspaceStore *cache, Hypercube *hc,
|
||||
void *
|
||||
subspace_store_get(SubspaceStore *cache, Point *target)
|
||||
{
|
||||
int16 i;
|
||||
DimensionAxis *axis = cache->origin;
|
||||
int i;
|
||||
DimensionVec *vec = cache->origin;
|
||||
DimensionSlice *match = NULL;
|
||||
|
||||
Assert(target->cardinality == cache->num_dimensions);
|
||||
|
||||
for (i = 0; i < target->cardinality; i++)
|
||||
{
|
||||
match = dimension_axis_find_slice(axis, target->coordinates[i]);
|
||||
match = dimension_vec_find_slice(vec, target->coordinates[i]);
|
||||
|
||||
if (NULL == match)
|
||||
return NULL;
|
||||
|
||||
axis = match->storage;
|
||||
vec = match->storage;
|
||||
}
|
||||
return match->storage;
|
||||
}
|
||||
@ -101,13 +100,13 @@ static bool
|
||||
subspace_store_match_first(SubspaceStore *cache, Point *target)
|
||||
{
|
||||
Assert(target->cardinality == cache->num_dimensions);
|
||||
return (dimension_axis_find_slice(cache->origin, target->coordinates[0]) != NULL);
|
||||
return dimension_vec_find_slice(cache->origin, target->coordinates[0]) != NULL;
|
||||
}
|
||||
|
||||
void
|
||||
subspace_store_free(SubspaceStore *cache)
|
||||
{
|
||||
dimension_axis_free(cache->origin);
|
||||
dimension_vec_free(cache->origin);
|
||||
pfree(cache);
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user