Rework compression activity wal markers

- Adds WAL markers around all compression and decompression activities.
- Renames the GUC controlling this behaviour.
- Enables the WAL marker GUC by default.

This makes it possible to distinguish between "user-driven" and
"compression-driven" DML on uncompressed chunks. This is a requirement
for supporting DML on compressed chunks during live migration.

Note: A previous commit [1] added wal markers before and after inserts
which were part of "transparent decompression". Transparent
decompression is triggered when an UPDATE or DELETE statement affects
compressed data, or an INSERT statement inserts into a range of
compressed data which has a unique or primary key constraint. In these
cases, the data is first moved from the compressed chunk to the
uncompressed chunk, and then the DML is applied.

This change extends the existing behaviour on two fronts:

1. It adds WAL markers for both chunk compression and decompression
   events.
2. It extends the WAL markers for transparent decompression to include
   not only INSERTs into the compressed chunk, but also to TimescaleDB
   catalog operations which were part of the decompression.

[1]: b5b46a3e581b222f679c2d4aa15944646d8190d9
This commit is contained in:
James Guthrie 2024-05-09 00:32:57 +02:00 committed by Fabrízio de Royes Mello
parent 70bdbaf499
commit 30666055b9
9 changed files with 321 additions and 220 deletions

View File

@ -108,12 +108,14 @@ jobs:
make -j $MAKE_JOBS make -j $MAKE_JOBS
make -j $MAKE_JOBS -C src/test/isolation make -j $MAKE_JOBS -C src/test/isolation
make -j $MAKE_JOBS -C contrib/postgres_fdw make -j $MAKE_JOBS -C contrib/postgres_fdw
make -j $MAKE_JOBS -C contrib/test_decoding
- name: Install PostgreSQL ${{ matrix.pg }} - name: Install PostgreSQL ${{ matrix.pg }}
run: | run: |
useradd postgres useradd postgres
make -C ~/$PG_SRC_DIR install make -C ~/$PG_SRC_DIR install
make -C ~/$PG_SRC_DIR/contrib/postgres_fdw install make -C ~/$PG_SRC_DIR/contrib/postgres_fdw install
make -C ~/$PG_SRC_DIR/contrib/test_decoding install
chown -R postgres:postgres $HOME/$PG_INSTALL_DIR chown -R postgres:postgres $HOME/$PG_INSTALL_DIR
sed -i 's/^# *\(en_US.UTF-8\)/\1/' /etc/locale.gen sed -i 's/^# *\(en_US.UTF-8\)/\1/' /etc/locale.gen
locale-gen locale-gen

View File

@ -127,11 +127,13 @@ jobs:
make -j $MAKE_JOBS make -j $MAKE_JOBS
make -j $MAKE_JOBS -C src/test/isolation make -j $MAKE_JOBS -C src/test/isolation
make -j $MAKE_JOBS -C contrib/postgres_fdw make -j $MAKE_JOBS -C contrib/postgres_fdw
make -j $MAKE_JOBS -C contrib/test_decoding
- name: Install PostgreSQL ${{ matrix.pg }} - name: Install PostgreSQL ${{ matrix.pg }}
run: | run: |
make -C ~/$PG_SRC_DIR install make -C ~/$PG_SRC_DIR install
make -C ~/$PG_SRC_DIR/contrib/postgres_fdw install make -C ~/$PG_SRC_DIR/contrib/postgres_fdw install
make -C ~/$PG_SRC_DIR/contrib/test_decoding install
echo "$HOME/$PG_INSTALL_DIR/bin" >> "${GITHUB_PATH}" echo "$HOME/$PG_INSTALL_DIR/bin" >> "${GITHUB_PATH}"
- name: Upload config.log - name: Upload config.log

1
.unreleased/pr_6920 Normal file
View File

@ -0,0 +1 @@
Implements: #6920 Rework compression activity wal markers

View File

@ -71,7 +71,7 @@ bool ts_guc_enable_osm_reads = true;
TSDLLEXPORT bool ts_guc_enable_dml_decompression = true; TSDLLEXPORT bool ts_guc_enable_dml_decompression = true;
TSDLLEXPORT int ts_guc_max_tuples_decompressed_per_dml = 100000; TSDLLEXPORT int ts_guc_max_tuples_decompressed_per_dml = 100000;
TSDLLEXPORT bool ts_guc_enable_transparent_decompression = true; TSDLLEXPORT bool ts_guc_enable_transparent_decompression = true;
TSDLLEXPORT bool ts_guc_enable_decompression_logrep_markers = false; TSDLLEXPORT bool ts_guc_enable_compression_wal_markers = false;
TSDLLEXPORT bool ts_guc_enable_decompression_sorted_merge = true; TSDLLEXPORT bool ts_guc_enable_decompression_sorted_merge = true;
bool ts_guc_enable_chunkwise_aggregation = true; bool ts_guc_enable_chunkwise_aggregation = true;
bool ts_guc_enable_vectorized_aggregation = true; bool ts_guc_enable_vectorized_aggregation = true;
@ -476,13 +476,12 @@ _guc_init(void)
NULL, NULL,
NULL); NULL);
DefineCustomBoolVariable(MAKE_EXTOPTION("enable_decompression_logrep_markers"), DefineCustomBoolVariable(MAKE_EXTOPTION("enable_compression_wal_markers"),
"Enable logical replication markers for decompression ops", "Enable WAL markers for compression ops",
"Enable the generation of logical replication markers in the " "Enable the generation of markers in the WAL stream which mark the "
"WAL stream to mark the start and end of decompressions (for insert, " "start and end of compression operations",
"update, and delete operations)", &ts_guc_enable_compression_wal_markers,
&ts_guc_enable_decompression_logrep_markers, true,
false,
PGC_SIGHUP, PGC_SIGHUP,
0, 0,
NULL, NULL,

View File

@ -31,7 +31,7 @@ extern bool ts_guc_enable_osm_reads;
extern TSDLLEXPORT bool ts_guc_enable_dml_decompression; extern TSDLLEXPORT bool ts_guc_enable_dml_decompression;
extern TSDLLEXPORT int ts_guc_max_tuples_decompressed_per_dml; extern TSDLLEXPORT int ts_guc_max_tuples_decompressed_per_dml;
extern TSDLLEXPORT bool ts_guc_enable_transparent_decompression; extern TSDLLEXPORT bool ts_guc_enable_transparent_decompression;
extern TSDLLEXPORT bool ts_guc_enable_decompression_logrep_markers; extern TSDLLEXPORT bool ts_guc_enable_compression_wal_markers;
extern TSDLLEXPORT bool ts_guc_enable_decompression_sorted_merge; extern TSDLLEXPORT bool ts_guc_enable_decompression_sorted_merge;
extern TSDLLEXPORT bool ts_guc_enable_skip_scan; extern TSDLLEXPORT bool ts_guc_enable_skip_scan;
extern TSDLLEXPORT bool ts_guc_enable_chunkwise_aggregation; extern TSDLLEXPORT bool ts_guc_enable_chunkwise_aggregation;

View File

@ -36,7 +36,6 @@
#include "debug_point.h" #include "debug_point.h"
#include "error_utils.h" #include "error_utils.h"
#include "errors.h" #include "errors.h"
#include "guc.h"
#include "hypercube.h" #include "hypercube.h"
#include "hypertable.h" #include "hypertable.h"
#include "hypertable_cache.h" #include "hypertable_cache.h"
@ -48,6 +47,7 @@
#include "ts_catalog/compression_settings.h" #include "ts_catalog/compression_settings.h"
#include "ts_catalog/continuous_agg.h" #include "ts_catalog/continuous_agg.h"
#include "utils.h" #include "utils.h"
#include "wal_utils.h"
typedef struct CompressChunkCxt typedef struct CompressChunkCxt
{ {
@ -541,6 +541,8 @@ decompress_chunk_impl(Chunk *uncompressed_chunk, bool if_compressed)
return; return;
} }
write_logical_replication_msg_decompression_start();
ts_chunk_validate_chunk_status_for_operation(uncompressed_chunk, CHUNK_DECOMPRESS, true); ts_chunk_validate_chunk_status_for_operation(uncompressed_chunk, CHUNK_DECOMPRESS, true);
compressed_chunk = ts_chunk_get_by_id(uncompressed_chunk->fd.compressed_chunk_id, true); compressed_chunk = ts_chunk_get_by_id(uncompressed_chunk->fd.compressed_chunk_id, true);
@ -610,6 +612,7 @@ decompress_chunk_impl(Chunk *uncompressed_chunk, bool if_compressed)
LockRelationOid(compressed_chunk->table_id, AccessExclusiveLock); LockRelationOid(compressed_chunk->table_id, AccessExclusiveLock);
ts_chunk_drop(compressed_chunk, DROP_RESTRICT, -1); ts_chunk_drop(compressed_chunk, DROP_RESTRICT, -1);
ts_cache_release(hcache); ts_cache_release(hcache);
write_logical_replication_msg_decompression_end();
} }
/* /*
@ -707,6 +710,8 @@ tsl_compress_chunk_wrapper(Chunk *chunk, bool if_not_compressed, bool recompress
{ {
Oid uncompressed_chunk_id = chunk->table_id; Oid uncompressed_chunk_id = chunk->table_id;
write_logical_replication_msg_compression_start();
if (ts_chunk_is_compressed(chunk)) if (ts_chunk_is_compressed(chunk))
{ {
if (recompress) if (recompress)
@ -720,11 +725,13 @@ tsl_compress_chunk_wrapper(Chunk *chunk, bool if_not_compressed, bool recompress
{ {
decompress_chunk_impl(chunk, false); decompress_chunk_impl(chunk, false);
compress_chunk_impl(chunk->hypertable_relid, chunk->table_id); compress_chunk_impl(chunk->hypertable_relid, chunk->table_id);
write_logical_replication_msg_compression_end();
return uncompressed_chunk_id; return uncompressed_chunk_id;
} }
} }
if (!ts_chunk_needs_recompression(chunk)) if (!ts_chunk_needs_recompression(chunk))
{ {
write_logical_replication_msg_compression_end();
ereport((if_not_compressed ? NOTICE : ERROR), ereport((if_not_compressed ? NOTICE : ERROR),
(errcode(ERRCODE_DUPLICATE_OBJECT), (errcode(ERRCODE_DUPLICATE_OBJECT),
errmsg("chunk \"%s\" is already compressed", get_rel_name(chunk->table_id)))); errmsg("chunk \"%s\" is already compressed", get_rel_name(chunk->table_id))));
@ -746,6 +753,8 @@ tsl_compress_chunk_wrapper(Chunk *chunk, bool if_not_compressed, bool recompress
uncompressed_chunk_id = compress_chunk_impl(chunk->hypertable_relid, chunk->table_id); uncompressed_chunk_id = compress_chunk_impl(chunk->hypertable_relid, chunk->table_id);
} }
write_logical_replication_msg_compression_end();
return uncompressed_chunk_id; return uncompressed_chunk_id;
} }

View File

@ -27,7 +27,6 @@
#include <optimizer/optimizer.h> #include <optimizer/optimizer.h>
#include <parser/parse_coerce.h> #include <parser/parse_coerce.h>
#include <parser/parsetree.h> #include <parser/parsetree.h>
#include <replication/message.h>
#include <storage/lmgr.h> #include <storage/lmgr.h>
#include <storage/predicate.h> #include <storage/predicate.h>
#include <utils/builtins.h> #include <utils/builtins.h>
@ -65,6 +64,7 @@
#include "ts_catalog/catalog.h" #include "ts_catalog/catalog.h"
#include "ts_catalog/compression_chunk_size.h" #include "ts_catalog/compression_chunk_size.h"
#include "ts_catalog/compression_settings.h" #include "ts_catalog/compression_settings.h"
#include "wal_utils.h"
StaticAssertDecl(GLOBAL_MAX_ROWS_PER_COMPRESSION >= TARGET_COMPRESSED_BATCH_SIZE, StaticAssertDecl(GLOBAL_MAX_ROWS_PER_COMPRESSION >= TARGET_COMPRESSED_BATCH_SIZE,
"max row numbers must be harmonized"); "max row numbers must be harmonized");
@ -78,33 +78,6 @@ static const CompressionAlgorithmDefinition definitions[_END_COMPRESSION_ALGORIT
[COMPRESSION_ALGORITHM_DELTADELTA] = DELTA_DELTA_ALGORITHM_DEFINITION, [COMPRESSION_ALGORITHM_DELTADELTA] = DELTA_DELTA_ALGORITHM_DEFINITION,
}; };
/* The prefix of a logical replication message which is inserted into the
* replication stream right before decompression inserts are happening
*/
#define DECOMPRESSION_MARKER_START "::timescaledb-decompression-start"
/* The prefix of a logical replication message which is inserted into the
* replication stream right after all decompression inserts have finished
*/
#define DECOMPRESSION_MARKER_END "::timescaledb-decompression-end"
static inline void
write_logical_replication_msg_decompression_start()
{
if (ts_guc_enable_decompression_logrep_markers && XLogLogicalInfoActive())
{
LogLogicalMessage(DECOMPRESSION_MARKER_START, "", 0, true);
}
}
static inline void
write_logical_replication_msg_decompression_end()
{
if (ts_guc_enable_decompression_logrep_markers && XLogLogicalInfoActive())
{
LogLogicalMessage(DECOMPRESSION_MARKER_END, "", 0, true);
}
}
static Compressor * static Compressor *
compressor_for_type(Oid type) compressor_for_type(Oid type)
{ {
@ -2354,7 +2327,6 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
write_logical_replication_msg_decompression_start(); write_logical_replication_msg_decompression_start();
row_decompressor_decompress_row_to_table(&decompressor); row_decompressor_decompress_row_to_table(&decompressor);
write_logical_replication_msg_decompression_end();
TM_FailureData tmfd; TM_FailureData tmfd;
TM_Result result pg_attribute_unused(); TM_Result result pg_attribute_unused();
@ -2366,6 +2338,9 @@ decompress_batches_for_insert(const ChunkInsertState *cis, TupleTableSlot *slot)
true, true,
&tmfd, &tmfd,
false); false);
write_logical_replication_msg_decompression_end();
Assert(result == TM_Ok); Assert(result == TM_Ok);
Assert(cis->cds != NULL); Assert(cis->cds != NULL);
@ -3255,7 +3230,6 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
is_null, is_null,
&chunk_status_changed); &chunk_status_changed);
} }
write_logical_replication_msg_decompression_end();
/* /*
* tuples from compressed chunk has been decompressed and moved * tuples from compressed chunk has been decompressed and moved
@ -3264,6 +3238,8 @@ decompress_batches_for_update_delete(HypertableModifyState *ht_state, Chunk *chu
if (chunk_status_changed == true) if (chunk_status_changed == true)
ts_chunk_set_partial(chunk); ts_chunk_set_partial(chunk);
write_logical_replication_msg_decompression_end();
row_decompressor_close(&decompressor); row_decompressor_close(&decompressor);
table_close(chunk_rel, NoLock); table_close(chunk_rel, NoLock);

View File

@ -0,0 +1,66 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#pragma once
#include <postgres.h>
#include <replication/message.h>
#include "guc.h"
/*
* Utils to insert markers into the WAL log which demarcate compression
* and decompression operations.
* The primary purpose is to be able to discern between user-driven DML
* operations (caused by statements which INSERT/UPDATE/DELETE data), and
* compression-driven DML (moving data to/from compressed chunks).
*/
#define COMPRESSION_MARKER_START "::timescaledb-compression-start"
#define COMPRESSION_MARKER_END "::timescaledb-compression-end"
#define DECOMPRESSION_MARKER_START "::timescaledb-decompression-start"
#define DECOMPRESSION_MARKER_END "::timescaledb-decompression-end"
/*
 * Returns true when compression/decompression WAL markers should be
 * emitted: the marker GUC must be enabled AND WAL must carry logical
 * decoding information (wal_level = logical); without the latter,
 * logical messages cannot be decoded by consumers anyway.
 */
static inline bool
is_compression_wal_markers_enabled(void)
{
	return ts_guc_enable_compression_wal_markers && XLogLogicalInfoActive();
}
/*
 * Emit a transactional logical message marking the START of a chunk
 * compression operation. No-op unless WAL markers are enabled and
 * wal_level is logical. The marker carries no payload (size 0); only
 * its prefix identifies it.
 */
static inline void
write_logical_replication_msg_compression_start(void)
{
	if (is_compression_wal_markers_enabled())
	{
		LogLogicalMessage(COMPRESSION_MARKER_START, "", 0, true);
	}
}
/*
 * Emit a transactional logical message marking the END of a chunk
 * compression operation. No-op unless WAL markers are enabled and
 * wal_level is logical. Must be paired with a preceding start marker
 * by the caller.
 */
static inline void
write_logical_replication_msg_compression_end(void)
{
	if (is_compression_wal_markers_enabled())
	{
		LogLogicalMessage(COMPRESSION_MARKER_END, "", 0, true);
	}
}
/*
 * Emit a transactional logical message marking the START of a chunk
 * decompression operation (explicit decompress_chunk or transparent
 * decompression triggered by DML). No-op unless WAL markers are
 * enabled and wal_level is logical.
 */
static inline void
write_logical_replication_msg_decompression_start(void)
{
	if (is_compression_wal_markers_enabled())
	{
		LogLogicalMessage(DECOMPRESSION_MARKER_START, "", 0, true);
	}
}
/*
 * Emit a transactional logical message marking the END of a chunk
 * decompression operation. No-op unless WAL markers are enabled and
 * wal_level is logical. Must be paired with a preceding start marker
 * by the caller.
 */
static inline void
write_logical_replication_msg_decompression_end(void)
{
	if (is_compression_wal_markers_enabled())
	{
		LogLogicalMessage(DECOMPRESSION_MARKER_END, "", 0, true);
	}
}

View File

@ -10,216 +10,262 @@ use Test::More;
plan skip_all => "PostgreSQL version < 14" if $ENV{PG_VERSION_MAJOR} < 14; plan skip_all => "PostgreSQL version < 14" if $ENV{PG_VERSION_MAJOR} < 14;
# This test checks the creation of logical replication messages # This test checks the creation of logical replication messages
# used to mark the start and end of inserts happening as a result # used to mark the start and end of activity happening as a result
# of a (partial) decompression. # compression/decompression.
# Publishing node # Publishing node
my $publisher = my $db = TimescaleNode->create('publisher', allows_streaming => 'logical');
TimescaleNode->create('publisher', allows_streaming => 'logical');
# Subscribing node sub run_queries
my $subscriber = {
TimescaleNode->create('subscriber', allows_streaming => 'logical'); foreach my $query (@_)
{
$db->safe_psql('postgres', $query);
}
}
# Setup test structures sub query_generates_wal
$publisher->safe_psql( {
'postgres', my $description = shift @_;
qq( my $query = shift @_;
CREATE TABLE test (ts timestamptz NOT NULL PRIMARY KEY , val INT); my $expected_wal = shift @_;
SELECT create_hypertable('test', 'ts', chunk_time_interval := INTERVAL '1day');
)
);
# To kick off replication we need to fake the setup of a hypertable $db->safe_psql('postgres', $query);
$subscriber->safe_psql('postgres', my $result = $db->safe_psql('postgres',
"CREATE TABLE _timescaledb_internal._hyper_1_1_chunk (ts timestamptz NOT NULL PRIMARY KEY , val INT)" qq/SELECT data FROM pg_logical_slot_get_changes('slot', NULL, NULL);/
);
# Initial data insert and preparation of the internal chunk tables
$publisher->safe_psql(
'postgres',
qq(
INSERT INTO test
SELECT s.s, (random() * 100)::INT
FROM generate_series('2023-01-01'::timestamptz, '2023-01-02'::timestamptz, INTERVAL '3 hour') s;
)
);
# Setup logical replication
my $publisher_connstr = $publisher->connstr . ' dbname=postgres';
$publisher->safe_psql('postgres',
"CREATE PUBLICATION tap_pub FOR TABLE _timescaledb_internal._hyper_1_1_chunk"
);
$subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (binary = true)"
);
# Wait for catchup and disable consumption of additional messages
$publisher->wait_for_catchup('tap_sub');
$subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
$publisher->poll_query_until(
'postgres',
"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'tap_sub' AND active='f'",
1);
# Enable marker generation through GUC
$publisher->append_conf('postgresql.conf',
'timescaledb.enable_decompression_logrep_markers=true');
$publisher->reload();
# Compress chunks and consume replication stream explicitly
$publisher->safe_psql(
'postgres',
qq(
ALTER TABLE test SET (timescaledb.compress);
SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk'::regclass, TRUE);
)
);
$publisher->safe_psql(
'postgres',
qq(
SELECT pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL,
'proto_version', '1',
'publication_names', 'tap_pub',
'messages', 'true'
); );
) $result =~ s/\s+creation_time.*//g
); ; # creation_time is non-deterministic, so strip it from wal (it's always the final column)
$result =~ s/(BEGIN|COMMIT) \d+/$1/g
; # BEGIN/COMMIT statements contain the statement number, which we don't care about
# Create a new entry which forces a decompression to happen is($result, $expected_wal, $description);
$publisher->safe_psql('postgres', }
"INSERT INTO test VALUES ('2023-01-01 00:10:00', 5555)");
# Retrieve the replication log messages sub discard_wal
my $result = $publisher->safe_psql( {
'postgres', $db->safe_psql('postgres',
qq( qq/SELECT data FROM pg_logical_slot_get_changes('slot', NULL, NULL);/
SELECT get_byte(data, 0)
FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
'proto_version', '1',
'publication_names', 'tap_pub',
'messages', 'true'
); );
) }
# disable background jobs because they can scribble in our WAL
run_queries(
qq/
SELECT public.alter_job(id::integer, scheduled=>false) FROM _timescaledb_config.bgw_job;
/
); );
# Test: BEGIN, MESSAGE (start marker), RELATION, ... INSERT (decompression inserts x6) ..., MESSAGE (end marker), INSERT, COMMIT run_queries(
is( $result, qq/
qq(66 CREATE TABLE metrics(time timestamptz, device_id bigint, value float8);
77 SELECT create_hypertable('metrics', 'time');
82 ALTER TABLE metrics REPLICA IDENTITY FULL;
73 ALTER TABLE metrics SET (timescaledb.compress = true, timescaledb.compress_orderby = 'time', timescaledb.compress_segmentby = 'device_id');
73 SELECT 'init' FROM pg_create_logical_replication_slot('slot', 'test_decoding');
73 /
73
73
73
77
73
67),
'messages on slot meet expectation <<BEGIN, MESSAGE (start marker), RELATION, ... INSERT (decompression inserts x6) ..., MESSAGE (end marker), INSERT, COMMIT>>'
); );
# Get initial message entry query_generates_wal(
$result = $publisher->safe_psql( "insert into plain chunk",
'postgres', qq/INSERT INTO metrics VALUES ('2023-07-01T00:00:00Z', 1, 1.0);/,
qq( qq(BEGIN
SELECT get_byte(data, 1), encode(substr(data, 11, 33), 'escape') table _timescaledb_catalog.dimension_slice: INSERT: id[integer]:1 dimension_id[integer]:1 range_start[bigint]:1687996800000000 range_end[bigint]:1688601600000000
FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, table _timescaledb_catalog.chunk: INSERT: id[integer]:1 hypertable_id[integer]:1 schema_name[name]:'_timescaledb_internal' table_name[name]:'_hyper_1_1_chunk' compressed_chunk_id[integer]:null dropped[boolean]:false status[integer]:0 osm_chunk[boolean]:false
'proto_version', '1', table _timescaledb_catalog.chunk_constraint: INSERT: chunk_id[integer]:1 dimension_slice_id[integer]:1 constraint_name[name]:'constraint_1' hypertable_constraint_name[name]:null
'publication_names', 'tap_pub', table _timescaledb_catalog.chunk_index: INSERT: chunk_id[integer]:1 index_name[name]:'_hyper_1_1_chunk_metrics_time_idx' hypertable_id[integer]:1 hypertable_index_name[name]:'metrics_time_idx'
'messages', 'true' table _timescaledb_internal._hyper_1_1_chunk: INSERT: "time"[timestamp with time zone]:'2023-06-30 17:00:00-07' device_id[bigint]:1 value[double precision]:1
) COMMIT)
OFFSET 1 LIMIT 1;
)
); );
is( $result,
qq(1|::timescaledb-decompression-start),
'first entry is decompression marker start message');
# Get second message entry query_generates_wal(
$result = $publisher->safe_psql( "compress chunk",
'postgres', qq(SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk'::regclass, TRUE);),
qq( qq(BEGIN
SELECT get_byte(data, 1), encode(substr(data, 11, 31), 'escape') message: transactional: 1 prefix: ::timescaledb-compression-start, sz: 0 content:
FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, table _timescaledb_catalog.chunk: INSERT: id[integer]:2 hypertable_id[integer]:2 schema_name[name]:'_timescaledb_internal' table_name[name]:'compress_hyper_2_2_chunk' compressed_chunk_id[integer]:null dropped[boolean]:false status[integer]:0 osm_chunk[boolean]:false
'proto_version', '1', table _timescaledb_catalog.compression_settings: INSERT: relid[regclass]:'_timescaledb_internal.compress_hyper_2_2_chunk' segmentby[text[]]:'{device_id}' orderby[text[]]:'{time}' orderby_desc[boolean[]]:'{f}' orderby_nullsfirst[boolean[]]:'{f}'
'publication_names', 'tap_pub', table _timescaledb_internal.compress_hyper_2_2_chunk: INSERT: _ts_meta_count[integer]:1 _ts_meta_sequence_num[integer]:10 device_id[bigint]:1 _ts_meta_min_1[timestamp with time zone]:'2023-06-30 17:00:00-07' _ts_meta_max_1[timestamp with time zone]:'2023-06-30 17:00:00-07' "time"[_timescaledb_internal.compressed_data]:'BAAAAqJgYhxAAAAComBiHEAAAAAAAQAAAAEAAAAAAAAADgAFRMDEOIAA' value[_timescaledb_internal.compressed_data]:'AwA/8AAAAAAAAAAAAAEAAAABAAAAAAAAAAEAAAAAAAAAAQAAAAEAAAABAAAAAAAAAAEAAAAAAAAAAQAAAAEGAAAAAAAAAAIAAAABAAAAAQAAAAAAAAAEAAAAAAAAAAoAAAABCgAAAAAAAAP/'
'messages', 'true' table _timescaledb_catalog.compression_chunk_size: INSERT: chunk_id[integer]:1 compressed_chunk_id[integer]:2 uncompressed_heap_size[bigint]:8192 uncompressed_toast_size[bigint]:0 uncompressed_index_size[bigint]:16384 compressed_heap_size[bigint]:16384 compressed_toast_size[bigint]:8192 compressed_index_size[bigint]:16384 numrows_pre_compression[bigint]:1 numrows_post_compression[bigint]:1 numrows_frozen_immediately[bigint]:1
) table _timescaledb_catalog.chunk: UPDATE: id[integer]:1 hypertable_id[integer]:1 schema_name[name]:'_timescaledb_internal' table_name[name]:'_hyper_1_1_chunk' compressed_chunk_id[integer]:2 dropped[boolean]:false status[integer]:1 osm_chunk[boolean]:false
OFFSET 9 LIMIT 1; message: transactional: 1 prefix: ::timescaledb-compression-end, sz: 0 content:
) COMMIT)
); );
is( $result,
qq(1|::timescaledb-decompression-end),
'10th entry is decompression marker end message');
# Get last insert entry to check it is the user executed insert (and value is 5555 or 35353535 in hex) query_generates_wal(
$result = $publisher->safe_psql( "decompress chunk",
'postgres', qq(SELECT decompress_chunk('_timescaledb_internal._hyper_1_1_chunk'::regclass, TRUE);),
qq( qq(BEGIN
SELECT get_byte(data, 0), encode(substring(data from 41 for 44), 'hex') message: transactional: 1 prefix: ::timescaledb-decompression-start, sz: 0 content:
FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, table _timescaledb_internal._hyper_1_1_chunk: INSERT: "time"[timestamp with time zone]:'2023-06-30 17:00:00-07' device_id[bigint]:1 value[double precision]:1
'proto_version', '1', table _timescaledb_catalog.compression_chunk_size: DELETE: chunk_id[integer]:1
'publication_names', 'tap_pub', table _timescaledb_catalog.chunk: UPDATE: id[integer]:1 hypertable_id[integer]:1 schema_name[name]:'_timescaledb_internal' table_name[name]:'_hyper_1_1_chunk' compressed_chunk_id[integer]:null dropped[boolean]:false status[integer]:0 osm_chunk[boolean]:false
'messages', 'true' table _timescaledb_catalog.compression_settings: DELETE: relid[regclass]:'_timescaledb_internal.compress_hyper_2_2_chunk'
) table _timescaledb_catalog.chunk: DELETE: id[integer]:2
OFFSET 10 LIMIT 1; message: transactional: 1 prefix: ::timescaledb-decompression-end, sz: 0 content:
) COMMIT)
);
run_queries(
qq(SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk'::regclass, TRUE);)
);
discard_wal();
query_generates_wal(
"recompress chunk (NOOP)",
qq(SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk'::regclass, TRUE);),
qq(BEGIN
message: transactional: 1 prefix: ::timescaledb-compression-start, sz: 0 content:
message: transactional: 1 prefix: ::timescaledb-compression-end, sz: 0 content:
COMMIT),);
query_generates_wal(
"insert into uncompressed chunk",
qq(INSERT INTO metrics VALUES ('2023-07-01T01:00:00Z', 1, 1.0);),
qq(BEGIN
table _timescaledb_internal._hyper_1_1_chunk: INSERT: "time"[timestamp with time zone]:'2023-06-30 18:00:00-07' device_id[bigint]:1 value[double precision]:1
table _timescaledb_catalog.chunk: UPDATE: id[integer]:1 hypertable_id[integer]:1 schema_name[name]:'_timescaledb_internal' table_name[name]:'_hyper_1_1_chunk' compressed_chunk_id[integer]:3 dropped[boolean]:false status[integer]:9 osm_chunk[boolean]:false
COMMIT),);
query_generates_wal(
"recompress chunk",
qq(SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk'::regclass, TRUE);),
qq(BEGIN
message: transactional: 1 prefix: ::timescaledb-compression-start, sz: 0 content:
table _timescaledb_catalog.chunk: UPDATE: id[integer]:1 hypertable_id[integer]:1 schema_name[name]:'_timescaledb_internal' table_name[name]:'_hyper_1_1_chunk' compressed_chunk_id[integer]:3 dropped[boolean]:false status[integer]:1 osm_chunk[boolean]:false
table _timescaledb_internal._hyper_1_1_chunk: DELETE: "time"[timestamp with time zone]:'2023-06-30 18:00:00-07' device_id[bigint]:1 value[double precision]:1
table _timescaledb_internal.compress_hyper_2_3_chunk: DELETE: (no-tuple-data)
table _timescaledb_internal.compress_hyper_2_3_chunk: INSERT: _ts_meta_count[integer]:2 _ts_meta_sequence_num[integer]:10 device_id[bigint]:1 _ts_meta_min_1[timestamp with time zone]:'2023-06-30 17:00:00-07' _ts_meta_max_1[timestamp with time zone]:'2023-06-30 18:00:00-07' "time"[_timescaledb_internal.compressed_data]:'BAAAAqJhOK/kAAAAAADWk6QAAAAAAgAAAAIAAAAAAAAA7gAFRMDEOIAAAAVEvxcRN/8=' value[_timescaledb_internal.compressed_data]:'AwA/8AAAAAAAAAAAAAIAAAABAAAAAAAAAAEAAAAAAAAAAwAAAAIAAAABAAAAAAAAAAEAAAAAAAAAAwAAAAEMAAAAAAAAD8IAAAACAAAAAQAAAAAAAAAEAAAAAAAAAAoAAAABCgAAAAAAAAP/'
message: transactional: 1 prefix: ::timescaledb-compression-end, sz: 0 content:
COMMIT)
);
query_generates_wal(
"insert into uncompressed chunk 2",
qq(INSERT INTO metrics VALUES ('2023-07-01T02:00:00Z', 1, 1.0);),
qq(BEGIN
table _timescaledb_internal._hyper_1_1_chunk: INSERT: "time"[timestamp with time zone]:'2023-06-30 19:00:00-07' device_id[bigint]:1 value[double precision]:1
table _timescaledb_catalog.chunk: UPDATE: id[integer]:1 hypertable_id[integer]:1 schema_name[name]:'_timescaledb_internal' table_name[name]:'_hyper_1_1_chunk' compressed_chunk_id[integer]:3 dropped[boolean]:false status[integer]:9 osm_chunk[boolean]:false
COMMIT),);
query_generates_wal(
"update rows in uncompressed and compressed chunk",
qq(UPDATE metrics SET value = 22 WHERE time IN ('2023-07-01T00:00:00Z', '2023-07-01T02:00:00Z');),
qq(BEGIN
message: transactional: 1 prefix: ::timescaledb-decompression-start, sz: 0 content:
table _timescaledb_internal.compress_hyper_2_3_chunk: DELETE: (no-tuple-data)
table _timescaledb_internal._hyper_1_1_chunk: INSERT: "time"[timestamp with time zone]:'2023-06-30 17:00:00-07' device_id[bigint]:1 value[double precision]:1
table _timescaledb_internal._hyper_1_1_chunk: INSERT: "time"[timestamp with time zone]:'2023-06-30 18:00:00-07' device_id[bigint]:1 value[double precision]:1
message: transactional: 1 prefix: ::timescaledb-decompression-end, sz: 0 content:
table _timescaledb_internal._hyper_1_1_chunk: UPDATE: old-key: "time"[timestamp with time zone]:'2023-06-30 19:00:00-07' device_id[bigint]:1 value[double precision]:1 new-tuple: "time"[timestamp with time zone]:'2023-06-30 19:00:00-07' device_id[bigint]:1 value[double precision]:22
table _timescaledb_internal._hyper_1_1_chunk: UPDATE: old-key: "time"[timestamp with time zone]:'2023-06-30 17:00:00-07' device_id[bigint]:1 value[double precision]:1 new-tuple: "time"[timestamp with time zone]:'2023-06-30 17:00:00-07' device_id[bigint]:1 value[double precision]:22
COMMIT),);
query_generates_wal(
"compress chunk after update",
qq(SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk'::regclass, TRUE);),
qq(BEGIN
message: transactional: 1 prefix: ::timescaledb-compression-start, sz: 0 content:
table _timescaledb_catalog.chunk: UPDATE: id[integer]:1 hypertable_id[integer]:1 schema_name[name]:'_timescaledb_internal' table_name[name]:'_hyper_1_1_chunk' compressed_chunk_id[integer]:3 dropped[boolean]:false status[integer]:1 osm_chunk[boolean]:false
table _timescaledb_internal._hyper_1_1_chunk: DELETE: "time"[timestamp with time zone]:'2023-06-30 18:00:00-07' device_id[bigint]:1 value[double precision]:1
table _timescaledb_internal._hyper_1_1_chunk: DELETE: "time"[timestamp with time zone]:'2023-06-30 19:00:00-07' device_id[bigint]:1 value[double precision]:22
table _timescaledb_internal._hyper_1_1_chunk: DELETE: "time"[timestamp with time zone]:'2023-06-30 17:00:00-07' device_id[bigint]:1 value[double precision]:22
table _timescaledb_internal.compress_hyper_2_3_chunk: INSERT: _ts_meta_count[integer]:3 _ts_meta_sequence_num[integer]:10 device_id[bigint]:1 _ts_meta_min_1[timestamp with time zone]:'2023-06-30 17:00:00-07' _ts_meta_max_1[timestamp with time zone]:'2023-06-30 19:00:00-07' "time"[_timescaledb_internal.compressed_data]:'BAAAAqJiD0OIAAAAAADWk6QAAAAAAwAAAAMAAAAAAAAB7gAFRMDEOIAAAAVEvxcRN/8AAAAAAAAAAA==' value[_timescaledb_internal.compressed_data]:'AwBANgAAAAAAAAAAAAMAAAABAAAAAAAAAAEAAAAAAAAABwAAAAMAAAABAAAAAAAAAAEAAAAAAAAABwAAAAESAAAAAAAAEEEAAAADAAAAAQAAAAAAAAAEAAAAAAAADu4AAAABKgAAA/4/+OAb'
message: transactional: 1 prefix: ::timescaledb-compression-end, sz: 0 content:
COMMIT)
);
query_generates_wal(
"insert into uncompressed chunk 3",
qq(INSERT INTO metrics VALUES ('2023-07-01T03:00:00Z', 1, 1.0);),
qq(BEGIN
table _timescaledb_internal._hyper_1_1_chunk: INSERT: "time"[timestamp with time zone]:'2023-06-30 20:00:00-07' device_id[bigint]:1 value[double precision]:1
table _timescaledb_catalog.chunk: UPDATE: id[integer]:1 hypertable_id[integer]:1 schema_name[name]:'_timescaledb_internal' table_name[name]:'_hyper_1_1_chunk' compressed_chunk_id[integer]:3 dropped[boolean]:false status[integer]:9 osm_chunk[boolean]:false
COMMIT),);
# DELETE touching both compressed and uncompressed rows.  The affected
# segment is first moved out of the compressed chunk -- the
# compress_hyper_2_3_chunk DELETE plus three heap INSERTs -- and that
# transparent-decompression phase is bracketed by the
# ::timescaledb-decompression-start/end WAL markers.  The user-driven
# row DELETEs then follow *outside* the markers, which is what lets a
# logical-decoding consumer distinguish compression-driven DML from
# user DML on the uncompressed chunk.
query_generates_wal(
"delete row in compressed chunk",
qq(DELETE FROM metrics WHERE time IN ('2023-07-01T00:00:00Z', '2023-07-01T03:00:00Z');),
qq(BEGIN
message: transactional: 1 prefix: ::timescaledb-decompression-start, sz: 0 content:
table _timescaledb_internal.compress_hyper_2_3_chunk: DELETE: (no-tuple-data)
table _timescaledb_internal._hyper_1_1_chunk: INSERT: "time"[timestamp with time zone]:'2023-06-30 17:00:00-07' device_id[bigint]:1 value[double precision]:22
table _timescaledb_internal._hyper_1_1_chunk: INSERT: "time"[timestamp with time zone]:'2023-06-30 18:00:00-07' device_id[bigint]:1 value[double precision]:1
table _timescaledb_internal._hyper_1_1_chunk: INSERT: "time"[timestamp with time zone]:'2023-06-30 19:00:00-07' device_id[bigint]:1 value[double precision]:22
message: transactional: 1 prefix: ::timescaledb-decompression-end, sz: 0 content:
table _timescaledb_internal._hyper_1_1_chunk: DELETE: "time"[timestamp with time zone]:'2023-06-30 20:00:00-07' device_id[bigint]:1 value[double precision]:1
table _timescaledb_internal._hyper_1_1_chunk: DELETE: "time"[timestamp with time zone]:'2023-06-30 17:00:00-07' device_id[bigint]:1 value[double precision]:22
COMMIT)
);
# Switch the hypertable to a PRIMARY KEY on "time" so that an INSERT
# into a compressed range must decompress first (unique-constraint
# check).  Rebuild from scratch: decompress the chunk, clear the data,
# drop compression, add the PK, re-enable compression with explicit
# orderby/segmentby, insert two rows and compress the chunk again.
run_queries(
qq/
SELECT decompress_chunk('_timescaledb_internal._hyper_1_1_chunk');
DELETE FROM metrics;
ALTER TABLE metrics SET (timescaledb.compress = false);
ALTER TABLE metrics REPLICA IDENTITY DEFAULT;
ALTER TABLE metrics ADD PRIMARY KEY (time);
ALTER TABLE metrics SET (timescaledb.compress = true, timescaledb.compress_orderby = 'time', timescaledb.compress_segmentby = 'device_id');
INSERT INTO metrics VALUES('2023-07-01T00:00:00Z', 1, 1.0), ('2023-07-01T12:00:00Z', 1, 2.0);
SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk');
/
);
# Throw away the WAL produced by the setup above so the next assertion
# only sees the WAL of the statement under test.
discard_wal();
# INSERT whose key falls inside a compressed range guarded by the PK:
# the conflicting segment is decompressed first (the two heap INSERTs
# plus the compress_hyper_3_4_chunk DELETE, bracketed by the
# decompression-start/end markers), then the user's INSERT and the
# catalog status UPDATE are emitted outside the markers.
# NOTE(review): the call's closing ");" is on the following line.
query_generates_wal(
"insert into compressed chunk with pk forces decompression",
qq/INSERT INTO metrics VALUES ('2023-07-01 01:00:00', 1, 5555);/,
qq/BEGIN
message: transactional: 1 prefix: ::timescaledb-decompression-start, sz: 0 content:
table _timescaledb_internal._hyper_1_1_chunk: INSERT: "time"[timestamp with time zone]:'2023-06-30 17:00:00-07' device_id[bigint]:1 value[double precision]:1
table _timescaledb_internal._hyper_1_1_chunk: INSERT: "time"[timestamp with time zone]:'2023-07-01 05:00:00-07' device_id[bigint]:1 value[double precision]:2
table _timescaledb_internal.compress_hyper_3_4_chunk: DELETE: (no-tuple-data)
message: transactional: 1 prefix: ::timescaledb-decompression-end, sz: 0 content:
table _timescaledb_internal._hyper_1_1_chunk: INSERT: "time"[timestamp with time zone]:'2023-07-01 01:00:00-07' device_id[bigint]:1 value[double precision]:5555
table _timescaledb_catalog.chunk: UPDATE: id[integer]:1 hypertable_id[integer]:1 schema_name[name]:'_timescaledb_internal' table_name[name]:'_hyper_1_1_chunk' compressed_chunk_id[integer]:4 dropped[boolean]:false status[integer]:9 osm_chunk[boolean]:false
COMMIT/
);

# Disable marker generation through GUC
$db->append_conf('postgresql.conf',
	'timescaledb.enable_compression_wal_markers=false');
$db->reload();

query_generates_wal(
	"compress chunk after disabling markers",
	qq(SELECT compress_chunk('_timescaledb_internal._hyper_1_1_chunk'::regclass, TRUE);),
	qq(BEGIN
table _timescaledb_catalog.chunk: UPDATE: id[integer]:1 hypertable_id[integer]:1 schema_name[name]:'_timescaledb_internal' table_name[name]:'_hyper_1_1_chunk' compressed_chunk_id[integer]:4 dropped[boolean]:false status[integer]:1 osm_chunk[boolean]:false
table _timescaledb_internal._hyper_1_1_chunk: DELETE: "time"[timestamp with time zone]:'2023-06-30 17:00:00-07'
table _timescaledb_internal._hyper_1_1_chunk: DELETE: "time"[timestamp with time zone]:'2023-07-01 05:00:00-07'
table _timescaledb_internal._hyper_1_1_chunk: DELETE: "time"[timestamp with time zone]:'2023-07-01 01:00:00-07'
table _timescaledb_internal.compress_hyper_3_4_chunk: INSERT: _ts_meta_count[integer]:3 _ts_meta_sequence_num[integer]:10 device_id[bigint]:1 _ts_meta_min_1[timestamp with time zone]:'2023-06-30 17:00:00-07' _ts_meta_max_1[timestamp with time zone]:'2023-07-01 05:00:00-07' "time"[_timescaledb_internal.compressed_data]:'BAAAAqJqcQfwAAAAAANaTpAAAAAAAwAAAAMAAAAAAAAO7gAFRMDEOIAAAAVEs1r+P/8AAAAGtJ0f/w==' value[_timescaledb_internal.compressed_data]:'AwBAAAAAAAAAAAAAAAMAAAABAAAAAAAAAAEAAAAAAAAABwAAAAMAAAABAAAAAAAAAAEAAAAAAAAABwAAAAESAAAAAAAAgEIAAAADAAAAAQAAAAAAAAAFAAAAAAAAQuoAAAABMQABa2f9Fs//'
COMMIT)
);

query_generates_wal(
	"decompress chunk after disabling markers",
	qq(SELECT decompress_chunk('_timescaledb_internal._hyper_1_1_chunk'::regclass, TRUE);),
	qq(BEGIN
table _timescaledb_internal._hyper_1_1_chunk: INSERT: "time"[timestamp with time zone]:'2023-06-30 17:00:00-07' device_id[bigint]:1 value[double precision]:1
table _timescaledb_internal._hyper_1_1_chunk: INSERT: "time"[timestamp with time zone]:'2023-07-01 01:00:00-07' device_id[bigint]:1 value[double precision]:5555
table _timescaledb_internal._hyper_1_1_chunk: INSERT: "time"[timestamp with time zone]:'2023-07-01 05:00:00-07' device_id[bigint]:1 value[double precision]:2
table _timescaledb_catalog.compression_chunk_size: DELETE: chunk_id[integer]:1
table _timescaledb_catalog.chunk: UPDATE: id[integer]:1 hypertable_id[integer]:1 schema_name[name]:'_timescaledb_internal' table_name[name]:'_hyper_1_1_chunk' compressed_chunk_id[integer]:null dropped[boolean]:false status[integer]:0 osm_chunk[boolean]:false
table _timescaledb_catalog.compression_settings: DELETE: relid[regclass]:'_timescaledb_internal.compress_hyper_3_4_chunk'
table _timescaledb_catalog.chunk: DELETE: id[integer]:4
COMMIT)
);

# re-enable background jobs
run_queries(
	qq/
	SELECT public.alter_job(id::integer, scheduled=>true) FROM _timescaledb_config.bgw_job;
	/
);

pass();

done_testing();