mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-16 18:43:18 +08:00
Fix lost concurrent CAgg updates
When two CAggs on the same hypertable are refreshed at the same time, we had a race condition occur on the invalidation threshold table. So far, the table has been locked with a non-self-conflicting lock. Therefore, both scanners ran at the same time, but only one was able to lock the threshold value with a proper tuple lock. The other scanner ignored this failing lock and just returned. Therefore, the field computed_invalidation_threshold was never populated and still contained 0. So, invalidation_threshold_set_or_get returned a refresh end interval of 0. As a consequence, the `if (refresh_window.start >= refresh_window.end)` branch in continuous_agg_refresh_internal could be taken and we returned from the refresh without doing any work. This patch adds proper error reporting and also implements some retry logic to avoid these problems. A self-conflicting lock is not used due to the problems discussed in #5809.
This commit is contained in:
parent
75caf5df70
commit
ac97c564e2
.unreleased
src
tsl
src/continuous_aggs
test/isolation
1
.unreleased/fix_6443
Normal file
1
.unreleased/fix_6443
Normal file
@ -0,0 +1 @@
|
|||||||
|
Fixes: #6443 Fix lost concurrent CAgg updates
|
@ -461,15 +461,26 @@ ts_scanner_scan(ScannerCtx *ctx)
|
|||||||
|
|
||||||
for (ts_scanner_start_scan(ctx); (tinfo = ts_scanner_next(ctx));)
|
for (ts_scanner_start_scan(ctx); (tinfo = ts_scanner_next(ctx));)
|
||||||
{
|
{
|
||||||
/* Call tuple_found handler. Abort the scan if the handler wants us to */
|
if (ctx->tuple_found != NULL)
|
||||||
if (ctx->tuple_found != NULL && ctx->tuple_found(tinfo, ctx->data) == SCAN_DONE)
|
|
||||||
{
|
{
|
||||||
if (!(ctx->flags & SCANNER_F_NOEND))
|
ScanTupleResult scan_result = ctx->tuple_found(tinfo, ctx->data);
|
||||||
ts_scanner_end_scan(ctx);
|
|
||||||
|
|
||||||
if (!(ctx->flags & SCANNER_F_NOEND_AND_NOCLOSE))
|
/* Call tuple_found handler. Abort the scan if the handler wants us to */
|
||||||
ts_scanner_close(ctx);
|
if (scan_result == SCAN_DONE)
|
||||||
break;
|
{
|
||||||
|
if (!(ctx->flags & SCANNER_F_NOEND))
|
||||||
|
ts_scanner_end_scan(ctx);
|
||||||
|
|
||||||
|
if (!(ctx->flags & SCANNER_F_NOEND_AND_NOCLOSE))
|
||||||
|
ts_scanner_close(ctx);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
else if (scan_result == SCAN_RESCAN)
|
||||||
|
{
|
||||||
|
ctx->internal.tinfo.count = 0;
|
||||||
|
ts_scanner_rescan(ctx, NULL);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -49,7 +49,8 @@ typedef struct TupleInfo
|
|||||||
typedef enum ScanTupleResult
|
typedef enum ScanTupleResult
|
||||||
{
|
{
|
||||||
SCAN_DONE,
|
SCAN_DONE,
|
||||||
SCAN_CONTINUE
|
SCAN_CONTINUE,
|
||||||
|
SCAN_RESCAN
|
||||||
} ScanTupleResult;
|
} ScanTupleResult;
|
||||||
|
|
||||||
typedef enum ScanFilterResult
|
typedef enum ScanFilterResult
|
||||||
|
@ -21,6 +21,7 @@
|
|||||||
#include <time_utils.h>
|
#include <time_utils.h>
|
||||||
#include <time_bucket.h>
|
#include <time_bucket.h>
|
||||||
|
|
||||||
|
#include "debug_point.h"
|
||||||
#include "ts_catalog/continuous_agg.h"
|
#include "ts_catalog/continuous_agg.h"
|
||||||
#include "continuous_aggs/materialize.h"
|
#include "continuous_aggs/materialize.h"
|
||||||
#include "invalidation_threshold.h"
|
#include "invalidation_threshold.h"
|
||||||
@ -74,10 +75,23 @@ typedef struct InvalidationThresholdData
|
|||||||
static ScanTupleResult
|
static ScanTupleResult
|
||||||
invalidation_threshold_scan_update(TupleInfo *ti, void *const data)
|
invalidation_threshold_scan_update(TupleInfo *ti, void *const data)
|
||||||
{
|
{
|
||||||
|
DEBUG_WAITPOINT("invalidation_threshold_scan_update_enter");
|
||||||
|
|
||||||
InvalidationThresholdData *invthresh = (InvalidationThresholdData *) data;
|
InvalidationThresholdData *invthresh = (InvalidationThresholdData *) data;
|
||||||
|
|
||||||
|
/* If the tuple was modified concurrently, retry the operation */
|
||||||
|
if (ti->lockresult == TM_Updated)
|
||||||
|
return SCAN_RESCAN;
|
||||||
|
|
||||||
if (ti->lockresult != TM_Ok)
|
if (ti->lockresult != TM_Ok)
|
||||||
return SCAN_CONTINUE;
|
{
|
||||||
|
elog(ERROR,
|
||||||
|
"unable to lock invalidation threshold tuple for hypertable %d (lock result %d)",
|
||||||
|
invthresh->cagg->data.raw_hypertable_id,
|
||||||
|
ti->lockresult);
|
||||||
|
|
||||||
|
pg_unreachable();
|
||||||
|
}
|
||||||
|
|
||||||
bool isnull;
|
bool isnull;
|
||||||
Datum datum =
|
Datum datum =
|
||||||
|
38
tsl/test/isolation/expected/cagg_concurrent_invalidation.out
Normal file
38
tsl/test/isolation/expected/cagg_concurrent_invalidation.out
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
Parsed test spec with 3 sessions
|
||||||
|
|
||||||
|
starting permutation: s3_lock_invalidation s1_run_update s2_run_update s3_release_invalidation s3_check_watermarks
|
||||||
|
step s3_lock_invalidation:
|
||||||
|
SELECT debug_waitpoint_enable('invalidation_threshold_scan_update_enter');
|
||||||
|
|
||||||
|
debug_waitpoint_enable
|
||||||
|
----------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1_run_update:
|
||||||
|
CALL refresh_continuous_aggregate('cagg_1', '2020-01-01 00:00:00', '2021-01-01 00:00:00');
|
||||||
|
<waiting ...>
|
||||||
|
step s2_run_update:
|
||||||
|
CALL refresh_continuous_aggregate('cagg_2', '2020-01-01 00:00:00', '2021-01-01 00:00:00');
|
||||||
|
<waiting ...>
|
||||||
|
step s3_release_invalidation:
|
||||||
|
SELECT debug_waitpoint_release('invalidation_threshold_scan_update_enter');
|
||||||
|
|
||||||
|
debug_waitpoint_release
|
||||||
|
-----------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1_run_update: <... completed>
|
||||||
|
step s2_run_update: <... completed>
|
||||||
|
step s3_check_watermarks:
|
||||||
|
SELECT _timescaledb_functions.to_timestamp(watermark)
|
||||||
|
FROM _timescaledb_catalog.continuous_aggs_watermark
|
||||||
|
ORDER BY mat_hypertable_id;
|
||||||
|
|
||||||
|
to_timestamp
|
||||||
|
----------------------------
|
||||||
|
Wed Jan 01 16:00:00 2020 PST
|
||||||
|
Wed Jan 01 16:00:00 2020 PST
|
||||||
|
(2 rows)
|
||||||
|
|
@ -23,9 +23,14 @@ endif()
|
|||||||
|
|
||||||
if(CMAKE_BUILD_TYPE MATCHES Debug)
|
if(CMAKE_BUILD_TYPE MATCHES Debug)
|
||||||
list(APPEND TEST_TEMPLATES_MODULE ${TEST_TEMPLATES_MODULE_DEBUG})
|
list(APPEND TEST_TEMPLATES_MODULE ${TEST_TEMPLATES_MODULE_DEBUG})
|
||||||
list(APPEND TEST_FILES compression_chunk_race.spec compression_freeze.spec
|
list(
|
||||||
compression_merge_race.spec
|
APPEND
|
||||||
decompression_chunk_and_parallel_query_wo_idx.spec)
|
TEST_FILES
|
||||||
|
cagg_concurrent_invalidation.spec
|
||||||
|
compression_chunk_race.spec
|
||||||
|
compression_freeze.spec
|
||||||
|
compression_merge_race.spec
|
||||||
|
decompression_chunk_and_parallel_query_wo_idx.spec)
|
||||||
if(PG_VERSION VERSION_GREATER_EQUAL "14.0")
|
if(PG_VERSION VERSION_GREATER_EQUAL "14.0")
|
||||||
list(APPEND TEST_FILES freeze_chunk.spec compression_dml_iso.spec)
|
list(APPEND TEST_FILES freeze_chunk.spec compression_dml_iso.spec)
|
||||||
endif()
|
endif()
|
||||||
|
93
tsl/test/isolation/specs/cagg_concurrent_invalidation.spec
Normal file
93
tsl/test/isolation/specs/cagg_concurrent_invalidation.spec
Normal file
@ -0,0 +1,93 @@
|
|||||||
|
# This file and its contents are licensed under the Timescale License.
|
||||||
|
# Please see the included NOTICE for copyright information and
|
||||||
|
# LICENSE-TIMESCALE for a copy of the license.
|
||||||
|
|
||||||
|
|
||||||
|
#
|
||||||
|
# Test concurrent CAgg refreshes and invalidation threshold updates. This isolation test
|
||||||
|
# checks that we don't skip CAgg updates when two sessions are trying to modify the
|
||||||
|
# invalidation threshold at the same time.
|
||||||
|
#
|
||||||
|
setup
|
||||||
|
{
|
||||||
|
CREATE TABLE temperature (
|
||||||
|
time timestamptz NOT NULL,
|
||||||
|
value float
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT create_hypertable('temperature', 'time');
|
||||||
|
|
||||||
|
INSERT INTO temperature
|
||||||
|
SELECT time, ceil(random() * 100)::int
|
||||||
|
FROM generate_series('2000-01-01 0:00:00+0'::timestamptz,
|
||||||
|
'2000-01-01 23:59:59+0','1m') time;
|
||||||
|
|
||||||
|
CREATE MATERIALIZED VIEW cagg_1
|
||||||
|
WITH (timescaledb.continuous) AS
|
||||||
|
SELECT time_bucket('4 hour', time), avg(value)
|
||||||
|
FROM temperature
|
||||||
|
GROUP BY 1 ORDER BY 1
|
||||||
|
WITH NO DATA;
|
||||||
|
|
||||||
|
CREATE MATERIALIZED VIEW cagg_2
|
||||||
|
WITH (timescaledb.continuous) AS
|
||||||
|
SELECT time_bucket('4 hour', time), avg(value)
|
||||||
|
FROM temperature
|
||||||
|
GROUP BY 1 ORDER BY 1
|
||||||
|
WITH NO DATA;
|
||||||
|
}
|
||||||
|
|
||||||
|
# Refresh CAGGs in separate transactions
|
||||||
|
setup
|
||||||
|
{
|
||||||
|
CALL refresh_continuous_aggregate('cagg_1', NULL, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
setup
|
||||||
|
{
|
||||||
|
CALL refresh_continuous_aggregate('cagg_2', NULL, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
# Add new data to hypertable. This time in the year 2020 instead of 2000 as we
|
||||||
|
# did for the setup of the CAgg.
|
||||||
|
setup
|
||||||
|
{
|
||||||
|
INSERT INTO temperature
|
||||||
|
SELECT time, ceil(random() * 100)::int
|
||||||
|
FROM generate_series('2020-01-01 0:00:00+0'::timestamptz,
|
||||||
|
'2020-01-01 23:59:59+0','1m') time;
|
||||||
|
}
|
||||||
|
|
||||||
|
teardown {
|
||||||
|
DROP TABLE temperature CASCADE;
|
||||||
|
}
|
||||||
|
|
||||||
|
session "S1"
|
||||||
|
step "s1_run_update" {
|
||||||
|
CALL refresh_continuous_aggregate('cagg_1', '2020-01-01 00:00:00', '2021-01-01 00:00:00');
|
||||||
|
}
|
||||||
|
|
||||||
|
session "S2"
|
||||||
|
step "s2_run_update" {
|
||||||
|
CALL refresh_continuous_aggregate('cagg_2', '2020-01-01 00:00:00', '2021-01-01 00:00:00');
|
||||||
|
}
|
||||||
|
|
||||||
|
session "S3"
|
||||||
|
step "s3_lock_invalidation" {
|
||||||
|
SELECT debug_waitpoint_enable('invalidation_threshold_scan_update_enter');
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s3_release_invalidation" {
|
||||||
|
SELECT debug_waitpoint_release('invalidation_threshold_scan_update_enter');
|
||||||
|
}
|
||||||
|
|
||||||
|
# Check that both CAggs have a watermark in 2020 after the updates are executed.
|
||||||
|
# mat_hypertable_id is not included in the query to make the test independent of the
|
||||||
|
# actual hypertable ids.
|
||||||
|
step "s3_check_watermarks" {
|
||||||
|
SELECT _timescaledb_functions.to_timestamp(watermark)
|
||||||
|
FROM _timescaledb_catalog.continuous_aggs_watermark
|
||||||
|
ORDER BY mat_hypertable_id;
|
||||||
|
}
|
||||||
|
|
||||||
|
permutation "s3_lock_invalidation" "s1_run_update" "s2_run_update" "s3_release_invalidation" "s3_check_watermarks"("s1_run_update", "s2_run_update")
|
Loading…
x
Reference in New Issue
Block a user