Improve locking when processing invalidations

Invalidation processing during refreshing of a continuous aggregate is
now better protected against concurrent refreshes by taking the lock
on the materialized hypertable before invalidation processing.

Since invalidation processing is now split across two transactions,
the first one processing the hypertable invalidation log and the
second one processing the continuous aggregate invalidation lock, they
are now separately protected by serializing around the invalidation
threshold lock and the materialized hypertable lock, respectively.
This commit is contained in:
Erik Nordström 2020-08-07 16:39:15 +02:00 committed by Erik Nordström
parent 21d033f76a
commit c4ca66b76c
2 changed files with 21 additions and 22 deletions

View File

@ -414,10 +414,11 @@ move_invalidations_from_hyper_to_cagg_log(const CaggInvalidationState *state,
{
int32 hyper_id = state->cagg.data.raw_hypertable_id;
List *cagg_ids = get_cagg_ids(hyper_id);
int32 last_cagg_hyper_id = llast_int(cagg_ids);
int32 last_cagg_hyper_id;
ListCell *lc;
Assert(list_length(cagg_ids) > 0);
last_cagg_hyper_id = llast_int(cagg_ids);
/* We use a per-tuple memory context in the scan loop since we could be
* processing a lot of invalidations (basically an unbounded
@ -449,6 +450,7 @@ move_invalidations_from_hyper_to_cagg_log(const CaggInvalidationState *state,
oldmctx = MemoryContextSwitchTo(state->per_tuple_mctx);
ti = ts_scan_iterator_tuple_info(&iterator);
invalidation_entry_set_from_hyper_invalidation(&logentry, ti, cagg_hyper_id);
if (!IS_VALID_INVALIDATION(&mergedentry))
@ -575,10 +577,8 @@ clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,
invalidation_entry_reset(&mergedentry);
cagg_invalidations_scan_by_hypertable_init(&iterator, cagg_hyper_id, RowExclusiveLock);
iterator.ctx.snapshot = state->snapshot;
/* Must use an up-to-date snapshot to see inserts done after processing
* the hypertable invalidation log */
iterator.ctx.snapshot = RegisterSnapshot(GetLatestSnapshot());
MemoryContextReset(state->per_tuple_mctx);
/* Process all invalidations for the continuous aggregate */
@ -618,7 +618,6 @@ clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,
cut_cagg_invalidation(state, refresh_window, &mergedentry);
ts_scan_iterator_close(&iterator);
UnregisterSnapshot(iterator.ctx.snapshot);
}
static void

View File

@ -194,21 +194,6 @@ continuous_agg_refresh_init(CaggRefreshState *refresh, const ContinuousAgg *cagg
refresh->refresh_window = *refresh_window;
refresh->partial_view.schema = &refresh->cagg.data.partial_view_schema;
refresh->partial_view.name = &refresh->cagg.data.partial_view_name;
/* Lock the continuous aggregate's materialized hypertable to protect
* against concurrent refreshes. Only reads will be allowed. This is a
* heavy lock that serializes all refreshes. We might want to consider
* relaxing this in the future, e.g., we'd like to at least allow
* concurrent refreshes that don't have overlapping refresh windows.
*
* Concurrent refreshes on the same continuous aggregate could be achieved
* if we protect the aggregate with a UNIQUE constraint on the GROUP BY
* columns. This would allow concurrent refreshes, but overlapping ones
* might fail with, e.g., unique violation errors. Those could be
* captured, however, and ignored when we know it means someone else just
* did the same work.
*/
LockRelationOid(refresh->cagg_ht->main_table_relid, ExclusiveLock);
}
/*
@ -394,10 +379,14 @@ continuous_agg_refresh(PG_FUNCTION_ARGS)
* invalidation log. Doing the threshold and copying as part of the first
* transaction ensures that the threshold and new invalidations will be
* visible as soon as possible to concurrent refreshes and that we keep
* locks for only a short period.
* locks for only a short period. Note that the first transaction
* serializes around the threshold table lock, which protects both the
* threshold and the invalidation processing against concurrent refreshes.
*
* The second transaction processes the cagg invalidation log and then
* performs the actual refresh (materialization of data).
* performs the actual refresh (materialization of data). This transaction
* serializes around a lock on the materialized hypertable for the
* continuous aggregate that gets refreshed.
*/
LockRelationOid(catalog_get_table_id(catalog, CONTINUOUS_AGGS_INVALIDATION_THRESHOLD),
AccessExclusiveLock);
@ -410,6 +399,17 @@ continuous_agg_refresh(PG_FUNCTION_ARGS)
CommitTransactionCommand();
StartTransactionCommand();
cagg = ts_continuous_agg_find_by_relid(cagg_relid);
cagg_ht = ts_hypertable_get_by_id(cagg->data.mat_hypertable_id);
/* Lock the continuous aggregate's materialized hypertable to protect
* against concurrent refreshes. Only concurrent reads will be
* allowed. This is a heavy lock that serializes all refreshes on the same
* continuous aggregate. We might want to consider relaxing this in the
* future, e.g., we'd like to at least allow concurrent refreshes on the
* same continuous aggregate when they don't have overlapping refresh
* windows.
*/
LockRelationOid(cagg_ht->main_table_relid, ExclusiveLock);
refresh_window = compute_bucketed_refresh_window(&refresh_window, cagg->data.bucket_width);