mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-15 18:13:18 +08:00
Improve restriction scanning during hypertable expansion
Improve the performance of metadata scanning during hypertable expansion. When a hypertable is expanded to include all children chunks, only the chunks that match the query restrictions are included. To find the matching chunks, the planner first scans for all matching dimension slices. The chunks that reference those slices are the chunks to include in the expansion. This change optimizes the scanning for slices by avoiding repeated open/close of the dimension slice metadata table and index. At the same time, related dimension slice scanning functions have been refactored along the same line. An index on the chunk constraint metadata table is also changed to allow scanning on dimension_slice_id. Previously, dimension_slice_id was the second key in the index, which made scans on this key less efficient.
This commit is contained in:
parent
966c5eb2c2
commit
c1cf067c4f
@ -165,10 +165,9 @@ CREATE TABLE _timescaledb_catalog.chunk_constraint (
|
||||
UNIQUE (chunk_id, constraint_name)
|
||||
);
|
||||
|
||||
CREATE INDEX chunk_constraint_dimension_slice_id_idx ON _timescaledb_catalog.chunk_constraint (dimension_slice_id);
|
||||
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.chunk_constraint', '');
|
||||
|
||||
CREATE INDEX chunk_constraint_chunk_id_dimension_slice_id_idx ON _timescaledb_catalog.chunk_constraint (chunk_id, dimension_slice_id);
|
||||
|
||||
CREATE SEQUENCE _timescaledb_catalog.chunk_constraint_name;
|
||||
|
||||
SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.chunk_constraint_name', '');
|
||||
|
@ -3,3 +3,5 @@ RETURNS TABLE (total_size BIGINT, heap_size BIGINT, index_size BIGINT, toast_siz
|
||||
AS '@MODULE_PATHNAME@', 'ts_relation_size' LANGUAGE C VOLATILE;
|
||||
|
||||
DROP VIEW IF EXISTS _timescaledb_internal.hypertable_chunk_local_size;
|
||||
DROP INDEX IF EXISTS _timescaledb_catalog.chunk_constraint_chunk_id_dimension_slice_id_idx;
|
||||
CREATE INDEX chunk_constraint_dimension_slice_id_idx ON _timescaledb_catalog.chunk_constraint (dimension_slice_id);
|
||||
|
@ -1,2 +1,4 @@
|
||||
DROP VIEW _timescaledb_internal.hypertable_chunk_local_size;
|
||||
DROP FUNCTION _timescaledb_internal.relation_size(relation REGCLASS);
|
||||
DROP INDEX _timescaledb_catalog.chunk_constraint_dimension_slice_id_idx;
|
||||
CREATE INDEX chunk_constraint_chunk_id_dimension_slice_id_idx ON _timescaledb_catalog.chunk_constraint (chunk_id, dimension_slice_id);
|
||||
|
@ -1541,7 +1541,7 @@ ts_chunk_build_from_tuple_and_stub(Chunk **chunkptr, TupleInfo *ti, const ChunkS
|
||||
}
|
||||
else
|
||||
{
|
||||
ScanIterator it = ts_dimension_slice_scan_iterator_create(ti->mctx);
|
||||
ScanIterator it = ts_dimension_slice_scan_iterator_create(NULL, ti->mctx);
|
||||
chunk->cube = ts_hypercube_from_constraints(chunk->constraints, &it);
|
||||
ts_scan_iterator_close(&it);
|
||||
}
|
||||
@ -2314,7 +2314,7 @@ ts_chunk_get_window(int32 dimension_id, int64 point, int count, MemoryContext mc
|
||||
continue;
|
||||
chunk->constraints = ts_chunk_constraint_scan_by_chunk_id(chunk->fd.id, 1, mctx);
|
||||
|
||||
it = ts_dimension_slice_scan_iterator_create(mctx);
|
||||
it = ts_dimension_slice_scan_iterator_create(NULL, mctx);
|
||||
chunk->cube = ts_hypercube_from_constraints(chunk->constraints, &it);
|
||||
ts_scan_iterator_close(&it);
|
||||
|
||||
|
@ -367,14 +367,13 @@ ts_chunk_constraint_scan_iterator_set_slice_id(ScanIterator *it, int32 slice_id)
|
||||
{
|
||||
it->ctx.index = catalog_get_index(ts_catalog_get(),
|
||||
CHUNK_CONSTRAINT,
|
||||
CHUNK_CONSTRAINT_CHUNK_ID_DIMENSION_SLICE_ID_IDX);
|
||||
CHUNK_CONSTRAINT_DIMENSION_SLICE_ID_IDX);
|
||||
ts_scan_iterator_scan_key_reset(it);
|
||||
ts_scan_iterator_scan_key_init(
|
||||
it,
|
||||
Anum_chunk_constraint_chunk_id_dimension_slice_id_idx_dimension_slice_id,
|
||||
BTEqualStrategyNumber,
|
||||
F_INT4EQ,
|
||||
Int32GetDatum(slice_id));
|
||||
ts_scan_iterator_scan_key_init(it,
|
||||
Anum_chunk_constraint_dimension_slice_id_idx_dimension_slice_id,
|
||||
BTEqualStrategyNumber,
|
||||
F_INT4EQ,
|
||||
Int32GetDatum(slice_id));
|
||||
}
|
||||
|
||||
void
|
||||
@ -382,10 +381,10 @@ ts_chunk_constraint_scan_iterator_set_chunk_id(ScanIterator *it, int32 chunk_id)
|
||||
{
|
||||
it->ctx.index = catalog_get_index(ts_catalog_get(),
|
||||
CHUNK_CONSTRAINT,
|
||||
CHUNK_CONSTRAINT_CHUNK_ID_DIMENSION_SLICE_ID_IDX);
|
||||
CHUNK_CONSTRAINT_CHUNK_ID_CONSTRAINT_NAME_IDX);
|
||||
ts_scan_iterator_scan_key_reset(it);
|
||||
ts_scan_iterator_scan_key_init(it,
|
||||
Anum_chunk_constraint_chunk_id_dimension_slice_id_idx_chunk_id,
|
||||
Anum_chunk_constraint_chunk_id_constraint_name_idx_chunk_id,
|
||||
BTEqualStrategyNumber,
|
||||
F_INT4EQ,
|
||||
Int32GetDatum(chunk_id));
|
||||
|
@ -300,27 +300,37 @@ ts_chunk_scan_by_constraints(const Hyperspace *hs, const List *dimension_vecs,
|
||||
* constraints. Scan the chunk constraints again to get all
|
||||
* constraints.
|
||||
*/
|
||||
for (i = 0; i < chunk_count; i++)
|
||||
|
||||
if (chunk_count > 0)
|
||||
{
|
||||
Chunk *chunk = chunks[i];
|
||||
int num_constraints_hint = chunk->constraints->num_constraints;
|
||||
/*
|
||||
* This chunk constraint scan uses a different index, so need to close
|
||||
* and restart the scan.
|
||||
*/
|
||||
ts_scan_iterator_close(&constr_it);
|
||||
|
||||
chunk->constraints = ts_chunk_constraints_alloc(num_constraints_hint, orig_mcxt);
|
||||
|
||||
ts_chunk_constraint_scan_iterator_set_chunk_id(&constr_it, chunk->fd.id);
|
||||
ts_scan_iterator_rescan(&constr_it);
|
||||
|
||||
while (ts_scan_iterator_next(&constr_it) != NULL)
|
||||
for (i = 0; i < chunk_count; i++)
|
||||
{
|
||||
TupleInfo *constr_ti = ts_scan_iterator_tuple_info(&constr_it);
|
||||
MemoryContextSwitchTo(per_tuple_mcxt);
|
||||
ts_chunk_constraints_add_from_tuple(chunk->constraints, constr_ti);
|
||||
MemoryContextSwitchTo(work_mcxt);
|
||||
Chunk *chunk = chunks[i];
|
||||
int num_constraints_hint = chunk->constraints->num_constraints;
|
||||
|
||||
chunk->constraints = ts_chunk_constraints_alloc(num_constraints_hint, orig_mcxt);
|
||||
|
||||
ts_chunk_constraint_scan_iterator_set_chunk_id(&constr_it, chunk->fd.id);
|
||||
ts_scan_iterator_start_or_restart_scan(&constr_it);
|
||||
|
||||
while (ts_scan_iterator_next(&constr_it) != NULL)
|
||||
{
|
||||
TupleInfo *constr_ti = ts_scan_iterator_tuple_info(&constr_it);
|
||||
MemoryContextSwitchTo(per_tuple_mcxt);
|
||||
ts_chunk_constraints_add_from_tuple(chunk->constraints, constr_ti);
|
||||
MemoryContextSwitchTo(work_mcxt);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Assert(CurrentMemoryContext == work_mcxt);
|
||||
ts_scan_iterator_close(&constr_it);
|
||||
Assert(CurrentMemoryContext == work_mcxt);
|
||||
|
||||
/*
|
||||
* Step 4: Fill in data nodes for remote chunks.
|
||||
|
@ -153,6 +153,7 @@ dimension_vec_tuple_found(TupleInfo *ti, void *data)
|
||||
{
|
||||
DimensionVec **slices = data;
|
||||
DimensionSlice *slice;
|
||||
MemoryContext old;
|
||||
|
||||
switch (ti->lockresult)
|
||||
{
|
||||
@ -169,8 +170,11 @@ dimension_vec_tuple_found(TupleInfo *ti, void *data)
|
||||
break;
|
||||
}
|
||||
|
||||
old = MemoryContextSwitchTo(ti->mctx);
|
||||
slice = dimension_slice_from_slot(ti->slot);
|
||||
Assert(NULL != slice);
|
||||
*slices = ts_dimension_vec_add_slice(slices, slice);
|
||||
MemoryContextSwitchTo(old);
|
||||
|
||||
return SCAN_CONTINUE;
|
||||
}
|
||||
@ -264,24 +268,28 @@ ts_dimension_slice_scan_limit(int32 dimension_id, int64 coordinate, int limit,
|
||||
return ts_dimension_vec_sort(&slices);
|
||||
}
|
||||
|
||||
static void
|
||||
dimension_slice_scan_with_strategies(int32 dimension_id, StrategyNumber start_strategy,
|
||||
int64 start_value, StrategyNumber end_strategy,
|
||||
int64 end_value, void *data, tuple_found_func tuple_found,
|
||||
int limit, const ScanTupLock *tuplock)
|
||||
int
|
||||
ts_dimension_slice_scan_iterator_set_range(ScanIterator *it, int32 dimension_id,
|
||||
StrategyNumber start_strategy, int64 start_value,
|
||||
StrategyNumber end_strategy, int64 end_value)
|
||||
{
|
||||
ScanKeyData scankey[3];
|
||||
int nkeys = 1;
|
||||
Catalog *catalog = ts_catalog_get();
|
||||
|
||||
it->ctx.index = catalog_get_index(catalog,
|
||||
DIMENSION_SLICE,
|
||||
DIMENSION_SLICE_DIMENSION_ID_RANGE_START_RANGE_END_IDX);
|
||||
ts_scan_iterator_scan_key_reset(it);
|
||||
ts_scan_iterator_scan_key_init(
|
||||
it,
|
||||
Anum_dimension_slice_dimension_id_range_start_range_end_idx_dimension_id,
|
||||
BTEqualStrategyNumber,
|
||||
F_INT4EQ,
|
||||
Int32GetDatum(dimension_id));
|
||||
|
||||
/*
|
||||
* Perform an index scan for slices matching the dimension's ID and which
|
||||
* enclose the coordinate.
|
||||
*/
|
||||
ScanKeyInit(&scankey[0],
|
||||
Anum_dimension_slice_dimension_id_range_start_range_end_idx_dimension_id,
|
||||
BTEqualStrategyNumber,
|
||||
F_INT4EQ,
|
||||
Int32GetDatum(dimension_id));
|
||||
if (start_strategy != InvalidStrategy)
|
||||
{
|
||||
Oid opno = get_opfamily_member(INTEGER_BTREE_FAM_OID, INT8OID, INT8OID, start_strategy);
|
||||
@ -289,11 +297,12 @@ dimension_slice_scan_with_strategies(int32 dimension_id, StrategyNumber start_st
|
||||
|
||||
Assert(OidIsValid(proc));
|
||||
|
||||
ScanKeyInit(&scankey[nkeys++],
|
||||
Anum_dimension_slice_dimension_id_range_start_range_end_idx_range_start,
|
||||
start_strategy,
|
||||
proc,
|
||||
Int64GetDatum(start_value));
|
||||
ts_scan_iterator_scan_key_init(
|
||||
it,
|
||||
Anum_dimension_slice_dimension_id_range_start_range_end_idx_range_start,
|
||||
start_strategy,
|
||||
proc,
|
||||
Int64GetDatum(start_value));
|
||||
}
|
||||
if (end_strategy != InvalidStrategy)
|
||||
{
|
||||
@ -325,22 +334,15 @@ dimension_slice_scan_with_strategies(int32 dimension_id, StrategyNumber start_st
|
||||
end_value = PG_INT64_MAX;
|
||||
}
|
||||
|
||||
ScanKeyInit(&scankey[nkeys++],
|
||||
Anum_dimension_slice_dimension_id_range_start_range_end_idx_range_end,
|
||||
end_strategy,
|
||||
proc,
|
||||
Int64GetDatum(end_value));
|
||||
ts_scan_iterator_scan_key_init(
|
||||
it,
|
||||
Anum_dimension_slice_dimension_id_range_start_range_end_idx_range_end,
|
||||
end_strategy,
|
||||
proc,
|
||||
Int64GetDatum(end_value));
|
||||
}
|
||||
|
||||
dimension_slice_scan_limit_internal(DIMENSION_SLICE_DIMENSION_ID_RANGE_START_RANGE_END_IDX,
|
||||
scankey,
|
||||
nkeys,
|
||||
tuple_found,
|
||||
data,
|
||||
limit,
|
||||
AccessShareLock,
|
||||
tuplock,
|
||||
CurrentMemoryContext);
|
||||
return it->ctx.nkeys;
|
||||
}
|
||||
|
||||
/*
|
||||
@ -354,16 +356,45 @@ ts_dimension_slice_scan_range_limit(int32 dimension_id, StrategyNumber start_str
|
||||
int limit, const ScanTupLock *tuplock)
|
||||
{
|
||||
DimensionVec *slices = ts_dimension_vec_create(limit > 0 ? limit : DIMENSION_VEC_DEFAULT_SIZE);
|
||||
ScanIterator it = ts_dimension_slice_scan_iterator_create(tuplock, CurrentMemoryContext);
|
||||
|
||||
dimension_slice_scan_with_strategies(dimension_id,
|
||||
start_strategy,
|
||||
start_value,
|
||||
end_strategy,
|
||||
end_value,
|
||||
&slices,
|
||||
dimension_vec_tuple_found,
|
||||
limit,
|
||||
tuplock);
|
||||
ts_dimension_slice_scan_iterator_set_range(&it,
|
||||
dimension_id,
|
||||
start_strategy,
|
||||
start_value,
|
||||
end_strategy,
|
||||
end_value);
|
||||
it.ctx.limit = limit;
|
||||
|
||||
ts_scanner_foreach(&it)
|
||||
{
|
||||
const TupleInfo *ti = ts_scan_iterator_tuple_info(&it);
|
||||
DimensionSlice *slice;
|
||||
MemoryContext old;
|
||||
|
||||
switch (ti->lockresult)
|
||||
{
|
||||
case TM_SelfModified:
|
||||
case TM_Ok:
|
||||
old = MemoryContextSwitchTo(ti->mctx);
|
||||
slice = dimension_slice_from_slot(ti->slot);
|
||||
Assert(NULL != slice);
|
||||
slices = ts_dimension_vec_add_slice(&slices, slice);
|
||||
MemoryContextSwitchTo(old);
|
||||
break;
|
||||
case TM_Deleted:
|
||||
case TM_Updated:
|
||||
/* Treat as not found */
|
||||
break;
|
||||
default:
|
||||
elog(ERROR, "unexpected tuple lock status: %d", ti->lockresult);
|
||||
pg_unreachable();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
Assert(limit <= 0 || slices->num_slices <= limit);
|
||||
ts_scan_iterator_close(&it);
|
||||
|
||||
return ts_dimension_vec_sort(&slices);
|
||||
}
|
||||
@ -670,11 +701,11 @@ ts_dimension_slice_scan_by_id_and_lock(int32 dimension_slice_id, const ScanTupLo
|
||||
}
|
||||
|
||||
ScanIterator
|
||||
ts_dimension_slice_scan_iterator_create(MemoryContext result_mcxt)
|
||||
ts_dimension_slice_scan_iterator_create(const ScanTupLock *tuplock, MemoryContext result_mcxt)
|
||||
{
|
||||
ScanIterator it = ts_scan_iterator_create(DIMENSION_SLICE, AccessShareLock, result_mcxt);
|
||||
it.ctx.index = catalog_get_index(ts_catalog_get(), DIMENSION_SLICE, DIMENSION_SLICE_ID_IDX);
|
||||
it.ctx.flags |= SCANNER_F_NOEND_AND_NOCLOSE;
|
||||
it.ctx.tuplock = tuplock;
|
||||
|
||||
return it;
|
||||
}
|
||||
@ -683,6 +714,7 @@ void
|
||||
ts_dimension_slice_scan_iterator_set_slice_id(ScanIterator *it, int32 slice_id,
|
||||
const ScanTupLock *tuplock)
|
||||
{
|
||||
it->ctx.index = catalog_get_index(ts_catalog_get(), DIMENSION_SLICE, DIMENSION_SLICE_ID_IDX);
|
||||
ts_scan_iterator_scan_key_reset(it);
|
||||
ts_scan_iterator_scan_key_init(it,
|
||||
Anum_dimension_slice_id_idx_id,
|
||||
@ -903,101 +935,59 @@ ts_dimension_slice_nth_latest_slice(int32 dimension_id, int n)
|
||||
return ret;
|
||||
}
|
||||
|
||||
typedef struct ChunkStatInfo
|
||||
{
|
||||
int32 chunk_id;
|
||||
int32 job_id;
|
||||
} ChunkStatInfo;
|
||||
|
||||
/* Check that a a) job has not already been executed for the chunk and b) chunk is not compressed
|
||||
* (a compressed chunk should not be reordered).*/
|
||||
static ScanTupleResult
|
||||
dimension_slice_check_chunk_stats_tuple_found(TupleInfo *ti, void *data)
|
||||
{
|
||||
ListCell *lc;
|
||||
DimensionSlice *slice = dimension_slice_from_slot(ti->slot);
|
||||
List *chunk_ids = NIL;
|
||||
ChunkStatInfo *info = data;
|
||||
|
||||
ts_chunk_constraint_scan_by_dimension_slice_to_list(slice, &chunk_ids, CurrentMemoryContext);
|
||||
|
||||
foreach (lc, chunk_ids)
|
||||
{
|
||||
/* Look for a chunk that a) doesn't have a job stat (reorder ) and b) is not compressed
|
||||
* (should not reorder a compressed chunk) */
|
||||
int chunk_id = lfirst_int(lc);
|
||||
BgwPolicyChunkStats *chunk_stat = ts_bgw_policy_chunk_stats_find(info->job_id, chunk_id);
|
||||
|
||||
if ((chunk_stat == NULL || chunk_stat->fd.num_times_job_run == 0) &&
|
||||
ts_chunk_get_compression_status(chunk_id) == CHUNK_COMPRESS_NONE)
|
||||
{
|
||||
/* Save the chunk_id */
|
||||
info->chunk_id = chunk_id;
|
||||
return SCAN_DONE;
|
||||
}
|
||||
}
|
||||
|
||||
return SCAN_CONTINUE;
|
||||
}
|
||||
|
||||
int
|
||||
int32
|
||||
ts_dimension_slice_oldest_valid_chunk_for_reorder(int32 job_id, int32 dimension_id,
|
||||
StrategyNumber start_strategy, int64 start_value,
|
||||
StrategyNumber end_strategy, int64 end_value)
|
||||
{
|
||||
ChunkStatInfo info = {
|
||||
.job_id = job_id,
|
||||
.chunk_id = -1,
|
||||
};
|
||||
int32 result_chunk_id = -1;
|
||||
ScanIterator it = ts_dimension_slice_scan_iterator_create(NULL, CurrentMemoryContext);
|
||||
bool done = false;
|
||||
|
||||
dimension_slice_scan_with_strategies(dimension_id,
|
||||
start_strategy,
|
||||
start_value,
|
||||
end_strategy,
|
||||
end_value,
|
||||
&info,
|
||||
dimension_slice_check_chunk_stats_tuple_found,
|
||||
-1,
|
||||
NULL);
|
||||
ts_dimension_slice_scan_iterator_set_range(&it,
|
||||
dimension_id,
|
||||
start_strategy,
|
||||
start_value,
|
||||
end_strategy,
|
||||
end_value);
|
||||
ts_scan_iterator_start_scan(&it);
|
||||
|
||||
return info.chunk_id;
|
||||
}
|
||||
|
||||
typedef struct CompressChunkSearch
|
||||
{
|
||||
List *chunk_ids; /* list of chunk ids that match search */
|
||||
int32 maxchunks; /*max number of chunks to return */
|
||||
bool compress;
|
||||
bool recompress;
|
||||
} CompressChunkSearch;
|
||||
|
||||
static ScanTupleResult
|
||||
dimension_slice_check_is_chunk_uncompressed_tuple_found(TupleInfo *ti, void *data)
|
||||
{
|
||||
ListCell *lc;
|
||||
DimensionSlice *slice = dimension_slice_from_slot(ti->slot);
|
||||
List *chunk_ids = NIL;
|
||||
CompressChunkSearch *d = data;
|
||||
|
||||
ts_chunk_constraint_scan_by_dimension_slice_to_list(slice, &chunk_ids, CurrentMemoryContext);
|
||||
|
||||
foreach (lc, chunk_ids)
|
||||
while (!done)
|
||||
{
|
||||
int32 chunk_id = lfirst_int(lc);
|
||||
ChunkCompressionStatus st = ts_chunk_get_compression_status(chunk_id);
|
||||
if ((d->compress && st == CHUNK_COMPRESS_NONE) ||
|
||||
(d->recompress && st == CHUNK_COMPRESS_UNORDERED))
|
||||
const TupleInfo *ti = ts_scan_iterator_next(&it);
|
||||
ListCell *lc;
|
||||
DimensionSlice *slice;
|
||||
List *chunk_ids = NIL;
|
||||
|
||||
if (NULL == ti)
|
||||
break;
|
||||
|
||||
slice = dimension_slice_from_slot(ti->slot);
|
||||
ts_chunk_constraint_scan_by_dimension_slice_to_list(slice,
|
||||
&chunk_ids,
|
||||
CurrentMemoryContext);
|
||||
|
||||
foreach (lc, chunk_ids)
|
||||
{
|
||||
/* found a chunk that is not compressed or needs recompress
|
||||
* caller needs to check the correct chunk status
|
||||
*/
|
||||
d->chunk_ids = lappend_int(d->chunk_ids, chunk_id);
|
||||
if (d->maxchunks > 0 && list_length(d->chunk_ids) >= d->maxchunks)
|
||||
return SCAN_DONE;
|
||||
/* Look for a chunk that a) doesn't have a job stat (reorder ) and b) is not compressed
|
||||
* (should not reorder a compressed chunk) */
|
||||
int32 chunk_id = lfirst_int(lc);
|
||||
BgwPolicyChunkStats *chunk_stat = ts_bgw_policy_chunk_stats_find(job_id, chunk_id);
|
||||
|
||||
if ((chunk_stat == NULL || chunk_stat->fd.num_times_job_run == 0) &&
|
||||
ts_chunk_get_compression_status(chunk_id) == CHUNK_COMPRESS_NONE)
|
||||
{
|
||||
/* Save the chunk_id */
|
||||
result_chunk_id = chunk_id;
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return SCAN_CONTINUE;
|
||||
ts_scan_iterator_close(&it);
|
||||
|
||||
return result_chunk_id;
|
||||
}
|
||||
|
||||
List *
|
||||
@ -1006,19 +996,58 @@ ts_dimension_slice_get_chunkids_to_compress(int32 dimension_id, StrategyNumber s
|
||||
int64 end_value, bool compress, bool recompress,
|
||||
int32 numchunks)
|
||||
{
|
||||
CompressChunkSearch data = { .compress = compress,
|
||||
.recompress = recompress,
|
||||
.chunk_ids = NIL,
|
||||
.maxchunks = numchunks > 0 ? numchunks : -1 };
|
||||
dimension_slice_scan_with_strategies(dimension_id,
|
||||
start_strategy,
|
||||
start_value,
|
||||
end_strategy,
|
||||
end_value,
|
||||
&data,
|
||||
dimension_slice_check_is_chunk_uncompressed_tuple_found,
|
||||
-1,
|
||||
NULL);
|
||||
List *chunk_ids = NIL;
|
||||
int32 maxchunks = numchunks > 0 ? numchunks : -1;
|
||||
ScanIterator it = ts_dimension_slice_scan_iterator_create(NULL, CurrentMemoryContext);
|
||||
bool done = false;
|
||||
|
||||
return data.chunk_ids;
|
||||
ts_dimension_slice_scan_iterator_set_range(&it,
|
||||
dimension_id,
|
||||
start_strategy,
|
||||
start_value,
|
||||
end_strategy,
|
||||
end_value);
|
||||
ts_scan_iterator_start_scan(&it);
|
||||
|
||||
while (!done)
|
||||
{
|
||||
DimensionSlice *slice;
|
||||
TupleInfo *ti;
|
||||
ListCell *lc;
|
||||
List *slice_chunk_ids = NIL;
|
||||
|
||||
ti = ts_scan_iterator_next(&it);
|
||||
|
||||
if (NULL == ti)
|
||||
break;
|
||||
|
||||
slice = dimension_slice_from_slot(ti->slot);
|
||||
ts_chunk_constraint_scan_by_dimension_slice_to_list(slice,
|
||||
&slice_chunk_ids,
|
||||
CurrentMemoryContext);
|
||||
foreach (lc, slice_chunk_ids)
|
||||
{
|
||||
int32 chunk_id = lfirst_int(lc);
|
||||
ChunkCompressionStatus st = ts_chunk_get_compression_status(chunk_id);
|
||||
|
||||
if ((compress && st == CHUNK_COMPRESS_NONE) ||
|
||||
(recompress && st == CHUNK_COMPRESS_UNORDERED))
|
||||
{
|
||||
/* found a chunk that is not compressed or needs recompress
|
||||
* caller needs to check the correct chunk status
|
||||
*/
|
||||
chunk_ids = lappend_int(chunk_ids, chunk_id);
|
||||
|
||||
if (maxchunks > 0 && list_length(chunk_ids) >= maxchunks)
|
||||
{
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ts_scan_iterator_close(&it);
|
||||
|
||||
return chunk_ids;
|
||||
}
|
||||
|
@ -76,21 +76,26 @@ extern int ts_dimension_slice_cmp(const DimensionSlice *left, const DimensionSli
|
||||
extern int ts_dimension_slice_cmp_coordinate(const DimensionSlice *slice, int64 coord);
|
||||
|
||||
extern TSDLLEXPORT DimensionSlice *ts_dimension_slice_nth_latest_slice(int32 dimension_id, int n);
|
||||
extern TSDLLEXPORT int
|
||||
ts_dimension_slice_oldest_valid_chunk_for_reorder(int32 job_id, int32 dimension_id,
|
||||
StrategyNumber start_strategy, int64 start_value,
|
||||
StrategyNumber end_strategy, int64 end_value);
|
||||
extern TSDLLEXPORT int32 ts_dimension_slice_oldest_valid_chunk_for_reorder(
|
||||
int32 job_id, int32 dimension_id, StrategyNumber start_strategy, int64 start_value,
|
||||
StrategyNumber end_strategy, int64 end_value);
|
||||
extern TSDLLEXPORT List *ts_dimension_slice_get_chunkids_to_compress(
|
||||
int32 dimension_id, StrategyNumber start_strategy, int64 start_value,
|
||||
StrategyNumber end_strategy, int64 end_value, bool compress, bool recompress, int32 numchunks);
|
||||
|
||||
extern DimensionSlice *ts_dimension_slice_from_tuple(TupleInfo *ti);
|
||||
extern ScanIterator ts_dimension_slice_scan_iterator_create(MemoryContext result_mcxt);
|
||||
extern ScanIterator ts_dimension_slice_scan_iterator_create(const ScanTupLock *tuplock,
|
||||
MemoryContext result_mcxt);
|
||||
extern void ts_dimension_slice_scan_iterator_set_slice_id(ScanIterator *it, int32 slice_id,
|
||||
const ScanTupLock *tuplock);
|
||||
extern DimensionSlice *ts_dimension_slice_scan_iterator_get_by_id(ScanIterator *it, int32 slice_id,
|
||||
const ScanTupLock *tuplock);
|
||||
|
||||
extern int ts_dimension_slice_scan_iterator_set_range(ScanIterator *it, int32 dimension_id,
|
||||
StrategyNumber start_strategy,
|
||||
int64 start_value,
|
||||
StrategyNumber end_strategy, int64 end_value);
|
||||
|
||||
#define dimension_slice_insert(slice) ts_dimension_slice_insert_multi(&(slice), 1)
|
||||
|
||||
#define dimension_slice_scan(dimension_id, coordinate, tuplock) \
|
||||
|
@ -67,7 +67,8 @@ ts_dimension_vec_sort(DimensionVec **vecptr)
|
||||
{
|
||||
DimensionVec *vec = *vecptr;
|
||||
|
||||
qsort(vec->slices, vec->num_slices, sizeof(DimensionSlice *), cmp_slices);
|
||||
if (vec->num_slices > 1)
|
||||
qsort(vec->slices, vec->num_slices, sizeof(DimensionSlice *), cmp_slices);
|
||||
|
||||
return vec;
|
||||
}
|
||||
@ -77,7 +78,8 @@ ts_dimension_vec_sort_reverse(DimensionVec **vecptr)
|
||||
{
|
||||
DimensionVec *vec = *vecptr;
|
||||
|
||||
qsort(vec->slices, vec->num_slices, sizeof(DimensionSlice *), cmp_slices_reverse);
|
||||
if (vec->num_slices > 1)
|
||||
qsort(vec->slices, vec->num_slices, sizeof(DimensionSlice *), cmp_slices_reverse);
|
||||
|
||||
return vec;
|
||||
}
|
||||
|
@ -20,10 +20,11 @@
|
||||
#include "dimension_vector.h"
|
||||
#include "partitioning.h"
|
||||
#include "chunk_scan.h"
|
||||
#include "scan_iterator.h"
|
||||
|
||||
typedef struct DimensionRestrictInfo
|
||||
{
|
||||
Dimension *dimension;
|
||||
const Dimension *dimension;
|
||||
} DimensionRestrictInfo;
|
||||
|
||||
typedef struct DimensionRestrictInfoOpen
|
||||
@ -50,7 +51,7 @@ typedef struct DimensionValues
|
||||
} DimensionValues;
|
||||
|
||||
static DimensionRestrictInfoOpen *
|
||||
dimension_restrict_info_open_create(Dimension *d)
|
||||
dimension_restrict_info_open_create(const Dimension *d)
|
||||
{
|
||||
DimensionRestrictInfoOpen *new = palloc(sizeof(DimensionRestrictInfoOpen));
|
||||
|
||||
@ -61,7 +62,7 @@ dimension_restrict_info_open_create(Dimension *d)
|
||||
}
|
||||
|
||||
static DimensionRestrictInfoClosed *
|
||||
dimension_restrict_info_closed_create(Dimension *d)
|
||||
dimension_restrict_info_closed_create(const Dimension *d)
|
||||
{
|
||||
DimensionRestrictInfoClosed *new = palloc(sizeof(DimensionRestrictInfoClosed));
|
||||
|
||||
@ -72,7 +73,7 @@ dimension_restrict_info_closed_create(Dimension *d)
|
||||
}
|
||||
|
||||
static DimensionRestrictInfo *
|
||||
dimension_restrict_info_create(Dimension *d)
|
||||
dimension_restrict_info_create(const Dimension *d)
|
||||
{
|
||||
switch (d->type)
|
||||
{
|
||||
@ -232,71 +233,6 @@ dimension_restrict_info_add(DimensionRestrictInfo *dri, int strategy, Oid collat
|
||||
}
|
||||
}
|
||||
|
||||
static DimensionVec *
|
||||
dimension_restrict_info_open_slices(DimensionRestrictInfoOpen *dri)
|
||||
{
|
||||
/* basic idea: slice_end > lower_bound && slice_start < upper_bound */
|
||||
return ts_dimension_slice_scan_range_limit(dri->base.dimension->fd.id,
|
||||
dri->upper_strategy,
|
||||
dri->upper_bound,
|
||||
dri->lower_strategy,
|
||||
dri->lower_bound,
|
||||
0,
|
||||
NULL);
|
||||
}
|
||||
|
||||
static DimensionVec *
|
||||
dimension_restrict_info_closed_slices(DimensionRestrictInfoClosed *dri)
|
||||
{
|
||||
if (dri->strategy == BTEqualStrategyNumber)
|
||||
{
|
||||
/* slice_end >= value && slice_start <= value */
|
||||
ListCell *cell;
|
||||
DimensionVec *dim_vec = ts_dimension_vec_create(DIMENSION_VEC_DEFAULT_SIZE);
|
||||
|
||||
foreach (cell, dri->partitions)
|
||||
{
|
||||
int i;
|
||||
int32 partition = lfirst_int(cell);
|
||||
DimensionVec *tmp = ts_dimension_slice_scan_range_limit(dri->base.dimension->fd.id,
|
||||
BTLessEqualStrategyNumber,
|
||||
partition,
|
||||
BTGreaterEqualStrategyNumber,
|
||||
partition,
|
||||
0,
|
||||
NULL);
|
||||
|
||||
for (i = 0; i < tmp->num_slices; i++)
|
||||
dim_vec = ts_dimension_vec_add_unique_slice(&dim_vec, tmp->slices[i]);
|
||||
}
|
||||
return dim_vec;
|
||||
}
|
||||
|
||||
/* get all slices */
|
||||
return ts_dimension_slice_scan_range_limit(dri->base.dimension->fd.id,
|
||||
InvalidStrategy,
|
||||
-1,
|
||||
InvalidStrategy,
|
||||
-1,
|
||||
0,
|
||||
NULL);
|
||||
}
|
||||
|
||||
static DimensionVec *
|
||||
dimension_restrict_info_slices(DimensionRestrictInfo *dri)
|
||||
{
|
||||
switch (dri->dimension->type)
|
||||
{
|
||||
case DIMENSION_TYPE_OPEN:
|
||||
return dimension_restrict_info_open_slices((DimensionRestrictInfoOpen *) dri);
|
||||
case DIMENSION_TYPE_CLOSED:
|
||||
return dimension_restrict_info_closed_slices((DimensionRestrictInfoClosed *) dri);
|
||||
default:
|
||||
elog(ERROR, "unknown dimension type");
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
typedef struct HypertableRestrictInfo
|
||||
{
|
||||
int num_base_restrictions; /* number of base restrictions
|
||||
@ -406,7 +342,6 @@ hypertable_restrict_info_add_expr(HypertableRestrictInfo *hri, PlannerInfo *root
|
||||
return false;
|
||||
|
||||
get_op_opfamily_properties(op_oid, tce->btree_opf, false, &strategy, &lefttype, &righttype);
|
||||
|
||||
dimvalues = func_get_dim_values(c, use_or);
|
||||
return dimension_restrict_info_add(dri, strategy, c->constcollid, dimvalues);
|
||||
}
|
||||
@ -525,33 +460,134 @@ ts_hypertable_restrict_info_has_restrictions(HypertableRestrictInfo *hri)
|
||||
return hri->num_base_restrictions > 0;
|
||||
}
|
||||
|
||||
static List *
|
||||
gather_restriction_dimension_vectors(HypertableRestrictInfo *hri)
|
||||
/*
|
||||
* Scan for dimension slices matching query constraints.
|
||||
*
|
||||
* Matching slices are appended to to the given dimension vector. Note that we
|
||||
* keep the table and index open as long as we do not change the number of
|
||||
* scan keys. If the keys change, but the number of keys is the same, we can
|
||||
* simply "rescan". If the number of keys change, however, we need to end the
|
||||
* scan and start again.
|
||||
*/
|
||||
static DimensionVec *
|
||||
scan_and_append_slices(ScanIterator *it, int old_nkeys, DimensionVec **dv, bool unique)
|
||||
{
|
||||
if (old_nkeys != -1 && old_nkeys != it->ctx.nkeys)
|
||||
ts_scan_iterator_end(it);
|
||||
|
||||
ts_scan_iterator_start_or_restart_scan(it);
|
||||
|
||||
while (ts_scan_iterator_next(it))
|
||||
{
|
||||
TupleInfo *ti = ts_scan_iterator_tuple_info(it);
|
||||
DimensionSlice *slice = ts_dimension_slice_from_tuple(ti);
|
||||
|
||||
if (NULL != slice)
|
||||
{
|
||||
if (unique)
|
||||
*dv = ts_dimension_vec_add_unique_slice(dv, slice);
|
||||
else
|
||||
*dv = ts_dimension_vec_add_slice(dv, slice);
|
||||
}
|
||||
}
|
||||
|
||||
return *dv;
|
||||
}
|
||||
|
||||
static List *
|
||||
gather_restriction_dimension_vectors(const HypertableRestrictInfo *hri)
|
||||
{
|
||||
int i;
|
||||
List *dimension_vecs = NIL;
|
||||
ScanIterator it;
|
||||
int i;
|
||||
int old_nkeys = -1;
|
||||
|
||||
it = ts_dimension_slice_scan_iterator_create(NULL, CurrentMemoryContext);
|
||||
|
||||
for (i = 0; i < hri->num_dimensions; i++)
|
||||
{
|
||||
DimensionRestrictInfo *dri = hri->dimension_restriction[i];
|
||||
DimensionVec *dv;
|
||||
const DimensionRestrictInfo *dri = hri->dimension_restriction[i];
|
||||
DimensionVec *dv = ts_dimension_vec_create(DIMENSION_VEC_DEFAULT_SIZE);
|
||||
|
||||
Assert(NULL != dri);
|
||||
|
||||
dv = dimension_restrict_info_slices(dri);
|
||||
switch (dri->dimension->type)
|
||||
{
|
||||
case DIMENSION_TYPE_OPEN:
|
||||
{
|
||||
const DimensionRestrictInfoOpen *open = (const DimensionRestrictInfoOpen *) dri;
|
||||
|
||||
ts_dimension_slice_scan_iterator_set_range(&it,
|
||||
open->base.dimension->fd.id,
|
||||
open->upper_strategy,
|
||||
open->upper_bound,
|
||||
open->lower_strategy,
|
||||
open->lower_bound);
|
||||
|
||||
dv = scan_and_append_slices(&it, old_nkeys, &dv, false);
|
||||
break;
|
||||
}
|
||||
case DIMENSION_TYPE_CLOSED:
|
||||
{
|
||||
const DimensionRestrictInfoClosed *closed =
|
||||
(const DimensionRestrictInfoClosed *) dri;
|
||||
|
||||
if (closed->strategy == BTEqualStrategyNumber)
|
||||
{
|
||||
/* slice_end >= value && slice_start <= value */
|
||||
ListCell *cell;
|
||||
|
||||
foreach (cell, closed->partitions)
|
||||
{
|
||||
int32 partition = lfirst_int(cell);
|
||||
|
||||
ts_dimension_slice_scan_iterator_set_range(&it,
|
||||
dri->dimension->fd.id,
|
||||
BTLessEqualStrategyNumber,
|
||||
partition,
|
||||
BTGreaterEqualStrategyNumber,
|
||||
partition);
|
||||
|
||||
dv = scan_and_append_slices(&it, old_nkeys, &dv, true);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
ts_dimension_slice_scan_iterator_set_range(&it,
|
||||
dri->dimension->fd.id,
|
||||
InvalidStrategy,
|
||||
-1,
|
||||
InvalidStrategy,
|
||||
-1);
|
||||
dv = scan_and_append_slices(&it, old_nkeys, &dv, false);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
default:
|
||||
elog(ERROR, "unknown dimension type");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
Assert(dv->num_slices >= 0);
|
||||
|
||||
/*
|
||||
* If there are no matching slices in any single dimension, the result
|
||||
* will be empty
|
||||
* If there is a dimension where no slices match, the result will be
|
||||
* empty.
|
||||
*/
|
||||
if (dv->num_slices == 0)
|
||||
{
|
||||
ts_scan_iterator_close(&it);
|
||||
return NIL;
|
||||
}
|
||||
|
||||
dv = ts_dimension_vec_sort(&dv);
|
||||
dimension_vecs = lappend(dimension_vecs, dv);
|
||||
old_nkeys = it.ctx.nkeys;
|
||||
}
|
||||
|
||||
ts_scan_iterator_close(&it);
|
||||
|
||||
Assert(list_length(dimension_vecs) == hri->num_dimensions);
|
||||
|
||||
return dimension_vecs;
|
||||
|
@ -159,7 +159,7 @@ static const TableIndexDef catalog_table_index_definitions[_MAX_CATALOG_TABLES]
|
||||
.length = _MAX_CHUNK_CONSTRAINT_INDEX,
|
||||
.names = (char *[]) {
|
||||
[CHUNK_CONSTRAINT_CHUNK_ID_CONSTRAINT_NAME_IDX] = "chunk_constraint_chunk_id_constraint_name_key",
|
||||
[CHUNK_CONSTRAINT_CHUNK_ID_DIMENSION_SLICE_ID_IDX] = "chunk_constraint_chunk_id_dimension_slice_id_idx",
|
||||
[CHUNK_CONSTRAINT_DIMENSION_SLICE_ID_IDX] = "chunk_constraint_dimension_slice_id_idx",
|
||||
},
|
||||
},
|
||||
[CHUNK_INDEX] = {
|
||||
|
@ -442,15 +442,14 @@ typedef FormData_chunk_constraint *Form_chunk_constraint;
|
||||
enum
|
||||
{
|
||||
CHUNK_CONSTRAINT_CHUNK_ID_CONSTRAINT_NAME_IDX = 0,
|
||||
CHUNK_CONSTRAINT_CHUNK_ID_DIMENSION_SLICE_ID_IDX,
|
||||
CHUNK_CONSTRAINT_DIMENSION_SLICE_ID_IDX,
|
||||
_MAX_CHUNK_CONSTRAINT_INDEX,
|
||||
};
|
||||
|
||||
enum Anum_chunk_constraint_chunk_id_dimension_slice_id_idx
|
||||
enum Anum_chunk_constraint_dimension_slice_id_idx
|
||||
{
|
||||
Anum_chunk_constraint_chunk_id_dimension_slice_id_idx_chunk_id = 1,
|
||||
Anum_chunk_constraint_chunk_id_dimension_slice_id_idx_dimension_slice_id,
|
||||
_Anum_chunk_constraint_chunk_id_dimension_slice_id_idx_max,
|
||||
Anum_chunk_constraint_dimension_slice_id_idx_dimension_slice_id = 1,
|
||||
_Anum_chunk_constraint_dimension_slice_id_idx_max,
|
||||
};
|
||||
|
||||
enum Anum_chunk_constraint_chunk_id_constraint_name_idx
|
||||
|
@ -870,9 +870,9 @@ SELECT * FROM test.show_constraints(format('%I.%I', :'CHUNK_SCHEMA', :'CHUNK_NAM
|
||||
SELECT * FROM original_chunk_constraints_metadata;
|
||||
chunk_id | dimension_slice_id | constraint_name | hypertable_constraint_name
|
||||
----------+--------------------+---------------------------+----------------------------
|
||||
10 | 15 | constraint_15 |
|
||||
10 | | 10_1_chunkapi_device_fkey | chunkapi_device_fkey
|
||||
10 | | 10_2_chunkapi_pkey | chunkapi_pkey
|
||||
10 | 15 | constraint_15 |
|
||||
(3 rows)
|
||||
|
||||
SELECT
|
||||
@ -885,9 +885,9 @@ INNER JOIN _timescaledb_catalog.chunk ch ON (con.chunk_id = ch.id)
|
||||
WHERE ch.schema_name = :'CHUNK_SCHEMA' AND ch.table_name = :'CHUNK_NAME';
|
||||
chunk_id | dimension_slice_id | constraint_name | hypertable_constraint_name
|
||||
----------+--------------------+---------------------------+----------------------------
|
||||
11 | 16 | constraint_16 |
|
||||
11 | | 11_3_chunkapi_device_fkey | chunkapi_device_fkey
|
||||
11 | | 11_4_chunkapi_pkey | chunkapi_pkey
|
||||
11 | 16 | constraint_16 |
|
||||
(3 rows)
|
||||
|
||||
DROP TABLE original_chunk_constraints;
|
||||
|
Loading…
x
Reference in New Issue
Block a user