diff --git a/src/cross_module_fn.c b/src/cross_module_fn.c index 78fac363f..1230ae506 100644 --- a/src/cross_module_fn.c +++ b/src/cross_module_fn.c @@ -250,9 +250,7 @@ continuous_agg_invalidate_mat_ht_all_default(const Hypertable *raw_ht, const Hyp static void continuous_agg_call_invalidation_trigger_default(int32 hypertable_id, Relation chunk_rel, HeapTuple chunk_tuple, HeapTuple chunk_newtuple, - bool update, - bool is_distributed_hypertable_trigger, - int32 parent_hypertable_id) + bool update) { error_no_default_fn_community(); pg_unreachable(); diff --git a/src/cross_module_fn.h b/src/cross_module_fn.h index 04f6d8f20..a2ef1075f 100644 --- a/src/cross_module_fn.h +++ b/src/cross_module_fn.h @@ -101,9 +101,7 @@ typedef struct CrossModuleFunctions PGFunction continuous_agg_invalidation_trigger; void (*continuous_agg_call_invalidation_trigger)(int32 hypertable_id, Relation chunk_rel, HeapTuple chunk_tuple, - HeapTuple chunk_newtuple, bool update, - bool is_distributed_hypertable_trigger, - int32 parent_hypertable_id); + HeapTuple chunk_newtuple, bool update); PGFunction continuous_agg_refresh; void (*continuous_agg_invalidate_raw_ht)(const Hypertable *raw_ht, int64 start, int64 end); void (*continuous_agg_invalidate_mat_ht)(const Hypertable *raw_ht, const Hypertable *mat_ht, diff --git a/tsl/src/continuous_aggs/insert.c b/tsl/src/continuous_aggs/insert.c index cbe0b5286..63c74e4c8 100644 --- a/tsl/src/continuous_aggs/insert.c +++ b/tsl/src/continuous_aggs/insert.c @@ -61,11 +61,6 @@ typedef struct ContinuousAggsCacheInvalEntry { int32 hypertable_id; Oid hypertable_relid; - int32 entry_id; /* - * This is what actually gets written to the hypertable log. It can be the same - * as the hypertable_id for normal hypertables. In the distributed case it is - * the ID of the parent hypertable in the Access Node. - */ Dimension hypertable_open_dimension; Oid previous_chunk_relid; AttrNumber previous_chunk_open_dimension; @@ -83,7 +78,7 @@ static MemoryContext continuous_aggs_trigger_mctx = NULL; static int64 tuple_get_time(Dimension *d, HeapTuple tuple, AttrNumber col, TupleDesc tupdesc); static inline void cache_inval_entry_init(ContinuousAggsCacheInvalEntry *cache_entry, - int32 hypertable_id, int32 entry_id); + int32 hypertable_id); static inline void cache_entry_switch_to_chunk(ContinuousAggsCacheInvalEntry *cache_entry, Oid chunk_reloid, Relation chunk_relation); static inline void update_cache_entry(ContinuousAggsCacheInvalEntry *cache_entry, int64 timeval); @@ -145,8 +140,7 @@ tuple_get_time(Dimension *d, HeapTuple tuple, AttrNumber col, TupleDesc tupdesc) } static inline void -cache_inval_entry_init(ContinuousAggsCacheInvalEntry *cache_entry, int32 hypertable_id, - int32 entry_id) +cache_inval_entry_init(ContinuousAggsCacheInvalEntry *cache_entry, int32 hypertable_id) { Cache *ht_cache = ts_hypertable_cache_pin(); /* NOTE: we can remove the id=>relid scan, if it becomes an issue, by getting the @@ -160,7 +154,6 @@ cache_inval_entry_init(ContinuousAggsCacheInvalEntry *cache_entry, int32 hyperta } cache_entry->hypertable_id = hypertable_id; - cache_entry->entry_id = entry_id; cache_entry->hypertable_relid = ht->main_table_relid; cache_entry->hypertable_open_dimension = *hyperspace_get_open_dimension(ht->space, 0); if (cache_entry->hypertable_open_dimension.partitioning != NULL) @@ -230,9 +223,8 @@ continuous_agg_trigfn(PG_FUNCTION_ARGS) * rows (which act like deletes) and once with the new rows. */ TriggerData *trigdata = (TriggerData *) fcinfo->context; - char *hypertable_id_str, *parent_hypertable_id_str; - int32 hypertable_id, parent_hypertable_id = 0; - bool is_distributed_hypertable_trigger = false; + char *hypertable_id_str; + int32 hypertable_id; if (trigdata == NULL || trigdata->tg_trigger == NULL || trigdata->tg_trigger->tgnargs < 0) elog(ERROR, "must supply hypertable id"); @@ -240,13 +232,6 @@ continuous_agg_trigfn(PG_FUNCTION_ARGS) hypertable_id_str = trigdata->tg_trigger->tgargs[0]; hypertable_id = atol(hypertable_id_str); - if (trigdata->tg_trigger->tgnargs > 1) - { - parent_hypertable_id_str = trigdata->tg_trigger->tgargs[1]; - parent_hypertable_id = atol(parent_hypertable_id_str); - is_distributed_hypertable_trigger = true; - } - if (!CALLED_AS_TRIGGER(fcinfo)) elog(ERROR, "continuous agg trigger function must be called by trigger manager"); if (!TRIGGER_FIRED_AFTER(trigdata->tg_event) || !TRIGGER_FIRED_FOR_ROW(trigdata->tg_event)) @@ -255,9 +240,7 @@ continuous_agg_trigfn(PG_FUNCTION_ARGS) trigdata->tg_relation, trigdata->tg_trigtuple, trigdata->tg_newtuple, - TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event), - is_distributed_hypertable_trigger, - parent_hypertable_id); + TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)); if (!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) return PointerGetDatum(trigdata->tg_trigtuple); else @@ -272,8 +255,7 @@ continuous_agg_trigfn(PG_FUNCTION_ARGS) */ void execute_cagg_trigger(int32 hypertable_id, Relation chunk_rel, HeapTuple chunk_tuple, - HeapTuple chunk_newtuple, bool update, bool is_distributed_hypertable_trigger, - int32 parent_hypertable_id) + HeapTuple chunk_newtuple, bool update) { ContinuousAggsCacheInvalEntry *cache_entry; bool found; @@ -287,10 +269,7 @@ execute_cagg_trigger(int32 hypertable_id, Relation chunk_rel, HeapTuple chunk_tu hash_search(continuous_aggs_cache_inval_htab, &hypertable_id, HASH_ENTER, &found); if (!found) - cache_inval_entry_init(cache_entry, - hypertable_id, - is_distributed_hypertable_trigger ? parent_hypertable_id : - hypertable_id); + cache_inval_entry_init(cache_entry, hypertable_id); /* handle the case where we need to repopulate the cached chunk data */ if (cache_entry->previous_chunk_relid != chunk_relid) @@ -327,12 +306,12 @@ cache_inval_entry_write(ContinuousAggsCacheInvalEntry *entry) * use a stronger isolation level, the isolation threshold could update without us seeing the * new value. In order to prevent serialization errors, we always append invalidation entries in * the case when we're using a strong enough isolation level that we won't see the new - * threshold. The same applies for distributed member invalidation triggers of hypertables. - * The materializer can handle invalidations that are beyond the threshold gracefully. + * threshold. The materializer can handle invalidations that are beyond the threshold + * gracefully. */ if (IsolationUsesXactSnapshot()) { - invalidation_hyper_log_add_entry(entry->entry_id, + invalidation_hyper_log_add_entry(entry->hypertable_id, entry->lowest_modified_value, entry->greatest_modified_value); return; @@ -341,7 +320,7 @@ cache_inval_entry_write(ContinuousAggsCacheInvalEntry *entry) liv = get_lowest_invalidated_time_for_hypertable(entry->hypertable_relid); if (entry->lowest_modified_value < liv) - invalidation_hyper_log_add_entry(entry->entry_id, + invalidation_hyper_log_add_entry(entry->hypertable_id, entry->lowest_modified_value, entry->greatest_modified_value); }; diff --git a/tsl/src/continuous_aggs/insert.h b/tsl/src/continuous_aggs/insert.h index 9bfac1f65..7bcde883f 100644 --- a/tsl/src/continuous_aggs/insert.h +++ b/tsl/src/continuous_aggs/insert.h @@ -12,6 +12,4 @@ extern Datum continuous_agg_trigfn(PG_FUNCTION_ARGS); extern void _continuous_aggs_cache_inval_init(void); extern void _continuous_aggs_cache_inval_fini(void); extern void execute_cagg_trigger(int32 hypertable_id, Relation chunk_rel, HeapTuple chunk_tuple, - HeapTuple chunk_newtuple, bool update, - bool is_distributed_hypertable_trigger, - int32 parent_hypertable_id); + HeapTuple chunk_newtuple, bool update); diff --git a/tsl/src/continuous_aggs/invalidation.c b/tsl/src/continuous_aggs/invalidation.c index d2ded5b80..81b28f31b 100644 --- a/tsl/src/continuous_aggs/invalidation.c +++ b/tsl/src/continuous_aggs/invalidation.c @@ -221,8 +221,7 @@ invalidation_cagg_log_add_entry(int32 cagg_hyper_id, int64 start, int64 end) /** * Adds a materialization invalidation log entry to the local data node. * - * @param mat_hypertable_id The hypertable ID of the CAGG materialized hypertable in the Access - * Node. + * @param mat_hypertable_id The hypertable ID of the CAGG materialized hypertable * @param start_time The starting time of the materialization invalidation log entry. * @param end_time The ending time of the materialization invalidation log entry. */ @@ -244,8 +243,7 @@ tsl_invalidation_cagg_log_add_entry(PG_FUNCTION_ARGS) /** * Adds a hypertable invalidation log entry to the local data node. * - * @param raw_hypertable_id The hypertable ID of the original distributed hypertable in the Access - * Node. + * @param raw_hypertable_id The hypertable ID of the original hypertable * @param start_time The starting time of the materialization invalidation log entry. * @param end_time The ending time of the materialization invalidation log entry. */ @@ -1087,17 +1085,15 @@ bucket_functions_default_argument(int ndim) } /** - * Processes the hypertable invalidation log in a data node for all the CAGGs that belong to the - * distributed hypertable with hypertable ID 'raw_hypertable_id' in the Access Node. The - * invalidations are cut, merged and moved to the materialization invalidation log. + * Processes the hypertable invalidation log for all the CAGGs that belong to the hypertable. + * The invalidations are cut, merged and moved to the materialization invalidation log. * - * @param mat_hypertable_id The hypertable ID of the CAGG materialized hypertable in the Access - * Node that is currently being refreshed. - * @param raw_hypertable_id The hypertable ID of the original distributed hypertable in the Access - * Node. + * @param mat_hypertable_id The hypertable ID of the CAGG materialized hypertable that is currently + * being refreshed. + * @param raw_hypertable_id The hypertable ID of the original hypertable * @param dimtype The OID of the type of the time dimension for this CAGG. - * @param mat_hypertable_ids The array of hypertable IDs for all CAGG materialized hypertables in - * the Access Node that belong to 'raw_hypertable_id'. + * @param mat_hypertable_ids The array of hypertable IDs for all CAGG materialized hypertables that + * belong to 'raw_hypertable_id'. * @param bucket_widths The array of time bucket widths for all the CAGGs that belong to * 'raw_hypertable_id'. * @param max_bucket_widths (Deprecated) This argument is ignored and is present only for backward @@ -1189,19 +1185,18 @@ invalidation_process_cagg_log(int32 mat_hypertable_id, int32 raw_hypertable_id, } /** - * Processes the materialization invalidation log in a data node for the CAGG being refreshed that - * belongs to the distributed hypertable with hypertable ID 'raw_hypertable_id' in the Access Node. - * The invalidations are cut, merged and returned as a single refresh window. + * Processes the materialization invalidation log for the CAGG being refreshed that belongs to the + * hypertable with hypertable ID 'raw_hypertable_id'. The invalidations are cut, merged and returned + * as a single refresh window. * - * @param mat_hypertable_id The hypertable ID of the CAGG materialized hypertable in the Access - * Node that is currently being refreshed. - * @param raw_hypertable_id The hypertable ID of the original distributed hypertable in the Access - * Node. + * @param mat_hypertable_id The hypertable ID of the CAGG materialized hypertable that is currently + * being refreshed. + * @param raw_hypertable_id The hypertable ID of the original hypertable * @param dimtype The OID of the type of the time dimension for this CAGG. * @param window_start The starting time of the CAGG refresh window. * @param window_end The ending time of the CAGG refresh window. - * @param mat_hypertable_ids The array of hypertable IDs for all CAGG materialized hypertables in - * the Access Node that belong to 'raw_hypertable_id'. + * @param mat_hypertable_ids The array of hypertable IDs for all CAGG materialized hypertables that + * belong to 'raw_hypertable_id'. * @param bucket_widths The array of time bucket widths for all the CAGGs that belong to * 'raw_hypertable_id'. * @param max_bucket_widths (Deprecated) This argument is ignored and is present only for backward diff --git a/tsl/src/continuous_aggs/refresh.c b/tsl/src/continuous_aggs/refresh.c index 2bf0e0927..a2170cc44 100644 --- a/tsl/src/continuous_aggs/refresh.c +++ b/tsl/src/continuous_aggs/refresh.c @@ -66,7 +66,6 @@ static void continuous_agg_refresh_with_window(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window, const InvalidationStore *invalidations, const int64 bucket_width, int32 chunk_id, - const bool is_raw_ht_distributed, const bool do_merged_refresh, const InternalTimeRange merged_refresh_window); static ContinuousAgg *get_cagg_by_relid(const Oid cagg_relid); @@ -500,8 +499,7 @@ static void continuous_agg_refresh_with_window(const ContinuousAgg *cagg, const InternalTimeRange *refresh_window, const InvalidationStore *invalidations, const int64 bucket_width, - int32 chunk_id, const bool is_raw_ht_distributed, - const bool do_merged_refresh, + int32 chunk_id, const bool do_merged_refresh, const InternalTimeRange merged_refresh_window) { CaggRefreshState refresh; @@ -719,7 +717,6 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg, invalidations, bucket_width, chunk_id, - false, do_merged_refresh, merged_refresh_window); if (invalidations)