From 64e8ec1877f23de828d7c19ce4af1f4f27fe79e0 Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Tue, 28 Feb 2017 22:54:19 -0500 Subject: [PATCH 1/3] Ordering inserts to avoid deadlocks --- sql/main/partitioning.sql | 5 +- src/cache.c | 88 +++-- src/cache.h | 16 +- src/cache_invalidate.c | 2 +- src/chunk_cache.c | 37 +- src/chunk_cache.h | 5 +- src/hypertable_cache.c | 10 +- src/hypertable_cache.h | 3 + src/insert.c | 600 ++++++++++++++++++++++++++------ src/iobeamdb.c | 4 + src/partitioning.c | 18 + src/partitioning.h | 2 +- src/pgmurmur3.c | 114 +++++- src/pgmurmur3.h | 7 +- src/scanner.c | 34 +- src/scanner.h | 4 + test/expected/drop_chunks.out | 12 +- test/expected/insert.out | 6 +- test/expected/insert_single.out | 2 +- test/expected/timestamp.out | 2 +- 20 files changed, 786 insertions(+), 185 deletions(-) diff --git a/sql/main/partitioning.sql b/sql/main/partitioning.sql index dae6d5645..c36c9bcf1 100644 --- a/sql/main/partitioning.sql +++ b/sql/main/partitioning.sql @@ -1,2 +1,5 @@ CREATE OR REPLACE FUNCTION _iobeamdb_catalog.get_partition_for_key(text, int) RETURNS smallint - AS '$libdir/iobeamdb', 'get_partition_for_key' LANGUAGE C IMMUTABLE STRICT; + AS '$libdir/iobeamdb', 'get_partition_for_key' LANGUAGE C IMMUTABLE STRICT; + +CREATE OR REPLACE FUNCTION _iobeamdb_catalog.array_position_least(smallint[], smallint) RETURNS smallint + AS '$libdir/iobeamdb', 'array_position_least' LANGUAGE C IMMUTABLE STRICT; diff --git a/src/cache.c b/src/cache.c index 3edb8ff73..2e77efaf1 100644 --- a/src/cache.c +++ b/src/cache.c @@ -3,51 +3,95 @@ void cache_init(Cache *cache) { - if (cache->htab != NULL) + MemoryContext ctx, old; + if (cache->store != NULL) { elog(ERROR, "Cache %s is already initialized", cache->name); return; } - if (cache->hctl.hcxt == NULL) - { - cache->hctl.hcxt = AllocSetContextCreate(CacheMemoryContext, - cache->name, - ALLOCSET_DEFAULT_SIZES); + ctx = AllocSetContextCreate(CacheMemoryContext, + cache->name, + ALLOCSET_DEFAULT_SIZES); + old = MemoryContextSwitchTo(ctx); + + cache->store = palloc(sizeof(CacheStorage)); + + Assert(cache->hctl.hcxt == NULL); + cache->hctl.hcxt = ctx; + cache->store->htab = hash_create(cache->name, cache->numelements, + &cache->hctl, cache->flags); + cache->hctl.hcxt = NULL; + + cache->store->mcxt = ctx; + cache->store->valid = true; + cache->store->refcount = 0; + cache->store->destroy_storage_hook = cache->destroy_storage_hook; + + MemoryContextSwitchTo(old); +} + +static void +storage_destroy(CacheStorage *store) { + store->valid = false; + if (store->refcount > 0) { + //will be destroyed later + return; } - cache->htab = hash_create(cache->name, cache->numelements, - &cache->hctl, cache->flags); + if (store->destroy_storage_hook != NULL) + store->destroy_storage_hook(store); + + hash_destroy(store->htab); + MemoryContextDelete(store->mcxt); } void cache_invalidate(Cache *cache) { - if (cache->htab == NULL) + if (cache->store == NULL) return; - if (cache->pre_invalidate_hook != NULL) - cache->pre_invalidate_hook(cache); + storage_destroy(cache->store); + cache->store = NULL; - hash_destroy(cache->htab); - cache->htab = NULL; - MemoryContextDelete(cache->hctl.hcxt); - cache->hctl.hcxt = NULL; - - if (cache->post_invalidate_hook != NULL) - cache->post_invalidate_hook(cache); + cache_init(cache); //start new store } +/* + * Pinning storage is needed if any items returned by the cache + * may need to survive invalidation events (i.e. AcceptInvalidationMessages() may be called). + * + * Invalidation messages may be processed on any internal function that takes a lock (e.g. heap_open. + * + * Each call to cache_pin_storage MUST BE paired with a call to cache_release_storage. + * + */ +extern CacheStorage *cache_pin_storage(Cache *cache) +{ + cache->store->refcount++; + return cache->store; +} +extern void cache_release_storage(CacheStorage *store) +{ + Assert(store->refcount > 0); + store->refcount--; + if (!store->valid) { + storage_destroy(store); + } +} + + MemoryContext cache_memory_ctx(Cache *cache) { - return cache->hctl.hcxt; + return cache->store->mcxt; } MemoryContext cache_switch_to_memory_context(Cache *cache) { - return MemoryContextSwitchTo(cache->hctl.hcxt); + return MemoryContextSwitchTo(cache->store->mcxt); } void * @@ -55,12 +99,12 @@ cache_fetch(Cache *cache, CacheQueryCtx *ctx) { bool found; - if (cache->htab == NULL) + if (cache->store->htab == NULL) { elog(ERROR, "Hash %s not initialized", cache->name); } - ctx->entry = hash_search(cache->htab, cache->get_key(ctx), HASH_ENTER, &found); + ctx->entry = hash_search(cache->store->htab, cache->get_key(ctx), HASH_ENTER, &found); if (!found && cache->create_entry != NULL) { diff --git a/src/cache.h b/src/cache.h index 90b53ea4f..ed2292da9 100644 --- a/src/cache.h +++ b/src/cache.h @@ -11,18 +11,25 @@ typedef struct CacheQueryCtx void *private[0]; } CacheQueryCtx; +typedef struct CacheStorage { + HTAB *htab; + MemoryContext mcxt; + int refcount; + bool valid; + void (*destroy_storage_hook) (struct CacheStorage *); +} CacheStorage; + typedef struct Cache { HASHCTL hctl; - HTAB *htab; + CacheStorage *store; const char *name; long numelements; int flags; void *(*get_key) (struct CacheQueryCtx *); void *(*create_entry) (struct Cache *, CacheQueryCtx *); void *(*update_entry) (struct Cache *, CacheQueryCtx *); - void (*pre_invalidate_hook) (struct Cache *); - void (*post_invalidate_hook) (struct Cache *); + void (*destroy_storage_hook) (struct CacheStorage *); } Cache; extern void cache_init(Cache *cache); @@ -32,4 +39,7 @@ extern void *cache_fetch(Cache *cache, CacheQueryCtx *ctx); extern MemoryContext cache_memory_ctx(Cache *cache); extern MemoryContext cache_switch_to_memory_context(Cache *cache); +extern CacheStorage *cache_pin_storage(Cache *cache); +extern void cache_release_storage(CacheStorage *store); + #endif /* _IOBEAMDB_CACHE_H_ */ diff --git a/src/cache_invalidate.c b/src/cache_invalidate.c index 2e975cef5..795ee9fa5 100644 --- a/src/cache_invalidate.c +++ b/src/cache_invalidate.c @@ -95,7 +95,6 @@ invalidate_relcache_trigger(PG_FUNCTION_ARGS) void _cache_invalidate_init(void) { - CacheRegisterRelcacheCallback(inval_cache_callback, PointerGetDatum(NULL)); } void @@ -110,6 +109,7 @@ _cache_invalidate_fini(void) void _cache_invalidate_extload(void) { + CacheRegisterRelcacheCallback(inval_cache_callback, PointerGetDatum(NULL)); hypertable_cache_inval_proxy_oid = get_relname_relid(HYPERTABLE_CACHE_INVAL_PROXY_TABLE, CACHE_INVAL_PROXY_SCHEMA_OID); chunk_cache_inval_proxy_oid = get_relname_relid(HYPERTABLE_CACHE_INVAL_PROXY_TABLE, CACHE_INVAL_PROXY_SCHEMA_OID); } diff --git a/src/chunk_cache.c b/src/chunk_cache.c index ed92dba9f..c357d4ab8 100644 --- a/src/chunk_cache.c +++ b/src/chunk_cache.c @@ -31,10 +31,11 @@ */ typedef struct chunk_insert_plan_htable_entry { - int32 chunk_id; - int64 start_time; - int64 end_time; - SPIPlanPtr move_from_copyt_plan; + int32 chunk_id; + int64 start_time; + int64 end_time; + crn_set *crns; + SPIPlanPtr move_from_copyt_plan; } chunk_insert_plan_htable_entry; typedef struct ChunkCacheQueryCtx @@ -57,7 +58,7 @@ chunk_insert_plan_cache_get_key(CacheQueryCtx *ctx) static void *chunk_insert_plan_cache_create_entry(Cache *cache, CacheQueryCtx *ctx); static void *chunk_insert_plan_cache_update_entry(Cache *cache, CacheQueryCtx *ctx); -static void chunk_insert_plan_cache_pre_invalidate(Cache *cache); +static void chunk_insert_plan_cache_destroy_storage(CacheStorage *store); static char *get_copy_table_insert_sql(ChunkCacheQueryCtx *ctx); static Cache chunk_insert_plan_cache = { @@ -66,24 +67,22 @@ static Cache chunk_insert_plan_cache = { .entrysize = sizeof(chunk_insert_plan_htable_entry), .hcxt = NULL, }, - .htab = NULL, .name = CHUNK_CACHE_INVAL_PROXY_TABLE, .numelements = 16, .flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS, .get_key = chunk_insert_plan_cache_get_key, .create_entry = chunk_insert_plan_cache_create_entry, .update_entry = chunk_insert_plan_cache_update_entry, - .pre_invalidate_hook = chunk_insert_plan_cache_pre_invalidate, - .post_invalidate_hook = cache_init, + .destroy_storage_hook = chunk_insert_plan_cache_destroy_storage, }; static void -chunk_insert_plan_cache_pre_invalidate(Cache *cache) +chunk_insert_plan_cache_destroy_storage(CacheStorage *store) { chunk_insert_plan_htable_entry *entry; HASH_SEQ_STATUS scan; - hash_seq_init(&scan, cache->htab); + hash_seq_init(&scan, store->htab); while ((entry = hash_seq_search(&scan))) { @@ -96,7 +95,8 @@ chunk_insert_plan_cache_create_entry(Cache *cache, CacheQueryCtx *ctx) { ChunkCacheQueryCtx *cctx = (ChunkCacheQueryCtx *) ctx; chunk_insert_plan_htable_entry *pe = ctx->entry; - char *insert_sql; + char *insert_sql; + MemoryContext old; insert_sql = get_copy_table_insert_sql(cctx); pe->chunk_id = cctx->chunk_id; @@ -104,6 +104,11 @@ chunk_insert_plan_cache_create_entry(Cache *cache, CacheQueryCtx *ctx) pe->end_time = cctx->chunk_end_time; pe->move_from_copyt_plan = prepare_plan(insert_sql, 0, NULL); + /* TODO: old crn leaks memory here... */ + old = cache_switch_to_memory_context(cache); + pe->crns = fetch_crn_set(NULL, pe->chunk_id); + MemoryContextSwitchTo(old); + return pe; } @@ -112,7 +117,8 @@ chunk_insert_plan_cache_update_entry(Cache *cache, CacheQueryCtx *ctx) { ChunkCacheQueryCtx *cctx = (ChunkCacheQueryCtx *) ctx; chunk_insert_plan_htable_entry *pe = ctx->entry; - char *insert_sql; + char *insert_sql; + MemoryContext old; if (pe->start_time == cctx->chunk_start_time && pe->end_time == cctx->chunk_end_time) @@ -122,6 +128,10 @@ chunk_insert_plan_cache_update_entry(Cache *cache, CacheQueryCtx *ctx) SPI_freeplan(pe->move_from_copyt_plan); pe->move_from_copyt_plan = prepare_plan(insert_sql, 0, NULL); + old = cache_switch_to_memory_context(cache); + pe->crns = fetch_crn_set(NULL, pe->chunk_id); + MemoryContextSwitchTo(old); + return pe; } @@ -273,9 +283,11 @@ get_chunk_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_ move_plan = get_chunk_insert_plan_cache_entry(hci, pe_entry, part, chunk->id, chunk->start_time, chunk->end_time); entry->move_from_copyt_plan = move_plan->move_from_copyt_plan; + entry->crns = move_plan->crns; return entry; } + static char * get_copy_table_insert_sql(ChunkCacheQueryCtx *ctx) { @@ -351,6 +363,5 @@ _chunk_cache_init(void) void _chunk_cache_fini(void) { - chunk_insert_plan_cache.post_invalidate_hook = NULL; cache_invalidate(&chunk_insert_plan_cache); } diff --git a/src/chunk_cache.h b/src/chunk_cache.h index 75e32c9ba..b6a1bf673 100644 --- a/src/chunk_cache.h +++ b/src/chunk_cache.h @@ -4,6 +4,8 @@ #include #include +#include "metadata_queries.h" + #define CHUNK_CACHE_INVAL_PROXY_TABLE "cache_inval_chunk" #define CHUNK_CACHE_INVAL_PROXY_OID \ get_relname_relid(CHUNK_CACHE_INVAL_PROXY_TABLE, CACHE_INVAL_PROXY_SCHEMA_OID) @@ -17,7 +19,8 @@ typedef struct chunk_cache_entry { int32 id; chunk_row *chunk; - SPIPlanPtr move_from_copyt_plan; + crn_set *crns; + SPIPlanPtr move_from_copyt_plan; } chunk_cache_entry; extern chunk_cache_entry *get_chunk_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry, diff --git a/src/hypertable_cache.c b/src/hypertable_cache.c index d4c48ed69..a847dcc8d 100644 --- a/src/hypertable_cache.c +++ b/src/hypertable_cache.c @@ -34,13 +34,11 @@ static Cache hypertable_cache = { .entrysize = sizeof(hypertable_cache_entry), .hcxt = NULL, }, - .htab = NULL, .name = HYPERTABLE_CACHE_INVAL_PROXY_TABLE, .numelements = 16, .flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS, .get_key = hypertable_cache_get_key, .create_entry = hypertable_cache_create_entry, - .post_invalidate_hook = cache_init, }; /* Column numbers for 'hypertable' table in sql/common/tables.sql */ @@ -193,6 +191,13 @@ hypertable_cache_get_partition_epoch(hypertable_cache_entry *hce, int64 time_pt, return epoch; } +extern CacheStorage * +hypertable_cache_pin_storage() +{ + return cache_pin_storage(&hypertable_cache); +} + + void _hypertable_cache_init(void) { CreateCacheMemoryContext(); @@ -202,6 +207,5 @@ void _hypertable_cache_init(void) void _hypertable_cache_fini(void) { - hypertable_cache.post_invalidate_hook = NULL; cache_invalidate(&hypertable_cache); } diff --git a/src/hypertable_cache.h b/src/hypertable_cache.h index 7b9634c36..866cf808c 100644 --- a/src/hypertable_cache.h +++ b/src/hypertable_cache.h @@ -2,6 +2,7 @@ #define IOBEAMDB_HYPERTABLE_CACHE_H #include +#include "cache.h" typedef struct hypertable_basic_info hypertable_basic_info; typedef struct epoch_and_partitions_set epoch_and_partitions_set; @@ -30,6 +31,8 @@ hypertable_cache_get_partition_epoch(hypertable_cache_entry *hce, int64 time_pt, void invalidate_hypertable_cache_callback(void); +extern CacheStorage *hypertable_cache_pin_storage(void); + void _hypertable_cache_init(void); void _hypertable_cache_fini(void); diff --git a/src/insert.c b/src/insert.c index f2c574bb8..3d471e833 100644 --- a/src/insert.c +++ b/src/insert.c @@ -1,6 +1,8 @@ #include "postgres.h" #include "funcapi.h" #include "access/htup_details.h" +#include "access/relscan.h" +#include "commands/defrem.h" #include "catalog/namespace.h" #include "catalog/pg_namespace.h" #include "catalog/pg_type.h" @@ -25,9 +27,11 @@ #include "tcop/tcopprot.h" #include "tcop/utility.h" #include "deps/dblink.h" -#include "utils/tqual.h" +#include "parser/parse_utilcmd.h" +#include "parser/parser.h" #include "access/xact.h" +#include "access/htup_details.h" #include "parser/parse_oper.h" #include "parser/parse_func.h" @@ -41,12 +45,19 @@ #include "utils.h" #include "metadata_queries.h" #include "partitioning.h" +#include "scanner.h" + +#include +#include +#include +#include #define INSERT_TRIGGER_COPY_TABLE_FN "insert_trigger_on_copy_table_c" #define INSERT_TRIGGER_COPY_TABLE_NAME "insert_trigger" /* private funcs */ -static int tuple_fnumber(TupleDesc tupdesc, const char *fname); +static ObjectAddress create_insert_index(int32 hypertable_id, char * time_field, PartitioningInfo *part_info, int16 *end_time_partitions, int num_partitions); +static Node *get_keyspace_fn_call(PartitioningInfo *part_info); /* * Inserts rows from the temporary copy table into correct hypertable child tables. @@ -69,8 +80,312 @@ get_close_if_needed_fn() return single; } -PG_FUNCTION_INFO_V1(insert_trigger_on_copy_table_c); +/* + * + * Helper functions for inserting tuples into chunk tables + * + * We insert one chunk at a time and hold a context while we insert + * a particular chunk; + * + * */ + +typedef struct ChunkInsertCtxRel +{ + Relation rel; + TupleTableSlot *slot; + EState *estate; + ResultRelInfo *resultRelInfo; + BulkInsertState bistate; +} ChunkInsertCtxRel; + +static ChunkInsertCtxRel* +chunk_insert_ctx_rel_new(Relation rel, ResultRelInfo *resultRelInfo, List *range_table) { + TupleDesc tupDesc; + ChunkInsertCtxRel *rel_ctx = palloc(sizeof(ChunkInsertCtxRel)); + + rel_ctx->estate = CreateExecutorState(); + tupDesc = RelationGetDescr(rel); + + rel_ctx->estate->es_result_relations = resultRelInfo; + rel_ctx->estate->es_num_result_relations = 1; + rel_ctx->estate->es_result_relation_info = resultRelInfo; + rel_ctx->estate->es_range_table = range_table; + + rel_ctx->slot = ExecInitExtraTupleSlot(rel_ctx->estate); + ExecSetSlotDescriptor(rel_ctx->slot, tupDesc); + + rel_ctx->rel = rel; + rel_ctx->resultRelInfo = resultRelInfo; + rel_ctx->bistate = GetBulkInsertState(); + return rel_ctx; +} + +static void +chunk_insert_ctx_rel_destroy(ChunkInsertCtxRel *rel_ctx) +{ + FreeBulkInsertState(rel_ctx->bistate); + ExecCloseIndices(rel_ctx->resultRelInfo); + ExecResetTupleTable(rel_ctx->estate->es_tupleTable, false); + FreeExecutorState(rel_ctx->estate); + heap_close(rel_ctx->rel, NoLock); +} + + +static void +chunk_insert_ctx_rel_chunk_insert_ctx_insert_tuple(ChunkInsertCtxRel *rel_ctx, HeapTuple tuple) +{ + int hi_options = 0; /* no optimization */ + CommandId mycid = GetCurrentCommandId(true); + + /* + * Constraints might reference the tableoid column, so initialize + * t_tableOid before evaluating them. + */ + tuple->t_tableOid = RelationGetRelid(rel_ctx->rel); + + ExecStoreTuple(tuple, rel_ctx->slot, InvalidBuffer, false); + + if (rel_ctx->rel->rd_att->constr) + ExecConstraints(rel_ctx->resultRelInfo, rel_ctx->slot, rel_ctx->estate); + + /* OK, store the tuple and create index entries for it */ + heap_insert(rel_ctx->rel, tuple, mycid, hi_options, rel_ctx->bistate); + + if (rel_ctx->resultRelInfo->ri_NumIndices > 0) + ExecInsertIndexTuples(rel_ctx->slot, &(tuple->t_self), + rel_ctx->estate, false, NULL, + NIL); +} + +typedef struct ChunkInsertCtx +{ + chunk_cache_entry *chunk; + List *ctxs; +} ChunkInsertCtx; + +static ChunkInsertCtx * +chunk_insert_ctx_new(chunk_cache_entry *chunk) +{ + /* int num_tables = list_length(chunk->crns->tables); */ + ListCell *lc; + List *rel_ctx_list = NIL; + ChunkInsertCtx *ctx; + + foreach(lc, chunk->crns->tables) + { + crn_row *cr = lfirst(lc); + RangeVar *rv = makeRangeVarFromNameList(list_make2(makeString(cr->schema_name.data), makeString(cr->table_name.data))); + Relation rel; + RangeTblEntry *rte; + List *range_table; + ResultRelInfo *resultRelInfo; + ChunkInsertCtxRel *rel_ctx;; + + rel = heap_openrv(rv, RowExclusiveLock); + + /* permission check */ + rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = RelationGetRelid(rel); + rte->relkind = rel->rd_rel->relkind; + rte->requiredPerms = ACL_INSERT; + range_table = list_make1(rte); + + ExecCheckRTPerms(range_table, true); + + if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("Hypertables don't support Row level security"))); + + } + + if (XactReadOnly && !rel->rd_islocaltemp) + PreventCommandIfReadOnly("COPY FROM"); + PreventCommandIfParallelMode("COPY FROM"); + + if (rel->rd_rel->relkind != RELKIND_RELATION) + { + elog(ERROR, "inserting not to table"); + } + + /* + * We need a ResultRelInfo so we can use the regular executor's + * index-entry-making machinery. (There used to be a huge amount of + * code here that basically duplicated execUtils.c ...) + */ + + resultRelInfo = makeNode(ResultRelInfo); + InitResultRelInfo(resultRelInfo, + rel, + 1, /* dummy rangetable index */ + 0); + + ExecOpenIndices(resultRelInfo, false); + + if (resultRelInfo->ri_TrigDesc != NULL) + { + elog(ERROR, "triggers on chunk tables not supported"); + } + + rel_ctx = chunk_insert_ctx_rel_new(rel, resultRelInfo, range_table); + rel_ctx_list = lappend(rel_ctx_list, rel_ctx); + } + + ctx = palloc(sizeof(ChunkInsertCtx)); + ctx->ctxs = rel_ctx_list; + ctx->chunk = chunk; + return ctx; +} + +static void +chunk_insert_ctx_destroy(ChunkInsertCtx *ctx) +{ + ListCell *lc; + + if (ctx == NULL) + { + return; + } + + foreach(lc, ctx->ctxs) + { + ChunkInsertCtxRel *rel_ctx = lfirst(lc); + chunk_insert_ctx_rel_destroy(rel_ctx); + } +} + +static void +chunk_insert_ctx_insert_tuple(ChunkInsertCtx *ctx, HeapTuple tup) +{ + ListCell *lc; + + foreach(lc, ctx->ctxs) + { + ChunkInsertCtxRel *rel_ctx = lfirst(lc); + chunk_insert_ctx_rel_chunk_insert_ctx_insert_tuple(rel_ctx, tup); + } +} + +typedef struct CopyTableQueryCtx { + Partition *part; + ChunkInsertCtx *chunk_ctx; + epoch_and_partitions_set *pe; + hypertable_cache_entry *hci; +} CopyTableQueryCtx; + +static bool +copy_table_tuple_found(TupleInfo *ti, void *data) +{ + bool is_null; + CopyTableQueryCtx *ctx = data; + int16 keyspace_pt; + int64 time_pt; + + if (ctx->pe->num_partitions > 1) + { + //Datum partition_datum = index_getattr(scan->xs_itup, 1, scan->xs_itupdesc, &is_null); + Datum time_datum = index_getattr(ti->ituple, 2, ti->ituple_desc, &is_null); + Datum keyspace_datum = index_getattr(ti->ituple, 3, ti->ituple_desc, &is_null); + + //partition_no = DatumGetInt16(partition_datum); + time_pt = time_value_to_internal(time_datum, ctx->hci->time_column_type); + keyspace_pt = DatumGetInt16(keyspace_datum); + } + else + { + Datum time_datum = index_getattr(ti->ituple, 1, ti->ituple_desc, &is_null); + time_pt = time_value_to_internal(time_datum, ctx->hci->time_column_type); + keyspace_pt = -1; + } + + + if (ctx->chunk_ctx != NULL && !(ctx->chunk_ctx->chunk->chunk->start_time <= time_pt && ctx->chunk_ctx->chunk->chunk->end_time >= time_pt)) + { + /* moving on to next chunk; */ + chunk_insert_ctx_destroy(ctx->chunk_ctx); + ctx->chunk_ctx = NULL; + + } + if (ctx->part != NULL && !(ctx->part->keyspace_start <= keyspace_pt && ctx->part->keyspace_end >= keyspace_pt)) + { + /* moving on to next ctx->partition. */ + chunk_insert_ctx_destroy(ctx->chunk_ctx); + ctx->chunk_ctx = NULL; + ctx->part = NULL; + } + + if (ctx->part == NULL) + { + ctx->part = partition_epoch_get_partition(ctx->pe, keyspace_pt); + } + + if (ctx->chunk_ctx == NULL) + { + Datum was_closed_datum; + chunk_cache_entry *chunk; + /* + * TODO: this first call should be non-locking and use a cache(for + * performance) + */ + chunk = get_chunk_cache_entry(ctx->hci, ctx->pe, ctx->part, time_pt, false); + was_closed_datum = FunctionCall1(get_close_if_needed_fn(), Int32GetDatum(chunk->id)); + /* chunk may have been closed and thus changed /or/ need to get share lock */ + chunk = get_chunk_cache_entry(ctx->hci, ctx->pe, ctx->part, time_pt, true); + + ctx->chunk_ctx = chunk_insert_ctx_new(chunk); + /* elog(WARNING, "got new chunk %d", chunk->id); */ + } + + /* + * elog(WARNING, "time is partition_no: %d keyspace: %d time: %ld + * chunk %d", partition_no, keyspace_pt, time_pt, chunk->id); + */ + + /* insert here: */ + //has to be a copy(not sure why) + chunk_insert_ctx_insert_tuple(ctx->chunk_ctx,heap_copytuple(ti->tuple)); + return true; +} + +static void scan_copy_table_and_insert_post(int num_tuples, void *data) +{ + CopyTableQueryCtx *ctx = data; + if (ctx->chunk_ctx != NULL) + chunk_insert_ctx_destroy(ctx->chunk_ctx); +} + +static void scan_copy_table_and_insert( hypertable_cache_entry *hci, + epoch_and_partitions_set *pe, + Oid table, Oid index) +{ + CopyTableQueryCtx query_ctx = { + .pe = pe, + .hci = hci, + }; + + ScannerCtx scanCtx = { + .table = table, + .index = index, + .scantype = ScannerTypeIndex, + .want_itup = true, + .nkeys = 0, + .scankey = NULL, + .data = &query_ctx, + .tuple_found = copy_table_tuple_found, + .postscan = scan_copy_table_and_insert_post, + .lockmode = AccessShareLock, + .scandirection = ForwardScanDirection, + }; + + /* Perform an index scan on primary key. */ + scanner_scan(&scanCtx); +} + + +PG_FUNCTION_INFO_V1(insert_trigger_on_copy_table_c); Datum insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS) { @@ -78,17 +393,15 @@ insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS) /* arg 0 = hypertable id */ char *hypertable_id_arg = trigdata->tg_trigger->tgargs[0]; + int32 hypertable_id = atoi(hypertable_id_arg); - HeapTuple firstrow; hypertable_cache_entry *hci; - int time_fnum, - i, - num_chunks; - bool isnull; - List *chunk_id_list = NIL; - ListCell *chunk_id_cell; - int *chunk_id_array; + epoch_and_partitions_set *pe; + CacheStorage *hypertable_cache_storage; + ObjectAddress idx; + + DropStmt *drop = makeNode(DropStmt); /* * --This guard protects against calling insert_data() twice in the same @@ -101,9 +414,6 @@ insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS) * two different hypertables. */ char *insert_guard = GetConfigOptionByName("io.insert_data_guard", NULL, true); - HeapScanDesc scan; - ScanKeyData scankey[1]; - int nkeys = 0; if (insert_guard != NULL && strcmp(insert_guard, "on") == 0) { @@ -119,98 +429,37 @@ insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS) * get the hypertable cache; use the time column name to figure out the * column fnum for time field */ - hci = hypertable_cache_get(atoi(hypertable_id_arg)); - time_fnum = tuple_fnumber(trigdata->tg_relation->rd_att, hci->time_column_name); + hypertable_cache_storage = hypertable_cache_pin_storage(); - scan = heap_beginscan(trigdata->tg_relation, SnapshotSelf, nkeys, scankey); - - /* get one row in a loop until the copy table is empty. */ - while ((firstrow = heap_getnext(scan, ForwardScanDirection))) - { - Datum time_datum; - int64 time_internal; - epoch_and_partitions_set *pe_entry; - Partition *part = NULL; - chunk_cache_entry *entry; - int ret; - - time_datum = heap_getattr(firstrow, time_fnum, trigdata->tg_relation->rd_att, &isnull); + hci = hypertable_cache_get(hypertable_id); - if (isnull) - { - elog(ERROR, "Time column is null"); - } + /* TODO: hack assumes single pe. */ + pe = hypertable_cache_get_partition_epoch(hci, 0, trigdata->tg_relation->rd_id); - time_internal = time_value_to_internal(time_datum, hci->time_column_type); - - pe_entry = hypertable_cache_get_partition_epoch(hci, time_internal, - trigdata->tg_relation->rd_id); + /* + * create an index that colocates row from the same chunk together and + * guarantees an order on chunk access as well + */ + idx = create_insert_index(hypertable_id, hci->time_column_name, pe->partitioning, + partition_epoch_get_partition_end_times(pe), pe->num_partitions); - if (pe_entry->partitioning != NULL && pe_entry->num_partitions > 1) - { - PartitioningInfo *pi = pe_entry->partitioning; - Datum part_value = heap_getattr(firstrow, pi->column_attnum, - trigdata->tg_relation->rd_att, &isnull); - int16 keyspace_pt = partitioning_func_apply(&pi->partfunc, part_value); - /* get the partition using the keyspace value of the row. */ - part = partition_epoch_get_partition(pe_entry, keyspace_pt); - } - else - { - /* Not Partitioning: get the first and only partition */ - part = partition_epoch_get_partition(pe_entry, -1); - } + scan_copy_table_and_insert(hci, pe, trigdata->tg_relation->rd_id, idx.objectId); - entry = get_chunk_cache_entry(hci, pe_entry, part, time_internal, true); - - if (entry->chunk->end_time == OPEN_END_TIME) - { - chunk_id_list = lappend_int(chunk_id_list, entry->id); - } - if (SPI_connect() < 0) - { - elog(ERROR, "Got an SPI connect error"); - } - ret = SPI_execute_plan(entry->move_from_copyt_plan, NULL, NULL, false, 1); - if (ret <= 0) - { - elog(ERROR, "Got an SPI error %d", ret); - } - SPI_finish(); + cache_release_storage(hypertable_cache_storage); - } + drop->removeType = OBJECT_INDEX; + drop->missing_ok = FALSE; + drop->objects = list_make1(list_make1(makeString("copy_insert"))); + drop->arguments = NIL; + drop->behavior = DROP_RESTRICT; + drop->concurrent = false; - heap_endscan(scan); - - /* build chunk id array */ - num_chunks = list_length(chunk_id_list); - chunk_id_array = palloc(sizeof(int) * num_chunks); - i = 0; - foreach(chunk_id_cell, chunk_id_list) - { - int chunk_id = lfirst_int(chunk_id_cell); - - chunk_id_array[i++] = chunk_id; - } - /* sort by chunk_id to avoid deadlocks */ - qsort(chunk_id_array, num_chunks, sizeof(int), int_cmp); - - /* close chunks */ - for (i = 0; i < num_chunks; i++) - { - /* TODO: running this on every insert is really expensive */ - /* Keep some kind of cache of result an run this heuristically. */ - int32 chunk_id = chunk_id_array[i]; - - FunctionCall1(get_close_if_needed_fn(), Int32GetDatum(chunk_id)); - } + RemoveRelations(drop); return PointerGetDatum(NULL); } - - /* Creates a temp table for INSERT and COPY commands. This table * stores the data until it is distributed to the appropriate chunks. * The copy table uses ON COMMIT DELETE ROWS and inherits from the root table. @@ -287,18 +536,165 @@ create_copy_table(int32 hypertable_id, Oid root_oid) return copyTableRelationAddr.objectId; } - -static int -tuple_fnumber(TupleDesc tupdesc, const char *fname) +/* creates index for inserting in set chunk order. + * + * The index is the following: + * If there is a partitioning_func: + * partition_no, time, keyspace_value + * If there is no partitioning_func: + * time + * + * Partition_num is simply a unique number identifying the partition for the epoch the row belongs to. + * It is obtained by getting the maximal index in the end_time_partitions array such that the keyspace value + * is less than or equal to the value in the array. + * + * Keyspace_value without partition_num is not sufficient because: + * consider the partitions with keyspaces 0-5,6-10, and time partitions 100-200,201-300 + * Then consider the following input: + * row 1: keyspace=0, time=100 + * row 2: keyspace=2, time=250 + * row 3: keyspace=4, time=100 + * row 1 and 3 should be in the same chunk but they are now not together in the order (row 2 is between them). + * + * keyspace_value should probably be moved out of the index. + * + * */ +static ObjectAddress +create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo *part_info, int16 *end_time_partitions, int num_partitions) { - int res; + IndexStmt *index_stmt = makeNode(IndexStmt); + IndexElem *time_elem; + Oid relid; + List *indexElem = NIL; + int i; - for (res = 0; res < tupdesc->natts; res++) + time_elem = makeNode(IndexElem); + time_elem->name = time_field; + time_elem->expr = NULL; + time_elem->indexcolname = NULL; + time_elem->collation = NIL; + time_elem->opclass = NIL; + time_elem->ordering = SORTBY_DEFAULT; + time_elem->nulls_ordering = SORTBY_NULLS_DEFAULT; + + if (part_info != NULL) { - if (namestrcmp(&tupdesc->attrs[res]->attname, fname) == 0) - return res + 1; + IndexElem *partition_elem; + IndexElem *keyspace_elem; + List *array_pos_func_name = list_make2(makeString("_iobeamdb_catalog"), makeString("array_position_least")); + List *array_pos_args; + List *array_list = NIL; + A_ArrayExpr *array_expr; + FuncCall *array_pos_fc; + + for (i = 0; i < num_partitions; i++) + { + A_Const *end_time_const = makeNode(A_Const); + TypeCast *cast = makeNode(TypeCast); + + end_time_const->val = *makeInteger((int) end_time_partitions[i]); + end_time_const->location = -1; + + + cast->arg = (Node *) end_time_const; + cast->typeName = SystemTypeName("int2"); + cast->location = -1; + + array_list = lappend(array_list, cast); + } + + array_expr = makeNode(A_ArrayExpr); + array_expr->elements = array_list; + array_expr->location = -1; + + array_pos_args = list_make2(array_expr, get_keyspace_fn_call(part_info)); + array_pos_fc = makeFuncCall(array_pos_func_name, array_pos_args, -1); + + partition_elem = makeNode(IndexElem); + partition_elem->name = NULL; + partition_elem->expr = (Node *) array_pos_fc; + partition_elem->indexcolname = NULL; + partition_elem->collation = NIL; + partition_elem->opclass = NIL; + partition_elem->ordering = SORTBY_DEFAULT; + partition_elem->nulls_ordering = SORTBY_NULLS_DEFAULT; + + keyspace_elem = makeNode(IndexElem); + keyspace_elem->name = NULL; + keyspace_elem->expr = (Node *) get_keyspace_fn_call(part_info); + keyspace_elem->indexcolname = NULL; + keyspace_elem->collation = NIL; + keyspace_elem->opclass = NIL; + keyspace_elem->ordering = SORTBY_DEFAULT; + keyspace_elem->nulls_ordering = SORTBY_NULLS_DEFAULT; + + /* partition_number, time, keyspace */ + /* can probably get rid of keyspace but later */ + indexElem = list_make3(partition_elem, time_elem, keyspace_elem); + } + else + { + indexElem = list_make1(time_elem); } - elog(ERROR, "field not found: %s", fname); + index_stmt->idxname = "copy_insert"; + index_stmt->relation = makeRangeVar("pg_temp", copy_table_name(hypertable_id), -1); + index_stmt->accessMethod = "btree"; + index_stmt->tableSpace = NULL; + index_stmt->indexParams = indexElem; + index_stmt->options = NULL; + index_stmt->whereClause = NULL; + index_stmt->excludeOpNames = NIL; + index_stmt->idxcomment = NULL; + index_stmt->indexOid = InvalidOid; + index_stmt->oldNode = InvalidOid; + index_stmt->unique = false; + index_stmt->primary = false; + index_stmt->isconstraint = false; + index_stmt->deferrable = false; + index_stmt->initdeferred = false; + index_stmt->transformed = false; + index_stmt->concurrent = false; + index_stmt->if_not_exists = false; + + relid = + RangeVarGetRelidExtended(index_stmt->relation, ShareLock, + false, false, + RangeVarCallbackOwnsRelation, + NULL); + + index_stmt = transformIndexStmt(relid, index_stmt, ""); + return DefineIndex(relid, /* OID of heap relation */ + index_stmt, + InvalidOid, /* no predefined OID */ + false, /* is_alter_table */ + true, /* check_rights */ + false, /* skip_build */ + false); /* quiet */ + } +/* Helper function to create the FuncCall for calculating the keyspace_value. Used for + * creating the copy_insert index + * + */ +static Node * +get_keyspace_fn_call(PartitioningInfo *part_info) +{ + ColumnRef *col_ref = makeNode(ColumnRef); + A_Const *mod_const; + List *part_func_name = list_make2(makeString(part_info->partfunc.schema), makeString(part_info->partfunc.name)); + List *part_func_args; + FuncCall *part_fc; + + col_ref->fields = list_make1(makeString(part_info->column)); + col_ref->location = -1; + + mod_const = makeNode(A_Const); + mod_const->val = *makeInteger(part_info->partfunc.modulos); + mod_const->location = -1; + + part_func_args = list_make2(col_ref, mod_const); + part_fc = makeFuncCall(part_func_name, part_func_args, -1); + return (Node *) part_fc; +} diff --git a/src/iobeamdb.c b/src/iobeamdb.c index 68118090c..603d3c83f 100644 --- a/src/iobeamdb.c +++ b/src/iobeamdb.c @@ -760,6 +760,10 @@ void iobeamdb_ProcessUtility(Node *parsetree, return; } + /*if(IsA(parsetree, IndexStmt)) { + pprint(parsetree); + }*/ + prev_ProcessUtility(parsetree, queryString, context, params, dest, completionTag); } diff --git a/src/partitioning.c b/src/partitioning.c index bd096a07b..2547e2aa5 100644 --- a/src/partitioning.c +++ b/src/partitioning.c @@ -175,6 +175,8 @@ partition_epoch_tuple_found(TupleInfo *ti, void *arg) DatumGetCString(partcol), DatumGetInt16(partmod), pctx->relid); + } else { + pe->partitioning = NULL; } /* Scan for the epoch's partitions */ @@ -338,3 +340,19 @@ partition_epoch_get_partition(epoch_and_partitions_set *epoch, int16 keyspace_pt return part; } + +int16 * +partition_epoch_get_partition_end_times(epoch_and_partitions_set *epoch) +{ + + int16 *end_times_partitions = palloc(sizeof(int16) * epoch->num_partitions); + int i; + + for (i = 0; i < epoch->num_partitions; i++) + { + end_times_partitions[i] = epoch->partitions[i].keyspace_end; + } + + return end_times_partitions; +} + diff --git a/src/partitioning.h b/src/partitioning.h index 3d6d4d8f8..4eefb88fc 100644 --- a/src/partitioning.h +++ b/src/partitioning.h @@ -52,5 +52,5 @@ epoch_and_partitions_set *partition_epoch_scan(int32 hypertable_id, int64 timepo int16 partitioning_func_apply(PartitioningFunc *pf, Datum value); Partition *partition_epoch_get_partition(epoch_and_partitions_set *epoch, int16 keyspace_pt); - +int16 * partition_epoch_get_partition_end_times(epoch_and_partitions_set *epoch); #endif /* IOBEAMDB_PARTITIONING_H */ diff --git a/src/pgmurmur3.c b/src/pgmurmur3.c index 9922843f8..6daa4502f 100644 --- a/src/pgmurmur3.c +++ b/src/pgmurmur3.c @@ -1,9 +1,12 @@ /* -*- Mode: C; tab-width: 4; indent-tabs-mode: t; c-basic-offset: 4 -*- */ #include "pgmurmur3.h" +#include #include "utils/builtins.h" +#include +#include -/* adapted from https://github.com/markokr/pghashlib/ */ +/* adapted from https://github.com/markokr/pghashlib/ */ PG_FUNCTION_INFO_V1(pg_murmur3_hash_string); @@ -11,7 +14,7 @@ Datum pg_murmur3_hash_string(PG_FUNCTION_ARGS) { struct varlena *data; - uint64_t io[MAX_IO_VALUES]; + uint64_t io[MAX_IO_VALUES]; memset(io, 0, sizeof(io)); @@ -36,12 +39,12 @@ PG_FUNCTION_INFO_V1(get_partition_for_key); Datum get_partition_for_key(PG_FUNCTION_ARGS) { -// SELECT ((_iobeamdb_internal.murmur3_hash_string(key, 1 :: INT4) & x'7fffffff' :: INTEGER) % mod_factor) :: SMALLINT INTO ret; - struct varlena *data; - int32 mod; - Datum hash_d; - int32 hash_i; - int16 res; +/* SELECT ((_iobeamdb_internal.murmur3_hash_string(key, 1 :: INT4) & x'7fffffff' :: INTEGER) % mod_factor) :: SMALLINT INTO ret; */ + struct varlena *data; + int32 mod; + Datum hash_d; + int32 hash_i; + int16 res; /* request aligned data on weird architectures */ #ifdef HLIB_UNALIGNED_READ_OK data = PG_GETARG_VARLENA_PP(0); @@ -57,3 +60,98 @@ get_partition_for_key(PG_FUNCTION_ARGS) PG_RETURN_INT16(res); } + + +/* + * array_position_least + */ + +PG_FUNCTION_INFO_V1(array_position_least); + +Datum +array_position_least(PG_FUNCTION_ARGS) +{ + ArrayType *array; + Datum searched_element, + value; + bool isnull; + int position; + ArrayMetaState *my_extra; + ArrayIterator array_iterator; + + if (PG_ARGISNULL(0) || PG_ARGISNULL(1)) + { + elog(ERROR, "neither parameter should be null"); + } + + array = PG_GETARG_ARRAYTYPE_P(0); + if (INT2OID != ARR_ELEMTYPE(array)) + { + elog(ERROR, "only support smallint arrays"); + } + + /* + * We refuse to search for elements in multi-dimensional arrays, since we + * have no good way to report the element's location in the array. + */ + if (ARR_NDIM(array) > 1) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("searching for elements in multidimensional arrays is not supported"))); + + if (PG_ARGISNULL(1)) + { + elog(ERROR, "does not expect null"); + } + else + { + searched_element = PG_GETARG_DATUM(1); + } + + position = (ARR_LBOUND(array))[0] - 1; + + /* + * We arrange to look up type info for array_create_iterator only once per + * series of calls, assuming the element type doesn't change underneath + * us. + */ + my_extra = (ArrayMetaState *) fcinfo->flinfo->fn_extra; + if (my_extra == NULL) + { + fcinfo->flinfo->fn_extra = MemoryContextAlloc(fcinfo->flinfo->fn_mcxt, + sizeof(ArrayMetaState)); + my_extra = (ArrayMetaState *) fcinfo->flinfo->fn_extra; + get_typlenbyvalalign(INT2OID, + &my_extra->typlen, + &my_extra->typbyval, + &my_extra->typalign); + my_extra->element_type = INT2OID; + } + + /* Examine each array element until we find a match. */ + array_iterator = array_create_iterator(array, 0, my_extra); + while (array_iterate(array_iterator, &value, &isnull)) + { + position++; + + /* + * Can't look at the array element's value if it's null; but if we + * search for null, we have a hit and are done. + */ + if (isnull) + { + elog(ERROR, "No element in array should be null"); + } + + if (DatumGetInt16(value) >= DatumGetInt16(searched_element)) + break; + } + + array_free_iterator(array_iterator); + + /* Avoid leaking memory when handed toasted input */ + PG_FREE_IF_COPY(array, 0); + + + PG_RETURN_INT16(position); +} diff --git a/src/pgmurmur3.h b/src/pgmurmur3.h index eca0f46ad..573e42bc3 100644 --- a/src/pgmurmur3.h +++ b/src/pgmurmur3.h @@ -9,7 +9,7 @@ #include #endif #ifdef HAVE_INTTYPES_H - #include +#include #endif #if !defined(PG_VERSION_NUM) || (PG_VERSION_NUM < 80300) @@ -36,10 +36,9 @@ #define MAX_IO_VALUES 2 /* hash function signatures */ -void hlib_murmur3(const void *data, size_t len, uint64_t *io); +void hlib_murmur3(const void *data, size_t len, uint64_t *io); /* SQL function */ -Datum pg_murmur3_hash_string(PG_FUNCTION_ARGS); +Datum pg_murmur3_hash_string(PG_FUNCTION_ARGS); #endif - diff --git a/src/scanner.c b/src/scanner.c index 08fe1ed16..4f9986aaa 100644 --- a/src/scanner.c +++ b/src/scanner.c @@ -30,7 +30,7 @@ typedef struct InternalScannerCtx { typedef struct Scanner { Relation (*open)(InternalScannerCtx *ctx); ScanDesc (*beginscan)(InternalScannerCtx *ctx); - HeapTuple (*getnext)(InternalScannerCtx *ctx); + bool (*getnext)(InternalScannerCtx *ctx, TupleInfo *ti); void (*endscan)(InternalScannerCtx *ctx); void (*close)(InternalScannerCtx *ctx); } Scanner; @@ -50,9 +50,10 @@ static ScanDesc heap_scanner_beginscan(InternalScannerCtx *ctx) return ctx->scan; } -static HeapTuple heap_scanner_getnext(InternalScannerCtx *ctx) +static bool heap_scanner_getnext(InternalScannerCtx *ctx, TupleInfo *ti) { - return heap_getnext(ctx->scan.heap_scan, ctx->sctx->scandirection); + ti->tuple = heap_getnext(ctx->scan.heap_scan, ctx->sctx->scandirection); + return HeapTupleIsValid(ti->tuple); } static void heap_scanner_endscan(InternalScannerCtx *ctx) @@ -79,14 +80,18 @@ static ScanDesc index_scanner_beginscan(InternalScannerCtx *ctx) ctx->scan.index_scan = index_beginscan(ctx->tablerel, ctx->indexrel, SnapshotSelf, sctx->nkeys, sctx->norderbys); + ctx->scan.index_scan->xs_want_itup = ctx->sctx->want_itup; index_rescan(ctx->scan.index_scan, sctx->scankey, sctx->nkeys, NULL, sctx->norderbys); return ctx->scan; } -static HeapTuple index_scanner_getnext(InternalScannerCtx *ctx) +static bool index_scanner_getnext(InternalScannerCtx *ctx, TupleInfo *ti) { - return index_getnext(ctx->scan.index_scan, ctx->sctx->scandirection); + ti->tuple = index_getnext(ctx->scan.index_scan, ctx->sctx->scandirection); + ti->ituple = ctx->scan.index_scan->xs_itup; + ti->ituple_desc = ctx->scan.index_scan->xs_itupdesc; + return HeapTupleIsValid(ti->tuple); } static void index_scanner_endscan(InternalScannerCtx *ctx) @@ -129,8 +134,9 @@ static Scanner scanners[] = { */ int scanner_scan(ScannerCtx *ctx) { - HeapTuple tuple; TupleDesc tuple_desc; + TupleInfo ti; + bool is_valid; int num_tuples = 0; Scanner *scanner = &scanners[ctx->scantype]; InternalScannerCtx ictx = { @@ -141,20 +147,18 @@ int scanner_scan(ScannerCtx *ctx) scanner->beginscan(&ictx); tuple_desc = RelationGetDescr(ictx.tablerel); + + ti.scanrel = ictx.tablerel; + ti.desc = tuple_desc; /* Call pre-scan handler, if any. */ if (ctx->prescan != NULL) ctx->prescan(ctx->data); - tuple = scanner->getnext(&ictx); + is_valid = scanner->getnext(&ictx, &ti); - while (HeapTupleIsValid(tuple)) + while (is_valid) { - TupleInfo ti = { - .scanrel = ictx.tablerel, - .tuple = tuple, - .desc = tuple_desc, - }; if (ctx->filter == NULL || ctx->filter(&ti, ctx->data)) { @@ -165,7 +169,7 @@ int scanner_scan(ScannerCtx *ctx) Buffer buffer; HeapUpdateFailureData hufd; - ti.lockresult = heap_lock_tuple(ictx.tablerel, tuple, + ti.lockresult = heap_lock_tuple(ictx.tablerel, ti.tuple, GetCurrentCommandId(false), ctx->tuplock.lockmode, ctx->tuplock.waitpolicy, @@ -180,7 +184,7 @@ int scanner_scan(ScannerCtx *ctx) break; } - tuple = scanner->getnext(&ictx); + is_valid = scanner->getnext(&ictx,&ti); } /* Call post-scan handler, if any. */ diff --git a/src/scanner.h b/src/scanner.h index 44541c17b..47bf0d83a 100644 --- a/src/scanner.h +++ b/src/scanner.h @@ -18,6 +18,9 @@ typedef struct TupleInfo Relation scanrel; HeapTuple tuple; TupleDesc desc; + /* return index tuple if it was requested -- only for index scans */ + IndexTuple ituple; + TupleDesc ituple_desc; /* * If the user requested a tuple lock, the result of the lock is passed on * in lockresult. @@ -31,6 +34,7 @@ typedef struct ScannerCtx { ScannerType scantype; ScanKey scankey; int nkeys, norderbys; + bool want_itup; LOCKMODE lockmode; struct { LockTupleMode lockmode; diff --git a/test/expected/drop_chunks.out b/test/expected/drop_chunks.out index 8dfe3d8c4..02450c9b7 100644 --- a/test/expected/drop_chunks.out +++ b/test/expected/drop_chunks.out @@ -91,13 +91,13 @@ WHERE h.schema_name = 'public' AND (h.table_name = 'drop_chunk_test1' OR h.table 3 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_3_data | 3 | 3 4 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_4_data | 4 | 4 5 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_5_data | 5 | 5 - 6 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_6_data | 6 | 6 + 6 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_6_data | 6 | 7 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_7_data | | 1 8 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_8_data | 2 | 2 9 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_9_data | 3 | 3 10 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_10_data | 4 | 4 11 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_11_data | 5 | 5 - 12 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_12_data | 6 | 6 + 12 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_12_data | 6 | (12 rows) SELECT * FROM _iobeamdb_catalog.chunk_replica_node; @@ -159,12 +159,12 @@ WHERE h.schema_name = 'public' AND (h.table_name = 'drop_chunk_test1' OR h.table 3 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_3_data | 3 | 3 4 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_4_data | 4 | 4 5 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_5_data | 5 | 5 - 6 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_6_data | 6 | 6 + 6 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_6_data | 6 | 8 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_8_data | 2 | 2 9 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_9_data | 3 | 3 10 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_10_data | 4 | 4 11 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_11_data | 5 | 5 - 12 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_12_data | 6 | 6 + 12 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_12_data | 6 | (10 rows) SELECT * FROM _iobeamdb_catalog.chunk_replica_node; @@ -221,12 +221,12 @@ WHERE h.schema_name = 'public' AND (h.table_name = 'drop_chunk_test1' OR h.table 3 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_3_data | 3 | 3 4 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_4_data | 4 | 4 5 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_5_data | 5 | 5 - 6 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_6_data | 6 | 6 + 6 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_6_data | 6 | 8 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_8_data | 2 | 2 9 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_9_data | 3 | 3 10 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_10_data | 4 | 4 11 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_11_data | 5 | 5 - 12 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_12_data | 6 | 6 + 12 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_12_data | 6 | (9 rows) SELECT * FROM _iobeamdb_catalog.chunk_replica_node; diff --git a/test/expected/insert.out b/test/expected/insert.out index 36a248a7f..4481b4224 100644 --- a/test/expected/insert.out +++ b/test/expected/insert.out @@ -90,7 +90,7 @@ SELECT * FROM _iobeamdb_catalog.chunk c ----+--------------+------------+----------+----------+----------------------+---------------+--------------------+---------------------+----+--------------+---------------+------------+--------------------+------------------------+----+-------------+--------------------+------------------------+-------------------------+--------------------+-----------------+--------------------+-----------+------------------+------------------+------------+------------------ 4 | 3 | | 1 | 4 | 3 | single | _iobeamdb_internal | _hyper_2_3_0_4_data | 3 | 3 | 2 | 0 | _iobeamdb_internal | _hyper_2_3_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000 5 | 3 | 2 | 2 | 5 | 3 | single | _iobeamdb_internal | _hyper_2_3_0_5_data | 3 | 3 | 2 | 0 | _iobeamdb_internal | _hyper_2_3_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000 - 6 | 3 | 3 | 3 | 6 | 3 | single | _iobeamdb_internal | _hyper_2_3_0_6_data | 3 | 3 | 2 | 0 | _iobeamdb_internal | _hyper_2_3_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000 + 6 | 3 | 3 | | 6 | 3 | single | _iobeamdb_internal | _hyper_2_3_0_6_data | 3 | 3 | 2 | 0 | _iobeamdb_internal | _hyper_2_3_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000 (3 rows) \d+ "_iobeamdb_internal".* @@ -351,7 +351,7 @@ Inherits: _iobeamdb_internal._hyper_2_3_0_partition device_id | text | | extended | | Check constraints: "partition" CHECK (_iobeamdb_catalog.get_partition_for_key(device_id, 32768) >= '0'::smallint AND _iobeamdb_catalog.get_partition_for_key(device_id, 32768) <= '32767'::smallint) - "time_range" CHECK ("time" >= '3'::bigint AND "time" <= '3'::bigint) NOT VALID + "time_range" CHECK ("time" >= '3'::bigint) NOT VALID Inherits: _iobeamdb_internal._hyper_2_3_0_partition Table "_iobeamdb_internal._hyper_2_3_0_partition" @@ -400,7 +400,7 @@ SELECT * FROM _iobeamdb_catalog.chunk; 3 | 2 | | 4 | 3 | | 1 5 | 3 | 2 | 2 - 6 | 3 | 3 | 3 + 6 | 3 | 3 | (6 rows) SELECT * FROM _iobeamdb_catalog.chunk_replica_node; diff --git a/test/expected/insert_single.out b/test/expected/insert_single.out index bd2e42452..5c4b5e16b 100644 --- a/test/expected/insert_single.out +++ b/test/expected/insert_single.out @@ -91,7 +91,7 @@ SELECT * FROM _iobeamdb_catalog.chunk c ----+--------------+------------+----------+----------+----------------------+---------------+--------------------+---------------------+----+--------------+---------------+------------+--------------------+------------------------+----+-------------+--------------------+------------------------+-------------------------+--------------------+-----------------+--------------------+-----------+------------------+------------------+------------+------------------ 3 | 2 | | 1 | 3 | 2 | single | _iobeamdb_internal | _hyper_2_2_0_3_data | 2 | 2 | 2 | 0 | _iobeamdb_internal | _hyper_2_2_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000 4 | 2 | 2 | 2 | 4 | 2 | single | _iobeamdb_internal | _hyper_2_2_0_4_data | 2 | 2 | 2 | 0 | _iobeamdb_internal | _hyper_2_2_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000 - 5 | 2 | 3 | 3 | 5 | 2 | single | _iobeamdb_internal | _hyper_2_2_0_5_data | 2 | 2 | 2 | 0 | _iobeamdb_internal | _hyper_2_2_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000 + 5 | 2 | 3 | | 5 | 2 | single | _iobeamdb_internal | _hyper_2_2_0_5_data | 2 | 2 | 2 | 0 | _iobeamdb_internal | _hyper_2_2_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000 (3 rows) \c single diff --git a/test/expected/timestamp.out b/test/expected/timestamp.out index 3dd496170..baa82e685 100644 --- a/test/expected/timestamp.out +++ b/test/expected/timestamp.out @@ -61,9 +61,9 @@ INSERT INTO PUBLIC."testNs"("timeCustom", device_id, series_0, series_1) VALUES SELECT * FROM PUBLIC."testNs"; timeCustom | device_id | series_0 | series_1 | series_2 | series_bool --------------------------+-----------+----------+----------+----------+------------- + Tue Nov 10 23:00:02 2009 | dev1 | 2.5 | 3 | | Thu Nov 12 01:00:00 2009 | dev1 | 1.5 | 1 | | Thu Nov 12 01:00:00 2009 | dev1 | 1.5 | 2 | | - Tue Nov 10 23:00:02 2009 | dev1 | 2.5 | 3 | | Tue Nov 10 23:00:00 2009 | dev2 | 1.5 | 1 | | Tue Nov 10 23:00:00 2009 | dev2 | 1.5 | 2 | | (5 rows) From 2404940315f66d10222c5d3cd2704955ecbaa617 Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Thu, 2 Mar 2017 19:31:58 -0500 Subject: [PATCH 2/3] getting rid of move_plan in cache. creating the plan caused a lock to be taken on the chunk table -- a lock which lead to a deadlock --- sql/common/cache.sql | 4 +++ src/cache_invalidate.c | 19 ++++++++++++ src/chunk_cache.c | 69 +++++++++++++++--------------------------- 3 files changed, 47 insertions(+), 45 deletions(-) diff --git a/sql/common/cache.sql b/sql/common/cache.sql index 079b6b837..0997af3e8 100644 --- a/sql/common/cache.sql +++ b/sql/common/cache.sql @@ -9,6 +9,10 @@ SELECT pg_catalog.pg_extension_config_dump('_iobeamdb_cache.cache_inval_chunk', CREATE OR REPLACE FUNCTION _iobeamdb_cache.invalidate_relcache_trigger() RETURNS TRIGGER AS '$libdir/iobeamdb', 'invalidate_relcache_trigger' LANGUAGE C; +--for debugging +CREATE OR REPLACE FUNCTION _iobeamdb_cache.invalidate_relcache(proxy_oid OID) + RETURNS BOOLEAN AS '$libdir/iobeamdb', 'invalidate_relcache' LANGUAGE C; + CREATE TRIGGER "0_cache_inval" AFTER INSERT OR UPDATE OR DELETE OR TRUNCATE ON _iobeamdb_catalog.hypertable FOR EACH STATEMENT EXECUTE PROCEDURE _iobeamdb_cache.invalidate_relcache_trigger('cache_inval_hypertable'); diff --git a/src/cache_invalidate.c b/src/cache_invalidate.c index 795ee9fa5..2b212e67c 100644 --- a/src/cache_invalidate.c +++ b/src/cache_invalidate.c @@ -38,6 +38,7 @@ void _cache_invalidate_extload(void); Datum invalidate_relcache_trigger(PG_FUNCTION_ARGS); PG_FUNCTION_INFO_V1(invalidate_relcache_trigger); +PG_FUNCTION_INFO_V1(invalidate_relcache); static Oid hypertable_cache_inval_proxy_oid = InvalidOid; @@ -91,6 +92,24 @@ invalidate_relcache_trigger(PG_FUNCTION_ARGS) return PointerGetDatum(trigdata->tg_trigtuple); } +/* + * This is similar to invalidate_relcache_trigger but not a trigger. + * Not used regularly but useful for debugging. + * + */ + +Datum +invalidate_relcache(PG_FUNCTION_ARGS) +{ + Oid proxy_oid = PG_GETARG_OID(0); + + /* arg 0 = relid of the cache_inval_proxy table */ + CacheInvalidateRelcacheByRelid(proxy_oid); + + /* tuple to return to executor */ + return BoolGetDatum(true); +} + void _cache_invalidate_init(void) diff --git a/src/chunk_cache.c b/src/chunk_cache.c index c357d4ab8..902d98993 100644 --- a/src/chunk_cache.c +++ b/src/chunk_cache.c @@ -17,7 +17,7 @@ /* * Chunk Insert Plan Cache: * - * Hashtable of chunk_id => chunk_insert_plan_htable_entry. + * Hashtable of chunk_id => chunk_crn_set_htable_entry. * * This cache stores plans for the execution of the command for moving stuff * from the copy table to the tables associated with the chunk. @@ -29,14 +29,13 @@ * each insert anyway... * */ -typedef struct chunk_insert_plan_htable_entry +typedef struct chunk_crn_set_htable_entry { int32 chunk_id; int64 start_time; int64 end_time; crn_set *crns; - SPIPlanPtr move_from_copyt_plan; -} chunk_insert_plan_htable_entry; +} chunk_crn_set_htable_entry; typedef struct ChunkCacheQueryCtx { @@ -50,51 +49,35 @@ typedef struct ChunkCacheQueryCtx } ChunkCacheQueryCtx; static void * -chunk_insert_plan_cache_get_key(CacheQueryCtx *ctx) +chunk_crn_set_cache_get_key(CacheQueryCtx *ctx) { return &((ChunkCacheQueryCtx *) ctx)->chunk_id; } -static void *chunk_insert_plan_cache_create_entry(Cache *cache, CacheQueryCtx *ctx); -static void *chunk_insert_plan_cache_update_entry(Cache *cache, CacheQueryCtx *ctx); +static void *chunk_crn_set_cache_create_entry(Cache *cache, CacheQueryCtx *ctx); +static void *chunk_crn_set_cache_update_entry(Cache *cache, CacheQueryCtx *ctx); -static void chunk_insert_plan_cache_destroy_storage(CacheStorage *store); static char *get_copy_table_insert_sql(ChunkCacheQueryCtx *ctx); -static Cache chunk_insert_plan_cache = { +static Cache chunk_crn_set_cache = { .hctl = { .keysize = sizeof(int32), - .entrysize = sizeof(chunk_insert_plan_htable_entry), + .entrysize = sizeof(chunk_crn_set_htable_entry), .hcxt = NULL, }, .name = CHUNK_CACHE_INVAL_PROXY_TABLE, .numelements = 16, .flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS, - .get_key = chunk_insert_plan_cache_get_key, - .create_entry = chunk_insert_plan_cache_create_entry, - .update_entry = chunk_insert_plan_cache_update_entry, - .destroy_storage_hook = chunk_insert_plan_cache_destroy_storage, + .get_key = chunk_crn_set_cache_get_key, + .create_entry = chunk_crn_set_cache_create_entry, + .update_entry = chunk_crn_set_cache_update_entry, }; -static void -chunk_insert_plan_cache_destroy_storage(CacheStorage *store) -{ - chunk_insert_plan_htable_entry *entry; - HASH_SEQ_STATUS scan; - - hash_seq_init(&scan, store->htab); - - while ((entry = hash_seq_search(&scan))) - { - SPI_freeplan(entry->move_from_copyt_plan); - } -} - static void * -chunk_insert_plan_cache_create_entry(Cache *cache, CacheQueryCtx *ctx) +chunk_crn_set_cache_create_entry(Cache *cache, CacheQueryCtx *ctx) { ChunkCacheQueryCtx *cctx = (ChunkCacheQueryCtx *) ctx; - chunk_insert_plan_htable_entry *pe = ctx->entry; + chunk_crn_set_htable_entry *pe = ctx->entry; char *insert_sql; MemoryContext old; @@ -102,7 +85,6 @@ chunk_insert_plan_cache_create_entry(Cache *cache, CacheQueryCtx *ctx) pe->chunk_id = cctx->chunk_id; pe->start_time = cctx->chunk_start_time; pe->end_time = cctx->chunk_end_time; - pe->move_from_copyt_plan = prepare_plan(insert_sql, 0, NULL); /* TODO: old crn leaks memory here... */ old = cache_switch_to_memory_context(cache); @@ -113,10 +95,10 @@ chunk_insert_plan_cache_create_entry(Cache *cache, CacheQueryCtx *ctx) } static void * -chunk_insert_plan_cache_update_entry(Cache *cache, CacheQueryCtx *ctx) +chunk_crn_set_cache_update_entry(Cache *cache, CacheQueryCtx *ctx) { ChunkCacheQueryCtx *cctx = (ChunkCacheQueryCtx *) ctx; - chunk_insert_plan_htable_entry *pe = ctx->entry; + chunk_crn_set_htable_entry *pe = ctx->entry; char *insert_sql; MemoryContext old; @@ -125,8 +107,6 @@ chunk_insert_plan_cache_update_entry(Cache *cache, CacheQueryCtx *ctx) return pe; insert_sql = get_copy_table_insert_sql(cctx); - SPI_freeplan(pe->move_from_copyt_plan); - pe->move_from_copyt_plan = prepare_plan(insert_sql, 0, NULL); old = cache_switch_to_memory_context(cache); pe->crns = fetch_crn_set(NULL, pe->chunk_id); @@ -139,11 +119,11 @@ void invalidate_chunk_cache_callback(void) { CACHE1_elog(WARNING, "DESTROY chunk_insert plan cache"); - cache_invalidate(&chunk_insert_plan_cache); + cache_invalidate(&chunk_crn_set_cache); } -static chunk_insert_plan_htable_entry * -get_chunk_insert_plan_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry, +static chunk_crn_set_htable_entry * +get_chunk_crn_set_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry, Partition *part, int32 chunk_id, int64 chunk_start_time, int64 chunk_end_time) { @@ -156,7 +136,7 @@ get_chunk_insert_plan_cache_entry(hypertable_cache_entry *hci, epoch_and_partiti .chunk_end_time = chunk_end_time, }; - return cache_fetch(&chunk_insert_plan_cache, &ctx.cctx); + return cache_fetch(&chunk_crn_set_cache, &ctx.cctx); } static chunk_row * @@ -266,7 +246,7 @@ chunk_cache_entry * get_chunk_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry, Partition *part, int64 timepoint, bool lock) { - chunk_insert_plan_htable_entry *move_plan; + chunk_crn_set_htable_entry *chunk_crn_cache; chunk_cache_entry *entry; chunk_row *chunk; @@ -280,10 +260,9 @@ get_chunk_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_ entry = palloc(sizeof(chunk_cache_entry)); entry->chunk = chunk; entry->id = chunk->id; - move_plan = get_chunk_insert_plan_cache_entry(hci, pe_entry, part, chunk->id, + chunk_crn_cache = get_chunk_crn_set_cache_entry(hci, pe_entry, part, chunk->id, chunk->start_time, chunk->end_time); - entry->move_from_copyt_plan = move_plan->move_from_copyt_plan; - entry->crns = move_plan->crns; + entry->crns = chunk_crn_cache->crns; return entry; } @@ -357,11 +336,11 @@ void _chunk_cache_init(void) { CreateCacheMemoryContext(); - cache_init(&chunk_insert_plan_cache); + cache_init(&chunk_crn_set_cache); } void _chunk_cache_fini(void) { - cache_invalidate(&chunk_insert_plan_cache); + cache_invalidate(&chunk_crn_set_cache); } From 4d4ac78ef510aa25f795ef69dfe99a2395959ebb Mon Sep 17 00:00:00 2001 From: Matvey Arye Date: Thu, 2 Mar 2017 22:07:55 -0500 Subject: [PATCH 3/3] cleanup --- src/cache.c | 81 +++++++++------------ src/cache.h | 19 ++--- src/cache_invalidate.c | 4 +- src/chunk_cache.c | 155 ++++++++++++++--------------------------- src/chunk_cache.h | 11 +-- src/hypertable_cache.c | 61 ++++++++++------ src/hypertable_cache.h | 8 +-- src/insert.c | 126 ++++++++++++++------------------- src/iobeamdb.c | 4 -- src/metadata_queries.c | 5 ++ src/metadata_queries.h | 2 + src/partitioning.c | 20 ++---- src/partitioning.h | 5 +- src/pgmurmur3.c | 23 +++--- src/pgmurmur3.h | 4 ++ src/scanner.c | 34 ++++----- 16 files changed, 235 insertions(+), 327 deletions(-) diff --git a/src/cache.c b/src/cache.c index 2e77efaf1..57079b5ff 100644 --- a/src/cache.c +++ b/src/cache.c @@ -1,97 +1,78 @@ #include "cache.h" + void cache_init(Cache *cache) { - MemoryContext ctx, old; - if (cache->store != NULL) + if (cache->htab != NULL) { elog(ERROR, "Cache %s is already initialized", cache->name); return; } - ctx = AllocSetContextCreate(CacheMemoryContext, - cache->name, - ALLOCSET_DEFAULT_SIZES); - old = MemoryContextSwitchTo(ctx); - - cache->store = palloc(sizeof(CacheStorage)); - - Assert(cache->hctl.hcxt == NULL); - cache->hctl.hcxt = ctx; - cache->store->htab = hash_create(cache->name, cache->numelements, + cache->htab = hash_create(cache->name, cache->numelements, &cache->hctl, cache->flags); - cache->hctl.hcxt = NULL; - - cache->store->mcxt = ctx; - cache->store->valid = true; - cache->store->refcount = 0; - cache->store->destroy_storage_hook = cache->destroy_storage_hook; - - MemoryContextSwitchTo(old); + cache->refcount = 1; } static void -storage_destroy(CacheStorage *store) { - store->valid = false; - if (store->refcount > 0) { - //will be destroyed later +cache_destroy(Cache *cache) { + if (cache->refcount > 0) { + /* will be destroyed later */ return; } - if (store->destroy_storage_hook != NULL) - store->destroy_storage_hook(store); + if (cache->pre_destroy_hook != NULL) + cache->pre_destroy_hook(cache); - hash_destroy(store->htab); - MemoryContextDelete(store->mcxt); + hash_destroy(cache->htab); + cache->htab = NULL; + MemoryContextDelete(cache->hctl.hcxt); + cache->hctl.hcxt = NULL; } void cache_invalidate(Cache *cache) { - if (cache->store == NULL) + if (cache == NULL) return; - - storage_destroy(cache->store); - cache->store = NULL; - - cache_init(cache); //start new store + cache->refcount--; + cache_destroy(cache); } /* - * Pinning storage is needed if any items returned by the cache + * Pinning is needed if any items returned by the cache * may need to survive invalidation events (i.e. AcceptInvalidationMessages() may be called). * - * Invalidation messages may be processed on any internal function that takes a lock (e.g. heap_open. + * Invalidation messages may be processed on any internal function that takes a lock (e.g. heap_open). * - * Each call to cache_pin_storage MUST BE paired with a call to cache_release_storage. + * Each call to cache_pin MUST BE paired with a call to cache_release. * */ -extern CacheStorage *cache_pin_storage(Cache *cache) +extern Cache *cache_pin(Cache *cache) { - cache->store->refcount++; - return cache->store; + cache->refcount++; + return cache; } -extern void cache_release_storage(CacheStorage *store) + +extern void cache_release(Cache *cache) { - Assert(store->refcount > 0); - store->refcount--; - if (!store->valid) { - storage_destroy(store); - } + Assert(cache->refcount > 0); + cache->refcount--; + cache_destroy(cache); } MemoryContext cache_memory_ctx(Cache *cache) { - return cache->store->mcxt; + return cache->hctl.hcxt; } MemoryContext cache_switch_to_memory_context(Cache *cache) { - return MemoryContextSwitchTo(cache->store->mcxt); + return MemoryContextSwitchTo(cache->hctl.hcxt); } void * @@ -99,12 +80,12 @@ cache_fetch(Cache *cache, CacheQueryCtx *ctx) { bool found; - if (cache->store->htab == NULL) + if (cache->htab == NULL) { elog(ERROR, "Hash %s not initialized", cache->name); } - ctx->entry = hash_search(cache->store->htab, cache->get_key(ctx), HASH_ENTER, &found); + ctx->entry = hash_search(cache->htab, cache->get_key(ctx), HASH_ENTER, &found); if (!found && cache->create_entry != NULL) { diff --git a/src/cache.h b/src/cache.h index ed2292da9..703ef9492 100644 --- a/src/cache.h +++ b/src/cache.h @@ -11,25 +11,18 @@ typedef struct CacheQueryCtx void *private[0]; } CacheQueryCtx; -typedef struct CacheStorage { - HTAB *htab; - MemoryContext mcxt; - int refcount; - bool valid; - void (*destroy_storage_hook) (struct CacheStorage *); -} CacheStorage; - typedef struct Cache { HASHCTL hctl; - CacheStorage *store; - const char *name; + HTAB *htab; + int refcount; + const char *name; long numelements; int flags; void *(*get_key) (struct CacheQueryCtx *); void *(*create_entry) (struct Cache *, CacheQueryCtx *); void *(*update_entry) (struct Cache *, CacheQueryCtx *); - void (*destroy_storage_hook) (struct CacheStorage *); + void (*pre_destroy_hook) (struct Cache *); } Cache; extern void cache_init(Cache *cache); @@ -39,7 +32,7 @@ extern void *cache_fetch(Cache *cache, CacheQueryCtx *ctx); extern MemoryContext cache_memory_ctx(Cache *cache); extern MemoryContext cache_switch_to_memory_context(Cache *cache); -extern CacheStorage *cache_pin_storage(Cache *cache); -extern void cache_release_storage(CacheStorage *store); +extern Cache *cache_pin(Cache *cache); +extern void cache_release(Cache *cache); #endif /* _IOBEAMDB_CACHE_H_ */ diff --git a/src/cache_invalidate.c b/src/cache_invalidate.c index 2b212e67c..69e23e8b1 100644 --- a/src/cache_invalidate.c +++ b/src/cache_invalidate.c @@ -57,10 +57,10 @@ inval_cache_callback(Datum arg, Oid relid) return; if (relid == InvalidOid || relid == hypertable_cache_inval_proxy_oid) - invalidate_hypertable_cache_callback(); + hypertable_cache_invalidate_callback(); if (relid == InvalidOid || relid == chunk_cache_inval_proxy_oid) - invalidate_chunk_cache_callback(); + chunk_crn_set_cache_invalidate_callback(); } /* diff --git a/src/chunk_cache.c b/src/chunk_cache.c index 902d98993..223f3ce95 100644 --- a/src/chunk_cache.c +++ b/src/chunk_cache.c @@ -40,9 +40,6 @@ typedef struct chunk_crn_set_htable_entry typedef struct ChunkCacheQueryCtx { CacheQueryCtx cctx; - hypertable_cache_entry *hci; - epoch_and_partitions_set *pe_entry; - Partition *part; int32 chunk_id; int64 chunk_start_time; int64 chunk_end_time; @@ -57,31 +54,40 @@ chunk_crn_set_cache_get_key(CacheQueryCtx *ctx) static void *chunk_crn_set_cache_create_entry(Cache *cache, CacheQueryCtx *ctx); static void *chunk_crn_set_cache_update_entry(Cache *cache, CacheQueryCtx *ctx); -static char *get_copy_table_insert_sql(ChunkCacheQueryCtx *ctx); +static Cache *chunk_crn_set_cache_create() { + MemoryContext ctx = AllocSetContextCreate(CacheMemoryContext, + CHUNK_CACHE_INVAL_PROXY_TABLE, + ALLOCSET_DEFAULT_SIZES); -static Cache chunk_crn_set_cache = { - .hctl = { - .keysize = sizeof(int32), - .entrysize = sizeof(chunk_crn_set_htable_entry), - .hcxt = NULL, - }, - .name = CHUNK_CACHE_INVAL_PROXY_TABLE, - .numelements = 16, - .flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS, - .get_key = chunk_crn_set_cache_get_key, - .create_entry = chunk_crn_set_cache_create_entry, - .update_entry = chunk_crn_set_cache_update_entry, -}; + Cache *cache = MemoryContextAlloc(ctx, sizeof(Cache)); + *cache = (Cache) { + .hctl = { + .keysize = sizeof(int32), + .entrysize = sizeof(chunk_crn_set_htable_entry), + .hcxt = ctx, + }, + .name = CHUNK_CACHE_INVAL_PROXY_TABLE, + .numelements = 16, + .flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS, + .get_key = chunk_crn_set_cache_get_key, + .create_entry = chunk_crn_set_cache_create_entry, + .update_entry = chunk_crn_set_cache_update_entry, + }; + + cache_init(cache); + + return cache; +} + +static Cache *chunk_crn_set_cache_current = NULL; static void * chunk_crn_set_cache_create_entry(Cache *cache, CacheQueryCtx *ctx) { ChunkCacheQueryCtx *cctx = (ChunkCacheQueryCtx *) ctx; chunk_crn_set_htable_entry *pe = ctx->entry; - char *insert_sql; MemoryContext old; - insert_sql = get_copy_table_insert_sql(cctx); pe->chunk_id = cctx->chunk_id; pe->start_time = cctx->chunk_start_time; pe->end_time = cctx->chunk_end_time; @@ -99,15 +105,12 @@ chunk_crn_set_cache_update_entry(Cache *cache, CacheQueryCtx *ctx) { ChunkCacheQueryCtx *cctx = (ChunkCacheQueryCtx *) ctx; chunk_crn_set_htable_entry *pe = ctx->entry; - char *insert_sql; MemoryContext old; if (pe->start_time == cctx->chunk_start_time && pe->end_time == cctx->chunk_end_time) return pe; - insert_sql = get_copy_table_insert_sql(cctx); - old = cache_switch_to_memory_context(cache); pe->crns = fetch_crn_set(NULL, pe->chunk_id); MemoryContextSwitchTo(old); @@ -116,29 +119,36 @@ chunk_crn_set_cache_update_entry(Cache *cache, CacheQueryCtx *ctx) } void -invalidate_chunk_cache_callback(void) +chunk_crn_set_cache_invalidate_callback(void) { - CACHE1_elog(WARNING, "DESTROY chunk_insert plan cache"); - cache_invalidate(&chunk_crn_set_cache); + CACHE1_elog(WARNING, "DESTROY chunk_crn_set_cache"); + cache_invalidate(chunk_crn_set_cache_current); + chunk_crn_set_cache_current = chunk_crn_set_cache_create(); } static chunk_crn_set_htable_entry * -get_chunk_crn_set_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry, - Partition *part, int32 chunk_id, int64 chunk_start_time, - int64 chunk_end_time) +chunk_crn_set_cache_get_entry(Cache *cache, int32 chunk_id, int64 chunk_start_time, int64 chunk_end_time) { + if (cache == NULL) + { + cache = chunk_crn_set_cache_current; + } ChunkCacheQueryCtx ctx = { - .hci = hci, - .pe_entry = pe_entry, - .part = part, .chunk_id = chunk_id, .chunk_start_time = chunk_start_time, .chunk_end_time = chunk_end_time, }; - return cache_fetch(&chunk_crn_set_cache, &ctx.cctx); + return cache_fetch(cache, &ctx.cctx); } +extern Cache * +chunk_crn_set_cache_pin() +{ + return cache_pin(chunk_crn_set_cache_current); +} + + static chunk_row * chunk_row_create(int32 id, int32 partition_id, int64 starttime, int64 endtime) { @@ -242,9 +252,12 @@ chunk_scan(int32 partition_id, int64 timepoint, bool tuplock) return cctx.chunk; } +/* + * Get chunk cache entry. + * The cache parameter is a chunk_crn_set_cache (can be null to use current cache). + */ chunk_cache_entry * -get_chunk_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry, - Partition *part, int64 timepoint, bool lock) +get_chunk_cache_entry(Cache *cache, Partition *part, int64 timepoint, bool lock) { chunk_crn_set_htable_entry *chunk_crn_cache; chunk_cache_entry *entry; @@ -260,87 +273,21 @@ get_chunk_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_ entry = palloc(sizeof(chunk_cache_entry)); entry->chunk = chunk; entry->id = chunk->id; - chunk_crn_cache = get_chunk_crn_set_cache_entry(hci, pe_entry, part, chunk->id, - chunk->start_time, chunk->end_time); + chunk_crn_cache = chunk_crn_set_cache_get_entry(cache, chunk->id, + chunk->start_time, chunk->end_time); entry->crns = chunk_crn_cache->crns; return entry; } - -static char * -get_copy_table_insert_sql(ChunkCacheQueryCtx *ctx) -{ - StringInfo where_clause = makeStringInfo(); - StringInfo insert_clauses = makeStringInfo(); - StringInfo sql_insert = makeStringInfo(); - ListCell *cell; - int i; - crn_set *crn = fetch_crn_set(NULL, ctx->chunk_id); - - appendStringInfo(where_clause, "WHERE TRUE"); - - if (ctx->pe_entry->num_partitions > 1) - { - appendStringInfo(where_clause, " AND (%s.%s(%s::TEXT, %d) BETWEEN %d AND %d)", - quote_identifier(ctx->pe_entry->partitioning->partfunc.schema), - quote_identifier(ctx->pe_entry->partitioning->partfunc.name), - quote_identifier(ctx->pe_entry->partitioning->column), - ctx->pe_entry->partitioning->partfunc.modulos, - ctx->part->keyspace_start, - ctx->part->keyspace_end); - } - - - if (ctx->chunk_start_time != OPEN_START_TIME) - { - appendStringInfo(where_clause, " AND (%1$s >= %2$s) ", - quote_identifier(ctx->hci->time_column_name), - internal_time_to_column_literal_sql(ctx->chunk_start_time, - ctx->hci->time_column_type)); - } - - if (ctx->chunk_end_time != OPEN_END_TIME) - { - appendStringInfo(where_clause, " AND (%1$s <= %2$s) ", - quote_identifier(ctx->hci->time_column_name), - internal_time_to_column_literal_sql(ctx->chunk_end_time, - ctx->hci->time_column_type)); - } - - i = 0; - foreach(cell, crn->tables) - { - crn_row *tab = lfirst(cell); - - i = i + 1; - appendStringInfo(insert_clauses, "i_%d AS (INSERT INTO %s.%s SELECT * FROM selected)", - i, - quote_identifier(tab->schema_name.data), - quote_identifier(tab->table_name.data) - ); - } - pfree(crn); - crn = NULL; - - appendStringInfo(sql_insert, "\ - WITH selected AS ( DELETE FROM ONLY %1$s %2$s RETURNING * ), \ - %3$s \ - SELECT 1", copy_table_name(ctx->hci->id), - where_clause->data, - insert_clauses->data); - - return sql_insert->data; -} - void _chunk_cache_init(void) { CreateCacheMemoryContext(); - cache_init(&chunk_crn_set_cache); + chunk_crn_set_cache_current = chunk_crn_set_cache_create(); } void _chunk_cache_fini(void) { - cache_invalidate(&chunk_crn_set_cache); + cache_invalidate(chunk_crn_set_cache_current); } diff --git a/src/chunk_cache.h b/src/chunk_cache.h index b6a1bf673..c8764c266 100644 --- a/src/chunk_cache.h +++ b/src/chunk_cache.h @@ -5,6 +5,7 @@ #include #include "metadata_queries.h" +#include "cache.h" #define CHUNK_CACHE_INVAL_PROXY_TABLE "cache_inval_chunk" #define CHUNK_CACHE_INVAL_PROXY_OID \ @@ -20,14 +21,14 @@ typedef struct chunk_cache_entry int32 id; chunk_row *chunk; crn_set *crns; - SPIPlanPtr move_from_copyt_plan; } chunk_cache_entry; + + +extern chunk_cache_entry *get_chunk_cache_entry(Cache *cache, Partition *part, int64 timepoint, bool lock); -extern chunk_cache_entry *get_chunk_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry, - Partition *part, int64 time_pt, bool lock); - -extern void invalidate_chunk_cache_callback(void); +extern void chunk_crn_set_cache_invalidate_callback(void); +extern Cache *chunk_crn_set_cache_pin(void); extern void _chunk_cache_init(void); extern void _chunk_cache_fini(void); diff --git a/src/hypertable_cache.c b/src/hypertable_cache.c index a847dcc8d..4f6fe98e2 100644 --- a/src/hypertable_cache.c +++ b/src/hypertable_cache.c @@ -28,19 +28,33 @@ hypertable_cache_get_key(CacheQueryCtx *ctx) return &((HypertableCacheQueryCtx *) ctx)->hypertable_id; } -static Cache hypertable_cache = { - .hctl = { - .keysize = sizeof(int32), - .entrysize = sizeof(hypertable_cache_entry), - .hcxt = NULL, - }, - .name = HYPERTABLE_CACHE_INVAL_PROXY_TABLE, - .numelements = 16, - .flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS, - .get_key = hypertable_cache_get_key, - .create_entry = hypertable_cache_create_entry, -}; +static Cache *hypertable_cache_create() { + MemoryContext ctx = AllocSetContextCreate(CacheMemoryContext, + HYPERTABLE_CACHE_INVAL_PROXY_TABLE, + ALLOCSET_DEFAULT_SIZES); + + Cache *cache = MemoryContextAlloc(ctx, sizeof(Cache)); + *cache = (Cache) { + .hctl = { + .keysize = sizeof(int32), + .entrysize = sizeof(hypertable_cache_entry), + .hcxt = ctx, + }, + .name = HYPERTABLE_CACHE_INVAL_PROXY_TABLE, + .numelements = 16, + .flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS, + .get_key = hypertable_cache_get_key, + .create_entry = hypertable_cache_create_entry, + }; + + cache_init(cache); + + return cache; +} + + +static Cache *hypertable_cache_current = NULL; /* Column numbers for 'hypertable' table in sql/common/tables.sql */ #define HT_TBL_COL_ID 1 #define HT_TBL_COL_TIME_COL_NAME 10 @@ -101,22 +115,23 @@ hypertable_cache_create_entry(Cache *cache, CacheQueryCtx *ctx) } void -invalidate_hypertable_cache_callback(void) +hypertable_cache_invalidate_callback(void) { CACHE1_elog(WARNING, "DESTROY hypertable_cache"); - cache_invalidate(&hypertable_cache); + cache_invalidate(hypertable_cache_current); + hypertable_cache_current = hypertable_cache_create(); } /* Get hypertable cache entry. If the entry is not in the cache, add it. */ hypertable_cache_entry * -hypertable_cache_get(int32 hypertable_id) +hypertable_cache_get_entry(Cache *cache, int32 hypertable_id) { HypertableCacheQueryCtx ctx = { .hypertable_id = hypertable_id, }; - return cache_fetch(&hypertable_cache, &ctx.cctx); + return cache_fetch(cache, &ctx.cctx); } /* function to compare epochs */ @@ -140,7 +155,7 @@ cmp_epochs(const void *time_pt_pointer, const void *test) } epoch_and_partitions_set * -hypertable_cache_get_partition_epoch(hypertable_cache_entry *hce, int64 time_pt, Oid relid) +hypertable_cache_get_partition_epoch(Cache *cache, hypertable_cache_entry *hce, int64 time_pt, Oid relid) { MemoryContext old; epoch_and_partitions_set *epoch, @@ -166,7 +181,7 @@ hypertable_cache_get_partition_epoch(hypertable_cache_entry *hce, int64 time_pt, return (*cache_entry); } - old = cache_switch_to_memory_context(&hypertable_cache); + old = cache_switch_to_memory_context(cache); epoch = partition_epoch_scan(hce->id, time_pt, relid); /* check if full */ @@ -191,21 +206,21 @@ hypertable_cache_get_partition_epoch(hypertable_cache_entry *hce, int64 time_pt, return epoch; } -extern CacheStorage * -hypertable_cache_pin_storage() +extern Cache * +hypertable_cache_pin() { - return cache_pin_storage(&hypertable_cache); + return cache_pin(hypertable_cache_current); } void _hypertable_cache_init(void) { CreateCacheMemoryContext(); - cache_init(&hypertable_cache); + hypertable_cache_current = hypertable_cache_create(); } void _hypertable_cache_fini(void) { - cache_invalidate(&hypertable_cache); + cache_invalidate(hypertable_cache_current); } diff --git a/src/hypertable_cache.h b/src/hypertable_cache.h index 866cf808c..88acd7abf 100644 --- a/src/hypertable_cache.h +++ b/src/hypertable_cache.h @@ -24,14 +24,14 @@ typedef struct hypertable_cache_entry epoch_and_partitions_set *epochs[MAX_EPOCHS_PER_HYPERTABLE_CACHE_ENTRY]; } hypertable_cache_entry; -hypertable_cache_entry *hypertable_cache_get(int32 hypertable_id); +hypertable_cache_entry *hypertable_cache_get_entry(Cache * cache, int32 hypertable_id); epoch_and_partitions_set * -hypertable_cache_get_partition_epoch(hypertable_cache_entry *hce, int64 time_pt, Oid relid); +hypertable_cache_get_partition_epoch(Cache *cache, hypertable_cache_entry *hce, int64 time_pt, Oid relid); -void invalidate_hypertable_cache_callback(void); +void hypertable_cache_invalidate_callback(void); -extern CacheStorage *hypertable_cache_pin_storage(void); +extern Cache *hypertable_cache_pin(void); void _hypertable_cache_init(void); void _hypertable_cache_fini(void); diff --git a/src/insert.c b/src/insert.c index 3d471e833..a50eb482c 100644 --- a/src/insert.c +++ b/src/insert.c @@ -46,6 +46,8 @@ #include "metadata_queries.h" #include "partitioning.h" #include "scanner.h" +#include "catalog.h" +#include "pgmurmur3.h" #include #include @@ -56,7 +58,8 @@ #define INSERT_TRIGGER_COPY_TABLE_NAME "insert_trigger" /* private funcs */ -static ObjectAddress create_insert_index(int32 hypertable_id, char * time_field, PartitioningInfo *part_info, int16 *end_time_partitions, int num_partitions); + +static ObjectAddress create_insert_index(int32 hypertable_id, char * time_field, PartitioningInfo *part_info,epoch_and_partitions_set *epoch); static Node *get_keyspace_fn_call(PartitioningInfo *part_info); /* @@ -133,7 +136,7 @@ chunk_insert_ctx_rel_destroy(ChunkInsertCtxRel *rel_ctx) static void -chunk_insert_ctx_rel_chunk_insert_ctx_insert_tuple(ChunkInsertCtxRel *rel_ctx, HeapTuple tuple) +chunk_insert_ctx_rel_insert_tuple(ChunkInsertCtxRel *rel_ctx, HeapTuple tuple) { int hi_options = 0; /* no optimization */ CommandId mycid = GetCurrentCommandId(true); @@ -161,16 +164,19 @@ chunk_insert_ctx_rel_chunk_insert_ctx_insert_tuple(ChunkInsertCtxRel *rel_ctx, H typedef struct ChunkInsertCtx { chunk_cache_entry *chunk; + Cache *pinned; List *ctxs; } ChunkInsertCtx; static ChunkInsertCtx * -chunk_insert_ctx_new(chunk_cache_entry *chunk) +chunk_insert_ctx_new(chunk_cache_entry *chunk, Cache *pinned) { - /* int num_tables = list_length(chunk->crns->tables); */ ListCell *lc; List *rel_ctx_list = NIL; ChunkInsertCtx *ctx; + + ctx = palloc(sizeof(ChunkInsertCtx)); + ctx->pinned = pinned; foreach(lc, chunk->crns->tables) { @@ -234,7 +240,6 @@ chunk_insert_ctx_new(chunk_cache_entry *chunk) rel_ctx_list = lappend(rel_ctx_list, rel_ctx); } - ctx = palloc(sizeof(ChunkInsertCtx)); ctx->ctxs = rel_ctx_list; ctx->chunk = chunk; return ctx; @@ -250,6 +255,8 @@ chunk_insert_ctx_destroy(ChunkInsertCtx *ctx) return; } + cache_release(ctx->pinned); + foreach(lc, ctx->ctxs) { ChunkInsertCtxRel *rel_ctx = lfirst(lc); @@ -265,7 +272,7 @@ chunk_insert_ctx_insert_tuple(ChunkInsertCtx *ctx, HeapTuple tup) foreach(lc, ctx->ctxs) { ChunkInsertCtxRel *rel_ctx = lfirst(lc); - chunk_insert_ctx_rel_chunk_insert_ctx_insert_tuple(rel_ctx, tup); + chunk_insert_ctx_rel_insert_tuple(rel_ctx, tup); } } @@ -286,11 +293,10 @@ copy_table_tuple_found(TupleInfo *ti, void *data) if (ctx->pe->num_partitions > 1) { - //Datum partition_datum = index_getattr(scan->xs_itup, 1, scan->xs_itupdesc, &is_null); + /* first element is partition index (used for sorting but not necessary here) */ Datum time_datum = index_getattr(ti->ituple, 2, ti->ituple_desc, &is_null); Datum keyspace_datum = index_getattr(ti->ituple, 3, ti->ituple_desc, &is_null); - //partition_no = DatumGetInt16(partition_datum); time_pt = time_value_to_internal(time_datum, ctx->hci->time_column_type); keyspace_pt = DatumGetInt16(keyspace_datum); } @@ -298,18 +304,18 @@ copy_table_tuple_found(TupleInfo *ti, void *data) { Datum time_datum = index_getattr(ti->ituple, 1, ti->ituple_desc, &is_null); time_pt = time_value_to_internal(time_datum, ctx->hci->time_column_type); - keyspace_pt = -1; + keyspace_pt = KEYSPACE_PT_NO_PARTITIONING; } - if (ctx->chunk_ctx != NULL && !(ctx->chunk_ctx->chunk->chunk->start_time <= time_pt && ctx->chunk_ctx->chunk->chunk->end_time >= time_pt)) + if (ctx->chunk_ctx != NULL && !chunk_row_timepoint_is_member(ctx->chunk_ctx->chunk->chunk, time_pt)) { /* moving on to next chunk; */ chunk_insert_ctx_destroy(ctx->chunk_ctx); ctx->chunk_ctx = NULL; } - if (ctx->part != NULL && !(ctx->part->keyspace_start <= keyspace_pt && ctx->part->keyspace_end >= keyspace_pt)) + if (ctx->part != NULL && !partition_keyspace_pt_is_member(ctx->part, keyspace_pt)) { /* moving on to next ctx->partition. */ chunk_insert_ctx_destroy(ctx->chunk_ctx); @@ -326,26 +332,21 @@ copy_table_tuple_found(TupleInfo *ti, void *data) { Datum was_closed_datum; chunk_cache_entry *chunk; + Cache *pinned = chunk_crn_set_cache_pin(); /* * TODO: this first call should be non-locking and use a cache(for * performance) */ - chunk = get_chunk_cache_entry(ctx->hci, ctx->pe, ctx->part, time_pt, false); + chunk = get_chunk_cache_entry(pinned, ctx->part, time_pt, false); was_closed_datum = FunctionCall1(get_close_if_needed_fn(), Int32GetDatum(chunk->id)); /* chunk may have been closed and thus changed /or/ need to get share lock */ - chunk = get_chunk_cache_entry(ctx->hci, ctx->pe, ctx->part, time_pt, true); + chunk = get_chunk_cache_entry(pinned, ctx->part, time_pt, true); - ctx->chunk_ctx = chunk_insert_ctx_new(chunk); - /* elog(WARNING, "got new chunk %d", chunk->id); */ + ctx->chunk_ctx = chunk_insert_ctx_new(chunk, pinned); } - /* - * elog(WARNING, "time is partition_no: %d keyspace: %d time: %ld - * chunk %d", partition_no, keyspace_pt, time_pt, chunk->id); - */ - /* insert here: */ - //has to be a copy(not sure why) + /* has to be a copy(not sure why) */ chunk_insert_ctx_insert_tuple(ctx->chunk_ctx,heap_copytuple(ti->tuple)); return true; } @@ -398,7 +399,7 @@ insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS) hypertable_cache_entry *hci; epoch_and_partitions_set *pe; - CacheStorage *hypertable_cache_storage; + Cache *hypertable_cache; ObjectAddress idx; DropStmt *drop = makeNode(DropStmt); @@ -429,24 +430,23 @@ insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS) * get the hypertable cache; use the time column name to figure out the * column fnum for time field */ - hypertable_cache_storage = hypertable_cache_pin_storage(); + hypertable_cache = hypertable_cache_pin(); - hci = hypertable_cache_get(hypertable_id); + hci = hypertable_cache_get_entry(hypertable_cache, hypertable_id); /* TODO: hack assumes single pe. */ - pe = hypertable_cache_get_partition_epoch(hci, 0, trigdata->tg_relation->rd_id); + pe = hypertable_cache_get_partition_epoch(hypertable_cache, hci, 0, trigdata->tg_relation->rd_id); /* * create an index that colocates row from the same chunk together and * guarantees an order on chunk access as well */ - idx = create_insert_index(hypertable_id, hci->time_column_name, pe->partitioning, - partition_epoch_get_partition_end_times(pe), pe->num_partitions); + idx = create_insert_index(hypertable_id, hci->time_column_name, pe->partitioning, pe); scan_copy_table_and_insert(hci, pe, trigdata->tg_relation->rd_id, idx.objectId); - cache_release_storage(hypertable_cache_storage); + cache_release(hypertable_cache); drop->removeType = OBJECT_INDEX; drop->missing_ok = FALSE; @@ -536,6 +536,21 @@ create_copy_table(int32 hypertable_id, Oid root_oid) return copyTableRelationAddr.objectId; } +static IndexElem * +makeIndexElem(char *name, Node *expr){ + Assert((name ==NULL || expr == NULL) && (name !=NULL || expr !=NULL)); + + IndexElem *time_elem = makeNode(IndexElem); + time_elem->name = name; + time_elem->expr = expr; + time_elem->indexcolname = NULL; + time_elem->collation = NIL; + time_elem->opclass = NIL; + time_elem->ordering = SORTBY_DEFAULT; + time_elem->nulls_ordering = SORTBY_NULLS_DEFAULT; + return time_elem; +} + /* creates index for inserting in set chunk order. * * The index is the following: @@ -560,7 +575,7 @@ create_copy_table(int32 hypertable_id, Oid root_oid) * * */ static ObjectAddress -create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo *part_info, int16 *end_time_partitions, int num_partitions) +create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo *part_info, epoch_and_partitions_set *epoch) { IndexStmt *index_stmt = makeNode(IndexStmt); IndexElem *time_elem; @@ -568,31 +583,24 @@ create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo *par List *indexElem = NIL; int i; - time_elem = makeNode(IndexElem); - time_elem->name = time_field; - time_elem->expr = NULL; - time_elem->indexcolname = NULL; - time_elem->collation = NIL; - time_elem->opclass = NIL; - time_elem->ordering = SORTBY_DEFAULT; - time_elem->nulls_ordering = SORTBY_NULLS_DEFAULT; + time_elem = makeIndexElem(time_field, NULL); if (part_info != NULL) { IndexElem *partition_elem; IndexElem *keyspace_elem; - List *array_pos_func_name = list_make2(makeString("_iobeamdb_catalog"), makeString("array_position_least")); + List *array_pos_func_name = list_make2(makeString(CATALOG_SCHEMA_NAME), makeString(ARRAY_POSITION_LEAST_FN_NAME)); List *array_pos_args; List *array_list = NIL; A_ArrayExpr *array_expr; FuncCall *array_pos_fc; - for (i = 0; i < num_partitions; i++) + for (i = 0; i < epoch->num_partitions; i++) { A_Const *end_time_const = makeNode(A_Const); TypeCast *cast = makeNode(TypeCast); - end_time_const->val = *makeInteger((int) end_time_partitions[i]); + end_time_const->val = *makeInteger((int) epoch->partitions[i].keyspace_end); end_time_const->location = -1; @@ -610,23 +618,8 @@ create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo *par array_pos_args = list_make2(array_expr, get_keyspace_fn_call(part_info)); array_pos_fc = makeFuncCall(array_pos_func_name, array_pos_args, -1); - partition_elem = makeNode(IndexElem); - partition_elem->name = NULL; - partition_elem->expr = (Node *) array_pos_fc; - partition_elem->indexcolname = NULL; - partition_elem->collation = NIL; - partition_elem->opclass = NIL; - partition_elem->ordering = SORTBY_DEFAULT; - partition_elem->nulls_ordering = SORTBY_NULLS_DEFAULT; - - keyspace_elem = makeNode(IndexElem); - keyspace_elem->name = NULL; - keyspace_elem->expr = (Node *) get_keyspace_fn_call(part_info); - keyspace_elem->indexcolname = NULL; - keyspace_elem->collation = NIL; - keyspace_elem->opclass = NIL; - keyspace_elem->ordering = SORTBY_DEFAULT; - keyspace_elem->nulls_ordering = SORTBY_NULLS_DEFAULT; + partition_elem = makeIndexElem(NULL, (Node *) array_pos_fc); + keyspace_elem = makeIndexElem(NULL, (Node *) get_keyspace_fn_call(part_info)); /* partition_number, time, keyspace */ /* can probably get rid of keyspace but later */ @@ -640,22 +633,7 @@ create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo *par index_stmt->idxname = "copy_insert"; index_stmt->relation = makeRangeVar("pg_temp", copy_table_name(hypertable_id), -1); index_stmt->accessMethod = "btree"; - index_stmt->tableSpace = NULL; index_stmt->indexParams = indexElem; - index_stmt->options = NULL; - index_stmt->whereClause = NULL; - index_stmt->excludeOpNames = NIL; - index_stmt->idxcomment = NULL; - index_stmt->indexOid = InvalidOid; - index_stmt->oldNode = InvalidOid; - index_stmt->unique = false; - index_stmt->primary = false; - index_stmt->isconstraint = false; - index_stmt->deferrable = false; - index_stmt->initdeferred = false; - index_stmt->transformed = false; - index_stmt->concurrent = false; - index_stmt->if_not_exists = false; relid = RangeVarGetRelidExtended(index_stmt->relation, ShareLock, @@ -685,7 +663,6 @@ get_keyspace_fn_call(PartitioningInfo *part_info) A_Const *mod_const; List *part_func_name = list_make2(makeString(part_info->partfunc.schema), makeString(part_info->partfunc.name)); List *part_func_args; - FuncCall *part_fc; col_ref->fields = list_make1(makeString(part_info->column)); col_ref->location = -1; @@ -695,6 +672,5 @@ get_keyspace_fn_call(PartitioningInfo *part_info) mod_const->location = -1; part_func_args = list_make2(col_ref, mod_const); - part_fc = makeFuncCall(part_func_name, part_func_args, -1); - return (Node *) part_fc; + return (Node *) makeFuncCall(part_func_name, part_func_args, -1); } diff --git a/src/iobeamdb.c b/src/iobeamdb.c index 603d3c83f..68118090c 100644 --- a/src/iobeamdb.c +++ b/src/iobeamdb.c @@ -760,10 +760,6 @@ void iobeamdb_ProcessUtility(Node *parsetree, return; } - /*if(IsA(parsetree, IndexStmt)) { - pprint(parsetree); - }*/ - prev_ProcessUtility(parsetree, queryString, context, params, dest, completionTag); } diff --git a/src/metadata_queries.c b/src/metadata_queries.c index 762b02c62..92df3b211 100644 --- a/src/metadata_queries.c +++ b/src/metadata_queries.c @@ -223,3 +223,8 @@ chunk_row_insert_new(int32 partition_id, int64 timepoint, bool lock) return chunk; } + + +bool chunk_row_timepoint_is_member(const chunk_row *row, const int64 time_pt){ + return row->start_time <= time_pt && row->end_time >= time_pt; +} diff --git a/src/metadata_queries.h b/src/metadata_queries.h index ff4d9ed55..e4c3a91c7 100644 --- a/src/metadata_queries.h +++ b/src/metadata_queries.h @@ -47,4 +47,6 @@ extern crn_set *fetch_crn_set(crn_set *entry, int32 chunk_id); chunk_row * chunk_row_insert_new(int32 partition_id, int64 timepoint, bool lock); +bool chunk_row_timepoint_is_member(const chunk_row *row, const int64 time_pt); + #endif /* IOBEAMDB_METADATA_QUERIES_H */ diff --git a/src/partitioning.c b/src/partitioning.c index 2547e2aa5..a235e0b27 100644 --- a/src/partitioning.c +++ b/src/partitioning.c @@ -296,7 +296,7 @@ cmp_partitions(const void *keyspace_pt_arg, const void *value) int16 keyspace_pt = *((int16 *) keyspace_pt_arg); const Partition *part = value; - if (part->keyspace_start <= keyspace_pt && part->keyspace_end >= keyspace_pt) + if (partition_keyspace_pt_is_member(part, keyspace_pt)) { return 0; } @@ -319,7 +319,7 @@ partition_epoch_get_partition(epoch_and_partitions_set *epoch, int16 keyspace_pt return NULL; } - if (keyspace_pt < 0) + if (keyspace_pt == KEYSPACE_PT_NO_PARTITIONING) { if (epoch->num_partitions > 1) { @@ -340,19 +340,7 @@ partition_epoch_get_partition(epoch_and_partitions_set *epoch, int16 keyspace_pt return part; } - -int16 * -partition_epoch_get_partition_end_times(epoch_and_partitions_set *epoch) +bool partition_keyspace_pt_is_member(const Partition *part, const int16 keyspace_pt) { - - int16 *end_times_partitions = palloc(sizeof(int16) * epoch->num_partitions); - int i; - - for (i = 0; i < epoch->num_partitions; i++) - { - end_times_partitions[i] = epoch->partitions[i].keyspace_end; - } - - return end_times_partitions; + return keyspace_pt == KEYSPACE_PT_NO_PARTITIONING || (part->keyspace_start <= keyspace_pt && part->keyspace_end >= keyspace_pt); } - diff --git a/src/partitioning.h b/src/partitioning.h index 4eefb88fc..793232304 100644 --- a/src/partitioning.h +++ b/src/partitioning.h @@ -1,6 +1,8 @@ #ifndef IOBEAMDB_PARTITIONING_H #define IOBEAMDB_PARTITIONING_H +#define KEYSPACE_PT_NO_PARTITIONING -1 + #include #include #include @@ -52,5 +54,6 @@ epoch_and_partitions_set *partition_epoch_scan(int32 hypertable_id, int64 timepo int16 partitioning_func_apply(PartitioningFunc *pf, Datum value); Partition *partition_epoch_get_partition(epoch_and_partitions_set *epoch, int16 keyspace_pt); -int16 * partition_epoch_get_partition_end_times(epoch_and_partitions_set *epoch); + +bool partition_keyspace_pt_is_member(const Partition *part, const int16 keyspace_pt); #endif /* IOBEAMDB_PARTITIONING_H */ diff --git a/src/pgmurmur3.c b/src/pgmurmur3.c index 6daa4502f..207b3fe1e 100644 --- a/src/pgmurmur3.c +++ b/src/pgmurmur3.c @@ -39,7 +39,6 @@ PG_FUNCTION_INFO_V1(get_partition_for_key); Datum get_partition_for_key(PG_FUNCTION_ARGS) { -/* SELECT ((_iobeamdb_internal.murmur3_hash_string(key, 1 :: INT4) & x'7fffffff' :: INTEGER) % mod_factor) :: SMALLINT INTO ret; */ struct varlena *data; int32 mod; Datum hash_d; @@ -63,7 +62,12 @@ get_partition_for_key(PG_FUNCTION_ARGS) /* - * array_position_least + * array_position_least returns the highest position in the array such that the element + * in the array is <= searched element. If the array is of partition-keyspace-end values + * then this gives a unique bucket for each keyspace value. + * + * Arg 0: sorted array of smallint + * Arg 1: searched_element (smallint) */ PG_FUNCTION_INFO_V1(array_position_least); @@ -90,18 +94,15 @@ array_position_least(PG_FUNCTION_ARGS) elog(ERROR, "only support smallint arrays"); } - /* - * We refuse to search for elements in multi-dimensional arrays, since we - * have no good way to report the element's location in the array. - */ + /* should be a single dimensioned array */ if (ARR_NDIM(array) > 1) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("searching for elements in multidimensional arrays is not supported"))); + errmsg("can only work with single-dimension array"))); if (PG_ARGISNULL(1)) { - elog(ERROR, "does not expect null"); + elog(ERROR, "does not expect null as searched-for element"); } else { @@ -128,16 +129,12 @@ array_position_least(PG_FUNCTION_ARGS) my_extra->element_type = INT2OID; } - /* Examine each array element until we find a match. */ + /* Examine each array element until the element is >= searched_element. */ array_iterator = array_create_iterator(array, 0, my_extra); while (array_iterate(array_iterator, &value, &isnull)) { position++; - /* - * Can't look at the array element's value if it's null; but if we - * search for null, we have a hit and are done. - */ if (isnull) { elog(ERROR, "No element in array should be null"); diff --git a/src/pgmurmur3.h b/src/pgmurmur3.h index 573e42bc3..7667e69cd 100644 --- a/src/pgmurmur3.h +++ b/src/pgmurmur3.h @@ -41,4 +41,8 @@ void hlib_murmur3(const void *data, size_t len, uint64_t *io); /* SQL function */ Datum pg_murmur3_hash_string(PG_FUNCTION_ARGS); + +#define ARRAY_POSITION_LEAST_FN_NAME "array_position_least" +Datum array_position_least(PG_FUNCTION_ARGS); + #endif diff --git a/src/scanner.c b/src/scanner.c index 4f9986aaa..bf1e0f189 100644 --- a/src/scanner.c +++ b/src/scanner.c @@ -20,6 +20,7 @@ typedef union ScanDesc { */ typedef struct InternalScannerCtx { Relation tablerel, indexrel; + TupleInfo tinfo; ScanDesc scan; ScannerCtx *sctx; } InternalScannerCtx; @@ -30,7 +31,7 @@ typedef struct InternalScannerCtx { typedef struct Scanner { Relation (*open)(InternalScannerCtx *ctx); ScanDesc (*beginscan)(InternalScannerCtx *ctx); - bool (*getnext)(InternalScannerCtx *ctx, TupleInfo *ti); + bool (*getnext)(InternalScannerCtx *ctx); void (*endscan)(InternalScannerCtx *ctx); void (*close)(InternalScannerCtx *ctx); } Scanner; @@ -50,10 +51,10 @@ static ScanDesc heap_scanner_beginscan(InternalScannerCtx *ctx) return ctx->scan; } -static bool heap_scanner_getnext(InternalScannerCtx *ctx, TupleInfo *ti) +static bool heap_scanner_getnext(InternalScannerCtx *ctx) { - ti->tuple = heap_getnext(ctx->scan.heap_scan, ctx->sctx->scandirection); - return HeapTupleIsValid(ti->tuple); + ctx->tinfo.tuple = heap_getnext(ctx->scan.heap_scan, ctx->sctx->scandirection); + return HeapTupleIsValid(ctx->tinfo.tuple); } static void heap_scanner_endscan(InternalScannerCtx *ctx) @@ -86,12 +87,12 @@ static ScanDesc index_scanner_beginscan(InternalScannerCtx *ctx) return ctx->scan; } -static bool index_scanner_getnext(InternalScannerCtx *ctx, TupleInfo *ti) +static bool index_scanner_getnext(InternalScannerCtx *ctx) { - ti->tuple = index_getnext(ctx->scan.index_scan, ctx->sctx->scandirection); - ti->ituple = ctx->scan.index_scan->xs_itup; - ti->ituple_desc = ctx->scan.index_scan->xs_itupdesc; - return HeapTupleIsValid(ti->tuple); + ctx->tinfo.tuple = index_getnext(ctx->scan.index_scan, ctx->sctx->scandirection); + ctx->tinfo.ituple = ctx->scan.index_scan->xs_itup; + ctx->tinfo.ituple_desc = ctx->scan.index_scan->xs_itupdesc; + return HeapTupleIsValid(ctx->tinfo.tuple); } static void index_scanner_endscan(InternalScannerCtx *ctx) @@ -135,7 +136,6 @@ static Scanner scanners[] = { int scanner_scan(ScannerCtx *ctx) { TupleDesc tuple_desc; - TupleInfo ti; bool is_valid; int num_tuples = 0; Scanner *scanner = &scanners[ctx->scantype]; @@ -148,19 +148,19 @@ int scanner_scan(ScannerCtx *ctx) tuple_desc = RelationGetDescr(ictx.tablerel); - ti.scanrel = ictx.tablerel; - ti.desc = tuple_desc; + ictx.tinfo.scanrel = ictx.tablerel; + ictx.tinfo.desc = tuple_desc; /* Call pre-scan handler, if any. */ if (ctx->prescan != NULL) ctx->prescan(ctx->data); - is_valid = scanner->getnext(&ictx, &ti); + is_valid = scanner->getnext(&ictx); while (is_valid) { - if (ctx->filter == NULL || ctx->filter(&ti, ctx->data)) + if (ctx->filter == NULL || ctx->filter(&ictx.tinfo, ctx->data)) { num_tuples++; @@ -169,7 +169,7 @@ int scanner_scan(ScannerCtx *ctx) Buffer buffer; HeapUpdateFailureData hufd; - ti.lockresult = heap_lock_tuple(ictx.tablerel, ti.tuple, + ictx.tinfo.lockresult = heap_lock_tuple(ictx.tablerel, ictx.tinfo.tuple, GetCurrentCommandId(false), ctx->tuplock.lockmode, ctx->tuplock.waitpolicy, @@ -180,11 +180,11 @@ int scanner_scan(ScannerCtx *ctx) } /* Abort the scan if the handler wants us to */ - if (!ctx->tuple_found(&ti, ctx->data)) + if (!ctx->tuple_found(&ictx.tinfo, ctx->data)) break; } - is_valid = scanner->getnext(&ictx,&ti); + is_valid = scanner->getnext(&ictx); } /* Call post-scan handler, if any. */