Merged in cevian/ordered-insert (pull request #96)

Cevian/ordered insert

Approved-by: ci-vast
Approved-by: enordstr
commit 0e36e3287f
Matvey Arye, 2017-03-05 20:24:43 +00:00
22 changed files with 852 additions and 341 deletions


@ -9,6 +9,10 @@ SELECT pg_catalog.pg_extension_config_dump('_iobeamdb_cache.cache_inval_chunk',
CREATE OR REPLACE FUNCTION _iobeamdb_cache.invalidate_relcache_trigger()
RETURNS TRIGGER AS '$libdir/iobeamdb', 'invalidate_relcache_trigger' LANGUAGE C;
--for debugging
CREATE OR REPLACE FUNCTION _iobeamdb_cache.invalidate_relcache(proxy_oid OID)
RETURNS BOOLEAN AS '$libdir/iobeamdb', 'invalidate_relcache' LANGUAGE C;
CREATE TRIGGER "0_cache_inval" AFTER INSERT OR UPDATE OR DELETE OR TRUNCATE ON _iobeamdb_catalog.hypertable
FOR EACH STATEMENT EXECUTE PROCEDURE _iobeamdb_cache.invalidate_relcache_trigger('cache_inval_hypertable');


@ -1,2 +1,5 @@
CREATE OR REPLACE FUNCTION _iobeamdb_catalog.get_partition_for_key(text, int) RETURNS smallint
AS '$libdir/iobeamdb', 'get_partition_for_key' LANGUAGE C IMMUTABLE STRICT;
AS '$libdir/iobeamdb', 'get_partition_for_key' LANGUAGE C IMMUTABLE STRICT;
CREATE OR REPLACE FUNCTION _iobeamdb_catalog.array_position_least(smallint[], smallint) RETURNS smallint
AS '$libdir/iobeamdb', 'array_position_least' LANGUAGE C IMMUTABLE STRICT;


@ -1,5 +1,6 @@
#include "cache.h"
void
cache_init(Cache *cache)
{
@ -9,35 +10,59 @@ cache_init(Cache *cache)
return;
}
if (cache->hctl.hcxt == NULL)
{
cache->hctl.hcxt = AllocSetContextCreate(CacheMemoryContext,
cache->name,
ALLOCSET_DEFAULT_SIZES);
}
cache->htab = hash_create(cache->name, cache->numelements,
&cache->hctl, cache->flags);
cache->refcount = 1;
}
void
cache_invalidate(Cache *cache)
{
if (cache->htab == NULL)
static void
cache_destroy(Cache *cache) {
if (cache->refcount > 0) {
/* will be destroyed later */
return;
}
if (cache->pre_invalidate_hook != NULL)
cache->pre_invalidate_hook(cache);
if (cache->pre_destroy_hook != NULL)
cache->pre_destroy_hook(cache);
hash_destroy(cache->htab);
cache->htab = NULL;
MemoryContextDelete(cache->hctl.hcxt);
cache->hctl.hcxt = NULL;
if (cache->post_invalidate_hook != NULL)
cache->post_invalidate_hook(cache);
}
void
cache_invalidate(Cache *cache)
{
if (cache == NULL)
return;
cache->refcount--;
cache_destroy(cache);
}
/*
* Pinning is needed if any items returned by the cache
* may need to survive invalidation events (i.e. AcceptInvalidationMessages() may be called).
*
* Invalidation messages may be processed on any internal function that takes a lock (e.g. heap_open).
*
* Each call to cache_pin MUST BE paired with a call to cache_release.
*
*/
extern Cache *cache_pin(Cache *cache)
{
cache->refcount++;
return cache;
}
extern void cache_release(Cache *cache)
{
Assert(cache->refcount > 0);
cache->refcount--;
cache_destroy(cache);
}
MemoryContext
cache_memory_ctx(Cache *cache)
{

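The pin/release API above is the heart of this change: cache_invalidate() now only drops a reference, and cache_destroy() frees the hash table and its memory context only once the refcount reaches zero, so entries handed out while pinned survive relcache invalidation. A minimal usage sketch (not part of the patch), assuming the hypertable-cache accessors introduced elsewhere in this diff:

#include <postgres.h>

#include "cache.h"
#include "hypertable_cache.h"

/* Hypothetical caller: pin the current cache so the returned entry stays
 * valid even if an invalidation message is processed while we hold it
 * (e.g. inside heap_open). */
static void
example_hypertable_lookup(int32 hypertable_id)
{
	Cache *hypertable_cache = hypertable_cache_pin();
	hypertable_cache_entry *hce = hypertable_cache_get_entry(hypertable_cache, hypertable_id);

	/* ... use hce; if the cache is invalidated meanwhile, a fresh cache
	 * becomes "current" while this pinned one stays alive ... */
	(void) hce;

	cache_release(hypertable_cache);	/* destroyed once its refcount hits 0 */
}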

@ -14,15 +14,15 @@ typedef struct CacheQueryCtx
typedef struct Cache
{
HASHCTL hctl;
HTAB *htab;
const char *name;
HTAB *htab;
int refcount;
const char *name;
long numelements;
int flags;
void *(*get_key) (struct CacheQueryCtx *);
void *(*create_entry) (struct Cache *, CacheQueryCtx *);
void *(*update_entry) (struct Cache *, CacheQueryCtx *);
void (*pre_invalidate_hook) (struct Cache *);
void (*post_invalidate_hook) (struct Cache *);
void (*pre_destroy_hook) (struct Cache *);
} Cache;
extern void cache_init(Cache *cache);
@ -32,4 +32,7 @@ extern void *cache_fetch(Cache *cache, CacheQueryCtx *ctx);
extern MemoryContext cache_memory_ctx(Cache *cache);
extern MemoryContext cache_switch_to_memory_context(Cache *cache);
extern Cache *cache_pin(Cache *cache);
extern void cache_release(Cache *cache);
#endif /* _IOBEAMDB_CACHE_H_ */


@ -38,6 +38,7 @@ void _cache_invalidate_extload(void);
Datum invalidate_relcache_trigger(PG_FUNCTION_ARGS);
PG_FUNCTION_INFO_V1(invalidate_relcache_trigger);
PG_FUNCTION_INFO_V1(invalidate_relcache);
static Oid hypertable_cache_inval_proxy_oid = InvalidOid;
@ -56,10 +57,10 @@ inval_cache_callback(Datum arg, Oid relid)
return;
if (relid == InvalidOid || relid == hypertable_cache_inval_proxy_oid)
invalidate_hypertable_cache_callback();
hypertable_cache_invalidate_callback();
if (relid == InvalidOid || relid == chunk_cache_inval_proxy_oid)
invalidate_chunk_cache_callback();
chunk_crn_set_cache_invalidate_callback();
}
/*
@ -91,11 +92,28 @@ invalidate_relcache_trigger(PG_FUNCTION_ARGS)
return PointerGetDatum(trigdata->tg_trigtuple);
}
/*
* This is similar to invalidate_relcache_trigger but not a trigger.
* Not used regularly but useful for debugging.
*
*/
Datum
invalidate_relcache(PG_FUNCTION_ARGS)
{
Oid proxy_oid = PG_GETARG_OID(0);
/* arg 0 = relid of the cache_inval_proxy table */
CacheInvalidateRelcacheByRelid(proxy_oid);
/* tuple to return to executor */
return BoolGetDatum(true);
}
void
_cache_invalidate_init(void)
{
CacheRegisterRelcacheCallback(inval_cache_callback, PointerGetDatum(NULL));
}
void
@ -110,6 +128,7 @@ _cache_invalidate_fini(void)
void
_cache_invalidate_extload(void)
{
CacheRegisterRelcacheCallback(inval_cache_callback, PointerGetDatum(NULL));
hypertable_cache_inval_proxy_oid = get_relname_relid(HYPERTABLE_CACHE_INVAL_PROXY_TABLE, CACHE_INVAL_PROXY_SCHEMA_OID);
chunk_cache_inval_proxy_oid = get_relname_relid(HYPERTABLE_CACHE_INVAL_PROXY_TABLE, CACHE_INVAL_PROXY_SCHEMA_OID);
}


@ -17,7 +17,7 @@
/*
* Chunk Insert Plan Cache:
*
* Hashtable of chunk_id => chunk_insert_plan_htable_entry.
* Hashtable of chunk_id => chunk_crn_set_htable_entry.
*
* This cache stores plans for the execution of the command for moving stuff
* from the copy table to the tables associated with the chunk.
@ -29,126 +29,126 @@
* each insert anyway...
*
*/
typedef struct chunk_insert_plan_htable_entry
typedef struct chunk_crn_set_htable_entry
{
int32 chunk_id;
int64 start_time;
int64 end_time;
SPIPlanPtr move_from_copyt_plan;
} chunk_insert_plan_htable_entry;
int32 chunk_id;
int64 start_time;
int64 end_time;
crn_set *crns;
} chunk_crn_set_htable_entry;
typedef struct ChunkCacheQueryCtx
{
CacheQueryCtx cctx;
hypertable_cache_entry *hci;
epoch_and_partitions_set *pe_entry;
Partition *part;
int32 chunk_id;
int64 chunk_start_time;
int64 chunk_end_time;
} ChunkCacheQueryCtx;
static void *
chunk_insert_plan_cache_get_key(CacheQueryCtx *ctx)
chunk_crn_set_cache_get_key(CacheQueryCtx *ctx)
{
return &((ChunkCacheQueryCtx *) ctx)->chunk_id;
}
static void *chunk_insert_plan_cache_create_entry(Cache *cache, CacheQueryCtx *ctx);
static void *chunk_insert_plan_cache_update_entry(Cache *cache, CacheQueryCtx *ctx);
static void *chunk_crn_set_cache_create_entry(Cache *cache, CacheQueryCtx *ctx);
static void *chunk_crn_set_cache_update_entry(Cache *cache, CacheQueryCtx *ctx);
static void chunk_insert_plan_cache_pre_invalidate(Cache *cache);
static char *get_copy_table_insert_sql(ChunkCacheQueryCtx *ctx);
static Cache *chunk_crn_set_cache_create() {
MemoryContext ctx = AllocSetContextCreate(CacheMemoryContext,
CHUNK_CACHE_INVAL_PROXY_TABLE,
ALLOCSET_DEFAULT_SIZES);
static Cache chunk_insert_plan_cache = {
.hctl = {
.keysize = sizeof(int32),
.entrysize = sizeof(chunk_insert_plan_htable_entry),
.hcxt = NULL,
},
.htab = NULL,
.name = CHUNK_CACHE_INVAL_PROXY_TABLE,
.numelements = 16,
.flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS,
.get_key = chunk_insert_plan_cache_get_key,
.create_entry = chunk_insert_plan_cache_create_entry,
.update_entry = chunk_insert_plan_cache_update_entry,
.pre_invalidate_hook = chunk_insert_plan_cache_pre_invalidate,
.post_invalidate_hook = cache_init,
};
Cache *cache = MemoryContextAlloc(ctx, sizeof(Cache));
*cache = (Cache) {
.hctl = {
.keysize = sizeof(int32),
.entrysize = sizeof(chunk_crn_set_htable_entry),
.hcxt = ctx,
},
.name = CHUNK_CACHE_INVAL_PROXY_TABLE,
.numelements = 16,
.flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS,
.get_key = chunk_crn_set_cache_get_key,
.create_entry = chunk_crn_set_cache_create_entry,
.update_entry = chunk_crn_set_cache_update_entry,
};
static void
chunk_insert_plan_cache_pre_invalidate(Cache *cache)
{
chunk_insert_plan_htable_entry *entry;
HASH_SEQ_STATUS scan;
cache_init(cache);
hash_seq_init(&scan, cache->htab);
while ((entry = hash_seq_search(&scan)))
{
SPI_freeplan(entry->move_from_copyt_plan);
}
return cache;
}
static Cache *chunk_crn_set_cache_current = NULL;
static void *
chunk_insert_plan_cache_create_entry(Cache *cache, CacheQueryCtx *ctx)
chunk_crn_set_cache_create_entry(Cache *cache, CacheQueryCtx *ctx)
{
ChunkCacheQueryCtx *cctx = (ChunkCacheQueryCtx *) ctx;
chunk_insert_plan_htable_entry *pe = ctx->entry;
char *insert_sql;
chunk_crn_set_htable_entry *pe = ctx->entry;
MemoryContext old;
insert_sql = get_copy_table_insert_sql(cctx);
pe->chunk_id = cctx->chunk_id;
pe->start_time = cctx->chunk_start_time;
pe->end_time = cctx->chunk_end_time;
pe->move_from_copyt_plan = prepare_plan(insert_sql, 0, NULL);
/* TODO: old crn leaks memory here... */
old = cache_switch_to_memory_context(cache);
pe->crns = fetch_crn_set(NULL, pe->chunk_id);
MemoryContextSwitchTo(old);
return pe;
}
static void *
chunk_insert_plan_cache_update_entry(Cache *cache, CacheQueryCtx *ctx)
chunk_crn_set_cache_update_entry(Cache *cache, CacheQueryCtx *ctx)
{
ChunkCacheQueryCtx *cctx = (ChunkCacheQueryCtx *) ctx;
chunk_insert_plan_htable_entry *pe = ctx->entry;
char *insert_sql;
chunk_crn_set_htable_entry *pe = ctx->entry;
MemoryContext old;
if (pe->start_time == cctx->chunk_start_time &&
pe->end_time == cctx->chunk_end_time)
return pe;
insert_sql = get_copy_table_insert_sql(cctx);
SPI_freeplan(pe->move_from_copyt_plan);
pe->move_from_copyt_plan = prepare_plan(insert_sql, 0, NULL);
old = cache_switch_to_memory_context(cache);
pe->crns = fetch_crn_set(NULL, pe->chunk_id);
MemoryContextSwitchTo(old);
return pe;
}
void
invalidate_chunk_cache_callback(void)
chunk_crn_set_cache_invalidate_callback(void)
{
CACHE1_elog(WARNING, "DESTROY chunk_insert plan cache");
cache_invalidate(&chunk_insert_plan_cache);
CACHE1_elog(WARNING, "DESTROY chunk_crn_set_cache");
cache_invalidate(chunk_crn_set_cache_current);
chunk_crn_set_cache_current = chunk_crn_set_cache_create();
}
static chunk_insert_plan_htable_entry *
get_chunk_insert_plan_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry,
Partition *part, int32 chunk_id, int64 chunk_start_time,
int64 chunk_end_time)
static chunk_crn_set_htable_entry *
chunk_crn_set_cache_get_entry(Cache *cache, int32 chunk_id, int64 chunk_start_time, int64 chunk_end_time)
{
if (cache == NULL)
{
cache = chunk_crn_set_cache_current;
}
ChunkCacheQueryCtx ctx = {
.hci = hci,
.pe_entry = pe_entry,
.part = part,
.chunk_id = chunk_id,
.chunk_start_time = chunk_start_time,
.chunk_end_time = chunk_end_time,
};
return cache_fetch(&chunk_insert_plan_cache, &ctx.cctx);
return cache_fetch(cache, &ctx.cctx);
}
extern Cache *
chunk_crn_set_cache_pin()
{
return cache_pin(chunk_crn_set_cache_current);
}
static chunk_row *
chunk_row_create(int32 id, int32 partition_id, int64 starttime, int64 endtime)
{
@ -252,11 +252,14 @@ chunk_scan(int32 partition_id, int64 timepoint, bool tuplock)
return cctx.chunk;
}
/*
* Get chunk cache entry.
* The cache parameter is a chunk_crn_set_cache (can be null to use current cache).
*/
chunk_cache_entry *
get_chunk_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry,
Partition *part, int64 timepoint, bool lock)
get_chunk_cache_entry(Cache *cache, Partition *part, int64 timepoint, bool lock)
{
chunk_insert_plan_htable_entry *move_plan;
chunk_crn_set_htable_entry *chunk_crn_cache;
chunk_cache_entry *entry;
chunk_row *chunk;
@ -270,87 +273,21 @@ get_chunk_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_
entry = palloc(sizeof(chunk_cache_entry));
entry->chunk = chunk;
entry->id = chunk->id;
move_plan = get_chunk_insert_plan_cache_entry(hci, pe_entry, part, chunk->id,
chunk->start_time, chunk->end_time);
entry->move_from_copyt_plan = move_plan->move_from_copyt_plan;
chunk_crn_cache = chunk_crn_set_cache_get_entry(cache, chunk->id,
chunk->start_time, chunk->end_time);
entry->crns = chunk_crn_cache->crns;
return entry;
}
static char *
get_copy_table_insert_sql(ChunkCacheQueryCtx *ctx)
{
StringInfo where_clause = makeStringInfo();
StringInfo insert_clauses = makeStringInfo();
StringInfo sql_insert = makeStringInfo();
ListCell *cell;
int i;
crn_set *crn = fetch_crn_set(NULL, ctx->chunk_id);
appendStringInfo(where_clause, "WHERE TRUE");
if (ctx->pe_entry->num_partitions > 1)
{
appendStringInfo(where_clause, " AND (%s.%s(%s::TEXT, %d) BETWEEN %d AND %d)",
quote_identifier(ctx->pe_entry->partitioning->partfunc.schema),
quote_identifier(ctx->pe_entry->partitioning->partfunc.name),
quote_identifier(ctx->pe_entry->partitioning->column),
ctx->pe_entry->partitioning->partfunc.modulos,
ctx->part->keyspace_start,
ctx->part->keyspace_end);
}
if (ctx->chunk_start_time != OPEN_START_TIME)
{
appendStringInfo(where_clause, " AND (%1$s >= %2$s) ",
quote_identifier(ctx->hci->time_column_name),
internal_time_to_column_literal_sql(ctx->chunk_start_time,
ctx->hci->time_column_type));
}
if (ctx->chunk_end_time != OPEN_END_TIME)
{
appendStringInfo(where_clause, " AND (%1$s <= %2$s) ",
quote_identifier(ctx->hci->time_column_name),
internal_time_to_column_literal_sql(ctx->chunk_end_time,
ctx->hci->time_column_type));
}
i = 0;
foreach(cell, crn->tables)
{
crn_row *tab = lfirst(cell);
i = i + 1;
appendStringInfo(insert_clauses, "i_%d AS (INSERT INTO %s.%s SELECT * FROM selected)",
i,
quote_identifier(tab->schema_name.data),
quote_identifier(tab->table_name.data)
);
}
pfree(crn);
crn = NULL;
appendStringInfo(sql_insert, "\
WITH selected AS ( DELETE FROM ONLY %1$s %2$s RETURNING * ), \
%3$s \
SELECT 1", copy_table_name(ctx->hci->id),
where_clause->data,
insert_clauses->data);
return sql_insert->data;
}
void
_chunk_cache_init(void)
{
CreateCacheMemoryContext();
cache_init(&chunk_insert_plan_cache);
chunk_crn_set_cache_current = chunk_crn_set_cache_create();
}
void
_chunk_cache_fini(void)
{
chunk_insert_plan_cache.post_invalidate_hook = NULL;
cache_invalidate(&chunk_insert_plan_cache);
cache_invalidate(chunk_crn_set_cache_current);
}
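
For orientation, a small sketch (not part of the patch) of how the crn-set cache defined in this file is meant to be consumed: pin it, look up the chunk, walk the replica tables in entry->crns, then release the pin. Passing NULL as the cache argument instead falls back to the current cache, as the comment on get_chunk_cache_entry notes. The included header name is an assumption, since file names are not shown in this view.

#include <postgres.h>
#include <nodes/pg_list.h>

#include "partitioning.h"
#include "chunk.h"	/* assumed name of the chunk-cache header in this diff */

/* Hypothetical caller; `part` and `time_pt` come from an epoch lookup as in insert.c. */
static void
example_visit_chunk_crns(Partition *part, int64 time_pt)
{
	Cache *pinned = chunk_crn_set_cache_pin();
	chunk_cache_entry *entry = get_chunk_cache_entry(pinned, part, time_pt, true);
	ListCell *lc;

	foreach(lc, entry->crns->tables)
	{
		crn_row *cr = lfirst(lc);

		/* cr->schema_name / cr->table_name identify one replica table of the chunk */
		(void) cr;
	}

	cache_release(pinned);
}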


@ -4,6 +4,9 @@
#include <postgres.h>
#include <executor/spi.h>
#include "metadata_queries.h"
#include "cache.h"
#define CHUNK_CACHE_INVAL_PROXY_TABLE "cache_inval_chunk"
#define CHUNK_CACHE_INVAL_PROXY_OID \
get_relname_relid(CHUNK_CACHE_INVAL_PROXY_TABLE, CACHE_INVAL_PROXY_SCHEMA_OID)
@ -17,14 +20,15 @@ typedef struct chunk_cache_entry
{
int32 id;
chunk_row *chunk;
SPIPlanPtr move_from_copyt_plan;
crn_set *crns;
} chunk_cache_entry;
extern chunk_cache_entry *get_chunk_cache_entry(hypertable_cache_entry *hci, epoch_and_partitions_set *pe_entry,
Partition *part, int64 time_pt, bool lock);
extern void invalidate_chunk_cache_callback(void);
extern chunk_cache_entry *get_chunk_cache_entry(Cache *cache, Partition *part, int64 timepoint, bool lock);
extern void chunk_crn_set_cache_invalidate_callback(void);
extern Cache *chunk_crn_set_cache_pin(void);
extern void _chunk_cache_init(void);
extern void _chunk_cache_fini(void);


@ -28,21 +28,33 @@ hypertable_cache_get_key(CacheQueryCtx *ctx)
return &((HypertableCacheQueryCtx *) ctx)->hypertable_id;
}
static Cache hypertable_cache = {
.hctl = {
.keysize = sizeof(int32),
.entrysize = sizeof(hypertable_cache_entry),
.hcxt = NULL,
},
.htab = NULL,
.name = HYPERTABLE_CACHE_INVAL_PROXY_TABLE,
.numelements = 16,
.flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS,
.get_key = hypertable_cache_get_key,
.create_entry = hypertable_cache_create_entry,
.post_invalidate_hook = cache_init,
};
static Cache *hypertable_cache_create() {
MemoryContext ctx = AllocSetContextCreate(CacheMemoryContext,
HYPERTABLE_CACHE_INVAL_PROXY_TABLE,
ALLOCSET_DEFAULT_SIZES);
Cache *cache = MemoryContextAlloc(ctx, sizeof(Cache));
*cache = (Cache) {
.hctl = {
.keysize = sizeof(int32),
.entrysize = sizeof(hypertable_cache_entry),
.hcxt = ctx,
},
.name = HYPERTABLE_CACHE_INVAL_PROXY_TABLE,
.numelements = 16,
.flags = HASH_ELEM | HASH_CONTEXT | HASH_BLOBS,
.get_key = hypertable_cache_get_key,
.create_entry = hypertable_cache_create_entry,
};
cache_init(cache);
return cache;
}
static Cache *hypertable_cache_current = NULL;
/* Column numbers for 'hypertable' table in sql/common/tables.sql */
#define HT_TBL_COL_ID 1
#define HT_TBL_COL_TIME_COL_NAME 10
@ -103,22 +115,23 @@ hypertable_cache_create_entry(Cache *cache, CacheQueryCtx *ctx)
}
void
invalidate_hypertable_cache_callback(void)
hypertable_cache_invalidate_callback(void)
{
CACHE1_elog(WARNING, "DESTROY hypertable_cache");
cache_invalidate(&hypertable_cache);
cache_invalidate(hypertable_cache_current);
hypertable_cache_current = hypertable_cache_create();
}
/* Get hypertable cache entry. If the entry is not in the cache, add it. */
hypertable_cache_entry *
hypertable_cache_get(int32 hypertable_id)
hypertable_cache_get_entry(Cache *cache, int32 hypertable_id)
{
HypertableCacheQueryCtx ctx = {
.hypertable_id = hypertable_id,
};
return cache_fetch(&hypertable_cache, &ctx.cctx);
return cache_fetch(cache, &ctx.cctx);
}
/* function to compare epochs */
@ -142,7 +155,7 @@ cmp_epochs(const void *time_pt_pointer, const void *test)
}
epoch_and_partitions_set *
hypertable_cache_get_partition_epoch(hypertable_cache_entry *hce, int64 time_pt, Oid relid)
hypertable_cache_get_partition_epoch(Cache *cache, hypertable_cache_entry *hce, int64 time_pt, Oid relid)
{
MemoryContext old;
epoch_and_partitions_set *epoch,
@ -168,7 +181,7 @@ hypertable_cache_get_partition_epoch(hypertable_cache_entry *hce, int64 time_pt,
return (*cache_entry);
}
old = cache_switch_to_memory_context(&hypertable_cache);
old = cache_switch_to_memory_context(cache);
epoch = partition_epoch_scan(hce->id, time_pt, relid);
/* check if full */
@ -193,15 +206,21 @@ hypertable_cache_get_partition_epoch(hypertable_cache_entry *hce, int64 time_pt,
return epoch;
}
extern Cache *
hypertable_cache_pin()
{
return cache_pin(hypertable_cache_current);
}
void _hypertable_cache_init(void)
{
CreateCacheMemoryContext();
cache_init(&hypertable_cache);
hypertable_cache_current = hypertable_cache_create();
}
void
_hypertable_cache_fini(void)
{
hypertable_cache.post_invalidate_hook = NULL;
cache_invalidate(&hypertable_cache);
cache_invalidate(hypertable_cache_current);
}


@ -2,6 +2,7 @@
#define IOBEAMDB_HYPERTABLE_CACHE_H
#include <postgres.h>
#include "cache.h"
typedef struct hypertable_basic_info hypertable_basic_info;
typedef struct epoch_and_partitions_set epoch_and_partitions_set;
@ -23,12 +24,14 @@ typedef struct hypertable_cache_entry
epoch_and_partitions_set *epochs[MAX_EPOCHS_PER_HYPERTABLE_CACHE_ENTRY];
} hypertable_cache_entry;
hypertable_cache_entry *hypertable_cache_get(int32 hypertable_id);
hypertable_cache_entry *hypertable_cache_get_entry(Cache * cache, int32 hypertable_id);
epoch_and_partitions_set *
hypertable_cache_get_partition_epoch(hypertable_cache_entry *hce, int64 time_pt, Oid relid);
hypertable_cache_get_partition_epoch(Cache *cache, hypertable_cache_entry *hce, int64 time_pt, Oid relid);
void invalidate_hypertable_cache_callback(void);
void hypertable_cache_invalidate_callback(void);
extern Cache *hypertable_cache_pin(void);
void _hypertable_cache_init(void);
void _hypertable_cache_fini(void);


@ -1,6 +1,8 @@
#include "postgres.h"
#include "funcapi.h"
#include "access/htup_details.h"
#include "access/relscan.h"
#include "commands/defrem.h"
#include "catalog/namespace.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
@ -25,9 +27,11 @@
#include "tcop/tcopprot.h"
#include "tcop/utility.h"
#include "deps/dblink.h"
#include "utils/tqual.h"
#include "parser/parse_utilcmd.h"
#include "parser/parser.h"
#include "access/xact.h"
#include "access/htup_details.h"
#include "parser/parse_oper.h"
#include "parser/parse_func.h"
@ -41,12 +45,22 @@
#include "utils.h"
#include "metadata_queries.h"
#include "partitioning.h"
#include "scanner.h"
#include "catalog.h"
#include "pgmurmur3.h"
#include <utils/tqual.h>
#include <utils/rls.h>
#include <miscadmin.h>
#include <access/heapam.h>
#define INSERT_TRIGGER_COPY_TABLE_FN "insert_trigger_on_copy_table_c"
#define INSERT_TRIGGER_COPY_TABLE_NAME "insert_trigger"
/* private funcs */
static int tuple_fnumber(TupleDesc tupdesc, const char *fname);
static ObjectAddress create_insert_index(int32 hypertable_id, char * time_field, PartitioningInfo *part_info,epoch_and_partitions_set *epoch);
static Node *get_keyspace_fn_call(PartitioningInfo *part_info);
/*
* Inserts rows from the temporary copy table into correct hypertable child tables.
@ -69,8 +83,310 @@ get_close_if_needed_fn()
return single;
}
PG_FUNCTION_INFO_V1(insert_trigger_on_copy_table_c);
/*
*
* Helper functions for inserting tuples into chunk tables
*
* We insert one chunk at a time and hold a context while we insert
* a particular chunk;
*
* */
typedef struct ChunkInsertCtxRel
{
Relation rel;
TupleTableSlot *slot;
EState *estate;
ResultRelInfo *resultRelInfo;
BulkInsertState bistate;
} ChunkInsertCtxRel;
static ChunkInsertCtxRel*
chunk_insert_ctx_rel_new(Relation rel, ResultRelInfo *resultRelInfo, List *range_table) {
TupleDesc tupDesc;
ChunkInsertCtxRel *rel_ctx = palloc(sizeof(ChunkInsertCtxRel));
rel_ctx->estate = CreateExecutorState();
tupDesc = RelationGetDescr(rel);
rel_ctx->estate->es_result_relations = resultRelInfo;
rel_ctx->estate->es_num_result_relations = 1;
rel_ctx->estate->es_result_relation_info = resultRelInfo;
rel_ctx->estate->es_range_table = range_table;
rel_ctx->slot = ExecInitExtraTupleSlot(rel_ctx->estate);
ExecSetSlotDescriptor(rel_ctx->slot, tupDesc);
rel_ctx->rel = rel;
rel_ctx->resultRelInfo = resultRelInfo;
rel_ctx->bistate = GetBulkInsertState();
return rel_ctx;
}
static void
chunk_insert_ctx_rel_destroy(ChunkInsertCtxRel *rel_ctx)
{
FreeBulkInsertState(rel_ctx->bistate);
ExecCloseIndices(rel_ctx->resultRelInfo);
ExecResetTupleTable(rel_ctx->estate->es_tupleTable, false);
FreeExecutorState(rel_ctx->estate);
heap_close(rel_ctx->rel, NoLock);
}
static void
chunk_insert_ctx_rel_insert_tuple(ChunkInsertCtxRel *rel_ctx, HeapTuple tuple)
{
int hi_options = 0; /* no optimization */
CommandId mycid = GetCurrentCommandId(true);
/*
* Constraints might reference the tableoid column, so initialize
* t_tableOid before evaluating them.
*/
tuple->t_tableOid = RelationGetRelid(rel_ctx->rel);
ExecStoreTuple(tuple, rel_ctx->slot, InvalidBuffer, false);
if (rel_ctx->rel->rd_att->constr)
ExecConstraints(rel_ctx->resultRelInfo, rel_ctx->slot, rel_ctx->estate);
/* OK, store the tuple and create index entries for it */
heap_insert(rel_ctx->rel, tuple, mycid, hi_options, rel_ctx->bistate);
if (rel_ctx->resultRelInfo->ri_NumIndices > 0)
ExecInsertIndexTuples(rel_ctx->slot, &(tuple->t_self),
rel_ctx->estate, false, NULL,
NIL);
}
typedef struct ChunkInsertCtx
{
chunk_cache_entry *chunk;
Cache *pinned;
List *ctxs;
} ChunkInsertCtx;
static ChunkInsertCtx *
chunk_insert_ctx_new(chunk_cache_entry *chunk, Cache *pinned)
{
ListCell *lc;
List *rel_ctx_list = NIL;
ChunkInsertCtx *ctx;
ctx = palloc(sizeof(ChunkInsertCtx));
ctx->pinned = pinned;
foreach(lc, chunk->crns->tables)
{
crn_row *cr = lfirst(lc);
RangeVar *rv = makeRangeVarFromNameList(list_make2(makeString(cr->schema_name.data), makeString(cr->table_name.data)));
Relation rel;
RangeTblEntry *rte;
List *range_table;
ResultRelInfo *resultRelInfo;
ChunkInsertCtxRel *rel_ctx;;
rel = heap_openrv(rv, RowExclusiveLock);
/* permission check */
rte = makeNode(RangeTblEntry);
rte->rtekind = RTE_RELATION;
rte->relid = RelationGetRelid(rel);
rte->relkind = rel->rd_rel->relkind;
rte->requiredPerms = ACL_INSERT;
range_table = list_make1(rte);
ExecCheckRTPerms(range_table, true);
if (check_enable_rls(rte->relid, InvalidOid, false) == RLS_ENABLED)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Hypertables don't support Row level security")));
}
if (XactReadOnly && !rel->rd_islocaltemp)
PreventCommandIfReadOnly("COPY FROM");
PreventCommandIfParallelMode("COPY FROM");
if (rel->rd_rel->relkind != RELKIND_RELATION)
{
elog(ERROR, "inserting not to table");
}
/*
* We need a ResultRelInfo so we can use the regular executor's
* index-entry-making machinery. (There used to be a huge amount of
* code here that basically duplicated execUtils.c ...)
*/
resultRelInfo = makeNode(ResultRelInfo);
InitResultRelInfo(resultRelInfo,
rel,
1, /* dummy rangetable index */
0);
ExecOpenIndices(resultRelInfo, false);
if (resultRelInfo->ri_TrigDesc != NULL)
{
elog(ERROR, "triggers on chunk tables not supported");
}
rel_ctx = chunk_insert_ctx_rel_new(rel, resultRelInfo, range_table);
rel_ctx_list = lappend(rel_ctx_list, rel_ctx);
}
ctx->ctxs = rel_ctx_list;
ctx->chunk = chunk;
return ctx;
}
static void
chunk_insert_ctx_destroy(ChunkInsertCtx *ctx)
{
ListCell *lc;
if (ctx == NULL)
{
return;
}
cache_release(ctx->pinned);
foreach(lc, ctx->ctxs)
{
ChunkInsertCtxRel *rel_ctx = lfirst(lc);
chunk_insert_ctx_rel_destroy(rel_ctx);
}
}
static void
chunk_insert_ctx_insert_tuple(ChunkInsertCtx *ctx, HeapTuple tup)
{
ListCell *lc;
foreach(lc, ctx->ctxs)
{
ChunkInsertCtxRel *rel_ctx = lfirst(lc);
chunk_insert_ctx_rel_insert_tuple(rel_ctx, tup);
}
}
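
Taken together, these helpers are used once per chunk: build one ChunkInsertCtx when the scan reaches a new chunk, push every tuple that belongs to it into all of its replica tables, then tear the context down before moving on. A condensed sketch (not part of the patch) of that flow, mirroring copy_table_tuple_found below:

/* Hypothetical condensed flow (ChunkInsertCtx is local to this file), assuming
 * `part`, `time_pt` and `tuple` come from the copy-table scan. */
static void
example_insert_one_tuple(Partition *part, int64 time_pt, HeapTuple tuple)
{
	Cache *pinned = chunk_crn_set_cache_pin();
	chunk_cache_entry *chunk = get_chunk_cache_entry(pinned, part, time_pt, true);
	ChunkInsertCtx *ctx = chunk_insert_ctx_new(chunk, pinned);	/* heap_open()s each replica table */

	chunk_insert_ctx_insert_tuple(ctx, heap_copytuple(tuple));	/* one copy into every replica */

	chunk_insert_ctx_destroy(ctx);	/* closes relations and releases the pinned cache */
}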
typedef struct CopyTableQueryCtx {
Partition *part;
ChunkInsertCtx *chunk_ctx;
epoch_and_partitions_set *pe;
hypertable_cache_entry *hci;
} CopyTableQueryCtx;
static bool
copy_table_tuple_found(TupleInfo *ti, void *data)
{
bool is_null;
CopyTableQueryCtx *ctx = data;
int16 keyspace_pt;
int64 time_pt;
if (ctx->pe->num_partitions > 1)
{
/* first element is partition index (used for sorting but not necessary here) */
Datum time_datum = index_getattr(ti->ituple, 2, ti->ituple_desc, &is_null);
Datum keyspace_datum = index_getattr(ti->ituple, 3, ti->ituple_desc, &is_null);
time_pt = time_value_to_internal(time_datum, ctx->hci->time_column_type);
keyspace_pt = DatumGetInt16(keyspace_datum);
}
else
{
Datum time_datum = index_getattr(ti->ituple, 1, ti->ituple_desc, &is_null);
time_pt = time_value_to_internal(time_datum, ctx->hci->time_column_type);
keyspace_pt = KEYSPACE_PT_NO_PARTITIONING;
}
if (ctx->chunk_ctx != NULL && !chunk_row_timepoint_is_member(ctx->chunk_ctx->chunk->chunk, time_pt))
{
/* moving on to next chunk; */
chunk_insert_ctx_destroy(ctx->chunk_ctx);
ctx->chunk_ctx = NULL;
}
if (ctx->part != NULL && !partition_keyspace_pt_is_member(ctx->part, keyspace_pt))
{
/* moving on to next ctx->partition. */
chunk_insert_ctx_destroy(ctx->chunk_ctx);
ctx->chunk_ctx = NULL;
ctx->part = NULL;
}
if (ctx->part == NULL)
{
ctx->part = partition_epoch_get_partition(ctx->pe, keyspace_pt);
}
if (ctx->chunk_ctx == NULL)
{
Datum was_closed_datum;
chunk_cache_entry *chunk;
Cache *pinned = chunk_crn_set_cache_pin();
/*
* TODO: this first call should be non-locking and use a cache(for
* performance)
*/
chunk = get_chunk_cache_entry(pinned, ctx->part, time_pt, false);
was_closed_datum = FunctionCall1(get_close_if_needed_fn(), Int32GetDatum(chunk->id));
/* chunk may have been closed and thus changed /or/ need to get share lock */
chunk = get_chunk_cache_entry(pinned, ctx->part, time_pt, true);
ctx->chunk_ctx = chunk_insert_ctx_new(chunk, pinned);
}
/* insert here: */
/* has to be a copy(not sure why) */
chunk_insert_ctx_insert_tuple(ctx->chunk_ctx,heap_copytuple(ti->tuple));
return true;
}
static void scan_copy_table_and_insert_post(int num_tuples, void *data)
{
CopyTableQueryCtx *ctx = data;
if (ctx->chunk_ctx != NULL)
chunk_insert_ctx_destroy(ctx->chunk_ctx);
}
static void scan_copy_table_and_insert( hypertable_cache_entry *hci,
epoch_and_partitions_set *pe,
Oid table, Oid index)
{
CopyTableQueryCtx query_ctx = {
.pe = pe,
.hci = hci,
};
ScannerCtx scanCtx = {
.table = table,
.index = index,
.scantype = ScannerTypeIndex,
.want_itup = true,
.nkeys = 0,
.scankey = NULL,
.data = &query_ctx,
.tuple_found = copy_table_tuple_found,
.postscan = scan_copy_table_and_insert_post,
.lockmode = AccessShareLock,
.scandirection = ForwardScanDirection,
};
/* Perform an index scan on primary key. */
scanner_scan(&scanCtx);
}
PG_FUNCTION_INFO_V1(insert_trigger_on_copy_table_c);
Datum
insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS)
{
@ -78,17 +394,15 @@ insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS)
/* arg 0 = hypertable id */
char *hypertable_id_arg = trigdata->tg_trigger->tgargs[0];
int32 hypertable_id = atoi(hypertable_id_arg);
HeapTuple firstrow;
hypertable_cache_entry *hci;
int time_fnum,
i,
num_chunks;
bool isnull;
List *chunk_id_list = NIL;
ListCell *chunk_id_cell;
int *chunk_id_array;
epoch_and_partitions_set *pe;
Cache *hypertable_cache;
ObjectAddress idx;
DropStmt *drop = makeNode(DropStmt);
/*
* --This guard protects against calling insert_data() twice in the same
@ -101,9 +415,6 @@ insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS)
* two different hypertables.
*/
char *insert_guard = GetConfigOptionByName("io.insert_data_guard", NULL, true);
HeapScanDesc scan;
ScanKeyData scankey[1];
int nkeys = 0;
if (insert_guard != NULL && strcmp(insert_guard, "on") == 0)
{
@ -119,98 +430,36 @@ insert_trigger_on_copy_table_c(PG_FUNCTION_ARGS)
* get the hypertable cache; use the time column name to figure out the
* column fnum for time field
*/
hci = hypertable_cache_get(atoi(hypertable_id_arg));
time_fnum = tuple_fnumber(trigdata->tg_relation->rd_att, hci->time_column_name);
hypertable_cache = hypertable_cache_pin();
scan = heap_beginscan(trigdata->tg_relation, SnapshotSelf, nkeys, scankey);
hci = hypertable_cache_get_entry(hypertable_cache, hypertable_id);
/* get one row in a loop until the copy table is empty. */
while ((firstrow = heap_getnext(scan, ForwardScanDirection)))
{
Datum time_datum;
int64 time_internal;
epoch_and_partitions_set *pe_entry;
Partition *part = NULL;
chunk_cache_entry *entry;
int ret;
/* TODO: hack assumes single pe. */
pe = hypertable_cache_get_partition_epoch(hypertable_cache, hci, 0, trigdata->tg_relation->rd_id);
time_datum = heap_getattr(firstrow, time_fnum, trigdata->tg_relation->rd_att, &isnull);
/*
* create an index that colocates row from the same chunk together and
* guarantees an order on chunk access as well
*/
idx = create_insert_index(hypertable_id, hci->time_column_name, pe->partitioning, pe);
if (isnull)
{
elog(ERROR, "Time column is null");
}
time_internal = time_value_to_internal(time_datum, hci->time_column_type);
scan_copy_table_and_insert(hci, pe, trigdata->tg_relation->rd_id, idx.objectId);
pe_entry = hypertable_cache_get_partition_epoch(hci, time_internal,
trigdata->tg_relation->rd_id);
cache_release(hypertable_cache);
if (pe_entry->partitioning != NULL && pe_entry->num_partitions > 1)
{
PartitioningInfo *pi = pe_entry->partitioning;
Datum part_value = heap_getattr(firstrow, pi->column_attnum,
trigdata->tg_relation->rd_att, &isnull);
int16 keyspace_pt = partitioning_func_apply(&pi->partfunc, part_value);
drop->removeType = OBJECT_INDEX;
drop->missing_ok = FALSE;
drop->objects = list_make1(list_make1(makeString("copy_insert")));
drop->arguments = NIL;
drop->behavior = DROP_RESTRICT;
drop->concurrent = false;
/* get the partition using the keyspace value of the row. */
part = partition_epoch_get_partition(pe_entry, keyspace_pt);
}
else
{
/* Not Partitioning: get the first and only partition */
part = partition_epoch_get_partition(pe_entry, -1);
}
entry = get_chunk_cache_entry(hci, pe_entry, part, time_internal, true);
if (entry->chunk->end_time == OPEN_END_TIME)
{
chunk_id_list = lappend_int(chunk_id_list, entry->id);
}
if (SPI_connect() < 0)
{
elog(ERROR, "Got an SPI connect error");
}
ret = SPI_execute_plan(entry->move_from_copyt_plan, NULL, NULL, false, 1);
if (ret <= 0)
{
elog(ERROR, "Got an SPI error %d", ret);
}
SPI_finish();
}
heap_endscan(scan);
/* build chunk id array */
num_chunks = list_length(chunk_id_list);
chunk_id_array = palloc(sizeof(int) * num_chunks);
i = 0;
foreach(chunk_id_cell, chunk_id_list)
{
int chunk_id = lfirst_int(chunk_id_cell);
chunk_id_array[i++] = chunk_id;
}
/* sort by chunk_id to avoid deadlocks */
qsort(chunk_id_array, num_chunks, sizeof(int), int_cmp);
/* close chunks */
for (i = 0; i < num_chunks; i++)
{
/* TODO: running this on every insert is really expensive */
/* Keep some kind of cache of result an run this heuristically. */
int32 chunk_id = chunk_id_array[i];
FunctionCall1(get_close_if_needed_fn(), Int32GetDatum(chunk_id));
}
RemoveRelations(drop);
return PointerGetDatum(NULL);
}
/* Creates a temp table for INSERT and COPY commands. This table
* stores the data until it is distributed to the appropriate chunks.
* The copy table uses ON COMMIT DELETE ROWS and inherits from the root table.
@ -287,18 +536,141 @@ create_copy_table(int32 hypertable_id, Oid root_oid)
return copyTableRelationAddr.objectId;
}
static IndexElem *
makeIndexElem(char *name, Node *expr){
Assert((name ==NULL || expr == NULL) && (name !=NULL || expr !=NULL));
static int
tuple_fnumber(TupleDesc tupdesc, const char *fname)
{
int res;
for (res = 0; res < tupdesc->natts; res++)
{
if (namestrcmp(&tupdesc->attrs[res]->attname, fname) == 0)
return res + 1;
}
elog(ERROR, "field not found: %s", fname);
IndexElem *time_elem = makeNode(IndexElem);
time_elem->name = name;
time_elem->expr = expr;
time_elem->indexcolname = NULL;
time_elem->collation = NIL;
time_elem->opclass = NIL;
time_elem->ordering = SORTBY_DEFAULT;
time_elem->nulls_ordering = SORTBY_NULLS_DEFAULT;
return time_elem;
}
/* creates index for inserting in set chunk order.
*
* The index is the following:
* If there is a partitioning_func:
* partition_no, time, keyspace_value
* If there is no partitioning_func:
* time
*
* Partition_num is simply a unique number identifying the partition for the epoch the row belongs to.
* It is obtained by getting the maximal index in the end_time_partitions array such that the keyspace value
* is less than or equal to the value in the array.
*
* Keyspace_value without partition_num is not sufficient because:
* consider the partitions with keyspaces 0-5,6-10, and time partitions 100-200,201-300
* Then consider the following input:
* row 1: keyspace=0, time=100
* row 2: keyspace=2, time=250
* row 3: keyspace=4, time=100
* row 1 and 3 should be in the same chunk but they are now not together in the order (row 2 is between them).
*
* keyspace_value should probably be moved out of the index.
*
* */
static ObjectAddress
create_insert_index(int32 hypertable_id, char *time_field, PartitioningInfo *part_info, epoch_and_partitions_set *epoch)
{
IndexStmt *index_stmt = makeNode(IndexStmt);
IndexElem *time_elem;
Oid relid;
List *indexElem = NIL;
int i;
time_elem = makeIndexElem(time_field, NULL);
if (part_info != NULL)
{
IndexElem *partition_elem;
IndexElem *keyspace_elem;
List *array_pos_func_name = list_make2(makeString(CATALOG_SCHEMA_NAME), makeString(ARRAY_POSITION_LEAST_FN_NAME));
List *array_pos_args;
List *array_list = NIL;
A_ArrayExpr *array_expr;
FuncCall *array_pos_fc;
for (i = 0; i < epoch->num_partitions; i++)
{
A_Const *end_time_const = makeNode(A_Const);
TypeCast *cast = makeNode(TypeCast);
end_time_const->val = *makeInteger((int) epoch->partitions[i].keyspace_end);
end_time_const->location = -1;
cast->arg = (Node *) end_time_const;
cast->typeName = SystemTypeName("int2");
cast->location = -1;
array_list = lappend(array_list, cast);
}
array_expr = makeNode(A_ArrayExpr);
array_expr->elements = array_list;
array_expr->location = -1;
array_pos_args = list_make2(array_expr, get_keyspace_fn_call(part_info));
array_pos_fc = makeFuncCall(array_pos_func_name, array_pos_args, -1);
partition_elem = makeIndexElem(NULL, (Node *) array_pos_fc);
keyspace_elem = makeIndexElem(NULL, (Node *) get_keyspace_fn_call(part_info));
/* partition_number, time, keyspace */
/* can probably get rid of keyspace but later */
indexElem = list_make3(partition_elem, time_elem, keyspace_elem);
}
else
{
indexElem = list_make1(time_elem);
}
index_stmt->idxname = "copy_insert";
index_stmt->relation = makeRangeVar("pg_temp", copy_table_name(hypertable_id), -1);
index_stmt->accessMethod = "btree";
index_stmt->indexParams = indexElem;
relid =
RangeVarGetRelidExtended(index_stmt->relation, ShareLock,
false, false,
RangeVarCallbackOwnsRelation,
NULL);
index_stmt = transformIndexStmt(relid, index_stmt, "");
return DefineIndex(relid, /* OID of heap relation */
index_stmt,
InvalidOid, /* no predefined OID */
false, /* is_alter_table */
true, /* check_rights */
false, /* skip_build */
false); /* quiet */
}
/* Helper function to create the FuncCall for calculating the keyspace_value. Used for
* creating the copy_insert index
*
*/
static Node *
get_keyspace_fn_call(PartitioningInfo *part_info)
{
ColumnRef *col_ref = makeNode(ColumnRef);
A_Const *mod_const;
List *part_func_name = list_make2(makeString(part_info->partfunc.schema), makeString(part_info->partfunc.name));
List *part_func_args;
col_ref->fields = list_make1(makeString(part_info->column));
col_ref->location = -1;
mod_const = makeNode(A_Const);
mod_const->val = *makeInteger(part_info->partfunc.modulos);
mod_const->location = -1;
part_func_args = list_make2(col_ref, mod_const);
return (Node *) makeFuncCall(part_func_name, part_func_args, -1);
}


@ -223,3 +223,8 @@ chunk_row_insert_new(int32 partition_id, int64 timepoint, bool lock)
return chunk;
}
bool chunk_row_timepoint_is_member(const chunk_row *row, const int64 time_pt){
return row->start_time <= time_pt && row->end_time >= time_pt;
}


@ -47,4 +47,6 @@ extern crn_set *fetch_crn_set(crn_set *entry, int32 chunk_id);
chunk_row *
chunk_row_insert_new(int32 partition_id, int64 timepoint, bool lock);
bool chunk_row_timepoint_is_member(const chunk_row *row, const int64 time_pt);
#endif /* IOBEAMDB_METADATA_QUERIES_H */


@ -175,6 +175,8 @@ partition_epoch_tuple_found(TupleInfo *ti, void *arg)
DatumGetCString(partcol),
DatumGetInt16(partmod),
pctx->relid);
} else {
pe->partitioning = NULL;
}
/* Scan for the epoch's partitions */
@ -294,7 +296,7 @@ cmp_partitions(const void *keyspace_pt_arg, const void *value)
int16 keyspace_pt = *((int16 *) keyspace_pt_arg);
const Partition *part = value;
if (part->keyspace_start <= keyspace_pt && part->keyspace_end >= keyspace_pt)
if (partition_keyspace_pt_is_member(part, keyspace_pt))
{
return 0;
}
@ -317,7 +319,7 @@ partition_epoch_get_partition(epoch_and_partitions_set *epoch, int16 keyspace_pt
return NULL;
}
if (keyspace_pt < 0)
if (keyspace_pt == KEYSPACE_PT_NO_PARTITIONING)
{
if (epoch->num_partitions > 1)
{
@ -338,3 +340,7 @@ partition_epoch_get_partition(epoch_and_partitions_set *epoch, int16 keyspace_pt
return part;
}
bool partition_keyspace_pt_is_member(const Partition *part, const int16 keyspace_pt)
{
return keyspace_pt == KEYSPACE_PT_NO_PARTITIONING || (part->keyspace_start <= keyspace_pt && part->keyspace_end >= keyspace_pt);
}


@ -1,6 +1,8 @@
#ifndef IOBEAMDB_PARTITIONING_H
#define IOBEAMDB_PARTITIONING_H
#define KEYSPACE_PT_NO_PARTITIONING -1
#include <postgres.h>
#include <access/attnum.h>
#include <fmgr.h>
@ -53,4 +55,5 @@ int16 partitioning_func_apply(PartitioningFunc *pf, Datum value);
Partition *partition_epoch_get_partition(epoch_and_partitions_set *epoch, int16 keyspace_pt);
bool partition_keyspace_pt_is_member(const Partition *part, const int16 keyspace_pt);
#endif /* IOBEAMDB_PARTITIONING_H */


@ -1,7 +1,10 @@
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: t; c-basic-offset: 4 -*- */
#include "pgmurmur3.h"
#include <catalog/pg_type.h>
#include "utils/builtins.h"
#include <utils/array.h>
#include <utils/lsyscache.h>
/* adapted from https://github.com/markokr/pghashlib/ */
@ -11,7 +14,7 @@ Datum
pg_murmur3_hash_string(PG_FUNCTION_ARGS)
{
struct varlena *data;
uint64_t io[MAX_IO_VALUES];
uint64_t io[MAX_IO_VALUES];
memset(io, 0, sizeof(io));
@ -36,12 +39,11 @@ PG_FUNCTION_INFO_V1(get_partition_for_key);
Datum
get_partition_for_key(PG_FUNCTION_ARGS)
{
// SELECT ((_iobeamdb_internal.murmur3_hash_string(key, 1 :: INT4) & x'7fffffff' :: INTEGER) % mod_factor) :: SMALLINT INTO ret;
struct varlena *data;
int32 mod;
Datum hash_d;
int32 hash_i;
int16 res;
struct varlena *data;
int32 mod;
Datum hash_d;
int32 hash_i;
int16 res;
/* request aligned data on weird architectures */
#ifdef HLIB_UNALIGNED_READ_OK
data = PG_GETARG_VARLENA_PP(0);
@ -57,3 +59,96 @@ get_partition_for_key(PG_FUNCTION_ARGS)
PG_RETURN_INT16(res);
}
/*
* array_position_least returns the highest position in the array such that the element
* in the array is <= searched element. If the array is of partition-keyspace-end values
* then this gives a unique bucket for each keyspace value.
*
* Arg 0: sorted array of smallint
* Arg 1: searched_element (smallint)
*/
PG_FUNCTION_INFO_V1(array_position_least);
Datum
array_position_least(PG_FUNCTION_ARGS)
{
ArrayType *array;
Datum searched_element,
value;
bool isnull;
int position;
ArrayMetaState *my_extra;
ArrayIterator array_iterator;
if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
{
elog(ERROR, "neither parameter should be null");
}
array = PG_GETARG_ARRAYTYPE_P(0);
if (INT2OID != ARR_ELEMTYPE(array))
{
elog(ERROR, "only support smallint arrays");
}
/* should be a single dimensioned array */
if (ARR_NDIM(array) > 1)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("can only work with single-dimension array")));
if (PG_ARGISNULL(1))
{
elog(ERROR, "does not expect null as searched-for element");
}
else
{
searched_element = PG_GETARG_DATUM(1);
}
position = (ARR_LBOUND(array))[0] - 1;
/*
* We arrange to look up type info for array_create_iterator only once per
* series of calls, assuming the element type doesn't change underneath
* us.
*/
my_extra = (ArrayMetaState *) fcinfo->flinfo->fn_extra;
if (my_extra == NULL)
{
fcinfo->flinfo->fn_extra = MemoryContextAlloc(fcinfo->flinfo->fn_mcxt,
sizeof(ArrayMetaState));
my_extra = (ArrayMetaState *) fcinfo->flinfo->fn_extra;
get_typlenbyvalalign(INT2OID,
&my_extra->typlen,
&my_extra->typbyval,
&my_extra->typalign);
my_extra->element_type = INT2OID;
}
/* Examine each array element until the element is >= searched_element. */
array_iterator = array_create_iterator(array, 0, my_extra);
while (array_iterate(array_iterator, &value, &isnull))
{
position++;
if (isnull)
{
elog(ERROR, "No element in array should be null");
}
if (DatumGetInt16(value) >= DatumGetInt16(searched_element))
break;
}
array_free_iterator(array_iterator);
/* Avoid leaking memory when handed toasted input */
PG_FREE_IF_COPY(array, 0);
PG_RETURN_INT16(position);
}
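
Since this bucketing is what lets the copy_insert index in insert.c group rows of the same chunk together, here is a standalone sketch of the same rule outside the fmgr plumbing. The keyspace_end values are made up for illustration; the real function additionally handles array lower bounds and rejects NULLs as shown above.

#include <stdint.h>
#include <stdio.h>

/* Simplified restatement of array_position_least(): return the 1-based
 * position of the first element >= key in a sorted, null-free int16 array. */
static int16_t
position_least(const int16_t *ends, int n, int16_t key)
{
	int16_t pos = 0;
	int i;

	for (i = 0; i < n; i++)
	{
		pos++;
		if (ends[i] >= key)
			break;
	}
	return pos;
}

int
main(void)
{
	int16_t ends[] = {5, 10};	/* partitions covering keyspace 0-5 and 6-10 */

	/* Keyspace 0 and 4 land in bucket 1, keyspace 7 in bucket 2, so ordering
	 * rows by (bucket, time) keeps rows of the same chunk adjacent. */
	printf("%d %d %d\n",
		   position_least(ends, 2, 0),	/* 1 */
		   position_least(ends, 2, 4),	/* 1 */
		   position_least(ends, 2, 7));	/* 2 */
	return 0;
}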


@ -9,7 +9,7 @@
#include <stdint.h>
#endif
#ifdef HAVE_INTTYPES_H
#include <inttypes.h>
#include <inttypes.h>
#endif
#if !defined(PG_VERSION_NUM) || (PG_VERSION_NUM < 80300)
@ -36,10 +36,13 @@
#define MAX_IO_VALUES 2
/* hash function signatures */
void hlib_murmur3(const void *data, size_t len, uint64_t *io);
void hlib_murmur3(const void *data, size_t len, uint64_t *io);
/* SQL function */
Datum pg_murmur3_hash_string(PG_FUNCTION_ARGS);
Datum pg_murmur3_hash_string(PG_FUNCTION_ARGS);
#define ARRAY_POSITION_LEAST_FN_NAME "array_position_least"
Datum array_position_least(PG_FUNCTION_ARGS);
#endif


@ -20,6 +20,7 @@ typedef union ScanDesc {
*/
typedef struct InternalScannerCtx {
Relation tablerel, indexrel;
TupleInfo tinfo;
ScanDesc scan;
ScannerCtx *sctx;
} InternalScannerCtx;
@ -30,7 +31,7 @@ typedef struct InternalScannerCtx {
typedef struct Scanner {
Relation (*open)(InternalScannerCtx *ctx);
ScanDesc (*beginscan)(InternalScannerCtx *ctx);
HeapTuple (*getnext)(InternalScannerCtx *ctx);
bool (*getnext)(InternalScannerCtx *ctx);
void (*endscan)(InternalScannerCtx *ctx);
void (*close)(InternalScannerCtx *ctx);
} Scanner;
@ -50,9 +51,10 @@ static ScanDesc heap_scanner_beginscan(InternalScannerCtx *ctx)
return ctx->scan;
}
static HeapTuple heap_scanner_getnext(InternalScannerCtx *ctx)
static bool heap_scanner_getnext(InternalScannerCtx *ctx)
{
return heap_getnext(ctx->scan.heap_scan, ctx->sctx->scandirection);
ctx->tinfo.tuple = heap_getnext(ctx->scan.heap_scan, ctx->sctx->scandirection);
return HeapTupleIsValid(ctx->tinfo.tuple);
}
static void heap_scanner_endscan(InternalScannerCtx *ctx)
@ -79,14 +81,18 @@ static ScanDesc index_scanner_beginscan(InternalScannerCtx *ctx)
ctx->scan.index_scan = index_beginscan(ctx->tablerel, ctx->indexrel,
SnapshotSelf, sctx->nkeys,
sctx->norderbys);
ctx->scan.index_scan->xs_want_itup = ctx->sctx->want_itup;
index_rescan(ctx->scan.index_scan, sctx->scankey,
sctx->nkeys, NULL, sctx->norderbys);
return ctx->scan;
}
static HeapTuple index_scanner_getnext(InternalScannerCtx *ctx)
static bool index_scanner_getnext(InternalScannerCtx *ctx)
{
return index_getnext(ctx->scan.index_scan, ctx->sctx->scandirection);
ctx->tinfo.tuple = index_getnext(ctx->scan.index_scan, ctx->sctx->scandirection);
ctx->tinfo.ituple = ctx->scan.index_scan->xs_itup;
ctx->tinfo.ituple_desc = ctx->scan.index_scan->xs_itupdesc;
return HeapTupleIsValid(ctx->tinfo.tuple);
}
static void index_scanner_endscan(InternalScannerCtx *ctx)
@ -129,8 +135,8 @@ static Scanner scanners[] = {
*/
int scanner_scan(ScannerCtx *ctx)
{
HeapTuple tuple;
TupleDesc tuple_desc;
bool is_valid;
int num_tuples = 0;
Scanner *scanner = &scanners[ctx->scantype];
InternalScannerCtx ictx = {
@ -142,21 +148,19 @@ int scanner_scan(ScannerCtx *ctx)
tuple_desc = RelationGetDescr(ictx.tablerel);
ictx.tinfo.scanrel = ictx.tablerel;
ictx.tinfo.desc = tuple_desc;
/* Call pre-scan handler, if any. */
if (ctx->prescan != NULL)
ctx->prescan(ctx->data);
tuple = scanner->getnext(&ictx);
is_valid = scanner->getnext(&ictx);
while (HeapTupleIsValid(tuple))
while (is_valid)
{
TupleInfo ti = {
.scanrel = ictx.tablerel,
.tuple = tuple,
.desc = tuple_desc,
};
if (ctx->filter == NULL || ctx->filter(&ti, ctx->data))
if (ctx->filter == NULL || ctx->filter(&ictx.tinfo, ctx->data))
{
num_tuples++;
@ -165,7 +169,7 @@ int scanner_scan(ScannerCtx *ctx)
Buffer buffer;
HeapUpdateFailureData hufd;
ti.lockresult = heap_lock_tuple(ictx.tablerel, tuple,
ictx.tinfo.lockresult = heap_lock_tuple(ictx.tablerel, ictx.tinfo.tuple,
GetCurrentCommandId(false),
ctx->tuplock.lockmode,
ctx->tuplock.waitpolicy,
@ -176,11 +180,11 @@ int scanner_scan(ScannerCtx *ctx)
}
/* Abort the scan if the handler wants us to */
if (!ctx->tuple_found(&ti, ctx->data))
if (!ctx->tuple_found(&ictx.tinfo, ctx->data))
break;
}
tuple = scanner->getnext(&ictx);
is_valid = scanner->getnext(&ictx);
}
/* Call post-scan handler, if any. */


@ -18,6 +18,9 @@ typedef struct TupleInfo
Relation scanrel;
HeapTuple tuple;
TupleDesc desc;
/* return index tuple if it was requested -- only for index scans */
IndexTuple ituple;
TupleDesc ituple_desc;
/*
* If the user requested a tuple lock, the result of the lock is passed on
* in lockresult.
@ -31,6 +34,7 @@ typedef struct ScannerCtx {
ScannerType scantype;
ScanKey scankey;
int nkeys, norderbys;
bool want_itup;
LOCKMODE lockmode;
struct {
LockTupleMode lockmode;

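A brief sketch (not part of the patch) of the new want_itup path declared above: an index scan started with want_itup set hands the tuple_found callback both the heap tuple and the index tuple, so sort-key columns can be decoded straight from the index, as copy_table_tuple_found does in insert.c. The function names here are hypothetical.

#include <postgres.h>
#include <access/itup.h>
#include <access/sdir.h>
#include <storage/lock.h>

#include "scanner.h"

/* Hypothetical callback: read column 1 from the IndexTuple the scanner filled in. */
static bool
example_tuple_found(TupleInfo *ti, void *data)
{
	bool	isnull;
	Datum	first_key = index_getattr(ti->ituple, 1, ti->ituple_desc, &isnull);

	/* ... ti->tuple still holds the corresponding heap tuple ... */
	(void) first_key;
	(void) data;
	return true;	/* keep scanning */
}

static void
example_scan(Oid table, Oid index)
{
	ScannerCtx ctx = {
		.table = table,
		.index = index,
		.scantype = ScannerTypeIndex,
		.want_itup = true,		/* ask for xs_itup / xs_itupdesc */
		.tuple_found = example_tuple_found,
		.lockmode = AccessShareLock,
		.scandirection = ForwardScanDirection,
	};

	scanner_scan(&ctx);
}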

@ -91,13 +91,13 @@ WHERE h.schema_name = 'public' AND (h.table_name = 'drop_chunk_test1' OR h.table
3 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_3_data | 3 | 3
4 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_4_data | 4 | 4
5 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_5_data | 5 | 5
6 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_6_data | 6 | 6
6 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_6_data | 6 |
7 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_7_data | | 1
8 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_8_data | 2 | 2
9 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_9_data | 3 | 3
10 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_10_data | 4 | 4
11 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_11_data | 5 | 5
12 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_12_data | 6 | 6
12 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_12_data | 6 |
(12 rows)
SELECT * FROM _iobeamdb_catalog.chunk_replica_node;
@ -159,12 +159,12 @@ WHERE h.schema_name = 'public' AND (h.table_name = 'drop_chunk_test1' OR h.table
3 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_3_data | 3 | 3
4 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_4_data | 4 | 4
5 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_5_data | 5 | 5
6 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_6_data | 6 | 6
6 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_6_data | 6 |
8 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_8_data | 2 | 2
9 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_9_data | 3 | 3
10 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_10_data | 4 | 4
11 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_11_data | 5 | 5
12 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_12_data | 6 | 6
12 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_12_data | 6 |
(10 rows)
SELECT * FROM _iobeamdb_catalog.chunk_replica_node;
@ -221,12 +221,12 @@ WHERE h.schema_name = 'public' AND (h.table_name = 'drop_chunk_test1' OR h.table
3 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_3_data | 3 | 3
4 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_4_data | 4 | 4
5 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_5_data | 5 | 5
6 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_6_data | 6 | 6
6 | 1 | 1 | _iobeamdb_internal | _hyper_1_1_0_6_data | 6 |
8 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_8_data | 2 | 2
9 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_9_data | 3 | 3
10 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_10_data | 4 | 4
11 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_11_data | 5 | 5
12 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_12_data | 6 | 6
12 | 2 | 2 | _iobeamdb_internal | _hyper_2_2_0_12_data | 6 |
(9 rows)
SELECT * FROM _iobeamdb_catalog.chunk_replica_node;


@ -90,7 +90,7 @@ SELECT * FROM _iobeamdb_catalog.chunk c
----+--------------+------------+----------+----------+----------------------+---------------+--------------------+---------------------+----+--------------+---------------+------------+--------------------+------------------------+----+-------------+--------------------+------------------------+-------------------------+--------------------+-----------------+--------------------+-----------+------------------+------------------+------------+------------------
4 | 3 | | 1 | 4 | 3 | single | _iobeamdb_internal | _hyper_2_3_0_4_data | 3 | 3 | 2 | 0 | _iobeamdb_internal | _hyper_2_3_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000
5 | 3 | 2 | 2 | 5 | 3 | single | _iobeamdb_internal | _hyper_2_3_0_5_data | 3 | 3 | 2 | 0 | _iobeamdb_internal | _hyper_2_3_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000
6 | 3 | 3 | 3 | 6 | 3 | single | _iobeamdb_internal | _hyper_2_3_0_6_data | 3 | 3 | 2 | 0 | _iobeamdb_internal | _hyper_2_3_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000
6 | 3 | 3 | | 6 | 3 | single | _iobeamdb_internal | _hyper_2_3_0_6_data | 3 | 3 | 2 | 0 | _iobeamdb_internal | _hyper_2_3_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000
(3 rows)
\d+ "_iobeamdb_internal".*
@ -351,7 +351,7 @@ Inherits: _iobeamdb_internal._hyper_2_3_0_partition
device_id | text | | extended | |
Check constraints:
"partition" CHECK (_iobeamdb_catalog.get_partition_for_key(device_id, 32768) >= '0'::smallint AND _iobeamdb_catalog.get_partition_for_key(device_id, 32768) <= '32767'::smallint)
"time_range" CHECK ("time" >= '3'::bigint AND "time" <= '3'::bigint) NOT VALID
"time_range" CHECK ("time" >= '3'::bigint) NOT VALID
Inherits: _iobeamdb_internal._hyper_2_3_0_partition
Table "_iobeamdb_internal._hyper_2_3_0_partition"
@ -400,7 +400,7 @@ SELECT * FROM _iobeamdb_catalog.chunk;
3 | 2 | |
4 | 3 | | 1
5 | 3 | 2 | 2
6 | 3 | 3 | 3
6 | 3 | 3 |
(6 rows)
SELECT * FROM _iobeamdb_catalog.chunk_replica_node;


@ -91,7 +91,7 @@ SELECT * FROM _iobeamdb_catalog.chunk c
----+--------------+------------+----------+----------+----------------------+---------------+--------------------+---------------------+----+--------------+---------------+------------+--------------------+------------------------+----+-------------+--------------------+------------------------+-------------------------+--------------------+-----------------+--------------------+-----------+------------------+------------------+------------+------------------
3 | 2 | | 1 | 3 | 2 | single | _iobeamdb_internal | _hyper_2_2_0_3_data | 2 | 2 | 2 | 0 | _iobeamdb_internal | _hyper_2_2_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000
4 | 2 | 2 | 2 | 4 | 2 | single | _iobeamdb_internal | _hyper_2_2_0_4_data | 2 | 2 | 2 | 0 | _iobeamdb_internal | _hyper_2_2_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000
5 | 2 | 3 | 3 | 5 | 2 | single | _iobeamdb_internal | _hyper_2_2_0_5_data | 2 | 2 | 2 | 0 | _iobeamdb_internal | _hyper_2_2_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000
5 | 2 | 3 | | 5 | 2 | single | _iobeamdb_internal | _hyper_2_2_0_5_data | 2 | 2 | 2 | 0 | _iobeamdb_internal | _hyper_2_2_0_partition | 2 | public | chunk_closing_test | _iobeamdb_internal | _hyper_2 | _iobeamdb_internal | _hyper_2_root | 1 | STICKY | time | bigint | single | 10000
(3 rows)
\c single


@ -61,9 +61,9 @@ INSERT INTO PUBLIC."testNs"("timeCustom", device_id, series_0, series_1) VALUES
SELECT * FROM PUBLIC."testNs";
timeCustom | device_id | series_0 | series_1 | series_2 | series_bool
--------------------------+-----------+----------+----------+----------+-------------
Tue Nov 10 23:00:02 2009 | dev1 | 2.5 | 3 | |
Thu Nov 12 01:00:00 2009 | dev1 | 1.5 | 1 | |
Thu Nov 12 01:00:00 2009 | dev1 | 1.5 | 2 | |
Tue Nov 10 23:00:02 2009 | dev1 | 2.5 | 3 | |
Tue Nov 10 23:00:00 2009 | dev2 | 1.5 | 1 | |
Tue Nov 10 23:00:00 2009 | dev2 | 1.5 | 2 | |
(5 rows)