-- Recreates the continuous aggregate (cagg) migration procedures and resets
-- the CHECK constraint that whitelists migration plan step types.

DROP PROCEDURE @extschema@.cagg_migrate (REGCLASS, BOOLEAN, BOOLEAN);

DROP PROCEDURE _timescaledb_internal.cagg_migrate_create_plan (_timescaledb_catalog.continuous_agg, TEXT, BOOLEAN, BOOLEAN);

-- Builds the migration plan for one continuous aggregate: registers the plan
-- row and inserts the ordered list of plan steps.
--
-- Parameters:
--   _cagg_data      catalog row of the continuous aggregate being migrated
--   _cagg_name_new  name of the new continuous aggregate to create
--   _override       stored in the config of the OVERRIDE CAGG / DROP OLD CAGG steps
--   _drop_old       stored in the config of the OVERRIDE CAGG / DROP OLD CAGG steps
--
-- Raises an exception when a plan already exists for the materialized
-- hypertable.
CREATE PROCEDURE _timescaledb_internal.cagg_migrate_create_plan (
    _cagg_data _timescaledb_catalog.continuous_agg,
    _cagg_name_new TEXT,
    _override BOOLEAN DEFAULT FALSE,
    _drop_old BOOLEAN DEFAULT FALSE
)
LANGUAGE plpgsql AS
$BODY$
DECLARE
    _sql TEXT;
    _matht RECORD;
    _time_interval INTERVAL;
    _integer_interval BIGINT;
    _watermark TEXT;
    _policies JSONB;
    _bucket_column_name TEXT;
    _bucket_column_type TEXT;
    _interval_type TEXT;
    _interval_value TEXT;
BEGIN
    IF _timescaledb_internal.cagg_migrate_plan_exists(_cagg_data.mat_hypertable_id) IS TRUE THEN
        RAISE EXCEPTION 'plan already exists for materialized hypertable %', _cagg_data.mat_hypertable_id;
    END IF;

    INSERT INTO _timescaledb_catalog.continuous_agg_migrate_plan (mat_hypertable_id)
    VALUES (_cagg_data.mat_hypertable_id);

    -- resolve schema and table name of the materialization hypertable
    SELECT schema_name, table_name
    INTO _matht
    FROM _timescaledb_catalog.hypertable
    WHERE id = _cagg_data.mat_hypertable_id;

    -- look up the time dimension: its interval, column name and column type
    SELECT time_interval, integer_interval, column_name, column_type
    INTO _time_interval, _integer_interval, _bucket_column_name, _bucket_column_type
    FROM timescaledb_information.dimensions
    WHERE hypertable_schema = _matht.schema_name
    AND hypertable_name = _matht.table_name
    AND dimension_type = 'Time';

    -- The watermark is kept as TEXT; when no watermark is available it falls
    -- back to the smallest value of the bucket column type.
    IF _integer_interval IS NOT NULL THEN
        -- integer-based time dimension
        _interval_value := _integer_interval::TEXT;
        _interval_type  := _bucket_column_type;
        IF _bucket_column_type = 'bigint' THEN
            _watermark := COALESCE(_timescaledb_internal.cagg_watermark(_cagg_data.mat_hypertable_id)::bigint, '-9223372036854775808'::bigint)::TEXT;
        ELSIF _bucket_column_type = 'integer' THEN
            _watermark := COALESCE(_timescaledb_internal.cagg_watermark(_cagg_data.mat_hypertable_id)::integer, '-2147483648'::integer)::TEXT;
        ELSE
            _watermark := COALESCE(_timescaledb_internal.cagg_watermark(_cagg_data.mat_hypertable_id)::smallint, '-32768'::smallint)::TEXT;
        END IF;
    ELSE
        -- timestamp-based time dimension
        _interval_value := _time_interval::TEXT;
        _interval_type  := 'interval';
        _watermark      := COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(_cagg_data.mat_hypertable_id)), '-infinity'::timestamptz)::TEXT;
    END IF;

    -- get all scheduled policies except the refresh
    -- NOTE(review): "id >= 1000" presumably skips built-in jobs; confirm.
    SELECT jsonb_build_object('policies', array_agg(id))
    INTO _policies
    FROM _timescaledb_config.bgw_job
    WHERE hypertable_id = _cagg_data.mat_hypertable_id
    AND proc_name IS DISTINCT FROM 'policy_refresh_continuous_aggregate'
    AND scheduled IS TRUE
    AND id >= 1000;

    INSERT INTO
        _timescaledb_catalog.continuous_agg_migrate_plan_step (mat_hypertable_id, type, config)
    VALUES
        (_cagg_data.mat_hypertable_id, 'SAVE WATERMARK', jsonb_build_object('watermark', _watermark)),
        (_cagg_data.mat_hypertable_id, 'CREATE NEW CAGG', jsonb_build_object('cagg_name_new', _cagg_name_new)),
        (_cagg_data.mat_hypertable_id, 'DISABLE POLICIES', _policies),
        (_cagg_data.mat_hypertable_id, 'REFRESH NEW CAGG', jsonb_build_object('cagg_name_new', _cagg_name_new, 'window_start', _watermark, 'window_start_type', _bucket_column_type));

    -- The SAVE WATERMARK step needs no further work, so mark it finished right
    -- away. Restrict the update to this plan so the timestamps of steps that
    -- belong to other plans are not overwritten.
    UPDATE _timescaledb_catalog.continuous_agg_migrate_plan_step
    SET status = 'FINISHED', start_ts = now(), end_ts = clock_timestamp()
    WHERE type = 'SAVE WATERMARK'
    AND mat_hypertable_id = _cagg_data.mat_hypertable_id;

    -- Generate one COPY DATA step per interval-sized range between the min and
    -- max bucket value found below the watermark in the user view.
    _sql := format (
        $$
        WITH boundaries AS (
            SELECT min(%1$I), max(%1$I), %1$L AS bucket_column_name, %2$L AS bucket_column_type, %3$L AS cagg_name_new
            FROM %4$I.%5$I
            WHERE %1$I < CAST(%6$L AS %2$s)
        )
        INSERT INTO
            _timescaledb_catalog.continuous_agg_migrate_plan_step (mat_hypertable_id, type, config)
        SELECT
            %7$L,
            'COPY DATA',
            jsonb_build_object (
                'start_ts', start::text,
                'end_ts', (start + CAST(%8$L AS %9$s))::text,
                'bucket_column_name', bucket_column_name,
                'bucket_column_type', bucket_column_type,
                'cagg_name_new', cagg_name_new
            )
        FROM boundaries,
             LATERAL generate_series(min, max, CAST(%8$L AS %9$s)) AS start;
        $$,
        _bucket_column_name, _bucket_column_type, _cagg_name_new, _cagg_data.user_view_schema, _cagg_data.user_view_name, _watermark, _cagg_data.mat_hypertable_id, _interval_value, _interval_type
    );

    EXECUTE _sql;

    INSERT INTO
        _timescaledb_catalog.continuous_agg_migrate_plan_step (mat_hypertable_id, type, config)
    VALUES
        (_cagg_data.mat_hypertable_id, 'OVERRIDE CAGG', jsonb_build_object('cagg_name_new', _cagg_name_new, 'override', _override, 'drop_old', _drop_old)),
        (_cagg_data.mat_hypertable_id, 'DROP OLD CAGG', jsonb_build_object('cagg_name_new', _cagg_name_new, 'override', _override, 'drop_old', _drop_old)),
        (_cagg_data.mat_hypertable_id, 'COPY POLICIES', _policies || jsonb_build_object('cagg_name_new', _cagg_name_new)),
        (_cagg_data.mat_hypertable_id, 'ENABLE POLICIES', NULL);
END;
$BODY$ SET search_path TO pg_catalog, pg_temp;

-- Migrates a continuous aggregate: validates it, creates the migration plan,
-- commits, executes the plan and finally stamps the plan's end time.
--
-- Parameters:
--   _cagg      the continuous aggregate to migrate
--   _override  forwarded to cagg_migrate_create_plan
--   _drop_old  forwarded to cagg_migrate_create_plan
--
-- This procedure issues COMMIT, and PostgreSQL does not allow transaction
-- control in a procedure that has a SET clause; object references are
-- therefore schema-qualified explicitly instead of pinning search_path.
CREATE PROCEDURE @extschema@.cagg_migrate (
    _cagg REGCLASS,
    _override BOOLEAN DEFAULT FALSE,
    _drop_old BOOLEAN DEFAULT FALSE
)
LANGUAGE plpgsql AS
$BODY$
DECLARE
    _cagg_schema TEXT;
    _cagg_name TEXT;
    _cagg_name_new TEXT;
    _cagg_data _timescaledb_catalog.continuous_agg;
BEGIN
    -- resolve schema and relation name of the given regclass
    SELECT nspname, relname
    INTO _cagg_schema, _cagg_name
    FROM pg_catalog.pg_class
    JOIN pg_catalog.pg_namespace ON pg_namespace.oid OPERATOR(pg_catalog.=) pg_class.relnamespace
    WHERE pg_class.oid OPERATOR(pg_catalog.=) _cagg::pg_catalog.oid;

    -- the maximum size of an identifier in Postgres is 63 characters, so we
    -- need to leave room for the '_new' suffix
    _cagg_name_new := pg_catalog.format('%s_new', pg_catalog.substr(_cagg_name, 1, 59));

    -- pre-validate the migration and get some variables
    _cagg_data := _timescaledb_internal.cagg_migrate_pre_validation(_cagg_schema, _cagg_name, _cagg_name_new);

    -- create new migration plan
    CALL _timescaledb_internal.cagg_migrate_create_plan(_cagg_data, _cagg_name_new, _override, _drop_old);
    -- make the plan durable before executing it
    COMMIT;

    -- execute the migration plan
    CALL _timescaledb_internal.cagg_migrate_execute_plan(_cagg_data);

    -- finish the migration plan
    UPDATE _timescaledb_catalog.continuous_agg_migrate_plan
    SET end_ts = pg_catalog.clock_timestamp()
    WHERE mat_hypertable_id OPERATOR(pg_catalog.=) _cagg_data.mat_hypertable_id;
END;
$BODY$;

-- Issue #4727
-- The allowed step types must cover every type inserted by
-- _timescaledb_internal.cagg_migrate_create_plan, including 'OVERRIDE CAGG'
-- and 'DROP OLD CAGG'; otherwise plan creation fails with a check violation
-- (and re-adding the constraint fails if such rows already exist).
ALTER TABLE _timescaledb_catalog.continuous_agg_migrate_plan_step
    DROP CONSTRAINT IF EXISTS continuous_agg_migrate_plan_step_check2;

ALTER TABLE _timescaledb_catalog.continuous_agg_migrate_plan_step
    ADD CONSTRAINT continuous_agg_migrate_plan_step_check2
    CHECK (type IN ('CREATE NEW CAGG', 'DISABLE POLICIES', 'COPY POLICIES', 'ENABLE POLICIES', 'SAVE WATERMARK', 'REFRESH NEW CAGG', 'COPY DATA', 'OVERRIDE CAGG', 'DROP OLD CAGG'));