Add existence check to CAgg catalog find function

The function ts_continuous_agg_find_by_mat_hypertable_id is used to read
the data about a CAgg from the catalog. If the CAgg for a given
mat_hypertable_id is not found, the function returns NULL. Therefore,
most code paths performed a NULL check and did some error handling
afterward.  This PR moves the duplicated error handling into the
function.
This commit is contained in:
Jan Nidzwetzki 2024-02-12 17:04:38 +01:00 committed by Jan Nidzwetzki
parent 4f912f77ca
commit 550ba17539
7 changed files with 18 additions and 25 deletions

View File

@ -595,7 +595,7 @@ ts_continuous_aggs_find_by_raw_table_id(int32 raw_hypertable_id)
/* Find a continuous aggregate by the materialized hypertable id */
ContinuousAgg *
ts_continuous_agg_find_by_mat_hypertable_id(int32 mat_hypertable_id)
ts_continuous_agg_find_by_mat_hypertable_id(int32 mat_hypertable_id, bool missing_ok)
{
ContinuousAgg *ca = NULL;
ScanIterator iterator =
@ -619,6 +619,13 @@ ts_continuous_agg_find_by_mat_hypertable_id(int32 mat_hypertable_id)
}
ts_scan_iterator_close(&iterator);
if (ca == NULL && !missing_ok)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid materialized hypertable ID: %d", mat_hypertable_id)));
}
return ca;
}

View File

@ -159,7 +159,7 @@ extern TSDLLEXPORT Oid ts_cagg_permissions_check(Oid cagg_oid, Oid userid);
extern TSDLLEXPORT CaggsInfo ts_continuous_agg_get_all_caggs_info(int32 raw_hypertable_id);
extern TSDLLEXPORT ContinuousAgg *
ts_continuous_agg_find_by_mat_hypertable_id(int32 mat_hypertable_id);
ts_continuous_agg_find_by_mat_hypertable_id(int32 mat_hypertable_id, bool missing_ok);
extern TSDLLEXPORT void ts_materialization_invalidation_log_delete_inner(int32 mat_hypertable_id);

View File

@ -191,12 +191,7 @@ ts_continuous_agg_watermark(PG_FUNCTION_ARGS)
MemoryContextDelete(cagg_watermark_cache->mctx);
}
cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_hypertable_id);
if (NULL == cagg)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid materialized hypertable ID: %d", mat_hypertable_id)));
cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_hypertable_id, false);
/*
* Preemptive permission check to ensure the function complains about lack
@ -264,12 +259,7 @@ ts_continuous_agg_watermark_materialized(PG_FUNCTION_ARGS)
Hypertable *ht;
int64 watermark;
cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_hypertable_id);
if (NULL == cagg)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid materialized hypertable ID: %d", mat_hypertable_id)));
cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_hypertable_id, false);
/*
* Preemptive permission check to ensure the function complains about lack
@ -411,12 +401,7 @@ TSDLLEXPORT void
ts_cagg_watermark_update(Hypertable *mat_ht, int64 watermark, bool watermark_isnull,
bool force_update)
{
ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_ht->fd.id);
if (NULL == cagg)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid materialized hypertable ID: %d", mat_ht->fd.id)));
ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_ht->fd.id, false);
/* If we have a real-time CAgg, it uses a watermark function. So, we have to invalidate the rel
* cache to force a replanning of prepared statements. See cagg_watermark_update_internal for

View File

@ -348,7 +348,7 @@ policy_retention_read_and_validate_config(Jsonb *config, PolicyRetentionData *po
/* We need to do a reverse lookup here since the given hypertable might be
a materialized hypertable, and thus need to call drop_chunks on the
continuous aggregate instead. */
cagg = ts_continuous_agg_find_by_mat_hypertable_id(hypertable->fd.id);
cagg = ts_continuous_agg_find_by_mat_hypertable_id(hypertable->fd.id, true);
if (cagg)
{
object_relid = ts_get_relation_relid(NameStr(cagg->data.user_view_schema),
@ -420,7 +420,7 @@ policy_refresh_cagg_read_and_validate_config(Jsonb *config, PolicyContinuousAggD
policy_data->refresh_window.type = dim_type;
policy_data->refresh_window.start = refresh_start;
policy_data->refresh_window.end = refresh_end;
policy_data->cagg = ts_continuous_agg_find_by_mat_hypertable_id(materialization_id);
policy_data->cagg = ts_continuous_agg_find_by_mat_hypertable_id(materialization_id, false);
policy_data->start_is_null = start_isnull;
policy_data->end_is_null = end_isnull;
}

View File

@ -273,7 +273,7 @@ get_hypertable_or_cagg_name(Hypertable *ht, Name objname)
namestrcpy(objname, NameStr(ht->fd.table_name));
else if (status == HypertableIsMaterialization)
{
ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(ht->fd.id);
ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(ht->fd.id, false);
namestrcpy(objname, NameStr(cagg->data.user_view_name));
}
else

View File

@ -798,7 +798,8 @@ cagg_validate_query(const Query *query, const bool finalized, const char *cagg_s
if (status == HypertableIsMaterialization ||
status == HypertableIsMaterializationAndRaw)
{
const ContinuousAgg *cagg = ts_continuous_agg_find_by_mat_hypertable_id(ht->fd.id);
const ContinuousAgg *cagg =
ts_continuous_agg_find_by_mat_hypertable_id(ht->fd.id, false);
Assert(cagg != NULL);
ts_cache_release(hcache);

View File

@ -848,7 +848,7 @@ continuous_agg_refresh_internal(const ContinuousAgg *cagg,
/* Commit and Start a new transaction */
SPI_commit_and_chain();
cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_id);
cagg = ts_continuous_agg_find_by_mat_hypertable_id(mat_id, false);
if (!process_cagg_invalidations_and_refresh(cagg, &refresh_window, callctx, INVALID_CHUNK_ID))
emit_up_to_date_notice(cagg, callctx);