diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index b9c3959..02b6484 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -191,7 +191,37 @@ static bool _equalAggref(const Aggref *a, const Aggref *b) { COMPARE_SCALAR_FIELD(aggfnoid); - COMPARE_SCALAR_FIELD(aggtype); + + /* + * XXX Temporary fix, until we find a better one. + * To avoid the failure in setting the upper references in upper plans of + * partial aggregate, with its modified targetlist aggregate references, + * As the aggtype of aggref is changed while forming the targetlist + * of partial aggregate for worker process. + */ + if (a->aggtype != b->aggtype) + { + /* + HeapTuple aggTuple; + Form_pg_aggregate aggform; + + aggTuple = SearchSysCache1(AGGFNOID, + ObjectIdGetDatum(a->aggfnoid)); + if (!HeapTupleIsValid(aggTuple)) + elog(ERROR, "cache lookup failed for aggregate %u", + a->aggfnoid); + aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); + + if (a->aggtype != aggform->aggtranstype) + { + ReleaseSysCache(aggTuple); + return false; + } + + ReleaseSysCache(aggTuple); + */ + } + COMPARE_SCALAR_FIELD(aggcollid); COMPARE_SCALAR_FIELD(inputcollid); COMPARE_NODE_FIELD(aggdirectargs); diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index a08c248..9f1416c 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -1968,7 +1968,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel) */ cheapest_partial_path = linitial(rel->partial_pathlist); simple_gather_path = (Path *) - create_gather_path(root, rel, cheapest_partial_path, NULL); + create_gather_path(root, rel, cheapest_partial_path, NULL, NULL); add_path(rel, simple_gather_path); } diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index ffff3c0..cfd0c35 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -350,16 +350,21 @@ cost_samplescan(Path *path, PlannerInfo *root, * * 'rel' is the relation to be operated upon * 'param_info' is the ParamPathInfo if this is a parameterized path, else NULL + * 'rows' may be used to point to a row estimate, this may be used when a rel + * is unavailable to retrieve row estimates from. */ void cost_gather(GatherPath *path, PlannerInfo *root, - RelOptInfo *rel, ParamPathInfo *param_info) + RelOptInfo *rel, ParamPathInfo *param_info, + double *rows) { Cost startup_cost = 0; Cost run_cost = 0; /* Mark the path with the correct row estimate */ - if (param_info) + if (rows) + path->path.rows = *rows; + else if (param_info) path->path.rows = param_info->ppi_rows; else path->path.rows = rel->rows; diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 12069ae..aaf33d2 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -1532,8 +1532,8 @@ create_agg_plan(PlannerInfo *root, AggPath *best_path) plan = make_agg(tlist, quals, best_path->aggstrategy, - false, - true, + best_path->combineStates, + best_path->finalizeAggs, list_length(best_path->groupClause), extract_grouping_cols(best_path->groupClause, subplan->targetlist), diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 97cd1f2..efefa1f 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -23,6 +23,7 @@ #include "access/sysattr.h" #include "access/xact.h" #include "catalog/pg_constraint_fn.h" +#include "catalog/pg_aggregate.h" #include "executor/executor.h" #include "executor/nodeAgg.h" #include "foreign/fdwapi.h" @@ -81,6 +82,13 @@ typedef struct List *groupClause; /* overrides parse->groupClause */ } standard_qp_extra; +typedef struct +{ + AttrNumber resno; + List *targetlist; +} AddQualInTListExprContext; + + /* Local functions */ static Node *preprocess_expression(PlannerInfo *root, Node *expr, int kind); static void preprocess_qual_conditions(PlannerInfo *root, Node *jtnode); @@ -107,6 +115,19 @@ static RelOptInfo *create_grouping_paths(PlannerInfo *root, AttrNumber *groupColIdx, List *rollup_lists, List *rollup_groupclauses); +static void create_parallelagg_path(PlannerInfo *root, + RelOptInfo *input_rel, + RelOptInfo *grouped_rel, + List *tlist, + PathTarget *target, + AggStrategy aggstrategy, + double dNumGroups, + AggClauseCosts *agg_costs); +static void create_parallelgroup_path(PlannerInfo *root, + RelOptInfo *input_rel, + RelOptInfo *grouped_rel, + PathTarget *target, + double dNumGroups); static RelOptInfo *create_window_paths(PlannerInfo *root, RelOptInfo *input_rel, List *base_tlist, @@ -134,6 +155,10 @@ static List *make_windowInputTargetList(PlannerInfo *root, List *tlist, List *activeWindows); static List *make_pathkeys_for_window(PlannerInfo *root, WindowClause *wc, List *tlist); +static List *make_partial_agg_tlist(List *tlist,List *groupClause); +static List* add_qual_in_tlist(List *targetlist, Node *qual); +static bool add_qual_in_tlist_walker (Node *node, + AddQualInTListExprContext *context); /***************************************************************************** @@ -1687,6 +1712,20 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, } } + /* Likewise for any partial paths. */ + foreach(lc, current_rel->partial_pathlist) + { + Path *subpath = (Path *) lfirst(lc); + Path *path; + + Assert(subpath->param_info == NULL); + path = apply_projection_to_path(root, current_rel, + subpath, sub_target); + if (path != subpath) + current_rel->partial_pathlist = + lappend(current_rel->partial_pathlist, path); + } + /* * Determine the tlist we need grouping paths to emit. While we could * skip this if we're not going to call create_grouping_paths, it's @@ -1701,6 +1740,7 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, else grouping_tlist = tlist; + /* * If we have grouping and/or aggregation, consider ways to implement * that. We build a new upperrel representing the output of this @@ -3101,7 +3141,9 @@ create_grouping_paths(PlannerInfo *root, AggClauseCosts agg_costs; double dNumGroups; bool allow_hash; + bool can_parallel; ListCell *lc; + List *tlist = make_tlist_from_pathtarget(target); /* For now, do all work in the (GROUP_AGG, NULL) upperrel */ grouped_rel = fetch_upper_rel(root, UPPERREL_GROUP_AGG, NULL); @@ -3171,6 +3213,48 @@ create_grouping_paths(PlannerInfo *root, return grouped_rel; } + /* + * Here we consider performing aggregation in parallel using multiple + * worker processes. We can permit this when there's at least one + * partial_path in input_rel, but not if the query has grouping sets, + * (although this likely just requires a bit more thought). We also + * disallow parallel mode when the target list contains any volatile + * functions, as this would cause a multiple evaluation hazard. + * + * Parallel grouping and aggregation occurs in two phases. In the first + * phase, which occurs in parallel, groups are created for each input tuple + * of the partial path, each parallel worker's groups are then gathered + * with a Gather node and serialised into the master backend process, which + * performs the 2nd and final grouping or aggregation phase. This is + * supported for both Hash Aggregate and Group Aggregate, although + * currently we only consider paths to generate plans which either use hash + * aggregate for both phases or group aggregate for both phases, we never + * mix the two to try hashing for the 1st phase then group agg on the 2nd + * phase or vice versa. Perhaps this would be a worthwhile future addition, + * but for now, let's keep it simple. + */ + can_parallel = false; + + if ((parse->hasAggs || parse->groupClause != NIL) && + input_rel->partial_pathlist != NIL && + parse->groupingSets == NIL && + !contain_volatile_functions((Node *) tlist)) + { + /* + * Check that all aggregate functions support partial mode, + * however if there are no aggregate functions then we can skip + * this check. + */ + if (!parse->hasAggs) + can_parallel = true; + else if (aggregates_allow_partial((Node *) tlist) == PAT_ANY && + aggregates_allow_partial(root->parse->havingQual) == PAT_ANY) + can_parallel = true; + } + + if (can_parallel) + grouped_rel->consider_parallel = input_rel->consider_parallel; + /* * Collect statistics about aggregates for estimating costs. Note: we do * not detect duplicate aggregates here; a somewhat-overestimated cost is @@ -3256,7 +3340,20 @@ create_grouping_paths(PlannerInfo *root, parse->groupClause, (List *) parse->havingQual, &agg_costs, - dNumGroups)); + dNumGroups, + false, + true)); + + if (can_parallel) + create_parallelagg_path(root, + input_rel, + grouped_rel, + tlist, + target, + parse->groupClause ? AGG_SORTED : AGG_PLAIN, + dNumGroups, + &agg_costs); + } else if (parse->groupClause) { @@ -3272,6 +3369,13 @@ create_grouping_paths(PlannerInfo *root, parse->groupClause, (List *) parse->havingQual, dNumGroups)); + + if (can_parallel) + create_parallelgroup_path(root, + input_rel, + grouped_rel, + target, + dNumGroups); } else { @@ -3342,7 +3446,20 @@ create_grouping_paths(PlannerInfo *root, parse->groupClause, (List *) parse->havingQual, &agg_costs, - dNumGroups)); + dNumGroups, + false, + true)); + + if (can_parallel) + create_parallelagg_path(root, + input_rel, + grouped_rel, + tlist, + target, + AGG_HASHED, + dNumGroups, + &agg_costs); + } /* Give a helpful error if we failed to find any implementation */ @@ -3358,6 +3475,145 @@ create_grouping_paths(PlannerInfo *root, return grouped_rel; } + +static void +create_parallelagg_path(PlannerInfo *root, + RelOptInfo *input_rel, + RelOptInfo *grouped_rel, + List *tlist, + PathTarget *target, + AggStrategy aggstrategy, + double dNumGroups, + AggClauseCosts *agg_costs) +{ + Query *parse = root->parse; + Path *path; + List *partial_agg_tlist; + double numPartialGroups; + + /* + * The underlying Agg targetlist should be a flat tlist of all Vars and Aggs + * needed to evaluate the expressions and final values of aggregates present + * in the main target list. The quals also should be included. + */ + partial_agg_tlist = make_partial_agg_tlist( + add_qual_in_tlist(tlist, parse->havingQual), + parse->groupClause); + + path = linitial(input_rel->partial_pathlist); + + if (aggstrategy == AGG_SORTED) + path = (Path *) create_sort_path(root, + grouped_rel, + path, + root->group_pathkeys, + -1.0); + + path = (Path *)create_agg_path(root, grouped_rel, + path, + make_pathtarget_from_tlist(partial_agg_tlist), + aggstrategy, + parse->groupClause, + (List *) parse->havingQual, + agg_costs, + dNumGroups, + false, + false); + + /* + * Estimate the total number of groups which the gather will receive + * from the aggregate worker processes. We'll assume that each worker + * will produce every possible group, this might be an overestimate, + * although it seems safer to over estimate here rather than + * underestimate. To keep this number sane we cap the number of groups + * so it's never larger than the number of rows in the input path. This + * covers the case when there are less than an average of + * parallel_degree input tuples per group. + */ + numPartialGroups = Min(dNumGroups, path->rows) * (path->parallel_degree + 1); + + path = (Path *) create_gather_path(root, grouped_rel, path, NULL, + &numPartialGroups); + + if (aggstrategy == AGG_SORTED) + path = (Path *) create_sort_path(root, + grouped_rel, + path, + root->group_pathkeys, + -1.0); + + add_path(grouped_rel, (Path *) + create_agg_path(root, + grouped_rel, + path, + target, + aggstrategy, + parse->groupClause, + (List *) parse->havingQual, + agg_costs, + dNumGroups, + true, + true)); +} + +static void +create_parallelgroup_path(PlannerInfo *root, + RelOptInfo *input_rel, + RelOptInfo *grouped_rel, + PathTarget *target, + double dNumGroups) +{ + Query *parse = root->parse; + Path *path; + double numPartialGroups; + + path = linitial(input_rel->partial_pathlist); + + path = (Path *) create_sort_path(root, + grouped_rel, + path, + root->group_pathkeys, + -1.0); + + path = (Path *)create_group_path(root, grouped_rel, + path, + target, + parse->groupClause, + NULL, /* Having clause is only applied at finalize node */ + dNumGroups); + + /* + * Estimate the total number of groups which the gather will receive + * from the aggregate worker processes. We'll assume that each worker + * will produce every possible group, this might be an overestimate, + * although it seems safer to over estimate here rather than + * underestimate. To keep this number sane we cap the number of groups + * so it's never larger than the number of rows in the input path. This + * covers the case when there are less than an average of + * parallel_degree input tuples per group. + */ + numPartialGroups = Min(dNumGroups, path->rows) * (path->parallel_degree + 1); + + path = (Path *) create_gather_path(root, grouped_rel, path, NULL, + &numPartialGroups); + + path = (Path *) create_sort_path(root, + grouped_rel, + path, + root->group_pathkeys, + -1.0); + + add_path(grouped_rel, (Path *) + create_group_path(root, + grouped_rel, + path, + target, + parse->groupClause, + (List *) parse->havingQual, + dNumGroups)); +} + + /* * create_window_paths * @@ -3664,7 +3920,9 @@ create_distinct_paths(PlannerInfo *root, parse->distinctClause, NIL, NULL, - numDistinctRows)); + numDistinctRows, + false, + true)); } /* Give a helpful error if we failed to find any implementation */ @@ -4390,3 +4648,183 @@ plan_cluster_use_sort(Oid tableOid, Oid indexOid) return (seqScanAndSortPath.total_cost < indexScanPath->path.total_cost); } + +/* + * make_partial_agg_tlist + * Generate appropriate Agg node target list for input to ParallelAgg nodes. + * + * The initial target list passed to ParallelAgg node from the parser contains + * aggregates and GROUP BY columns. For the underlying agg node, we want to + * generate a tlist containing bare aggregate references (Aggref) and GROUP BY + * expressions. So we flatten all expressions except GROUP BY items into their + * component variables. + * For example, given a query like + * SELECT a+b, 2 * SUM(c+d) , AVG(d)+SUM(c+d) FROM table GROUP BY a+b; + * we want to pass this targetlist to the Agg plan: + * a+b, SUM(c+d), AVG(d) + * where the a+b target will be used by the Sort/Group steps, and the + * other targets will be used for computing the final results. + * Note that we don't flatten Aggref's , since those are to be computed + * by the underlying Agg node, and they will be referenced like Vars above it. + * + * 'tlist' is the ParallelAgg's final target list. + * + * The result is the targetlist to be computed by the Agg node below the + * ParallelAgg node. + */ +static List * +make_partial_agg_tlist(List *tlist,List *groupClause) +{ + Bitmapset *sgrefs; + List *new_tlist; + List *flattenable_cols; + List *flattenable_vars; + ListCell *lc; + + /* + * Collect the sortgroupref numbers of GROUP BY clauses + * into a bitmapset for convenient reference below. + */ + sgrefs = NULL; + + /* Add in sortgroupref numbers of GROUP BY clauses */ + foreach(lc, groupClause) + { + SortGroupClause *grpcl = (SortGroupClause *) lfirst(lc); + + sgrefs = bms_add_member(sgrefs, grpcl->tleSortGroupRef); + } + + /* + * Construct a tlist containing all the non-flattenable tlist items, and + * save aside the others for a moment. + */ + new_tlist = NIL; + flattenable_cols = NIL; + + foreach(lc, tlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + + /* Don't want to deconstruct GROUP BY items. */ + if (tle->ressortgroupref != 0 && + bms_is_member(tle->ressortgroupref, sgrefs)) + { + /* Don't want to deconstruct this value, so add to new_tlist */ + TargetEntry *newtle; + + newtle = makeTargetEntry(tle->expr, + list_length(new_tlist) + 1, + NULL, + false); + /* Preserve its sortgroupref marking, in case it's volatile */ + newtle->ressortgroupref = tle->ressortgroupref; + new_tlist = lappend(new_tlist, newtle); + } + else + { + /* + * Column is to be flattened, so just remember the expression for + * later call to pull_var_clause. There's no need for + * pull_var_clause to examine the TargetEntry node itself. + */ + flattenable_cols = lappend(flattenable_cols, tle->expr); + } + } + + /* + * Pull out all the Vars and Aggrefs mentioned in flattenable columns, and + * add them to the result tlist if not already present. (Some might be + * there already because they're used directly as group clauses.) + * + * Note: it's essential to use PVC_INCLUDE_AGGREGATES here, so that the + * Aggrefs are placed in the Agg node's tlist and not left to be computed + * at higher levels. + */ + flattenable_vars = pull_var_clause((Node *) flattenable_cols, + PVC_INCLUDE_AGGREGATES, + PVC_INCLUDE_PLACEHOLDERS); + new_tlist = add_to_flat_tlist(new_tlist, flattenable_vars); + + /* clean up cruft */ + list_free(flattenable_vars); + list_free(flattenable_cols); + + /* + * Update the targetlist aggref->aggtype with the transtype. This is required to + * send the aggregate transition data from workers to the backend for combining + * and returning the final result. + */ + foreach(lc, new_tlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + + if (IsA(tle->expr, Aggref)) + { + Aggref *aggref = (Aggref *) tle->expr; + HeapTuple aggTuple; + Form_pg_aggregate aggform; + + aggTuple = SearchSysCache1(AGGFNOID, + ObjectIdGetDatum(aggref->aggfnoid)); + if (!HeapTupleIsValid(aggTuple)) + elog(ERROR, "cache lookup failed for aggregate %u", + aggref->aggfnoid); + aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); + + aggref->aggtype = aggform->aggtranstype; + + ReleaseSysCache(aggTuple); + } + } + + return new_tlist; +} + +/* + * add_qual_in_tlist + * Add the agg functions in qual into the target list used in agg plan + */ +static List* +add_qual_in_tlist(List *targetlist, Node *qual) +{ + AddQualInTListExprContext context; + + if(qual == NULL) + return targetlist; + + context.targetlist = copyObject(targetlist); + context.resno = list_length(context.targetlist) + 1;; + + add_qual_in_tlist_walker(qual, &context); + + return context.targetlist; +} + +/* + * add_qual_in_tlist_walker + * Go through the qual list to get the aggref and add it in targetlist + */ +static bool +add_qual_in_tlist_walker (Node *node, AddQualInTListExprContext *context) +{ + if (node == NULL) + return false; + + if (IsA(node, Aggref)) + { + List *tlist = context->targetlist; + TargetEntry *te = makeNode(TargetEntry); + + te = makeTargetEntry((Expr *) node, + context->resno++, + NULL, + false); + + tlist = lappend(tlist,te); + } + else + return expression_tree_walker(node, add_qual_in_tlist_walker, context); + + return false; +} diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index d296d09..331b983 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -140,6 +140,16 @@ static bool fix_opfuncids_walker(Node *node, void *context); static bool extract_query_dependencies_walker(Node *node, PlannerInfo *context); +static void set_combineagg_references(PlannerInfo *root, Plan *plan, + int rtoffset); +static Node *fix_combine_agg_expr(PlannerInfo *root, + Node *node, + indexed_tlist *subplan_itlist, + Index newvarno, + int rtoffset); +static Node *fix_combine_agg_expr_mutator(Node *node, + fix_upper_expr_context *context); + /***************************************************************************** * * SUBPLAN REFERENCES @@ -667,8 +677,17 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) } break; case T_Agg: - set_upper_references(root, plan, rtoffset); - break; + { + Agg *aggplan = (Agg *) plan; + + if (aggplan->combineStates) + set_combineagg_references(root, plan, rtoffset); + else + set_upper_references(root, plan, rtoffset); + + break; + } + case T_Group: set_upper_references(root, plan, rtoffset); break; @@ -2478,3 +2497,159 @@ extract_query_dependencies_walker(Node *node, PlannerInfo *context) return expression_tree_walker(node, extract_query_dependencies_walker, (void *) context); } + + +static void +set_combineagg_references(PlannerInfo *root, Plan *plan, int rtoffset) +{ + Plan *subplan = plan->lefttree; + indexed_tlist *subplan_itlist; + List *output_targetlist; + ListCell *l; + + Assert(IsA(plan, Agg)); + Assert(((Agg *) plan)->combineStates); + + subplan_itlist = build_tlist_index(subplan->targetlist); + + output_targetlist = NIL; + + foreach(l, plan->targetlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(l); + Node *newexpr; + + /* If it's a non-Var sort/group item, first try to match by sortref */ + if (tle->ressortgroupref != 0 && !IsA(tle->expr, Var)) + { + newexpr = (Node *) + search_indexed_tlist_for_sortgroupref((Node *) tle->expr, + tle->ressortgroupref, + subplan_itlist, + OUTER_VAR); + if (!newexpr) + newexpr = fix_combine_agg_expr(root, + (Node *) tle->expr, + subplan_itlist, + OUTER_VAR, + rtoffset); + } + else + newexpr = fix_combine_agg_expr(root, + (Node *) tle->expr, + subplan_itlist, + OUTER_VAR, + rtoffset); + tle = flatCopyTargetEntry(tle); + tle->expr = (Expr *) newexpr; + output_targetlist = lappend(output_targetlist, tle); + } + + plan->targetlist = output_targetlist; + + plan->qual = (List *) + fix_upper_expr(root, + (Node *) plan->qual, + subplan_itlist, + OUTER_VAR, + rtoffset); + + pfree(subplan_itlist); +} + + +/* + * Adjust the Aggref'a args to reference the correct Aggref target in the outer + * subplan. + */ +static Node * +fix_combine_agg_expr(PlannerInfo *root, + Node *node, + indexed_tlist *subplan_itlist, + Index newvarno, + int rtoffset) +{ + fix_upper_expr_context context; + + context.root = root; + context.subplan_itlist = subplan_itlist; + context.newvarno = newvarno; + context.rtoffset = rtoffset; + return fix_combine_agg_expr_mutator(node, &context); +} + +static Node * +fix_combine_agg_expr_mutator(Node *node, fix_upper_expr_context *context) +{ + Var *newvar; + + if (node == NULL) + return NULL; + if (IsA(node, Var)) + { + Var *var = (Var *) node; + + newvar = search_indexed_tlist_for_var(var, + context->subplan_itlist, + context->newvarno, + context->rtoffset); + if (!newvar) + elog(ERROR, "variable not found in subplan target list"); + return (Node *) newvar; + } + if (IsA(node, Aggref)) + { + TargetEntry *tle; + Aggref *aggref = (Aggref*) node; + + tle = tlist_member(node, context->subplan_itlist->tlist); + if (tle) + { + /* Found a matching subplan output expression */ + Var *newvar; + TargetEntry *newtle; + + newvar = makeVarFromTargetEntry(context->newvarno, tle); + newvar->varnoold = 0; /* wasn't ever a plain Var */ + newvar->varoattno = 0; + + /* update the args in the aggref */ + + /* makeTargetEntry ,always set resno to one for finialize agg */ + newtle = makeTargetEntry((Expr*) newvar, 1, NULL, false); + + /* + * Updated the args, let the newvar refer to the right position of + * the agg function in the subplan + */ + aggref->args = list_make1(newtle); + + return (Node *) aggref; + } + else + elog(ERROR, "aggref not found in subplan target list"); + } + if (IsA(node, PlaceHolderVar)) + { + PlaceHolderVar *phv = (PlaceHolderVar *) node; + + /* See if the PlaceHolderVar has bubbled up from a lower plan node */ + if (context->subplan_itlist->has_ph_vars) + { + newvar = search_indexed_tlist_for_non_var((Node *) phv, + context->subplan_itlist, + context->newvarno); + if (newvar) + return (Node *) newvar; + } + /* If not supplied by input plan, evaluate the contained expr */ + return fix_upper_expr_mutator((Node *) phv->phexpr, context); + } + if (IsA(node, Param)) + return fix_param_node(context->root, (Param *) node); + + fix_expr_common(context->root, node); + return expression_tree_mutator(node, + fix_combine_agg_expr_mutator, + (void *) context); +} diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c index 6ea3319..fb139af 100644 --- a/src/backend/optimizer/prep/prepunion.c +++ b/src/backend/optimizer/prep/prepunion.c @@ -859,7 +859,9 @@ make_union_unique(SetOperationStmt *op, Path *path, List *tlist, groupList, NIL, NULL, - dNumGroups); + dNumGroups, + false, + true); } else { diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index 6ac25dc..349cfb1 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -52,6 +52,10 @@ #include "utils/syscache.h" #include "utils/typcache.h" +typedef struct +{ + PartialAggType allowedtype; +} partial_agg_context; typedef struct { @@ -93,6 +97,7 @@ typedef struct bool allow_restricted; } has_parallel_hazard_arg; +static bool partial_aggregate_walker(Node *node, partial_agg_context *context); static bool contain_agg_clause_walker(Node *node, void *context); static bool count_agg_clauses_walker(Node *node, count_agg_clauses_context *context); @@ -400,6 +405,80 @@ make_ands_implicit(Expr *clause) *****************************************************************************/ /* + * aggregates_allow_partial + * Recursively search for Aggref clauses and determine the maximum + * 'degree' of partial aggregation which can be supported. Partial + * aggregation requires that each aggregate does not have a DISTINCT or + * ORDER BY clause, and that it also has a combine function set. + */ +PartialAggType +aggregates_allow_partial(Node *clause) +{ + partial_agg_context context; + + /* initially any type is ok, until we find Aggrefs which say otherwise */ + context.allowedtype = PAT_ANY; + + if (!partial_aggregate_walker(clause, &context)) + return context.allowedtype; + return context.allowedtype; +} + +static bool +partial_aggregate_walker(Node *node, partial_agg_context *context) +{ + if (node == NULL) + return false; + if (IsA(node, Aggref)) + { + Aggref *aggref = (Aggref *) node; + HeapTuple aggTuple; + Form_pg_aggregate aggform; + Assert(aggref->agglevelsup == 0); + + /* + * We can't perform partial aggregation with Aggrefs containing a + * DISTINCT or ORDER BY clause. + */ + if (aggref->aggdistinct || aggref->aggorder) + { + context->allowedtype = PAT_DISABLED; + return true; /* abort search */ + } + aggTuple = SearchSysCache1(AGGFNOID, + ObjectIdGetDatum(aggref->aggfnoid)); + if (!HeapTupleIsValid(aggTuple)) + elog(ERROR, "cache lookup failed for aggregate %u", + aggref->aggfnoid); + aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); + + /* + * If there is no combine func, then partial aggregation is not + * possible. + */ + if (!OidIsValid(aggform->aggcombinefn)) + { + ReleaseSysCache(aggTuple); + context->allowedtype = PAT_DISABLED; + return true; /* abort search */ + } + + /* + * If we find any aggs with an internal transtype then we must ensure + * that pointers to aggregate states are not passed to other processes, + * therefore we set the maximum degree to PAT_INTERNAL_ONLY. + */ + if (aggform->aggtranstype == INTERNALOID) + context->allowedtype = PAT_INTERNAL_ONLY; + + ReleaseSysCache(aggTuple); + return false; /* continue searching */ + } + return expression_tree_walker(node, partial_aggregate_walker, + (void *) context); +} + +/* * contain_agg_clause * Recursively search for Aggref/GroupingFunc nodes within a clause. * diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 272f368..e48abcc 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -1648,7 +1648,7 @@ translate_sub_tlist(List *tlist, int relid) */ GatherPath * create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, - Relids required_outer) + Relids required_outer, double *rows) { GatherPath *pathnode = makeNode(GatherPath); @@ -1674,7 +1674,7 @@ create_gather_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, pathnode->single_copy = true; } - cost_gather(pathnode, root, rel, pathnode->path.param_info); + cost_gather(pathnode, root, rel, pathnode->path.param_info, rows); return pathnode; } @@ -2393,7 +2393,9 @@ create_agg_path(PlannerInfo *root, List *groupClause, List *qual, const AggClauseCosts *aggcosts, - double numGroups) + double numGroups, + bool combine_agg, + bool finalize_agg) { AggPath *pathnode = makeNode(AggPath); @@ -2415,7 +2417,11 @@ create_agg_path(PlannerInfo *root, pathnode->aggstrategy = aggstrategy; pathnode->numGroups = numGroups; pathnode->groupClause = groupClause; - pathnode->qual = qual; + + /* Only apply HAVING clause for final aggregation */ + pathnode->qual = finalize_agg ? qual : NULL; + pathnode->combineStates = combine_agg; + pathnode->finalizeAggs = finalize_agg; cost_agg(&pathnode->path, root, aggstrategy, aggcosts, diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 3d7d07e..76ea42f 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -1300,6 +1300,8 @@ typedef struct AggPath double numGroups; /* estimated number of groups in input */ List *groupClause; /* a list of SortGroupClause's */ List *qual; /* quals (HAVING quals), if any */ + bool combineStates; /* input is partially aggregated agg states */ + bool finalizeAggs; /* should the executor call the finalfn? */ } AggPath; /* diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h index 3b3fd0f..d03ccc9 100644 --- a/src/include/optimizer/clauses.h +++ b/src/include/optimizer/clauses.h @@ -27,6 +27,26 @@ typedef struct List **windowFuncs; /* lists of WindowFuncs for each winref */ } WindowFuncLists; +/* + * PartialAggType + * PartialAggType stores whether partial aggregation is allowed and + * which context it is allowed in. We require three states here as there are + * two different contexts in which partial aggregation is safe. For aggregates + * which have an 'stype' of INTERNAL, within a single backend process it is + * okay to pass a pointer to the aggregate state, as the memory to which the + * pointer points to will belong to the same process. In cases where the + * aggregate state must be passed between different processes, for example + * during parallel aggregation, passing the pointer is not okay due to the + * fact that the memory being referenced won't be accessible from another + * process. + */ +typedef enum +{ + PAT_ANY = 0, /* Any type of partial aggregation is ok. */ + PAT_INTERNAL_ONLY, /* Some aggregates support only internal mode. */ + PAT_DISABLED /* Some aggregates don't support partial mode at all */ +} PartialAggType; + extern Expr *make_opclause(Oid opno, Oid opresulttype, bool opretset, Expr *leftop, Expr *rightop, @@ -47,6 +67,7 @@ extern Node *make_and_qual(Node *qual1, Node *qual2); extern Expr *make_ands_explicit(List *andclauses); extern List *make_ands_implicit(Expr *clause); +extern PartialAggType aggregates_allow_partial(Node *clause); extern bool contain_agg_clause(Node *clause); extern void count_agg_clauses(PlannerInfo *root, Node *clause, AggClauseCosts *costs); diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index fea2bb7..ce61d70 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -150,7 +150,8 @@ extern void final_cost_hashjoin(PlannerInfo *root, HashPath *path, SpecialJoinInfo *sjinfo, SemiAntiJoinFactors *semifactors); extern void cost_gather(GatherPath *path, PlannerInfo *root, - RelOptInfo *baserel, ParamPathInfo *param_info); + RelOptInfo *baserel, ParamPathInfo *param_info, + double *rows); extern void cost_subplan(PlannerInfo *root, SubPlan *subplan, Plan *plan); extern void cost_qual_eval(QualCost *cost, List *quals, PlannerInfo *root); extern void cost_qual_eval_node(QualCost *cost, Node *qual, PlannerInfo *root); diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 37744bf..e308fab 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -74,7 +74,8 @@ extern MaterialPath *create_material_path(RelOptInfo *rel, Path *subpath); extern UniquePath *create_unique_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, SpecialJoinInfo *sjinfo); extern GatherPath *create_gather_path(PlannerInfo *root, - RelOptInfo *rel, Path *subpath, Relids required_outer); + RelOptInfo *rel, Path *subpath, Relids required_outer, + double *rows); extern SubqueryScanPath *create_subqueryscan_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, List *pathkeys, Relids required_outer); @@ -167,7 +168,9 @@ extern AggPath *create_agg_path(PlannerInfo *root, List *groupClause, List *qual, const AggClauseCosts *aggcosts, - double numGroups); + double numGroups, + bool combine_agg, + bool finalize_agg); extern GroupingSetsPath *create_groupingsets_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath,