mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-17 02:53:51 +08:00
Mark cagg_watermark as PARALLEL RESTRICTED
This patch marks the function cagg_watermark as PARALLEL RESTRICTED. It partially reverts the change of c0f2ed18095f21ac737f96fe93e4035dbfeeaf2c. The reason is as follows: for transaction isolation levels < REPEATABLE READ it can not be ensured that parallel worker reads the same watermark (e.g., using read committed isolation level: worker A reads the watermark, the CAGG is refreshed and the watermark changes, worker B reads the newer watermark). The different views on the CAGG can cause unexpected results and crashes (e.g., the chunk exclusion excludes different chunks in worker A and in worker B). In addition, a correct snapshot is used when the watermark is read from the CAGG and a TAP test is added, which detects inconsistent watermark reads. Co-authored-by: Fabrízio de Royes Mello <fabriziomello@gmail.com> Co-authored-by: Zoltan Haindrich <zoltan@timescale.com>
This commit is contained in:
parent
a22e732c02
commit
81e2f35d4b
.unreleased
sql
src/ts_catalog
tsl/test
1
.unreleased/bugfix_5804
Normal file
1
.unreleased/bugfix_5804
Normal file
@ -0,0 +1 @@
|
||||
Fixes: #5804 Mark cagg_watermark function as PARALLEL RESTRICTED
|
@ -39,7 +39,7 @@ CREATE OR REPLACE FUNCTION _timescaledb_internal.time_to_internal(time_val ANYEL
|
||||
RETURNS BIGINT AS '@MODULE_PATHNAME@', 'ts_time_to_internal' LANGUAGE C VOLATILE STRICT;
|
||||
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.cagg_watermark(hypertable_id INTEGER)
|
||||
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_continuous_agg_watermark' LANGUAGE C STABLE STRICT PARALLEL SAFE;
|
||||
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_continuous_agg_watermark' LANGUAGE C STABLE STRICT PARALLEL RESTRICTED;
|
||||
|
||||
CREATE OR REPLACE FUNCTION _timescaledb_internal.cagg_watermark_materialized(hypertable_id INTEGER)
|
||||
RETURNS INT8 AS '@MODULE_PATHNAME@', 'ts_continuous_agg_watermark_materialized' LANGUAGE C STABLE STRICT PARALLEL SAFE;
|
||||
|
@ -13,6 +13,7 @@
|
||||
#include <fmgr.h>
|
||||
#include <miscadmin.h>
|
||||
#include <utils/acl.h>
|
||||
#include <utils/snapmgr.h>
|
||||
|
||||
#include "ts_catalog/continuous_agg.h"
|
||||
#include "ts_catalog/continuous_aggs_watermark.h"
|
||||
@ -88,6 +89,16 @@ cagg_watermark_get(Hypertable *mat_ht)
|
||||
ScanIterator iterator =
|
||||
ts_scan_iterator_create(CONTINUOUS_AGGS_WATERMARK, AccessShareLock, CurrentMemoryContext);
|
||||
|
||||
/*
|
||||
* The watermark of a CAGG has to be fetched by using the transaction snapshot.
|
||||
*
|
||||
* By default, the ts_scanner uses the SnapshotSelf to perform a scan. However, reading the
|
||||
* watermark must be done using the transaction snapshot in order to ensure that the view on the
|
||||
* watermark and the materialized part of the CAGG match.
|
||||
*/
|
||||
iterator.ctx.snapshot = GetTransactionSnapshot();
|
||||
Assert(iterator.ctx.snapshot != NULL);
|
||||
|
||||
cagg_watermark_init_scan_by_mat_hypertable_id(&iterator, mat_ht->fd.id);
|
||||
|
||||
ts_scanner_foreach(&iterator)
|
||||
@ -105,6 +116,13 @@ cagg_watermark_get(Hypertable *mat_ht)
|
||||
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("watermark not defined for continuous aggregate: %d", mat_ht->fd.id)));
|
||||
|
||||
/* Log the read watermark, needed for MVCC tap tests */
|
||||
ereport(DEBUG5,
|
||||
(errcode(ERRCODE_SUCCESSFUL_COMPLETION),
|
||||
errmsg("watermark for continuous aggregate, '%d' is: " INT64_FORMAT,
|
||||
mat_ht->fd.id,
|
||||
DatumGetInt64(watermark))));
|
||||
|
||||
return DatumGetInt64(watermark);
|
||||
}
|
||||
|
||||
|
@ -2315,34 +2315,28 @@ SET parallel_setup_cost = 0;
|
||||
SET parallel_tuple_cost = 0;
|
||||
-- Parallel planning
|
||||
EXPLAIN (COSTS OFF, TIMING OFF) SELECT * FROM conditions_daily WHERE time_bucket >= '2023-07-01';
|
||||
QUERY PLAN
|
||||
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||
Merge Append
|
||||
Sort Key: _materialized_hypertable_64.sum DESC
|
||||
-> Gather Merge
|
||||
Workers Planned: 2
|
||||
-> Sort
|
||||
Sort Key: _materialized_hypertable_64.sum DESC
|
||||
-> Parallel Custom Scan (ChunkAppend) on _materialized_hypertable_64
|
||||
Chunks excluded during startup: 0
|
||||
-> Parallel Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Parallel Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Parallel Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Sort
|
||||
Sort Key: _materialized_hypertable_64.sum DESC
|
||||
-> Custom Scan (ChunkAppend) on _materialized_hypertable_64
|
||||
Chunks excluded during startup: 0
|
||||
-> Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Sort
|
||||
Sort Key: (sum(conditions.temperature)) DESC
|
||||
-> Finalize HashAggregate
|
||||
Group Key: (time_bucket('@ 1 day'::interval, conditions.timec))
|
||||
-> Gather
|
||||
Workers Planned: 4
|
||||
-> Partial HashAggregate
|
||||
Group Key: time_bucket('@ 1 day'::interval, conditions.timec)
|
||||
-> Parallel Custom Scan (ChunkAppend) on conditions
|
||||
Chunks excluded during startup: 26
|
||||
-> Parallel Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk
|
||||
Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)
|
||||
(27 rows)
|
||||
-> HashAggregate
|
||||
Group Key: time_bucket('@ 1 day'::interval, conditions.timec)
|
||||
-> Custom Scan (ChunkAppend) on conditions
|
||||
Chunks excluded during startup: 26
|
||||
-> Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk
|
||||
Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)
|
||||
(21 rows)
|
||||
|
||||
|
@ -2315,34 +2315,28 @@ SET parallel_setup_cost = 0;
|
||||
SET parallel_tuple_cost = 0;
|
||||
-- Parallel planning
|
||||
EXPLAIN (COSTS OFF, TIMING OFF) SELECT * FROM conditions_daily WHERE time_bucket >= '2023-07-01';
|
||||
QUERY PLAN
|
||||
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||
Merge Append
|
||||
Sort Key: _materialized_hypertable_64.sum DESC
|
||||
-> Gather Merge
|
||||
Workers Planned: 2
|
||||
-> Sort
|
||||
Sort Key: _materialized_hypertable_64.sum DESC
|
||||
-> Parallel Custom Scan (ChunkAppend) on _materialized_hypertable_64
|
||||
Chunks excluded during startup: 0
|
||||
-> Parallel Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Parallel Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Parallel Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Sort
|
||||
Sort Key: _materialized_hypertable_64.sum DESC
|
||||
-> Custom Scan (ChunkAppend) on _materialized_hypertable_64
|
||||
Chunks excluded during startup: 0
|
||||
-> Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Sort
|
||||
Sort Key: (sum(conditions.temperature)) DESC
|
||||
-> Finalize HashAggregate
|
||||
Group Key: (time_bucket('@ 1 day'::interval, conditions.timec))
|
||||
-> Gather
|
||||
Workers Planned: 4
|
||||
-> Partial HashAggregate
|
||||
Group Key: time_bucket('@ 1 day'::interval, conditions.timec)
|
||||
-> Parallel Custom Scan (ChunkAppend) on conditions
|
||||
Chunks excluded during startup: 26
|
||||
-> Parallel Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk
|
||||
Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)
|
||||
(27 rows)
|
||||
-> HashAggregate
|
||||
Group Key: time_bucket('@ 1 day'::interval, conditions.timec)
|
||||
-> Custom Scan (ChunkAppend) on conditions
|
||||
Chunks excluded during startup: 26
|
||||
-> Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk
|
||||
Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)
|
||||
(21 rows)
|
||||
|
||||
|
@ -2314,34 +2314,28 @@ SET parallel_setup_cost = 0;
|
||||
SET parallel_tuple_cost = 0;
|
||||
-- Parallel planning
|
||||
EXPLAIN (COSTS OFF, TIMING OFF) SELECT * FROM conditions_daily WHERE time_bucket >= '2023-07-01';
|
||||
QUERY PLAN
|
||||
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||
Merge Append
|
||||
Sort Key: _materialized_hypertable_64.sum DESC
|
||||
-> Gather Merge
|
||||
Workers Planned: 2
|
||||
-> Sort
|
||||
Sort Key: _materialized_hypertable_64.sum DESC
|
||||
-> Parallel Custom Scan (ChunkAppend) on _materialized_hypertable_64
|
||||
Chunks excluded during startup: 0
|
||||
-> Parallel Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Parallel Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Parallel Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Sort
|
||||
Sort Key: _materialized_hypertable_64.sum DESC
|
||||
-> Custom Scan (ChunkAppend) on _materialized_hypertable_64
|
||||
Chunks excluded during startup: 0
|
||||
-> Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Sort
|
||||
Sort Key: (sum(conditions.temperature)) DESC
|
||||
-> Finalize HashAggregate
|
||||
Group Key: (time_bucket('@ 1 day'::interval, conditions.timec))
|
||||
-> Gather
|
||||
Workers Planned: 4
|
||||
-> Partial HashAggregate
|
||||
Group Key: time_bucket('@ 1 day'::interval, conditions.timec)
|
||||
-> Parallel Custom Scan (ChunkAppend) on conditions
|
||||
Chunks excluded during startup: 26
|
||||
-> Parallel Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk
|
||||
Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)
|
||||
(27 rows)
|
||||
-> HashAggregate
|
||||
Group Key: time_bucket('@ 1 day'::interval, conditions.timec)
|
||||
-> Custom Scan (ChunkAppend) on conditions
|
||||
Chunks excluded during startup: 26
|
||||
-> Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk
|
||||
Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)
|
||||
(21 rows)
|
||||
|
||||
|
@ -2316,35 +2316,29 @@ SET parallel_setup_cost = 0;
|
||||
SET parallel_tuple_cost = 0;
|
||||
-- Parallel planning
|
||||
EXPLAIN (COSTS OFF, TIMING OFF) SELECT * FROM conditions_daily WHERE time_bucket >= '2023-07-01';
|
||||
QUERY PLAN
|
||||
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||
QUERY PLAN
|
||||
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||
Merge Append
|
||||
Sort Key: _materialized_hypertable_64.sum DESC
|
||||
-> Gather Merge
|
||||
Workers Planned: 2
|
||||
-> Sort
|
||||
Sort Key: _materialized_hypertable_64.sum DESC
|
||||
-> Parallel Custom Scan (ChunkAppend) on _materialized_hypertable_64
|
||||
Chunks excluded during startup: 0
|
||||
-> Parallel Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Parallel Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Parallel Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Sort
|
||||
Sort Key: _materialized_hypertable_64.sum DESC
|
||||
-> Custom Scan (ChunkAppend) on _materialized_hypertable_64
|
||||
Chunks excluded during startup: 0
|
||||
-> Index Scan using _hyper_64_185_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_185_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Index Scan using _hyper_64_189_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_189_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Index Scan using _hyper_64_190_chunk__materialized_hypertable_64_time_bucket_idx on _hyper_64_190_chunk
|
||||
Index Cond: ((time_bucket < COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (time_bucket >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
-> Sort
|
||||
Sort Key: (sum(conditions.temperature)) DESC
|
||||
-> Finalize HashAggregate
|
||||
Group Key: (time_bucket('@ 1 day'::interval, conditions.timec))
|
||||
-> Gather
|
||||
Workers Planned: 4
|
||||
-> Partial HashAggregate
|
||||
Group Key: time_bucket('@ 1 day'::interval, conditions.timec)
|
||||
-> Result
|
||||
-> Parallel Custom Scan (ChunkAppend) on conditions
|
||||
Chunks excluded during startup: 26
|
||||
-> Parallel Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk
|
||||
Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)
|
||||
(28 rows)
|
||||
-> HashAggregate
|
||||
Group Key: time_bucket('@ 1 day'::interval, conditions.timec)
|
||||
-> Result
|
||||
-> Custom Scan (ChunkAppend) on conditions
|
||||
Chunks excluded during startup: 26
|
||||
-> Index Scan Backward using _hyper_63_184_chunk_conditions_timec_idx on _hyper_63_184_chunk
|
||||
Index Cond: ((timec >= COALESCE(_timescaledb_internal.to_timestamp(_timescaledb_internal.cagg_watermark(64)), '-infinity'::timestamp with time zone)) AND (timec >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone))
|
||||
Filter: (time_bucket('@ 1 day'::interval, timec) >= 'Sat Jul 01 00:00:00 2023 PDT'::timestamp with time zone)
|
||||
(22 rows)
|
||||
|
||||
|
179
tsl/test/t/008_mvcc_cagg.pl
Normal file
179
tsl/test/t/008_mvcc_cagg.pl
Normal file
@ -0,0 +1,179 @@
|
||||
# This file and its contents are licensed under the Timescale License.
|
||||
# Please see the included NOTICE for copyright information and
|
||||
# LICENSE-TIMESCALE for a copy of the license.
|
||||
|
||||
# This TAP test tests the MVCC behavior of the watermark function of the CAGGs
|
||||
# It creates a hypertable, a CAGG, and a refresh policy. Afterward, it inserts
|
||||
# into the hypertable and checks that all workers of a parallel query see the same
|
||||
# watermark.
|
||||
|
||||
use strict;
|
||||
use warnings;
|
||||
use TimescaleNode;
|
||||
use Data::Dumper;
|
||||
use Test::More tests => 5;
|
||||
|
||||
# See debug output in 'tsl/test/tmp_check/log/regress_log_008_mvcc_cagg'
|
||||
my $debug = 0;
|
||||
|
||||
# Should be enabled as soon as _timescaledb_internal.cagg_watermark is declared as parallel safe
|
||||
my $check_for_parallel_plan = 0;
|
||||
|
||||
my $timescale_node = TimescaleNode->create('insert');
|
||||
|
||||
# Test 1 - Create table
|
||||
my $result = $timescale_node->safe_psql(
|
||||
'postgres', q{
|
||||
CREATE TABLE adapter_metrics (
|
||||
time timestamptz NOT NULL,
|
||||
adapter_token text NOT NULL,
|
||||
dyno text NOT NULL,
|
||||
queue text NOT NULL,
|
||||
value double precision NOT NULL);
|
||||
}
|
||||
);
|
||||
is($result, '', 'create table');
|
||||
|
||||
# Test 2 - Convert to hypertable
|
||||
$result = $timescale_node->safe_psql(
|
||||
'postgres', q{
|
||||
SELECT FROM create_hypertable('adapter_metrics','time', chunk_time_interval => INTERVAL '5 seconds');
|
||||
}
|
||||
);
|
||||
is($result, '', 'create hypertable');
|
||||
|
||||
# Test 3 - Create CAGG
|
||||
$result = $timescale_node->safe_psql(
|
||||
'postgres', q{
|
||||
CREATE MATERIALIZED VIEW adapter_metrics_rollup WITH (timescaledb.continuous, timescaledb.materialized_only=false) AS
|
||||
SELECT
|
||||
time_bucket('00:00:05'::interval, adapter_metrics.time) AS bucket,
|
||||
avg(adapter_metrics.value) AS avg_value,
|
||||
max(adapter_metrics.value) AS max_value,
|
||||
count(*) AS count
|
||||
FROM adapter_metrics
|
||||
GROUP BY time_bucket('00:00:05'::interval, adapter_metrics.time)
|
||||
WITH DATA;
|
||||
}
|
||||
);
|
||||
is($result, '', 'crete cagg');
|
||||
|
||||
# Test 4 - Create CAGG policy
|
||||
$result = $timescale_node->safe_psql(
|
||||
'postgres', q{
|
||||
SELECT add_continuous_aggregate_policy('adapter_metrics_rollup',
|
||||
start_offset => INTERVAL '20 seconds',
|
||||
end_offset => NULL,
|
||||
schedule_interval => INTERVAL '1 seconds');
|
||||
}
|
||||
);
|
||||
is($result, '1000', 'job id of the cagg');
|
||||
|
||||
# Test 5 - Test consistent CAGG watermarks
|
||||
for (my $iteration = 0; $iteration < 10; $iteration++)
|
||||
{
|
||||
# Insert data
|
||||
$timescale_node->safe_psql(
|
||||
'postgres', q{
|
||||
INSERT INTO adapter_metrics
|
||||
SELECT
|
||||
now(),
|
||||
random()::text AS dyno,
|
||||
random()::text AS queue,
|
||||
random()::text AS adapter_token,
|
||||
value
|
||||
FROM
|
||||
generate_series(1, 100, 1) AS g2(value);
|
||||
}
|
||||
);
|
||||
|
||||
# Perform selects while the CAGG update is running
|
||||
for (my $query = 0; $query < 10; $query++)
|
||||
{
|
||||
# Encourage the use of parallel workers
|
||||
my $sql = q{
|
||||
SET force_parallel_mode = 1;
|
||||
SET enable_bitmapscan = 0;
|
||||
SET parallel_setup_cost = 0;
|
||||
SET parallel_tuple_cost = 0;
|
||||
SET parallel_tuple_cost = 0;
|
||||
SET client_min_messages TO DEBUG5;
|
||||
SELECT * FROM adapter_metrics_rollup WHERE adapter_metrics_rollup.bucket > now() - INTERVAL '30 seconds';
|
||||
};
|
||||
my ($stdout, $stderr);
|
||||
|
||||
if ($debug == 1)
|
||||
{
|
||||
print "===== New query\n";
|
||||
}
|
||||
|
||||
# To be able to access stderr, use psql instead of safe_psql
|
||||
$timescale_node->psql(
|
||||
'postgres', $sql,
|
||||
stdout => \$stdout,
|
||||
stderr => \$stderr,
|
||||
on_error_die => 1,
|
||||
on_error_stop => 1);
|
||||
|
||||
# Ensure all parallel workers have seen the same watermark
|
||||
my @log_lines = split "\n", $stderr;
|
||||
my $seen_watermarks = 0;
|
||||
my $cagg_id = -1;
|
||||
my $watermark = -1;
|
||||
|
||||
# Check that all workers have seen the same watermark
|
||||
foreach (@log_lines)
|
||||
{
|
||||
if (/watermark for continuous aggregate .+'(\d+)' is: (\d+)/)
|
||||
{
|
||||
my $current_cagg = $1;
|
||||
my $current_watermark = $2;
|
||||
|
||||
if ($debug == 1)
|
||||
{
|
||||
print $_ . "\n";
|
||||
}
|
||||
|
||||
if ($watermark == -1)
|
||||
{
|
||||
$watermark = $current_watermark;
|
||||
$cagg_id = $current_cagg;
|
||||
}
|
||||
elsif ($watermark != $current_watermark)
|
||||
{
|
||||
diag(
|
||||
"Some workers have read the watermark $watermark, one worker read the watermark $current_watermark\n"
|
||||
);
|
||||
fail();
|
||||
BAIL_OUT();
|
||||
}
|
||||
elsif ($cagg_id != $current_cagg)
|
||||
{
|
||||
diag(
|
||||
"Got watermarks for differnt CAGGs, this test can not handle this ($cagg_id / $current_cagg)\n"
|
||||
);
|
||||
fail();
|
||||
BAIL_OUT();
|
||||
}
|
||||
|
||||
$seen_watermarks++;
|
||||
}
|
||||
}
|
||||
|
||||
if ($check_for_parallel_plan == 1 && $seen_watermarks < 2)
|
||||
{
|
||||
diag(
|
||||
"Got watermark only from one worker, was a parallel plan used?"
|
||||
);
|
||||
fail();
|
||||
BAIL_OUT();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# No errors before? Let test 5 pass
|
||||
pass();
|
||||
|
||||
done_testing();
|
||||
|
||||
1;
|
@ -1,7 +1,7 @@
|
||||
set(PROVE_TEST_FILES 001_simple_multinode.pl 003_connections_privs.pl)
|
||||
set(PROVE_DEBUG_TEST_FILES
|
||||
002_chunk_copy_move.pl 004_multinode_rdwr_1pc.pl 005_add_data_node.pl
|
||||
006_job_crash_log.pl 007_healthcheck.pl)
|
||||
006_job_crash_log.pl 007_healthcheck.pl 008_mvcc_cagg.pl)
|
||||
|
||||
if(CMAKE_BUILD_TYPE MATCHES Debug)
|
||||
list(APPEND PROVE_TEST_FILES ${PROVE_DEBUG_TEST_FILES})
|
||||
|
Loading…
x
Reference in New Issue
Block a user