Set invalidation threshold during refresh

The invalidation threshold governs the window of data from the head of
a hypertable that shouldn't be subject to invalidations in order to
reduce write amplification during inserts on the hypertable.

When a continuous aggregate is refreshed, the invalidation threshold
must be moved forward (or initialized if it doesn't previously exist)
whenever the refresh window stretches beyond the current threshold.

Tests for setting the invalidation threshold are also added, including
new isolation tests for concurrency.
This commit is contained in:
Erik Nordström 2020-08-10 09:31:23 +02:00 committed by Erik Nordström
parent 80720206df
commit c01faa72f0
7 changed files with 647 additions and 232 deletions

View File

@ -62,7 +62,6 @@ static void lock_invalidation_threshold_hypertable_row(int32 raw_hypertable_id);
static void drain_invalidation_log(int32 raw_hypertable_id, List **invalidations_out);
static void insert_materialization_invalidation_logs(List *caggs, List *invalidations,
Relation rel);
static void invalidation_threshold_set(int32 raw_hypertable_id, int64 invalidation_threshold);
static Datum internal_to_time_value_or_infinite(int64 internal, Oid time_type,
bool *is_infinite_out);
static InternalTimeRange materialization_invalidation_log_get_range(int32 materialization_id,
@ -259,8 +258,8 @@ continuous_agg_materialize(int32 materialization_id, ContinuousAggMatOptions *op
{
LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
AccessExclusiveLock);
invalidation_threshold_set(cagg_data.raw_hypertable_id,
materialization_invalidation_threshold);
continuous_agg_invalidation_threshold_set(cagg_data.raw_hypertable_id,
materialization_invalidation_threshold);
}
table_close(materialization_invalidation_log_table_relation, NoLock);
@ -1136,23 +1135,30 @@ continuous_agg_execute_materialization(int64 bucket_width, int32 hypertable_id,
return;
}
/*
 * Scan state passed to the invalidation threshold update scan.
 *
 * Carries the candidate threshold in and reports back whether the
 * catalog row's watermark was actually moved forward.
 */
typedef struct InvalidationThresholdData
{
	int64 threshold;   /* candidate new watermark (internal time value) */
	bool was_updated;  /* set to true if the stored watermark was raised */
} InvalidationThresholdData;
static ScanTupleResult
scan_update_invalidation_threshold(TupleInfo *ti, void *data)
{
int64 new_threshold = *(int64 *) data;
InvalidationThresholdData *invthresh = data;
bool should_free;
HeapTuple tuple = ts_scanner_fetch_heap_tuple(ti, false, &should_free);
Form_continuous_aggs_invalidation_threshold form =
(Form_continuous_aggs_invalidation_threshold) GETSTRUCT(tuple);
if (new_threshold > form->watermark)
if (invthresh->threshold > form->watermark)
{
HeapTuple new_tuple = heap_copytuple(tuple);
form = (Form_continuous_aggs_invalidation_threshold) GETSTRUCT(new_tuple);
form->watermark = new_threshold;
form->watermark = invthresh->threshold;
ts_catalog_update(ti->scanrel, new_tuple);
heap_freetuple(new_tuple);
invthresh->was_updated = true;
}
else
{
@ -1161,7 +1167,7 @@ scan_update_invalidation_threshold(TupleInfo *ti, void *data)
" " INT64_FORMAT,
form->hypertable_id,
form->watermark,
new_threshold);
invthresh->threshold);
}
if (should_free)
@ -1174,10 +1180,14 @@ scan_update_invalidation_threshold(TupleInfo *ti, void *data)
*refresh_lag etc. We update the raw hypertable's invalidation threshold
* only if this new value is greater than the existing one.
*/
static void
invalidation_threshold_set(int32 raw_hypertable_id, int64 invalidation_threshold)
bool
continuous_agg_invalidation_threshold_set(int32 raw_hypertable_id, int64 invalidation_threshold)
{
bool updated_threshold;
bool threshold_found;
InvalidationThresholdData data = {
.threshold = invalidation_threshold,
.was_updated = false,
};
ScanKeyData scankey[1];
ScanKeyInit(&scankey[0],
@ -1195,7 +1205,7 @@ invalidation_threshold_set(int32 raw_hypertable_id, int64 invalidation_threshold
* cause us to lose the updates. The AccessExclusiveLock ensures no one else can possibly be
* reading the threshold.
*/
updated_threshold =
threshold_found =
ts_catalog_scan_one(CONTINUOUS_AGGS_INVALIDATION_THRESHOLD /*=table*/,
CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_PKEY /*=indexid*/,
scankey /*=scankey*/,
@ -1203,9 +1213,9 @@ invalidation_threshold_set(int32 raw_hypertable_id, int64 invalidation_threshold
scan_update_invalidation_threshold /*=tuple_found*/,
AccessExclusiveLock /*=lockmode*/,
CONTINUOUS_AGGS_INVALIDATION_THRESHOLD_TABLE_NAME /*=table_name*/,
&invalidation_threshold /*=data*/);
&data /*=data*/);
if (!updated_threshold)
if (!threshold_found)
{
Catalog *catalog = ts_catalog_get();
/* NOTE: this function deliberately takes a stronger lock than RowExclusive, see the comment
@ -1225,7 +1235,10 @@ invalidation_threshold_set(int32 raw_hypertable_id, int64 invalidation_threshold
ts_catalog_insert_values(rel, desc, values, nulls);
table_close(rel, NoLock);
data.was_updated = true;
}
return data.was_updated;
}
static ScanTupleResult

View File

@ -55,5 +55,7 @@ void continuous_agg_update_materialization(SchemaAndName partial_view,
InternalTimeRange new_materialization_range,
InternalTimeRange invalidation_range,
int64 bucket_width);
bool continuous_agg_invalidation_threshold_set(int32 raw_hypertable_id,
int64 invalidation_threshold);
#endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_MATERIALIZE_H */

View File

@ -6,6 +6,8 @@
#include <postgres.h>
#include <utils/lsyscache.h>
#include <utils/fmgrprotos.h>
#include <utils/snapmgr.h>
#include <access/xact.h>
#include <storage/lmgr.h>
#include <miscadmin.h>
#include <fmgr.h>
@ -312,6 +314,8 @@ get_time_value_from_arg(Datum arg, Oid argtype, Oid cagg_timetype)
return ts_time_value_to_internal(arg, argtype);
}
#define REFRESH_FUNCTION_NAME "refresh_continuous_aggregate()"
/*
* Refresh a continuous aggregate across the given window.
*/
@ -319,6 +323,7 @@ Datum
continuous_agg_refresh(PG_FUNCTION_ARGS)
{
Oid cagg_relid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
Catalog *catalog = ts_catalog_get();
ContinuousAgg *cagg;
Hypertable *cagg_ht;
Dimension *time_dim;
@ -328,7 +333,16 @@ continuous_agg_refresh(PG_FUNCTION_ARGS)
.end = PG_INT64_MAX,
};
PreventCommandIfReadOnly("refresh_continuous_aggregate()");
PreventCommandIfReadOnly(REFRESH_FUNCTION_NAME);
/* Prevent running refresh if we're in a transaction block since a refresh
* can run two transactions and might take a long time to release locks if
* there's a lot to materialize. Strictly, it is optional to prohibit
* transaction blocks since there will be only one transaction if the
* invalidation threshold needs no update. However, materialization might
still take a long time and it is probably best for consistency to always
* prevent transaction blocks. */
PreventInTransactionBlock(true, REFRESH_FUNCTION_NAME);
if (!OidIsValid(cagg_relid))
ereport(ERROR,
@ -372,6 +386,19 @@ continuous_agg_refresh(PG_FUNCTION_ARGS)
errmsg("invalid refresh window"),
errhint("The start of the window must be before the end.")));
LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
AccessExclusiveLock);
if (continuous_agg_invalidation_threshold_set(cagg->data.raw_hypertable_id, refresh_window.end))
{
/* Start a new transaction if the threshold was set. Note that this
* invalidates previous memory allocations (and locks). */
PopActiveSnapshot();
CommitTransactionCommand();
StartTransactionCommand();
cagg = ts_continuous_agg_find_by_relid(cagg_relid);
}
continuous_agg_invalidation_process(cagg, &refresh_window);
continuous_agg_refresh_with_window(cagg, &refresh_window);

View File

@ -130,10 +130,54 @@ ORDER BY day DESC, device;
-----+--------+----------
(0 rows)
-- Must refresh with "legacy" functionality to move the invalidation
-- threshold, or no invalidations will be generated.
REFRESH MATERIALIZED VIEW cond_10;
REFRESH MATERIALIZED VIEW measure_10;
-- Must refresh to move the invalidation threshold, or no
-- invalidations will be generated. Initially, there is no threshold
-- set:
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
ORDER BY 1,2;
hypertable_id | watermark
---------------+-----------
(0 rows)
-- Now refresh up to 50, and the threshold should be updated accordingly:
CALL refresh_continuous_aggregate('cond_10', 1, 50);
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
ORDER BY 1,2;
hypertable_id | watermark
---------------+-----------
1 | 50
(1 row)
-- Refreshing below the threshold does not move it:
CALL refresh_continuous_aggregate('cond_10', 20, 49);
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
ORDER BY 1,2;
hypertable_id | watermark
---------------+-----------
1 | 50
(1 row)
-- Refreshing measure_10 moves the threshold only for the other hypertable:
CALL refresh_continuous_aggregate('measure_10', 1, 30);
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
ORDER BY 1,2;
hypertable_id | watermark
---------------+-----------
1 | 50
2 | 30
(2 rows)
-- Refresh on the second continuous aggregate, cond_20, on the first
-- hypertable moves the same threshold as when refreshing cond_10:
CALL refresh_continuous_aggregate('cond_20', 60, 100);
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
ORDER BY 1,2;
hypertable_id | watermark
---------------+-----------
1 | 100
2 | 30
(2 rows)
-- There should be no invalidations initially:
SELECT hypertable_id AS hyper_id,
lowest_modified_value AS start,
@ -189,13 +233,21 @@ SELECT hypertable_id AS hyper_id,
1 | 10 | 19
1 | 60 | 70
2 | 20 | 20
2 | 30 | 80
(5 rows)
(4 rows)
-- First refresh a window where we don't have any invalidations. This
-- allows us to see only the copying of the invalidations to the per
-- cagg log without additional processing.
CALL refresh_continuous_aggregate('cond_10', 20, 60);
-- Invalidation threshold remains at 100:
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
ORDER BY 1,2;
hypertable_id | watermark
---------------+-----------
1 | 100
2 | 30
(2 rows)
-- Invalidations should be moved from the hypertable invalidation log
-- to the continuous aggregate log, but only for the hypertable that
-- the refreshed aggregate belongs to:
@ -207,8 +259,7 @@ SELECT hypertable_id AS hyper_id,
hyper_id | start | end
----------+-------+-----
2 | 20 | 20
2 | 30 | 80
(2 rows)
(1 row)
SELECT materialization_id AS cagg_id,
lowest_modified_value AS start,
@ -258,8 +309,7 @@ SELECT hypertable_id AS hyper_id,
1 | 30 | 59
1 | 60 | 90
2 | 20 | 20
2 | 30 | 80
(10 rows)
(9 rows)
-- But nothing has yet changed in the cagg invalidation log:
SELECT materialization_id AS cagg_id,
@ -289,8 +339,7 @@ SELECT hypertable_id AS hyper_id,
hyper_id | start | end
----------+-------+-----
2 | 20 | 20
2 | 30 | 80
(2 rows)
(1 row)
-- Only the cond_10 cagg should have its entries cut:
SELECT materialization_id AS cagg_id,

View File

@ -1,194 +1,393 @@
Parsed test spec with 5 sessions
Parsed test spec with 8 sessions
starting permutation: R1_refresh R1_commit S1_select R2_commit R3_commit R4_commit
starting permutation: R1_refresh S1_select R3_refresh S1_select L2_read_unlock_threshold_table L3_unlock_cagg_table L1_unlock_threshold_table
step R1_refresh:
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-02');
refresh_continuous_aggregate
step R1_commit:
COMMIT;
CALL refresh_continuous_aggregate('cond_10', 35, 62);
step S1_select:
SELECT day, avg_temp
FROM daily_temp
SELECT bucket, avg_temp
FROM cond_10
ORDER BY 1;
SELECT * FROM cagg_bucket_count('daily_temp');
SELECT * FROM cagg_bucket_count('cond_10');
SELECT h.table_name AS hypertable, it.watermark AS threshold
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it,
_timescaledb_catalog.hypertable h
WHERE it.hypertable_id = h.id;
day avg_temp
bucket avg_temp
Thu Apr 30 17:00:00 2020 PDT18.4411764705882
Fri May 01 17:00:00 2020 PDT18.8541666666667
30 18.3
40 15.1
50 26.9
60 18.9
cagg_bucket_count
2
step R2_commit:
COMMIT;
4
hypertable threshold
step R3_commit:
COMMIT;
conditions 62
step R3_refresh:
CALL refresh_continuous_aggregate('cond_10', 71, 97);
step R4_commit:
COMMIT;
step S1_select:
SELECT bucket, avg_temp
FROM cond_10
ORDER BY 1;
SELECT * FROM cagg_bucket_count('cond_10');
SELECT h.table_name AS hypertable, it.watermark AS threshold
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it,
_timescaledb_catalog.hypertable h
WHERE it.hypertable_id = h.id;
bucket avg_temp
30 18.3
40 15.1
50 26.9
60 18.9
70 24.6
80 23.6
90 21.3
cagg_bucket_count
7
hypertable threshold
conditions 97
step L2_read_unlock_threshold_table:
ROLLBACK;
step L3_unlock_cagg_table:
ROLLBACK;
step L1_unlock_threshold_table:
ROLLBACK;
starting permutation: R1_refresh R2_refresh R1_commit R2_commit S1_select R3_commit R4_commit
step R1_refresh:
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-02');
starting permutation: L2_read_lock_threshold_table R3_refresh L2_read_unlock_threshold_table S1_select L3_unlock_cagg_table L1_unlock_threshold_table
step L2_read_lock_threshold_table:
LOCK _timescaledb_catalog.continuous_aggs_invalidation_threshold
IN ACCESS SHARE MODE;
refresh_continuous_aggregate
step R2_refresh:
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-02');
step R3_refresh:
CALL refresh_continuous_aggregate('cond_10', 71, 97);
<waiting ...>
step R1_commit:
COMMIT;
step R2_refresh: <... completed>
refresh_continuous_aggregate
step R2_commit:
COMMIT;
step L2_read_unlock_threshold_table:
ROLLBACK;
step R3_refresh: <... completed>
step S1_select:
SELECT day, avg_temp
FROM daily_temp
SELECT bucket, avg_temp
FROM cond_10
ORDER BY 1;
SELECT * FROM cagg_bucket_count('daily_temp');
SELECT * FROM cagg_bucket_count('cond_10');
SELECT h.table_name AS hypertable, it.watermark AS threshold
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it,
_timescaledb_catalog.hypertable h
WHERE it.hypertable_id = h.id;
day avg_temp
bucket avg_temp
Thu Apr 30 17:00:00 2020 PDT18.4411764705882
Fri May 01 17:00:00 2020 PDT18.8541666666667
70 24.6
80 23.6
90 21.3
cagg_bucket_count
2
step R3_commit:
COMMIT;
3
hypertable threshold
step R4_commit:
COMMIT;
conditions 97
step L3_unlock_cagg_table:
ROLLBACK;
step L1_unlock_threshold_table:
ROLLBACK;
starting permutation: R1_refresh R2_refresh R2_commit R1_commit S1_select R3_commit R4_commit
starting permutation: R1_refresh L2_read_lock_threshold_table R3_refresh L2_read_unlock_threshold_table S1_select L3_unlock_cagg_table L1_unlock_threshold_table
step R1_refresh:
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-02');
CALL refresh_continuous_aggregate('cond_10', 35, 62);
refresh_continuous_aggregate
step L2_read_lock_threshold_table:
LOCK _timescaledb_catalog.continuous_aggs_invalidation_threshold
IN ACCESS SHARE MODE;
step R2_refresh:
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-02');
step R3_refresh:
CALL refresh_continuous_aggregate('cond_10', 71, 97);
<waiting ...>
step R2_refresh: <... completed>
ERROR: canceling statement due to lock timeout
step R2_commit:
COMMIT;
step R1_commit:
COMMIT;
step L2_read_unlock_threshold_table:
ROLLBACK;
step R3_refresh: <... completed>
step S1_select:
SELECT day, avg_temp
FROM daily_temp
SELECT bucket, avg_temp
FROM cond_10
ORDER BY 1;
SELECT * FROM cagg_bucket_count('daily_temp');
SELECT * FROM cagg_bucket_count('cond_10');
SELECT h.table_name AS hypertable, it.watermark AS threshold
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it,
_timescaledb_catalog.hypertable h
WHERE it.hypertable_id = h.id;
day avg_temp
bucket avg_temp
Thu Apr 30 17:00:00 2020 PDT18.4411764705882
Fri May 01 17:00:00 2020 PDT18.8541666666667
30 18.3
40 15.1
50 26.9
60 18.9
70 24.6
80 23.6
90 21.3
cagg_bucket_count
2
step R3_commit:
COMMIT;
7
hypertable threshold
step R4_commit:
COMMIT;
conditions 97
step L3_unlock_cagg_table:
ROLLBACK;
step L1_unlock_threshold_table:
ROLLBACK;
starting permutation: R1_refresh R3_refresh R3_commit R1_commit S1_select R2_commit R4_commit
starting permutation: R3_refresh L2_read_lock_threshold_table R1_refresh L2_read_unlock_threshold_table S1_select L3_unlock_cagg_table L1_unlock_threshold_table
step R3_refresh:
CALL refresh_continuous_aggregate('cond_10', 71, 97);
step L2_read_lock_threshold_table:
LOCK _timescaledb_catalog.continuous_aggs_invalidation_threshold
IN ACCESS SHARE MODE;
step R1_refresh:
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-02');
CALL refresh_continuous_aggregate('cond_10', 35, 62);
<waiting ...>
step L2_read_unlock_threshold_table:
ROLLBACK;
refresh_continuous_aggregate
step R1_refresh: <... completed>
step S1_select:
SELECT bucket, avg_temp
FROM cond_10
ORDER BY 1;
SELECT * FROM cagg_bucket_count('cond_10');
SELECT h.table_name AS hypertable, it.watermark AS threshold
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it,
_timescaledb_catalog.hypertable h
WHERE it.hypertable_id = h.id;
bucket avg_temp
30 18.3
40 15.1
50 26.9
60 18.9
70 24.6
80 23.6
90 21.3
cagg_bucket_count
7
hypertable threshold
conditions 97
step L3_unlock_cagg_table:
ROLLBACK;
step L1_unlock_threshold_table:
ROLLBACK;
starting permutation: L3_lock_cagg_table R1_refresh L3_unlock_cagg_table S1_select L1_unlock_threshold_table L2_read_unlock_threshold_table
step L3_lock_cagg_table:
SELECT lock_cagg('cond_10');
lock_cagg
step R1_refresh:
CALL refresh_continuous_aggregate('cond_10', 35, 62);
<waiting ...>
step L3_unlock_cagg_table:
ROLLBACK;
step R1_refresh: <... completed>
step S1_select:
SELECT bucket, avg_temp
FROM cond_10
ORDER BY 1;
SELECT * FROM cagg_bucket_count('cond_10');
SELECT h.table_name AS hypertable, it.watermark AS threshold
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it,
_timescaledb_catalog.hypertable h
WHERE it.hypertable_id = h.id;
bucket avg_temp
30 18.3
40 15.1
50 26.9
60 18.9
cagg_bucket_count
4
hypertable threshold
conditions 62
step L1_unlock_threshold_table:
ROLLBACK;
step L2_read_unlock_threshold_table:
ROLLBACK;
starting permutation: L3_lock_cagg_table R1_refresh R2_refresh L3_unlock_cagg_table S1_select L1_unlock_threshold_table L2_read_unlock_threshold_table
step L3_lock_cagg_table:
SELECT lock_cagg('cond_10');
lock_cagg
step R1_refresh:
CALL refresh_continuous_aggregate('cond_10', 35, 62);
<waiting ...>
step R2_refresh:
CALL refresh_continuous_aggregate('cond_10', 35, 62);
<waiting ...>
step L3_unlock_cagg_table:
ROLLBACK;
step R1_refresh: <... completed>
step R2_refresh: <... completed>
step S1_select:
SELECT bucket, avg_temp
FROM cond_10
ORDER BY 1;
SELECT * FROM cagg_bucket_count('cond_10');
SELECT h.table_name AS hypertable, it.watermark AS threshold
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it,
_timescaledb_catalog.hypertable h
WHERE it.hypertable_id = h.id;
bucket avg_temp
30 18.3
40 15.1
50 26.9
60 18.9
cagg_bucket_count
4
hypertable threshold
conditions 62
step L1_unlock_threshold_table:
ROLLBACK;
step L2_read_unlock_threshold_table:
ROLLBACK;
starting permutation: L3_lock_cagg_table R1_refresh R3_refresh L3_unlock_cagg_table S1_select L1_unlock_threshold_table L2_read_unlock_threshold_table
step L3_lock_cagg_table:
SELECT lock_cagg('cond_10');
lock_cagg
step R1_refresh:
CALL refresh_continuous_aggregate('cond_10', 35, 62);
<waiting ...>
step R3_refresh:
CALL refresh_continuous_aggregate('cond_10', 71, 97);
<waiting ...>
step L3_unlock_cagg_table:
ROLLBACK;
step R1_refresh: <... completed>
step R3_refresh: <... completed>
step S1_select:
SELECT bucket, avg_temp
FROM cond_10
ORDER BY 1;
SELECT * FROM cagg_bucket_count('cond_10');
SELECT h.table_name AS hypertable, it.watermark AS threshold
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it,
_timescaledb_catalog.hypertable h
WHERE it.hypertable_id = h.id;
bucket avg_temp
30 18.3
40 15.1
50 26.9
60 18.9
70 24.6
80 23.6
90 21.3
cagg_bucket_count
7
hypertable threshold
conditions 97
step L1_unlock_threshold_table:
ROLLBACK;
step L2_read_unlock_threshold_table:
ROLLBACK;
starting permutation: L3_lock_cagg_table R3_refresh R4_refresh L3_unlock_cagg_table S1_select L1_unlock_threshold_table L2_read_unlock_threshold_table
step L3_lock_cagg_table:
SELECT lock_cagg('cond_10');
lock_cagg
step R3_refresh:
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-08', '2020-05-10');
CALL refresh_continuous_aggregate('cond_10', 71, 97);
<waiting ...>
step R3_refresh: <... completed>
ERROR: canceling statement due to lock timeout
step R3_commit:
COMMIT;
step R1_commit:
COMMIT;
step S1_select:
SELECT day, avg_temp
FROM daily_temp
ORDER BY 1;
SELECT * FROM cagg_bucket_count('daily_temp');
day avg_temp
Thu Apr 30 17:00:00 2020 PDT18.4411764705882
Fri May 01 17:00:00 2020 PDT18.8541666666667
cagg_bucket_count
2
step R2_commit:
COMMIT;
step R4_commit:
COMMIT;
starting permutation: R1_refresh R4_refresh R4_commit R1_commit S1_select R2_commit R3_commit
step R1_refresh:
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-02');
refresh_continuous_aggregate
step R4_refresh:
SELECT refresh_continuous_aggregate('weekly_temp', '2020-05-01', '2020-05-10');
CALL refresh_continuous_aggregate('cond_20', 39, 84);
refresh_continuous_aggregate
step R4_commit:
COMMIT;
step R1_commit:
COMMIT;
step L3_unlock_cagg_table:
ROLLBACK;
step R3_refresh: <... completed>
step S1_select:
SELECT day, avg_temp
FROM daily_temp
SELECT bucket, avg_temp
FROM cond_10
ORDER BY 1;
SELECT * FROM cagg_bucket_count('daily_temp');
SELECT * FROM cagg_bucket_count('cond_10');
SELECT h.table_name AS hypertable, it.watermark AS threshold
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it,
_timescaledb_catalog.hypertable h
WHERE it.hypertable_id = h.id;
day avg_temp
bucket avg_temp
Thu Apr 30 17:00:00 2020 PDT18.4411764705882
Fri May 01 17:00:00 2020 PDT18.8541666666667
70 24.6
80 23.6
90 21.3
cagg_bucket_count
2
step R2_commit:
COMMIT;
3
hypertable threshold
step R3_commit:
COMMIT;
conditions 97
step L1_unlock_threshold_table:
ROLLBACK;
step L2_read_unlock_threshold_table:
ROLLBACK;

View File

@ -7,35 +7,44 @@
#
# We define a function 'cagg_bucket_count' to get the number of
# buckets in a continuous aggregate. We use it to verify that there
# aren't any duplicate buckets inserted after concurrent
# refreshes. Duplicate buckets are possible since there is no unique
# constraint on the GROUP BY keys in the materialized hypertable.
# aren't any duplicate buckets/rows inserted into the materialization
# hypertable after concurrent refreshes. Duplicate buckets are
# possible since there is no unique constraint on the GROUP BY keys in
# the materialized hypertable.
#
setup
{
SELECT _timescaledb_internal.stop_background_workers();
CREATE TABLE conditions(time timestamptz, temp float);
SELECT create_hypertable('conditions', 'time');
CREATE TABLE conditions(time int, temp float);
SELECT create_hypertable('conditions', 'time', chunk_time_interval => 20);
INSERT INTO conditions
SELECT t, abs(timestamp_hash(t::timestamp))%40
FROM generate_series('2020-05-01', '2020-05-10', '10 minutes'::interval) t;
CREATE VIEW daily_temp
SELECT t, abs(timestamp_hash(to_timestamp(t)::timestamp))%40
FROM generate_series(1, 100, 1) t;
CREATE OR REPLACE FUNCTION cond_now()
RETURNS int LANGUAGE SQL STABLE AS
$$
SELECT coalesce(max(time), 0)
FROM conditions
$$;
SELECT set_integer_now_func('conditions', 'cond_now');
CREATE VIEW cond_10
WITH (timescaledb.continuous,
timescaledb.materialized_only=true)
AS
SELECT time_bucket('1 day', time) AS day, avg(temp) AS avg_temp
SELECT time_bucket(10, time) AS bucket, avg(temp) AS avg_temp
FROM conditions
GROUP BY 1;
CREATE VIEW weekly_temp
CREATE VIEW cond_20
WITH (timescaledb.continuous,
timescaledb.materialized_only=true)
AS
SELECT time_bucket('1 week', time) AS day, avg(temp) AS avg_temp
SELECT time_bucket(20, time) AS bucket, avg(temp) AS avg_temp
FROM conditions
GROUP BY 1;
CREATE OR REPLACE FUNCTION cagg_bucket_count(cagg regclass) RETURNS int
AS $$
CREATE OR REPLACE FUNCTION cagg_bucket_count(cagg regclass)
RETURNS int AS
$$
DECLARE
cagg_schema name;
cagg_name name;
@ -65,111 +74,202 @@ setup
RETURN result;
END
$$ LANGUAGE plpgsql;
-- Lock the user view of the named continuous aggregate in EXCLUSIVE
-- mode, simulating an in-progress refresh holding its lock.
--
-- Fix: the original SELECT ... INTO had no WHERE clause, so with more
-- than one continuous aggregate defined (cond_10 and cond_20) it
-- locked an arbitrary row's view and ignored the 'cagg' argument.
CREATE OR REPLACE FUNCTION lock_cagg(cagg name) RETURNS void AS $$
DECLARE
  mattable text;
BEGIN
  SELECT format('%I.%I', user_view_schema, user_view_name)
  FROM _timescaledb_catalog.continuous_agg
  WHERE user_view_name = cagg
  INTO mattable;
  EXECUTE format('LOCK table %s IN EXCLUSIVE MODE', mattable);
END; $$ LANGUAGE plpgsql;
}
teardown {
DROP TABLE conditions CASCADE;
}
# Session to refresh the daily_temp continuous aggregate
# Session to refresh the cond_10 continuous aggregate
session "R1"
setup
{
BEGIN;
SET LOCAL lock_timeout = '500ms';
SET LOCAL deadlock_timeout = '500ms';
SET SESSION lock_timeout = '500ms';
SET SESSION deadlock_timeout = '500ms';
}
step "R1_refresh"
{
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-02');
}
step "R1_commit"
{
COMMIT;
CALL refresh_continuous_aggregate('cond_10', 35, 62);
}
# Refresh that overlaps with R1
session "R2"
setup
{
BEGIN;
SET LOCAL lock_timeout = '500ms';
SET LOCAL deadlock_timeout = '500ms';
SET SESSION lock_timeout = '500ms';
SET SESSION deadlock_timeout = '500ms';
}
step "R2_refresh"
{
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-01', '2020-05-02');
}
step "R2_commit"
{
COMMIT;
CALL refresh_continuous_aggregate('cond_10', 35, 62);
}
# Refresh on same aggregate (daily_temp) that doesn't overlap with R1 and R2
# Refresh on same aggregate (cond_10) that doesn't overlap with R1 and R2
session "R3"
setup
{
BEGIN;
SET LOCAL lock_timeout = '500ms';
SET LOCAL deadlock_timeout = '500ms';
SET SESSION lock_timeout = '500ms';
SET SESSION deadlock_timeout = '500ms';
}
step "R3_refresh"
{
SELECT refresh_continuous_aggregate('daily_temp', '2020-05-08', '2020-05-10');
}
step "R3_commit"
{
COMMIT;
CALL refresh_continuous_aggregate('cond_10', 71, 97);
}
# Overlapping refresh on another continuous aggregate (weekly_temp)
# Overlapping refresh on another continuous aggregate (cond_20)
session "R4"
setup
{
BEGIN;
SET LOCAL lock_timeout = '500ms';
SET LOCAL deadlock_timeout = '500ms';
SET SESSION lock_timeout = '500ms';
SET SESSION deadlock_timeout = '500ms';
}
step "R4_refresh"
{
SELECT refresh_continuous_aggregate('weekly_temp', '2020-05-01', '2020-05-10');
}
step "R4_commit"
{
COMMIT;
CALL refresh_continuous_aggregate('cond_20', 39, 84);
}
# Session to query
# Define a number of lock sessions to simulate concurrent refreshes
# by selectively grabbing the locks we use to handle concurrency.
# The "L1" session exclusively locks the invalidation threshold
# table. This simulates an ongoing update of the invalidation
# threshold, which has not yet finished.
session "L1"
setup
{
BEGIN;
SET SESSION lock_timeout = '500ms';
SET SESSION deadlock_timeout = '500ms';
}
step "L1_lock_threshold_table"
{
LOCK _timescaledb_catalog.continuous_aggs_invalidation_threshold
IN ACCESS EXCLUSIVE MODE;
}
step "L1_unlock_threshold_table"
{
ROLLBACK;
}
# The "L2" session takes an access share lock on the invalidation
# threshold table. This simulates a reader, which has not yet finished
# (e.g., an insert into the hypertable, or a refresh that has not yet
# grabbed the exclusive lock).
session "L2"
setup
{
BEGIN;
SET SESSION lock_timeout = '500ms';
SET SESSION deadlock_timeout = '500ms';
}
step "L2_read_lock_threshold_table"
{
LOCK _timescaledb_catalog.continuous_aggs_invalidation_threshold
IN ACCESS SHARE MODE;
}
step "L2_read_unlock_threshold_table"
{
ROLLBACK;
}
# The "L3" session locks the cagg table. This simulates an ongoing
# refresh that has not yet completed and released the lock on the cagg
# materialization table.
#
session "L3"
setup
{
BEGIN;
SET SESSION lock_timeout = '500ms';
SET SESSION deadlock_timeout = '500ms';
}
step "L3_lock_cagg_table"
{
SELECT lock_cagg('cond_10');
}
step "L3_unlock_cagg_table"
{
ROLLBACK;
}
# Session to view the contents of a cagg after materialization. It
# also prints the bucket count (number of rows in the materialization
# hypertable) and the invalidation threshold. The bucket count should
# match the number of rows in the query if there are no duplicate
# buckets/rows.
session "S1"
setup
{
SET LOCAL lock_timeout = '500ms';
SET LOCAL deadlock_timeout = '100ms';
SET SESSION lock_timeout = '500ms';
SET SESSION deadlock_timeout = '500ms';
}
step "S1_select"
{
SELECT day, avg_temp
FROM daily_temp
SELECT bucket, avg_temp
FROM cond_10
ORDER BY 1;
SELECT * FROM cagg_bucket_count('daily_temp');
SELECT * FROM cagg_bucket_count('cond_10');
SELECT h.table_name AS hypertable, it.watermark AS threshold
FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold it,
_timescaledb_catalog.hypertable h
WHERE it.hypertable_id = h.id;
}
# Run single transaction refresh to get some reference output.
# The result of a query on the aggregate should always look like this example.
permutation "R1_refresh" "R1_commit" "S1_select" "R2_commit" "R3_commit" "R4_commit"
####################################################################
#
# Tests for concurrent updates to the invalidation threshold (first
# transaction of a refresh).
#
####################################################################
# Interleave two refreshes that are overlapping. Since we serialize
# refreshes, R2 should block until R1 commits.
permutation "R1_refresh" "R2_refresh" "R1_commit" "R2_commit" "S1_select" "R3_commit" "R4_commit"
# Run single transaction refresh to get some reference output. The
# result of a query on the aggregate should always look like this
# example (when refreshed with the same window).
permutation "R1_refresh" "S1_select" "R3_refresh" "S1_select" "L2_read_unlock_threshold_table" "L3_unlock_cagg_table" "L1_unlock_threshold_table"
# R2 starts after R1 but commits before. This should not work (lock timeout).
permutation "R1_refresh" "R2_refresh" "R2_commit" "R1_commit" "S1_select" "R3_commit" "R4_commit"
# A threshold reader (insert) should block a refresh if the threshold
# does not exist yet (insert of new threshold)
permutation "L2_read_lock_threshold_table" "R3_refresh" "L2_read_unlock_threshold_table" "S1_select" "L3_unlock_cagg_table" "L1_unlock_threshold_table"
# R1 and R3 don't have overlapping refresh windows, but we serialize
# anyway, so not yet supported.
permutation "R1_refresh" "R3_refresh" "R3_commit" "R1_commit" "S1_select" "R2_commit" "R4_commit"
# A threshold reader (insert) should block a refresh if the threshold
# needs an update
permutation "R1_refresh" "L2_read_lock_threshold_table" "R3_refresh" "L2_read_unlock_threshold_table" "S1_select" "L3_unlock_cagg_table" "L1_unlock_threshold_table"
# Concurrent refreshing across two different aggregates on the same
# hypertable should be OK:
permutation "R1_refresh" "R4_refresh" "R4_commit" "R1_commit" "S1_select" "R2_commit" "R3_commit"
# A threshold reader (insert) blocks a refresh even if the threshold
# doesn't need an update (could be improved)
permutation "R3_refresh" "L2_read_lock_threshold_table" "R1_refresh" "L2_read_unlock_threshold_table" "S1_select" "L3_unlock_cagg_table" "L1_unlock_threshold_table"
##################################################################
#
# Tests for concurrent refreshes of continuous aggregates (second
# transaction of a refresh).
#
##################################################################
# Interleave two refreshes that are overlapping (one simulated). Since
# we serialize refreshes, R1 should block until the lock is released
permutation "L3_lock_cagg_table" "R1_refresh" "L3_unlock_cagg_table" "S1_select" "L1_unlock_threshold_table" "L2_read_unlock_threshold_table"
# R1 and R2 queued to refresh, both should serialize
permutation "L3_lock_cagg_table" "R1_refresh" "R2_refresh" "L3_unlock_cagg_table" "S1_select" "L1_unlock_threshold_table" "L2_read_unlock_threshold_table"
# R1 and R3 don't have overlapping refresh windows, but should serialize
# anyway. This could potentially be optimized in the future.
permutation "L3_lock_cagg_table" "R1_refresh" "R3_refresh" "L3_unlock_cagg_table" "S1_select" "L1_unlock_threshold_table" "L2_read_unlock_threshold_table"
# Concurrent refreshing across two different aggregates on same
# hypertable does not block
permutation "L3_lock_cagg_table" "R3_refresh" "R4_refresh" "L3_unlock_cagg_table" "S1_select" "L1_unlock_threshold_table" "L2_read_unlock_threshold_table"

View File

@ -86,10 +86,32 @@ ORDER BY day DESC, device;
SELECT * FROM measure_10
ORDER BY day DESC, device;
-- Must refresh with "legacy" functionality to move the invalidation
-- threshold, or no invalidations will be generated.
REFRESH MATERIALIZED VIEW cond_10;
REFRESH MATERIALIZED VIEW measure_10;
-- Must refresh to move the invalidation threshold, or no
-- invalidations will be generated. Initially, there is no threshold
-- set:
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
ORDER BY 1,2;
-- Now refresh up to 50, and the threshold should be updated accordingly:
CALL refresh_continuous_aggregate('cond_10', 1, 50);
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
ORDER BY 1,2;
-- Refreshing below the threshold does not move it:
CALL refresh_continuous_aggregate('cond_10', 20, 49);
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
ORDER BY 1,2;
-- Refreshing measure_10 moves the threshold only for the other hypertable:
CALL refresh_continuous_aggregate('measure_10', 1, 30);
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
ORDER BY 1,2;
-- Refresh on the second continuous aggregate, cond_20, on the first
-- hypertable moves the same threshold as when refreshing cond_10:
CALL refresh_continuous_aggregate('cond_20', 60, 100);
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
ORDER BY 1,2;
-- There should be no invalidations initially:
SELECT hypertable_id AS hyper_id,
@ -136,6 +158,9 @@ SELECT hypertable_id AS hyper_id,
-- allows us to see only the copying of the invalidations to the per
-- cagg log without additional processing.
CALL refresh_continuous_aggregate('cond_10', 20, 60);
-- Invalidation threshold remains at 100:
SELECT * FROM _timescaledb_catalog.continuous_aggs_invalidation_threshold
ORDER BY 1,2;
-- Invalidations should be moved from the hypertable invalidation log
-- to the continuous aggregate log, but only for the hypertable that