Fix issue creating dimensional constraints

During chunk creation, the chunk's dimensional CHECK constraints are
created via an "upcall" to PL/pgSQL code. However, creating
dimensional constraints in PL/pgSQL code sometimes fails, especially
during high-concurrency inserts, because PL/pgSQL code scans metadata
using a snapshot that might not see the same metadata as the C
code. As a result, chunk creation sometimes fail during constraint
creation.

To fix this issue, implement dimensional CHECK-constraint creation in
C code. Other constraints (FK, PK, etc.) are still created via an
upcall, but should probably also be rewritten in C. However, since
these constraints don't depend on recently updated metadata, this is
left to a future change.

Fixes #5456
This commit is contained in:
Erik Nordström 2023-03-16 16:45:28 +01:00 committed by Erik Nordström
parent 72c0f5b25e
commit a51d21efbe
14 changed files with 412 additions and 215 deletions

View File

@ -24,6 +24,7 @@ accidentally triggering the load of a previous DB version.**
* #5442 Decompression may have lost DEFAULT values
* #5446 Add checks for malloc failure in libpq calls
* #5470 Ensure superuser perms during copy/move chunk
* #5459 Fix issue creating dimensional constraints
**Thanks**
* @nikolaps for reporting an issue with the COPY fetcher

View File

@ -22,12 +22,7 @@ BEGIN
SELECT * INTO STRICT hypertable_row FROM _timescaledb_catalog.hypertable h WHERE h.id = chunk_row.hypertable_id;
IF chunk_constraint_row.dimension_slice_id IS NOT NULL THEN
check_sql = _timescaledb_internal.dimension_slice_get_constraint_sql(chunk_constraint_row.dimension_slice_id);
IF check_sql IS NOT NULL THEN
def := format('CHECK (%s)', check_sql);
ELSE
def := NULL;
END IF;
RAISE 'cannot create dimension constraint %', chunk_constraint_row;
ELSIF chunk_constraint_row.hypertable_constraint_name IS NOT NULL THEN
SELECT oid, contype INTO STRICT constraint_oid, constraint_type FROM pg_constraint

View File

@ -366,7 +366,8 @@ CREATE OR REPLACE FUNCTION _timescaledb_internal.range_value_to_pretty(
$BODY$
DECLARE
BEGIN
IF NOT _timescaledb_internal.dimension_is_finite(time_value) THEN
IF NOT (time_value > (-9223372036854775808)::bigint AND
time_value < 9223372036854775807::bigint) THEN
RETURN '';
END IF;
IF time_value IS NULL THEN

View File

@ -21,3 +21,6 @@ CREATE FUNCTION _timescaledb_internal.recompress_chunk_segmentwise(REGCLASS, BOO
AS '@MODULE_PATHNAME@', 'ts_recompress_chunk_segmentwise' LANGUAGE C STRICT VOLATILE;
CREATE FUNCTION _timescaledb_internal.get_compressed_chunk_index_for_recompression(REGCLASS) RETURNS REGCLASS
AS '@MODULE_PATHNAME@', 'ts_get_compressed_chunk_index_for_recompression' LANGUAGE C STRICT VOLATILE;
DROP FUNCTION _timescaledb_internal.dimension_is_finite;
DROP FUNCTION _timescaledb_internal.dimension_slice_get_constraint_sql;

View File

@ -12,3 +12,90 @@ DROP TABLE IF EXISTS _timescaledb_catalog.continuous_aggs_watermark;
DROP FUNCTION IF EXISTS _timescaledb_internal.cagg_watermark_materialized(hypertable_id INTEGER);
DROP FUNCTION _timescaledb_internal.recompress_chunk_segmentwise(REGCLASS, BOOLEAN);
DROP FUNCTION _timescaledb_internal.get_compressed_chunk_index_for_recompression(REGCLASS);
CREATE OR REPLACE FUNCTION _timescaledb_internal.dimension_is_finite(
val BIGINT
)
RETURNS BOOLEAN LANGUAGE SQL IMMUTABLE PARALLEL SAFE AS
$BODY$
--end values of bigint reserved for infinite
SELECT val > (-9223372036854775808)::bigint AND val < 9223372036854775807::bigint
$BODY$ SET search_path TO pg_catalog, pg_temp;
CREATE OR REPLACE FUNCTION _timescaledb_internal.dimension_slice_get_constraint_sql(
dimension_slice_id INTEGER
)
RETURNS TEXT LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
dimension_slice_row _timescaledb_catalog.dimension_slice;
dimension_row _timescaledb_catalog.dimension;
dimension_def TEXT;
dimtype REGTYPE;
parts TEXT[];
BEGIN
SELECT * INTO STRICT dimension_slice_row
FROM _timescaledb_catalog.dimension_slice
WHERE id = dimension_slice_id;
SELECT * INTO STRICT dimension_row
FROM _timescaledb_catalog.dimension
WHERE id = dimension_slice_row.dimension_id;
IF dimension_row.partitioning_func_schema IS NOT NULL AND
dimension_row.partitioning_func IS NOT NULL THEN
SELECT prorettype INTO STRICT dimtype
FROM pg_catalog.pg_proc pro
WHERE pro.oid = format('%I.%I', dimension_row.partitioning_func_schema, dimension_row.partitioning_func)::regproc::oid;
dimension_def := format('%1$I.%2$I(%3$I)',
dimension_row.partitioning_func_schema,
dimension_row.partitioning_func,
dimension_row.column_name);
ELSE
dimension_def := format('%1$I', dimension_row.column_name);
dimtype := dimension_row.column_type;
END IF;
IF dimension_row.num_slices IS NOT NULL THEN
IF _timescaledb_internal.dimension_is_finite(dimension_slice_row.range_start) THEN
parts = parts || format(' %1$s >= %2$L ', dimension_def, dimension_slice_row.range_start);
END IF;
IF _timescaledb_internal.dimension_is_finite(dimension_slice_row.range_end) THEN
parts = parts || format(' %1$s < %2$L ', dimension_def, dimension_slice_row.range_end);
END IF;
IF array_length(parts, 1) = 0 THEN
RETURN NULL;
END IF;
return array_to_string(parts, 'AND');
ELSE
-- only works with time for now
IF _timescaledb_internal.time_literal_sql(dimension_slice_row.range_start, dimtype) =
_timescaledb_internal.time_literal_sql(dimension_slice_row.range_end, dimtype) THEN
RAISE 'time-based constraints have the same start and end values for column "%": %',
dimension_row.column_name,
_timescaledb_internal.time_literal_sql(dimension_slice_row.range_end, dimtype);
END IF;
parts = ARRAY[]::text[];
IF _timescaledb_internal.dimension_is_finite(dimension_slice_row.range_start) THEN
parts = parts || format(' %1$s >= %2$s ',
dimension_def,
_timescaledb_internal.time_literal_sql(dimension_slice_row.range_start, dimtype));
END IF;
IF _timescaledb_internal.dimension_is_finite(dimension_slice_row.range_end) THEN
parts = parts || format(' %1$s < %2$s ',
dimension_def,
_timescaledb_internal.time_literal_sql(dimension_slice_row.range_end, dimtype));
END IF;
return array_to_string(parts, 'AND');
END IF;
END
$BODY$ SET search_path TO pg_catalog, pg_temp;

View File

@ -5,93 +5,6 @@
-- This file contains functions associated with creating new
-- hypertables.
CREATE OR REPLACE FUNCTION _timescaledb_internal.dimension_is_finite(
val BIGINT
)
RETURNS BOOLEAN LANGUAGE SQL IMMUTABLE PARALLEL SAFE AS
$BODY$
--end values of bigint reserved for infinite
SELECT val > (-9223372036854775808)::bigint AND val < 9223372036854775807::bigint
$BODY$ SET search_path TO pg_catalog, pg_temp;
CREATE OR REPLACE FUNCTION _timescaledb_internal.dimension_slice_get_constraint_sql(
dimension_slice_id INTEGER
)
RETURNS TEXT LANGUAGE PLPGSQL VOLATILE AS
$BODY$
DECLARE
dimension_slice_row _timescaledb_catalog.dimension_slice;
dimension_row _timescaledb_catalog.dimension;
dimension_def TEXT;
dimtype REGTYPE;
parts TEXT[];
BEGIN
SELECT * INTO STRICT dimension_slice_row
FROM _timescaledb_catalog.dimension_slice
WHERE id = dimension_slice_id;
SELECT * INTO STRICT dimension_row
FROM _timescaledb_catalog.dimension
WHERE id = dimension_slice_row.dimension_id;
IF dimension_row.partitioning_func_schema IS NOT NULL AND
dimension_row.partitioning_func IS NOT NULL THEN
SELECT prorettype INTO STRICT dimtype
FROM pg_catalog.pg_proc pro
WHERE pro.oid = format('%I.%I', dimension_row.partitioning_func_schema, dimension_row.partitioning_func)::regproc::oid;
dimension_def := format('%1$I.%2$I(%3$I)',
dimension_row.partitioning_func_schema,
dimension_row.partitioning_func,
dimension_row.column_name);
ELSE
dimension_def := format('%1$I', dimension_row.column_name);
dimtype := dimension_row.column_type;
END IF;
IF dimension_row.num_slices IS NOT NULL THEN
IF _timescaledb_internal.dimension_is_finite(dimension_slice_row.range_start) THEN
parts = parts || format(' %1$s >= %2$L ', dimension_def, dimension_slice_row.range_start);
END IF;
IF _timescaledb_internal.dimension_is_finite(dimension_slice_row.range_end) THEN
parts = parts || format(' %1$s < %2$L ', dimension_def, dimension_slice_row.range_end);
END IF;
IF array_length(parts, 1) = 0 THEN
RETURN NULL;
END IF;
return array_to_string(parts, 'AND');
ELSE
-- only works with time for now
IF _timescaledb_internal.time_literal_sql(dimension_slice_row.range_start, dimtype) =
_timescaledb_internal.time_literal_sql(dimension_slice_row.range_end, dimtype) THEN
RAISE 'time-based constraints have the same start and end values for column "%": %',
dimension_row.column_name,
_timescaledb_internal.time_literal_sql(dimension_slice_row.range_end, dimtype);
END IF;
parts = ARRAY[]::text[];
IF _timescaledb_internal.dimension_is_finite(dimension_slice_row.range_start) THEN
parts = parts || format(' %1$s >= %2$s ',
dimension_def,
_timescaledb_internal.time_literal_sql(dimension_slice_row.range_start, dimtype));
END IF;
IF _timescaledb_internal.dimension_is_finite(dimension_slice_row.range_end) THEN
parts = parts || format(' %1$s < %2$s ',
dimension_def,
_timescaledb_internal.time_literal_sql(dimension_slice_row.range_end, dimtype));
END IF;
return array_to_string(parts, 'AND');
END IF;
END
$BODY$ SET search_path TO pg_catalog, pg_temp;
-- Outputs the create_hypertable command to recreate the given hypertable.
--
-- This is currently used internally for our single hypertable backup tool

View File

@ -13,6 +13,7 @@
#include <catalog/indexing.h>
#include <catalog/namespace.h>
#include <catalog/pg_class.h>
#include <catalog/pg_constraint.h>
#include <catalog/pg_inherits.h>
#include <catalog/pg_trigger.h>
#include <catalog/pg_type.h>
@ -33,6 +34,7 @@
#include <utils/datum.h>
#include <utils/hsearch.h>
#include <utils/lsyscache.h>
#include <utils/palloc.h>
#include <utils/syscache.h>
#include <utils/timestamp.h>
@ -122,7 +124,7 @@ chunk_stub_is_valid(const ChunkStub *stub, int16 expected_slices)
}
typedef ChunkResult (*on_chunk_stub_func)(ChunkScanCtx *ctx, ChunkStub *stub);
static void chunk_scan_ctx_init(ChunkScanCtx *ctx, const Hyperspace *hs, const Point *point);
static void chunk_scan_ctx_init(ChunkScanCtx *ctx, const Hypertable *ht, const Point *point);
static void chunk_scan_ctx_destroy(ChunkScanCtx *ctx);
static void chunk_collision_scan(ChunkScanCtx *scanctx, const Hypercube *cube);
static int chunk_scan_ctx_foreach_chunk_stub(ChunkScanCtx *ctx, on_chunk_stub_func on_chunk,
@ -311,7 +313,7 @@ do_dimension_alignment(ChunkScanCtx *scanctx, ChunkStub *stub)
{
CollisionInfo *info = scanctx->data;
Hypercube *cube = info->cube;
const Hyperspace *space = scanctx->space;
const Hyperspace *space = scanctx->ht->space;
ChunkResult res = CHUNK_IGNORED;
int i;
@ -417,7 +419,7 @@ do_collision_resolution(ChunkScanCtx *scanctx, ChunkStub *stub)
{
CollisionInfo *info = scanctx->data;
Hypercube *cube = info->cube;
const Hyperspace *space = scanctx->space;
const Hyperspace *space = scanctx->ht->space;
ChunkResult res = CHUNK_IGNORED;
int i;
@ -459,7 +461,7 @@ check_for_collisions(ChunkScanCtx *scanctx, ChunkStub *stub)
{
CollisionInfo *info = scanctx->data;
Hypercube *cube = info->cube;
const Hyperspace *space = scanctx->space;
const Hyperspace *space = scanctx->ht->space;
/* Check if this chunk collides with our hypercube */
if (stub->cube->num_slices == space->num_dimensions && ts_hypercubes_collide(cube, stub->cube))
@ -486,7 +488,7 @@ chunk_collides(const Hypertable *ht, const Hypercube *hc)
.colliding_chunk = NULL,
};
chunk_scan_ctx_init(&scanctx, ht->space, NULL);
chunk_scan_ctx_init(&scanctx, ht, NULL);
/* Scan for all chunks that collide with the hypercube of the new chunk */
chunk_collision_scan(&scanctx, hc);
@ -554,7 +556,7 @@ chunk_collision_resolve(const Hypertable *ht, Hypercube *cube, const Point *p)
.colliding_chunk = NULL,
};
chunk_scan_ctx_init(&scanctx, ht->space, p);
chunk_scan_ctx_init(&scanctx, ht, p);
/* Scan for all chunks that collide with the hypercube of the new chunk */
chunk_collision_scan(&scanctx, cube);
@ -1004,14 +1006,10 @@ chunk_insert_into_metadata_after_lock(const Chunk *chunk)
}
static void
chunk_create_table_constraints(const Chunk *chunk)
chunk_create_table_constraints(const Hypertable *ht, const Chunk *chunk)
{
/* Create the chunk's constraints, triggers, and indexes */
ts_chunk_constraints_create(chunk->constraints,
chunk->table_id,
chunk->fd.id,
chunk->hypertable_relid,
chunk->fd.hypertable_id);
ts_chunk_constraints_create(ht, chunk);
if (chunk->relkind == RELKIND_RELATION && !IS_OSM_CHUNK(chunk))
{
@ -1174,7 +1172,7 @@ chunk_create_from_hypercube_after_lock(const Hypertable *ht, Hypercube *cube,
chunk_add_constraints(chunk);
chunk_insert_into_metadata_after_lock(chunk);
chunk_create_table_constraints(chunk);
chunk_create_table_constraints(ht, chunk);
return chunk;
}
@ -1279,7 +1277,7 @@ chunk_create_from_hypercube_and_table_after_lock(const Hypertable *ht, Hypercube
chunk_add_constraints(chunk);
chunk_insert_into_metadata_after_lock(chunk);
chunk_add_inheritance(chunk, ht);
chunk_create_table_constraints(chunk);
chunk_create_table_constraints(ht, chunk);
return chunk;
}
@ -1494,7 +1492,7 @@ ts_chunk_id_find_in_subspace(Hypertable *ht, List *dimension_vecs)
List *chunk_ids = NIL;
ChunkScanCtx ctx;
chunk_scan_ctx_init(&ctx, ht->space, /* point = */ NULL);
chunk_scan_ctx_init(&ctx, ht, /* point = */ NULL);
ScanIterator iterator = ts_chunk_constraint_scan_iterator_create(CurrentMemoryContext);
@ -1757,7 +1755,7 @@ chunk_create_from_stub(ChunkStubScanCtx *stubctx)
* tables during scans.
*/
static void
chunk_scan_ctx_init(ChunkScanCtx *ctx, const Hyperspace *hs, const Point *point)
chunk_scan_ctx_init(ChunkScanCtx *ctx, const Hypertable *ht, const Point *point)
{
struct HASHCTL hctl = {
.keysize = sizeof(int32),
@ -1767,7 +1765,7 @@ chunk_scan_ctx_init(ChunkScanCtx *ctx, const Hyperspace *hs, const Point *point)
memset(ctx, 0, sizeof(*ctx));
ctx->htab = hash_create("chunk-scan-context", 20, &hctl, HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
ctx->space = hs;
ctx->ht = ht;
ctx->point = point;
ctx->lockmode = NoLock;
}
@ -1820,7 +1818,7 @@ chunk_collision_scan(ChunkScanCtx *scanctx, const Hypercube *cube)
int i;
/* Scan all dimensions for colliding slices */
for (i = 0; i < scanctx->space->num_dimensions; i++)
for (i = 0; i < scanctx->ht->space->num_dimensions; i++)
{
DimensionVec *vec;
DimensionSlice *slice = cube->slices[i];
@ -1950,7 +1948,7 @@ chunk_resurrect(const Hypertable *ht, int chunk_id)
chunk->data_nodes = chunk_assign_data_nodes(chunk, ht);
}
chunk->table_id = chunk_create_table(chunk, ht);
chunk_create_table_constraints(chunk);
chunk_create_table_constraints(ht, chunk);
/* Finally, update the chunk tuple to no longer be a tombstone */
chunk->fd.dropped = false;
@ -2009,13 +2007,14 @@ chunk_point_find_chunk_id(const Hypertable *ht, const Point *p)
/* The scan context will keep the state accumulated during the scan */
ChunkScanCtx ctx;
chunk_scan_ctx_init(&ctx, ht->space, p);
chunk_scan_ctx_init(&ctx, ht, p);
/* Scan all dimensions for slices enclosing the point */
List *all_slices = NIL;
for (int dimension_index = 0; dimension_index < ctx.space->num_dimensions; dimension_index++)
for (int dimension_index = 0; dimension_index < ctx.ht->space->num_dimensions;
dimension_index++)
{
ts_dimension_slice_scan_list(ctx.space->dimensions[dimension_index].fd.id,
ts_dimension_slice_scan_list(ctx.ht->space->dimensions[dimension_index].fd.id,
p->coordinates[dimension_index],
&all_slices);
}
@ -2061,7 +2060,7 @@ chunk_point_find_chunk_id(const Hypertable *ht, const Point *p)
* i.e., a complete hypercube. Only one chunk matches a given hyperspace
* point, so we can stop early.
*/
if (entry->num_dimension_constraints == ctx.space->num_dimensions)
if (entry->num_dimension_constraints == ctx.ht->space->num_dimensions)
{
matching_chunk_id = entry->chunk_id;
break;
@ -2109,7 +2108,7 @@ chunks_find_all_in_range_limit(const Hypertable *ht, const Dimension *time_dim,
tuplock);
/* The scan context will keep the state accumulated during the scan */
chunk_scan_ctx_init(ctx, ht->space, NULL);
chunk_scan_ctx_init(ctx, ht, NULL);
/* No abort when the first chunk is found */
ctx->early_abort = false;
@ -3233,26 +3232,21 @@ ts_chunk_get_all_chunk_ids(LOCKMODE lockmode)
static ChunkResult
chunk_recreate_constraint(ChunkScanCtx *ctx, ChunkStub *stub)
{
ChunkConstraints *ccs = stub->constraints;
ChunkStubScanCtx stubctx = {
.stub = stub,
};
Chunk *chunk;
int i;
chunk = chunk_create_from_stub(&stubctx);
Chunk *chunk = chunk_create_from_stub(&stubctx);
if (stubctx.is_dropped)
elog(ERROR, "should not be recreating constraints on dropped chunks");
for (i = 0; i < ccs->num_constraints; i++)
ts_chunk_constraint_recreate(&ccs->constraints[i], chunk->table_id);
ts_chunk_constraints_recreate(ctx->ht, chunk);
return CHUNK_PROCESSED;
}
void
ts_chunk_recreate_all_constraints_for_dimension(Hyperspace *hs, int32 dimension_id)
ts_chunk_recreate_all_constraints_for_dimension(Hypertable *ht, int32 dimension_id)
{
DimensionVec *slices;
ChunkScanCtx chunkctx;
@ -3263,7 +3257,7 @@ ts_chunk_recreate_all_constraints_for_dimension(Hyperspace *hs, int32 dimension_
if (NULL == slices)
return;
chunk_scan_ctx_init(&chunkctx, hs, NULL);
chunk_scan_ctx_init(&chunkctx, ht, NULL);
for (i = 0; i < slices->num_slices; i++)
ts_chunk_constraint_scan_by_dimension_slice(slices->slices[i],
@ -3308,7 +3302,7 @@ ts_chunk_drop_fks(const Chunk *const chunk)
* are dropped during compression.
*/
void
ts_chunk_create_fks(const Chunk *const chunk)
ts_chunk_create_fks(const Hypertable *ht, const Chunk *const chunk)
{
Relation rel;
List *fks;
@ -3322,7 +3316,7 @@ ts_chunk_create_fks(const Chunk *const chunk)
foreach (lc, fks)
{
ForeignKeyCacheInfo *fk = lfirst_node(ForeignKeyCacheInfo, lc);
ts_chunk_constraint_create_on_chunk(chunk, fk->conoid);
ts_chunk_constraint_create_on_chunk(ht, chunk, fk->conoid);
}
}
@ -4630,7 +4624,7 @@ add_foreign_table_as_chunk(Oid relid, Hypertable *parent_ht)
chunk->fd.id,
chunk->relkind,
chunk->hypertable_relid);
chunk_create_table_constraints(chunk);
chunk_create_table_constraints(parent_ht, chunk);
/* Add dimension constriants for the chunk */
ts_chunk_constraints_add_dimension_constraints(chunk->constraints, chunk->fd.id, chunk->cube);
ts_chunk_constraints_insert_metadata(chunk->constraints);
@ -4638,10 +4632,11 @@ add_foreign_table_as_chunk(Oid relid, Hypertable *parent_ht)
}
void
ts_chunk_merge_on_dimension(Chunk *chunk, const Chunk *merge_chunk, int32 dimension_id)
ts_chunk_merge_on_dimension(const Hypertable *ht, Chunk *chunk, const Chunk *merge_chunk,
int32 dimension_id)
{
const DimensionSlice *slice, *merge_slice;
int num_ccs, i;
int num_ccs = 0;
bool dimension_slice_found = false;
if (chunk->hypertable_relid != merge_chunk->hypertable_relid)
@ -4722,8 +4717,24 @@ ts_chunk_merge_on_dimension(Chunk *chunk, const Chunk *merge_chunk, int32 dimens
ts_chunk_constraint_update_slice_id(chunk->fd.id, slice->fd.id, new_slice->fd.id);
ChunkConstraints *ccs = ts_chunk_constraints_alloc(1, CurrentMemoryContext);
num_ccs =
ts_chunk_constraint_scan_by_dimension_slice_id(new_slice->fd.id, ccs, CurrentMemoryContext);
ScanIterator iterator =
ts_scan_iterator_create(CHUNK_CONSTRAINT, AccessShareLock, CurrentMemoryContext);
ts_chunk_constraint_scan_iterator_set_slice_id(&iterator, new_slice->fd.id);
ts_scanner_foreach(&iterator)
{
bool isnull;
Datum d;
d = slot_getattr(ts_scan_iterator_slot(&iterator), Anum_chunk_constraint_chunk_id, &isnull);
if (!isnull && DatumGetInt32(d) == chunk->fd.id)
{
num_ccs++;
ts_chunk_constraints_add_from_tuple(ccs, ts_scan_iterator_tuple_info(&iterator));
}
}
if (num_ccs <= 0)
ereport(ERROR,
@ -4732,21 +4743,45 @@ ts_chunk_merge_on_dimension(Chunk *chunk, const Chunk *merge_chunk, int32 dimens
get_rel_name(chunk->table_id),
new_slice->fd.id)));
/* We have to recreate the chunk constraints since we are changing
* table constraints when updating the slice.
*/
for (i = 0; i < ccs->capacity; i++)
/* Update the slice in the chunk's hypercube. Needed to make recreate constraints work. */
for (int i = 0; i < chunk->cube->num_slices; i++)
{
ChunkConstraint cc = ccs->constraints[i];
if (cc.fd.chunk_id == chunk->fd.id)
if (chunk->cube->slices[i]->fd.dimension_id == dimension_id)
{
ts_process_utility_set_expect_chunk_modification(true);
ts_chunk_constraint_recreate(&cc, chunk->table_id);
ts_process_utility_set_expect_chunk_modification(false);
chunk->cube->slices[i] = new_slice;
break;
}
}
/* Delete the old constraint */
for (int i = 0; i < chunk->constraints->num_constraints; i++)
{
const ChunkConstraint *cc = &chunk->constraints->constraints[i];
if (cc->fd.dimension_slice_id == slice->fd.id)
{
ObjectAddress constrobj = {
.classId = ConstraintRelationId,
.objectId = get_relation_constraint_oid(chunk->table_id,
NameStr(cc->fd.constraint_name),
false),
};
performDeletion(&constrobj, DROP_RESTRICT, 0);
break;
}
}
/* We have to recreate the chunk constraints since we are changing
* table constraints when updating the slice.
*/
ChunkConstraints *oldccs = chunk->constraints;
chunk->constraints = ccs;
ts_process_utility_set_expect_chunk_modification(true);
ts_chunk_constraints_create(ht, chunk);
ts_process_utility_set_expect_chunk_modification(false);
chunk->constraints = oldccs;
ts_chunk_drop(merge_chunk, DROP_RESTRICT, 1);
}

View File

@ -105,7 +105,7 @@ typedef struct ChunkScanCtx
{
HTAB *htab;
char relkind; /* Create chunks of this relkind */
const Hyperspace *space;
const Hypertable *ht;
const Point *point;
unsigned int num_complete_chunks;
int num_processed;
@ -176,9 +176,9 @@ extern TSDLLEXPORT int32 ts_chunk_get_id_by_relid(Oid relid);
extern bool ts_chunk_exists_relid(Oid relid);
extern TSDLLEXPORT int ts_chunk_num_of_chunks_created_after(const Chunk *chunk);
extern TSDLLEXPORT bool ts_chunk_exists_with_compression(int32 hypertable_id);
extern void ts_chunk_recreate_all_constraints_for_dimension(Hyperspace *hs, int32 dimension_id);
extern void ts_chunk_recreate_all_constraints_for_dimension(Hypertable *ht, int32 dimension_id);
extern TSDLLEXPORT void ts_chunk_drop_fks(const Chunk *const chunk);
extern TSDLLEXPORT void ts_chunk_create_fks(const Chunk *const chunk);
extern TSDLLEXPORT void ts_chunk_create_fks(const Hypertable *ht, const Chunk *const chunk);
extern int ts_chunk_delete_by_hypertable_id(int32 hypertable_id);
extern int ts_chunk_delete_by_name(const char *schema, const char *table, DropBehavior behavior);
extern bool ts_chunk_set_name(Chunk *chunk, const char *newname);
@ -236,8 +236,8 @@ extern void ts_chunk_scan_iterator_set_chunk_id(ScanIterator *it, int32 chunk_id
extern bool ts_chunk_lock_if_exists(Oid chunk_oid, LOCKMODE chunk_lockmode);
extern int ts_chunk_oid_cmp(const void *p1, const void *p2);
int ts_chunk_get_osm_chunk_id(int hypertable_id);
extern TSDLLEXPORT void ts_chunk_merge_on_dimension(Chunk *chunk, const Chunk *merge_chunk,
int32 dimension_id);
extern TSDLLEXPORT void ts_chunk_merge_on_dimension(const Hypertable *ht, Chunk *chunk,
const Chunk *merge_chunk, int32 dimension_id);
#define chunk_get_by_name(schema_name, table_name, fail_if_not_found) \
ts_chunk_get_by_name_with_memory_context(schema_name, \

View File

@ -10,12 +10,15 @@
#include <catalog/indexing.h>
#include <catalog/objectaddress.h>
#include <catalog/pg_constraint.h>
#include <catalog/heap.h>
#include <commands/tablecmds.h>
#include <funcapi.h>
#include <nodes/makefuncs.h>
#include <storage/lockdefs.h>
#include <utils/builtins.h>
#include <utils/hsearch.h>
#include <utils/lsyscache.h>
#include <utils/palloc.h>
#include <utils/relcache.h>
#include <utils/rel.h>
#include <utils/syscache.h>
@ -34,6 +37,7 @@
#include "hypertable.h"
#include "errors.h"
#include "process_utility.h"
#include "partitioning.h"
#define DEFAULT_EXTRA_CONSTRAINTS_SIZE 4
@ -263,6 +267,112 @@ ts_chunk_constraints_add_from_tuple(ChunkConstraints *ccs, const TupleInfo *ti)
return constraints;
}
/*
* Create a dimensional CHECK constraint for a partitioning dimension.
*/
static Constraint *
create_dimension_check_constraint(const Dimension *dim, const DimensionSlice *slice,
const char *name)
{
Constraint *constr = NULL;
bool isvarlena;
Node *dimdef;
ColumnRef *colref;
Datum startdat, enddat;
List *compexprs = NIL;
Oid outfuncid;
if (slice->fd.range_start == PG_INT64_MIN && slice->fd.range_end == PG_INT64_MAX)
return NULL;
colref = makeNode(ColumnRef);
colref->fields = list_make1(makeString(pstrdup(NameStr(dim->fd.column_name))));
colref->location = -1;
/* Convert the dimensional ranges to the appropriate text/string
* representation for the time type. For dimensions with a
* partitioning/time function, use the function's output type. */
if (dim->partitioning != NULL)
{
/* Both open and closed dimensions can have a partitioning function */
PartitioningInfo *partinfo = dim->partitioning;
List *funcname = list_make2(makeString(NameStr(partinfo->partfunc.schema)),
makeString(NameStr(partinfo->partfunc.name)));
dimdef = (Node *) makeFuncCall(funcname,
list_make1(colref),
#if PG14_GE
COERCE_EXPLICIT_CALL,
#endif
-1);
if (IS_OPEN_DIMENSION(dim))
{
/* The dimension has a time function to compute the time value so
* need to convert the range values to the time type returned by
* the partitioning function. */
getTypeOutputInfo(partinfo->partfunc.rettype, &outfuncid, &isvarlena);
startdat = ts_internal_to_time_value(slice->fd.range_start, partinfo->partfunc.rettype);
enddat = ts_internal_to_time_value(slice->fd.range_end, partinfo->partfunc.rettype);
}
else
{
/* Closed dimension, just use the integer output function */
getTypeOutputInfo(INT8OID, &outfuncid, &isvarlena);
startdat = Int64GetDatum(slice->fd.range_start);
enddat = Int64GetDatum(slice->fd.range_end);
}
}
else
{
/* Must be open dimension, since no partitioning function */
Assert(IS_OPEN_DIMENSION(dim));
dimdef = (Node *) colref;
getTypeOutputInfo(dim->fd.column_type, &outfuncid, &isvarlena);
startdat = ts_internal_to_time_value(slice->fd.range_start, dim->fd.column_type);
enddat = ts_internal_to_time_value(slice->fd.range_end, dim->fd.column_type);
}
/* Convert internal format datums to string (output) datums */
startdat = OidFunctionCall1(outfuncid, startdat);
enddat = OidFunctionCall1(outfuncid, enddat);
/* Elide range constraint for +INF or -INF */
if (slice->fd.range_start != PG_INT64_MIN)
{
A_Const *start_const = makeNode(A_Const);
memcpy(&start_const->val, makeString(DatumGetCString(startdat)), sizeof(start_const->val));
start_const->location = -1;
A_Expr *ge_expr = makeSimpleA_Expr(AEXPR_OP, ">=", dimdef, (Node *) start_const, -1);
compexprs = lappend(compexprs, ge_expr);
}
if (slice->fd.range_end != PG_INT64_MAX)
{
A_Const *end_const = makeNode(A_Const);
memcpy(&end_const->val, makeString(DatumGetCString(enddat)), sizeof(end_const->val));
end_const->location = -1;
A_Expr *lt_expr = makeSimpleA_Expr(AEXPR_OP, "<", dimdef, (Node *) end_const, -1);
compexprs = lappend(compexprs, lt_expr);
}
constr = makeNode(Constraint);
constr->contype = CONSTR_CHECK;
constr->conname = name ? pstrdup(name) : NULL;
constr->deferrable = false;
constr->skip_validation = true;
constr->initially_valid = true;
Assert(list_length(compexprs) >= 1);
if (list_length(compexprs) == 2)
constr->raw_expr = (Node *) makeBoolExpr(AND_EXPR, compexprs, -1);
else if (list_length(compexprs) == 1)
constr->raw_expr = linitial(compexprs);
return constr;
}
/*
* Add a constraint to a chunk table.
*/
@ -290,15 +400,17 @@ chunk_constraint_create_on_table(const ChunkConstraint *cc, Oid chunk_oid)
}
/*
* Create a constraint on a chunk table, including adding relevant metadata to
* the catalog.
* Create a non-dimensional constraint on a chunk table (foreign key, trigger
* constraint, etc.), including adding relevant metadata to the catalog.
*/
static Oid
chunk_constraint_create(const ChunkConstraint *cc, Oid chunk_oid, int32 chunk_id,
create_non_dimensional_constraint(const ChunkConstraint *cc, Oid chunk_oid, int32 chunk_id,
Oid hypertable_oid, int32 hypertable_id)
{
Oid chunk_constraint_oid;
Assert(!is_dimension_constraint(cc));
ts_process_utility_set_expect_chunk_modification(true);
chunk_constraint_oid = chunk_constraint_create_on_table(cc, chunk_oid);
ts_process_utility_set_expect_chunk_modification(false);
@ -312,8 +424,6 @@ chunk_constraint_create(const ChunkConstraint *cc, Oid chunk_oid, int32 chunk_id
if (!OidIsValid(chunk_constraint_oid))
return InvalidOid;
if (!is_dimension_constraint(cc))
{
Oid hypertable_constraint_oid =
get_relation_constraint_oid(hypertable_oid,
NameStr(cc->fd.hypertable_constraint_name),
@ -332,26 +442,80 @@ chunk_constraint_create(const ChunkConstraint *cc, Oid chunk_oid, int32 chunk_id
ReleaseSysCache(tuple);
}
}
return chunk_constraint_oid;
}
static const DimensionSlice *
get_slice_with_id(const Hypercube *cube, int32 id)
{
int i;
for (i = 0; i < cube->num_slices; i++)
{
const DimensionSlice *slice = cube->slices[i];
if (slice->fd.id == id)
return slice;
}
return NULL;
}
/*
* Create a set of constraints on a chunk table.
*/
void
ts_chunk_constraints_create(const ChunkConstraints *ccs, Oid chunk_oid, int32 chunk_id,
Oid hypertable_oid, int32 hypertable_id)
ts_chunk_constraints_create(const Hypertable *ht, const Chunk *chunk)
{
const ChunkConstraints *ccs = chunk->constraints;
List *newconstrs = NIL;
int i;
for (i = 0; i < ccs->num_constraints; i++)
chunk_constraint_create(&ccs->constraints[i],
chunk_oid,
chunk_id,
hypertable_oid,
hypertable_id);
{
const ChunkConstraint *cc = &ccs->constraints[i];
if (is_dimension_constraint(cc))
{
const DimensionSlice *slice = get_slice_with_id(chunk->cube, cc->fd.dimension_slice_id);
const Dimension *dim;
Constraint *constr;
dim = ts_hyperspace_get_dimension_by_id(ht->space, slice->fd.dimension_id);
Assert(dim);
constr = create_dimension_check_constraint(dim, slice, NameStr(cc->fd.constraint_name));
/* In some cases, a CHECK constraint is not needed. For instance,
* if the range is -INF to +INF. */
if (constr != NULL)
newconstrs = lappend(newconstrs, constr);
}
else
{
create_non_dimensional_constraint(cc,
chunk->table_id,
chunk->fd.id,
ht->main_table_relid,
ht->fd.id);
}
}
if (newconstrs != NIL)
{
List PG_USED_FOR_ASSERTS_ONLY *cookedconstrs = NIL;
Relation rel = table_open(chunk->table_id, AccessExclusiveLock);
cookedconstrs = AddRelationNewConstraints(rel,
NIL /* List *newColDefaults */,
newconstrs,
false /* allow_merge */,
true /* is_local */,
false /* is_internal */,
NULL /* query string */);
table_close(rel, NoLock);
Assert(list_length(cookedconstrs) == list_length(newconstrs));
CommandCounterIncrement();
}
}
ScanIterator
@ -460,7 +624,7 @@ ts_chunk_constraint_scan_by_dimension_slice(const DimensionSlice *slice, ChunkSc
ts_scanner_foreach(&iterator)
{
const Hyperspace *hs = ctx->space;
const Hyperspace *hs = ctx->ht->space;
ChunkStub *stub;
ChunkScanEntry *entry;
bool found;
@ -493,7 +657,7 @@ ts_chunk_constraint_scan_by_dimension_slice(const DimensionSlice *slice, ChunkSc
/* A stub is complete when we've added slices for all its dimensions,
* i.e., a complete hypercube */
if (chunk_stub_is_complete(stub, ctx->space))
if (chunk_stub_is_complete(stub, ctx->ht->space))
{
ctx->num_complete_chunks++;
@ -691,7 +855,7 @@ ts_chunk_constraints_add_inheritable_check_constraints(ChunkConstraints *ccs, in
}
void
ts_chunk_constraint_create_on_chunk(const Chunk *chunk, Oid constraint_oid)
ts_chunk_constraint_create_on_chunk(const Hypertable *ht, const Chunk *chunk, Oid constraint_oid)
{
HeapTuple tuple;
Form_pg_constraint con;
@ -712,12 +876,11 @@ ts_chunk_constraint_create_on_chunk(const Chunk *chunk, Oid constraint_oid)
NameStr(con->conname));
ts_chunk_constraint_insert(cc);
chunk_constraint_create(cc,
create_non_dimensional_constraint(cc,
chunk->table_id,
chunk->fd.id,
chunk->hypertable_relid,
chunk->fd.hypertable_id);
ht->main_table_relid,
ht->fd.id);
}
ReleaseSysCache(tuple);
@ -874,15 +1037,25 @@ ts_chunk_constraint_delete_by_dimension_slice_id(int32 dimension_slice_id)
}
void
ts_chunk_constraint_recreate(const ChunkConstraint *cc, Oid chunk_oid)
ts_chunk_constraints_recreate(const Hypertable *ht, const Chunk *chunk)
{
const ChunkConstraints *ccs = chunk->constraints;
int i;
for (i = 0; i < ccs->num_constraints; i++)
{
const ChunkConstraint *cc = &ccs->constraints[i];
ObjectAddress constrobj = {
.classId = ConstraintRelationId,
.objectId = get_relation_constraint_oid(chunk_oid, NameStr(cc->fd.constraint_name), false),
.objectId = get_relation_constraint_oid(chunk->table_id,
NameStr(cc->fd.constraint_name),
false),
};
performDeletion(&constrobj, DROP_RESTRICT, 0);
chunk_constraint_create_on_table(cc, chunk_oid);
}
ts_chunk_constraints_create(ht, chunk);
}
static void

View File

@ -59,10 +59,9 @@ extern TSDLLEXPORT int ts_chunk_constraints_add_inheritable_constraints(ChunkCon
extern TSDLLEXPORT int ts_chunk_constraints_add_inheritable_check_constraints(
ChunkConstraints *ccs, int32 chunk_id, const char chunk_relkind, Oid hypertable_oid);
extern TSDLLEXPORT void ts_chunk_constraints_insert_metadata(const ChunkConstraints *ccs);
extern TSDLLEXPORT void ts_chunk_constraints_create(const ChunkConstraints *ccs, Oid chunk_oid,
int32 chunk_id, Oid hypertable_oid,
int32 hypertable_id);
extern void ts_chunk_constraint_create_on_chunk(const Chunk *chunk, Oid constraint_oid);
extern TSDLLEXPORT void ts_chunk_constraints_create(const Hypertable *ht, const Chunk *chunk);
extern void ts_chunk_constraint_create_on_chunk(const Hypertable *ht, const Chunk *chunk,
Oid constraint_oid);
extern int ts_chunk_constraint_delete_by_hypertable_constraint_name(
int32 chunk_id, const char *hypertable_constraint_name, bool delete_metadata,
bool drop_constraint);
@ -72,7 +71,7 @@ extern int ts_chunk_constraint_delete_by_constraint_name(int32 chunk_id,
const char *constraint_name,
bool delete_metadata,
bool drop_constraint);
extern void ts_chunk_constraint_recreate(const ChunkConstraint *cc, Oid chunk_oid);
extern void ts_chunk_constraints_recreate(const Hypertable *ht, const Chunk *chunk);
extern int ts_chunk_constraint_rename_hypertable_constraint(int32 chunk_id, const char *old_name,
const char *new_name);
extern int ts_chunk_constraint_adjust_meta(int32 chunk_id, const char *ht_constraint_name,

View File

@ -2187,7 +2187,7 @@ process_add_constraint_chunk(Hypertable *ht, Oid chunk_relid, void *arg)
Oid hypertable_constraint_oid = *((Oid *) arg);
Chunk *chunk = ts_chunk_get_by_relid(chunk_relid, true);
ts_chunk_constraint_create_on_chunk(chunk, hypertable_constraint_oid);
ts_chunk_constraint_create_on_chunk(ht, chunk, hypertable_constraint_oid);
}
static void
@ -3097,7 +3097,7 @@ process_alter_column_type_end(Hypertable *ht, AlterTableCmd *cmd)
ts_dimension_set_type(dim, new_type);
ts_process_utility_set_expect_chunk_modification(true);
ts_chunk_recreate_all_constraints_for_dimension(ht->space, dim->fd.id);
ts_chunk_recreate_all_constraints_for_dimension(ht, dim->fd.id);
ts_process_utility_set_expect_chunk_modification(false);
}

View File

@ -525,11 +525,7 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
* Do this after compressing the chunk to avoid holding strong, unnecessary locks on the
* referenced table during compression.
*/
ts_chunk_constraints_create(compress_ht_chunk->constraints,
compress_ht_chunk->table_id,
compress_ht_chunk->fd.id,
compress_ht_chunk->hypertable_relid,
compress_ht_chunk->fd.hypertable_id);
ts_chunk_constraints_create(cxt.compress_ht, compress_ht_chunk);
ts_trigger_create_all_on_chunk(compress_ht_chunk);
ts_chunk_set_compressed_chunk(cxt.srcht_chunk, compress_ht_chunk->fd.id);
}
@ -551,7 +547,7 @@ compress_chunk_impl(Oid hypertable_relid, Oid chunk_relid)
colinfo_array,
htcols_listlen);
ts_chunk_merge_on_dimension(mergable_chunk, cxt.srcht_chunk, time_dim->fd.id);
ts_chunk_merge_on_dimension(cxt.srcht, mergable_chunk, cxt.srcht_chunk, time_dim->fd.id);
if (chunk_unordered)
{
@ -654,7 +650,7 @@ decompress_chunk_impl(Oid uncompressed_hypertable_relid, Oid uncompressed_chunk_
decompress_chunk(compressed_chunk->table_id, uncompressed_chunk->table_id);
/* Recreate FK constraints, since they were dropped during compression. */
ts_chunk_create_fks(uncompressed_chunk);
ts_chunk_create_fks(uncompressed_hypertable, uncompressed_chunk);
/* Delete the compressed chunk */
ts_compression_chunk_size_delete(uncompressed_chunk->fd.id);
@ -809,11 +805,7 @@ tsl_create_compressed_chunk(PG_FUNCTION_ARGS)
compress_ht_chunk = create_compress_chunk(cxt.compress_ht, cxt.srcht_chunk, chunk_table);
/* Copy chunk constraints (including fkey) to compressed chunk */
ts_chunk_constraints_create(compress_ht_chunk->constraints,
compress_ht_chunk->table_id,
compress_ht_chunk->fd.id,
compress_ht_chunk->hypertable_relid,
compress_ht_chunk->fd.hypertable_id);
ts_chunk_constraints_create(cxt.compress_ht, compress_ht_chunk);
ts_trigger_create_all_on_chunk(compress_ht_chunk);
/* Drop all FK constraints on the uncompressed chunk. This is needed to allow

View File

@ -63,8 +63,6 @@ ORDER BY pronamespace::regnamespace::text COLLATE "C", p.oid::regprocedure::text
_timescaledb_internal.data_node_compressed_chunk_stats(name,name,name)
_timescaledb_internal.data_node_hypertable_info(name,name,name)
_timescaledb_internal.data_node_index_size(name,name,name)
_timescaledb_internal.dimension_is_finite(bigint)
_timescaledb_internal.dimension_slice_get_constraint_sql(integer)
_timescaledb_internal.drop_chunk(regclass)
_timescaledb_internal.drop_dist_ht_invalidation_trigger(integer)
_timescaledb_internal.drop_stale_chunks(name,integer[])

View File

@ -26,8 +26,8 @@ ts_test_merge_chunks_on_dimension(PG_FUNCTION_ARGS)
Chunk *chunk = ts_chunk_get_by_relid(chunk_id, true);
Chunk *merge_chunk = ts_chunk_get_by_relid(merge_chunk_id, true);
ts_chunk_merge_on_dimension(chunk, merge_chunk, dimension_id);
Hypertable *ht = ts_hypertable_get_by_id(chunk->fd.hypertable_id);
ts_chunk_merge_on_dimension(ht, chunk, merge_chunk, dimension_id);
PG_RETURN_VOID();
}