diff --git a/tsl/src/continuous_aggs/invalidation.c b/tsl/src/continuous_aggs/invalidation.c index 3631f3078..64d85ef96 100644 --- a/tsl/src/continuous_aggs/invalidation.c +++ b/tsl/src/continuous_aggs/invalidation.c @@ -414,10 +414,11 @@ move_invalidations_from_hyper_to_cagg_log(const CaggInvalidationState *state, { int32 hyper_id = state->cagg.data.raw_hypertable_id; List *cagg_ids = get_cagg_ids(hyper_id); - int32 last_cagg_hyper_id = llast_int(cagg_ids); + int32 last_cagg_hyper_id; ListCell *lc; Assert(list_length(cagg_ids) > 0); + last_cagg_hyper_id = llast_int(cagg_ids); /* We use a per-tuple memory context in the scan loop since we could be * processing a lot of invalidations (basically an unbounded @@ -449,6 +450,7 @@ move_invalidations_from_hyper_to_cagg_log(const CaggInvalidationState *state, oldmctx = MemoryContextSwitchTo(state->per_tuple_mctx); ti = ts_scan_iterator_tuple_info(&iterator); + invalidation_entry_set_from_hyper_invalidation(&logentry, ti, cagg_hyper_id); if (!IS_VALID_INVALIDATION(&mergedentry)) @@ -575,10 +577,8 @@ clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state, invalidation_entry_reset(&mergedentry); cagg_invalidations_scan_by_hypertable_init(&iterator, cagg_hyper_id, RowExclusiveLock); + iterator.ctx.snapshot = state->snapshot; - /* Must use an up-to-date snapshot to see inserts done after processing - * the hypertable invalidation log */ - iterator.ctx.snapshot = RegisterSnapshot(GetLatestSnapshot()); MemoryContextReset(state->per_tuple_mctx); /* Process all invalidations for the continuous aggregate */ @@ -618,7 +618,6 @@ clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state, cut_cagg_invalidation(state, refresh_window, &mergedentry); ts_scan_iterator_close(&iterator); - UnregisterSnapshot(iterator.ctx.snapshot); } static void diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c index e52d22b45..5c5687969 100644 --- a/tsl/src/continuous_aggs/refresh.c +++ b/tsl/src/continuous_aggs/refresh.c @@ -194,21 +194,6 @@ continuous_agg_refresh_init(CaggRefreshState *refresh, const ContinuousAgg *cagg refresh->refresh_window = *refresh_window; refresh->partial_view.schema = &refresh->cagg.data.partial_view_schema; refresh->partial_view.name = &refresh->cagg.data.partial_view_name; - - /* Lock the continuous aggregate's materialized hypertable to protect - * against concurrent refreshes. Only reads will be allowed. This is a - * heavy lock that serializes all refreshes. We might want to consider - * relaxing this in the future, e.g., we'd like to at least allow - * concurrent refreshes that don't have overlapping refresh windows. - * - * Concurrent refreshes on the same continuous aggregate could be achieved - * if we protect the aggregate with a UNIQUE constraint on the GROUP BY - * columns. This would allow concurrent refreshes, but overlapping ones - * might fail with, e.g., unique violation errors. Those could be - * captured, however, and ignored when we know it means someone else just - * did the same work. - */ - LockRelationOid(refresh->cagg_ht->main_table_relid, ExclusiveLock); } /* @@ -394,10 +379,14 @@ continuous_agg_refresh(PG_FUNCTION_ARGS) * invalidation log. Doing the threshold and copying as part of the first * transaction ensures that the threshold and new invalidations will be * visible as soon as possible to concurrent refreshes and that we keep - * locks for only a short period. + * locks for only a short period. Note that the first transaction + * serializes around the threshold table lock, which protects both the + * threshold and the invalidation processing against concurrent refreshes. * * The second transaction processes the cagg invalidation log and then - * performs the actual refresh (materialization of data). + * performs the actual refresh (materialization of data). This transaction + * serializes around a lock on the materialized hypertable for the + * continuous aggregate that gets refreshed. */ LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD), AccessExclusiveLock); @@ -410,6 +399,17 @@ continuous_agg_refresh(PG_FUNCTION_ARGS) CommitTransactionCommand(); StartTransactionCommand(); cagg = ts_continuous_agg_find_by_relid(cagg_relid); + cagg_ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id); + + /* Lock the continuous aggregate's materialized hypertable to protect + * against concurrent refreshes. Only concurrent reads will be + * allowed. This is a heavy lock that serializes all refreshes on the same + * continuous aggregate. We might want to consider relaxing this in the + * future, e.g., we'd like to at least allow concurrent refreshes on the + * same continuous aggregate when they don't have overlapping refresh + * windows. + */ + LockRelationOid(cagg_ht->main_table_relid, ExclusiveLock); refresh_window = compute_bucketed_refresh_window(&refresh_window, cagg->data.bucket_width);