Remove some multinode leftovers from CAggs

This commit is contained in:
Fabrízio de Royes Mello 2024-01-22 10:47:26 -03:00
parent e89bc24af2
commit a44a19b095
6 changed files with 32 additions and 67 deletions

View File

@ -250,9 +250,7 @@ continuous_agg_invalidate_mat_ht_all_default(const Hypertable *raw_ht, const Hyp
static void static void
continuous_agg_call_invalidation_trigger_default(int32 hypertable_id, Relation chunk_rel, continuous_agg_call_invalidation_trigger_default(int32 hypertable_id, Relation chunk_rel,
HeapTuple chunk_tuple, HeapTuple chunk_newtuple, HeapTuple chunk_tuple, HeapTuple chunk_newtuple,
bool update, bool update)
bool is_distributed_hypertable_trigger,
int32 parent_hypertable_id)
{ {
error_no_default_fn_community(); error_no_default_fn_community();
pg_unreachable(); pg_unreachable();

View File

@ -101,9 +101,7 @@ typedef struct CrossModuleFunctions
PGFunction continuous_agg_invalidation_trigger; PGFunction continuous_agg_invalidation_trigger;
void (*continuous_agg_call_invalidation_trigger)(int32 hypertable_id, Relation chunk_rel, void (*continuous_agg_call_invalidation_trigger)(int32 hypertable_id, Relation chunk_rel,
HeapTuple chunk_tuple, HeapTuple chunk_tuple,
HeapTuple chunk_newtuple, bool update, HeapTuple chunk_newtuple, bool update);
bool is_distributed_hypertable_trigger,
int32 parent_hypertable_id);
PGFunction continuous_agg_refresh; PGFunction continuous_agg_refresh;
void (*continuous_agg_invalidate_raw_ht)(const Hypertable *raw_ht, int64 start, int64 end); void (*continuous_agg_invalidate_raw_ht)(const Hypertable *raw_ht, int64 start, int64 end);
void (*continuous_agg_invalidate_mat_ht)(const Hypertable *raw_ht, const Hypertable *mat_ht, void (*continuous_agg_invalidate_mat_ht)(const Hypertable *raw_ht, const Hypertable *mat_ht,

View File

@ -61,11 +61,6 @@ typedef struct ContinuousAggsCacheInvalEntry
{ {
int32 hypertable_id; int32 hypertable_id;
Oid hypertable_relid; Oid hypertable_relid;
int32 entry_id; /*
* This is what actually gets written to the hypertable log. It can be the same
* as the hypertable_id for normal hypertables. In the distributed case it is
* the ID of the parent hypertable in the Access Node.
*/
Dimension hypertable_open_dimension; Dimension hypertable_open_dimension;
Oid previous_chunk_relid; Oid previous_chunk_relid;
AttrNumber previous_chunk_open_dimension; AttrNumber previous_chunk_open_dimension;
@ -83,7 +78,7 @@ static MemoryContext continuous_aggs_trigger_mctx = NULL;
static int64 tuple_get_time(Dimension *d, HeapTuple tuple, AttrNumber col, TupleDesc tupdesc); static int64 tuple_get_time(Dimension *d, HeapTuple tuple, AttrNumber col, TupleDesc tupdesc);
static inline void cache_inval_entry_init(ContinuousAggsCacheInvalEntry *cache_entry, static inline void cache_inval_entry_init(ContinuousAggsCacheInvalEntry *cache_entry,
int32 hypertable_id, int32 entry_id); int32 hypertable_id);
static inline void cache_entry_switch_to_chunk(ContinuousAggsCacheInvalEntry *cache_entry, static inline void cache_entry_switch_to_chunk(ContinuousAggsCacheInvalEntry *cache_entry,
Oid chunk_reloid, Relation chunk_relation); Oid chunk_reloid, Relation chunk_relation);
static inline void update_cache_entry(ContinuousAggsCacheInvalEntry *cache_entry, int64 timeval); static inline void update_cache_entry(ContinuousAggsCacheInvalEntry *cache_entry, int64 timeval);
@ -145,8 +140,7 @@ tuple_get_time(Dimension *d, HeapTuple tuple, AttrNumber col, TupleDesc tupdesc)
} }
static inline void static inline void
cache_inval_entry_init(ContinuousAggsCacheInvalEntry *cache_entry, int32 hypertable_id, cache_inval_entry_init(ContinuousAggsCacheInvalEntry *cache_entry, int32 hypertable_id)
int32 entry_id)
{ {
Cache *ht_cache = ts_hypertable_cache_pin(); Cache *ht_cache = ts_hypertable_cache_pin();
/* NOTE: we can remove the id=>relid scan, if it becomes an issue, by getting the /* NOTE: we can remove the id=>relid scan, if it becomes an issue, by getting the
@ -160,7 +154,6 @@ cache_inval_entry_init(ContinuousAggsCacheInvalEntry *cache_entry, int32 hyperta
} }
cache_entry->hypertable_id = hypertable_id; cache_entry->hypertable_id = hypertable_id;
cache_entry->entry_id = entry_id;
cache_entry->hypertable_relid = ht->main_table_relid; cache_entry->hypertable_relid = ht->main_table_relid;
cache_entry->hypertable_open_dimension = *hyperspace_get_open_dimension(ht->space, 0); cache_entry->hypertable_open_dimension = *hyperspace_get_open_dimension(ht->space, 0);
if (cache_entry->hypertable_open_dimension.partitioning != NULL) if (cache_entry->hypertable_open_dimension.partitioning != NULL)
@ -230,9 +223,8 @@ continuous_agg_trigfn(PG_FUNCTION_ARGS)
* rows (which act like deletes) and once with the new rows. * rows (which act like deletes) and once with the new rows.
*/ */
TriggerData *trigdata = (TriggerData *) fcinfo->context; TriggerData *trigdata = (TriggerData *) fcinfo->context;
char *hypertable_id_str, *parent_hypertable_id_str; char *hypertable_id_str;
int32 hypertable_id, parent_hypertable_id = 0; int32 hypertable_id;
bool is_distributed_hypertable_trigger = false;
if (trigdata == NULL || trigdata->tg_trigger == NULL || trigdata->tg_trigger->tgnargs < 0) if (trigdata == NULL || trigdata->tg_trigger == NULL || trigdata->tg_trigger->tgnargs < 0)
elog(ERROR, "must supply hypertable id"); elog(ERROR, "must supply hypertable id");
@ -240,13 +232,6 @@ continuous_agg_trigfn(PG_FUNCTION_ARGS)
hypertable_id_str = trigdata->tg_trigger->tgargs[0]; hypertable_id_str = trigdata->tg_trigger->tgargs[0];
hypertable_id = atol(hypertable_id_str); hypertable_id = atol(hypertable_id_str);
if (trigdata->tg_trigger->tgnargs > 1)
{
parent_hypertable_id_str = trigdata->tg_trigger->tgargs[1];
parent_hypertable_id = atol(parent_hypertable_id_str);
is_distributed_hypertable_trigger = true;
}
if (!CALLED_AS_TRIGGER(fcinfo)) if (!CALLED_AS_TRIGGER(fcinfo))
elog(ERROR, "continuous agg trigger function must be called by trigger manager"); elog(ERROR, "continuous agg trigger function must be called by trigger manager");
if (!TRIGGER_FIRED_AFTER(trigdata->tg_event) || !TRIGGER_FIRED_FOR_ROW(trigdata->tg_event)) if (!TRIGGER_FIRED_AFTER(trigdata->tg_event) || !TRIGGER_FIRED_FOR_ROW(trigdata->tg_event))
@ -255,9 +240,7 @@ continuous_agg_trigfn(PG_FUNCTION_ARGS)
trigdata->tg_relation, trigdata->tg_relation,
trigdata->tg_trigtuple, trigdata->tg_trigtuple,
trigdata->tg_newtuple, trigdata->tg_newtuple,
TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event), TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event));
is_distributed_hypertable_trigger,
parent_hypertable_id);
if (!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event)) if (!TRIGGER_FIRED_BY_UPDATE(trigdata->tg_event))
return PointerGetDatum(trigdata->tg_trigtuple); return PointerGetDatum(trigdata->tg_trigtuple);
else else
@ -272,8 +255,7 @@ continuous_agg_trigfn(PG_FUNCTION_ARGS)
*/ */
void void
execute_cagg_trigger(int32 hypertable_id, Relation chunk_rel, HeapTuple chunk_tuple, execute_cagg_trigger(int32 hypertable_id, Relation chunk_rel, HeapTuple chunk_tuple,
HeapTuple chunk_newtuple, bool update, bool is_distributed_hypertable_trigger, HeapTuple chunk_newtuple, bool update)
int32 parent_hypertable_id)
{ {
ContinuousAggsCacheInvalEntry *cache_entry; ContinuousAggsCacheInvalEntry *cache_entry;
bool found; bool found;
@ -287,10 +269,7 @@ execute_cagg_trigger(int32 hypertable_id, Relation chunk_rel, HeapTuple chunk_tu
hash_search(continuous_aggs_cache_inval_htab, &hypertable_id, HASH_ENTER, &found); hash_search(continuous_aggs_cache_inval_htab, &hypertable_id, HASH_ENTER, &found);
if (!found) if (!found)
cache_inval_entry_init(cache_entry, cache_inval_entry_init(cache_entry, hypertable_id);
hypertable_id,
is_distributed_hypertable_trigger ? parent_hypertable_id :
hypertable_id);
/* handle the case where we need to repopulate the cached chunk data */ /* handle the case where we need to repopulate the cached chunk data */
if (cache_entry->previous_chunk_relid != chunk_relid) if (cache_entry->previous_chunk_relid != chunk_relid)
@ -327,12 +306,12 @@ cache_inval_entry_write(ContinuousAggsCacheInvalEntry *entry)
* use a stronger isolation level, the isolation threshold could update without us seeing the * use a stronger isolation level, the isolation threshold could update without us seeing the
* new value. In order to prevent serialization errors, we always append invalidation entries in * new value. In order to prevent serialization errors, we always append invalidation entries in
* the case when we're using a strong enough isolation level that we won't see the new * the case when we're using a strong enough isolation level that we won't see the new
* threshold. The same applies for distributed member invalidation triggers of hypertables. * threshold. The materializer can handle invalidations that are beyond the threshold
* The materializer can handle invalidations that are beyond the threshold gracefully. * gracefully.
*/ */
if (IsolationUsesXactSnapshot()) if (IsolationUsesXactSnapshot())
{ {
invalidation_hyper_log_add_entry(entry->entry_id, invalidation_hyper_log_add_entry(entry->hypertable_id,
entry->lowest_modified_value, entry->lowest_modified_value,
entry->greatest_modified_value); entry->greatest_modified_value);
return; return;
@ -341,7 +320,7 @@ cache_inval_entry_write(ContinuousAggsCacheInvalEntry *entry)
liv = get_lowest_invalidated_time_for_hypertable(entry->hypertable_relid); liv = get_lowest_invalidated_time_for_hypertable(entry->hypertable_relid);
if (entry->lowest_modified_value < liv) if (entry->lowest_modified_value < liv)
invalidation_hyper_log_add_entry(entry->entry_id, invalidation_hyper_log_add_entry(entry->hypertable_id,
entry->lowest_modified_value, entry->lowest_modified_value,
entry->greatest_modified_value); entry->greatest_modified_value);
}; };

View File

@ -12,6 +12,4 @@ extern Datum continuous_agg_trigfn(PG_FUNCTION_ARGS);
extern void _continuous_aggs_cache_inval_init(void); extern void _continuous_aggs_cache_inval_init(void);
extern void _continuous_aggs_cache_inval_fini(void); extern void _continuous_aggs_cache_inval_fini(void);
extern void execute_cagg_trigger(int32 hypertable_id, Relation chunk_rel, HeapTuple chunk_tuple, extern void execute_cagg_trigger(int32 hypertable_id, Relation chunk_rel, HeapTuple chunk_tuple,
HeapTuple chunk_newtuple, bool update, HeapTuple chunk_newtuple, bool update);
bool is_distributed_hypertable_trigger,
int32 parent_hypertable_id);

View File

@ -221,8 +221,7 @@ invalidation_cagg_log_add_entry(int32 cagg_hyper_id, int64 start, int64 end)
/** /**
* Adds a materialization invalidation log entry to the local data node. * Adds a materialization invalidation log entry to the local data node.
* *
* @param mat_hypertable_id The hypertable ID of the CAGG materialized hypertable in the Access * @param mat_hypertable_id The hypertable ID of the CAGG materialized hypertable
* Node.
* @param start_time The starting time of the materialization invalidation log entry. * @param start_time The starting time of the materialization invalidation log entry.
* @param end_time The ending time of the materialization invalidation log entry. * @param end_time The ending time of the materialization invalidation log entry.
*/ */
@ -244,8 +243,7 @@ tsl_invalidation_cagg_log_add_entry(PG_FUNCTION_ARGS)
/** /**
* Adds a hypertable invalidation log entry to the local data node. * Adds a hypertable invalidation log entry to the local data node.
* *
* @param raw_hypertable_id The hypertable ID of the original distributed hypertable in the Access * @param raw_hypertable_id The hypertable ID of the original hypertable
* Node.
* @param start_time The starting time of the materialization invalidation log entry. * @param start_time The starting time of the materialization invalidation log entry.
* @param end_time The ending time of the materialization invalidation log entry. * @param end_time The ending time of the materialization invalidation log entry.
*/ */
@ -1087,17 +1085,15 @@ bucket_functions_default_argument(int ndim)
} }
/** /**
* Processes the hypertable invalidation log in a data node for all the CAGGs that belong to the * Processes the hypertable invalidation log for all the CAGGs that belong to the hypertable.
* distributed hypertable with hypertable ID 'raw_hypertable_id' in the Access Node. The * The invalidations are cut, merged and moved to the materialization invalidation log.
* invalidations are cut, merged and moved to the materialization invalidation log.
* *
* @param mat_hypertable_id The hypertable ID of the CAGG materialized hypertable in the Access * @param mat_hypertable_id The hypertable ID of the CAGG materialized hypertable that is currently
* Node that is currently being refreshed. * being refreshed.
* @param raw_hypertable_id The hypertable ID of the original distributed hypertable in the Access * @param raw_hypertable_id The hypertable ID of the original hypertable
* Node.
* @param dimtype The OID of the type of the time dimension for this CAGG. * @param dimtype The OID of the type of the time dimension for this CAGG.
* @param mat_hypertable_ids The array of hypertable IDs for all CAGG materialized hypertables in * @param mat_hypertable_ids The array of hypertable IDs for all CAGG materialized hypertables that
* the Access Node that belong to 'raw_hypertable_id'. * belong to 'raw_hypertable_id'.
* @param bucket_widths The array of time bucket widths for all the CAGGs that belong to * @param bucket_widths The array of time bucket widths for all the CAGGs that belong to
* 'raw_hypertable_id'. * 'raw_hypertable_id'.
* @param max_bucket_widths (Deprecated) This argument is ignored and is present only for backward * @param max_bucket_widths (Deprecated) This argument is ignored and is present only for backward
@ -1189,19 +1185,18 @@ invalidation_process_cagg_log(int32 mat_hypertable_id, int32 raw_hypertable_id,
} }
/** /**
* Processes the materialization invalidation log in a data node for the CAGG being refreshed that * Processes the materialization invalidation log for the CAGG being refreshed that belongs to the
* belongs to the distributed hypertable with hypertable ID 'raw_hypertable_id' in the Access Node. * hypertable with hypertable ID 'raw_hypertable_id'. The invalidations are cut, merged and returned
* The invalidations are cut, merged and returned as a single refresh window. * as a single refresh window.
* *
* @param mat_hypertable_id The hypertable ID of the CAGG materialized hypertable in the Access * @param mat_hypertable_id The hypertable ID of the CAGG materialized hypertable that is currently
* Node that is currently being refreshed. * being refreshed.
* @param raw_hypertable_id The hypertable ID of the original distributed hypertable in the Access * @param raw_hypertable_id The hypertable ID of the original hypertable
* Node.
* @param dimtype The OID of the type of the time dimension for this CAGG. * @param dimtype The OID of the type of the time dimension for this CAGG.
* @param window_start The starting time of the CAGG refresh window. * @param window_start The starting time of the CAGG refresh window.
* @param window_end The ending time of the CAGG refresh window. * @param window_end The ending time of the CAGG refresh window.
* @param mat_hypertable_ids The array of hypertable IDs for all CAGG materialized hypertables in * @param mat_hypertable_ids The array of hypertable IDs for all CAGG materialized hypertables that
* the Access Node that belong to 'raw_hypertable_id'. * belong to 'raw_hypertable_id'.
* @param bucket_widths The array of time bucket widths for all the CAGGs that belong to * @param bucket_widths The array of time bucket widths for all the CAGGs that belong to
* 'raw_hypertable_id'. * 'raw_hypertable_id'.
* @param max_bucket_widths (Deprecated) This argument is ignored and is present only for backward * @param max_bucket_widths (Deprecated) This argument is ignored and is present only for backward

View File

@ -66,7 +66,6 @@ static void continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window, const InternalTimeRange *refresh_window,
const InvalidationStore *invalidations, const InvalidationStore *invalidations,
const int64 bucket_width, int32 chunk_id, const int64 bucket_width, int32 chunk_id,
const bool is_raw_ht_distributed,
const bool do_merged_refresh, const bool do_merged_refresh,
const InternalTimeRange merged_refresh_window); const InternalTimeRange merged_refresh_window);
static ContinuousAgg *get_cagg_by_relid(const Oid cagg_relid); static ContinuousAgg *get_cagg_by_relid(const Oid cagg_relid);
@ -500,8 +499,7 @@ static void
continuous_agg_refresh_with_window(const ContinuousAgg *cagg, continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window, const InternalTimeRange *refresh_window,
const InvalidationStore *invalidations, const int64 bucket_width, const InvalidationStore *invalidations, const int64 bucket_width,
int32 chunk_id, const bool is_raw_ht_distributed, int32 chunk_id, const bool do_merged_refresh,
const bool do_merged_refresh,
const InternalTimeRange merged_refresh_window) const InternalTimeRange merged_refresh_window)
{ {
CaggRefreshState refresh; CaggRefreshState refresh;
@ -719,7 +717,6 @@ process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
invalidations, invalidations,
bucket_width, bucket_width,
chunk_id, chunk_id,
false,
do_merged_refresh, do_merged_refresh,
merged_refresh_window); merged_refresh_window);
if (invalidations) if (invalidations)