Refactor continuous aggregate code

This commit is contained in:
Rafia Sabih 2023-05-04 11:01:41 +02:00
parent 29154b29d1
commit 70d0704368
18 changed files with 2998 additions and 2707 deletions

View File

@ -1,9 +1,12 @@
set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/common.c
${CMAKE_CURRENT_SOURCE_DIR}/finalize.c
${CMAKE_CURRENT_SOURCE_DIR}/create.c
${CMAKE_CURRENT_SOURCE_DIR}/insert.c
${CMAKE_CURRENT_SOURCE_DIR}/materialize.c
${CMAKE_CURRENT_SOURCE_DIR}/options.c
${CMAKE_CURRENT_SOURCE_DIR}/refresh.c
${CMAKE_CURRENT_SOURCE_DIR}/repair.c
${CMAKE_CURRENT_SOURCE_DIR}/invalidation.c
${CMAKE_CURRENT_SOURCE_DIR}/invalidation_threshold.c)
target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})

View File

@ -114,3 +114,20 @@ are cut against the given refresh window, leaving only invalidation
entries that are outside the refresh window. Subsequently, if the
refresh window does not match any invalidations, there is nothing to
refresh either.
## Distribution of functions across files
common.c
This file contains the functions common in all scenarios of creating a continuous aggregates.
create.c
This file contains the functions that are directly responsible for the creation of the continuous aggregates,
like creating hypertable, catalog_entry, view, etc.
finalize.c
This file contains the specific functions for the case when continous aggregates are created in old format.
materialize.c
This file contains the functions directly dealing with the materialization of the continuous aggregates.
repair.c
The repair and rebuilding related functions are put together in this file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,135 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#ifndef TIMESCALEDB_TSL_CONTINUOUS_AGGS_COMMON_H
#define TIMESCALEDB_TSL_CONTINUOUS_AGGS_COMMON_H
#include <postgres.h>
#include <access/reloptions.h>
#include <access/xact.h>
#include <catalog/pg_aggregate.h>
#include <catalog/pg_type.h>
#include <catalog/toasting.h>
#include <commands/tablecmds.h>
#include <commands/tablespace.h>
#include <miscadmin.h>
#include <nodes/makefuncs.h>
#include <nodes/nodeFuncs.h>
#include <nodes/parsenodes.h>
#include <nodes/pg_list.h>
#include <optimizer/optimizer.h>
#include <parser/parse_func.h>
#include <parser/parse_oper.h>
#include <parser/parsetree.h>
#include <rewrite/rewriteHandler.h>
#include <rewrite/rewriteManip.h>
#include <utils/builtins.h>
#include <utils/syscache.h>
#include <utils/typcache.h>
#include "errors.h"
#include "func_cache.h"
#include "hypertable_cache.h"
#include "timezones.h"
#include "ts_catalog/catalog.h"
#include "ts_catalog/continuous_agg.h"
#define TS_PARTIALFN "partialize_agg"
#define CONTINUOUS_AGG_MAX_JOIN_RELATIONS 2
#define DEFAULT_MATPARTCOLUMN_NAME "time_partition_col"
typedef struct FinalizeQueryInfo
{
List *final_seltlist; /* select target list for finalize query */
Node *final_havingqual; /* having qual for finalize query */
Query *final_userquery; /* user query used to compute the finalize_query */
bool finalized; /* finalized form? */
} FinalizeQueryInfo;
typedef struct MatTableColumnInfo
{
List *matcollist; /* column defns for materialization tbl*/
List *partial_seltlist; /* tlist entries for populating the materialization table columns */
List *partial_grouplist; /* group clauses used for populating the materialization table */
List *mat_groupcolname_list; /* names of columns that are populated by the group-by clause
correspond to the partial_grouplist.
time_bucket column is not included here: it is the
matpartcolname */
int matpartcolno; /*index of partitioning column in matcollist */
char *matpartcolname; /*name of the partition column */
} MatTableColumnInfo;
typedef struct CAggTimebucketInfo
{
int32 htid; /* hypertable id */
int32 parent_mat_hypertable_id; /* parent materialization hypertable id */
Oid htoid; /* hypertable oid */
AttrNumber htpartcolno; /* primary partitioning column of raw hypertable */
/* This should also be the column used by time_bucket */
Oid htpartcoltype;
int64 htpartcol_interval_len; /* interval length setting for primary partitioning column */
int64 bucket_width; /* bucket_width of time_bucket, stores BUCKET_WIDTH_VARIABLE for
variable-sized buckets */
Oid bucket_width_type; /* type of bucket_width */
Interval *interval; /* stores the interval, NULL if not specified */
const char *timezone; /* the name of the timezone, NULL if not specified */
FuncExpr *bucket_func; /* function call expr of the bucketing function */
/*
* Custom origin value stored as UTC timestamp.
* If not specified, stores infinity.
*/
Timestamp origin;
} CAggTimebucketInfo;
typedef struct AggPartCxt
{
struct MatTableColumnInfo *mattblinfo;
bool added_aggref_col;
/*
* Set to true when you come across a Var
* that is not inside an Aggref node.
*/
bool var_outside_of_aggref;
Oid ignore_aggoid;
int original_query_resno;
/*
* "Original variables" are the Var nodes of the target list of the original
* CREATE MATERIALIZED VIEW query. "Mapped variables" are the Var nodes of the materialization
* table columns. The partialization query is the one that populates those columns. The
* finalization query should use the "mapped variables" to populate the user view.
*/
List *orig_vars; /* List of Var nodes that have been mapped to materialization table columns */
List *mapped_vars; /* List of Var nodes of the corresponding materialization table columns */
/* orig_vars and mapped_vars lists are mapped 1 to 1 */
} AggPartCxt;
#define CAGG_MAKEQUERY(selquery, srcquery) \
do \
{ \
(selquery) = makeNode(Query); \
(selquery)->commandType = CMD_SELECT; \
(selquery)->querySource = (srcquery)->querySource; \
(selquery)->queryId = (srcquery)->queryId; \
(selquery)->canSetTag = (srcquery)->canSetTag; \
(selquery)->utilityStmt = copyObject((srcquery)->utilityStmt); \
(selquery)->resultRelation = 0; \
(selquery)->hasAggs = true; \
(selquery)->hasRowSecurity = false; \
(selquery)->rtable = NULL; \
} while (0);
extern CAggTimebucketInfo cagg_validate_query(const Query *query, const bool finalized,
const char *cagg_schema, const char *cagg_name);
extern Query *destroy_union_query(Query *q);
extern Oid relation_oid(Name schema, Name name);
extern void RemoveRangeTableEntries(Query *query);
extern Query *build_union_query(CAggTimebucketInfo *tbinfo, int matpartcolno, Query *q1, Query *q2,
int materialize_htid);
extern void mattablecolumninfo_init(MatTableColumnInfo *matcolinfo, List *grouplist);
extern void mattablecolumninfo_addinternal(MatTableColumnInfo *matcolinfo);
extern bool function_allowed_in_cagg_definition(Oid funcid);
#endif

File diff suppressed because it is too large Load Diff

View File

@ -11,14 +11,10 @@
#include "with_clause_parser.h"
#include "ts_catalog/continuous_agg.h"
#define CONTINUOUS_AGG_CHUNK_ID_COL_NAME "chunk_id"
DDLResult tsl_process_continuous_agg_viewstmt(Node *node, const char *query_string, void *pstmt,
WithClauseResult *with_clause_options);
extern void cagg_flip_realtime_view_definition(ContinuousAgg *agg, Hypertable *mat_ht);
extern void cagg_rename_view_columns(ContinuousAgg *agg);
extern Datum tsl_cagg_try_repair(PG_FUNCTION_ARGS);
#endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_CAGG_CREATE_H */

View File

@ -0,0 +1,914 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
/*
* This file contains the code related to the *NOT* finalized version of
* Continuous Aggregates (with partials)
*/
#include "finalize.h"
#include "create.h"
#include "common.h"
typedef struct CAggHavingCxt
{
List *origq_tlist;
List *finalizeq_tlist;
AggPartCxt agg_cxt;
} CAggHavingCxt;
/* Static function prototypes */
static Datum get_input_types_array_datum(Aggref *original_aggregate);
static Aggref *add_partialize_column(Aggref *agg_to_partialize, AggPartCxt *cxt);
static void set_var_mapping(Var *orig_var, Var *mapped_var, AggPartCxt *cxt);
static Var *var_already_mapped(Var *var, AggPartCxt *cxt);
static Node *create_replace_having_qual_mutator(Node *node, CAggHavingCxt *cxt);
static Node *finalizequery_create_havingqual(FinalizeQueryInfo *inp,
MatTableColumnInfo *mattblinfo);
static Var *mattablecolumninfo_addentry(MatTableColumnInfo *out, Node *input,
int original_query_resno, bool finalized,
bool *skip_adding);
static FuncExpr *get_partialize_funcexpr(Aggref *agg);
static inline void makeMaterializeColumnName(char *colbuf, const char *type,
int original_query_resno, int colno);
static inline void
makeMaterializeColumnName(char *colbuf, const char *type, int original_query_resno, int colno)
{
int ret = snprintf(colbuf, NAMEDATALEN, "%s_%d_%d", type, original_query_resno, colno);
if (ret < 0 || ret >= NAMEDATALEN)
ereport(ERROR,
(errcode(ERRCODE_INTERNAL_ERROR), errmsg("bad materialization table column name")));
}
/*
* Creates a partialize expr for the passed in agg:
* partialize_agg(agg).
*/
static FuncExpr *
get_partialize_funcexpr(Aggref *agg)
{
FuncExpr *partialize_fnexpr;
Oid partfnoid, partargtype;
partargtype = ANYELEMENTOID;
partfnoid =
LookupFuncName(list_make2(makeString(INTERNAL_SCHEMA_NAME), makeString(TS_PARTIALFN)),
1,
&partargtype,
false);
partialize_fnexpr = makeFuncExpr(partfnoid,
BYTEAOID,
list_make1(agg), /*args*/
InvalidOid,
InvalidOid,
COERCE_EXPLICIT_CALL);
return partialize_fnexpr;
}
/*
* Build a [N][2] array where N is number of arguments
* and the inner array is of [schema_name,type_name].
*/
static Datum
get_input_types_array_datum(Aggref *original_aggregate)
{
ListCell *lc;
MemoryContext builder_context =
AllocSetContextCreate(CurrentMemoryContext, "input types builder", ALLOCSET_DEFAULT_SIZES);
Oid name_array_type_oid = get_array_type(NAMEOID);
ArrayBuildStateArr *outer_builder =
initArrayResultArr(name_array_type_oid, NAMEOID, builder_context, false);
Datum result;
foreach (lc, original_aggregate->args)
{
TargetEntry *te = lfirst(lc);
Oid type_oid = exprType((Node *) te->expr);
ArrayBuildState *schema_name_builder = initArrayResult(NAMEOID, builder_context, false);
HeapTuple tp;
Form_pg_type typtup;
char *schema_name;
Name type_name = (Name) palloc0(NAMEDATALEN);
Datum schema_datum;
Datum type_name_datum;
Datum inner_array_datum;
tp = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type_oid));
if (!HeapTupleIsValid(tp))
elog(ERROR, "cache lookup failed for type %u", type_oid);
typtup = (Form_pg_type) GETSTRUCT(tp);
namestrcpy(type_name, NameStr(typtup->typname));
schema_name = get_namespace_name(typtup->typnamespace);
ReleaseSysCache(tp);
type_name_datum = NameGetDatum(type_name);
/* Using name in because creating from a char * (that may be null or too long). */
schema_datum = DirectFunctionCall1(namein, CStringGetDatum(schema_name));
accumArrayResult(schema_name_builder, schema_datum, false, NAMEOID, builder_context);
accumArrayResult(schema_name_builder, type_name_datum, false, NAMEOID, builder_context);
inner_array_datum = makeArrayResult(schema_name_builder, CurrentMemoryContext);
accumArrayResultArr(outer_builder,
inner_array_datum,
false,
name_array_type_oid,
builder_context);
}
result = makeArrayResultArr(outer_builder, CurrentMemoryContext, false);
MemoryContextDelete(builder_context);
return result;
}
static Aggref *
add_partialize_column(Aggref *agg_to_partialize, AggPartCxt *cxt)
{
Aggref *newagg;
Var *var;
bool skip_adding;
/*
* Step 1: create partialize( aggref) column
* for materialization table.
*/
var = mattablecolumninfo_addentry(cxt->mattblinfo,
(Node *) agg_to_partialize,
cxt->original_query_resno,
false,
&skip_adding);
cxt->added_aggref_col = true;
/*
* Step 2: create finalize_agg expr using var
* for the column added to the materialization table.
*/
/* This is a var for the column we created. */
newagg = get_finalize_aggref(agg_to_partialize, var);
return newagg;
}
static void
set_var_mapping(Var *orig_var, Var *mapped_var, AggPartCxt *cxt)
{
cxt->orig_vars = lappend(cxt->orig_vars, orig_var);
cxt->mapped_vars = lappend(cxt->mapped_vars, mapped_var);
}
/*
* Checks whether var has already been mapped and returns the
* corresponding column of the materialization table.
*/
static Var *
var_already_mapped(Var *var, AggPartCxt *cxt)
{
ListCell *lc_old, *lc_new;
forboth (lc_old, cxt->orig_vars, lc_new, cxt->mapped_vars)
{
Var *orig_var = (Var *) lfirst_node(Var, lc_old);
Var *mapped_var = (Var *) lfirst_node(Var, lc_new);
/* There should be no subqueries so varlevelsup should not be a problem here. */
if (var->varno == orig_var->varno && var->varattno == orig_var->varattno)
return mapped_var;
}
return NULL;
}
/*
* Add ts_internal_cagg_final to bytea column.
* bytea column is the internal state for an agg. Pass info for the agg as "inp".
* inpcol = bytea column.
* This function returns an aggref
* ts_internal_cagg_final( Oid, Oid, bytea, NULL::output_typeid)
* the arguments are a list of targetentry
*/
Oid
get_finalize_function_oid(void)
{
Oid finalfnoid;
Oid finalfnargtypes[] = { TEXTOID, NAMEOID, NAMEOID, get_array_type(NAMEOID),
BYTEAOID, ANYELEMENTOID };
List *funcname = list_make2(makeString(INTERNAL_SCHEMA_NAME), makeString(FINALFN));
int nargs = sizeof(finalfnargtypes) / sizeof(finalfnargtypes[0]);
finalfnoid = LookupFuncName(funcname, nargs, finalfnargtypes, false);
return finalfnoid;
}
/*
* Creates an aggref of the form:
* finalize-agg(
* "sum(int)" TEXT,
* collation_schema_name NAME, collation_name NAME,
* input_types_array NAME[N][2],
* <partial-column-name> BYTEA,
* null::<return-type of sum(int)>
* )
* here sum(int) is the input aggregate "inp" in the parameter-list.
*/
Aggref *
get_finalize_aggref(Aggref *inp, Var *partial_state_var)
{
Aggref *aggref;
TargetEntry *te;
char *aggregate_signature;
Const *aggregate_signature_const, *collation_schema_const, *collation_name_const,
*input_types_const, *return_type_const;
Oid name_array_type_oid = get_array_type(NAMEOID);
Var *partial_bytea_var;
List *tlist = NIL;
int tlist_attno = 1;
List *argtypes = NIL;
char *collation_name = NULL, *collation_schema_name = NULL;
Datum collation_name_datum = (Datum) 0;
Datum collation_schema_datum = (Datum) 0;
Oid finalfnoid = get_finalize_function_oid();
argtypes = list_make5_oid(TEXTOID, NAMEOID, NAMEOID, name_array_type_oid, BYTEAOID);
argtypes = lappend_oid(argtypes, inp->aggtype);
aggref = makeNode(Aggref);
aggref->aggfnoid = finalfnoid;
aggref->aggtype = inp->aggtype;
aggref->aggcollid = inp->aggcollid;
aggref->inputcollid = inp->inputcollid;
aggref->aggtranstype = InvalidOid; /* will be set by planner */
aggref->aggargtypes = argtypes;
aggref->aggdirectargs = NULL; /*relevant for hypothetical set aggs*/
aggref->aggorder = NULL;
aggref->aggdistinct = NULL;
aggref->aggfilter = NULL;
aggref->aggstar = false;
aggref->aggvariadic = false;
aggref->aggkind = AGGKIND_NORMAL;
aggref->aggsplit = AGGSPLIT_SIMPLE;
aggref->location = -1;
/* Construct the arguments. */
aggregate_signature = format_procedure_qualified(inp->aggfnoid);
aggregate_signature_const = makeConst(TEXTOID,
-1,
DEFAULT_COLLATION_OID,
-1,
CStringGetTextDatum(aggregate_signature),
false,
false /* passbyval */
);
te = makeTargetEntry((Expr *) aggregate_signature_const, tlist_attno++, NULL, false);
tlist = lappend(tlist, te);
if (OidIsValid(inp->inputcollid))
{
/* Similar to generate_collation_name. */
HeapTuple tp;
Form_pg_collation colltup;
tp = SearchSysCache1(COLLOID, ObjectIdGetDatum(inp->inputcollid));
if (!HeapTupleIsValid(tp))
elog(ERROR, "cache lookup failed for collation %u", inp->inputcollid);
colltup = (Form_pg_collation) GETSTRUCT(tp);
collation_name = pstrdup(NameStr(colltup->collname));
collation_name_datum = DirectFunctionCall1(namein, CStringGetDatum(collation_name));
collation_schema_name = get_namespace_name(colltup->collnamespace);
if (collation_schema_name != NULL)
collation_schema_datum =
DirectFunctionCall1(namein, CStringGetDatum(collation_schema_name));
ReleaseSysCache(tp);
}
collation_schema_const = makeConst(NAMEOID,
-1,
InvalidOid,
NAMEDATALEN,
collation_schema_datum,
(collation_schema_name == NULL) ? true : false,
false /* passbyval */
);
te = makeTargetEntry((Expr *) collation_schema_const, tlist_attno++, NULL, false);
tlist = lappend(tlist, te);
collation_name_const = makeConst(NAMEOID,
-1,
InvalidOid,
NAMEDATALEN,
collation_name_datum,
(collation_name == NULL) ? true : false,
false /* passbyval */
);
te = makeTargetEntry((Expr *) collation_name_const, tlist_attno++, NULL, false);
tlist = lappend(tlist, te);
input_types_const = makeConst(get_array_type(NAMEOID),
-1,
InvalidOid,
-1,
get_input_types_array_datum(inp),
false,
false /* passbyval */
);
te = makeTargetEntry((Expr *) input_types_const, tlist_attno++, NULL, false);
tlist = lappend(tlist, te);
partial_bytea_var = copyObject(partial_state_var);
te = makeTargetEntry((Expr *) partial_bytea_var, tlist_attno++, NULL, false);
tlist = lappend(tlist, te);
return_type_const = makeNullConst(inp->aggtype, -1, inp->aggcollid);
te = makeTargetEntry((Expr *) return_type_const, tlist_attno++, NULL, false);
tlist = lappend(tlist, te);
Assert(tlist_attno == 7);
aggref->args = tlist;
return aggref;
}
Node *
add_var_mutator(Node *node, AggPartCxt *cxt)
{
if (node == NULL)
return NULL;
if (IsA(node, Aggref))
{
return node; /* don't process this further */
}
if (IsA(node, Var))
{
Var *orig_var, *mapped_var;
bool skip_adding = false;
mapped_var = var_already_mapped((Var *) node, cxt);
/* Avoid duplicating columns in the materialization table. */
if (mapped_var)
/*
* There should be no subquery so mapped_var->varlevelsup
* should not be a problem here.
*/
return (Node *) copyObject(mapped_var);
orig_var = (Var *) node;
mapped_var = mattablecolumninfo_addentry(cxt->mattblinfo,
node,
cxt->original_query_resno,
false,
&skip_adding);
set_var_mapping(orig_var, mapped_var, cxt);
return (Node *) mapped_var;
}
return expression_tree_mutator(node, add_var_mutator, cxt);
}
/*
* This function modifies the passed in havingQual by mapping exprs to
* columns in materialization table or finalized aggregate form.
* Note that HAVING clause can contain only exprs from group-by or aggregates
* and GROUP BY clauses cannot be aggregates.
* (By the time we process havingQuals, all the group by exprs have been
* processed and have associated columns in the materialization hypertable).
* Example, if the original query has
* GROUP BY colA + colB, colC
* HAVING colA + colB + sum(colD) > 10 OR count(colE) = 10
*
* The transformed havingqual would be
* HAVING matCol3 + finalize_agg( sum(matCol4) > 10
* OR finalize_agg( count(matCol5)) = 10
*
*
* Note: GROUP BY exprs always appear in the query's targetlist.
* Some of the aggregates from the havingQual might also already appear in the targetlist.
* We replace all existing entries with their corresponding entry from the modified targetlist.
* If an aggregate (in the havingqual) does not exist in the TL, we create a
* materialization table column for it and use the finalize(column) form in the
* transformed havingQual.
*/
static Node *
create_replace_having_qual_mutator(Node *node, CAggHavingCxt *cxt)
{
if (node == NULL)
return NULL;
/*
* See if we already have a column in materialization hypertable for this
* expr. We do this by checking the existing targetlist
* entries for the query.
*/
ListCell *lc, *lc2;
List *origtlist = cxt->origq_tlist;
List *modtlist = cxt->finalizeq_tlist;
forboth (lc, origtlist, lc2, modtlist)
{
TargetEntry *te = (TargetEntry *) lfirst(lc);
TargetEntry *modte = (TargetEntry *) lfirst(lc2);
if (equal(node, te->expr))
{
return (Node *) modte->expr;
}
}
/*
* Didn't find a match in targetlist. If it is an aggregate,
* create a partialize column for it in materialization hypertable
* and return corresponding finalize expr.
*/
if (IsA(node, Aggref))
{
AggPartCxt *agg_cxt = &(cxt->agg_cxt);
agg_cxt->added_aggref_col = false;
Aggref *newagg = add_partialize_column((Aggref *) node, agg_cxt);
Assert(agg_cxt->added_aggref_col == true);
return (Node *) newagg;
}
return expression_tree_mutator(node, create_replace_having_qual_mutator, cxt);
}
static Node *
finalizequery_create_havingqual(FinalizeQueryInfo *inp, MatTableColumnInfo *mattblinfo)
{
Query *orig_query = inp->final_userquery;
if (orig_query->havingQual == NULL)
return NULL;
Node *havingQual = copyObject(orig_query->havingQual);
Assert(inp->final_seltlist != NULL);
CAggHavingCxt hcxt = { .origq_tlist = orig_query->targetList,
.finalizeq_tlist = inp->final_seltlist,
.agg_cxt.mattblinfo = mattblinfo,
.agg_cxt.original_query_resno = 0,
.agg_cxt.ignore_aggoid = get_finalize_function_oid(),
.agg_cxt.added_aggref_col = false,
.agg_cxt.var_outside_of_aggref = false,
.agg_cxt.orig_vars = NIL,
.agg_cxt.mapped_vars = NIL };
return create_replace_having_qual_mutator(havingQual, &hcxt);
}
Node *
add_aggregate_partialize_mutator(Node *node, AggPartCxt *cxt)
{
if (node == NULL)
return NULL;
/*
* Modify the aggref and create a partialize(aggref) expr
* for the materialization.
* Add a corresponding columndef for the mat table.
* Replace the aggref with the ts_internal_cagg_final fn.
* using a Var for the corresponding column in the mat table.
* All new Vars have varno = 1 (for RTE 1).
*/
if (IsA(node, Aggref))
{
if (cxt->ignore_aggoid == ((Aggref *) node)->aggfnoid)
return node; /* don't process this further */
Aggref *newagg = add_partialize_column((Aggref *) node, cxt);
return (Node *) newagg;
}
if (IsA(node, Var))
{
cxt->var_outside_of_aggref = true;
}
return expression_tree_mutator(node, add_aggregate_partialize_mutator, cxt);
}
/*
* Init the finalize query data structure.
* Parameters:
* orig_query - the original query from user view that is being used as template for the finalize
* query tlist_aliases - aliases for the view select list materialization table columns are created
* . This will be returned in the mattblinfo
*
* DO NOT modify orig_query. Make a copy if needed.
* SIDE_EFFECT: the data structure in mattblinfo is modified as a side effect by adding new
* materialize table columns and partialize exprs.
*/
void
finalizequery_init(FinalizeQueryInfo *inp, Query *orig_query, MatTableColumnInfo *mattblinfo)
{
AggPartCxt cxt;
ListCell *lc;
int resno = 1;
inp->final_userquery = copyObject(orig_query);
inp->final_seltlist = NIL;
inp->final_havingqual = NULL;
/* Set up the final_seltlist and final_havingqual entries */
cxt.mattblinfo = mattblinfo;
cxt.ignore_aggoid = InvalidOid;
/* Set up the left over variable mapping lists */
cxt.orig_vars = NIL;
cxt.mapped_vars = NIL;
/*
* We want all the entries in the targetlist (resjunk or not)
* in the materialization table definition so we include group-by/having clause etc.
* We have to do 3 things here:
* 1) create a column for mat table
* 2) partialize_expr to populate it, and
* 3) modify the target entry to be a finalize_expr
* that selects from the materialization table.
*/
foreach (lc, orig_query->targetList)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
TargetEntry *modte = copyObject(tle);
cxt.added_aggref_col = false;
cxt.var_outside_of_aggref = false;
cxt.original_query_resno = resno;
if (!inp->finalized)
{
/*
* If tle has aggrefs, get the corresponding
* finalize_agg expression and save it in modte.
* Also add correspong materialization table column info
* for the aggrefs in tle.
*/
modte = (TargetEntry *) expression_tree_mutator((Node *) modte,
add_aggregate_partialize_mutator,
&cxt);
}
/*
* We need columns for non-aggregate targets.
* If it is not a resjunk OR appears in the grouping clause.
*/
if (cxt.added_aggref_col == false && (tle->resjunk == false || tle->ressortgroupref > 0))
{
Var *var;
bool skip_adding = false;
var = mattablecolumninfo_addentry(cxt.mattblinfo,
(Node *) tle,
cxt.original_query_resno,
inp->finalized,
&skip_adding);
/* Skip adding this column for finalized form. */
if (skip_adding)
{
continue;
}
/* Fix the expression for the target entry. */
modte->expr = (Expr *) var;
}
/* Check for left over variables (Var) of targets that contain Aggref. */
if (cxt.added_aggref_col && cxt.var_outside_of_aggref && !inp->finalized)
{
modte = (TargetEntry *) expression_tree_mutator((Node *) modte, add_var_mutator, &cxt);
}
/*
* Construct the targetlist for the query on the
* materialization table. The TL maps 1-1 with the original query:
* e.g select a, min(b)+max(d) from foo group by a,timebucket(a);
* becomes
* select <a-col>,
* ts_internal_cagg_final(..b-col ) + ts_internal_cagg_final(..d-col)
* from mattbl
* group by a-col, timebucket(a-col)
*/
/*
* We copy the modte target entries, resnos should be the same for
* final_selquery and origquery. So tleSortGroupReffor the targetentry
* can be reused, only table info needs to be modified.
*/
Assert((!inp->finalized && modte->resno == resno) ||
(inp->finalized && modte->resno >= resno));
resno++;
if (IsA(modte->expr, Var))
{
modte->resorigcol = ((Var *) modte->expr)->varattno;
}
inp->final_seltlist = lappend(inp->final_seltlist, modte);
}
/*
* All grouping clause elements are in targetlist already.
* So let's check the having clause.
*/
if (!inp->finalized)
inp->final_havingqual = finalizequery_create_havingqual(inp, mattblinfo);
}
/*
* Create select query with the finalize aggregates
* for the materialization table.
* matcollist - column list for mat table
* mattbladdress - materialization table ObjectAddress
* This is the function responsible for creating the final
* structures for selecting from the materialized hypertable
* created for the Cagg which is
* select * from _timescaldeb_internal._materialized_hypertable_<xxx>
*/
Query *
finalizequery_get_select_query(FinalizeQueryInfo *inp, List *matcollist,
ObjectAddress *mattbladdress, char *relname)
{
Query *final_selquery = NULL;
ListCell *lc;
FromExpr *fromexpr;
RangeTblEntry *rte;
/*
* For initial cagg creation rtable will have only 1 entry,
* for alter table rtable will have multiple entries with our
* RangeTblEntry as last member.
* For cagg with joins, we need to create a new RTE and jointree
* which contains the information of the materialised hypertable
* that is created for this cagg.
*/
if (list_length(inp->final_userquery->jointree->fromlist) >=
CONTINUOUS_AGG_MAX_JOIN_RELATIONS ||
!IsA(linitial(inp->final_userquery->jointree->fromlist), RangeTblRef))
{
rte = makeNode(RangeTblEntry);
rte->alias = makeAlias(relname, NIL);
rte->inFromCl = true;
rte->inh = true;
rte->rellockmode = 1;
rte->eref = copyObject(rte->alias);
ListCell *l;
foreach (l, inp->final_userquery->jointree->fromlist)
{
/*
* In case of joins, update the rte with all the join related struct.
*/
Node *jtnode = (Node *) lfirst(l);
JoinExpr *join = NULL;
if (IsA(jtnode, JoinExpr))
{
join = castNode(JoinExpr, jtnode);
RangeTblEntry *jrte = rt_fetch(join->rtindex, inp->final_userquery->rtable);
rte->joinaliasvars = jrte->joinaliasvars;
rte->jointype = jrte->jointype;
#if PG13_GE
rte->joinleftcols = jrte->joinleftcols;
rte->joinrightcols = jrte->joinrightcols;
rte->joinmergedcols = jrte->joinmergedcols;
#endif
#if PG14_GE
rte->join_using_alias = jrte->join_using_alias;
#endif
rte->selectedCols = jrte->selectedCols;
}
}
}
else
{
rte = llast_node(RangeTblEntry, inp->final_userquery->rtable);
rte->eref->colnames = NIL;
rte->selectedCols = NULL;
}
if (rte->eref->colnames == NIL)
{
/*
* We only need to do this for the case when there is no Join node in the query.
* In the case of join, rte->eref is already populated by jrte->eref and hence the
* relevant info, so need not to do this.
*/
/* Aliases for column names for the materialization table. */
foreach (lc, matcollist)
{
ColumnDef *cdef = lfirst_node(ColumnDef, lc);
rte->eref->colnames = lappend(rte->eref->colnames, makeString(cdef->colname));
rte->selectedCols = bms_add_member(rte->selectedCols,
list_length(rte->eref->colnames) -
FirstLowInvalidHeapAttributeNumber);
}
}
rte->relid = mattbladdress->objectId;
rte->rtekind = RTE_RELATION;
rte->relkind = RELKIND_RELATION;
rte->tablesample = NULL;
rte->requiredPerms |= ACL_SELECT;
rte->insertedCols = NULL;
rte->updatedCols = NULL;
/* 2. Fixup targetlist with the correct rel information. */
foreach (lc, inp->final_seltlist)
{
TargetEntry *tle = (TargetEntry *) lfirst(lc);
/*
* In case when this is a cagg wth joins, the Var from the normal table
* already has resorigtbl populated and we need to use that to resolve
* the Var. Hence only modify the tle when resorigtbl is unset
* which means it is Var of the Hypertable
*/
if (IsA(tle->expr, Var) && !OidIsValid(tle->resorigtbl))
{
tle->resorigtbl = rte->relid;
tle->resorigcol = ((Var *) tle->expr)->varattno;
}
}
CAGG_MAKEQUERY(final_selquery, inp->final_userquery);
final_selquery->hasAggs = !inp->finalized;
if (list_length(inp->final_userquery->jointree->fromlist) >=
CONTINUOUS_AGG_MAX_JOIN_RELATIONS ||
!IsA(linitial(inp->final_userquery->jointree->fromlist), RangeTblRef))
{
RangeTblRef *rtr;
final_selquery->rtable = list_make1(rte);
rtr = makeNode(RangeTblRef);
rtr->rtindex = 1;
fromexpr = makeFromExpr(list_make1(rtr), NULL);
}
else
{
final_selquery->rtable = inp->final_userquery->rtable;
fromexpr = inp->final_userquery->jointree;
fromexpr->quals = NULL;
}
/*
* Fixup from list. No quals on original table should be
* present here - they should be on the query that populates
* the mattable (partial_selquery). For the Cagg with join,
* we can not copy the fromlist from inp->final_userquery as
* it has two tables in this case.
*/
Assert(list_length(inp->final_userquery->jointree->fromlist) <=
CONTINUOUS_AGG_MAX_JOIN_RELATIONS);
final_selquery->jointree = fromexpr;
final_selquery->targetList = inp->final_seltlist;
final_selquery->sortClause = inp->final_userquery->sortClause;
if (!inp->finalized)
{
final_selquery->groupClause = inp->final_userquery->groupClause;
/* Copy the having clause too */
final_selquery->havingQual = inp->final_havingqual;
}
return final_selquery;
}
/*
* Add Information required to create and populate the materialization table columns
* a) create a columndef for the materialization table
* b) create the corresponding expr to populate the column of the materialization table (e..g for a
* column that is an aggref, we create a partialize_agg expr to populate the column Returns: the
* Var corresponding to the newly created column of the materialization table
*
* Notes: make sure the materialization table columns do not save
* values computed by mutable function.
*
* Notes on TargetEntry fields:
* - (resname != NULL) means it's projected in our case
* - (ressortgroupref > 0) means part of GROUP BY, which can be projected or not, depending of the
* value of the resjunk
* - (resjunk == true) applies for GROUP BY columns that are not projected
*
*/
static Var *
mattablecolumninfo_addentry(MatTableColumnInfo *out, Node *input, int original_query_resno,
bool finalized, bool *skip_adding)
{
int matcolno = list_length(out->matcollist) + 1;
char colbuf[NAMEDATALEN];
char *colname;
TargetEntry *part_te = NULL;
ColumnDef *col;
Var *var;
Oid coltype, colcollation;
int32 coltypmod;
*skip_adding = false;
if (contain_mutable_functions(input))
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("only immutable functions supported in continuous aggregate view"),
errhint("Make sure all functions in the continuous aggregate definition"
" have IMMUTABLE volatility. Note that functions or expressions"
" may be IMMUTABLE for one data type, but STABLE or VOLATILE for"
" another.")));
}
switch (nodeTag(input))
{
case T_Aggref:
{
FuncExpr *fexpr = get_partialize_funcexpr((Aggref *) input);
makeMaterializeColumnName(colbuf, "agg", original_query_resno, matcolno);
colname = colbuf;
coltype = BYTEAOID;
coltypmod = -1;
colcollation = InvalidOid;
col = makeColumnDef(colname, coltype, coltypmod, colcollation);
part_te = makeTargetEntry((Expr *) fexpr, matcolno, pstrdup(colname), false);
}
break;
case T_TargetEntry:
{
TargetEntry *tle = (TargetEntry *) input;
bool timebkt_chk = false;
if (IsA(tle->expr, FuncExpr))
timebkt_chk = function_allowed_in_cagg_definition(((FuncExpr *) tle->expr)->funcid);
if (tle->resname)
colname = pstrdup(tle->resname);
else
{
if (timebkt_chk)
colname = DEFAULT_MATPARTCOLUMN_NAME;
else
{
makeMaterializeColumnName(colbuf, "grp", original_query_resno, matcolno);
colname = colbuf;
/* For finalized form we skip adding extra group by columns. */
*skip_adding = finalized;
}
}
if (timebkt_chk)
{
tle->resname = pstrdup(colname);
out->matpartcolno = matcolno;
out->matpartcolname = pstrdup(colname);
}
else
{
/*
* Add indexes only for columns that are part of the GROUP BY clause
* and for finals form.
* We skip adding it because we'll not add the extra group by columns
* to the materialization hypertable anymore.
*/
if (!*skip_adding && tle->ressortgroupref > 0)
out->mat_groupcolname_list =
lappend(out->mat_groupcolname_list, pstrdup(colname));
}
coltype = exprType((Node *) tle->expr);
coltypmod = exprTypmod((Node *) tle->expr);
colcollation = exprCollation((Node *) tle->expr);
col = makeColumnDef(colname, coltype, coltypmod, colcollation);
part_te = (TargetEntry *) copyObject(input);
/* Keep original resjunk if finalized or not time bucket. */
if (!finalized || timebkt_chk)
{
/*
* Need to project all the partial entries so that
* materialization table is filled.
*/
part_te->resjunk = false;
}
part_te->resno = matcolno;
if (timebkt_chk)
{
col->is_not_null = true;
}
if (part_te->resname == NULL)
{
part_te->resname = pstrdup(colname);
}
}
break;
case T_Var:
{
makeMaterializeColumnName(colbuf, "var", original_query_resno, matcolno);
colname = colbuf;
coltype = exprType(input);
coltypmod = exprTypmod(input);
colcollation = exprCollation(input);
col = makeColumnDef(colname, coltype, coltypmod, colcollation);
part_te = makeTargetEntry((Expr *) input, matcolno, pstrdup(colname), false);
/* Need to project all the partial entries so that materialization table is filled. */
part_te->resjunk = false;
part_te->resno = matcolno;
}
break;
default:
elog(ERROR, "invalid node type %d", nodeTag(input));
break;
}
Assert((!finalized && list_length(out->matcollist) == list_length(out->partial_seltlist)) ||
(finalized && list_length(out->matcollist) <= list_length(out->partial_seltlist)));
Assert(col != NULL);
Assert(part_te != NULL);
if (!*skip_adding)
{
out->matcollist = lappend(out->matcollist, col);
}
out->partial_seltlist = lappend(out->partial_seltlist, part_te);
var = makeVar(1, matcolno, coltype, coltypmod, colcollation, 0);
return var;
}

View File

@ -0,0 +1,39 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#ifndef TIMESCALEDB_TSL_CONTINUOUS_AGGS_FINALIZE_H
#define TIMESCALEDB_TSL_CONTINUOUS_AGGS_FINALIZE_H
#include <postgres.h>
#include <catalog/pg_aggregate.h>
#include <catalog/pg_collation.h>
#include <catalog/pg_type.h>
#include <nodes/makefuncs.h>
#include <nodes/nodeFuncs.h>
#include <nodes/pg_list.h>
#include <parser/parse_func.h>
#include <utils/builtins.h>
#include <utils/regproc.h>
#include <utils/syscache.h>
#include "ts_catalog/catalog.h"
#include "common.h"
#define FINALFN "finalize_agg"
extern Oid get_finalize_function_oid(void);
extern Aggref *get_finalize_aggref(Aggref *inp, Var *partial_state_var);
extern Node *add_aggregate_partialize_mutator(Node *node, AggPartCxt *cxt);
extern Node *add_var_mutator(Node *node, AggPartCxt *cxt);
extern Node *finalize_query_create_having_qual(FinalizeQueryInfo *inp,
MatTableColumnInfo *mattblinfo);
extern Query *finalize_query_get_select_query(FinalizeQueryInfo *inp, List *matcollist,
ObjectAddress *mattbladdress);
extern void finalizequery_init(FinalizeQueryInfo *inp, Query *orig_query,
MatTableColumnInfo *mattblinfo);
extern Query *finalizequery_get_select_query(FinalizeQueryInfo *inp, List *matcollist,
ObjectAddress *mattbladdress, char *relname);
#endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_FINALIZE_H */

View File

@ -84,6 +84,18 @@ static MemoryContext continuous_aggs_trigger_mctx = NULL;
void _continuous_aggs_cache_inval_init(void);
void _continuous_aggs_cache_inval_fini(void);
static int64 tuple_get_time(Dimension *d, HeapTuple tuple, AttrNumber col, TupleDesc tupdesc);
static inline void cache_inval_entry_init(ContinuousAggsCacheInvalEntry *cache_entry,
int32 hypertable_id, int32 entry_id);
static inline void cache_entry_switch_to_chunk(ContinuousAggsCacheInvalEntry *cache_entry,
Oid chunk_id, Relation chunk_relation);
static inline void update_cache_entry(ContinuousAggsCacheInvalEntry *cache_entry, int64 timeval);
static void cache_inval_entry_write(ContinuousAggsCacheInvalEntry *entry);
static void cache_inval_cleanup(void);
static void cache_inval_htab_write(void);
static void continuous_agg_xact_invalidation_callback(XactEvent event, void *arg);
static ScanTupleResult invalidation_tuple_found(TupleInfo *ti, void *min);
static void
cache_inval_init()
{

View File

@ -111,6 +111,48 @@ typedef enum LogType
LOG_CAGG,
} LogType;
static Relation open_invalidation_log(LogType type, LOCKMODE lockmode);
static void hypertable_invalidation_scan_init(ScanIterator *iterator, int32 hyper_id,
LOCKMODE lockmode);
static HeapTuple create_invalidation_tup(const TupleDesc tupdesc, int32 cagg_hyper_id, int64 start,
int64 end);
static bool save_invalidation_for_refresh(const CaggInvalidationState *state,
const Invalidation *invalidation);
static void set_remainder_after_cut(Invalidation *remainder, int32 hyper_id,
int64 lowest_modified_value, int64 greatest_modified_value);
static void invalidation_entry_reset(Invalidation *entry);
static void
invalidation_expand_to_bucket_boundaries(Invalidation *inv, Oid time_type_oid, int64 bucket_width,
const ContinuousAggsBucketFunction *bucket_function);
static void
invalidation_entry_set_from_hyper_invalidation(Invalidation *entry, const TupleInfo *ti,
int32 hyper_id, Oid dimtype, int64 bucket_width,
const ContinuousAggsBucketFunction *bucket_function);
static void
invalidation_entry_set_from_cagg_invalidation(Invalidation *entry, const TupleInfo *ti, Oid dimtype,
int64 bucket_width,
const ContinuousAggsBucketFunction *bucket_function);
static bool invalidations_can_be_merged(const Invalidation *a, const Invalidation *b);
static bool invalidation_entry_try_merge(Invalidation *entry, const Invalidation *newentry);
static void cut_and_insert_new_cagg_invalidation(const CaggInvalidationState *state,
const Invalidation *entry, int32 cagg_hyper_id);
static void move_invalidations_from_hyper_to_cagg_log(const CaggInvalidationState *state);
static void cagg_invalidations_scan_by_hypertable_init(ScanIterator *iterator, int32 cagg_hyper_id,
LOCKMODE lockmode);
static Invalidation cut_cagg_invalidation(const CaggInvalidationState *state,
const InternalTimeRange *refresh_window,
const Invalidation *entry);
static Invalidation cut_cagg_invalidation_and_compute_remainder(
const CaggInvalidationState *state, const InternalTimeRange *refresh_window,
const Invalidation *mergedentry, const Invalidation *current_remainder);
static void clear_cagg_invalidations_for_refresh(const CaggInvalidationState *state,
const InternalTimeRange *refresh_window);
static void invalidation_state_init(CaggInvalidationState *state, int32 mat_hypertable_id,
int32 raw_hypertable_id, Oid dimtype,
const CaggsInfo *all_caggs);
static void invalidation_state_cleanup(const CaggInvalidationState *state);
static ArrayType *bucket_functions_default_argument(int ndim);
static Relation
open_invalidation_log(LogType type, LOCKMODE lockmode)
{

View File

@ -70,6 +70,9 @@ typedef struct InvalidationThresholdData
bool was_updated;
} InvalidationThresholdData;
static ScanTupleResult scan_update_invalidation_threshold(TupleInfo *ti, void *data);
static ScanTupleResult invalidation_threshold_tuple_found(TupleInfo *ti, void *data);
static ScanTupleResult
scan_update_invalidation_threshold(TupleInfo *ti, void *data)
{

View File

@ -23,6 +23,9 @@
#include "materialize.h"
#define CHUNKIDFROMRELID "chunk_id_from_relid"
#define CONTINUOUS_AGG_CHUNK_ID_COL_NAME "chunk_id"
static bool ranges_overlap(InternalTimeRange invalidation_range,
InternalTimeRange new_materialization_range);
static TimeRange internal_time_range_to_time_range(InternalTimeRange internal);
@ -349,3 +352,95 @@ spi_insert_materializations(Hypertable *mat_ht, SchemaAndName partial_view,
}
}
}
/*
* Initialize MatTableColumnInfo.
*/
void
mattablecolumninfo_init(MatTableColumnInfo *matcolinfo, List *grouplist)
{
matcolinfo->matcollist = NIL;
matcolinfo->partial_seltlist = NIL;
matcolinfo->partial_grouplist = grouplist;
matcolinfo->mat_groupcolname_list = NIL;
matcolinfo->matpartcolno = -1;
matcolinfo->matpartcolname = NULL;
}
/*
* Add internal columns for the materialization table.
*/
void
mattablecolumninfo_addinternal(MatTableColumnInfo *matcolinfo)
{
Index maxRef;
int colno = list_length(matcolinfo->partial_seltlist) + 1;
ColumnDef *col;
Var *chunkfn_arg1;
FuncExpr *chunk_fnexpr;
Oid chunkfnoid;
Oid argtype[] = { OIDOID };
Oid rettype = INT4OID;
TargetEntry *chunk_te;
Oid sortop, eqop;
bool hashable;
ListCell *lc;
SortGroupClause *grpcl;
/* Add a chunk_id column for materialization table */
Node *vexpr = (Node *) makeVar(1, colno, INT4OID, -1, InvalidOid, 0);
col = makeColumnDef(CONTINUOUS_AGG_CHUNK_ID_COL_NAME,
exprType(vexpr),
exprTypmod(vexpr),
exprCollation(vexpr));
matcolinfo->matcollist = lappend(matcolinfo->matcollist, col);
/*
* Need to add an entry to the target list for computing chunk_id column
* : chunk_for_tuple( htid, table.*).
*/
chunkfnoid =
LookupFuncName(list_make2(makeString(INTERNAL_SCHEMA_NAME), makeString(CHUNKIDFROMRELID)),
sizeof(argtype) / sizeof(argtype[0]),
argtype,
false);
chunkfn_arg1 = makeVar(1, TableOidAttributeNumber, OIDOID, -1, 0, 0);
chunk_fnexpr = makeFuncExpr(chunkfnoid,
rettype,
list_make1(chunkfn_arg1),
InvalidOid,
InvalidOid,
COERCE_EXPLICIT_CALL);
chunk_te = makeTargetEntry((Expr *) chunk_fnexpr,
colno,
pstrdup(CONTINUOUS_AGG_CHUNK_ID_COL_NAME),
false);
matcolinfo->partial_seltlist = lappend(matcolinfo->partial_seltlist, chunk_te);
/* Any internal column needs to be added to the group-by clause as well. */
maxRef = 0;
foreach (lc, matcolinfo->partial_seltlist)
{
Index ref = ((TargetEntry *) lfirst(lc))->ressortgroupref;
if (ref > maxRef)
maxRef = ref;
}
chunk_te->ressortgroupref =
maxRef + 1; /* used by sortgroupclause to identify the targetentry */
grpcl = makeNode(SortGroupClause);
get_sort_group_operators(exprType((Node *) chunk_te->expr),
false,
true,
false,
&sortop,
&eqop,
NULL,
&hashable);
grpcl->tleSortGroupRef = chunk_te->ressortgroupref;
grpcl->eqop = eqop;
grpcl->sortop = sortop;
grpcl->nulls_first = false;
grpcl->hashable = hashable;
matcolinfo->partial_grouplist = lappend(matcolinfo->partial_grouplist, grpcl);
}

View File

@ -10,6 +10,7 @@
#include <fmgr.h>
#include <nodes/pg_list.h>
#include "ts_catalog/continuous_agg.h"
#include "common.h"
typedef struct SchemaAndName
{
@ -40,5 +41,4 @@ void continuous_agg_update_materialization(Hypertable *mat_ht, SchemaAndName par
const NameData *time_column_name,
InternalTimeRange new_materialization_range,
InternalTimeRange invalidation_range, int32 chunk_id);
#endif /* TIMESCALEDB_TSL_CONTINUOUS_AGGS_MATERIALIZE_H */

View File

@ -23,8 +23,13 @@
#include "hypertable_cache.h"
#include "scan_iterator.h"
static void cagg_update_materialized_only(ContinuousAgg *agg, bool materialized_only);
static List *cagg_find_groupingcols(ContinuousAgg *agg, Hypertable *mat_ht);
static List *cagg_get_compression_params(ContinuousAgg *agg, Hypertable *mat_ht);
static void cagg_alter_compression(ContinuousAgg *agg, Hypertable *mat_ht, List *compress_defelems);
static void
update_materialized_only(ContinuousAgg *agg, bool materialized_only)
cagg_update_materialized_only(ContinuousAgg *agg, bool materialized_only)
{
ScanIterator iterator =
ts_scan_iterator_create(CONTINUOUS_AGG, RowExclusiveLock, CurrentMemoryContext);
@ -255,7 +260,7 @@ continuous_agg_update_options(ContinuousAgg *agg, WithClauseResult *with_clause_
Assert(mat_ht != NULL);
cagg_flip_realtime_view_definition(agg, mat_ht);
update_materialized_only(agg, materialized_only);
cagg_update_materialized_only(agg, materialized_only);
ts_cache_release(hcache);
}
List *compression_options = ts_continuous_agg_get_compression_defelems(with_clause_options);

View File

@ -39,6 +39,43 @@ typedef struct CaggRefreshState
SchemaAndName partial_view;
} CaggRefreshState;
static Hypertable *cagg_get_hypertable_or_fail(int32 hypertable_id);
static InternalTimeRange get_largest_bucketed_window(Oid timetype, int64 bucket_width);
static InternalTimeRange
compute_inscribed_bucketed_refresh_window(const InternalTimeRange *const refresh_window,
const int64 bucket_width);
static InternalTimeRange
compute_circumscribed_bucketed_refresh_window(const InternalTimeRange *const refresh_window,
const int64 bucket_width,
const ContinuousAggsBucketFunction *bucket_function);
static void continuous_agg_refresh_init(CaggRefreshState *refresh, const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window);
static void continuous_agg_refresh_execute(const CaggRefreshState *refresh,
const InternalTimeRange *bucketed_refresh_window,
const int32 chunk_id);
static void log_refresh_window(int elevel, const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window, const char *msg);
static long materialization_per_refresh_window(void);
static void continuous_agg_refresh_execute_wrapper(const InternalTimeRange *bucketed_refresh_window,
const long iteration, void *arg1_refresh,
void *arg2_chunk_id);
static void update_merged_refresh_window(const InternalTimeRange *bucketed_refresh_window,
const long iteration, void *arg1_merged_refresh_window,
void *arg2);
static void continuous_agg_refresh_with_window(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const InvalidationStore *invalidations,
const int64 bucket_width, int32 chunk_id,
const bool is_raw_ht_distributed,
const bool do_merged_refresh,
const InternalTimeRange merged_refresh_window);
static ContinuousAgg *get_cagg_by_relid(const Oid cagg_relid);
static void emit_up_to_date_notice(const ContinuousAgg *cagg, const CaggRefreshCallContext callctx);
static bool process_cagg_invalidations_and_refresh(const ContinuousAgg *cagg,
const InternalTimeRange *refresh_window,
const CaggRefreshCallContext callctx,
int32 chunk_id);
static Hypertable *
cagg_get_hypertable_or_fail(int32 hypertable_id)
{

View File

@ -0,0 +1,244 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#include "repair.h"
static void cagg_rebuild_view_definition(ContinuousAgg *agg, Hypertable *mat_ht,
bool force_rebuild);
/*
* Test the view definition of an existing continuous aggregate
* for errors and attempt to rebuild it if required.
*/
static void
cagg_rebuild_view_definition(ContinuousAgg *agg, Hypertable *mat_ht, bool force_rebuild)
{
bool test_failed = false;
char *relname = agg->data.user_view_name.data;
char *schema = agg->data.user_view_schema.data;
ListCell *lc1, *lc2;
int sec_ctx;
Oid uid, saved_uid;
/* Cagg view created by the user. */
Oid user_view_oid = relation_oid(&agg->data.user_view_schema, &agg->data.user_view_name);
Relation user_view_rel = relation_open(user_view_oid, AccessShareLock);
Query *user_query = get_view_query(user_view_rel);
bool finalized = ContinuousAggIsFinalized(agg);
bool rebuild_cagg_with_joins = false;
/* Extract final query from user view query. */
Query *final_query = copyObject(user_query);
RemoveRangeTableEntries(final_query);
if (finalized && !force_rebuild)
{
/* This continuous aggregate does not have partials, do not check for defects. */
elog(DEBUG1,
"[cagg_rebuild_view_definition] %s.%s does not have partials, do not check for "
"defects!",
NameStr(agg->data.user_view_schema),
NameStr(agg->data.user_view_name)
);
relation_close(user_view_rel, NoLock);
return;
}
if (!agg->data.materialized_only)
{
final_query = destroy_union_query(final_query);
}
FinalizeQueryInfo fqi;
MatTableColumnInfo mattblinfo;
ObjectAddress mataddress = {
.classId = RelationRelationId,
.objectId = mat_ht->main_table_relid,
};
Oid direct_view_oid = relation_oid(&agg->data.direct_view_schema, &agg->data.direct_view_name);
Relation direct_view_rel = relation_open(direct_view_oid, AccessShareLock);
Query *direct_query = copyObject(get_view_query(direct_view_rel));
RemoveRangeTableEntries(direct_query);
/*
* If there is a join in CAggs then rebuild it definitley,
* because v2.10.0 has created the definition with missing structs.
*
* Removed the check for direct_query->jointree != NULL because
* we don't allow queries without FROM clause in Continuous Aggregate
* definition.
*
* Per coverityscan:
* https://scan4.scan.coverity.com/reports.htm#v54116/p12995/fileInstanceId=131745632&defectInstanceId=14569562&mergedDefectId=384045
*
*/
if (force_rebuild)
{
ListCell *l;
foreach (l, direct_query->jointree->fromlist)
{
Node *jtnode = (Node *) lfirst(l);
if (IsA(jtnode, JoinExpr))
rebuild_cagg_with_joins = true;
}
}
if (!rebuild_cagg_with_joins && finalized)
{
/* There's nothing to fix, so no need to rebuild */
elog(DEBUG1,
"[cagg_rebuild_view_definition] %s.%s does not have JOINS, so no need to rebuild the "
"definition!",
NameStr(agg->data.user_view_schema),
NameStr(agg->data.user_view_name)
);
relation_close(user_view_rel, NoLock);
relation_close(direct_view_rel, NoLock);
return;
}
else
elog(DEBUG1,
"[cagg_rebuild_view_definition] %s.%s has been rebuilt!",
NameStr(agg->data.user_view_schema),
NameStr(agg->data.user_view_name));
CAggTimebucketInfo timebucket_exprinfo =
cagg_validate_query(direct_query,
finalized,
NameStr(agg->data.user_view_schema),
NameStr(agg->data.user_view_name));
mattablecolumninfo_init(&mattblinfo, copyObject(direct_query->groupClause));
fqi.finalized = finalized;
finalizequery_init(&fqi, direct_query, &mattblinfo);
/*
* Add any internal columns needed for materialization based
* on the user query's table.
*/
if (!finalized)
mattablecolumninfo_addinternal(&mattblinfo);
Query *view_query = NULL;
if (rebuild_cagg_with_joins)
{
view_query = finalizequery_get_select_query(&fqi,
mattblinfo.matcollist,
&mataddress,
NameStr(mat_ht->fd.table_name));
}
else
view_query =
finalizequery_get_select_query(&fqi, mattblinfo.matcollist, &mataddress, relname);
if (!agg->data.materialized_only)
{
view_query = build_union_query(&timebucket_exprinfo,
mattblinfo.matpartcolno,
view_query,
direct_query,
mat_ht->fd.id);
}
if (list_length(mattblinfo.matcollist) != ts_get_relnatts(mat_ht->main_table_relid))
/*
* There is a mismatch of columns between the current version's finalization view
* building logic and the existing schema of the materialization table. As of version
* 2.7.0 this only happens due to buggy view generation in previous versions. Do not
* rebuild those views since the materialization table can not be queried correctly.
*/
test_failed = true;
/*
* When calling StoreViewQuery the target list names of the query have to
* match the view's tuple descriptor attribute names. But if a column of the continuous
* aggregate has been renamed, the query tree will not have the correct
* names in the target list, which will error out when calling
* StoreViewQuery. For that reason, we fetch the name from the user view
* relation and update the resource name in the query target list to match
* the name in the user view.
*/
TupleDesc desc = RelationGetDescr(user_view_rel);
int i = 0;
forboth (lc1, view_query->targetList, lc2, user_query->targetList)
{
TargetEntry *view_tle, *user_tle;
FormData_pg_attribute *attr = TupleDescAttr(desc, i);
view_tle = lfirst_node(TargetEntry, lc1);
user_tle = lfirst_node(TargetEntry, lc2);
if (view_tle->resjunk && user_tle->resjunk)
break;
else if (view_tle->resjunk || user_tle->resjunk)
{
/*
* This should never happen but if it ever does it's safer to
* error here instead of creating broken view definitions.
*/
test_failed = true;
break;
}
view_tle->resname = user_tle->resname = NameStr(attr->attname);
++i;
}
if (test_failed)
{
ereport(WARNING,
(errmsg("Inconsistent view definitions for continuous aggregate view "
"\"%s.%s\"",
schema,
relname),
errdetail("Continuous aggregate data possibly corrupted."),
errhint("You may need to recreate the continuous aggregate with CREATE "
"MATERIALIZED VIEW.")));
}
else
{
SWITCH_TO_TS_USER(NameStr(agg->data.user_view_schema), uid, saved_uid, sec_ctx);
StoreViewQuery(user_view_oid, view_query, true);
CommandCounterIncrement();
RESTORE_USER(uid, saved_uid, sec_ctx);
}
/*
* Keep locks until end of transaction and do not close the relation
* before the call to StoreViewQuery since it can otherwise release the
* memory for attr->attname, causing a segfault.
*/
relation_close(direct_view_rel, NoLock);
relation_close(user_view_rel, NoLock);
}
Datum
tsl_cagg_try_repair(PG_FUNCTION_ARGS)
{
Oid relid = PG_ARGISNULL(0) ? InvalidOid : PG_GETARG_OID(0);
char relkind = get_rel_relkind(relid);
bool force_rebuild = PG_ARGISNULL(0) ? false : PG_GETARG_BOOL(1);
ContinuousAgg *cagg = NULL;
if (RELKIND_VIEW == relkind)
cagg = ts_continuous_agg_find_by_relid(relid);
if (RELKIND_VIEW != relkind || !cagg)
{
ereport(WARNING,
(errmsg("invalid OID \"%u\" for continuous aggregate view", relid),
errdetail("Check for database corruption.")));
PG_RETURN_VOID();
}
Cache *hcache = ts_hypertable_cache_pin();
Hypertable *mat_ht = ts_hypertable_cache_get_entry_by_id(hcache, cagg->data.mat_hypertable_id);
Assert(mat_ht != NULL);
cagg_rebuild_view_definition(cagg, mat_ht, force_rebuild);
ts_cache_release(hcache);
PG_RETURN_VOID();
}

View File

@ -0,0 +1,19 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#ifndef TIMESCALEDB_TSL_CONTINUOUS_AGGS_CAGG_REPAIR_H
#define TIMESCALEDB_TSL_CONTINUOUS_AGGS_CAGG_REPAIR_H
#include <postgres.h>
#include <commands/view.h>
#include "continuous_aggs/common.h"
#include "continuous_aggs/finalize.h"
#include "continuous_aggs/materialize.h"
#include "ts_catalog/continuous_agg.h"
extern Datum tsl_cagg_try_repair(PG_FUNCTION_ARGS);
#endif

View File

@ -30,6 +30,7 @@
#include "continuous_aggs/options.h"
#include "continuous_aggs/refresh.h"
#include "continuous_aggs/invalidation.h"
#include "continuous_aggs/repair.h"
#include "cross_module_fn.h"
#include "nodes/data_node_dispatch.h"
#include "data_node.h"