timescaledb/sql/policy_internal.sql
gayyappan fffd6c2350 Use plpgsql procedure for executing compression policy
This PR removes the C code that executes the compression
policy. Instead we use a PL/pgSQL procedure to execute
the policy.

PG13.4 and PG12.8 introduced some changes
that require PortalContexts while executing transactions.
The compression policy procedure compresses chunks in
multiple transactions. We have seen some issues with snapshots
and portal management in the policy code (due to the
PG13.4 code changes). SPI API has transaction-portal management
code. However, the compression policy code does not use SPI
interfaces. But it is fairly easy to just convert this into
a PL/pgSQL procedure (which calls SPI) rather than replicating
portal managment code in C to manage multiple txns in the
compression policy.

This PR also disallows decompress_chunk, compress_chunk and
recompress_chunk in txn read only mode.

Fixes #3656
2021-10-13 09:11:59 -04:00

182 lines
6.1 KiB
PL/PgSQL

-- This file and its contents are licensed under the Apache License 2.0.
-- Please see the included NOTICE for copyright information and
-- LICENSE-APACHE for a copy of the license.
CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_retention(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_retention_proc'
LANGUAGE C;
CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_reorder(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_reorder_proc'
LANGUAGE C;
CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_recompression(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_recompression_proc'
LANGUAGE C;
CREATE OR REPLACE PROCEDURE _timescaledb_internal.policy_refresh_continuous_aggregate(job_id INTEGER, config JSONB)
AS '@MODULE_PATHNAME@', 'ts_policy_refresh_cagg_proc'
LANGUAGE C;
CREATE OR REPLACE PROCEDURE
_timescaledb_internal.policy_compression_interval( job_id INTEGER,
htid INTEGER,
lag INTERVAL,
maxchunks INTEGER,
verbose_log BOOLEAN,
recompress_enabled BOOLEAN)
AS $$
DECLARE
htoid regclass;
chunk_rec record;
numchunks integer := 1;
BEGIN
SELECT format('%I.%I',schema_name, table_name) INTO htoid
FROM _timescaledb_catalog.hypertable
WHERE id = htid;
FOR chunk_rec IN
SELECT show.oid, ch.schema_name, ch.table_name, ch.status
FROM show_chunks( htoid, older_than => lag) as show(oid)
INNER JOIN pg_class pgc ON pgc.oid = show.oid
INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname and ch.schema_name = pgns.nspname and ch.hypertable_id = htid
WHERE ch.dropped is false and (ch.status = 0 OR ch.status = 3)
LOOP
IF chunk_rec.status = 0 THEN
PERFORM compress_chunk( chunk_rec.oid );
ELSIF chunk_rec.status = 3 AND recompress_enabled = 'true' THEN
PERFORM recompress_chunk( chunk_rec.oid );
END IF;
COMMIT;
IF verbose_log THEN
RAISE LOG 'job % completed processing chunk %.%', job_id, chunk_rec.schema_name, chunk_rec.table_name;
END IF;
numchunks := numchunks + 1;
IF maxchunks > 0 AND numchunks >= maxchunks THEN
EXIT;
END IF;
END LOOP;
END;
$$ LANGUAGE PLPGSQL;
CREATE OR REPLACE PROCEDURE
_timescaledb_internal.policy_compression_integer( job_id INTEGER,
htid INTEGER,
lag BIGINT,
maxchunks INTEGER,
verbose_log BOOLEAN,
recompress_enabled BOOLEAN)
AS $$
DECLARE
htoid regclass;
chunk_rec record;
numchunks integer := 0;
lag_integer BIGINT;
BEGIN
SELECT format('%I.%I',schema_name, table_name) INTO htoid
FROM _timescaledb_catalog.hypertable
WHERE id = htid;
--for the integer case , we have to compute the lag w.r.t
-- the integer_now function and then pass on to show_chunks
lag_integer := _timescaledb_internal.subtract_integer_from_now( htoid, lag);
FOR chunk_rec IN
SELECT show.oid, ch.schema_name, ch.table_name, ch.status
FROM show_chunks( htoid, older_than => lag_integer) SHOW (oid)
INNER JOIN pg_class pgc ON pgc.oid = show.oid
INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname and ch.schema_name = pgns.nspname and ch.hypertable_id = htid
WHERE ch.dropped is false and (ch.status = 0 OR ch.status = 3)
LOOP
IF chunk_rec.status = 0 THEN
PERFORM compress_chunk( chunk_rec.oid );
ELSIF chunk_rec.status = 3 AND recompress_enabled = 'true' THEN
PERFORM recompress_chunk( chunk_rec.oid );
END IF;
COMMIT;
IF verbose_log THEN
RAISE LOG 'job % completed processing chunk %.%', job_id, chunk_rec.schema_name, chunk_rec.table_name;
END IF;
numchunks := numchunks + 1;
IF maxchunks > 0 AND numchunks >= maxchunks THEN
EXIT;
END IF;
END LOOP;
END;
$$ LANGUAGE PLPGSQL;
CREATE OR REPLACE PROCEDURE
_timescaledb_internal.policy_compression( job_id INTEGER, config JSONB)
AS $$
DECLARE
dimtype regtype;
compress_after text;
lag_interval interval;
lag_integer bigint;
htid integer;
htoid regclass;
chunk_rec record;
verbose_log bool;
maxchunks integer := 0;
numchunks integer := 1;
recompress_enabled bool;
BEGIN
IF config IS NULL THEN
RAISE EXCEPTION 'job % has null config', job_id;
END IF;
htid := jsonb_object_field_text (config, 'hypertable_id')::integer;
IF htid is NULL THEN
RAISE EXCEPTION 'job % config must have hypertable_id', job_id;
END IF;
verbose_log := jsonb_object_field_text (config, 'verbose_log')::boolean;
IF verbose_log is NULL THEN
verbose_log = 'false';
END IF;
maxchunks := jsonb_object_field_text (config, 'maxchunks_to_compress')::integer;
IF maxchunks IS NULL THEN
maxchunks = 0;
END IF;
recompress_enabled := jsonb_object_field_text (config, 'recompress')::boolean;
IF recompress_enabled IS NULL THEN
recompress_enabled = 'true';
END IF;
compress_after := jsonb_object_field_text(config, 'compress_after');
IF compress_after IS NULL THEN
RAISE EXCEPTION 'job % config must have compress_after', job_id;
END IF;
-- find primary dimension type --
SELECT column_type INTO STRICT dimtype
FROM ( SELECT ht.schema_name, ht.table_name, dim.column_name, dim.column_type,
row_number() over(partition by ht.id order by dim.id) as rn
FROM _timescaledb_catalog.hypertable ht ,
_timescaledb_catalog.dimension dim
WHERE ht.id = dim.hypertable_id and ht.id = htid ) q
WHERE rn = 1;
CASE WHEN (dimtype = 'TIMESTAMP'::regtype
OR dimtype = 'TIMESTAMPTZ'::regtype
OR dimtype = 'DATE'::regtype) THEN
lag_interval := jsonb_object_field_text(config, 'compress_after')::interval ;
CALL _timescaledb_internal.policy_compression_interval(
job_id, htid, lag_interval,
maxchunks, verbose_log, recompress_enabled);
ELSE
lag_integer := jsonb_object_field_text(config, 'compress_after')::bigint;
CALL _timescaledb_internal.policy_compression_integer(
job_id, htid, lag_integer,
maxchunks, verbose_log, recompress_enabled );
END CASE;
END;
$$ LANGUAGE PLPGSQL;