diff --git a/tsl/src/partialize_finalize.c b/tsl/src/partialize_finalize.c index fadf93442..04f2f532a 100644 --- a/tsl/src/partialize_finalize.c +++ b/tsl/src/partialize_finalize.c @@ -76,9 +76,14 @@ typedef struct FACombineFnMeta Oid combinefnoid; Oid deserialfnoid; Oid transtype; + Oid recv_fn; + Oid typIOParam; FmgrInfo deserialfn; + FmgrInfo internal_deserialfn; FmgrInfo combinefn; + /* either deserialfn_fcinfo or internal_deserialfn_fcinfo is valid*/ FunctionCallInfoData deserialfn_fcinfo; + FunctionCallInfoData internal_deserialfn_fcinfo; FunctionCallInfoData combfn_fcinfo; } FACombineFnMeta; @@ -104,12 +109,17 @@ typedef struct FAPerGroupState bool trans_value_initialized; } FAPerGroupState; +/* metadata information that is common for the entire query */ +typedef struct FAPerQueryState +{ + FACombineFnMeta combine_meta; + FAFinalFnMeta final_meta; +} FAPerQueryState; + typedef struct FATransitionState { - FACombineFnMeta *combine_meta; - FAFinalFnMeta *final_meta; + FAPerQueryState *per_query_state; FAPerGroupState *per_group_state; - } FATransitionState; static Oid @@ -162,27 +172,29 @@ inner_agg_deserialize(FACombineFnMeta *combine_meta, bytea *serialized_partial, } deser_fcinfo->arg[0] = PointerGetDatum(serialized_partial); deser_fcinfo->argnull[0] = serialized_isnull; + combine_meta->deserialfn_fcinfo.isnull = false; deserialized = FunctionCallInvoke(deser_fcinfo); *deserialized_isnull = deser_fcinfo->isnull; } else if (!serialized_isnull) { + int32 typmod = -1; StringInfo string = makeStringInfo(); - Oid recv_fn, typIOParam; - - getTypeBinaryInputInfo(combine_meta->transtype, &recv_fn, &typIOParam); appendBinaryStringInfo(string, VARDATA_ANY(serialized_partial), VARSIZE_ANY_EXHDR(serialized_partial)); - /* - * Note that we may want to switch this to ReceiveFunctionCall at some - * point in the future because OidReceiveFunctionCall puts a lot of - * stuff into CurrentMemoryContext that we may eventually want to manage - * ourselves. - */ - deserialized = OidReceiveFunctionCall(recv_fn, string, typIOParam, 0); - *deserialized_isnull = false; + combine_meta->internal_deserialfn_fcinfo.arg[0] = PointerGetDatum(string); + combine_meta->internal_deserialfn_fcinfo.arg[1] = + ObjectIdGetDatum(combine_meta->typIOParam); + combine_meta->internal_deserialfn_fcinfo.arg[2] = Int32GetDatum(typmod); + combine_meta->internal_deserialfn_fcinfo.argnull[0] = false; + combine_meta->internal_deserialfn_fcinfo.argnull[1] = false; + combine_meta->internal_deserialfn_fcinfo.argnull[2] = false; + combine_meta->internal_deserialfn_fcinfo.isnull = false; + + deserialized = FunctionCallInvoke(&combine_meta->internal_deserialfn_fcinfo); + *deserialized_isnull = combine_meta->internal_deserialfn_fcinfo.isnull; } PG_RETURN_DATUM(deserialized); } @@ -249,21 +261,36 @@ get_input_types(ArrayType *input_types, size_t *number_types) }; static FATransitionState * -fa_transition_state_init(MemoryContext *fa_context, Oid inner_agg_fn_oid, Oid collation, - AggState *fa_aggstate, ArrayType *input_types) +fa_transition_state_init(MemoryContext *fa_context, FAPerQueryState *qstate, AggState *fa_aggstate) { FATransitionState *tstate = NULL; - HeapTuple inner_agg_tuple; - Form_pg_aggregate inner_agg_form; - tstate = (FATransitionState *) MemoryContextAlloc(*fa_context, sizeof(*tstate)); - tstate->combine_meta = - (FACombineFnMeta *) MemoryContextAlloc(*fa_context, sizeof(*tstate->combine_meta)); - tstate->final_meta = - (FAFinalFnMeta *) MemoryContextAlloc(*fa_context, sizeof(*tstate->final_meta)); + tstate->per_query_state = qstate; tstate->per_group_state = (FAPerGroupState *) MemoryContextAlloc(*fa_context, sizeof(*tstate->per_group_state)); + /* Need to init tstate->per_group_state->trans_value */ + tstate->per_group_state->trans_value_isnull = true; + tstate->per_group_state->trans_value_initialized = false; + return tstate; +} + +static FAPerQueryState * +fa_perquery_state_init(FunctionCallInfo fcinfo) +{ + char *inner_agg_input_coll_schema = PG_ARGISNULL(2) ? NULL : NameStr(*PG_GETARG_NAME(2)); + char *inner_agg_input_coll_name = PG_ARGISNULL(3) ? NULL : NameStr(*PG_GETARG_NAME(3)); + ArrayType *input_types = PG_ARGISNULL(4) ? NULL : PG_GETARG_ARRAYTYPE_P(4); + Oid inner_agg_fn_oid = aggfnoid_from_aggname(PG_GETARG_TEXT_PP(1)); + + Oid collation = collation_oid_from_name(inner_agg_input_coll_schema, inner_agg_input_coll_name); + FAPerQueryState *tstate; + HeapTuple inner_agg_tuple; + Form_pg_aggregate inner_agg_form; + MemoryContext qcontext = fcinfo->flinfo->fn_mcxt; + MemoryContext oldcontext = MemoryContextSwitchTo(qcontext); + AggState *fa_aggstate = (AggState *) fcinfo->context; + /* look up catalog entry and populate what we need */ inner_agg_tuple = SearchSysCache1(AGGFNOID, inner_agg_fn_oid); if (!HeapTupleIsValid(inner_agg_tuple)) @@ -274,40 +301,59 @@ fa_transition_state_init(MemoryContext *fa_context, Oid inner_agg_fn_oid, Oid co if (inner_agg_form->aggnumdirectargs != 0) elog(ERROR, "function calls with direct args are not supported by TimescaleDB finalize agg"); + tstate = (FAPerQueryState *) MemoryContextAlloc(qcontext, sizeof(FAPerQueryState)); - tstate->final_meta->finalfnoid = inner_agg_form->aggfinalfn; - tstate->combine_meta->combinefnoid = inner_agg_form->aggcombinefn; - tstate->combine_meta->deserialfnoid = inner_agg_form->aggdeserialfn; - tstate->combine_meta->transtype = inner_agg_form->aggtranstype; + tstate->final_meta.finalfnoid = inner_agg_form->aggfinalfn; + tstate->combine_meta.combinefnoid = inner_agg_form->aggcombinefn; + tstate->combine_meta.deserialfnoid = inner_agg_form->aggdeserialfn; + tstate->combine_meta.transtype = inner_agg_form->aggtranstype; ReleaseSysCache(inner_agg_tuple); /* initialize combine specific state, both the deserialize function and combine function */ - if (!OidIsValid(tstate->combine_meta->combinefnoid)) + if (!OidIsValid(tstate->combine_meta.combinefnoid)) elog(ERROR, "no valid combine function for the aggregate specified in Timescale finalize call"); - fmgr_info(tstate->combine_meta->combinefnoid, &tstate->combine_meta->combinefn); - InitFunctionCallInfoData(tstate->combine_meta->combfn_fcinfo, - &tstate->combine_meta->combinefn, + fmgr_info_cxt(tstate->combine_meta.combinefnoid, &tstate->combine_meta.combinefn, qcontext); + InitFunctionCallInfoData(tstate->combine_meta.combfn_fcinfo, + &tstate->combine_meta.combinefn, 2, /* combine fn always has two args */ collation, (void *) fa_aggstate, NULL); - if (OidIsValid(tstate->combine_meta->deserialfnoid)) /* deserial fn not necessary, no need to + if (OidIsValid(tstate->combine_meta.deserialfnoid)) /* deserial fn not necessary, no need to throw errors if not found */ { - fmgr_info(tstate->combine_meta->deserialfnoid, &tstate->combine_meta->deserialfn); - InitFunctionCallInfoData(tstate->combine_meta->deserialfn_fcinfo, - &tstate->combine_meta->deserialfn, + fmgr_info_cxt(tstate->combine_meta.deserialfnoid, + &tstate->combine_meta.deserialfn, + qcontext); + InitFunctionCallInfoData(tstate->combine_meta.deserialfn_fcinfo, + &tstate->combine_meta.deserialfn, 1, /* deserialize always has 1 arg */ collation, (void *) fa_aggstate, NULL); } - + else + { + /* save information for internal deserialization. caching instead + of calling ReceiveFunctionCall */ + getTypeBinaryInputInfo(tstate->combine_meta.transtype, + &tstate->combine_meta.recv_fn, + &tstate->combine_meta.typIOParam); + fmgr_info_cxt(tstate->combine_meta.recv_fn, + &tstate->combine_meta.internal_deserialfn, + qcontext); + InitFunctionCallInfoData(tstate->combine_meta.internal_deserialfn_fcinfo, + &tstate->combine_meta.internal_deserialfn, + 3, + InvalidOid, + NULL, + NULL); + } /* initialize finalfn specific state */ - if (OidIsValid(tstate->final_meta->finalfnoid)) + if (OidIsValid(tstate->final_meta.finalfnoid)) { int num_args = 1; Oid *types = NULL; @@ -317,13 +363,13 @@ fa_transition_state_init(MemoryContext *fa_context, Oid inner_agg_fn_oid, Oid co types = get_input_types(input_types, &number_types); num_args += number_types; } - if (num_args != get_func_nargs(tstate->final_meta->finalfnoid)) + if (num_args != get_func_nargs(tstate->final_meta.finalfnoid)) elog(ERROR, "invalid number of input types"); - fmgr_info(tstate->final_meta->finalfnoid, &tstate->final_meta->finalfn); + fmgr_info_cxt(tstate->final_meta.finalfnoid, &tstate->final_meta.finalfn, qcontext); /* pass the aggstate information from our current call context */ - InitFunctionCallInfoData(tstate->final_meta->finalfn_fcinfo, - &tstate->final_meta->finalfn, + InitFunctionCallInfoData(tstate->final_meta.finalfn_fcinfo, + &tstate->final_meta.finalfn, num_args, collation, (void *) fa_aggstate, @@ -337,20 +383,20 @@ fa_transition_state_init(MemoryContext *fa_context, Oid inner_agg_fn_oid, Oid co inner_agg_form->aggtranstype, types[number_types - 1], collation, - tstate->final_meta->finalfnoid, + tstate->final_meta.finalfnoid, &expr); - fmgr_info_set_expr((Node *) expr, &tstate->final_meta->finalfn); + fmgr_info_set_expr((Node *) expr, &tstate->final_meta.finalfn); for (i = 1; i < num_args; i++) { - tstate->final_meta->finalfn_fcinfo.arg[i] = (Datum) 0; - tstate->final_meta->finalfn_fcinfo.argnull[i] = true; + tstate->final_meta.finalfn_fcinfo.arg[i] = (Datum) 0; + tstate->final_meta.finalfn_fcinfo.argnull[i] = true; } } } + fcinfo->flinfo->fn_extra = (void *) tstate; + + MemoryContextSwitchTo(oldcontext); - /* Need to init tstate->per_group_state->trans_value */ - tstate->per_group_state->trans_value_isnull = true; - tstate->per_group_state->trans_value_initialized = false; return tstate; } @@ -366,6 +412,7 @@ group_state_advance(FAPerGroupState *per_group_state, FACombineFnMeta *combine_m combine_meta->combfn_fcinfo.argnull[0] = per_group_state->trans_value_isnull; combine_meta->combfn_fcinfo.arg[1] = newval; combine_meta->combfn_fcinfo.argnull[1] = newval_isnull; + combine_meta->combfn_fcinfo.isnull = false; per_group_state->trans_value = FunctionCallInvoke(&combine_meta->combfn_fcinfo); per_group_state->trans_value_isnull = combine_meta->combfn_fcinfo.isnull; }; @@ -389,48 +436,33 @@ group_state_advance(FAPerGroupState *per_group_state, FACombineFnMeta *combine_m Datum tsl_finalize_agg_sfunc(PG_FUNCTION_ARGS) { - Oid inner_agg_input_collid; FATransitionState *tstate = PG_ARGISNULL(0) ? NULL : (FATransitionState *) PG_GETARG_POINTER(0); bytea *inner_agg_serialized_state = PG_ARGISNULL(5) ? NULL : PG_GETARG_BYTEA_P(5); bool inner_agg_serialized_state_isnull = PG_ARGISNULL(5) ? true : false; Datum inner_agg_deserialized_state; - AggState *fa_aggstate; MemoryContext fa_context, old_context; - /* - * we check for AggState explicitly as well because AggCheckCallContext - * will return true for window function contexts - */ if (!AggCheckCallContext(fcinfo, &fa_context) || !IsA(fcinfo->context, AggState)) { /* cannot be called directly because of internal-type argument */ elog(ERROR, "finalize_agg_sfunc called in non-aggregate context"); } - fa_aggstate = castNode(AggState, fcinfo->context); - if (PG_ARGISNULL(1)) elog(ERROR, "finalize_agg_sfunc called with NULL aggfn"); old_context = MemoryContextSwitchTo(fa_context); if (tstate == NULL) { - char *inner_agg_input_coll_schema = PG_ARGISNULL(2) ? NULL : NameStr(*PG_GETARG_NAME(2)); - char *inner_agg_input_coll_name = PG_ARGISNULL(3) ? NULL : NameStr(*PG_GETARG_NAME(3)); - ArrayType *input_types = PG_ARGISNULL(4) ? NULL : PG_GETARG_ARRAYTYPE_P(4); - Oid inner_agg_fn_oid = aggfnoid_from_aggname(PG_GETARG_TEXT_PP(1)); - - inner_agg_input_collid = - collation_oid_from_name(inner_agg_input_coll_schema, inner_agg_input_coll_name); - - /* okay, now we can initialize our transition state */ - tstate = fa_transition_state_init(&fa_context, - inner_agg_fn_oid, - inner_agg_input_collid, - fa_aggstate, - input_types); + FAPerQueryState *qstate = (FAPerQueryState *) fcinfo->flinfo->fn_extra; + if (qstate == NULL) + { + qstate = fa_perquery_state_init(fcinfo); + Assert(fcinfo->flinfo->fn_extra != NULL); + } + tstate = fa_transition_state_init(&fa_context, qstate, (AggState *) fcinfo->context); /* initial trans_value = the partial state of the inner agg from first invocation */ tstate->per_group_state->trans_value = - inner_agg_deserialize(tstate->combine_meta, + inner_agg_deserialize(&tstate->per_query_state->combine_meta, inner_agg_serialized_state, inner_agg_serialized_state_isnull, &tstate->per_group_state->trans_value_isnull); @@ -441,7 +473,7 @@ tsl_finalize_agg_sfunc(PG_FUNCTION_ARGS) { bool deser_isnull; bool call_combine; - inner_agg_deserialized_state = inner_agg_deserialize(tstate->combine_meta, + inner_agg_deserialized_state = inner_agg_deserialize(&tstate->per_query_state->combine_meta, inner_agg_serialized_state, inner_agg_serialized_state_isnull, &deser_isnull); @@ -452,7 +484,7 @@ tsl_finalize_agg_sfunc(PG_FUNCTION_ARGS) * need to try that again if so. */ call_combine = true; - if (tstate->combine_meta->combinefn.fn_strict) + if (tstate->per_query_state->combine_meta.combinefn.fn_strict) { if (tstate->per_group_state->trans_value_initialized == false && deser_isnull == false) { @@ -467,7 +499,7 @@ tsl_finalize_agg_sfunc(PG_FUNCTION_ARGS) } if (call_combine) group_state_advance(tstate->per_group_state, - tstate->combine_meta, + &tstate->per_query_state->combine_meta, inner_agg_deserialized_state, deser_isnull); } @@ -491,21 +523,24 @@ tsl_finalize_agg_ffunc(PG_FUNCTION_ARGS) elog(ERROR, "finalize_agg_ffunc called in non-aggregate context"); } old_context = MemoryContextSwitchTo(fa_context); - if (OidIsValid(tstate->final_meta->finalfnoid)) + if (OidIsValid(tstate->per_query_state->final_meta.finalfnoid)) { /* don't execute if strict and the trans value is NULL or there are extra args (all extra * args are always NULL) */ - if (!(tstate->final_meta->finalfn.fn_strict && + if (!(tstate->per_query_state->final_meta.finalfn.fn_strict && tstate->per_group_state->trans_value_isnull) && - !(tstate->final_meta->finalfn.fn_strict && - tstate->final_meta->finalfn_fcinfo.nargs > 1)) + !(tstate->per_query_state->final_meta.finalfn.fn_strict && + tstate->per_query_state->final_meta.finalfn_fcinfo.nargs > 1)) { - tstate->final_meta->finalfn_fcinfo.arg[0] = tstate->per_group_state->trans_value; - tstate->final_meta->finalfn_fcinfo.argnull[0] = + tstate->per_query_state->final_meta.finalfn_fcinfo.arg[0] = + tstate->per_group_state->trans_value; + tstate->per_query_state->final_meta.finalfn_fcinfo.argnull[0] = tstate->per_group_state->trans_value_isnull; + tstate->per_query_state->final_meta.finalfn_fcinfo.isnull = false; tstate->per_group_state->trans_value = - FunctionCallInvoke(&tstate->final_meta->finalfn_fcinfo); - tstate->per_group_state->trans_value_isnull = tstate->final_meta->finalfn_fcinfo.isnull; + FunctionCallInvoke(&tstate->per_query_state->final_meta.finalfn_fcinfo); + tstate->per_group_state->trans_value_isnull = + tstate->per_query_state->final_meta.finalfn_fcinfo.isnull; } } MemoryContextSwitchTo(old_context);