mirror of
https://github.com/timescale/timescaledb.git
synced 2025-05-19 04:03:06 +08:00
Separate the memory allocation context for tsl_finalize_agg
Allocate the internal data structures in different memory contexts based on the lifetime i.e. global to query or global to the group used by the finalize_agg query. This helps minimize lookups for global lifetime objects like Oids.
This commit is contained in:
parent
e034978eef
commit
58601cc3a0
@ -76,9 +76,14 @@ typedef struct FACombineFnMeta
|
||||
Oid combinefnoid;
|
||||
Oid deserialfnoid;
|
||||
Oid transtype;
|
||||
Oid recv_fn;
|
||||
Oid typIOParam;
|
||||
FmgrInfo deserialfn;
|
||||
FmgrInfo internal_deserialfn;
|
||||
FmgrInfo combinefn;
|
||||
/* either deserialfn_fcinfo or internal_deserialfn_fcinfo is valid*/
|
||||
FunctionCallInfoData deserialfn_fcinfo;
|
||||
FunctionCallInfoData internal_deserialfn_fcinfo;
|
||||
FunctionCallInfoData combfn_fcinfo;
|
||||
|
||||
} FACombineFnMeta;
|
||||
@ -104,12 +109,17 @@ typedef struct FAPerGroupState
|
||||
bool trans_value_initialized;
|
||||
} FAPerGroupState;
|
||||
|
||||
/* metadata information that is common for the entire query */
|
||||
typedef struct FAPerQueryState
|
||||
{
|
||||
FACombineFnMeta combine_meta;
|
||||
FAFinalFnMeta final_meta;
|
||||
} FAPerQueryState;
|
||||
|
||||
typedef struct FATransitionState
|
||||
{
|
||||
FACombineFnMeta *combine_meta;
|
||||
FAFinalFnMeta *final_meta;
|
||||
FAPerQueryState *per_query_state;
|
||||
FAPerGroupState *per_group_state;
|
||||
|
||||
} FATransitionState;
|
||||
|
||||
static Oid
|
||||
@ -162,27 +172,29 @@ inner_agg_deserialize(FACombineFnMeta *combine_meta, bytea *serialized_partial,
|
||||
}
|
||||
deser_fcinfo->arg[0] = PointerGetDatum(serialized_partial);
|
||||
deser_fcinfo->argnull[0] = serialized_isnull;
|
||||
combine_meta->deserialfn_fcinfo.isnull = false;
|
||||
deserialized = FunctionCallInvoke(deser_fcinfo);
|
||||
*deserialized_isnull = deser_fcinfo->isnull;
|
||||
}
|
||||
else if (!serialized_isnull)
|
||||
{
|
||||
int32 typmod = -1;
|
||||
StringInfo string = makeStringInfo();
|
||||
Oid recv_fn, typIOParam;
|
||||
|
||||
getTypeBinaryInputInfo(combine_meta->transtype, &recv_fn, &typIOParam);
|
||||
|
||||
appendBinaryStringInfo(string,
|
||||
VARDATA_ANY(serialized_partial),
|
||||
VARSIZE_ANY_EXHDR(serialized_partial));
|
||||
/*
|
||||
* Note that we may want to switch this to ReceiveFunctionCall at some
|
||||
* point in the future because OidReceiveFunctionCall puts a lot of
|
||||
* stuff into CurrentMemoryContext that we may eventually want to manage
|
||||
* ourselves.
|
||||
*/
|
||||
deserialized = OidReceiveFunctionCall(recv_fn, string, typIOParam, 0);
|
||||
*deserialized_isnull = false;
|
||||
combine_meta->internal_deserialfn_fcinfo.arg[0] = PointerGetDatum(string);
|
||||
combine_meta->internal_deserialfn_fcinfo.arg[1] =
|
||||
ObjectIdGetDatum(combine_meta->typIOParam);
|
||||
combine_meta->internal_deserialfn_fcinfo.arg[2] = Int32GetDatum(typmod);
|
||||
combine_meta->internal_deserialfn_fcinfo.argnull[0] = false;
|
||||
combine_meta->internal_deserialfn_fcinfo.argnull[1] = false;
|
||||
combine_meta->internal_deserialfn_fcinfo.argnull[2] = false;
|
||||
combine_meta->internal_deserialfn_fcinfo.isnull = false;
|
||||
|
||||
deserialized = FunctionCallInvoke(&combine_meta->internal_deserialfn_fcinfo);
|
||||
*deserialized_isnull = combine_meta->internal_deserialfn_fcinfo.isnull;
|
||||
}
|
||||
PG_RETURN_DATUM(deserialized);
|
||||
}
|
||||
@ -249,21 +261,36 @@ get_input_types(ArrayType *input_types, size_t *number_types)
|
||||
};
|
||||
|
||||
static FATransitionState *
|
||||
fa_transition_state_init(MemoryContext *fa_context, Oid inner_agg_fn_oid, Oid collation,
|
||||
AggState *fa_aggstate, ArrayType *input_types)
|
||||
fa_transition_state_init(MemoryContext *fa_context, FAPerQueryState *qstate, AggState *fa_aggstate)
|
||||
{
|
||||
FATransitionState *tstate = NULL;
|
||||
HeapTuple inner_agg_tuple;
|
||||
Form_pg_aggregate inner_agg_form;
|
||||
|
||||
tstate = (FATransitionState *) MemoryContextAlloc(*fa_context, sizeof(*tstate));
|
||||
tstate->combine_meta =
|
||||
(FACombineFnMeta *) MemoryContextAlloc(*fa_context, sizeof(*tstate->combine_meta));
|
||||
tstate->final_meta =
|
||||
(FAFinalFnMeta *) MemoryContextAlloc(*fa_context, sizeof(*tstate->final_meta));
|
||||
tstate->per_query_state = qstate;
|
||||
tstate->per_group_state =
|
||||
(FAPerGroupState *) MemoryContextAlloc(*fa_context, sizeof(*tstate->per_group_state));
|
||||
|
||||
/* Need to init tstate->per_group_state->trans_value */
|
||||
tstate->per_group_state->trans_value_isnull = true;
|
||||
tstate->per_group_state->trans_value_initialized = false;
|
||||
return tstate;
|
||||
}
|
||||
|
||||
static FAPerQueryState *
|
||||
fa_perquery_state_init(FunctionCallInfo fcinfo)
|
||||
{
|
||||
char *inner_agg_input_coll_schema = PG_ARGISNULL(2) ? NULL : NameStr(*PG_GETARG_NAME(2));
|
||||
char *inner_agg_input_coll_name = PG_ARGISNULL(3) ? NULL : NameStr(*PG_GETARG_NAME(3));
|
||||
ArrayType *input_types = PG_ARGISNULL(4) ? NULL : PG_GETARG_ARRAYTYPE_P(4);
|
||||
Oid inner_agg_fn_oid = aggfnoid_from_aggname(PG_GETARG_TEXT_PP(1));
|
||||
|
||||
Oid collation = collation_oid_from_name(inner_agg_input_coll_schema, inner_agg_input_coll_name);
|
||||
FAPerQueryState *tstate;
|
||||
HeapTuple inner_agg_tuple;
|
||||
Form_pg_aggregate inner_agg_form;
|
||||
MemoryContext qcontext = fcinfo->flinfo->fn_mcxt;
|
||||
MemoryContext oldcontext = MemoryContextSwitchTo(qcontext);
|
||||
AggState *fa_aggstate = (AggState *) fcinfo->context;
|
||||
|
||||
/* look up catalog entry and populate what we need */
|
||||
inner_agg_tuple = SearchSysCache1(AGGFNOID, inner_agg_fn_oid);
|
||||
if (!HeapTupleIsValid(inner_agg_tuple))
|
||||
@ -274,40 +301,59 @@ fa_transition_state_init(MemoryContext *fa_context, Oid inner_agg_fn_oid, Oid co
|
||||
if (inner_agg_form->aggnumdirectargs != 0)
|
||||
elog(ERROR,
|
||||
"function calls with direct args are not supported by TimescaleDB finalize agg");
|
||||
tstate = (FAPerQueryState *) MemoryContextAlloc(qcontext, sizeof(FAPerQueryState));
|
||||
|
||||
tstate->final_meta->finalfnoid = inner_agg_form->aggfinalfn;
|
||||
tstate->combine_meta->combinefnoid = inner_agg_form->aggcombinefn;
|
||||
tstate->combine_meta->deserialfnoid = inner_agg_form->aggdeserialfn;
|
||||
tstate->combine_meta->transtype = inner_agg_form->aggtranstype;
|
||||
tstate->final_meta.finalfnoid = inner_agg_form->aggfinalfn;
|
||||
tstate->combine_meta.combinefnoid = inner_agg_form->aggcombinefn;
|
||||
tstate->combine_meta.deserialfnoid = inner_agg_form->aggdeserialfn;
|
||||
tstate->combine_meta.transtype = inner_agg_form->aggtranstype;
|
||||
ReleaseSysCache(inner_agg_tuple);
|
||||
|
||||
/* initialize combine specific state, both the deserialize function and combine function */
|
||||
if (!OidIsValid(tstate->combine_meta->combinefnoid))
|
||||
if (!OidIsValid(tstate->combine_meta.combinefnoid))
|
||||
elog(ERROR,
|
||||
"no valid combine function for the aggregate specified in Timescale finalize call");
|
||||
|
||||
fmgr_info(tstate->combine_meta->combinefnoid, &tstate->combine_meta->combinefn);
|
||||
InitFunctionCallInfoData(tstate->combine_meta->combfn_fcinfo,
|
||||
&tstate->combine_meta->combinefn,
|
||||
fmgr_info_cxt(tstate->combine_meta.combinefnoid, &tstate->combine_meta.combinefn, qcontext);
|
||||
InitFunctionCallInfoData(tstate->combine_meta.combfn_fcinfo,
|
||||
&tstate->combine_meta.combinefn,
|
||||
2, /* combine fn always has two args */
|
||||
collation,
|
||||
(void *) fa_aggstate,
|
||||
NULL);
|
||||
|
||||
if (OidIsValid(tstate->combine_meta->deserialfnoid)) /* deserial fn not necessary, no need to
|
||||
if (OidIsValid(tstate->combine_meta.deserialfnoid)) /* deserial fn not necessary, no need to
|
||||
throw errors if not found */
|
||||
{
|
||||
fmgr_info(tstate->combine_meta->deserialfnoid, &tstate->combine_meta->deserialfn);
|
||||
InitFunctionCallInfoData(tstate->combine_meta->deserialfn_fcinfo,
|
||||
&tstate->combine_meta->deserialfn,
|
||||
fmgr_info_cxt(tstate->combine_meta.deserialfnoid,
|
||||
&tstate->combine_meta.deserialfn,
|
||||
qcontext);
|
||||
InitFunctionCallInfoData(tstate->combine_meta.deserialfn_fcinfo,
|
||||
&tstate->combine_meta.deserialfn,
|
||||
1, /* deserialize always has 1 arg */
|
||||
collation,
|
||||
(void *) fa_aggstate,
|
||||
NULL);
|
||||
}
|
||||
|
||||
else
|
||||
{
|
||||
/* save information for internal deserialization. caching instead
|
||||
of calling ReceiveFunctionCall */
|
||||
getTypeBinaryInputInfo(tstate->combine_meta.transtype,
|
||||
&tstate->combine_meta.recv_fn,
|
||||
&tstate->combine_meta.typIOParam);
|
||||
fmgr_info_cxt(tstate->combine_meta.recv_fn,
|
||||
&tstate->combine_meta.internal_deserialfn,
|
||||
qcontext);
|
||||
InitFunctionCallInfoData(tstate->combine_meta.internal_deserialfn_fcinfo,
|
||||
&tstate->combine_meta.internal_deserialfn,
|
||||
3,
|
||||
InvalidOid,
|
||||
NULL,
|
||||
NULL);
|
||||
}
|
||||
/* initialize finalfn specific state */
|
||||
if (OidIsValid(tstate->final_meta->finalfnoid))
|
||||
if (OidIsValid(tstate->final_meta.finalfnoid))
|
||||
{
|
||||
int num_args = 1;
|
||||
Oid *types = NULL;
|
||||
@ -317,13 +363,13 @@ fa_transition_state_init(MemoryContext *fa_context, Oid inner_agg_fn_oid, Oid co
|
||||
types = get_input_types(input_types, &number_types);
|
||||
num_args += number_types;
|
||||
}
|
||||
if (num_args != get_func_nargs(tstate->final_meta->finalfnoid))
|
||||
if (num_args != get_func_nargs(tstate->final_meta.finalfnoid))
|
||||
elog(ERROR, "invalid number of input types");
|
||||
|
||||
fmgr_info(tstate->final_meta->finalfnoid, &tstate->final_meta->finalfn);
|
||||
fmgr_info_cxt(tstate->final_meta.finalfnoid, &tstate->final_meta.finalfn, qcontext);
|
||||
/* pass the aggstate information from our current call context */
|
||||
InitFunctionCallInfoData(tstate->final_meta->finalfn_fcinfo,
|
||||
&tstate->final_meta->finalfn,
|
||||
InitFunctionCallInfoData(tstate->final_meta.finalfn_fcinfo,
|
||||
&tstate->final_meta.finalfn,
|
||||
num_args,
|
||||
collation,
|
||||
(void *) fa_aggstate,
|
||||
@ -337,20 +383,20 @@ fa_transition_state_init(MemoryContext *fa_context, Oid inner_agg_fn_oid, Oid co
|
||||
inner_agg_form->aggtranstype,
|
||||
types[number_types - 1],
|
||||
collation,
|
||||
tstate->final_meta->finalfnoid,
|
||||
tstate->final_meta.finalfnoid,
|
||||
&expr);
|
||||
fmgr_info_set_expr((Node *) expr, &tstate->final_meta->finalfn);
|
||||
fmgr_info_set_expr((Node *) expr, &tstate->final_meta.finalfn);
|
||||
for (i = 1; i < num_args; i++)
|
||||
{
|
||||
tstate->final_meta->finalfn_fcinfo.arg[i] = (Datum) 0;
|
||||
tstate->final_meta->finalfn_fcinfo.argnull[i] = true;
|
||||
tstate->final_meta.finalfn_fcinfo.arg[i] = (Datum) 0;
|
||||
tstate->final_meta.finalfn_fcinfo.argnull[i] = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
fcinfo->flinfo->fn_extra = (void *) tstate;
|
||||
|
||||
MemoryContextSwitchTo(oldcontext);
|
||||
|
||||
/* Need to init tstate->per_group_state->trans_value */
|
||||
tstate->per_group_state->trans_value_isnull = true;
|
||||
tstate->per_group_state->trans_value_initialized = false;
|
||||
return tstate;
|
||||
}
|
||||
|
||||
@ -366,6 +412,7 @@ group_state_advance(FAPerGroupState *per_group_state, FACombineFnMeta *combine_m
|
||||
combine_meta->combfn_fcinfo.argnull[0] = per_group_state->trans_value_isnull;
|
||||
combine_meta->combfn_fcinfo.arg[1] = newval;
|
||||
combine_meta->combfn_fcinfo.argnull[1] = newval_isnull;
|
||||
combine_meta->combfn_fcinfo.isnull = false;
|
||||
per_group_state->trans_value = FunctionCallInvoke(&combine_meta->combfn_fcinfo);
|
||||
per_group_state->trans_value_isnull = combine_meta->combfn_fcinfo.isnull;
|
||||
};
|
||||
@ -389,48 +436,33 @@ group_state_advance(FAPerGroupState *per_group_state, FACombineFnMeta *combine_m
|
||||
Datum
|
||||
tsl_finalize_agg_sfunc(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid inner_agg_input_collid;
|
||||
FATransitionState *tstate = PG_ARGISNULL(0) ? NULL : (FATransitionState *) PG_GETARG_POINTER(0);
|
||||
bytea *inner_agg_serialized_state = PG_ARGISNULL(5) ? NULL : PG_GETARG_BYTEA_P(5);
|
||||
bool inner_agg_serialized_state_isnull = PG_ARGISNULL(5) ? true : false;
|
||||
Datum inner_agg_deserialized_state;
|
||||
AggState *fa_aggstate;
|
||||
MemoryContext fa_context, old_context;
|
||||
|
||||
/*
|
||||
* we check for AggState explicitly as well because AggCheckCallContext
|
||||
* will return true for window function contexts
|
||||
*/
|
||||
if (!AggCheckCallContext(fcinfo, &fa_context) || !IsA(fcinfo->context, AggState))
|
||||
{
|
||||
/* cannot be called directly because of internal-type argument */
|
||||
elog(ERROR, "finalize_agg_sfunc called in non-aggregate context");
|
||||
}
|
||||
fa_aggstate = castNode(AggState, fcinfo->context);
|
||||
|
||||
if (PG_ARGISNULL(1))
|
||||
elog(ERROR, "finalize_agg_sfunc called with NULL aggfn");
|
||||
old_context = MemoryContextSwitchTo(fa_context);
|
||||
|
||||
if (tstate == NULL)
|
||||
{
|
||||
char *inner_agg_input_coll_schema = PG_ARGISNULL(2) ? NULL : NameStr(*PG_GETARG_NAME(2));
|
||||
char *inner_agg_input_coll_name = PG_ARGISNULL(3) ? NULL : NameStr(*PG_GETARG_NAME(3));
|
||||
ArrayType *input_types = PG_ARGISNULL(4) ? NULL : PG_GETARG_ARRAYTYPE_P(4);
|
||||
Oid inner_agg_fn_oid = aggfnoid_from_aggname(PG_GETARG_TEXT_PP(1));
|
||||
|
||||
inner_agg_input_collid =
|
||||
collation_oid_from_name(inner_agg_input_coll_schema, inner_agg_input_coll_name);
|
||||
|
||||
/* okay, now we can initialize our transition state */
|
||||
tstate = fa_transition_state_init(&fa_context,
|
||||
inner_agg_fn_oid,
|
||||
inner_agg_input_collid,
|
||||
fa_aggstate,
|
||||
input_types);
|
||||
FAPerQueryState *qstate = (FAPerQueryState *) fcinfo->flinfo->fn_extra;
|
||||
if (qstate == NULL)
|
||||
{
|
||||
qstate = fa_perquery_state_init(fcinfo);
|
||||
Assert(fcinfo->flinfo->fn_extra != NULL);
|
||||
}
|
||||
tstate = fa_transition_state_init(&fa_context, qstate, (AggState *) fcinfo->context);
|
||||
/* initial trans_value = the partial state of the inner agg from first invocation */
|
||||
tstate->per_group_state->trans_value =
|
||||
inner_agg_deserialize(tstate->combine_meta,
|
||||
inner_agg_deserialize(&tstate->per_query_state->combine_meta,
|
||||
inner_agg_serialized_state,
|
||||
inner_agg_serialized_state_isnull,
|
||||
&tstate->per_group_state->trans_value_isnull);
|
||||
@ -441,7 +473,7 @@ tsl_finalize_agg_sfunc(PG_FUNCTION_ARGS)
|
||||
{
|
||||
bool deser_isnull;
|
||||
bool call_combine;
|
||||
inner_agg_deserialized_state = inner_agg_deserialize(tstate->combine_meta,
|
||||
inner_agg_deserialized_state = inner_agg_deserialize(&tstate->per_query_state->combine_meta,
|
||||
inner_agg_serialized_state,
|
||||
inner_agg_serialized_state_isnull,
|
||||
&deser_isnull);
|
||||
@ -452,7 +484,7 @@ tsl_finalize_agg_sfunc(PG_FUNCTION_ARGS)
|
||||
* need to try that again if so.
|
||||
*/
|
||||
call_combine = true;
|
||||
if (tstate->combine_meta->combinefn.fn_strict)
|
||||
if (tstate->per_query_state->combine_meta.combinefn.fn_strict)
|
||||
{
|
||||
if (tstate->per_group_state->trans_value_initialized == false && deser_isnull == false)
|
||||
{
|
||||
@ -467,7 +499,7 @@ tsl_finalize_agg_sfunc(PG_FUNCTION_ARGS)
|
||||
}
|
||||
if (call_combine)
|
||||
group_state_advance(tstate->per_group_state,
|
||||
tstate->combine_meta,
|
||||
&tstate->per_query_state->combine_meta,
|
||||
inner_agg_deserialized_state,
|
||||
deser_isnull);
|
||||
}
|
||||
@ -491,21 +523,24 @@ tsl_finalize_agg_ffunc(PG_FUNCTION_ARGS)
|
||||
elog(ERROR, "finalize_agg_ffunc called in non-aggregate context");
|
||||
}
|
||||
old_context = MemoryContextSwitchTo(fa_context);
|
||||
if (OidIsValid(tstate->final_meta->finalfnoid))
|
||||
if (OidIsValid(tstate->per_query_state->final_meta.finalfnoid))
|
||||
{
|
||||
/* don't execute if strict and the trans value is NULL or there are extra args (all extra
|
||||
* args are always NULL) */
|
||||
if (!(tstate->final_meta->finalfn.fn_strict &&
|
||||
if (!(tstate->per_query_state->final_meta.finalfn.fn_strict &&
|
||||
tstate->per_group_state->trans_value_isnull) &&
|
||||
!(tstate->final_meta->finalfn.fn_strict &&
|
||||
tstate->final_meta->finalfn_fcinfo.nargs > 1))
|
||||
!(tstate->per_query_state->final_meta.finalfn.fn_strict &&
|
||||
tstate->per_query_state->final_meta.finalfn_fcinfo.nargs > 1))
|
||||
{
|
||||
tstate->final_meta->finalfn_fcinfo.arg[0] = tstate->per_group_state->trans_value;
|
||||
tstate->final_meta->finalfn_fcinfo.argnull[0] =
|
||||
tstate->per_query_state->final_meta.finalfn_fcinfo.arg[0] =
|
||||
tstate->per_group_state->trans_value;
|
||||
tstate->per_query_state->final_meta.finalfn_fcinfo.argnull[0] =
|
||||
tstate->per_group_state->trans_value_isnull;
|
||||
tstate->per_query_state->final_meta.finalfn_fcinfo.isnull = false;
|
||||
tstate->per_group_state->trans_value =
|
||||
FunctionCallInvoke(&tstate->final_meta->finalfn_fcinfo);
|
||||
tstate->per_group_state->trans_value_isnull = tstate->final_meta->finalfn_fcinfo.isnull;
|
||||
FunctionCallInvoke(&tstate->per_query_state->final_meta.finalfn_fcinfo);
|
||||
tstate->per_group_state->trans_value_isnull =
|
||||
tstate->per_query_state->final_meta.finalfn_fcinfo.isnull;
|
||||
}
|
||||
}
|
||||
MemoryContextSwitchTo(old_context);
|
||||
|
Loading…
x
Reference in New Issue
Block a user