Remove unused materializer code

The new refresh functionality for continuous aggregates replaces the
old materializer, which means some code is no longer used and should
be removed.

Closes #2395
This commit is contained in:
Erik Nordström 2020-09-10 20:29:42 +02:00 committed by Erik Nordström
parent 27e44f20ac
commit c884fe43f0
11 changed files with 10 additions and 1312 deletions

View File

@ -319,56 +319,6 @@ ts_continuous_aggs_find_by_raw_table_id(int32 raw_hypertable_id)
return continuous_aggs;
}
TSDLLEXPORT int64
ts_continuous_aggs_max_ignore_invalidation_older_than(int32 raw_hypertable_id,
FormData_continuous_agg *entry)
{
ScanIterator iterator =
ts_scan_iterator_create(CONTINUOUS_AGG, AccessShareLock, CurrentMemoryContext);
int64 ignore_invalidation_older_than = -1;
init_scan_by_raw_hypertable_id(&iterator, raw_hypertable_id);
ts_scanner_foreach(&iterator)
{
bool should_free;
HeapTuple tuple = ts_scan_iterator_fetch_heap_tuple(&iterator, false, &should_free);
FormData_continuous_agg *data = (FormData_continuous_agg *) GETSTRUCT(tuple);
if (ignore_invalidation_older_than < data->ignore_invalidation_older_than)
ignore_invalidation_older_than = data->ignore_invalidation_older_than;
if (entry != NULL)
memcpy(entry, data, sizeof(*entry));
if (should_free)
heap_freetuple(tuple);
}
return ignore_invalidation_older_than;
}
/* returns the inclusive value for the minimum time to invalidate */
TSDLLEXPORT int64
ts_continuous_aggs_get_minimum_invalidation_time(int64 modification_time,
int64 ignore_invalidation_older_than)
{
if (ignore_invalidation_older_than == PG_INT64_MAX ||
ignore_invalidation_older_than > modification_time)
{
/* invalidate everything */
return PG_INT64_MIN;
}
else if (ignore_invalidation_older_than == 0)
{
/* don't invalidate anything */
return PG_INT64_MAX;
}
else
{
return modification_time - ignore_invalidation_older_than;
}
}
/* Find a continuous aggregate by the materialized hypertable id */
ContinuousAgg *
ts_continuous_agg_find_by_mat_hypertable_id(int32 mat_hypertable_id)

View File

@ -63,13 +63,7 @@ ts_continuous_agg_find_by_mat_hypertable_id(int32 mat_hypertable_id);
extern TSDLLEXPORT ContinuousAggHypertableStatus
ts_continuous_agg_hypertable_status(int32 hypertable_id);
extern TSDLLEXPORT List *ts_continuous_aggs_find_by_raw_table_id(int32 raw_hypertable_id);
extern TSDLLEXPORT int64 ts_continuous_aggs_max_ignore_invalidation_older_than(
int32 raw_hypertable_id, FormData_continuous_agg *entry);
extern TSDLLEXPORT int64 ts_continuous_aggs_get_minimum_invalidation_time(
int64 modification_time, int64 ignore_invalidation_older_than);
TSDLLEXPORT
int64 ts_continuous_agg_get_completed_threshold(int32 materialization_id);
extern TSDLLEXPORT int64 ts_continuous_agg_get_completed_threshold(int32 materialization_id);
extern TSDLLEXPORT ContinuousAgg *ts_continuous_agg_find_by_view_name(const char *schema,
const char *name,
ContinuousAggViewType type);

View File

@ -184,13 +184,6 @@ job_execute_default_fn(BgwJob *job)
pg_unreachable();
}
static bool
cagg_materialize_default_fn(int32 materialization_id, ContinuousAggMatOptions *options)
{
error_no_default_fn_community();
pg_unreachable();
}
static bool
process_compress_table_default(AlterTableCmd *cmd, Hypertable *ht,
WithClauseResult *with_clause_options)
@ -377,7 +370,6 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.continuous_agg_refresh_all = continuous_agg_invalidate_or_refresh_all_default,
.continuous_agg_invalidate = continuous_agg_invalidate_or_refresh_all_default,
.continuous_agg_update_options = continuous_agg_update_options_default,
.continuous_agg_materialize = cagg_materialize_default_fn,
/* compression */
.compressed_data_send = error_no_default_fn_pg_community,

View File

@ -45,7 +45,6 @@ typedef struct CrossModuleFunctions
void (*print_tsl_license_expiration_info_hook)(void);
void (*module_shutdown_hook)(void);
void (*add_tsl_telemetry_info)(JsonbParseState **parse_state);
bool (*continuous_agg_materialize)(int32 materialization_id, ContinuousAggMatOptions *options);
PGFunction policy_compression_add;
PGFunction policy_compression_proc;

View File

@ -485,19 +485,6 @@ ts_hypertable_get_all(void)
return result;
}
/* Lazy-load the maximum ignore_invalidation_older_than setting from all associated continuous aggs
*/
TSDLLEXPORT int64
ts_hypertable_get_max_ignore_invalidation_older_than(Hypertable *ht)
{
if (ht->max_ignore_invalidation_older_than < 0)
ht->max_ignore_invalidation_older_than =
ts_continuous_aggs_max_ignore_invalidation_older_than(ht->fd.id, NULL);
Assert(ht->max_ignore_invalidation_older_than >= 0);
return ht->max_ignore_invalidation_older_than;
}
static ScanTupleResult
hypertable_tuple_update(TupleInfo *ti, void *data)
{

View File

@ -116,7 +116,6 @@ extern TSDLLEXPORT int ts_hypertable_update(Hypertable *ht);
extern int ts_hypertable_set_name(Hypertable *ht, const char *newname);
extern int ts_hypertable_set_schema(Hypertable *ht, const char *newname);
extern int ts_hypertable_set_num_dimensions(Hypertable *ht, int16 num_dimensions);
extern TSDLLEXPORT int64 ts_hypertable_get_max_ignore_invalidation_older_than(Hypertable *ht);
extern int ts_hypertable_delete_by_name(const char *schema_name, const char *table_name);
extern TSDLLEXPORT ObjectAddress ts_hypertable_create_trigger(Hypertable *ht, CreateTrigStmt *stmt,
const char *query);

View File

@ -62,8 +62,6 @@ typedef struct ContinuousAggsCacheInvalEntry
Oid hypertable_relid;
Dimension hypertable_open_dimension;
int64 modification_time;
int64 minimum_invalidation_time; /* inclusive */
Oid previous_chunk_relid;
AttrNumber previous_chunk_open_dimension;
@ -137,7 +135,6 @@ static inline void
cache_inval_entry_init(ContinuousAggsCacheInvalEntry *cache_entry, int32 hypertable_id)
{
Cache *ht_cache = ts_hypertable_cache_pin();
int64 ignore_invalidation_older_than;
/* NOTE: we can remove the id=>relid scan, if it becomes an issue, by getting the
* hypertable_relid directly from the Chunk*/
Hypertable *ht = ts_hypertable_cache_get_entry_by_id(ht_cache, hypertable_id);
@ -152,12 +149,6 @@ cache_inval_entry_init(ContinuousAggsCacheInvalEntry *cache_entry, int32 hyperta
cache_entry->hypertable_open_dimension.partitioning = open_dim_part_info;
}
cache_entry->modification_time = ts_get_now_internal(&cache_entry->hypertable_open_dimension);
ignore_invalidation_older_than = ts_hypertable_get_max_ignore_invalidation_older_than(ht);
Assert(ignore_invalidation_older_than >= 0);
cache_entry->minimum_invalidation_time =
ts_continuous_aggs_get_minimum_invalidation_time(cache_entry->modification_time,
ignore_invalidation_older_than);
cache_entry->previous_chunk_relid = InvalidOid;
cache_entry->value_is_set = false;
cache_entry->lowest_modified_value = PG_INT64_MAX;
@ -185,9 +176,6 @@ cache_entry_switch_to_chunk(ContinuousAggsCacheInvalEntry *cache_entry, Oid chun
static inline void
update_cache_entry(ContinuousAggsCacheInvalEntry *cache_entry, int64 timeval)
{
if (timeval < cache_entry->minimum_invalidation_time)
return;
cache_entry->value_is_set = true;
if (timeval < cache_entry->lowest_modified_value)
cache_entry->lowest_modified_value = timeval;

File diff suppressed because it is too large Load Diff

View File

@ -36,12 +36,6 @@ typedef struct InternalTimeRange
} InternalTimeRange;
InternalTimeRange continuous_agg_materialize_window_max(Oid timetype);
bool continuous_agg_materialize(int32 materialization_id, ContinuousAggMatOptions *options);
void continuous_agg_execute_materialization(int64 bucket_width, int32 hypertable_id,
int32 materialization_id, SchemaAndName partial_view,
int64 invalidation_range_start,
int64 invalidation_range_end,
int64 materialization_invalidation_threshold);
void continuous_agg_update_materialization(SchemaAndName partial_view,
SchemaAndName materialization_table,
Name time_column_name,

View File

@ -60,7 +60,11 @@ cagg_get_hypertable_or_fail(int32 hypertable_id)
static InternalTimeRange
get_largest_bucketed_window(Oid timetype, int64 bucket_width)
{
InternalTimeRange maxwindow = continuous_agg_materialize_window_max(timetype);
InternalTimeRange maxwindow = {
.type = timetype,
.start = ts_time_get_min(timetype),
.end = ts_time_get_end_or_max(timetype),
};
InternalTimeRange maxbuckets;
/* For the MIN value, the corresponding bucket either falls on the exact

View File

@ -24,7 +24,6 @@
#include "compression/segment_meta.h"
#include "continuous_aggs/create.h"
#include "continuous_aggs/insert.h"
#include "continuous_aggs/materialize.h"
#include "continuous_aggs/options.h"
#include "continuous_aggs/refresh.h"
#include "continuous_aggs/invalidation.h"
@ -92,7 +91,6 @@ CrossModuleFunctions tsl_cm_functions = {
.print_tsl_license_expiration_info_hook = license_print_expiration_info,
.module_shutdown_hook = module_shutdown,
.add_tsl_telemetry_info = tsl_telemetry_add_info,
.continuous_agg_materialize = continuous_agg_materialize,
.create_upper_paths_hook = tsl_create_upper_paths_hook,
.set_rel_pathlist_dml = tsl_set_rel_pathlist_dml,