Mirror of https://github.com/timescale/timescaledb.git, synced 2025-05-16 02:23:49 +08:00
The catalog table continuous_aggs_bucket_function is currently only used for variable bucket sizes; information about fixed-size buckets is stored only in the continuous_agg table. This causes several problems (e.g., redundant fields for the bucket_size, fixed-size buckets with offsets are not supported, ...). This commit is the first in a series of commits that refactor the catalog for the CAgg time_bucket function. The goals are:

* Remove the redundant CAgg attributes in the catalog
* Create an entry in continuous_aggs_bucket_function for all CAggs that use time_bucket

This first commit refactors the continuous_aggs_bucket_function table and prepares it for more generic use. Not all attributes are used yet; this will change in follow-up PRs.
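For context, a hedged illustration of the variable- vs. fixed-size distinction the commit message refers to (the continuous aggregates, the hypertable conditions, and its columns are hypothetical; only the time_bucket calls matter): a calendar-based width such as '1 month' produces variable-sized buckets, while a width such as '1 day' produces fixed-size buckets. Under the refactored catalog, both kinds of CAggs are meant to get a row in continuous_aggs_bucket_function.

-- Hypothetical CAgg with a variable-sized (calendar) bucket
CREATE MATERIALIZED VIEW conditions_monthly WITH (timescaledb.continuous) AS
  SELECT time_bucket('1 month', time) AS bucket, avg(temperature) AS avg_temp
  FROM conditions
  GROUP BY bucket;

-- Hypothetical CAgg with a fixed-size bucket
CREATE MATERIALIZED VIEW conditions_daily WITH (timescaledb.continuous) AS
  SELECT time_bucket('1 day', time) AS bucket, avg(temperature) AS avg_temp
  FROM conditions
  GROUP BY bucket;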
144 lines · 6.7 KiB · PL/PgSQL
-- Remove multi-node CAGG support
DROP FUNCTION IF EXISTS _timescaledb_internal.invalidation_cagg_log_add_entry(integer,bigint,bigint);
DROP FUNCTION IF EXISTS _timescaledb_internal.invalidation_hyper_log_add_entry(integer,bigint,bigint);
DROP FUNCTION IF EXISTS _timescaledb_internal.materialization_invalidation_log_delete(integer);
DROP FUNCTION IF EXISTS _timescaledb_internal.invalidation_process_cagg_log(integer,integer,regtype,bigint,bigint,integer[],bigint[],bigint[]);
DROP FUNCTION IF EXISTS _timescaledb_internal.invalidation_process_cagg_log(integer,integer,regtype,bigint,bigint,integer[],bigint[],bigint[],text[]);
DROP FUNCTION IF EXISTS _timescaledb_internal.invalidation_process_hypertable_log(integer,integer,regtype,integer[],bigint[],bigint[]);
DROP FUNCTION IF EXISTS _timescaledb_internal.invalidation_process_hypertable_log(integer,integer,regtype,integer[],bigint[],bigint[],text[]);
DROP FUNCTION IF EXISTS _timescaledb_internal.hypertable_invalidation_log_delete(integer);

DROP FUNCTION IF EXISTS _timescaledb_functions.invalidation_cagg_log_add_entry(integer,bigint,bigint);
DROP FUNCTION IF EXISTS _timescaledb_functions.invalidation_hyper_log_add_entry(integer,bigint,bigint);
DROP FUNCTION IF EXISTS _timescaledb_functions.materialization_invalidation_log_delete(integer);
DROP FUNCTION IF EXISTS _timescaledb_functions.invalidation_process_cagg_log(integer,integer,regtype,bigint,bigint,integer[],bigint[],bigint[]);
DROP FUNCTION IF EXISTS _timescaledb_functions.invalidation_process_cagg_log(integer,integer,regtype,bigint,bigint,integer[],bigint[],bigint[],text[]);
DROP FUNCTION IF EXISTS _timescaledb_functions.invalidation_process_hypertable_log(integer,integer,regtype,integer[],bigint[],bigint[]);
DROP FUNCTION IF EXISTS _timescaledb_functions.invalidation_process_hypertable_log(integer,integer,regtype,integer[],bigint[],bigint[],text[]);
DROP FUNCTION IF EXISTS _timescaledb_functions.hypertable_invalidation_log_delete(integer);

-- Remove chunk metadata when marked as dropped
CREATE FUNCTION _timescaledb_functions.remove_dropped_chunk_metadata(_hypertable_id INTEGER)
RETURNS INTEGER LANGUAGE plpgsql AS $$
DECLARE
  _chunk_id INTEGER;
  _removed INTEGER := 0;
BEGIN
  FOR _chunk_id IN
    SELECT id FROM _timescaledb_catalog.chunk
    WHERE hypertable_id = _hypertable_id
    AND dropped IS TRUE
    AND NOT EXISTS (
      SELECT FROM information_schema.tables
      WHERE tables.table_schema = chunk.schema_name
      AND tables.table_name = chunk.table_name
    )
    AND NOT EXISTS (
      SELECT FROM _timescaledb_catalog.hypertable
      JOIN _timescaledb_catalog.continuous_agg ON continuous_agg.raw_hypertable_id = hypertable.id
      WHERE hypertable.id = chunk.hypertable_id
      -- for the old caggs format we need to keep chunk metadata for dropped chunks
      AND continuous_agg.finalized IS FALSE
    )
  LOOP
    _removed := _removed + 1;
    RAISE INFO 'Removing metadata of chunk % from hypertable %', _chunk_id, _hypertable_id;
    WITH _dimension_slice_remove AS (
      DELETE FROM _timescaledb_catalog.dimension_slice
      USING _timescaledb_catalog.chunk_constraint
      WHERE dimension_slice.id = chunk_constraint.dimension_slice_id
      AND chunk_constraint.chunk_id = _chunk_id
      RETURNING _timescaledb_catalog.dimension_slice.id
    )
    DELETE FROM _timescaledb_catalog.chunk_constraint
    USING _dimension_slice_remove
    WHERE chunk_constraint.dimension_slice_id = _dimension_slice_remove.id;

    DELETE FROM _timescaledb_internal.bgw_policy_chunk_stats
    WHERE bgw_policy_chunk_stats.chunk_id = _chunk_id;

    DELETE FROM _timescaledb_catalog.chunk_index
    WHERE chunk_index.chunk_id = _chunk_id;

    DELETE FROM _timescaledb_catalog.compression_chunk_size
    WHERE compression_chunk_size.chunk_id = _chunk_id
    OR compression_chunk_size.compressed_chunk_id = _chunk_id;

    DELETE FROM _timescaledb_catalog.chunk
    WHERE chunk.id = _chunk_id
    OR chunk.compressed_chunk_id = _chunk_id;
  END LOOP;

  RETURN _removed;
END;
$$ SET search_path TO pg_catalog, pg_temp;

SELECT _timescaledb_functions.remove_dropped_chunk_metadata(id) AS chunks_metadata_removed
FROM _timescaledb_catalog.hypertable;

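-- Illustrative only (not part of the original migration): the cleanup above runs for
-- every hypertable. To inspect the effect on a single hypertable, a call along these
-- lines could be used (the hypertable name 'conditions' is hypothetical):
--
--   SELECT _timescaledb_functions.remove_dropped_chunk_metadata(ht.id)
--   FROM _timescaledb_catalog.hypertable ht
--   WHERE ht.table_name = 'conditions';
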
--
-- Rebuild the catalog table `_timescaledb_catalog.continuous_aggs_bucket_function`
--

CREATE OR REPLACE FUNCTION _timescaledb_functions.cagg_get_bucket_function(
    mat_hypertable_id INTEGER
) RETURNS regprocedure AS '@MODULE_PATHNAME@', 'ts_continuous_agg_get_bucket_function' LANGUAGE C STRICT VOLATILE;

-- Since we now need the regprocedure of the bucket function that is used, we have to
-- recover it by parsing the view query via 'cagg_get_bucket_function'.
CREATE TABLE _timescaledb_catalog._tmp_continuous_aggs_bucket_function AS
  SELECT
    mat_hypertable_id,
    _timescaledb_functions.cagg_get_bucket_function(mat_hypertable_id),
    bucket_width,
    origin,
    NULL::text AS bucket_offset,
    timezone,
    false AS bucket_fixed_width
  FROM
    _timescaledb_catalog.continuous_aggs_bucket_function
  ORDER BY
    mat_hypertable_id;

ALTER EXTENSION timescaledb
  DROP TABLE _timescaledb_catalog.continuous_aggs_bucket_function;

DROP TABLE _timescaledb_catalog.continuous_aggs_bucket_function;

CREATE TABLE _timescaledb_catalog.continuous_aggs_bucket_function (
  mat_hypertable_id integer NOT NULL,
  -- The bucket function
  bucket_func regprocedure NOT NULL,
  -- `bucket_width` argument of the function, e.g. "1 month"
  bucket_width text NOT NULL,
  -- optional `origin` argument of the function provided by the user
  bucket_origin text,
  -- optional `offset` argument of the function provided by the user
  bucket_offset text,
  -- optional `timezone` argument of the function provided by the user
  bucket_timezone text,
  -- fixed or variable sized bucket
  bucket_fixed_width bool NOT NULL,
  -- table constraints
  CONSTRAINT continuous_aggs_bucket_function_pkey PRIMARY KEY (mat_hypertable_id),
  CONSTRAINT continuous_aggs_bucket_function_mat_hypertable_id_fkey FOREIGN KEY (mat_hypertable_id) REFERENCES _timescaledb_catalog.hypertable (id) ON DELETE CASCADE
);

INSERT INTO _timescaledb_catalog.continuous_aggs_bucket_function
  SELECT * FROM _timescaledb_catalog._tmp_continuous_aggs_bucket_function;

DROP TABLE _timescaledb_catalog._tmp_continuous_aggs_bucket_function;

SELECT pg_catalog.pg_extension_config_dump('_timescaledb_catalog.continuous_aggs_bucket_function', '');

GRANT SELECT ON TABLE _timescaledb_catalog.continuous_aggs_bucket_function TO PUBLIC;

ANALYZE _timescaledb_catalog.continuous_aggs_bucket_function;

ALTER EXTENSION timescaledb DROP FUNCTION _timescaledb_functions.cagg_get_bucket_function(INTEGER);
DROP FUNCTION IF EXISTS _timescaledb_functions.cagg_get_bucket_function(INTEGER);

--
-- End rebuild the catalog table `_timescaledb_catalog.continuous_aggs_bucket_function`
--
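
-- Illustrative only (not part of the original migration): after the rebuild, the bucket
-- configuration of a CAgg could be inspected with a query along these lines, assuming
-- the usual user_view_schema/user_view_name columns of _timescaledb_catalog.continuous_agg:
--
--   SELECT cagg.user_view_schema, cagg.user_view_name,
--          bf.bucket_func, bf.bucket_width, bf.bucket_origin,
--          bf.bucket_offset, bf.bucket_timezone, bf.bucket_fixed_width
--   FROM _timescaledb_catalog.continuous_aggs_bucket_function bf
--   JOIN _timescaledb_catalog.continuous_agg cagg
--     ON cagg.mat_hypertable_id = bf.mat_hypertable_id;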