diff --git a/src/cache.c b/src/cache.c index 2e77efaf1..57079b5ff 100644 --- a/src/cache.c +++ b/src/cache.c @@ -1,97 +1,78 @@ #include "cache.h" + void cache_init(Cache *cache) { - MemoryContext ctx, old; - if (cache->store != NULL) + if (cache->htab != NULL) { elog(ERROR, "Cache %s is already initialized", cache->name); return; } - ctx = AllocSetContextCreate(CacheMemoryContext, - cache->name, - ALLOCSET_DEFAULT_SIZES); - old = MemoryContextSwitchTo(ctx); - - cache->store = palloc(sizeof(CacheStorage)); - - Assert(cache->hctl.hcxt == NULL); - cache->hctl.hcxt = ctx; - cache->store->htab = hash_create(cache->name, cache->numelements, + cache->htab = hash_create(cache->name, cache->numelements, &cache->hctl, cache->flags); - cache->hctl.hcxt = NULL; - - cache->store->mcxt = ctx; - cache->store->valid = true; - cache->store->refcount = 0; - cache->store->destroy_storage_hook = cache->destroy_storage_hook; - - MemoryContextSwitchTo(old); + cache->refcount = 1; } static void -storage_destroy(CacheStorage *store) { - store->valid = false; - if (store->refcount > 0) { - //will be destroyed later +cache_destroy(Cache *cache) { + if (cache->refcount > 0) { + /* will be destroyed later */ return; } - if (store->destroy_storage_hook != NULL) - store->destroy_storage_hook(store); + if (cache->pre_destroy_hook != NULL) + cache->pre_destroy_hook(cache); - hash_destroy(store->htab); - MemoryContextDelete(store->mcxt); + hash_destroy(cache->htab); + cache->htab = NULL; + MemoryContextDelete(cache->hctl.hcxt); + cache->hctl.hcxt = NULL; } void cache_invalidate(Cache *cache) { - if (cache->store == NULL) + if (cache == NULL) return; - - storage_destroy(cache->store); - cache->store = NULL; - - cache_init(cache); //start new store + cache->refcount--; + cache_destroy(cache); } /* - * Pinning storage is needed if any items returned by the cache + * Pinning is needed if any items returned by the cache * may need to survive invalidation events (i.e. AcceptInvalidationMessages() may be called). * - * Invalidation messages may be processed on any internal function that takes a lock (e.g. heap_open. + * Invalidation messages may be processed on any internal function that takes a lock (e.g. heap_open). * - * Each call to cache_pin_storage MUST BE paired with a call to cache_release_storage. + * Each call to cache_pin MUST BE paired with a call to cache_release. * */ -extern CacheStorage *cache_pin_storage(Cache *cache) +extern Cache *cache_pin(Cache *cache) { - cache->store->refcount++; - return cache->store; + cache->refcount++; + return cache; } -extern void cache_release_storage(CacheStorage *store) + +extern void cache_release(Cache *cache) { - Assert(store->refcount > 0); - store->refcount--; - if (!store->valid) { - storage_destroy(store); - } + Assert(cache->refcount > 0); + cache->refcount--; + cache_destroy(cache); } MemoryContext cache_memory_ctx(Cache *cache) { - return cache->store->mcxt; + return cache->hctl.hcxt; } MemoryContext cache_switch_to_memory_context(Cache *cache) { - return MemoryContextSwitchTo(cache->store->mcxt); + return MemoryContextSwitchTo(cache->hctl.hcxt); } void * @@ -99,12 +80,12 @@ cache_fetch(Cache *cache, CacheQueryCtx *ctx) { bool found; - if (cache->store->htab == NULL) + if (cache->htab == NULL) { elog(ERROR, "Hash %s not initialized", cache->name); } - ctx->entry = hash_search(cache->store->htab, cache->get_key(ctx), HASH_ENTER, &found); + ctx->entry = hash_search(cache->htab, cache->get_key(ctx), HASH_ENTER, &found); if (!found && cache->create_entry != NULL) { diff --git a/src/cache.h b/src/cache.h index ed2292da9..703ef9492 100644 --- a/src/cache.h +++ b/src/cache.h @@ -11,25 +11,18 @@ typedef struct CacheQueryCtx void *private[0]; } CacheQueryCtx; -typedef struct CacheStorage { - HTAB *htab; - MemoryContext mcxt; - int refcount; - bool valid; - void (*destroy_storage_hook) (struct CacheStorage *); -} CacheStorage; - typedef struct Cache { HASHCTL hctl; - CacheStorage *store; - const char *name; + HTAB *htab; + int refcount; + const char *name; long numelements; int flags; void *(*get_key) (struct CacheQueryCtx *); void *(*create_entry) (struct Cache *, CacheQueryCtx *); void *(*update_entry) (struct Cache *, CacheQueryCtx *); - void (*destroy_storage_hook) (struct CacheStorage *); + void (*pre_destroy_hook) (struct Cache *); } Cache; extern void cache_init(Cache *cache); @@ -39,7 +32,7 @@ extern void *cache_fetch(Cache *cache, CacheQueryCtx *ctx); extern MemoryContext cache_memory_ctx(Cache *cache); extern MemoryContext cache_switch_to_memory_context(Cache *cache); -extern CacheStorage *cache_pin_storage(Cache *cache); -extern void cache_release_storage(CacheStorage *store); +extern Cache *cache_pin(Cache *cache); +extern void cache_release(Cache *cache); #endif /* _IOBEAMDB_CACHE_H_ */ diff --git a/src/cache_invalidate.c b/src/cache_invalidate.c index 2b212e67c..69e23e8b1 100644 --- a/src/cache_invalidate.c +++ b/src/cache_invalidate.c @@ -57,10 +57,10 @@ inval_cache_callback(Datum arg, Oid relid) return; if (relid == InvalidOid || relid == hypertable_cache_inval_proxy_oid) - invalidate_hypertable_cache_callback(); + hypertable_cache_invalidate_callback(); if (relid == InvalidOid || relid == chunk_cache_inval_proxy_oid) - invalidate_chunk_cache_callback(); + chunk_crn_set_cache_invalidate_callback(); } /* diff --git a/src/chunk_cache.c b/src/chunk_cache.c index 902d98993..223f3ce95 100644 --- a/src/chunk_cache.c +++ b/src/chunk_cache.c @@ -40,9 +40,6 @@ typedef struct chunk_crn_set_htable_entry typedef struct ChunkCacheQueryCtx { CacheQueryCtx cctx; - hypertable_cache_entry *hci; - epoch_and_partitions_set *pe_entry; - Partition *part; int32 chunk_id; int64 chunk_start_time; int64 chunk_end_time; @@ -57,31 +54,40 @@ chunk_crn_set_cache_get_key(CacheQueryCtx *ctx) static void *chunk_crn_set_cache_create_entry(Cache *cache, CacheQueryCtx *ctx); static void *chunk_crn_set_cache_update_entry(Cache *cache, CacheQueryCtx *ctx); -static char *get_copy_table_insert_sql(ChunkCacheQueryCtx *ctx); +static Cache *chunk_crn_set_cache_create() { + MemoryContext ctx = AllocSetContextCreate(CacheMemoryContext, + CHUNK_CACHE_INVAL_PROXY_TABLE, + ALLOCSET_DEFAULT_SIZES); -static Cache chunk_crn_set_cache = { - .hctl = { - .keysize = sizeof(int32), - .entrysize = sizeof(chunk_crn_set_htable_entry), - .hcxt = NULL, - }, - .name = CHUNK_CACHE_INVAL_PROXY_TABLE, - .numelements = 16, - .flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS, - .get_key = chunk_crn_set_cache_get_key, - .create_entry = chunk_crn_set_cache_create_entry, - .update_entry = chunk_crn_set_cache_update_entry, -}; + Cache *cache = MemoryContextAlloc(ctx, sizeof(Cache)); + *cache = (Cache) { + .hctl = { + .keysize = sizeof(int32), + .entrysize = sizeof(chunk_crn_set_htable_entry), + .hcxt = ctx, + }, + .name = CHUNK_CACHE_INVAL_PROXY_TABLE, + .numelements = 16, + .flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS, + .get_key = chunk_crn_set_cache_get_key, + .create_entry = chunk_crn_set_cache_create_entry, + .update_entry = chunk_crn_set_cache_update_entry, + }; + + cache_init(cache); + + return cache; +} + +static Cache *chunk_crn_set_cache_current = NULL; static void * chunk_crn_set_cache_create_entry(Cache *cache, CacheQueryCtx *ctx) { ChunkCacheQueryCtx *cctx = (ChunkCacheQueryCtx *) ctx; chunk_crn_set_htable_entry *pe = ctx->entry; - char *insert_sql; MemoryContext old; - insert_sql = get_copy_table_insert_sql(cctx); pe->chunk_id = cctx->chunk_id; pe->start_time = cctx->chunk_start_time; pe->end_time = cctx->chunk_end_time; @@ -99,15 +105,12 @@ chunk_crn_set_cache_update_entry(Cache *cache, CacheQueryCtx *ctx) { ChunkCacheQueryCtx *cctx = (ChunkCacheQueryCtx *) ctx; chunk_crn_set_htable_entry *pe = ctx->entry; - char *insert_sql; MemoryContext old; if (pe->start_time == cctx->chunk_start_time && pe->end_time == cctx->chunk_end_time) return pe; - insert_sql = get_copy_table_insert_sql(cctx); - old = cache_switch_to_memory_context(cache); pe->crns = fetch_crn_set(NULL, pe->chunk_id); MemoryContextSwitchTo(old); @@ -116,29 +119,36 @@ chunk_crn_set_cache_update_entry(Cache *cache, CacheQueryCtx *ctx) } void -invalidate_chunk_cache_callback(void) +chunk_crn_set_cache_invalidate_callback(void) { - CACHE1_elog(WARNING, "DESTROY chunk_insert plan cache"); - cache_invalidate(&chunk_crn_set_cache); + CACHE1_elog(WARNING, "DESTROY chunk_crn_set_cache"); + cache_invalidate(chunk_crn_set_cache_current); + chunk_crn_set_cache_current = chunk_crn_set_cache_create(); } static chunk_crn_set_htable_entry * -get_chunk_crn_set_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry, - Partition *part, int32 chunk_id, int64 chunk_start_time, - int64 chunk_end_time) +chunk_crn_set_cache_get_entry(Cache *cache, int32 chunk_id, int64 chunk_start_time, int64 chunk_end_time) { + if (cache == NULL) + { + cache = chunk_crn_set_cache_current; + } ChunkCacheQueryCtx ctx = { - .hci = hci, - .pe_entry = pe_entry, - .part = part, .chunk_id = chunk_id, .chunk_start_time = chunk_start_time, .chunk_end_time = chunk_end_time, }; - return cache_fetch(&chunk_crn_set_cache, &ctx.cctx); + return cache_fetch(cache, &ctx.cctx); } +extern Cache * +chunk_crn_set_cache_pin() +{ + return cache_pin(chunk_crn_set_cache_current); +} + + static chunk_row * chunk_row_create(int32 id, int32 partition_id, int64 starttime, int64 endtime) { @@ -242,9 +252,12 @@ chunk_scan(int32 partition_id, int64 timepoint, bool tuplock) return cctx.chunk; } +/* + * Get chunk cache entry. + * The cache parameter is a chunk_crn_set_cache (can be null to use current cache). + */ chunk_cache_entry * -get_chunk_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry, - Partition *part, int64 timepoint, bool lock) +get_chunk_cache_entry(Cache *cache, Partition *part, int64 timepoint, bool lock) { chunk_crn_set_htable_entry *chunk_crn_cache; chunk_cache_entry *entry; @@ -260,87 +273,21 @@ get_chunk_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_ entry = palloc(sizeof(chunk_cache_entry)); entry->chunk = chunk; entry->id = chunk->id; - chunk_crn_cache = get_chunk_crn_set_cache_entry(hci, pe_entry, part, chunk->id, - chunk->start_time, chunk->end_time); + chunk_crn_cache = chunk_crn_set_cache_get_entry(cache, chunk->id, + chunk->start_time, chunk->end_time); entry->crns = chunk_crn_cache->crns; return entry; } - -static char * -get_copy_table_insert_sql(ChunkCacheQueryCtx *ctx) -{ - StringInfo where_clause = makeStringInfo(); - StringInfo insert_clauses = makeStringInfo(); - StringInfo sql_insert = makeStringInfo(); - ListCell *cell; - int i; - crn_set *crn = fetch_crn_set(NULL, ctx->chunk_id); - - appendStringInfo(where_clause, "WHERE TRUE"); - - if (ctx->pe_entry->num_partitions > 1) - { - appendStringInfo(where_clause, " AND (%s.%s(%s::TEXT, %d) BETWEEN %d AND %d)", - quote_identifier(ctx->pe_entry->partitioning->partfunc.schema), - quote_identifier(ctx->pe_entry->partitioning->partfunc.name), - quote_identifier(ctx->pe_entry->partitioning->column), - ctx->pe_entry->partitioning->partfunc.modulos, - ctx->part->keyspace_start, - ctx->part->keyspace_end); - } - - - if (ctx->chunk_start_time != OPEN_START_TIME) - { - appendStringInfo(where_clause, " AND (%1$s >= %2$s) ", - quote_identifier(ctx->hci->time_column_name), - internal_time_to_column_literal_sql(ctx->chunk_start_time, - ctx->hci->time_column_type)); - } - - if (ctx->chunk_end_time != OPEN_END_TIME) - { - appendStringInfo(where_clause, " AND (%1$s <= %2$s) ", - quote_identifier(ctx->hci->time_column_name), - internal_time_to_column_literal_sql(ctx->chunk_end_time, - ctx->hci->time_column_type)); - } - - i = 0; - foreach(cell, crn->tables) - { - crn_row *tab = lfirst(cell); - - i = i + 1; - appendStringInfo(insert_clauses, "i_%d AS (INSERT INTO %s.%s SELECT * FROM selected)", - i, - quote_identifier(tab->schema_name.data), - quote_identifier(tab->table_name.data) - ); - } - pfree(crn); - crn = NULL; - - appendStringInfo(sql_insert, "\ - WITH selected AS ( DELETE FROM ONLY %1$s %2$s RETURNING * ), \ - %3$s \ - SELECT 1", copy_table_name(ctx->hci->id), - where_clause->data, - insert_clauses->data); - - return sql_insert->data; -} - void _chunk_cache_init(void) { CreateCacheMemoryContext(); - cache_init(&chunk_crn_set_cache); + chunk_crn_set_cache_current = chunk_crn_set_cache_create(); } void _chunk_cache_fini(void) { - cache_invalidate(&chunk_crn_set_cache); + cache_invalidate(chunk_crn_set_cache_current); } diff --git a/src/chunk_cache.h b/src/chunk_cache.h index b6a1bf673..c8764c266 100644 --- a/src/chunk_cache.h +++ b/src/chunk_cache.h @@ -5,6 +5,7 @@ #include #include "metadata_queries.h" +#include "cache.h" #define CHUNK_CACHE_INVAL_PROXY_TABLE "cache_inval_chunk" #define CHUNK_CACHE_INVAL_PROXY_OID \ @@ -20,14 +21,14 @@ typedef struct chunk_cache_entry int32 id; chunk_row *chunk; crn_set *crns; - SPIPlanPtr move_from_copyt_plan; } chunk_cache_entry; + + +extern chunk_cache_entry *get_chunk_cache_entry(Cache *cache, Partition *part, int64 timepoint, bool lock); -extern chunk_cache_entry *get_chunk_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry, - Partition *part, int64 time_pt, bool lock); - -extern void invalidate_chunk_cache_callback(void); +extern void chunk_crn_set_cache_invalidate_callback(void); +extern Cache *chunk_crn_set_cache_pin(void); extern void _chunk_cache_init(void); extern void _chunk_cache_fini(void); diff --git a/src/hypertable_cache.c b/src/hypertable_cache.c index a847dcc8d..4f6fe98e2 100644 --- a/src/hypertable_cache.c +++ b/src/hypertable_cache.c @@ -28,19 +28,33 @@ hypertable_cache_get_key(CacheQueryCtx *ctx) return &((HypertableCacheQueryCtx *) ctx)->hypertable_id; } -static Cache hypertable_cache = { - .hctl = { - .keysize = sizeof(int32), - .entrysize = sizeof(hypertable_cache_entry), - .hcxt = NULL, - }, - .name = HYPERTABLE_CACHE_INVAL_PROXY_TABLE, - .numelements = 16, - .flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS, - .get_key = hypertable_cache_get_key, - .create_entry = hypertable_cache_create_entry, -}; +static Cache *hypertable_cache_create() { + MemoryContext ctx = AllocSetContextCreate(CacheMemoryContext, + HYPERTABLE_CACHE_INVAL_PROXY_TABLE, + ALLOCSET_DEFAULT_SIZES); + + Cache *cache = MemoryContextAlloc(ctx, sizeof(Cache)); + *cache = (Cache) { + .hctl = { + .keysize = sizeof(int32), + .entrysize = sizeof(hypertable_cache_entry), + .hcxt = ctx, + }, + .name = HYPERTABLE_CACHE_INVAL_PROXY_TABLE, + .numelements = 16, + .flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS, + .get_key = hypertable_cache_get_key, + .create_entry = hypertable_cache_create_entry, + }; + + cache_init(cache); + + return cache; +} + + +static Cache *hypertable_cache_current = NULL; /* Column numbers for 'hypertable' table in sql/common/tables.sql */ #define HT_TBL_COL_ID 1 #define HT_TBL_COL_TIME_COL_NAME 10 @@ -101,22 +115,23 @@ hypertable_cache_create_entry(Cache *cache, CacheQueryCtx *ctx) } void -invalidate_hypertable_cache_callback(void) +hypertable_cache_invalidate_callback(void) { CACHE1_elog(WARNING, "DESTROY hypertable_cache"); - cache_invalidate(&hypertable_cache); + cache_invalidate(hypertable_cache_current); + hypertable_cache_current = hypertable_cache_create(); } /* Get hypertable cache entry. If the entry is not in the cache, add it. */ hypertable_cache_entry * -hypertable_cache_get(int32 hypertable_id) +hypertable_cache_get_entry(Cache *cache, int32 hypertable_id) { HypertableCacheQueryCtx ctx = { .hypertable_id = hypertable_id, }; - return cache_fetch(&hypertable_cache, &ctx.cctx); + return cache_fetch(cache, &ctx.cctx); } /* function to compare epochs */ @@ -140,7 +155,7 @@ cmp_epochs(const void *time_pt_pointer, const void *test) } epoch_and_partitions_set * -hypertable_cache_get_partition_epoch(hypertable_cache_entry *hce, int64 time_pt, Oid relid) +hypertable_cache_get_partition_epoch(Cache *cache, hypertable_cache_entry *hce, int64 time_pt, Oid relid) { MemoryContext old; epoch_and_partitions_set *epoch, @@ -166,7 +181,7 @@ hypertable_cache_get_partition_epoch(hypertable_cache_entry *hce, int64 time_pt, return (*cache_entry); } - old = cache_switch_to_memory_context(&hypertable_cache); + old = cache_switch_to_memory_context(cache); epoch = partition_epoch_scan(hce->id, time_pt, relid); /* check if full */ @@ -191,21 +206,21 @@ hypertable_cache_get_partition_epoch(hypertable_cache_entry *hce, int64 time_pt, return epoch; } -extern CacheStorage * -hypertable_cache_pin_storage() +extern Cache * +hypertable_cache_pin() { - return cache_pin_storage(&hypertable_cache); + return cache_pin(hypertable_cache_current); } void _hypertable_cache_init(void) { CreateCacheMemoryContext(); - cache_init(&hypertable_cache); + hypertable_cache_current = hypertable_cache_create(); } void _hypertable_cache_fini(void) { - cache_invalidate(&hypertable_cache); + cache_invalidate(hypertable_cache_current); } diff --git a/src/hypertable_cache.h b/src/hypertable_cache.h index 866cf808c..88acd7abf 100644 --- a/src/hypertable_cache.h +++ b/src/hypertable_cache.h @@ -24,14 +24,14 @@ typedef struct hypertable_cache_entry epoch_and_partitions_set *epochs[MAX_EPOCHS_PER_HYPERTABLE_CACHE_ENTRY]; } hypertable_cache_entry; -hypertable_cache_entry *hypertable_cache_get(int32 hypertable_id); +hypertable_cache_entry *hypertable_cache_get_entry(Cache * cache, int32 hypertable_id); epoch_and_partitions_set * -hypertable_cache_get_partition_epoch(hypertable_cache_entry *hce, int64 time_pt, Oid relid); +hypertable_cache_get_partition_epoch(Cache *cache, hypertable_cache_entry *hce, int64 time_pt, Oid relid); -void invalidate_hypertable_cache_callback(void); +void hypertable_cache_invalidate_callback(void); -extern CacheStorage *hypertable_cache_pin_storage(void); +extern Cache *hypertable_cache_pin(void); void _hypertable_cache_init(void); void _hypertable_cache_fini(void); diff --git a/src/insert.c b/src/insert.c index 3d471e833..a50eb482c 100644 --- a/src/insert.c +++ b/src/insert.c @@ -46,6 +46,8 @@ #include "metadata_queries.h" #include "partitioning.h" #include "scanner.h" +#include "catalog.h" +#include "pgmurmur3.h" #include #include @@ -56,7 +58,8 @@ #define INSERT_TRIGGER_COPY_TABLE_NAME "insert_trigger" /* private funcs */ -static ObjectAddress create_insert_index(int32 hypertable_id, char * time_field, PartitioningInfo *part_info, int16 *end_time_partitions, int num_partitions); + +static ObjectAddress create_insert_index(int32 hypertable_id, char * time_field, PartitioningInfo *part_info,epoch_and_partitions_set *epoch); static Node *get_keyspace_fn_call(PartitioningInfo *part_info); /* @@ -133,7 +136,7 @@ chunk_insert_ctx_rel_destroy(ChunkInsertCtxRel *rel_ctx) static void -chunk_insert_ctx_rel_chunk_insert_ctx_insert_tuple(ChunkInsertCtxRel *rel_ctx, HeapTuple tuple) +chunk_insert_ctx_rel_insert_tuple(ChunkInsertCtxRel *rel_ctx, HeapTuple tuple) { int hi_options = 0; /* no optimization */ CommandId mycid = GetCurrentCommandId(true); @@ -161,16 +164,19 @@ chunk_insert_ctx_rel_chunk_insert_ctx_insert_tuple(ChunkInsertCtxRel *rel_ctx, H typedef struct ChunkInsertCtx { chunk_cache_entry *chunk; + Cache *pinned; List *ctxs; } ChunkInsertCtx; static ChunkInsertCtx * -chunk_insert_ctx_new(chunk_cache_entry *chunk) +chunk_insert_ctx_new(chunk_cache_entry *chunk, Cache *pinned) { - /* int num_tables = list_length(chunk->crns->tables); */ ListCell *lc; List *rel_ctx_list = NIL; ChunkInsertCtx *ctx; + + ctx = palloc(sizeof(ChunkInsertCtx)); + ctx->pinned = pinned; foreach(lc, chunk->crns->tables) { @@ -234,7 +240,6 @@ chunk_insert_ctx_new(chunk_cache_entry *chunk) rel_ctx_list = lappend(rel_ctx_list, rel_ctx); } - ctx = palloc(sizeof(ChunkInsertCtx)); ctx->ctxs = rel_ctx_list; ctx->chunk = chunk; return ctx; @@ -250,6 +255,8 @@ chunk_insert_ctx_destroy(ChunkInsertCtx *ctx) return; } + cache_release(ctx->pinned); + foreach(lc, ctx->ctxs) { ChunkInsertCtxRel *rel_ctx = lfirst(lc); @@ -265,7 +272,7 @@ chunk_insert_ctx_insert_tuple(ChunkInsertCtx *ctx, HeapTuple tup) foreach(lc, ctx->ctxs) { ChunkInsertCtxRel *rel_ctx = lfirst(lc); - chunk_insert_ctx_rel_chunk_insert_ctx_insert_tuple(rel_ctx, tup); + chunk_insert_ctx_rel_insert_tuple(rel_ctx, tup); } } @@ -286,11 +293,10 @@ copy_table_tuple_found(TupleInfo *ti, void *data) if (ctx->pe->num_partitions > 1) { - //Datum partition_datum = index_getattr(scan->xs_itup, 1, scan->xs_itupdesc, &is_null); + /* first element is partition index (used for sorting but not necessary here) */ Datum time_datum = index_getattr(ti->ituple, 2, ti->ituple_desc, &is_null); Datum keyspace_datum = index_getattr(ti->ituple, 3, ti->ituple_desc, &is_null); - //partition_no = DatumGetInt16(partition_datum); time_pt = time_value_to_internal(time_datum, ctx->hci->time_column_type); keyspace_pt = DatumGetInt16(keyspace_datum); } @@ -298,18 +304,18 @@ copy_table_tuple_found(TupleInfo *ti, void *data) { Datum time_datum = index_getattr(ti->ituple, 1, ti->ituple_desc, &is_null); time_pt = time_value_to_internal(time_datum, ctx->hci->time_column_type); - keyspace_pt = -1; + keyspace_pt = KEYSPACE_PT_NO_PARTITIONING; } - if (ctx->chunk_ctx != NULL && !(ctx->chunk_ctx->chunk->chunk->start_time <= time_pt && ctx->chunk_ctx->chunk->chunk->end_time >= time_pt)) + if (ctx->chunk_ctx != NULL && !chunk_row_timepoint_is_member(ctx->chunk_ctx->chunk->chunk, time_pt)) { /* moving on to next chunk; */ chunk_insert_ctx_destroy(ctx->chunk_ctx); ctx->chunk_ctx = NULL; } - if (ctx->part != NULL && !(ctx->part->keyspace_start <= keyspace_pt && ctx->part->keyspace_end >= keyspace_pt)) + if (ctx->part != NULL && !partition_keyspace_pt_is_member(ctx->part, keyspace_pt)) { /* moving on to next ctx->partition. */ chunk_insert_ctx_destroy(ctx->chunk_ctx); @@ -326,26 +332,21 @@ copy_table_tuple_found(TupleInfo *ti, void *data) { Datum was_closed_datum; chunk_cache_entry *chunk; + Cache *pinned = chunk_crn_set_cache_pin(); /* * TODO: this first call should be non-locking and use a cache(for * performance) */ - chunk = get_chunk_cache_entry(ctx->hci, ctx->pe, ctx->part, time_pt, false); + chunk = get_chunk_cache_entry(pinned, ctx->part, time_pt, false); was_closed_datum = FunctionCall1(get_close_if_needed_fn(), Int32GetDatum(chunk->id)); /* chunk may have been closed and thus changed /or/ need to get share lock */ - chunk = get_chunk_cache_entry(ctx->hci, ctx->pe, ctx->part, time_pt, true); + chunk = get_chunk_cache_entry(pinned, ctx->part, time_pt, true); - ctx->chunk_ctx = chunk_insert_ctx_new(chunk); - /* elog(WARNING, "got new chunk %d", chunk->id); */ + ctx->chunk_ctx = chunk_insert_ctx_new(chunk, pinned); } - /* - * elog(WARNING, "time is partition_no: %d keyspace: %d time: %ld - * chunk %d", partition_no, keyspace_pt, time_pt, chunk->id); - */ - /* insert here: */ - //has to be a copy(not sure why) + /* has to be a copy(not sure why) */ chunk_insert_ctx_insert_tuple(ctx->chunk_ctx,heap_copytuple(ti->tuple)); return true; } @@ -398,7 +399,7 @@ insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS) hypertable_cache_entry *hci; epoch_and_partitions_set *pe; - CacheStorage *hypertable_cache_storage; + Cache *hypertable_cache; ObjectAddress idx; DropStmt *drop = makeNode(DropStmt); @@ -429,24 +430,23 @@ insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS) * get the hypertable cache; use the time column name to figure out the * column fnum for time field */ - hypertable_cache_storage = hypertable_cache_pin_storage(); + hypertable_cache = hypertable_cache_pin(); - hci = hypertable_cache_get(hypertable_id); + hci = hypertable_cache_get_entry(hypertable_cache, hypertable_id); /* TODO: hack assumes single pe. */ - pe = hypertable_cache_get_partition_epoch(hci, 0, trigdata->tg_relation->rd_id); + pe = hypertable_cache_get_partition_epoch(hypertable_cache, hci, 0, trigdata->tg_relation->rd_id); /* * create an index that colocates row from the same chunk together and * guarantees an order on chunk access as well */ - idx = create_insert_index(hypertable_id, hci->time_column_name, pe->partitioning, - partition_epoch_get_partition_end_times(pe), pe->num_partitions); + idx = create_insert_index(hypertable_id, hci->time_column_name, pe->partitioning, pe); scan_copy_table_and_insert(hci, pe, trigdata->tg_relation->rd_id, idx.objectId); - cache_release_storage(hypertable_cache_storage); + cache_release(hypertable_cache); drop->removeType = OBJECT_INDEX; drop->missing_ok = FALSE; @@ -536,6 +536,21 @@ create_copy_table(int32 hypertable_id, Oid root_oid) return copyTableRelationAddr.objectId; } +static IndexElem * +makeIndexElem(char *name, Node *expr){ + Assert((name ==NULL || expr == NULL) && (name !=NULL || expr !=NULL)); + + IndexElem *time_elem = makeNode(IndexElem); + time_elem->name = name; + time_elem->expr = expr; + time_elem->indexcolname = NULL; + time_elem->collation = NIL; + time_elem->opclass = NIL; + time_elem->ordering = SORTBY_DEFAULT; + time_elem->nulls_ordering = SORTBY_NULLS_DEFAULT; + return time_elem; +} + /* creates index for inserting in set chunk order. * * The index is the following: @@ -560,7 +575,7 @@ create_copy_table(int32 hypertable_id, Oid root_oid) * * */ static ObjectAddress -create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo *part_info, int16 *end_time_partitions, int num_partitions) +create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo *part_info, epoch_and_partitions_set *epoch) { IndexStmt *index_stmt = makeNode(IndexStmt); IndexElem *time_elem; @@ -568,31 +583,24 @@ create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo *par List *indexElem = NIL; int i; - time_elem = makeNode(IndexElem); - time_elem->name = time_field; - time_elem->expr = NULL; - time_elem->indexcolname = NULL; - time_elem->collation = NIL; - time_elem->opclass = NIL; - time_elem->ordering = SORTBY_DEFAULT; - time_elem->nulls_ordering = SORTBY_NULLS_DEFAULT; + time_elem = makeIndexElem(time_field, NULL); if (part_info != NULL) { IndexElem *partition_elem; IndexElem *keyspace_elem; - List *array_pos_func_name = list_make2(makeString("_iobeamdb_catalog"), makeString("array_position_least")); + List *array_pos_func_name = list_make2(makeString(CATALOG_SCHEMA_NAME), makeString(ARRAY_POSITION_LEAST_FN_NAME)); List *array_pos_args; List *array_list = NIL; A_ArrayExpr *array_expr; FuncCall *array_pos_fc; - for (i = 0; i < num_partitions; i++) + for (i = 0; i < epoch->num_partitions; i++) { A_Const *end_time_const = makeNode(A_Const); TypeCast *cast = makeNode(TypeCast); - end_time_const->val = *makeInteger((int) end_time_partitions[i]); + end_time_const->val = *makeInteger((int) epoch->partitions[i].keyspace_end); end_time_const->location = -1; @@ -610,23 +618,8 @@ create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo *par array_pos_args = list_make2(array_expr, get_keyspace_fn_call(part_info)); array_pos_fc = makeFuncCall(array_pos_func_name, array_pos_args, -1); - partition_elem = makeNode(IndexElem); - partition_elem->name = NULL; - partition_elem->expr = (Node *) array_pos_fc; - partition_elem->indexcolname = NULL; - partition_elem->collation = NIL; - partition_elem->opclass = NIL; - partition_elem->ordering = SORTBY_DEFAULT; - partition_elem->nulls_ordering = SORTBY_NULLS_DEFAULT; - - keyspace_elem = makeNode(IndexElem); - keyspace_elem->name = NULL; - keyspace_elem->expr = (Node *) get_keyspace_fn_call(part_info); - keyspace_elem->indexcolname = NULL; - keyspace_elem->collation = NIL; - keyspace_elem->opclass = NIL; - keyspace_elem->ordering = SORTBY_DEFAULT; - keyspace_elem->nulls_ordering = SORTBY_NULLS_DEFAULT; + partition_elem = makeIndexElem(NULL, (Node *) array_pos_fc); + keyspace_elem = makeIndexElem(NULL, (Node *) get_keyspace_fn_call(part_info)); /* partition_number, time, keyspace */ /* can probably get rid of keyspace but later */ @@ -640,22 +633,7 @@ create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo *par index_stmt->idxname = "copy_insert"; index_stmt->relation = makeRangeVar("pg_temp", copy_table_name(hypertable_id), -1); index_stmt->accessMethod = "btree"; - index_stmt->tableSpace = NULL; index_stmt->indexParams = indexElem; - index_stmt->options = NULL; - index_stmt->whereClause = NULL; - index_stmt->excludeOpNames = NIL; - index_stmt->idxcomment = NULL; - index_stmt->indexOid = InvalidOid; - index_stmt->oldNode = InvalidOid; - index_stmt->unique = false; - index_stmt->primary = false; - index_stmt->isconstraint = false; - index_stmt->deferrable = false; - index_stmt->initdeferred = false; - index_stmt->transformed = false; - index_stmt->concurrent = false; - index_stmt->if_not_exists = false; relid = RangeVarGetRelidExtended(index_stmt->relation, ShareLock, @@ -685,7 +663,6 @@ get_keyspace_fn_call(PartitioningInfo *part_info) A_Const *mod_const; List *part_func_name = list_make2(makeString(part_info->partfunc.schema), makeString(part_info->partfunc.name)); List *part_func_args; - FuncCall *part_fc; col_ref->fields = list_make1(makeString(part_info->column)); col_ref->location = -1; @@ -695,6 +672,5 @@ get_keyspace_fn_call(PartitioningInfo *part_info) mod_const->location = -1; part_func_args = list_make2(col_ref, mod_const); - part_fc = makeFuncCall(part_func_name, part_func_args, -1); - return (Node *) part_fc; + return (Node *) makeFuncCall(part_func_name, part_func_args, -1); } diff --git a/src/iobeamdb.c b/src/iobeamdb.c index 603d3c83f..68118090c 100644 --- a/src/iobeamdb.c +++ b/src/iobeamdb.c @@ -760,10 +760,6 @@ void iobeamdb_ProcessUtility(Node *parsetree, return; } - /*if(IsA(parsetree, IndexStmt)) { - pprint(parsetree); - }*/ - prev_ProcessUtility(parsetree, queryString, context, params, dest, completionTag); } diff --git a/src/metadata_queries.c b/src/metadata_queries.c index 762b02c62..92df3b211 100644 --- a/src/metadata_queries.c +++ b/src/metadata_queries.c @@ -223,3 +223,8 @@ chunk_row_insert_new(int32 partition_id, int64 timepoint, bool lock) return chunk; } + + +bool chunk_row_timepoint_is_member(const chunk_row *row, const int64 time_pt){ + return row->start_time <= time_pt && row->end_time >= time_pt; +} diff --git a/src/metadata_queries.h b/src/metadata_queries.h index ff4d9ed55..e4c3a91c7 100644 --- a/src/metadata_queries.h +++ b/src/metadata_queries.h @@ -47,4 +47,6 @@ extern crn_set *fetch_crn_set(crn_set *entry, int32 chunk_id); chunk_row * chunk_row_insert_new(int32 partition_id, int64 timepoint, bool lock); +bool chunk_row_timepoint_is_member(const chunk_row *row, const int64 time_pt); + #endif /* IOBEAMDB_METADATA_QUERIES_H */ diff --git a/src/partitioning.c b/src/partitioning.c index 2547e2aa5..a235e0b27 100644 --- a/src/partitioning.c +++ b/src/partitioning.c @@ -296,7 +296,7 @@ cmp_partitions(const void *keyspace_pt_arg, const void *value) int16 keyspace_pt = *((int16 *) keyspace_pt_arg); const Partition *part = value; - if (part->keyspace_start <= keyspace_pt && part->keyspace_end >= keyspace_pt) + if (partition_keyspace_pt_is_member(part, keyspace_pt)) { return 0; } @@ -319,7 +319,7 @@ partition_epoch_get_partition(epoch_and_partitions_set *epoch, int16 keyspace_pt return NULL; } - if (keyspace_pt < 0) + if (keyspace_pt == KEYSPACE_PT_NO_PARTITIONING) { if (epoch->num_partitions > 1) { @@ -340,19 +340,7 @@ partition_epoch_get_partition(epoch_and_partitions_set *epoch, int16 keyspace_pt return part; } - -int16 * -partition_epoch_get_partition_end_times(epoch_and_partitions_set *epoch) +bool partition_keyspace_pt_is_member(const Partition *part, const int16 keyspace_pt) { - - int16 *end_times_partitions = palloc(sizeof(int16) * epoch->num_partitions); - int i; - - for (i = 0; i < epoch->num_partitions; i++) - { - end_times_partitions[i] = epoch->partitions[i].keyspace_end; - } - - return end_times_partitions; + return keyspace_pt == KEYSPACE_PT_NO_PARTITIONING || (part->keyspace_start <= keyspace_pt && part->keyspace_end >= keyspace_pt); } - diff --git a/src/partitioning.h b/src/partitioning.h index 4eefb88fc..793232304 100644 --- a/src/partitioning.h +++ b/src/partitioning.h @@ -1,6 +1,8 @@ #ifndef IOBEAMDB_PARTITIONING_H #define IOBEAMDB_PARTITIONING_H +#define KEYSPACE_PT_NO_PARTITIONING -1 + #include #include #include @@ -52,5 +54,6 @@ epoch_and_partitions_set *partition_epoch_scan(int32 hypertable_id, int64 timepo int16 partitioning_func_apply(PartitioningFunc *pf, Datum value); Partition *partition_epoch_get_partition(epoch_and_partitions_set *epoch, int16 keyspace_pt); -int16 * partition_epoch_get_partition_end_times(epoch_and_partitions_set *epoch); + +bool partition_keyspace_pt_is_member(const Partition *part, const int16 keyspace_pt); #endif /* IOBEAMDB_PARTITIONING_H */ diff --git a/src/pgmurmur3.c b/src/pgmurmur3.c index 6daa4502f..207b3fe1e 100644 --- a/src/pgmurmur3.c +++ b/src/pgmurmur3.c @@ -39,7 +39,6 @@ PG_FUNCTION_INFO_V1(get_partition_for_key); Datum get_partition_for_key(PG_FUNCTION_ARGS) { -/* SELECT ((_iobeamdb_internal.murmur3_hash_string(key, 1 :: INT4) & x'7fffffff' :: INTEGER) % mod_factor) :: SMALLINT INTO ret; */ struct varlena *data; int32 mod; Datum hash_d; @@ -63,7 +62,12 @@ get_partition_for_key(PG_FUNCTION_ARGS) /* - * array_position_least + * array_position_least returns the highest position in the array such that the element + * in the array is <= searched element. If the array is of partition-keyspace-end values + * then this gives a unique bucket for each keyspace value. + * + * Arg 0: sorted array of smallint + * Arg 1: searched_element (smallint) */ PG_FUNCTION_INFO_V1(array_position_least); @@ -90,18 +94,15 @@ array_position_least(PG_FUNCTION_ARGS) elog(ERROR, "only support smallint arrays"); } - /* - * We refuse to search for elements in multi-dimensional arrays, since we - * have no good way to report the element's location in the array. - */ + /* should be a single dimensioned array */ if (ARR_NDIM(array) > 1) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("searching for elements in multidimensional arrays is not supported"))); + errmsg("can only work with single-dimension array"))); if (PG_ARGISNULL(1)) { - elog(ERROR, "does not expect null"); + elog(ERROR, "does not expect null as searched-for element"); } else { @@ -128,16 +129,12 @@ array_position_least(PG_FUNCTION_ARGS) my_extra->element_type = INT2OID; } - /* Examine each array element until we find a match. */ + /* Examine each array element until the element is >= searched_element. */ array_iterator = array_create_iterator(array, 0, my_extra); while (array_iterate(array_iterator, &value, &isnull)) { position++; - /* - * Can't look at the array element's value if it's null; but if we - * search for null, we have a hit and are done. - */ if (isnull) { elog(ERROR, "No element in array should be null"); diff --git a/src/pgmurmur3.h b/src/pgmurmur3.h index 573e42bc3..7667e69cd 100644 --- a/src/pgmurmur3.h +++ b/src/pgmurmur3.h @@ -41,4 +41,8 @@ void hlib_murmur3(const void *data, size_t len, uint64_t *io); /* SQL function */ Datum pg_murmur3_hash_string(PG_FUNCTION_ARGS); + +#define ARRAY_POSITION_LEAST_FN_NAME "array_position_least" +Datum array_position_least(PG_FUNCTION_ARGS); + #endif diff --git a/src/scanner.c b/src/scanner.c index 4f9986aaa..bf1e0f189 100644 --- a/src/scanner.c +++ b/src/scanner.c @@ -20,6 +20,7 @@ typedef union ScanDesc { */ typedef struct InternalScannerCtx { Relation tablerel, indexrel; + TupleInfo tinfo; ScanDesc scan; ScannerCtx *sctx; } InternalScannerCtx; @@ -30,7 +31,7 @@ typedef struct InternalScannerCtx { typedef struct Scanner { Relation (*open)(InternalScannerCtx *ctx); ScanDesc (*beginscan)(InternalScannerCtx *ctx); - bool (*getnext)(InternalScannerCtx *ctx, TupleInfo *ti); + bool (*getnext)(InternalScannerCtx *ctx); void (*endscan)(InternalScannerCtx *ctx); void (*close)(InternalScannerCtx *ctx); } Scanner; @@ -50,10 +51,10 @@ static ScanDesc heap_scanner_beginscan(InternalScannerCtx *ctx) return ctx->scan; } -static bool heap_scanner_getnext(InternalScannerCtx *ctx, TupleInfo *ti) +static bool heap_scanner_getnext(InternalScannerCtx *ctx) { - ti->tuple = heap_getnext(ctx->scan.heap_scan, ctx->sctx->scandirection); - return HeapTupleIsValid(ti->tuple); + ctx->tinfo.tuple = heap_getnext(ctx->scan.heap_scan, ctx->sctx->scandirection); + return HeapTupleIsValid(ctx->tinfo.tuple); } static void heap_scanner_endscan(InternalScannerCtx *ctx) @@ -86,12 +87,12 @@ static ScanDesc index_scanner_beginscan(InternalScannerCtx *ctx) return ctx->scan; } -static bool index_scanner_getnext(InternalScannerCtx *ctx, TupleInfo *ti) +static bool index_scanner_getnext(InternalScannerCtx *ctx) { - ti->tuple = index_getnext(ctx->scan.index_scan, ctx->sctx->scandirection); - ti->ituple = ctx->scan.index_scan->xs_itup; - ti->ituple_desc = ctx->scan.index_scan->xs_itupdesc; - return HeapTupleIsValid(ti->tuple); + ctx->tinfo.tuple = index_getnext(ctx->scan.index_scan, ctx->sctx->scandirection); + ctx->tinfo.ituple = ctx->scan.index_scan->xs_itup; + ctx->tinfo.ituple_desc = ctx->scan.index_scan->xs_itupdesc; + return HeapTupleIsValid(ctx->tinfo.tuple); } static void index_scanner_endscan(InternalScannerCtx *ctx) @@ -135,7 +136,6 @@ static Scanner scanners[] = { int scanner_scan(ScannerCtx *ctx) { TupleDesc tuple_desc; - TupleInfo ti; bool is_valid; int num_tuples = 0; Scanner *scanner = &scanners[ctx->scantype]; @@ -148,19 +148,19 @@ int scanner_scan(ScannerCtx *ctx) tuple_desc = RelationGetDescr(ictx.tablerel); - ti.scanrel = ictx.tablerel; - ti.desc = tuple_desc; + ictx.tinfo.scanrel = ictx.tablerel; + ictx.tinfo.desc = tuple_desc; /* Call pre-scan handler, if any. */ if (ctx->prescan != NULL) ctx->prescan(ctx->data); - is_valid = scanner->getnext(&ictx, &ti); + is_valid = scanner->getnext(&ictx); while (is_valid) { - if (ctx->filter == NULL || ctx->filter(&ti, ctx->data)) + if (ctx->filter == NULL || ctx->filter(&ictx.tinfo, ctx->data)) { num_tuples++; @@ -169,7 +169,7 @@ int scanner_scan(ScannerCtx *ctx) Buffer buffer; HeapUpdateFailureData hufd; - ti.lockresult = heap_lock_tuple(ictx.tablerel, ti.tuple, + ictx.tinfo.lockresult = heap_lock_tuple(ictx.tablerel, ictx.tinfo.tuple, GetCurrentCommandId(false), ctx->tuplock.lockmode, ctx->tuplock.waitpolicy, @@ -180,11 +180,11 @@ int scanner_scan(ScannerCtx *ctx) } /* Abort the scan if the handler wants us to */ - if (!ctx->tuple_found(&ti, ctx->data)) + if (!ctx->tuple_found(&ictx.tinfo, ctx->data)) break; } - is_valid = scanner->getnext(&ictx,&ti); + is_valid = scanner->getnext(&ictx); } /* Call post-scan handler, if any. */