Mirror of https://github.com/timescale/timescaledb.git, synced 2025-05-15 18:13:18 +08:00
Compare commits
6 Commits
462d63cc0b
...
b5b6396581
Author | SHA1 | Date
---|---|---
 | b5b6396581 |
 | d1d74dcc92 |
 | 9ac307db2e |
 | e3cabc8601 |
 | e3c78ca59a |
 | 69a1bf3994 |
.unreleased
scripts
sql
src
tsl
.unreleased/pr_7837 (new file, 1 line)
@@ -0,0 +1 @@
Fixes: #7837 Ignore frozen chunks in compression policy
.unreleased/pr_7844 (new file, 1 line)
@@ -0,0 +1 @@
Implements: #7844 Add GUC to enable exclusive locking recompression
.unreleased/pr_7850 (new file, 1 line)
@@ -0,0 +1 @@
Fixes: #7850 Add is_current_xact_tuple to Arrow TTS
@@ -16,21 +16,21 @@
# databases.
#
# The following environment variables can be set:
# - UPDATE_FROM_TAG is the version to update from (optional).
# - CURRENT_VERSION is the version to update from (required).
#
# - UPDATE_TO_TAG is the version to update to (optional).
# - NEXT_VERSION is the version to update to (required).
#
# - PGHOST is host to use for the connection (required).
# - CONNECTION_STRING is the URL to use for the connection (required).
#
# - PGPORT is the port to use for the connection (required).
#
# - PGDATABASE is the database to use for the connection (required).
#
# - PGUSER is the username to use for the connection (required).
#
# - PGPASSWORD is the password to use for the connection
# (optional). If not set, password from .pgpass will be used (if
# available).

if [ "$#" -ne 3 ]; then
echo "${0} <current_version> <next_version> <connection_string>"
exit 2
fi

CURRENT_VERSION=$1
NEXT_VERSION=$2
CONNECTION_STRING=$3

SCRIPT_DIR=$(dirname $0)
BASE_DIR=${PWD}/${SCRIPT_DIR}/..

@@ -41,26 +41,11 @@ UPGRADE_OUT="$SCRATCHDIR/upgrade.out"
CLEAN_OUT="$SCRATCHDIR/clean.out"
RESTORE_OUT="$SCRATCHDIR/restore.out"
TEST_VERSION=${TEST_VERSION:-v7}
UPDATE_FROM_TAG=${UPDATE_FROM_TAG:-1.7.4}
UPDATE_TO_TAG=${UPDATE_TO_TAG:-2.0.1}

# We do not have superuser privileges when running smoke tests.
WITH_SUPERUSER=false
WITH_ROLES=false

while getopts "s:t:" opt;
do
case $opt in
s)
UPDATE_FROM_TAG=$OPTARG
;;
t)
UPDATE_TO_TAG=$OPTARG
;;
*)
;;
esac
done

shift $((OPTIND-1))

echo "**** pg_dump at " "$(which pg_dump)"

@@ -80,7 +65,7 @@ PSQL="psql -a -qX $PGOPTS"
# picked up and used for the connection.
if [[ $# -gt 0 ]]; then
# shellcheck disable=SC2207 # Prefer mapfile or read -a to split command output (or quote to avoid splitting).
parts=($(echo $1 | perl -mURI::Split=uri_split -ne '@F = uri_split($_); print join(" ", split(qr/[:@]/, $F[1]), substr($F[2], 1))'))
parts=($(echo $CONNECTION_STRING | perl -mURI::Split=uri_split -ne '@F = uri_split($_); print join(" ", split(qr/[:@]/, $F[1]), substr($F[2], 1))'))
export PGUSER=${parts[0]}
if [[ ${#parts[@]} -eq 5 ]]; then
# Cloud has 5 fields

@@ -94,7 +79,7 @@ if [[ $# -gt 0 ]]; then
export PGPORT=${parts[2]}
export PGDATABASE=${parts[3]}
else
echo "Malformed URL '$1'" 1>&2
echo "Malformed URL '$CONNECTION_STRING'" 1>&2
exit 2
fi
fi

@@ -135,7 +120,7 @@ cd ${BASE_DIR}/test/sql/updates
$PSQL -c '\conninfo'

# shellcheck disable=SC2207 # Prefer mapfile or read -a to split command output (or quote to avoid splitting).
missing=($(missing_versions $UPDATE_FROM_TAG $UPDATE_TO_TAG))
missing=($(missing_versions $CURRENT_VERSION $NEXT_VERSION))
if [[ ${#missing[@]} -gt 0 ]]; then
echo "ERROR: Missing version(s) ${missing[*]} of 'timescaledb'"
echo "Available versions: " "$($PSQL -tc "SELECT version FROM pg_available_extension_versions WHERE name = 'timescaledb'")"

@@ -151,14 +136,14 @@ echo "---- Connecting to ${FORGE_CONNINFO} and running setup ----"
$PSQL -f cleanup.${TEST_VERSION}.sql >>$LOGFILE 2>&1
$PSQL -c "DROP EXTENSION IF EXISTS timescaledb CASCADE" >>$LOGFILE 2>&1
$PSQL -f pre.cleanup.sql >>$LOGFILE 2>&1
$PSQL -c "CREATE EXTENSION timescaledb VERSION '${UPDATE_FROM_TAG}'" >>$LOGFILE 2>&1
$PSQL -c "CREATE EXTENSION timescaledb VERSION '${CURRENT_VERSION}'" >>$LOGFILE 2>&1
$PSQL -c "\dx"

# Run setup on Upgrade
$PSQL -f pre.smoke.sql >>$LOGFILE 2>&1
$PSQL -f setup.${TEST_VERSION}.sql >>$LOGFILE 2>&1
# Run update on Upgrade. You now have a 2.0.2 version in Upgrade.
$PSQL -c "ALTER EXTENSION timescaledb UPDATE TO '${UPDATE_TO_TAG}'" >>$LOGFILE 2>&1
$PSQL -c "ALTER EXTENSION timescaledb UPDATE TO '${NEXT_VERSION}'" >>$LOGFILE 2>&1

echo -n "Dumping the contents of Upgrade..."
pg_dump -Fc -f $DUMPFILE >>$LOGFILE 2>&1

@@ -172,10 +157,10 @@ echo "done"

$PSQL -f cleanup.${TEST_VERSION}.sql >>$LOGFILE 2>&1

echo "---- Create a ${UPDATE_TO_TAG} version Clean ----"
echo "---- Create a ${NEXT_VERSION} version Clean ----"
$PSQL -c "DROP EXTENSION IF EXISTS timescaledb CASCADE" >>$LOGFILE 2>&1
$PSQL -f pre.cleanup.sql >>$LOGFILE 2>&1
$PSQL -c "CREATE EXTENSION timescaledb VERSION '${UPDATE_TO_TAG}'" >>$LOGFILE 2>&1
$PSQL -c "CREATE EXTENSION timescaledb VERSION '${NEXT_VERSION}'" >>$LOGFILE 2>&1
$PSQL -c "\dx"

echo "---- Run the setup scripts on Clean, with post-update actions ----"

@@ -187,10 +172,10 @@ $PSQL -f post.${TEST_VERSION}.sql >$CLEAN_OUT

$PSQL -f cleanup.${TEST_VERSION}.sql >>$LOGFILE 2>&1

echo "---- Create a ${UPDATE_TO_TAG} version Restore ----"
echo "---- Create a ${NEXT_VERSION} version Restore ----"
$PSQL -c "DROP EXTENSION IF EXISTS timescaledb CASCADE" >>$LOGFILE 2>&1
$PSQL -f pre.cleanup.sql >>$LOGFILE 2>&1
$PSQL -c "CREATE EXTENSION timescaledb VERSION '${UPDATE_TO_TAG}'" >>$LOGFILE 2>&1
$PSQL -c "CREATE EXTENSION timescaledb VERSION '${NEXT_VERSION}'" >>$LOGFILE 2>&1
$PSQL -c "\dx"

echo "---- Restore the UpgradeDump into Restore ----"
@@ -52,6 +52,8 @@ DECLARE
_message text;
_detail text;
_sqlstate text;
-- fully compressed chunk status
status_fully_compressed int := 1;
-- chunk status bits:
bit_compressed int := 1;
bit_compressed_unordered int := 2;

@@ -95,58 +97,27 @@ BEGIN
INNER JOIN pg_class pgc ON pgc.oid = show.oid
INNER JOIN pg_namespace pgns ON pgc.relnamespace = pgns.oid
INNER JOIN _timescaledb_catalog.chunk ch ON ch.table_name = pgc.relname AND ch.schema_name = pgns.nspname AND ch.hypertable_id = htid
WHERE
NOT ch.dropped AND NOT ch.osm_chunk
AND (
ch.status = 0 OR
(
ch.status & bit_compressed > 0 AND (
ch.status & bit_compressed_unordered > 0 OR
ch.status & bit_compressed_partial > 0
)
)
)
WHERE NOT ch.dropped
AND NOT ch.osm_chunk
-- Checking for chunks which are not fully compressed and not frozen
AND ch.status != status_fully_compressed
AND ch.status & bit_frozen = 0
LOOP
IF chunk_rec.status = 0 THEN
BEGIN
BEGIN
IF chunk_rec.status = bit_compressed OR recompress_enabled IS TRUE THEN
PERFORM @extschema@.compress_chunk(chunk_rec.oid, hypercore_use_access_method => useam);
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS
_message = MESSAGE_TEXT,
_detail = PG_EXCEPTION_DETAIL,
_sqlstate = RETURNED_SQLSTATE;
RAISE WARNING 'compressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text
USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
ERRCODE = _sqlstate;
chunks_failure := chunks_failure + 1;
END;
ELSIF
(
chunk_rec.status & bit_compressed > 0 AND (
chunk_rec.status & bit_compressed_unordered > 0 OR
chunk_rec.status & bit_compressed_partial > 0
)
) AND recompress_enabled IS TRUE THEN
BEGIN
-- first check if there's an index. Might have to use a heuristic to determine if index usage would be efficient,
-- or if we'd better fall back to decompressing & recompressing entire chunk
IF _timescaledb_functions.get_compressed_chunk_index_for_recompression(chunk_rec.oid) IS NOT NULL THEN
PERFORM _timescaledb_functions.recompress_chunk_segmentwise(chunk_rec.oid);
ELSE
PERFORM @extschema@.decompress_chunk(chunk_rec.oid, if_compressed => true);
PERFORM @extschema@.compress_chunk(chunk_rec.oid, hypercore_use_access_method => useam);
END IF;
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS
_message = MESSAGE_TEXT,
_detail = PG_EXCEPTION_DETAIL,
_sqlstate = RETURNED_SQLSTATE;
RAISE WARNING 'recompressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text
USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
ERRCODE = _sqlstate;
chunks_failure := chunks_failure + 1;
END;
END IF;
numchunks := numchunks + 1;
END IF;
EXCEPTION WHEN OTHERS THEN
GET STACKED DIAGNOSTICS
_message = MESSAGE_TEXT,
_detail = PG_EXCEPTION_DETAIL,
_sqlstate = RETURNED_SQLSTATE;
RAISE WARNING 'compressing chunk "%" failed when compression policy is executed', chunk_rec.oid::regclass::text
USING DETAIL = format('Message: (%s), Detail: (%s).', _message, _detail),
ERRCODE = _sqlstate;
chunks_failure := chunks_failure + 1;
END;
COMMIT;
-- SET LOCAL is only active until end of transaction.
-- While we could use SET at the start of the function we do not

@@ -156,7 +127,6 @@ BEGIN
IF verbose_log THEN
RAISE LOG 'job % completed processing chunk %.%', job_id, chunk_rec.schema_name, chunk_rec.table_name;
END IF;
numchunks := numchunks + 1;
IF maxchunks > 0 AND numchunks >= maxchunks THEN
EXIT;
END IF;
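As a worked illustration of the status-bit filter above (bit values taken from this PR's tests: 1 = compressed, 2 = compressed unordered, 4 = frozen, 8 = partially compressed), a small standalone query sketches which chunk statuses the policy now picks up; the VALUES list is illustrative only and not part of the change:

-- Illustration only: decode chunk status bits the way the new policy filter does.
SELECT status,
       status & 1 > 0 AS compressed,
       status & 8 > 0 AS partially_compressed,
       status & 4 > 0 AS frozen,
       (status != 1 AND status & 4 = 0) AS picked_up_by_policy
FROM (VALUES (0), (1), (9), (13)) AS t(status);
-- status 9  (1 + 8)     -> partially compressed, gets recompressed
-- status 13 (1 + 8 + 4) -> frozen, skipped until unfrozen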
src/guc.c (12 changed lines)

@@ -152,6 +152,7 @@ TSDLLEXPORT bool ts_guc_auto_sparse_indexes = true;
TSDLLEXPORT bool ts_guc_default_hypercore_use_access_method = false;
bool ts_guc_enable_chunk_skipping = false;
TSDLLEXPORT bool ts_guc_enable_segmentwise_recompression = true;
TSDLLEXPORT bool ts_guc_enable_exclusive_locking_recompression = false;
TSDLLEXPORT bool ts_guc_enable_bool_compression = false;

/* Only settable in debug mode for testing */

@@ -752,6 +753,17 @@ _guc_init(void)
NULL,
NULL,
NULL);
DefineCustomBoolVariable(MAKE_EXTOPTION("enable_exclusive_locking_recompression"),
"Enable exclusive locking recompression",
"Enable getting exclusive lock on chunk during segmentwise "
"recompression",
&ts_guc_enable_exclusive_locking_recompression,
false,
PGC_USERSET,
0,
NULL,
NULL,
NULL);

DefineCustomBoolVariable(MAKE_EXTOPTION("enable_bool_compression"),
"Enable experimental bool compression functionality",
@@ -69,6 +69,7 @@ extern TSDLLEXPORT bool ts_guc_enable_delete_after_compression;
extern TSDLLEXPORT bool ts_guc_enable_merge_on_cagg_refresh;
extern bool ts_guc_enable_chunk_skipping;
extern TSDLLEXPORT bool ts_guc_enable_segmentwise_recompression;
extern TSDLLEXPORT bool ts_guc_enable_exclusive_locking_recompression;
extern TSDLLEXPORT bool ts_guc_enable_bool_compression;

/* Only settable in debug mode for testing */
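A minimal usage sketch for the new setting (the chunk name is a placeholder; the SET/RESET pattern mirrors the regression test further down):

-- Opt in to the legacy exclusive-locking behaviour for segmentwise recompression.
SET timescaledb.enable_exclusive_locking_recompression TO on;
SELECT _timescaledb_functions.recompress_chunk_segmentwise('<chunk>'::regclass);  -- placeholder chunk
-- Return to the default, less restrictive locking.
RESET timescaledb.enable_exclusive_locking_recompression;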
@@ -933,7 +933,7 @@ tsl_compress_chunk_wrapper(Chunk *chunk, bool if_not_compressed, bool recompress
if (ts_chunk_is_compressed(chunk))
{
CompressionSettings *chunk_settings = ts_compression_settings_get(chunk->table_id);
bool valid_orderby_settings = chunk_settings->fd.orderby;
bool valid_orderby_settings = chunk_settings && chunk_settings->fd.orderby;
if (recompress)
{
CompressionSettings *ht_settings = ts_compression_settings_get(chunk->hypertable_relid);
@@ -133,11 +133,13 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
(errmsg("acquiring locks for recompression: \"%s.%s\"",
NameStr(uncompressed_chunk->fd.schema_name),
NameStr(uncompressed_chunk->fd.table_name))));

LOCKMODE recompression_lockmode =
ts_guc_enable_exclusive_locking_recompression ? ExclusiveLock : ShareUpdateExclusiveLock;
/* lock both chunks, compressed and uncompressed */
Relation uncompressed_chunk_rel =
table_open(uncompressed_chunk->table_id, ShareUpdateExclusiveLock);
Relation compressed_chunk_rel =
table_open(compressed_chunk->table_id, ShareUpdateExclusiveLock);
table_open(uncompressed_chunk->table_id, recompression_lockmode);
Relation compressed_chunk_rel = table_open(compressed_chunk->table_id, recompression_lockmode);

bool has_unique_constraints =
ts_indexing_relation_has_primary_or_unique_index(uncompressed_chunk_rel);

@@ -256,7 +258,9 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
"Using index \"%s\" for recompression",
get_rel_name(row_compressor.index_oid));

Relation index_rel = index_open(row_compressor.index_oid, ExclusiveLock);
LOCKMODE index_lockmode =
ts_guc_enable_exclusive_locking_recompression ? ExclusiveLock : RowExclusiveLock;
Relation index_rel = index_open(row_compressor.index_oid, index_lockmode);
ereport(DEBUG1,
(errmsg("locks acquired for recompression: \"%s.%s\"",
NameStr(uncompressed_chunk->fd.schema_name),

@@ -424,9 +428,8 @@ recompress_chunk_segmentwise_impl(Chunk *uncompressed_chunk)
snapshot))
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg(
"cannot proceed with recompression due to concurrent updates on "
"compressed data")));
errmsg("aborting recompression due to concurrent updates on "
"compressed data, retrying with next policy run")));
CommandCounterIncrement();

if (should_free)

@@ -577,8 +580,8 @@ finish:
*/
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("cannot proceed with recompression due to concurrent DML on uncompressed "
"data")));
errmsg("aborting recompression due to concurrent DML on uncompressed "
"data, retrying with next policy run")));
}

table_close(uncompressed_chunk_rel, NoLock);

@@ -672,9 +675,8 @@ fetch_uncompressed_chunk_into_tuplesort(Tuplesortstate *tuplesortstate,
if (!delete_tuple_for_recompression(uncompressed_chunk_rel, &slot->tts_tid, snapshot))
ereport(ERROR,
(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
errmsg("cannot proceed with recompression due to concurrent updates on "
"uncompressed "
"data")));
errmsg("aborting recompression due to concurrent updates on "
"uncompressed data, retrying with next policy run")));
}
ExecDropSingleTupleTableSlot(slot);
table_endscan(scan);
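To observe the effect of the two lock modes, the regression test further down inspects pg_locks inside an open transaction; a condensed sketch of that check (the chunk name is a placeholder):

BEGIN;
SELECT _timescaledb_functions.recompress_chunk_segmentwise('<uncompressed_chunk>'::regclass);  -- placeholder
-- GUC off: only the uncompressed chunk ends up with an ExclusiveLock (taken at the end
-- to flip the chunk status). GUC on: the uncompressed chunk, the compressed chunk, and
-- the compressed chunk's index are all held with ExclusiveLock.
SELECT c.relname FROM pg_locks l
INNER JOIN pg_class c ON c.oid = l.relation
WHERE l.locktype = 'relation' AND l.mode = 'ExclusiveLock'
ORDER BY 1;
ROLLBACK;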
@@ -746,6 +746,21 @@ tts_arrow_copy_heap_tuple(TupleTableSlot *slot)
return tuple;
}

#if PG17_GE
static bool
tts_arrow_is_current_xact_tuple(TupleTableSlot *slot)
{
ArrowTupleTableSlot *aslot = (ArrowTupleTableSlot *) slot;
Assert(!TTS_EMPTY(slot));
if (NULL == aslot->child_slot)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("don't have transaction information in this context")));

return aslot->child_slot->tts_ops->is_current_xact_tuple(aslot->child_slot);
}
#endif

/*
* Produce a Minimal tuple copy from the slot Datum values.
*

@@ -837,15 +852,20 @@ arrow_slot_set_index_attrs(TupleTableSlot *slot, Bitmapset *attrs)
MemoryContextSwitchTo(oldmcxt);
}

const TupleTableSlotOps TTSOpsArrowTuple = { .base_slot_size = sizeof(ArrowTupleTableSlot),
.init = tts_arrow_init,
.release = tts_arrow_release,
.clear = tts_arrow_clear,
.getsomeattrs = tts_arrow_getsomeattrs,
.getsysattr = tts_arrow_getsysattr,
.materialize = tts_arrow_materialize,
.copyslot = tts_arrow_copyslot,
.get_heap_tuple = NULL,
.get_minimal_tuple = NULL,
.copy_heap_tuple = tts_arrow_copy_heap_tuple,
.copy_minimal_tuple = tts_arrow_copy_minimal_tuple };
const TupleTableSlotOps TTSOpsArrowTuple = {
.base_slot_size = sizeof(ArrowTupleTableSlot),
.init = tts_arrow_init,
.release = tts_arrow_release,
.clear = tts_arrow_clear,
.getsomeattrs = tts_arrow_getsomeattrs,
.getsysattr = tts_arrow_getsysattr,
.materialize = tts_arrow_materialize,
.copyslot = tts_arrow_copyslot,
.get_heap_tuple = NULL,
.get_minimal_tuple = NULL,
.copy_heap_tuple = tts_arrow_copy_heap_tuple,
.copy_minimal_tuple = tts_arrow_copy_minimal_tuple,
#if PG17_GE
.is_current_xact_tuple = tts_arrow_is_current_xact_tuple,
#endif
};
@@ -677,6 +677,100 @@ SELECT delete_job(:JOB_RECOMPRESS);

(1 row)

--TEST 7
--compression policy should ignore frozen partially compressed chunks
CREATE TABLE test_table_frozen(time TIMESTAMPTZ, val SMALLINT);
SELECT create_hypertable('test_table_frozen', 'time', chunk_time_interval => '1 day'::interval);
NOTICE: adding not-null constraint to column "time"
create_hypertable
---------------------------------
(18,public,test_table_frozen,t)
(1 row)

INSERT INTO test_table_frozen SELECT time, (random()*10)::smallint
FROM generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '10 min') AS time;
ALTER TABLE test_table_frozen SET (timescaledb.compress);
WARNING: there was some uncertainty picking the default segment by for the hypertable: You do not have any indexes on columns that can be used for segment_by and thus we are not using segment_by for compression. Please make sure you are not missing any indexes
NOTICE: default segment by for hypertable "test_table_frozen" is set to ""
NOTICE: default order by for hypertable "test_table_frozen" is set to ""time" DESC"
select add_compression_policy( 'test_table_frozen', compress_after=> '1 day'::interval ) as compressjob_id \gset
SELECT * FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
id | application_name | schedule_interval | max_runtime | max_retries | retry_period | proc_schema | proc_name | owner | scheduled | fixed_schedule | initial_start | hypertable_id | config | check_schema | check_name | timezone
------+---------------------------+-------------------+-------------+-------------+--------------+------------------------+--------------------+-------------------+-----------+----------------+---------------+---------------+----------------------------------------------------+------------------------+--------------------------+----------
1008 | Compression Policy [1008] | @ 12 hours | @ 0 | -1 | @ 1 hour | _timescaledb_functions | policy_compression | default_perm_user | t | f | | 18 | {"hypertable_id": 18, "compress_after": "@ 1 day"} | _timescaledb_functions | policy_compression_check |
(1 row)

SELECT show_chunks('test_table_frozen') as first_chunk LIMIT 1 \gset
--will compress all chunks that need compression
CALL run_job(:compressjob_id);
-- make the chunks partial
INSERT INTO test_table_frozen SELECT time, (random()*10)::smallint
FROM generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '10 min') AS time;
SELECT c.id, c.status
FROM _timescaledb_catalog.chunk c
INNER JOIN _timescaledb_catalog.hypertable h on (h.id = c.hypertable_id)
WHERE h.table_name = 'test_table_frozen'
ORDER BY c.id
LIMIT 1;
id | status
----+--------
69 | 9
(1 row)

-- freeze first chunk
SELECT _timescaledb_functions.freeze_chunk(:'first_chunk');
freeze_chunk
--------------
t
(1 row)

-- first chunk status is 1 (Compressed) + 8 (Partially compressed) + 4 (Frozen) = 13
SELECT c.id, c.status
FROM _timescaledb_catalog.chunk c
INNER JOIN _timescaledb_catalog.hypertable h on (h.id = c.hypertable_id)
WHERE h.table_name = 'test_table_frozen'
ORDER BY c.id
LIMIT 1;
id | status
----+--------
69 | 13
(1 row)

--should recompress all chunks except first since its frozen
CALL run_job(:compressjob_id);
-- first chunk status is unchanged
SELECT c.id, c.status
FROM _timescaledb_catalog.chunk c
INNER JOIN _timescaledb_catalog.hypertable h on (h.id = c.hypertable_id)
WHERE h.table_name = 'test_table_frozen'
ORDER BY c.id
LIMIT 1;
id | status
----+--------
69 | 13
(1 row)

-- unfreeze first chunk
SELECT _timescaledb_functions.unfreeze_chunk(:'first_chunk');
unfreeze_chunk
----------------
t
(1 row)

-- should be able to recompress the chunk since its unfrozen
CALL run_job(:compressjob_id);
-- first chunk status is Compressed (1)
SELECT c.id, c.status
FROM _timescaledb_catalog.chunk c
INNER JOIN _timescaledb_catalog.hypertable h on (h.id = c.hypertable_id)
WHERE h.table_name = 'test_table_frozen'
ORDER BY c.id
LIMIT 1;
id | status
----+--------
69 | 1
(1 row)

-- Teardown test
\c :TEST_DBNAME :ROLE_SUPERUSER
REVOKE CREATE ON SCHEMA public FROM NOLOGIN_ROLE;
@@ -114,8 +114,7 @@ where time = '2022-06-01 10:14' and device = 1;
-- Add a new policy that doesn't specify hypercore. It should still
-- recompress hypercores.
select add_compression_policy('readings',
compress_after => '1 day'::interval,
hypercore_use_access_method => false)
compress_after => '1 day'::interval)
as compression_job \gset
-- Run the policy job again to recompress
call run_job(:'compression_job');
@@ -719,3 +719,67 @@ NOTICE: segmentwise recompression is disabled, performing full recompression on
_timescaledb_internal._hyper_22_24_chunk
(1 row)

RESET timescaledb.enable_segmentwise_recompression;
--- Test behaviour of enable_exclusive_locking_recompression GUC
CREATE TABLE exclusive_test(time timestamptz not null, a int, b int, c int);
SELECT create_hypertable('exclusive_test', by_range('time', INTERVAL '1 day'));
create_hypertable
-------------------
(24,t)
(1 row)

ALTER TABLE guc_test set (timescaledb.compress, timescaledb.compress_segmentby = 'a, b');
INSERT INTO guc_test VALUES ('2024-10-30 14:04:00.501519-06'::timestamptz, 1, 1, 1);
SELECT show_chunks as chunk_to_compress FROM show_chunks('guc_test') LIMIT 1 \gset
SELECT compress_chunk(:'chunk_to_compress');
compress_chunk
------------------------------------------
_timescaledb_internal._hyper_22_24_chunk
(1 row)

INSERT INTO guc_test VALUES ('2024-10-30 14:14:00.501519-06'::timestamptz, 1, 1, 2);
-- Default behavior will try to get exclusive lock at the end of operation
-- in order to change the chunk status. Here it will succeed since there
-- isn't any concurrent operations.
RESET timescaledb.enable_exclusive_locking_recompression;
BEGIN;
SELECT _timescaledb_functions.recompress_chunk_segmentwise(:'chunk_to_compress');
recompress_chunk_segmentwise
------------------------------------------
_timescaledb_internal._hyper_22_24_chunk
(1 row)

SELECT c.relname FROM pg_locks l
INNER JOIN pg_class c ON c.oid = l.relation
WHERE locktype = 'relation' AND mode = 'ExclusiveLock'
ORDER BY 1;
relname
--------------------
_hyper_22_24_chunk
(1 row)

ROLLBACK;
-- If we enable this GUC, it should get exclusive locks on 3 relations:
-- uncompressed chunk table, compressed chunk table, and compressed chunk index.
-- This is done so that we keep locking consistency to legacy way of locking.
SET timescaledb.enable_exclusive_locking_recompression TO ON;
BEGIN;
SELECT _timescaledb_functions.recompress_chunk_segmentwise(:'chunk_to_compress');
recompress_chunk_segmentwise
------------------------------------------
_timescaledb_internal._hyper_22_24_chunk
(1 row)

SELECT c.relname FROM pg_locks l
INNER JOIN pg_class c ON c.oid = l.relation
WHERE locktype = 'relation' AND mode = 'ExclusiveLock'
ORDER BY 1;
relname
-----------------------------------------------------------------
_hyper_22_24_chunk
compress_hyper_23_26_chunk
compress_hyper_23_26_chunk_a_b__ts_meta_min_1__ts_meta_max__idx
(3 rows)

ROLLBACK;
RESET timescaledb.enable_exclusive_locking_recompression;
@@ -1563,7 +1563,7 @@ step I1:
<waiting ...>
step UnlockChunk: ROLLBACK;
step RC: <... completed>
ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step I1: <... completed>
step Ic: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');

@@ -1652,7 +1652,7 @@ step I1:
<waiting ...>
step UnlockChunk: ROLLBACK;
step RC: <... completed>
ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step I1: <... completed>
step Ic: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');

@@ -1741,7 +1741,7 @@ step I1:
<waiting ...>
step UnlockChunk: ROLLBACK;
step RC: <... completed>
ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step I1: <... completed>
step Ic: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');

@@ -1830,7 +1830,7 @@ step Iu1:
<waiting ...>
step UnlockChunk: ROLLBACK;
step RC: <... completed>
ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step Iu1: <... completed>
step Ic: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');

@@ -1919,7 +1919,7 @@ step Iu1:
<waiting ...>
step UnlockChunk: ROLLBACK;
step RC: <... completed>
ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step Iu1: <... completed>
step Ic: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');

@@ -2014,7 +2014,7 @@ step Iu1:
<waiting ...>
step UnlockChunk: ROLLBACK;
step RC: <... completed>
ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step Iu1: <... completed>
step Ic: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');

@@ -2106,7 +2106,7 @@ step RC:
step IN1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 200, 100) ON CONFLICT DO NOTHING; <waiting ...>
step UnlockChunk: ROLLBACK;
step RC: <... completed>
ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step IN1: <... completed>
step INc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');

@@ -2192,7 +2192,7 @@ step RC:
step INu1: BEGIN; INSERT INTO ts_device_table VALUES (1, 1, 100, 99) ON CONFLICT(time, device) DO UPDATE SET value = 99; <waiting ...>
step UnlockChunk: ROLLBACK;
step RC: <... completed>
ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step INu1: <... completed>
step INc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');

@@ -2288,7 +2288,7 @@ step RC:
step UnlockChunk: ROLLBACK;
step I1: <... completed>
step RC: <... completed>
ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step Ic: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks

@@ -2377,7 +2377,7 @@ step RC:
step UnlockChunk: ROLLBACK;
step I1: <... completed>
step RC: <... completed>
ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step Ic: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks

@@ -2466,7 +2466,7 @@ step RC:
step UnlockChunk: ROLLBACK;
step I1: <... completed>
step RC: <... completed>
ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step Ic: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks

@@ -2555,7 +2555,7 @@ step RC:
step UnlockChunk: ROLLBACK;
step Iu1: <... completed>
step RC: <... completed>
ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step Ic: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks

@@ -2650,7 +2650,7 @@ step RC:
step UnlockChunk: ROLLBACK;
step Iu1: <... completed>
step RC: <... completed>
ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step Ic: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks

@@ -2745,7 +2745,7 @@ step RC:
step UnlockChunk: ROLLBACK;
step Iu1: <... completed>
step RC: <... completed>
ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step Ic: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks

@@ -2837,7 +2837,7 @@ step RC:
step UnlockChunk: ROLLBACK;
step IN1: <... completed>
step RC: <... completed>
ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step INc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks

@@ -2923,7 +2923,7 @@ step RC:
step UnlockChunk: ROLLBACK;
step INu1: <... completed>
step RC: <... completed>
ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step INc: COMMIT;
step SH: SELECT total_chunks, number_compressed_chunks from hypertable_compression_stats('ts_device_table');
total_chunks|number_compressed_chunks
@@ -155,7 +155,7 @@ step s1_recompress_chunk:
FROM show_chunks('sensor_data') i
LIMIT 1;

ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step s2_insert_do_nothing:
INSERT INTO sensor_data
VALUES ('2022-01-01 20:00'::timestamptz, 1, 1.0, 1.0), ('2022-01-01 21:00'::timestamptz, 2, 2.0, 2.0)

@@ -215,7 +215,7 @@ step s1_recompress_chunk:
FROM show_chunks('sensor_data') i
LIMIT 1;

ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step s2_insert_existing_do_nothing:
INSERT INTO sensor_data
SELECT time, sensor_id, 1.0, 1.0 FROM sensor_data

@@ -274,7 +274,7 @@ step s1_recompress_chunk:
FROM show_chunks('sensor_data') i
LIMIT 1;

ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step s2_wait_for_finish:

step s2_commit:

@@ -325,7 +325,7 @@ step s1_recompress_chunk:
FROM show_chunks('sensor_data') i
LIMIT 1;

ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step s2_wait_for_finish:

step s2_commit:

@@ -410,7 +410,7 @@ step s1_recompress_chunk:
FROM show_chunks('sensor_data') i
LIMIT 1;

ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step s2_wait_for_finish:

step s2_commit:

@@ -580,7 +580,7 @@ step s1_recompress_chunk:
FROM show_chunks('sensor_data') i
LIMIT 1;

ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step s2_upsert:
INSERT INTO sensor_data
VALUES ('2022-01-01 20:00'::timestamptz, 100, 9999, 9999), ('2022-01-01 21:00'::timestamptz, 101, 9999, 9999)

@@ -644,7 +644,7 @@ step s1_recompress_chunk:
FROM show_chunks('sensor_data') i
LIMIT 1;

ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step s2_upsert_existing:
INSERT INTO sensor_data
SELECT time, sensor_id, 9999, 9999 FROM sensor_data

@@ -711,7 +711,7 @@ step s1_recompress_chunk:
FROM show_chunks('sensor_data') i
LIMIT 1;

ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step s2_wait_for_finish:

step s2_commit:

@@ -770,7 +770,7 @@ step s1_recompress_chunk:
FROM show_chunks('sensor_data') i
LIMIT 1;

ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step s2_wait_for_finish:

step s2_commit:

@@ -849,7 +849,7 @@ step s1_recompress_chunk:
FROM show_chunks('sensor_data') i
LIMIT 1;

ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step s2_wait_for_finish:

step s2_commit:

@@ -1136,7 +1136,7 @@ step s1_recompress_chunk:
FROM show_chunks('sensor_data') i
LIMIT 1;

ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step s2_delete_compressed:
DELETE FROM sensor_data WHERE sensor_id = 1;

@@ -1190,7 +1190,7 @@ step s1_recompress_chunk:
FROM show_chunks('sensor_data') i
LIMIT 1;

ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step s2_delete_uncompressed:
DELETE FROM sensor_data WHERE sensor_id = 11;

@@ -1262,7 +1262,7 @@ step s1_recompress_chunk:
FROM show_chunks('sensor_data') i
LIMIT 1;

ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step s2_delete_recompressed:
DELETE FROM sensor_data WHERE sensor_id = 5 AND time > '2022-01-01 01:00'::timestamptz;

@@ -1317,7 +1317,7 @@ step s2_commit:
COMMIT;

step s1_recompress_chunk: <... completed>
ERROR: cannot proceed with recompression due to concurrent updates on uncompressed data
ERROR: aborting recompression due to concurrent updates on uncompressed data, retrying with next policy run
step s1_show_chunk_state:
SELECT status FROM _timescaledb_catalog.chunk WHERE compressed_chunk_id IS NOT NULL;
SELECT count(*) FROM sensor_data;

@@ -1359,7 +1359,7 @@ step s1_recompress_chunk:
FROM show_chunks('sensor_data') i
LIMIT 1;

ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step s2_commit:
COMMIT;

@@ -1428,7 +1428,7 @@ step s2_commit:
COMMIT;

step s1_recompress_chunk: <... completed>
ERROR: cannot proceed with recompression due to concurrent updates on compressed data
ERROR: aborting recompression due to concurrent updates on compressed data, retrying with next policy run
step s1_show_chunk_state:
SELECT status FROM _timescaledb_catalog.chunk WHERE compressed_chunk_id IS NOT NULL;
SELECT count(*) FROM sensor_data;

@@ -1651,7 +1651,7 @@ step s1_recompress_chunk:
FROM show_chunks('sensor_data') i
LIMIT 1;

ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step s2_update_compressed:
UPDATE sensor_data SET cpu = 9999 WHERE sensor_id = 1;

@@ -1713,7 +1713,7 @@ step s1_recompress_chunk:
FROM show_chunks('sensor_data') i
LIMIT 1;

ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step s2_update_uncompressed:
UPDATE sensor_data SET cpu = 9999 WHERE sensor_id = 11;

@@ -1779,7 +1779,7 @@ step s1_recompress_chunk:
FROM show_chunks('sensor_data') i
LIMIT 1;

ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step s2_update_recompressed:
UPDATE sensor_data SET cpu = 9999 WHERE sensor_id = 5 AND time > '2022-01-01 01:00'::timestamptz;

@@ -1842,7 +1842,7 @@ step s2_commit:
COMMIT;

step s1_recompress_chunk: <... completed>
ERROR: cannot proceed with recompression due to concurrent updates on uncompressed data
ERROR: aborting recompression due to concurrent updates on uncompressed data, retrying with next policy run
step s1_show_chunk_state:
SELECT status FROM _timescaledb_catalog.chunk WHERE compressed_chunk_id IS NOT NULL;
SELECT count(*) FROM sensor_data;

@@ -1892,7 +1892,7 @@ step s1_recompress_chunk:
FROM show_chunks('sensor_data') i
LIMIT 1;

ERROR: cannot proceed with recompression due to concurrent DML on uncompressed data
ERROR: aborting recompression due to concurrent DML on uncompressed data, retrying with next policy run
step s2_commit:
COMMIT;

@@ -1969,7 +1969,7 @@ step s2_commit:
COMMIT;

step s1_recompress_chunk: <... completed>
ERROR: cannot proceed with recompression due to concurrent updates on compressed data
ERROR: aborting recompression due to concurrent updates on compressed data, retrying with next policy run
step s1_show_chunk_state:
SELECT status FROM _timescaledb_catalog.chunk WHERE compressed_chunk_id IS NOT NULL;
SELECT count(*) FROM sensor_data;
@@ -216,6 +216,69 @@ WHERE hypertable_name = 'conditions' and is_compressed = true;

\i include/recompress_basic.sql

--TEST 7
--compression policy should ignore frozen partially compressed chunks
CREATE TABLE test_table_frozen(time TIMESTAMPTZ, val SMALLINT);
SELECT create_hypertable('test_table_frozen', 'time', chunk_time_interval => '1 day'::interval);

INSERT INTO test_table_frozen SELECT time, (random()*10)::smallint
FROM generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '10 min') AS time;

ALTER TABLE test_table_frozen SET (timescaledb.compress);
select add_compression_policy( 'test_table_frozen', compress_after=> '1 day'::interval ) as compressjob_id \gset
SELECT * FROM _timescaledb_config.bgw_job WHERE id = :compressjob_id;
SELECT show_chunks('test_table_frozen') as first_chunk LIMIT 1 \gset

--will compress all chunks that need compression
CALL run_job(:compressjob_id);

-- make the chunks partial
INSERT INTO test_table_frozen SELECT time, (random()*10)::smallint
FROM generate_series('2018-12-01 00:00'::timestamp, '2018-12-31 00:00'::timestamp, '10 min') AS time;

SELECT c.id, c.status
FROM _timescaledb_catalog.chunk c
INNER JOIN _timescaledb_catalog.hypertable h on (h.id = c.hypertable_id)
WHERE h.table_name = 'test_table_frozen'
ORDER BY c.id
LIMIT 1;

-- freeze first chunk
SELECT _timescaledb_functions.freeze_chunk(:'first_chunk');

-- first chunk status is 1 (Compressed) + 8 (Partially compressed) + 4 (Frozen) = 13
SELECT c.id, c.status
FROM _timescaledb_catalog.chunk c
INNER JOIN _timescaledb_catalog.hypertable h on (h.id = c.hypertable_id)
WHERE h.table_name = 'test_table_frozen'
ORDER BY c.id
LIMIT 1;

--should recompress all chunks except first since its frozen
CALL run_job(:compressjob_id);

-- first chunk status is unchanged
SELECT c.id, c.status
FROM _timescaledb_catalog.chunk c
INNER JOIN _timescaledb_catalog.hypertable h on (h.id = c.hypertable_id)
WHERE h.table_name = 'test_table_frozen'
ORDER BY c.id
LIMIT 1;

-- unfreeze first chunk
SELECT _timescaledb_functions.unfreeze_chunk(:'first_chunk');

-- should be able to recompress the chunk since its unfrozen
CALL run_job(:compressjob_id);

-- first chunk status is Compressed (1)
SELECT c.id, c.status
FROM _timescaledb_catalog.chunk c
INNER JOIN _timescaledb_catalog.hypertable h on (h.id = c.hypertable_id)
WHERE h.table_name = 'test_table_frozen'
ORDER BY c.id
LIMIT 1;

-- Teardown test
\c :TEST_DBNAME :ROLE_SUPERUSER
REVOKE CREATE ON SCHEMA public FROM NOLOGIN_ROLE;
@@ -75,8 +75,7 @@ where time = '2022-06-01 10:14' and device = 1;
-- Add a new policy that doesn't specify hypercore. It should still
-- recompress hypercores.
select add_compression_policy('readings',
compress_after => '1 day'::interval,
hypercore_use_access_method => false)
compress_after => '1 day'::interval)
as compression_job \gset

-- Run the policy job again to recompress
@@ -346,4 +346,41 @@ SELECT _timescaledb_functions.recompress_chunk_segmentwise(:'chunk_to_compress')
-- When GUC is OFF, entire chunk should be fully uncompressed and compressed instead
SELECT compress_chunk(:'chunk_to_compress');

RESET timescaledb.enable_segmentwise_recompression;

--- Test behaviour of enable_exclusive_locking_recompression GUC
CREATE TABLE exclusive_test(time timestamptz not null, a int, b int, c int);
SELECT create_hypertable('exclusive_test', by_range('time', INTERVAL '1 day'));

ALTER TABLE guc_test set (timescaledb.compress, timescaledb.compress_segmentby = 'a, b');
INSERT INTO guc_test VALUES ('2024-10-30 14:04:00.501519-06'::timestamptz, 1, 1, 1);
SELECT show_chunks as chunk_to_compress FROM show_chunks('guc_test') LIMIT 1 \gset
SELECT compress_chunk(:'chunk_to_compress');

INSERT INTO guc_test VALUES ('2024-10-30 14:14:00.501519-06'::timestamptz, 1, 1, 2);

-- Default behavior will try to get exclusive lock at the end of operation
-- in order to change the chunk status. Here it will succeed since there
-- isn't any concurrent operations.
RESET timescaledb.enable_exclusive_locking_recompression;
BEGIN;
SELECT _timescaledb_functions.recompress_chunk_segmentwise(:'chunk_to_compress');
SELECT c.relname FROM pg_locks l
INNER JOIN pg_class c ON c.oid = l.relation
WHERE locktype = 'relation' AND mode = 'ExclusiveLock'
ORDER BY 1;
ROLLBACK;

-- If we enable this GUC, it should get exclusive locks on 3 relations:
-- uncompressed chunk table, compressed chunk table, and compressed chunk index.
-- This is done so that we keep locking consistency to legacy way of locking.
SET timescaledb.enable_exclusive_locking_recompression TO ON;
BEGIN;
SELECT _timescaledb_functions.recompress_chunk_segmentwise(:'chunk_to_compress');
SELECT c.relname FROM pg_locks l
INNER JOIN pg_class c ON c.oid = l.relation
WHERE locktype = 'relation' AND mode = 'ExclusiveLock'
ORDER BY 1;
ROLLBACK;

RESET timescaledb.enable_exclusive_locking_recompression;