This commit is contained in:
Matvey Arye 2017-03-02 22:07:55 -05:00
parent 2404940315
commit 4d4ac78ef5
16 changed files with 235 additions and 327 deletions

View File

@ -1,97 +1,78 @@
#include "cache.h"
void
cache_init(Cache *cache)
{
MemoryContext ctx, old;
if (cache->store != NULL)
if (cache->htab != NULL)
{
elog(ERROR, "Cache %s is already initialized", cache->name);
return;
}
ctx = AllocSetContextCreate(CacheMemoryContext,
cache->name,
ALLOCSET_DEFAULT_SIZES);
old = MemoryContextSwitchTo(ctx);
cache->store = palloc(sizeof(CacheStorage));
Assert(cache->hctl.hcxt == NULL);
cache->hctl.hcxt = ctx;
cache->store->htab = hash_create(cache->name, cache->numelements,
cache->htab = hash_create(cache->name, cache->numelements,
&cache->hctl, cache->flags);
cache->hctl.hcxt = NULL;
cache->store->mcxt = ctx;
cache->store->valid = true;
cache->store->refcount = 0;
cache->store->destroy_storage_hook = cache->destroy_storage_hook;
MemoryContextSwitchTo(old);
cache->refcount = 1;
}
static void
storage_destroy(CacheStorage *store) {
store->valid = false;
if (store->refcount > 0) {
//will be destroyed later
cache_destroy(Cache *cache) {
if (cache->refcount > 0) {
/* will be destroyed later */
return;
}
if (store->destroy_storage_hook != NULL)
store->destroy_storage_hook(store);
if (cache->pre_destroy_hook != NULL)
cache->pre_destroy_hook(cache);
hash_destroy(store->htab);
MemoryContextDelete(store->mcxt);
hash_destroy(cache->htab);
cache->htab = NULL;
MemoryContextDelete(cache->hctl.hcxt);
cache->hctl.hcxt = NULL;
}
void
cache_invalidate(Cache *cache)
{
if (cache->store == NULL)
if (cache == NULL)
return;
storage_destroy(cache->store);
cache->store = NULL;
cache_init(cache); //start new store
cache->refcount--;
cache_destroy(cache);
}
/*
* Pinning storage is needed if any items returned by the cache
* Pinning is needed if any items returned by the cache
* may need to survive invalidation events (i.e. AcceptInvalidationMessages() may be called).
*
* Invalidation messages may be processed on any internal function that takes a lock (e.g. heap_open.
* Invalidation messages may be processed on any internal function that takes a lock (e.g. heap_open).
*
* Each call to cache_pin_storage MUST BE paired with a call to cache_release_storage.
* Each call to cache_pin MUST BE paired with a call to cache_release.
*
*/
extern CacheStorage *cache_pin_storage(Cache *cache)
extern Cache *cache_pin(Cache *cache)
{
cache->store->refcount++;
return cache->store;
cache->refcount++;
return cache;
}
extern void cache_release_storage(CacheStorage *store)
extern void cache_release(Cache *cache)
{
Assert(store->refcount > 0);
store->refcount--;
if (!store->valid) {
storage_destroy(store);
}
Assert(cache->refcount > 0);
cache->refcount--;
cache_destroy(cache);
}
MemoryContext
cache_memory_ctx(Cache *cache)
{
return cache->store->mcxt;
return cache->hctl.hcxt;
}
MemoryContext
cache_switch_to_memory_context(Cache *cache)
{
return MemoryContextSwitchTo(cache->store->mcxt);
return MemoryContextSwitchTo(cache->hctl.hcxt);
}
void *
@ -99,12 +80,12 @@ cache_fetch(Cache *cache, CacheQueryCtx *ctx)
{
bool found;
if (cache->store->htab == NULL)
if (cache->htab == NULL)
{
elog(ERROR, "Hash %s not initialized", cache->name);
}
ctx->entry = hash_search(cache->store->htab, cache->get_key(ctx), HASH_ENTER, &found);
ctx->entry = hash_search(cache->htab, cache->get_key(ctx), HASH_ENTER, &found);
if (!found && cache->create_entry != NULL)
{

View File

@ -11,25 +11,18 @@ typedef struct CacheQueryCtx
void *private[0];
} CacheQueryCtx;
typedef struct CacheStorage {
HTAB *htab;
MemoryContext mcxt;
int refcount;
bool valid;
void (*destroy_storage_hook) (struct CacheStorage *);
} CacheStorage;
typedef struct Cache
{
HASHCTL hctl;
CacheStorage *store;
const char *name;
HTAB *htab;
int refcount;
const char *name;
long numelements;
int flags;
void *(*get_key) (struct CacheQueryCtx *);
void *(*create_entry) (struct Cache *, CacheQueryCtx *);
void *(*update_entry) (struct Cache *, CacheQueryCtx *);
void (*destroy_storage_hook) (struct CacheStorage *);
void (*pre_destroy_hook) (struct Cache *);
} Cache;
extern void cache_init(Cache *cache);
@ -39,7 +32,7 @@ extern void *cache_fetch(Cache *cache, CacheQueryCtx *ctx);
extern MemoryContext cache_memory_ctx(Cache *cache);
extern MemoryContext cache_switch_to_memory_context(Cache *cache);
extern CacheStorage *cache_pin_storage(Cache *cache);
extern void cache_release_storage(CacheStorage *store);
extern Cache *cache_pin(Cache *cache);
extern void cache_release(Cache *cache);
#endif /* _IOBEAMDB_CACHE_H_ */

View File

@ -57,10 +57,10 @@ inval_cache_callback(Datum arg, Oid relid)
return;
if (relid == InvalidOid || relid == hypertable_cache_inval_proxy_oid)
invalidate_hypertable_cache_callback();
hypertable_cache_invalidate_callback();
if (relid == InvalidOid || relid == chunk_cache_inval_proxy_oid)
invalidate_chunk_cache_callback();
chunk_crn_set_cache_invalidate_callback();
}
/*

View File

@ -40,9 +40,6 @@ typedef struct chunk_crn_set_htable_entry
typedef struct ChunkCacheQueryCtx
{
CacheQueryCtx cctx;
hypertable_cache_entry *hci;
epoch_and_partitions_set *pe_entry;
Partition *part;
int32 chunk_id;
int64 chunk_start_time;
int64 chunk_end_time;
@ -57,31 +54,40 @@ chunk_crn_set_cache_get_key(CacheQueryCtx *ctx)
static void *chunk_crn_set_cache_create_entry(Cache *cache, CacheQueryCtx *ctx);
static void *chunk_crn_set_cache_update_entry(Cache *cache, CacheQueryCtx *ctx);
static char *get_copy_table_insert_sql(ChunkCacheQueryCtx *ctx);
static Cache *chunk_crn_set_cache_create() {
MemoryContext ctx = AllocSetContextCreate(CacheMemoryContext,
CHUNK_CACHE_INVAL_PROXY_TABLE,
ALLOCSET_DEFAULT_SIZES);
static Cache chunk_crn_set_cache = {
.hctl = {
.keysize = sizeof(int32),
.entrysize = sizeof(chunk_crn_set_htable_entry),
.hcxt = NULL,
},
.name = CHUNK_CACHE_INVAL_PROXY_TABLE,
.numelements = 16,
.flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS,
.get_key = chunk_crn_set_cache_get_key,
.create_entry = chunk_crn_set_cache_create_entry,
.update_entry = chunk_crn_set_cache_update_entry,
};
Cache *cache = MemoryContextAlloc(ctx, sizeof(Cache));
*cache = (Cache) {
.hctl = {
.keysize = sizeof(int32),
.entrysize = sizeof(chunk_crn_set_htable_entry),
.hcxt = ctx,
},
.name = CHUNK_CACHE_INVAL_PROXY_TABLE,
.numelements = 16,
.flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS,
.get_key = chunk_crn_set_cache_get_key,
.create_entry = chunk_crn_set_cache_create_entry,
.update_entry = chunk_crn_set_cache_update_entry,
};
cache_init(cache);
return cache;
}
static Cache *chunk_crn_set_cache_current = NULL;
static void *
chunk_crn_set_cache_create_entry(Cache *cache, CacheQueryCtx *ctx)
{
ChunkCacheQueryCtx *cctx = (ChunkCacheQueryCtx *) ctx;
chunk_crn_set_htable_entry *pe = ctx->entry;
char *insert_sql;
MemoryContext old;
insert_sql = get_copy_table_insert_sql(cctx);
pe->chunk_id = cctx->chunk_id;
pe->start_time = cctx->chunk_start_time;
pe->end_time = cctx->chunk_end_time;
@ -99,15 +105,12 @@ chunk_crn_set_cache_update_entry(Cache *cache, CacheQueryCtx *ctx)
{
ChunkCacheQueryCtx *cctx = (ChunkCacheQueryCtx *) ctx;
chunk_crn_set_htable_entry *pe = ctx->entry;
char *insert_sql;
MemoryContext old;
if (pe->start_time == cctx->chunk_start_time &&
pe->end_time == cctx->chunk_end_time)
return pe;
insert_sql = get_copy_table_insert_sql(cctx);
old = cache_switch_to_memory_context(cache);
pe->crns = fetch_crn_set(NULL, pe->chunk_id);
MemoryContextSwitchTo(old);
@ -116,29 +119,36 @@ chunk_crn_set_cache_update_entry(Cache *cache, CacheQueryCtx *ctx)
}
void
invalidate_chunk_cache_callback(void)
chunk_crn_set_cache_invalidate_callback(void)
{
CACHE1_elog(WARNING, "DESTROY chunk_insert plan cache");
cache_invalidate(&chunk_crn_set_cache);
CACHE1_elog(WARNING, "DESTROY chunk_crn_set_cache");
cache_invalidate(chunk_crn_set_cache_current);
chunk_crn_set_cache_current = chunk_crn_set_cache_create();
}
static chunk_crn_set_htable_entry *
get_chunk_crn_set_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry,
Partition *part, int32 chunk_id, int64 chunk_start_time,
int64 chunk_end_time)
chunk_crn_set_cache_get_entry(Cache *cache, int32 chunk_id, int64 chunk_start_time, int64 chunk_end_time)
{
if (cache == NULL)
{
cache = chunk_crn_set_cache_current;
}
ChunkCacheQueryCtx ctx = {
.hci = hci,
.pe_entry = pe_entry,
.part = part,
.chunk_id = chunk_id,
.chunk_start_time = chunk_start_time,
.chunk_end_time = chunk_end_time,
};
return cache_fetch(&chunk_crn_set_cache, &ctx.cctx);
return cache_fetch(cache, &ctx.cctx);
}
extern Cache *
chunk_crn_set_cache_pin()
{
return cache_pin(chunk_crn_set_cache_current);
}
static chunk_row *
chunk_row_create(int32 id, int32 partition_id, int64 starttime, int64 endtime)
{
@ -242,9 +252,12 @@ chunk_scan(int32 partition_id, int64 timepoint, bool tuplock)
return cctx.chunk;
}
/*
* Get chunk cache entry.
* The cache parameter is a chunk_crn_set_cache (can be null to use current cache).
*/
chunk_cache_entry *
get_chunk_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry,
Partition *part, int64 timepoint, bool lock)
get_chunk_cache_entry(Cache *cache, Partition *part, int64 timepoint, bool lock)
{
chunk_crn_set_htable_entry *chunk_crn_cache;
chunk_cache_entry *entry;
@ -260,87 +273,21 @@ get_chunk_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_
entry = palloc(sizeof(chunk_cache_entry));
entry->chunk = chunk;
entry->id = chunk->id;
chunk_crn_cache = get_chunk_crn_set_cache_entry(hci, pe_entry, part, chunk->id,
chunk->start_time, chunk->end_time);
chunk_crn_cache = chunk_crn_set_cache_get_entry(cache, chunk->id,
chunk->start_time, chunk->end_time);
entry->crns = chunk_crn_cache->crns;
return entry;
}
static char *
get_copy_table_insert_sql(ChunkCacheQueryCtx *ctx)
{
StringInfo where_clause = makeStringInfo();
StringInfo insert_clauses = makeStringInfo();
StringInfo sql_insert = makeStringInfo();
ListCell *cell;
int i;
crn_set *crn = fetch_crn_set(NULL, ctx->chunk_id);
appendStringInfo(where_clause, "WHERE TRUE");
if (ctx->pe_entry->num_partitions > 1)
{
appendStringInfo(where_clause, " AND (%s.%s(%s::TEXT, %d) BETWEEN %d AND %d)",
quote_identifier(ctx->pe_entry->partitioning->partfunc.schema),
quote_identifier(ctx->pe_entry->partitioning->partfunc.name),
quote_identifier(ctx->pe_entry->partitioning->column),
ctx->pe_entry->partitioning->partfunc.modulos,
ctx->part->keyspace_start,
ctx->part->keyspace_end);
}
if (ctx->chunk_start_time != OPEN_START_TIME)
{
appendStringInfo(where_clause, " AND (%1$s >= %2$s) ",
quote_identifier(ctx->hci->time_column_name),
internal_time_to_column_literal_sql(ctx->chunk_start_time,
ctx->hci->time_column_type));
}
if (ctx->chunk_end_time != OPEN_END_TIME)
{
appendStringInfo(where_clause, " AND (%1$s <= %2$s) ",
quote_identifier(ctx->hci->time_column_name),
internal_time_to_column_literal_sql(ctx->chunk_end_time,
ctx->hci->time_column_type));
}
i = 0;
foreach(cell, crn->tables)
{
crn_row *tab = lfirst(cell);
i = i + 1;
appendStringInfo(insert_clauses, "i_%d AS (INSERT INTO %s.%s SELECT * FROM selected)",
i,
quote_identifier(tab->schema_name.data),
quote_identifier(tab->table_name.data)
);
}
pfree(crn);
crn = NULL;
appendStringInfo(sql_insert, "\
WITH selected AS ( DELETE FROM ONLY %1$s %2$s RETURNING * ), \
%3$s \
SELECT 1", copy_table_name(ctx->hci->id),
where_clause->data,
insert_clauses->data);
return sql_insert->data;
}
void
_chunk_cache_init(void)
{
CreateCacheMemoryContext();
cache_init(&chunk_crn_set_cache);
chunk_crn_set_cache_current = chunk_crn_set_cache_create();
}
void
_chunk_cache_fini(void)
{
cache_invalidate(&chunk_crn_set_cache);
cache_invalidate(chunk_crn_set_cache_current);
}

View File

@ -5,6 +5,7 @@
#include <executor/spi.h>
#include "metadata_queries.h"
#include "cache.h"
#define CHUNK_CACHE_INVAL_PROXY_TABLE "cache_inval_chunk"
#define CHUNK_CACHE_INVAL_PROXY_OID \
@ -20,14 +21,14 @@ typedef struct chunk_cache_entry
int32 id;
chunk_row *chunk;
crn_set *crns;
SPIPlanPtr move_from_copyt_plan;
} chunk_cache_entry;
extern chunk_cache_entry *get_chunk_cache_entry(Cache *cache, Partition *part, int64 timepoint, bool lock);
extern chunk_cache_entry *get_chunk_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry,
Partition *part, int64 time_pt, bool lock);
extern void invalidate_chunk_cache_callback(void);
extern void chunk_crn_set_cache_invalidate_callback(void);
extern Cache *chunk_crn_set_cache_pin(void);
extern void _chunk_cache_init(void);
extern void _chunk_cache_fini(void);

View File

@ -28,19 +28,33 @@ hypertable_cache_get_key(CacheQueryCtx *ctx)
return &((HypertableCacheQueryCtx *) ctx)->hypertable_id;
}
static Cache hypertable_cache = {
.hctl = {
.keysize = sizeof(int32),
.entrysize = sizeof(hypertable_cache_entry),
.hcxt = NULL,
},
.name = HYPERTABLE_CACHE_INVAL_PROXY_TABLE,
.numelements = 16,
.flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS,
.get_key = hypertable_cache_get_key,
.create_entry = hypertable_cache_create_entry,
};
static Cache *hypertable_cache_create() {
MemoryContext ctx = AllocSetContextCreate(CacheMemoryContext,
HYPERTABLE_CACHE_INVAL_PROXY_TABLE,
ALLOCSET_DEFAULT_SIZES);
Cache *cache = MemoryContextAlloc(ctx, sizeof(Cache));
*cache = (Cache) {
.hctl = {
.keysize = sizeof(int32),
.entrysize = sizeof(hypertable_cache_entry),
.hcxt = ctx,
},
.name = HYPERTABLE_CACHE_INVAL_PROXY_TABLE,
.numelements = 16,
.flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS,
.get_key = hypertable_cache_get_key,
.create_entry = hypertable_cache_create_entry,
};
cache_init(cache);
return cache;
}
static Cache *hypertable_cache_current = NULL;
/* Column numbers for 'hypertable' table in sql/common/tables.sql */
#define HT_TBL_COL_ID 1
#define HT_TBL_COL_TIME_COL_NAME 10
@ -101,22 +115,23 @@ hypertable_cache_create_entry(Cache *cache, CacheQueryCtx *ctx)
}
void
invalidate_hypertable_cache_callback(void)
hypertable_cache_invalidate_callback(void)
{
CACHE1_elog(WARNING, "DESTROY hypertable_cache");
cache_invalidate(&hypertable_cache);
cache_invalidate(hypertable_cache_current);
hypertable_cache_current = hypertable_cache_create();
}
/* Get hypertable cache entry. If the entry is not in the cache, add it. */
hypertable_cache_entry *
hypertable_cache_get(int32 hypertable_id)
hypertable_cache_get_entry(Cache *cache, int32 hypertable_id)
{
HypertableCacheQueryCtx ctx = {
.hypertable_id = hypertable_id,
};
return cache_fetch(&hypertable_cache, &ctx.cctx);
return cache_fetch(cache, &ctx.cctx);
}
/* function to compare epochs */
@ -140,7 +155,7 @@ cmp_epochs(const void *time_pt_pointer, const void *test)
}
epoch_and_partitions_set *
hypertable_cache_get_partition_epoch(hypertable_cache_entry *hce, int64 time_pt, Oid relid)
hypertable_cache_get_partition_epoch(Cache *cache, hypertable_cache_entry *hce, int64 time_pt, Oid relid)
{
MemoryContext old;
epoch_and_partitions_set *epoch,
@ -166,7 +181,7 @@ hypertable_cache_get_partition_epoch(hypertable_cache_entry *hce, int64 time_pt,
return (*cache_entry);
}
old = cache_switch_to_memory_context(&hypertable_cache);
old = cache_switch_to_memory_context(cache);
epoch = partition_epoch_scan(hce->id, time_pt, relid);
/* check if full */
@ -191,21 +206,21 @@ hypertable_cache_get_partition_epoch(hypertable_cache_entry *hce, int64 time_pt,
return epoch;
}
extern CacheStorage *
hypertable_cache_pin_storage()
extern Cache *
hypertable_cache_pin()
{
return cache_pin_storage(&hypertable_cache);
return cache_pin(hypertable_cache_current);
}
void _hypertable_cache_init(void)
{
CreateCacheMemoryContext();
cache_init(&hypertable_cache);
hypertable_cache_current = hypertable_cache_create();
}
void
_hypertable_cache_fini(void)
{
cache_invalidate(&hypertable_cache);
cache_invalidate(hypertable_cache_current);
}

View File

@ -24,14 +24,14 @@ typedef struct hypertable_cache_entry
epoch_and_partitions_set *epochs[MAX_EPOCHS_PER_HYPERTABLE_CACHE_ENTRY];
} hypertable_cache_entry;
hypertable_cache_entry *hypertable_cache_get(int32 hypertable_id);
hypertable_cache_entry *hypertable_cache_get_entry(Cache * cache, int32 hypertable_id);
epoch_and_partitions_set *
hypertable_cache_get_partition_epoch(hypertable_cache_entry *hce, int64 time_pt, Oid relid);
hypertable_cache_get_partition_epoch(Cache *cache, hypertable_cache_entry *hce, int64 time_pt, Oid relid);
void invalidate_hypertable_cache_callback(void);
void hypertable_cache_invalidate_callback(void);
extern CacheStorage *hypertable_cache_pin_storage(void);
extern Cache *hypertable_cache_pin(void);
void _hypertable_cache_init(void);
void _hypertable_cache_fini(void);

View File

@ -46,6 +46,8 @@
#include "metadata_queries.h"
#include "partitioning.h"
#include "scanner.h"
#include "catalog.h"
#include "pgmurmur3.h"
#include <utils/tqual.h>
#include <utils/rls.h>
@ -56,7 +58,8 @@
#define INSERT_TRIGGER_COPY_TABLE_NAME "insert_trigger"
/* private funcs */
static ObjectAddress create_insert_index(int32 hypertable_id, char * time_field, PartitioningInfo *part_info, int16 *end_time_partitions, int num_partitions);
static ObjectAddress create_insert_index(int32 hypertable_id, char * time_field, PartitioningInfo *part_info,epoch_and_partitions_set *epoch);
static Node *get_keyspace_fn_call(PartitioningInfo *part_info);
/*
@ -133,7 +136,7 @@ chunk_insert_ctx_rel_destroy(ChunkInsertCtxRel *rel_ctx)
static void
chunk_insert_ctx_rel_chunk_insert_ctx_insert_tuple(ChunkInsertCtxRel *rel_ctx, HeapTuple tuple)
chunk_insert_ctx_rel_insert_tuple(ChunkInsertCtxRel *rel_ctx, HeapTuple tuple)
{
int hi_options = 0; /* no optimization */
CommandId mycid = GetCurrentCommandId(true);
@ -161,16 +164,19 @@ chunk_insert_ctx_rel_chunk_insert_ctx_insert_tuple(ChunkInsertCtxRel *rel_ctx, H
typedef struct ChunkInsertCtx
{
chunk_cache_entry *chunk;
Cache *pinned;
List *ctxs;
} ChunkInsertCtx;
static ChunkInsertCtx *
chunk_insert_ctx_new(chunk_cache_entry *chunk)
chunk_insert_ctx_new(chunk_cache_entry *chunk, Cache *pinned)
{
/* int num_tables = list_length(chunk->crns->tables); */
ListCell *lc;
List *rel_ctx_list = NIL;
ChunkInsertCtx *ctx;
ctx = palloc(sizeof(ChunkInsertCtx));
ctx->pinned = pinned;
foreach(lc, chunk->crns->tables)
{
@ -234,7 +240,6 @@ chunk_insert_ctx_new(chunk_cache_entry *chunk)
rel_ctx_list = lappend(rel_ctx_list, rel_ctx);
}
ctx = palloc(sizeof(ChunkInsertCtx));
ctx->ctxs = rel_ctx_list;
ctx->chunk = chunk;
return ctx;
@ -250,6 +255,8 @@ chunk_insert_ctx_destroy(ChunkInsertCtx *ctx)
return;
}
cache_release(ctx->pinned);
foreach(lc, ctx->ctxs)
{
ChunkInsertCtxRel *rel_ctx = lfirst(lc);
@ -265,7 +272,7 @@ chunk_insert_ctx_insert_tuple(ChunkInsertCtx *ctx, HeapTuple tup)
foreach(lc, ctx->ctxs)
{
ChunkInsertCtxRel *rel_ctx = lfirst(lc);
chunk_insert_ctx_rel_chunk_insert_ctx_insert_tuple(rel_ctx, tup);
chunk_insert_ctx_rel_insert_tuple(rel_ctx, tup);
}
}
@ -286,11 +293,10 @@ copy_table_tuple_found(TupleInfo *ti, void *data)
if (ctx->pe->num_partitions > 1)
{
//Datum partition_datum = index_getattr(scan->xs_itup, 1, scan->xs_itupdesc, &is_null);
/* first element is partition index (used for sorting but not necessary here) */
Datum time_datum = index_getattr(ti->ituple, 2, ti->ituple_desc, &is_null);
Datum keyspace_datum = index_getattr(ti->ituple, 3, ti->ituple_desc, &is_null);
//partition_no = DatumGetInt16(partition_datum);
time_pt = time_value_to_internal(time_datum, ctx->hci->time_column_type);
keyspace_pt = DatumGetInt16(keyspace_datum);
}
@ -298,18 +304,18 @@ copy_table_tuple_found(TupleInfo *ti, void *data)
{
Datum time_datum = index_getattr(ti->ituple, 1, ti->ituple_desc, &is_null);
time_pt = time_value_to_internal(time_datum, ctx->hci->time_column_type);
keyspace_pt = -1;
keyspace_pt = KEYSPACE_PT_NO_PARTITIONING;
}
if (ctx->chunk_ctx != NULL && !(ctx->chunk_ctx->chunk->chunk->start_time <= time_pt && ctx->chunk_ctx->chunk->chunk->end_time >= time_pt))
if (ctx->chunk_ctx != NULL && !chunk_row_timepoint_is_member(ctx->chunk_ctx->chunk->chunk, time_pt))
{
/* moving on to next chunk; */
chunk_insert_ctx_destroy(ctx->chunk_ctx);
ctx->chunk_ctx = NULL;
}
if (ctx->part != NULL && !(ctx->part->keyspace_start <= keyspace_pt && ctx->part->keyspace_end >= keyspace_pt))
if (ctx->part != NULL && !partition_keyspace_pt_is_member(ctx->part, keyspace_pt))
{
/* moving on to next ctx->partition. */
chunk_insert_ctx_destroy(ctx->chunk_ctx);
@ -326,26 +332,21 @@ copy_table_tuple_found(TupleInfo *ti, void *data)
{
Datum was_closed_datum;
chunk_cache_entry *chunk;
Cache *pinned = chunk_crn_set_cache_pin();
/*
* TODO: this first call should be non-locking and use a cache(for
* performance)
*/
chunk = get_chunk_cache_entry(ctx->hci, ctx->pe, ctx->part, time_pt, false);
chunk = get_chunk_cache_entry(pinned, ctx->part, time_pt, false);
was_closed_datum = FunctionCall1(get_close_if_needed_fn(), Int32GetDatum(chunk->id));
/* chunk may have been closed and thus changed /or/ need to get share lock */
chunk = get_chunk_cache_entry(ctx->hci, ctx->pe, ctx->part, time_pt, true);
chunk = get_chunk_cache_entry(pinned, ctx->part, time_pt, true);
ctx->chunk_ctx = chunk_insert_ctx_new(chunk);
/* elog(WARNING, "got new chunk %d", chunk->id); */
ctx->chunk_ctx = chunk_insert_ctx_new(chunk, pinned);
}
/*
* elog(WARNING, "time is partition_no: %d keyspace: %d time: %ld
* chunk %d", partition_no, keyspace_pt, time_pt, chunk->id);
*/
/* insert here: */
//has to be a copy(not sure why)
/* has to be a copy(not sure why) */
chunk_insert_ctx_insert_tuple(ctx->chunk_ctx,heap_copytuple(ti->tuple));
return true;
}
@ -398,7 +399,7 @@ insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS)
hypertable_cache_entry *hci;
epoch_and_partitions_set *pe;
CacheStorage *hypertable_cache_storage;
Cache *hypertable_cache;
ObjectAddress idx;
DropStmt *drop = makeNode(DropStmt);
@ -429,24 +430,23 @@ insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS)
* get the hypertable cache; use the time column name to figure out the
* column fnum for time field
*/
hypertable_cache_storage = hypertable_cache_pin_storage();
hypertable_cache = hypertable_cache_pin();
hci = hypertable_cache_get(hypertable_id);
hci = hypertable_cache_get_entry(hypertable_cache, hypertable_id);
/* TODO: hack assumes single pe. */
pe = hypertable_cache_get_partition_epoch(hci, 0, trigdata->tg_relation->rd_id);
pe = hypertable_cache_get_partition_epoch(hypertable_cache, hci, 0, trigdata->tg_relation->rd_id);
/*
* create an index that colocates row from the same chunk together and
* guarantees an order on chunk access as well
*/
idx = create_insert_index(hypertable_id, hci->time_column_name, pe->partitioning,
partition_epoch_get_partition_end_times(pe), pe->num_partitions);
idx = create_insert_index(hypertable_id, hci->time_column_name, pe->partitioning, pe);
scan_copy_table_and_insert(hci, pe, trigdata->tg_relation->rd_id, idx.objectId);
cache_release_storage(hypertable_cache_storage);
cache_release(hypertable_cache);
drop->removeType = OBJECT_INDEX;
drop->missing_ok = FALSE;
@ -536,6 +536,21 @@ create_copy_table(int32 hypertable_id, Oid root_oid)
return copyTableRelationAddr.objectId;
}
static IndexElem *
makeIndexElem(char *name, Node *expr){
Assert((name ==NULL || expr == NULL) && (name !=NULL || expr !=NULL));
IndexElem *time_elem = makeNode(IndexElem);
time_elem->name = name;
time_elem->expr = expr;
time_elem->indexcolname = NULL;
time_elem->collation = NIL;
time_elem->opclass = NIL;
time_elem->ordering = SORTBY_DEFAULT;
time_elem->nulls_ordering = SORTBY_NULLS_DEFAULT;
return time_elem;
}
/* creates index for inserting in set chunk order.
*
* The index is the following:
@ -560,7 +575,7 @@ create_copy_table(int32 hypertable_id, Oid root_oid)
*
* */
static ObjectAddress
create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo *part_info, int16 *end_time_partitions, int num_partitions)
create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo *part_info, epoch_and_partitions_set *epoch)
{
IndexStmt *index_stmt = makeNode(IndexStmt);
IndexElem *time_elem;
@ -568,31 +583,24 @@ create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo *par
List *indexElem = NIL;
int i;
time_elem = makeNode(IndexElem);
time_elem->name = time_field;
time_elem->expr = NULL;
time_elem->indexcolname = NULL;
time_elem->collation = NIL;
time_elem->opclass = NIL;
time_elem->ordering = SORTBY_DEFAULT;
time_elem->nulls_ordering = SORTBY_NULLS_DEFAULT;
time_elem = makeIndexElem(time_field, NULL);
if (part_info != NULL)
{
IndexElem *partition_elem;
IndexElem *keyspace_elem;
List *array_pos_func_name = list_make2(makeString("_iobeamdb_catalog"), makeString("array_position_least"));
List *array_pos_func_name = list_make2(makeString(CATALOG_SCHEMA_NAME), makeString(ARRAY_POSITION_LEAST_FN_NAME));
List *array_pos_args;
List *array_list = NIL;
A_ArrayExpr *array_expr;
FuncCall *array_pos_fc;
for (i = 0; i < num_partitions; i++)
for (i = 0; i < epoch->num_partitions; i++)
{
A_Const *end_time_const = makeNode(A_Const);
TypeCast *cast = makeNode(TypeCast);
end_time_const->val = *makeInteger((int) end_time_partitions[i]);
end_time_const->val = *makeInteger((int) epoch->partitions[i].keyspace_end);
end_time_const->location = -1;
@ -610,23 +618,8 @@ create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo *par
array_pos_args = list_make2(array_expr, get_keyspace_fn_call(part_info));
array_pos_fc = makeFuncCall(array_pos_func_name, array_pos_args, -1);
partition_elem = makeNode(IndexElem);
partition_elem->name = NULL;
partition_elem->expr = (Node *) array_pos_fc;
partition_elem->indexcolname = NULL;
partition_elem->collation = NIL;
partition_elem->opclass = NIL;
partition_elem->ordering = SORTBY_DEFAULT;
partition_elem->nulls_ordering = SORTBY_NULLS_DEFAULT;
keyspace_elem = makeNode(IndexElem);
keyspace_elem->name = NULL;
keyspace_elem->expr = (Node *) get_keyspace_fn_call(part_info);
keyspace_elem->indexcolname = NULL;
keyspace_elem->collation = NIL;
keyspace_elem->opclass = NIL;
keyspace_elem->ordering = SORTBY_DEFAULT;
keyspace_elem->nulls_ordering = SORTBY_NULLS_DEFAULT;
partition_elem = makeIndexElem(NULL, (Node *) array_pos_fc);
keyspace_elem = makeIndexElem(NULL, (Node *) get_keyspace_fn_call(part_info));
/* partition_number, time, keyspace */
/* can probably get rid of keyspace but later */
@ -640,22 +633,7 @@ create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo *par
index_stmt->idxname = "copy_insert";
index_stmt->relation = makeRangeVar("pg_temp", copy_table_name(hypertable_id), -1);
index_stmt->accessMethod = "btree";
index_stmt->tableSpace = NULL;
index_stmt->indexParams = indexElem;
index_stmt->options = NULL;
index_stmt->whereClause = NULL;
index_stmt->excludeOpNames = NIL;
index_stmt->idxcomment = NULL;
index_stmt->indexOid = InvalidOid;
index_stmt->oldNode = InvalidOid;
index_stmt->unique = false;
index_stmt->primary = false;
index_stmt->isconstraint = false;
index_stmt->deferrable = false;
index_stmt->initdeferred = false;
index_stmt->transformed = false;
index_stmt->concurrent = false;
index_stmt->if_not_exists = false;
relid =
RangeVarGetRelidExtended(index_stmt->relation, ShareLock,
@ -685,7 +663,6 @@ get_keyspace_fn_call(PartitioningInfo *part_info)
A_Const *mod_const;
List *part_func_name = list_make2(makeString(part_info->partfunc.schema), makeString(part_info->partfunc.name));
List *part_func_args;
FuncCall *part_fc;
col_ref->fields = list_make1(makeString(part_info->column));
col_ref->location = -1;
@ -695,6 +672,5 @@ get_keyspace_fn_call(PartitioningInfo *part_info)
mod_const->location = -1;
part_func_args = list_make2(col_ref, mod_const);
part_fc = makeFuncCall(part_func_name, part_func_args, -1);
return (Node *) part_fc;
return (Node *) makeFuncCall(part_func_name, part_func_args, -1);
}

View File

@ -760,10 +760,6 @@ void iobeamdb_ProcessUtility(Node *parsetree,
return;
}
/*if(IsA(parsetree, IndexStmt)) {
pprint(parsetree);
}*/
prev_ProcessUtility(parsetree, queryString, context, params, dest, completionTag);
}

View File

@ -223,3 +223,8 @@ chunk_row_insert_new(int32 partition_id, int64 timepoint, bool lock)
return chunk;
}
bool chunk_row_timepoint_is_member(const chunk_row *row, const int64 time_pt){
return row->start_time <= time_pt && row->end_time >= time_pt;
}

View File

@ -47,4 +47,6 @@ extern crn_set *fetch_crn_set(crn_set *entry, int32 chunk_id);
chunk_row *
chunk_row_insert_new(int32 partition_id, int64 timepoint, bool lock);
bool chunk_row_timepoint_is_member(const chunk_row *row, const int64 time_pt);
#endif /* IOBEAMDB_METADATA_QUERIES_H */

View File

@ -296,7 +296,7 @@ cmp_partitions(const void *keyspace_pt_arg, const void *value)
int16 keyspace_pt = *((int16 *) keyspace_pt_arg);
const Partition *part = value;
if (part->keyspace_start <= keyspace_pt && part->keyspace_end >= keyspace_pt)
if (partition_keyspace_pt_is_member(part, keyspace_pt))
{
return 0;
}
@ -319,7 +319,7 @@ partition_epoch_get_partition(epoch_and_partitions_set *epoch, int16 keyspace_pt
return NULL;
}
if (keyspace_pt < 0)
if (keyspace_pt == KEYSPACE_PT_NO_PARTITIONING)
{
if (epoch->num_partitions > 1)
{
@ -340,19 +340,7 @@ partition_epoch_get_partition(epoch_and_partitions_set *epoch, int16 keyspace_pt
return part;
}
int16 *
partition_epoch_get_partition_end_times(epoch_and_partitions_set *epoch)
bool partition_keyspace_pt_is_member(const Partition *part, const int16 keyspace_pt)
{
int16 *end_times_partitions = palloc(sizeof(int16) * epoch->num_partitions);
int i;
for (i = 0; i < epoch->num_partitions; i++)
{
end_times_partitions[i] = epoch->partitions[i].keyspace_end;
}
return end_times_partitions;
return keyspace_pt == KEYSPACE_PT_NO_PARTITIONING || (part->keyspace_start <= keyspace_pt && part->keyspace_end >= keyspace_pt);
}

View File

@ -1,6 +1,8 @@
#ifndef IOBEAMDB_PARTITIONING_H
#define IOBEAMDB_PARTITIONING_H
#define KEYSPACE_PT_NO_PARTITIONING -1
#include <postgres.h>
#include <access/attnum.h>
#include <fmgr.h>
@ -52,5 +54,6 @@ epoch_and_partitions_set *partition_epoch_scan(int32 hypertable_id, int64 timepo
int16 partitioning_func_apply(PartitioningFunc *pf, Datum value);
Partition *partition_epoch_get_partition(epoch_and_partitions_set *epoch, int16 keyspace_pt);
int16 * partition_epoch_get_partition_end_times(epoch_and_partitions_set *epoch);
bool partition_keyspace_pt_is_member(const Partition *part, const int16 keyspace_pt);
#endif /* IOBEAMDB_PARTITIONING_H */

View File

@ -39,7 +39,6 @@ PG_FUNCTION_INFO_V1(get_partition_for_key);
Datum
get_partition_for_key(PG_FUNCTION_ARGS)
{
/* SELECT ((_iobeamdb_internal.murmur3_hash_string(key, 1 :: INT4) & x'7fffffff' :: INTEGER) % mod_factor) :: SMALLINT INTO ret; */
struct varlena *data;
int32 mod;
Datum hash_d;
@ -63,7 +62,12 @@ get_partition_for_key(PG_FUNCTION_ARGS)
/*
* array_position_least
* array_position_least returns the highest position in the array such that the element
* in the array is <= searched element. If the array is of partition-keyspace-end values
* then this gives a unique bucket for each keyspace value.
*
* Arg 0: sorted array of smallint
* Arg 1: searched_element (smallint)
*/
PG_FUNCTION_INFO_V1(array_position_least);
@ -90,18 +94,15 @@ array_position_least(PG_FUNCTION_ARGS)
elog(ERROR, "only support smallint arrays");
}
/*
* We refuse to search for elements in multi-dimensional arrays, since we
* have no good way to report the element's location in the array.
*/
/* should be a single dimensioned array */
if (ARR_NDIM(array) > 1)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("searching for elements in multidimensional arrays is not supported")));
errmsg("can only work with single-dimension array")));
if (PG_ARGISNULL(1))
{
elog(ERROR, "does not expect null");
elog(ERROR, "does not expect null as searched-for element");
}
else
{
@ -128,16 +129,12 @@ array_position_least(PG_FUNCTION_ARGS)
my_extra->element_type = INT2OID;
}
/* Examine each array element until we find a match. */
/* Examine each array element until the element is >= searched_element. */
array_iterator = array_create_iterator(array, 0, my_extra);
while (array_iterate(array_iterator, &value, &isnull))
{
position++;
/*
* Can't look at the array element's value if it's null; but if we
* search for null, we have a hit and are done.
*/
if (isnull)
{
elog(ERROR, "No element in array should be null");

View File

@ -41,4 +41,8 @@ void hlib_murmur3(const void *data, size_t len, uint64_t *io);
/* SQL function */
Datum pg_murmur3_hash_string(PG_FUNCTION_ARGS);
#define ARRAY_POSITION_LEAST_FN_NAME "array_position_least"
Datum array_position_least(PG_FUNCTION_ARGS);
#endif

View File

@ -20,6 +20,7 @@ typedef union ScanDesc {
*/
typedef struct InternalScannerCtx {
Relation tablerel, indexrel;
TupleInfo tinfo;
ScanDesc scan;
ScannerCtx *sctx;
} InternalScannerCtx;
@ -30,7 +31,7 @@ typedef struct InternalScannerCtx {
typedef struct Scanner {
Relation (*open)(InternalScannerCtx *ctx);
ScanDesc (*beginscan)(InternalScannerCtx *ctx);
bool (*getnext)(InternalScannerCtx *ctx, TupleInfo *ti);
bool (*getnext)(InternalScannerCtx *ctx);
void (*endscan)(InternalScannerCtx *ctx);
void (*close)(InternalScannerCtx *ctx);
} Scanner;
@ -50,10 +51,10 @@ static ScanDesc heap_scanner_beginscan(InternalScannerCtx *ctx)
return ctx->scan;
}
static bool heap_scanner_getnext(InternalScannerCtx *ctx, TupleInfo *ti)
static bool heap_scanner_getnext(InternalScannerCtx *ctx)
{
ti->tuple = heap_getnext(ctx->scan.heap_scan, ctx->sctx->scandirection);
return HeapTupleIsValid(ti->tuple);
ctx->tinfo.tuple = heap_getnext(ctx->scan.heap_scan, ctx->sctx->scandirection);
return HeapTupleIsValid(ctx->tinfo.tuple);
}
static void heap_scanner_endscan(InternalScannerCtx *ctx)
@ -86,12 +87,12 @@ static ScanDesc index_scanner_beginscan(InternalScannerCtx *ctx)
return ctx->scan;
}
static bool index_scanner_getnext(InternalScannerCtx *ctx, TupleInfo *ti)
static bool index_scanner_getnext(InternalScannerCtx *ctx)
{
ti->tuple = index_getnext(ctx->scan.index_scan, ctx->sctx->scandirection);
ti->ituple = ctx->scan.index_scan->xs_itup;
ti->ituple_desc = ctx->scan.index_scan->xs_itupdesc;
return HeapTupleIsValid(ti->tuple);
ctx->tinfo.tuple = index_getnext(ctx->scan.index_scan, ctx->sctx->scandirection);
ctx->tinfo.ituple = ctx->scan.index_scan->xs_itup;
ctx->tinfo.ituple_desc = ctx->scan.index_scan->xs_itupdesc;
return HeapTupleIsValid(ctx->tinfo.tuple);
}
static void index_scanner_endscan(InternalScannerCtx *ctx)
@ -135,7 +136,6 @@ static Scanner scanners[] = {
int scanner_scan(ScannerCtx *ctx)
{
TupleDesc tuple_desc;
TupleInfo ti;
bool is_valid;
int num_tuples = 0;
Scanner *scanner = &scanners[ctx->scantype];
@ -148,19 +148,19 @@ int scanner_scan(ScannerCtx *ctx)
tuple_desc = RelationGetDescr(ictx.tablerel);
ti.scanrel = ictx.tablerel;
ti.desc = tuple_desc;
ictx.tinfo.scanrel = ictx.tablerel;
ictx.tinfo.desc = tuple_desc;
/* Call pre-scan handler, if any. */
if (ctx->prescan != NULL)
ctx->prescan(ctx->data);
is_valid = scanner->getnext(&ictx, &ti);
is_valid = scanner->getnext(&ictx);
while (is_valid)
{
if (ctx->filter == NULL || ctx->filter(&ti, ctx->data))
if (ctx->filter == NULL || ctx->filter(&ictx.tinfo, ctx->data))
{
num_tuples++;
@ -169,7 +169,7 @@ int scanner_scan(ScannerCtx *ctx)
Buffer buffer;
HeapUpdateFailureData hufd;
ti.lockresult = heap_lock_tuple(ictx.tablerel, ti.tuple,
ictx.tinfo.lockresult = heap_lock_tuple(ictx.tablerel, ictx.tinfo.tuple,
GetCurrentCommandId(false),
ctx->tuplock.lockmode,
ctx->tuplock.waitpolicy,
@ -180,11 +180,11 @@ int scanner_scan(ScannerCtx *ctx)
}
/* Abort the scan if the handler wants us to */
if (!ctx->tuple_found(&ti, ctx->data))
if (!ctx->tuple_found(&ictx.tinfo, ctx->data))
break;
}
is_valid = scanner->getnext(&ictx,&ti);
is_valid = scanner->getnext(&ictx);
}
/* Call post-scan handler, if any. */