mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-16 18:43:18 +08:00
Delete related catalog rows when continuous aggs are dropped
This PR deletes related rows from the following tables * completed_threshold * invalidation threshold * hypertable invalidation log The latter two tables are only affected if no other continuous aggs exist on the raw hyperatble. This commit also adds locks to prevent concurrent raw table inserts and any access to the materialization table when dropping caggs. It also moves all locks to the beginning of the function so that the lock order is easier to track and reason about. Also added a few formatting fixes.
This commit is contained in:
parent
1cbd8c74f7
commit
19d47daf23
@ -230,7 +230,7 @@ CREATE TABLE IF NOT EXISTS _timescaledb_catalog.continuous_agg (
|
||||
bucket_width BIGINT NOT NULL,
|
||||
job_id INTEGER UNIQUE NOT NULL REFERENCES _timescaledb_config.bgw_job(id) ON DELETE RESTRICT,
|
||||
refresh_lag BIGINT NOT NULL,
|
||||
user_view_query pg_node_tree NOT NULL,
|
||||
user_view_query pg_node_tree NOT NULL,
|
||||
UNIQUE(user_view_schema, user_view_name),
|
||||
UNIQUE(partial_view_schema, partial_view_name)
|
||||
);
|
||||
|
@ -42,6 +42,116 @@ init_scan_by_mat_hypertable_id(ScanIterator *iterator, const int32 mat_hypertabl
|
||||
Int32GetDatum(mat_hypertable_id));
|
||||
}
|
||||
|
||||
static void
|
||||
init_completed_threshold_scan_by_mat_id(ScanIterator *iterator, const int32 mat_hypertable_id)
|
||||
{
|
||||
iterator->ctx.index = catalog_get_index(ts_catalog_get(),
|
||||
CONTINUOUS_AGGS_COMPLETED_THRESHOLD,
|
||||
CONTINUOUS_AGGS_COMPLETED_THRESHOLD_PKEY);
|
||||
|
||||
ts_scan_iterator_scan_key_init(iterator,
|
||||
Anum_continuous_aggs_completed_threshold_pkey_materialization_id,
|
||||
BTEqualStrategyNumber,
|
||||
F_INT4EQ,
|
||||
Int32GetDatum(mat_hypertable_id));
|
||||
}
|
||||
|
||||
static void
|
||||
init_invalidation_threshold_scan_by_hypertable_id(ScanIterator *iterator,
|
||||
const int32 raw_hypertable_id)
|
||||
{
|
||||
iterator->ctx.index = catalog_get_index(ts_catalog_get(),
|
||||
CONTINUOUS_AGGS_INVALIDATION_THRESHOLD,
|
||||
CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_PKEY);
|
||||
|
||||
ts_scan_iterator_scan_key_init(iterator,
|
||||
Anum_continuous_aggs_invalidation_threshold_pkey_hypertable_id,
|
||||
BTEqualStrategyNumber,
|
||||
F_INT4EQ,
|
||||
Int32GetDatum(raw_hypertable_id));
|
||||
}
|
||||
|
||||
static void
|
||||
init_hypertable_invalidation_log_scan_by_hypertable_id(ScanIterator *iterator,
|
||||
const int32 raw_hypertable_id)
|
||||
{
|
||||
iterator->ctx.index = catalog_get_index(ts_catalog_get(),
|
||||
CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG,
|
||||
CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG_IDX);
|
||||
|
||||
ts_scan_iterator_scan_key_init(
|
||||
iterator,
|
||||
Anum_continuous_aggs_hypertable_invalidation_log_idx_hypertable_id,
|
||||
BTEqualStrategyNumber,
|
||||
F_INT4EQ,
|
||||
Int32GetDatum(raw_hypertable_id));
|
||||
}
|
||||
|
||||
static int32
|
||||
number_of_continuous_aggs_attached(int32 raw_hypertable_id)
|
||||
{
|
||||
ScanIterator iterator =
|
||||
ts_scan_iterator_create(CONTINUOUS_AGG, AccessShareLock, CurrentMemoryContext);
|
||||
int32 count = 0;
|
||||
|
||||
ts_scanner_foreach(&iterator)
|
||||
{
|
||||
FormData_continuous_agg *data =
|
||||
(FormData_continuous_agg *) GETSTRUCT(ts_scan_iterator_tuple(&iterator));
|
||||
if (data->raw_hypertable_id == raw_hypertable_id)
|
||||
count++;
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
static void
|
||||
completed_threshold_delete(int32 materialization_id)
|
||||
{
|
||||
ScanIterator iterator = ts_scan_iterator_create(CONTINUOUS_AGGS_COMPLETED_THRESHOLD,
|
||||
RowExclusiveLock,
|
||||
CurrentMemoryContext);
|
||||
|
||||
init_completed_threshold_scan_by_mat_id(&iterator, materialization_id);
|
||||
|
||||
ts_scanner_foreach(&iterator)
|
||||
{
|
||||
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
|
||||
ts_catalog_delete(ti->scanrel, ti->tuple);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
invalidation_threshold_delete(int32 raw_hypertable_id)
|
||||
{
|
||||
ScanIterator iterator = ts_scan_iterator_create(CONTINUOUS_AGGS_INVALIDATION_THRESHOLD,
|
||||
RowExclusiveLock,
|
||||
CurrentMemoryContext);
|
||||
|
||||
init_invalidation_threshold_scan_by_hypertable_id(&iterator, raw_hypertable_id);
|
||||
|
||||
ts_scanner_foreach(&iterator)
|
||||
{
|
||||
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
|
||||
ts_catalog_delete(ti->scanrel, ti->tuple);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
hypertable_invalidation_log_delete(int32 raw_hypertable_id)
|
||||
{
|
||||
ScanIterator iterator = ts_scan_iterator_create(CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG,
|
||||
RowExclusiveLock,
|
||||
CurrentMemoryContext);
|
||||
|
||||
init_hypertable_invalidation_log_scan_by_hypertable_id(&iterator, raw_hypertable_id);
|
||||
|
||||
ts_scanner_foreach(&iterator)
|
||||
{
|
||||
TupleInfo *ti = ts_scan_iterator_tuple_info(&iterator);
|
||||
ts_catalog_delete(ti->scanrel, ti->tuple);
|
||||
}
|
||||
}
|
||||
|
||||
static void
|
||||
continuous_agg_init(ContinuousAgg *cagg, FormData_continuous_agg *fd)
|
||||
{
|
||||
@ -89,16 +199,16 @@ drop_continuous_agg(ContinuousAgg *agg, bool drop_user_view)
|
||||
{
|
||||
ScanIterator iterator =
|
||||
ts_scan_iterator_create(CONTINUOUS_AGG, RowExclusiveLock, CurrentMemoryContext);
|
||||
ObjectAddress user_view, partial_view, rawht_trig;
|
||||
Catalog *catalog = ts_catalog_get();
|
||||
ObjectAddress user_view = { .objectId = InvalidOid }, partial_view = { .objectId = InvalidOid },
|
||||
rawht_trig = { .objectId = InvalidOid };
|
||||
Hypertable *mat_hypertable, *raw_hypertable;
|
||||
int count = 0;
|
||||
Oid rawht_trigoid;
|
||||
int32 count = 0;
|
||||
bool raw_hypertable_has_other_caggs = true;
|
||||
bool raw_hypertable_exists;
|
||||
|
||||
/* NOTE: the lock order matters, see tsl/src/materialization.c. Perform all locking upfront */
|
||||
|
||||
/* NOTE: the order in which we drop matters, it must obey obey the lock order,
|
||||
* see tsl/src/materialization.c
|
||||
*/
|
||||
// TODO decide on where the bgw job table should be locked
|
||||
/* Delete view itself */
|
||||
if (drop_user_view)
|
||||
{
|
||||
user_view = (ObjectAddress){
|
||||
@ -108,11 +218,62 @@ drop_continuous_agg(ContinuousAgg *agg, bool drop_user_view)
|
||||
get_namespace_oid(NameStr(agg->data.user_view_schema), false)),
|
||||
};
|
||||
LockRelationOid(user_view.objectId, AccessExclusiveLock);
|
||||
performDeletion(&user_view, DROP_RESTRICT, 0);
|
||||
}
|
||||
|
||||
/* Delete catalog entry. */
|
||||
raw_hypertable = ts_hypertable_get_by_id(agg->data.raw_hypertable_id);
|
||||
/* The raw hypertable might be already dropped if this is a cascade from that drop */
|
||||
raw_hypertable_exists =
|
||||
(raw_hypertable != NULL && OidIsValid(raw_hypertable->main_table_relid));
|
||||
if (raw_hypertable_exists)
|
||||
/* AccessExclusiveLock is needed to drop triggers.
|
||||
* Also prevent concurrent DML commands */
|
||||
LockRelationOid(raw_hypertable->main_table_relid, AccessExclusiveLock);
|
||||
mat_hypertable = ts_hypertable_get_by_id(agg->data.mat_hypertable_id);
|
||||
/* AccessExclusiveLock is needed to drop this table. */
|
||||
LockRelationOid(mat_hypertable->main_table_relid, AccessExclusiveLock);
|
||||
|
||||
/* lock catalogs */
|
||||
LockRelationOid(catalog_get_table_id(catalog, BGW_JOB), RowExclusiveLock);
|
||||
LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGG), RowExclusiveLock);
|
||||
raw_hypertable_has_other_caggs = number_of_continuous_aggs_attached(raw_hypertable->fd.id) > 1;
|
||||
if (!raw_hypertable_has_other_caggs)
|
||||
LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_HYPERTABLE_INVALIDATION_LOG),
|
||||
RowExclusiveLock);
|
||||
LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_COMPLETED_THRESHOLD),
|
||||
RowExclusiveLock);
|
||||
if (!raw_hypertable_has_other_caggs)
|
||||
LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
|
||||
RowExclusiveLock);
|
||||
|
||||
/* The trigger will be dropped if the hypertable still exists and no other caggs attached */
|
||||
if (!raw_hypertable_has_other_caggs && raw_hypertable_exists)
|
||||
{
|
||||
Oid rawht_trigoid =
|
||||
get_trigger_oid(raw_hypertable->main_table_relid, CAGGINVAL_TRIGGER_NAME, false);
|
||||
rawht_trig = (ObjectAddress){ .classId = TriggerRelationId,
|
||||
.objectId = rawht_trigoid,
|
||||
.objectSubId = 0 };
|
||||
/* raw hypertable is locked above */
|
||||
LockRelationOid(rawht_trigoid, AccessExclusiveLock);
|
||||
}
|
||||
|
||||
partial_view = (ObjectAddress){
|
||||
.classId = RelationRelationId,
|
||||
.objectId =
|
||||
get_relname_relid(NameStr(agg->data.partial_view_name),
|
||||
get_namespace_oid(NameStr(agg->data.partial_view_schema), false)),
|
||||
};
|
||||
/* The partial view may already be dropped by PG's dependency system (e.g. the raw table was
|
||||
* dropped) */
|
||||
if (OidIsValid(partial_view.objectId))
|
||||
LockRelationOid(partial_view.objectId, AccessExclusiveLock);
|
||||
|
||||
/* END OF LOCKING. Perform actual deletions now. */
|
||||
|
||||
if (OidIsValid(user_view.objectId))
|
||||
performDeletion(&user_view, DROP_RESTRICT, 0);
|
||||
|
||||
/* Delete catalog entry. */
|
||||
init_scan_by_mat_hypertable_id(&iterator, agg->data.mat_hypertable_id);
|
||||
ts_scanner_foreach(&iterator)
|
||||
{
|
||||
@ -122,36 +283,25 @@ drop_continuous_agg(ContinuousAgg *agg, bool drop_user_view)
|
||||
/* delete the job */
|
||||
ts_bgw_job_delete_by_id(form->job_id);
|
||||
ts_catalog_delete(ti->scanrel, ti->tuple);
|
||||
|
||||
/* delete all related rows */
|
||||
if (!raw_hypertable_has_other_caggs)
|
||||
hypertable_invalidation_log_delete(form->raw_hypertable_id);
|
||||
|
||||
completed_threshold_delete(form->mat_hypertable_id);
|
||||
|
||||
if (!raw_hypertable_has_other_caggs)
|
||||
invalidation_threshold_delete(form->raw_hypertable_id);
|
||||
count++;
|
||||
}
|
||||
Assert(count == 1);
|
||||
|
||||
/* drop the invalidation trigger on the raw hypertable if the table
|
||||
* has not been dropped already
|
||||
*/
|
||||
raw_hypertable = ts_hypertable_get_by_id(agg->data.raw_hypertable_id);
|
||||
if (raw_hypertable && raw_hypertable->main_table_relid != InvalidOid)
|
||||
{
|
||||
rawht_trigoid =
|
||||
get_trigger_oid(raw_hypertable->main_table_relid, CAGGINVAL_TRIGGER_NAME, false);
|
||||
rawht_trig = (ObjectAddress){ .classId = TriggerRelationId,
|
||||
.objectId = rawht_trigoid,
|
||||
.objectSubId = 0 };
|
||||
if (OidIsValid(rawht_trig.objectId))
|
||||
performDeletion(&rawht_trig, DROP_RESTRICT, 0);
|
||||
}
|
||||
|
||||
/* delete the materialization table */
|
||||
ts_hypertable_drop(mat_hypertable);
|
||||
|
||||
/* Drop partial view */
|
||||
partial_view = (ObjectAddress){
|
||||
.classId = RelationRelationId,
|
||||
.objectId =
|
||||
get_relname_relid(NameStr(agg->data.partial_view_name),
|
||||
get_namespace_oid(NameStr(agg->data.partial_view_schema), false)),
|
||||
};
|
||||
/* The partial view may already be dropped by PG's dependency system (e.g. the raw table was
|
||||
* dropped) */
|
||||
if (OidIsValid(partial_view.objectId))
|
||||
performDeletion(&partial_view, DROP_RESTRICT, 0);
|
||||
}
|
||||
|
@ -71,7 +71,7 @@ select * from mat_m1 order by a ;
|
||||
|
||||
--check triggers on user hypertable --
|
||||
\c :TEST_DBNAME :ROLE_SUPERUSER
|
||||
select tgname, tgtype, tgenabled , relname from pg_trigger, pg_class
|
||||
select tgname, tgtype, tgenabled , relname from pg_trigger, pg_class
|
||||
where tgrelid = pg_class.oid and pg_class.relname like 'foo'
|
||||
order by tgname;
|
||||
tgname | tgtype | tgenabled | relname
|
||||
@ -558,7 +558,15 @@ NOTICE: adding not-null constraint to column "time_partition_col"
|
||||
DROP TABLE conditions;
|
||||
ERROR: cannot drop table conditions because other objects depend on it
|
||||
\set ON_ERROR_STOP 1
|
||||
SELECT h.schema_name AS "MAT_SCHEMA_NAME",
|
||||
--insert data now
|
||||
insert into conditions
|
||||
select generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'POR', 55, 75, 40, 70, NULL;
|
||||
insert into conditions
|
||||
select generate_series('2018-11-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'NYC', 35, 45, 50, 40, NULL;
|
||||
insert into conditions
|
||||
select generate_series('2018-11-01 00:00'::timestamp, '2018-12-15 00:00'::timestamp, '1 day'), 'LA', 73, 55, NULL, 28, NULL;
|
||||
SELECT ca.raw_hypertable_id as "RAW_HYPERTABLE_ID",
|
||||
h.schema_name AS "MAT_SCHEMA_NAME",
|
||||
h.table_name AS "MAT_TABLE_NAME",
|
||||
partial_view_name as "PART_VIEW_NAME",
|
||||
partial_view_schema as "PART_VIEW_SCHEMA"
|
||||
@ -566,8 +574,33 @@ FROM _timescaledb_catalog.continuous_agg ca
|
||||
INNER JOIN _timescaledb_catalog.hypertable h ON(h.id = ca.mat_hypertable_id)
|
||||
WHERE user_view_name = 'mat_drop_test'
|
||||
\gset
|
||||
REFRESH MATERIALIZED VIEW mat_drop_test;
|
||||
INFO: new materialization range for public.conditions (time column timec) (1546041600000000)
|
||||
INFO: materializing continuous aggregate public.mat_drop_test: new range up to 1546041600000000
|
||||
--force invalidation
|
||||
insert into conditions
|
||||
select generate_series('2017-11-01 00:00'::timestamp, '2017-12-15 00:00'::timestamp, '1 day'), 'LA', 73, 55, NULL, 28, NULL;
|
||||
select count(*) from _timescaledb_catalog.continuous_aggs_invalidation_threshold;
|
||||
count
|
||||
-------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
select count(*) from _timescaledb_catalog.continuous_aggs_completed_threshold;
|
||||
count
|
||||
-------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
select count(*) from _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log;
|
||||
count
|
||||
-------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
DROP TABLE conditions CASCADE;
|
||||
NOTICE: drop cascades to view _timescaledb_internal.ts_internal_mat_drop_testview
|
||||
NOTICE: drop cascades to 2 other objects
|
||||
--catalog entry should be gone
|
||||
SELECT count(*)
|
||||
FROM _timescaledb_catalog.continuous_agg ca
|
||||
@ -577,6 +610,24 @@ WHERE user_view_name = 'mat_drop_test';
|
||||
0
|
||||
(1 row)
|
||||
|
||||
select count(*) from _timescaledb_catalog.continuous_aggs_invalidation_threshold;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
select count(*) from _timescaledb_catalog.continuous_aggs_completed_threshold;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
select count(*) from _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log;
|
||||
count
|
||||
-------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM _timescaledb_config.bgw_job;
|
||||
id | application_name | job_type | schedule_interval | max_runtime | max_retries | retry_period
|
||||
----+------------------+----------+-------------------+-------------+-------------+--------------
|
||||
|
@ -56,7 +56,7 @@ select * from mat_m1 order by a ;
|
||||
|
||||
--check triggers on user hypertable --
|
||||
\c :TEST_DBNAME :ROLE_SUPERUSER
|
||||
select tgname, tgtype, tgenabled , relname from pg_trigger, pg_class
|
||||
select tgname, tgtype, tgenabled , relname from pg_trigger, pg_class
|
||||
where tgrelid = pg_class.oid and pg_class.relname like 'foo'
|
||||
order by tgname;
|
||||
|
||||
@ -426,7 +426,18 @@ group by time_bucket('1day', timec);
|
||||
DROP TABLE conditions;
|
||||
\set ON_ERROR_STOP 1
|
||||
|
||||
SELECT h.schema_name AS "MAT_SCHEMA_NAME",
|
||||
--insert data now
|
||||
|
||||
insert into conditions
|
||||
select generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'POR', 55, 75, 40, 70, NULL;
|
||||
insert into conditions
|
||||
select generate_series('2018-11-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '1 day'), 'NYC', 35, 45, 50, 40, NULL;
|
||||
insert into conditions
|
||||
select generate_series('2018-11-01 00:00'::timestamp, '2018-12-15 00:00'::timestamp, '1 day'), 'LA', 73, 55, NULL, 28, NULL;
|
||||
|
||||
|
||||
SELECT ca.raw_hypertable_id as "RAW_HYPERTABLE_ID",
|
||||
h.schema_name AS "MAT_SCHEMA_NAME",
|
||||
h.table_name AS "MAT_TABLE_NAME",
|
||||
partial_view_name as "PART_VIEW_NAME",
|
||||
partial_view_schema as "PART_VIEW_SCHEMA"
|
||||
@ -435,12 +446,25 @@ INNER JOIN _timescaledb_catalog.hypertable h ON(h.id = ca.mat_hypertable_id)
|
||||
WHERE user_view_name = 'mat_drop_test'
|
||||
\gset
|
||||
|
||||
REFRESH MATERIALIZED VIEW mat_drop_test;
|
||||
|
||||
--force invalidation
|
||||
insert into conditions
|
||||
select generate_series('2017-11-01 00:00'::timestamp, '2017-12-15 00:00'::timestamp, '1 day'), 'LA', 73, 55, NULL, 28, NULL;
|
||||
|
||||
select count(*) from _timescaledb_catalog.continuous_aggs_invalidation_threshold;
|
||||
select count(*) from _timescaledb_catalog.continuous_aggs_completed_threshold;
|
||||
select count(*) from _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log;
|
||||
|
||||
DROP TABLE conditions CASCADE;
|
||||
|
||||
--catalog entry should be gone
|
||||
SELECT count(*)
|
||||
FROM _timescaledb_catalog.continuous_agg ca
|
||||
WHERE user_view_name = 'mat_drop_test';
|
||||
select count(*) from _timescaledb_catalog.continuous_aggs_invalidation_threshold;
|
||||
select count(*) from _timescaledb_catalog.continuous_aggs_completed_threshold;
|
||||
select count(*) from _timescaledb_catalog.continuous_aggs_hypertable_invalidation_log;
|
||||
|
||||
SELECT * FROM _timescaledb_config.bgw_job;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user