Refactor invalidation log inclusion

Commit 97c2578ffa6b08f733a75381defefc176c91826b overcomplicated the
`invalidate_add_entry` API by adding parameters related to the remote
function call for multi-node on materialization hypertables.

Refactored it simplifying the function interface and adding a new
function to deal with materialization hypertables on multi-node
environment.

Fixes #3833
This commit is contained in:
Fabrízio de Royes Mello 2021-12-10 10:40:10 -03:00
parent c5796c0f1d
commit 342f848d90
10 changed files with 112 additions and 70 deletions

View File

@ -3857,11 +3857,7 @@ ts_chunk_do_drop_chunks(Hypertable *ht, int64 older_than, int64 newer_than, int3
int64 start = ts_chunk_primary_dimension_start(&chunks[i]); int64 start = ts_chunk_primary_dimension_start(&chunks[i]);
int64 end = ts_chunk_primary_dimension_end(&chunks[i]); int64 end = ts_chunk_primary_dimension_end(&chunks[i]);
ts_cm_functions->continuous_agg_invalidate(ht, ts_cm_functions->continuous_agg_invalidate_raw_ht(ht, start, end);
HypertableIsRawTable,
ht->fd.id,
start,
end);
} }
} }

View File

@ -227,8 +227,14 @@ continuous_agg_update_options_default(ContinuousAgg *cagg, WithClauseResult *wit
} }
static void static void
continuous_agg_invalidate_all_default(const Hypertable *ht, continuous_agg_invalidate_raw_ht_all_default(const Hypertable *raw_ht, int64 start, int64 end)
ContinuousAggHypertableStatus caggstatus, int32 entry_id, {
error_no_default_fn_community();
pg_unreachable();
}
static void
continuous_agg_invalidate_mat_ht_all_default(const Hypertable *raw_ht, const Hypertable *mat_ht,
int64 start, int64 end) int64 start, int64 end)
{ {
error_no_default_fn_community(); error_no_default_fn_community();
@ -371,7 +377,8 @@ TSDLLEXPORT CrossModuleFunctions ts_cm_functions_default = {
.continuous_agg_call_invalidation_trigger = continuous_agg_call_invalidation_trigger_default, .continuous_agg_call_invalidation_trigger = continuous_agg_call_invalidation_trigger_default,
.continuous_agg_refresh = error_no_default_fn_pg_community, .continuous_agg_refresh = error_no_default_fn_pg_community,
.continuous_agg_refresh_chunk = error_no_default_fn_pg_community, .continuous_agg_refresh_chunk = error_no_default_fn_pg_community,
.continuous_agg_invalidate = continuous_agg_invalidate_all_default, .continuous_agg_invalidate_raw_ht = continuous_agg_invalidate_raw_ht_all_default,
.continuous_agg_invalidate_mat_ht = continuous_agg_invalidate_mat_ht_all_default,
.continuous_agg_update_options = continuous_agg_update_options_default, .continuous_agg_update_options = continuous_agg_update_options_default,
.invalidation_cagg_log_add_entry = error_no_default_fn_pg_community, .invalidation_cagg_log_add_entry = error_no_default_fn_pg_community,
.invalidation_hyper_log_add_entry = error_no_default_fn_pg_community, .invalidation_hyper_log_add_entry = error_no_default_fn_pg_community,

View File

@ -100,8 +100,8 @@ typedef struct CrossModuleFunctions
int32 parent_hypertable_id); int32 parent_hypertable_id);
PGFunction continuous_agg_refresh; PGFunction continuous_agg_refresh;
PGFunction continuous_agg_refresh_chunk; PGFunction continuous_agg_refresh_chunk;
void (*continuous_agg_invalidate)(const Hypertable *ht, void (*continuous_agg_invalidate_raw_ht)(const Hypertable *raw_ht, int64 start, int64 end);
ContinuousAggHypertableStatus caggstatus, int32 entry_id, void (*continuous_agg_invalidate_mat_ht)(const Hypertable *raw_ht, const Hypertable *mat_ht,
int64 start, int64 end); int64 start, int64 end);
void (*continuous_agg_update_options)(ContinuousAgg *cagg, void (*continuous_agg_update_options)(ContinuousAgg *cagg,
WithClauseResult *with_clause_options); WithClauseResult *with_clause_options);

View File

@ -1036,9 +1036,8 @@ process_truncate(ProcessUtilityArgs *args)
* longer has any data */ * longer has any data */
raw_ht = ts_hypertable_get_by_id(cagg->data.raw_hypertable_id); raw_ht = ts_hypertable_get_by_id(cagg->data.raw_hypertable_id);
Assert(raw_ht != NULL); Assert(raw_ht != NULL);
ts_cm_functions->continuous_agg_invalidate(raw_ht, ts_cm_functions->continuous_agg_invalidate_mat_ht(raw_ht,
HypertableIsMaterialization, mat_ht,
mat_ht->fd.id,
TS_TIME_NOBEGIN, TS_TIME_NOBEGIN,
TS_TIME_NOEND); TS_TIME_NOEND);
@ -1074,9 +1073,7 @@ process_truncate(ProcessUtilityArgs *args)
if (agg_status == HypertableIsRawTable) if (agg_status == HypertableIsRawTable)
{ {
/* The truncation invalidates all associated continuous aggregates */ /* The truncation invalidates all associated continuous aggregates */
ts_cm_functions->continuous_agg_invalidate(ht, ts_cm_functions->continuous_agg_invalidate_raw_ht(ht,
HypertableIsRawTable,
ht->fd.id,
TS_TIME_NOBEGIN, TS_TIME_NOBEGIN,
TS_TIME_NOEND); TS_TIME_NOEND);
} }
@ -1223,11 +1220,7 @@ process_drop_chunk(ProcessUtilityArgs *args, DropStmt *stmt)
Assert(hyperspace_get_open_dimension(ht->space, 0)->fd.id == Assert(hyperspace_get_open_dimension(ht->space, 0)->fd.id ==
chunk->cube->slices[0]->fd.dimension_id); chunk->cube->slices[0]->fd.dimension_id);
ts_cm_functions->continuous_agg_invalidate(ht, ts_cm_functions->continuous_agg_invalidate_raw_ht(ht, start, end);
HypertableIsRawTable,
ht->fd.id,
start,
end);
} }
} }
} }

View File

@ -55,4 +55,35 @@ SELECT create_hypertable('metrics', 'time');
ALTER TABLE metrics SET (timescaledb.compress); ALTER TABLE metrics SET (timescaledb.compress);
ERROR: functionality not supported under the current "apache" license ERROR: functionality not supported under the current "apache" license
HINT: Upgrade your license to 'timescale' to use this free community feature. HINT: Upgrade your license to 'timescale' to use this free community feature.
INSERT INTO metrics
VALUES ('2022-01-01 00:00:00', 1), ('2022-01-01 01:00:00', 2), ('2022-01-01 02:00:00', 3);
CREATE MATERIALIZED VIEW metrics_hourly
WITH (timescaledb.continuous) AS
SELECT time_bucket(INTERVAL '1 hour', time) AS bucket,
AVG(value),
MAX(value),
MIN(value)
FROM metrics
GROUP BY bucket
WITH NO DATA;
ERROR: functionality not supported under the current "apache" license
HINT: Upgrade your license to 'timescale' to use this free community feature.
CREATE MATERIALIZED VIEW metrics_hourly
AS
SELECT time_bucket(INTERVAL '1 hour', time) AS bucket,
AVG(value),
MAX(value),
MIN(value)
FROM metrics
GROUP BY bucket;
CALL refresh_continuous_aggregate('metrics_hourly', NULL, NULL);
ERROR: function "refresh_continuous_aggregate" is not supported under the current "apache" license
HINT: Upgrade your license to 'timescale' to use this free community feature.
SELECT _timescaledb_internal.invalidation_hyper_log_add_entry(0, 0, 0);
ERROR: function "invalidation_hyper_log_add_entry" is not supported under the current "apache" license
HINT: Upgrade your license to 'timescale' to use this free community feature.
SELECT _timescaledb_internal.invalidation_cagg_log_add_entry(0, 0, 0);
ERROR: function "invalidation_cagg_log_add_entry" is not supported under the current "apache" license
HINT: Upgrade your license to 'timescale' to use this free community feature.
DROP MATERIALIZED VIEW metrics_hourly;
DROP TABLE metrics; DROP TABLE metrics;

View File

@ -35,6 +35,34 @@ DROP FUNCTION custom_func;
CREATE TABLE metrics(time timestamptz NOT NULL, value float); CREATE TABLE metrics(time timestamptz NOT NULL, value float);
SELECT create_hypertable('metrics', 'time'); SELECT create_hypertable('metrics', 'time');
ALTER TABLE metrics SET (timescaledb.compress); ALTER TABLE metrics SET (timescaledb.compress);
DROP TABLE metrics;
INSERT INTO metrics
VALUES ('2022-01-01 00:00:00', 1), ('2022-01-01 01:00:00', 2), ('2022-01-01 02:00:00', 3);
CREATE MATERIALIZED VIEW metrics_hourly
WITH (timescaledb.continuous) AS
SELECT time_bucket(INTERVAL '1 hour', time) AS bucket,
AVG(value),
MAX(value),
MIN(value)
FROM metrics
GROUP BY bucket
WITH NO DATA;
CREATE MATERIALIZED VIEW metrics_hourly
AS
SELECT time_bucket(INTERVAL '1 hour', time) AS bucket,
AVG(value),
MAX(value),
MIN(value)
FROM metrics
GROUP BY bucket;
CALL refresh_continuous_aggregate('metrics_hourly', NULL, NULL);
SELECT _timescaledb_internal.invalidation_hyper_log_add_entry(0, 0, 0);
SELECT _timescaledb_internal.invalidation_cagg_log_add_entry(0, 0, 0);
DROP MATERIALIZED VIEW metrics_hourly;
DROP TABLE metrics;
\set ON_ERROR_STOP 1 \set ON_ERROR_STOP 1

View File

@ -589,18 +589,7 @@ mattablecolumninfo_create_materialization_table(MatTableColumnInfo *matcolinfo,
* aggregate. This is the initial state of the aggregate before any * aggregate. This is the initial state of the aggregate before any
* refreshes. */ * refreshes. */
orig_ht = ts_hypertable_cache_get_entry(hcache, origquery_tblinfo->htoid, CACHE_FLAG_NONE); orig_ht = ts_hypertable_cache_get_entry(hcache, origquery_tblinfo->htoid, CACHE_FLAG_NONE);
if (hypertable_is_distributed(orig_ht)) continuous_agg_invalidate_mat_ht(orig_ht, mat_ht, TS_TIME_NOBEGIN, TS_TIME_NOEND);
{
remote_invalidation_log_add_entry(orig_ht,
HypertableIsMaterialization,
mat_ht->fd.id,
TS_TIME_NOBEGIN,
TS_TIME_NOEND);
}
else
{
invalidation_cagg_log_add_entry(mat_htid, TS_TIME_NOBEGIN, TS_TIME_NOEND);
}
ts_cache_release(hcache); ts_cache_release(hcache);
return mat_htid; return mat_htid;
} }

View File

@ -320,41 +320,37 @@ invalidation_hyper_log_add_entry(int32 hyper_id, int64 start, int64 end)
* hypertable. * hypertable.
*/ */
void void
invalidation_add_entry(const Hypertable *ht, ContinuousAggHypertableStatus caggstatus, continuous_agg_invalidate_raw_ht(const Hypertable *raw_ht, int64 start, int64 end)
int32 entry_id, int64 start, int64 end)
{ {
Assert(ht != NULL); Assert(raw_ht != NULL);
if (hypertable_is_distributed(ht)) if (hypertable_is_distributed(raw_ht))
{ {
remote_invalidation_log_add_entry(ht, caggstatus, entry_id, start, end); remote_invalidation_log_add_entry(raw_ht, HypertableIsRawTable, raw_ht->fd.id, start, end);
} }
else else
{ {
ContinuousAggHypertableStatus caggstatus = ts_continuous_agg_hypertable_status(ht->fd.id); invalidation_hyper_log_add_entry(raw_ht->fd.id, start, end);
switch (caggstatus)
{
case HypertableIsMaterialization:
invalidation_cagg_log_add_entry(ht->fd.id, start, end);
break;
case HypertableIsRawTable:
invalidation_hyper_log_add_entry(ht->fd.id, start, end);
break;
case HypertableIsMaterializationAndRaw:
break;
case HypertableIsNotContinuousAgg:
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot invalidate hypertable \"%s\"",
get_rel_name(ht->main_table_relid)),
errdetail(
"There is no continuous aggregate associated with the hypertable.")));
break;
} }
}
void
continuous_agg_invalidate_mat_ht(const Hypertable *raw_ht, const Hypertable *mat_ht, int64 start,
int64 end)
{
Assert((raw_ht != NULL) && (mat_ht != NULL));
if (hypertable_is_distributed(raw_ht))
{
remote_invalidation_log_add_entry(raw_ht,
HypertableIsMaterialization,
mat_ht->fd.id,
start,
end);
}
else
{
invalidation_cagg_log_add_entry(mat_ht->fd.id, start, end);
} }
} }

View File

@ -39,8 +39,9 @@ typedef struct Hypertable Hypertable;
extern void invalidation_cagg_log_add_entry(int32 cagg_hyper_id, int64 start, int64 end); extern void invalidation_cagg_log_add_entry(int32 cagg_hyper_id, int64 start, int64 end);
extern void invalidation_hyper_log_add_entry(int32 hyper_id, int64 start, int64 end); extern void invalidation_hyper_log_add_entry(int32 hyper_id, int64 start, int64 end);
extern void invalidation_add_entry(const Hypertable *ht, ContinuousAggHypertableStatus caggstatus, extern void continuous_agg_invalidate_raw_ht(const Hypertable *raw_ht, int64 start, int64 end);
int32 entry_id, int64 start, int64 end); extern void continuous_agg_invalidate_mat_ht(const Hypertable *raw_ht, const Hypertable *mat_ht,
int64 start, int64 end);
extern Datum tsl_invalidation_cagg_log_add_entry(PG_FUNCTION_ARGS); extern Datum tsl_invalidation_cagg_log_add_entry(PG_FUNCTION_ARGS);
extern Datum tsl_invalidation_hyper_log_add_entry(PG_FUNCTION_ARGS); extern Datum tsl_invalidation_hyper_log_add_entry(PG_FUNCTION_ARGS);

View File

@ -133,7 +133,8 @@ CrossModuleFunctions tsl_cm_functions = {
.continuous_agg_call_invalidation_trigger = execute_cagg_trigger, .continuous_agg_call_invalidation_trigger = execute_cagg_trigger,
.continuous_agg_refresh = continuous_agg_refresh, .continuous_agg_refresh = continuous_agg_refresh,
.continuous_agg_refresh_chunk = continuous_agg_refresh_chunk, .continuous_agg_refresh_chunk = continuous_agg_refresh_chunk,
.continuous_agg_invalidate = invalidation_add_entry, .continuous_agg_invalidate_raw_ht = continuous_agg_invalidate_raw_ht,
.continuous_agg_invalidate_mat_ht = continuous_agg_invalidate_mat_ht,
.continuous_agg_update_options = continuous_agg_update_options, .continuous_agg_update_options = continuous_agg_update_options,
.invalidation_cagg_log_add_entry = tsl_invalidation_cagg_log_add_entry, .invalidation_cagg_log_add_entry = tsl_invalidation_cagg_log_add_entry,
.invalidation_hyper_log_add_entry = tsl_invalidation_hyper_log_add_entry, .invalidation_hyper_log_add_entry = tsl_invalidation_hyper_log_add_entry,