Change close-chunk logic to use a C-based fast path

This change is a performance improvement. Previously, each insert called
a PL/pgSQL function to check whether the chunk needed to be closed. This
patch implements a C-only fast path for the case when the table size is
less than the configured chunk size.
This commit is contained in:
Matvey Arye 2017-03-06 16:32:22 -05:00
parent 2379a4483a
commit 00b69ac010
9 changed files with 98 additions and 9 deletions

View File

@ -16,6 +16,7 @@ SRCS = \
src/metadata_queries.c \
src/cache.c \
src/cache_invalidate.c \
src/chunk.c \
src/scanner.c \
src/hypertable_cache.c \
src/chunk_cache.c \

View File

@ -29,6 +29,11 @@ WHERE c.partition_id = get_chunk_locked.partition_id AND
FOR SHARE;
$BODY$;
--returns the current size (in bytes) of the table identified by (schema name, table name).
--Declared VOLATILE (not IMMUTABLE): the result depends on the table's current
--on-disk size, so it must never be constant-folded or cached by the planner.
CREATE OR REPLACE FUNCTION _timescaledb_catalog.local_chunk_size(name, name) RETURNS bigint
AS '$libdir/timescaledb', 'local_chunk_size' LANGUAGE C VOLATILE STRICT;
--returns the current size of a chunk (in bytes) given its ID.
--The size is typically aligned with the page size in Postgres.
CREATE OR REPLACE FUNCTION _timescaledb_internal.get_local_chunk_size(
@ -38,20 +43,18 @@ CREATE OR REPLACE FUNCTION _timescaledb_internal.get_local_chunk_size(
$BODY$
DECLARE
chunk_replica_row _timescaledb_catalog.chunk_replica_node;
chunk_table_name TEXT;
BEGIN
SELECT *
INTO STRICT chunk_replica_row
FROM _timescaledb_catalog.chunk_replica_node crn
WHERE crn.chunk_id = get_local_chunk_size.chunk_id;
WHERE crn.chunk_id = get_local_chunk_size.chunk_id
AND crn.database_name = current_database();
IF chunk_replica_row.database_name != current_database() THEN
RAISE EXCEPTION 'get_local_chunk_size should only be called locally'
USING ERRCODE = 'IO501';
END IF;
chunk_table_name := format('%I.%I', chunk_replica_row.schema_name, chunk_replica_row.table_name);
RETURN pg_table_size(chunk_table_name :: REGCLASS);
RETURN _timescaledb_catalog.local_chunk_size(chunk_replica_row.schema_name, chunk_replica_row.table_name);
END
$BODY$;

20
src/chunk.c Normal file
View File

@ -0,0 +1,20 @@
#include "chunk.h"
#include <catalog/namespace.h>
#include <postgres.h>
#include <fmgr.h>
#include <utils/builtins.h>
#include <utils/lsyscache.h>
PG_FUNCTION_INFO_V1(local_chunk_size);
Datum
local_chunk_size(PG_FUNCTION_ARGS)
{
Name schema = PG_GETARG_NAME(0);
Name table = PG_GETARG_NAME(1);
Oid relOid = get_relname_relid(table->data, get_namespace_oid(schema->data, false));
Datum res = DirectFunctionCall1(pg_table_size, ObjectIdGetDatum(relOid));
return res;
}

9
src/chunk.h Normal file
View File

@ -0,0 +1,9 @@
#ifndef TIMESCALEDB_CHUNK_H
#define TIMESCALEDB_CHUNK_H
#include <postgres.h>
#include <fmgr.h>

/*
 * fmgr-callable function taking (schema name, table name) and returning the
 * size of that table as reported by pg_table_size(). Defined in chunk.c.
 */
extern Datum local_chunk_size(PG_FUNCTION_ARGS);
#endif   /* TIMESCALEDB_CHUNK_H */

View File

@ -65,6 +65,7 @@ static Cache *hypertable_cache_current = NULL;
#define HT_TBL_COL_ID 1
#define HT_TBL_COL_TIME_COL_NAME 10
#define HT_TBL_COL_TIME_TYPE 11
#define HT_TBL_COL_CHUNK_SIZE 13
/* Primary key Index column number */
#define HT_IDX_COL_ID 1
@ -78,6 +79,7 @@ hypertable_tuple_found(TupleInfo * ti, void *data)
Datum id_datum = heap_getattr(ti->tuple, HT_TBL_COL_ID, ti->desc, &is_null);
Datum time_col_datum = heap_getattr(ti->tuple, HT_TBL_COL_TIME_COL_NAME, ti->desc, &is_null);
Datum time_type_datum = heap_getattr(ti->tuple, HT_TBL_COL_TIME_TYPE, ti->desc, &is_null);
Datum chunk_size_datum = heap_getattr(ti->tuple, HT_TBL_COL_CHUNK_SIZE, ti->desc, &is_null);
int32 id = DatumGetInt32(id_datum);
if (id != hctx->hypertable_id)
@ -89,6 +91,7 @@ hypertable_tuple_found(TupleInfo * ti, void *data)
he->id = hctx->hypertable_id;
strncpy(he->time_column_name, DatumGetCString(time_col_datum), NAMEDATALEN);
he->time_column_type = DatumGetObjectId(time_type_datum);
he->chunk_size_bytes = DatumGetInt64(chunk_size_datum);
return true;
}

View File

@ -20,10 +20,12 @@ typedef struct hypertable_cache_entry
char time_column_name[NAMEDATALEN];
Oid time_column_type;
int num_epochs;
int64 chunk_size_bytes;
/* Array of epoch_and_partitions_set*. Order by start_time */
epoch_and_partitions_set *epochs[MAX_EPOCHS_PER_HYPERTABLE_CACHE_ENTRY];
} hypertable_cache_entry;
hypertable_cache_entry *hypertable_cache_get_entry(Cache * cache, int32 hypertable_id);
epoch_and_partitions_set *

View File

@ -48,6 +48,7 @@
#include "scanner.h"
#include "catalog.h"
#include "pgmurmur3.h"
#include "chunk.h"
#include <utils/tqual.h>
#include <utils/rls.h>
@ -83,6 +84,31 @@ get_close_if_needed_fn()
return single;
}
/*
 * Decide whether the chunk-closing function has to be invoked for `chunk`.
 *
 * Fast path: if the chunk has a replica row for the current database and the
 * size reported by local_chunk_size() for that table is still below the
 * hypertable's chunk_size_bytes, return without doing anything else.
 *
 * Otherwise (no replica row for this database, or the size has reached the
 * limit) call the function returned by get_close_if_needed_fn() with the
 * chunk id. Its result is ignored.
 */
static void
close_if_needed(hypertable_cache_entry * hci, chunk_cache_entry * chunk)
{
	Datum		current_db = DirectFunctionCall1(current_database, PointerGetDatum(NULL));
	crn_row    *local_crn = crn_set_get_crn_row_for_db(chunk->crns, DatumGetName(current_db)->data);

	if (local_crn != NULL)
	{
		int64		local_bytes = DatumGetInt64(DirectFunctionCall2(local_chunk_size,
											  NameGetDatum(&local_crn->schema_name),
											  NameGetDatum(&local_crn->table_name)));

		/* Still under the configured chunk size: nothing to close. */
		if (local_bytes < hci->chunk_size_bytes)
		{
			return;
		}
	}

	/* Slow path: let the regular close-if-needed function decide. */
	FunctionCall1(get_close_if_needed_fn(), Int32GetDatum(chunk->id));
}
/*
*
* Helper functions for inserting tuples into chunk tables
@ -338,7 +364,6 @@ copy_table_tuple_found(TupleInfo * ti, void *data)
if (ctx->chunk_ctx == NULL)
{
Datum was_closed_datum;
chunk_cache_entry *chunk;
Cache *pinned = chunk_crn_set_cache_pin();
@ -347,7 +372,7 @@ copy_table_tuple_found(TupleInfo * ti, void *data)
* performance)
*/
chunk = get_chunk_cache_entry(pinned, ctx->part, time_pt, false);
was_closed_datum = FunctionCall1(get_close_if_needed_fn(), Int32GetDatum(chunk->id));
close_if_needed(ctx->hci, chunk);
/*
* chunk may have been closed and thus changed /or/ need to get share

View File

@ -57,10 +57,11 @@ free_epoch(epoch_and_partitions_set * epoch)
}
#define CRN_QUERY_ARGS (Oid[]) { INT4OID }
#define CRN_QUERY "SELECT schema_name, table_name \
#define CRN_QUERY "SELECT schema_name, table_name, database_name \
FROM _timescaledb_catalog.chunk_replica_node AS crn \
WHERE crn.chunk_id = $1"
DEFINE_PLAN(get_crn_plan, CRN_QUERY, 1, CRN_QUERY_ARGS)
crn_set *
@ -106,13 +107,17 @@ fetch_crn_set(crn_set * entry, int32 chunk_id)
HeapTuple tuple = SPI_tuptable->vals[j];
crn_row *tab = SPI_palloc(sizeof(crn_row));
Name name = DatumGetName(SPI_getbinval(tuple, tupdesc, 1, &is_null));
Name name;
name = DatumGetName(SPI_getbinval(tuple, tupdesc, 1, &is_null));
memcpy(tab->schema_name.data, name, NAMEDATALEN);
name = DatumGetName(SPI_getbinval(tuple, tupdesc, 2, &is_null));
memcpy(tab->table_name.data, name, NAMEDATALEN);
name = DatumGetName(SPI_getbinval(tuple, tupdesc, 3, &is_null));
memcpy(tab->database_name.data, name, NAMEDATALEN);
tab_array[j] = tab;
}
SPI_finish();
@ -230,3 +235,20 @@ chunk_row_timepoint_is_member(const chunk_row * row, const int64 time_pt)
{
return row->start_time <= time_pt && row->end_time >= time_pt;
}
/*
 * Find the chunk-replica-node row of `set` whose database_name equals
 * `dbname` (compared over at most NAMEDATALEN bytes).
 *
 * Returns the first matching row from set->tables, or NULL if none matches.
 */
extern crn_row *
crn_set_get_crn_row_for_db(crn_set * set, char *dbname)
{
	ListCell   *cell;
	crn_row    *match = NULL;

	foreach(cell, set->tables)
	{
		crn_row    *candidate = lfirst(cell);

		if (strncmp(candidate->database_name.data, dbname, NAMEDATALEN) != 0)
		{
			continue;
		}

		match = candidate;
		break;
	}

	return match;
}

View File

@ -24,8 +24,10 @@ typedef struct crn_row
{
NameData schema_name;
NameData table_name;
NameData database_name;
} crn_row;
typedef struct crn_set
{
int32 chunk_id;
@ -49,4 +51,6 @@ chunk_row *
bool chunk_row_timepoint_is_member(const chunk_row * row, const int64 time_pt);
extern crn_row *crn_set_get_crn_row_for_db(crn_set * set, char *dbname);
#endif /* TIMESCALEDB_METADATA_QUERIES_H */