diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 5fc80e7..184e1e0 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -126,6 +126,7 @@ bool enable_material = true; bool enable_mergejoin = true; bool enable_hashjoin = true; +bool enable_parallelagg = true; typedef struct { PlannerInfo *root; diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index c0ec905..0ac84f7 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -49,6 +49,8 @@ #include "utils/rel.h" #include "utils/selfuncs.h" +#include "utils/syscache.h" +#include "catalog/pg_aggregate.h" /* GUC parameter */ double cursor_tuple_fraction = DEFAULT_CURSOR_TUPLE_FRACTION; @@ -77,6 +79,12 @@ typedef struct List *groupClause; /* overrides parse->groupClause */ } standard_qp_extra; +typedef struct +{ + AttrNumber resno; + List *targetlist; +} AddQualInTListExprContext; + /* Local functions */ static Node *preprocess_expression(PlannerInfo *root, Node *expr, int kind); static void preprocess_qual_conditions(PlannerInfo *root, Node *jtnode); @@ -134,8 +142,35 @@ static Plan *build_grouping_chain(PlannerInfo *root, AttrNumber *groupColIdx, AggClauseCosts *agg_costs, long numGroups, + bool combineStates, + bool finalizeAggs, + Plan *result_plan); +static Plan *make_group_agg(PlannerInfo *root, + Query *parse, + List *tlist, + bool need_sort_for_grouping, + List *rollup_groupclauses, + List *rollup_lists, + AttrNumber *groupColIdx, + AggClauseCosts *agg_costs, + long numGroups, + bool parallel_agg, Plan *result_plan); +static AttrNumber*get_grpColIdx_from_subPlan(PlannerInfo *root, List *tlist); +static List *make_partial_agg_tlist(List *tlist,List *groupClause); +static List* add_qual_in_tlist(List *targetlist, List *qual); +static bool add_qual_in_tlist_walker (Node *node, + AddQualInTListExprContext *context); +static Plan 
*make_hash_agg(PlannerInfo *root, + Query *parse, + List *tlist, + AggClauseCosts *aggcosts, + int numGroupCols, + AttrNumber *grpColIdx, + long numGroups, + bool parallel_agg, + Plan *lefttree); /***************************************************************************** * * Query optimizer entry point @@ -1329,6 +1364,7 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) double dNumGroups = 0; bool use_hashed_distinct = false; bool tested_hashed_distinct = false; + bool parallel_agg = false; /* Tweak caller-supplied tuple_fraction if have LIMIT/OFFSET */ if (parse->limitCount || parse->limitOffset) @@ -1411,6 +1447,9 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) else { /* No set operations, do regular planning */ + List *sub_tlist; + AttrNumber *groupColIdx = NULL; + bool need_tlist_eval = true; long numGroups = 0; AggClauseCosts agg_costs; int numGroupCols; @@ -1425,8 +1464,8 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) List *rollup_groupclauses = NIL; standard_qp_extra qp_extra; RelOptInfo *final_rel; - Path *cheapest_path; - Path *sorted_path; + Path *cheapest_path = NULL; + Path *sorted_path = NULL; Path *best_path; MemSet(&agg_costs, 0, sizeof(AggClauseCosts)); @@ -1752,22 +1791,54 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) } /* - * Pick out the cheapest-total path as well as the cheapest presorted - * path for the requested pathkeys (if there is one). We should take - * the tuple fraction into account when selecting the cheapest - * presorted path, but not when selecting the cheapest-total path, - * since if we have to sort then we'll have to fetch all the tuples. - * (But there's a special case: if query_pathkeys is NIL, meaning - * order doesn't matter, then the "cheapest presorted" path will be - * the cheapest overall for the tuple fraction.) + * Prepare a gather path on the partial path, in case if it satisfies + * parallel aggregate plan. 
*/ - cheapest_path = final_rel->cheapest_total_path; + if (enable_parallelagg + && final_rel->partial_pathlist + && (dNumGroups < (path_rows / 4)) + && aggregates_allow_partial((Node *) tlist) + && aggregates_allow_partial(parse->havingQual)) + { + /* + * Every aggregate in the targetlist and the HAVING qual supports + * partial aggregation, so aggregate on top of a Gather path. The + * eligibility test is part of the condition above so that ineligible + * queries fall into the else branch instead of leaving the paths NULL. + */ + Path *cheapest_partial_path; + + cheapest_partial_path = linitial(final_rel->partial_pathlist); + cheapest_path = (Path *) + create_gather_path(root, final_rel, cheapest_partial_path, NULL); + + sorted_path = + get_cheapest_fractional_path_for_pathkeys(final_rel->partial_pathlist, + root->query_pathkeys, + NULL, + tuple_fraction); + parallel_agg = true; + } + else + { + /* + * Pick out the cheapest-total path as well as the cheapest presorted + * path for the requested pathkeys (if there is one). We should take + * the tuple fraction into account when selecting the cheapest + * presorted path, but not when selecting the cheapest-total path, + * since if we have to sort then we'll have to fetch all the tuples. + * (But there's a special case: if query_pathkeys is NIL, meaning + * order doesn't matter, then the "cheapest presorted" path will be + * the cheapest overall for the tuple fraction.) + */ + cheapest_path = final_rel->cheapest_total_path; - sorted_path = - get_cheapest_fractional_path_for_pathkeys(final_rel->pathlist, - root->query_pathkeys, - NULL, - tuple_fraction); + sorted_path = + get_cheapest_fractional_path_for_pathkeys(final_rel->pathlist, + root->query_pathkeys, + NULL, + tuple_fraction); + } /* Don't consider same path in both guises; just wastes effort */ if (sorted_path == cheapest_path) @@ -1892,9 +1963,6 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) * Normal case --- create a plan according to query_planner's * results. 
*/ - List *sub_tlist; - AttrNumber *groupColIdx = NULL; - bool need_tlist_eval = true; bool need_sort_for_grouping = false; result_plan = create_plan(root, best_path); @@ -1903,15 +1971,22 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) /* Detect if we'll need an explicit sort for grouping */ if (parse->groupClause && !use_hashed_grouping && !pathkeys_contained_in(root->group_pathkeys, current_pathkeys)) + { need_sort_for_grouping = true; + /* + * Always override create_plan's tlist, so that we don't sort + * useless data from a "physical" tlist. + */ + need_tlist_eval = true; + } + /* - * Generate appropriate target list for scan/join subplan; may be - * different from tlist if grouping or aggregation is needed. + * Generate appropriate target list for subplan; may be different from + * tlist if grouping or aggregation is needed. */ sub_tlist = make_subplanTargetList(root, tlist, - &groupColIdx, - &need_tlist_eval); + &groupColIdx, &need_tlist_eval); /* * create_plan returns a plan with just a "flat" tlist of required @@ -1994,20 +2069,16 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) */ if (use_hashed_grouping) { - /* Hashed aggregate plan --- no sort needed */ - result_plan = (Plan *) make_agg(root, - tlist, - (List *) parse->havingQual, - AGG_HASHED, - &agg_costs, - numGroupCols, - groupColIdx, - extract_grouping_ops(parse->groupClause), - NIL, - numGroups, - false, - true, - result_plan); + result_plan = make_hash_agg(root, + parse, + tlist, + &agg_costs, + numGroupCols, + groupColIdx, + numGroups, + parallel_agg, + result_plan); + /* Hashed aggregation produces randomly-ordered results */ current_pathkeys = NIL; } @@ -2027,16 +2098,24 @@ grouping_planner(PlannerInfo *root, double tuple_fraction) else current_pathkeys = NIL; - result_plan = build_grouping_chain(root, - parse, - tlist, - need_sort_for_grouping, - rollup_groupclauses, - rollup_lists, - groupColIdx, - &agg_costs, - numGroups, - result_plan); + result_plan = 
make_group_agg(root, + parse, + tlist, + need_sort_for_grouping, + rollup_groupclauses, + rollup_lists, + groupColIdx, + &agg_costs, + numGroups, + parallel_agg, + result_plan); + + /* + * these are destroyed by build_grouping_chain, so make sure + * we don't try and touch them again + */ + rollup_groupclauses = NIL; + rollup_lists = NIL; } else if (parse->groupClause) { @@ -2481,6 +2560,8 @@ build_grouping_chain(PlannerInfo *root, AttrNumber *groupColIdx, AggClauseCosts *agg_costs, long numGroups, + bool combineStates, + bool finalizeAggs, Plan *result_plan) { AttrNumber *top_grpColIdx = groupColIdx; @@ -2553,8 +2634,8 @@ build_grouping_chain(PlannerInfo *root, extract_grouping_ops(groupClause), gsets, numGroups, - false, - true, + combineStates, + finalizeAggs, sort_plan); /* @@ -2594,8 +2675,8 @@ build_grouping_chain(PlannerInfo *root, extract_grouping_ops(groupClause), gsets, numGroups, - false, - true, + combineStates, + finalizeAggs, result_plan); ((Agg *) result_plan)->chain = chain; @@ -4718,3 +4799,396 @@ plan_cluster_use_sort(Oid tableOid, Oid indexOid) return (seqScanAndSortPath.total_cost < indexScanPath->path.total_cost); } + +/* + * This function build a hash parallelagg plan as result_plan as following : + * Finalize Hash Aggregate + * -> Gather + * -> Partial Hash Aggregate + * -> Any partial plan + * The input result_plan will be + * Gather + * -> Any partial plan + * So this function will do the following steps: + * 1. Make a PartialHashAgg and set Gather node as above node + * 2. Change the targetlist of Gather node + * 3. 
Make a FinalizeHashAgg as top node above the Gather node + */ + +static Plan * +make_hash_agg(PlannerInfo *root, + Query *parse, + List *tlist, + AggClauseCosts *agg_costs, + int numGroupCols, + AttrNumber *groupColIdx, + long numGroups, + bool parallel_agg, + Plan *lefttree) +{ + Plan *result_plan = NULL; + Plan *partial_agg_plan = NULL; + Plan *gather_plan = NULL; + List *partial_agg_tlist = NIL; + List *qual = (List*)parse->havingQual; + AttrNumber *topgroupColIdx = NULL; + + if (!parallel_agg || !IsA(lefttree, Gather)) + { + result_plan = (Plan *) make_agg(root, + tlist, + qual, + AGG_HASHED, + agg_costs, + numGroupCols, + groupColIdx, + extract_grouping_ops(parse->groupClause), + NIL, + numGroups, + false, + true, + lefttree); + return result_plan; + } + + Assert(IsA(lefttree, Gather)); + gather_plan = lefttree; + + /* + * The underlying Agg targetlist should be a flat tlist of all Vars and Aggs + * needed to evaluate the expressions and final values of aggregates present + * in the main target list. The quals also should be included. 
+ */ + partial_agg_tlist = make_partial_agg_tlist(add_qual_in_tlist(tlist, qual), + parse->groupClause); + + /* Make PartialHashAgg plan node; it carries no qual of its own */ + partial_agg_plan = (Plan *) make_agg(root, + partial_agg_tlist, + NIL, + AGG_HASHED, + agg_costs, + numGroupCols, + groupColIdx, + extract_grouping_ops(parse->groupClause), + NIL, + numGroups, + false, + false, + gather_plan->lefttree); + + gather_plan->lefttree = partial_agg_plan; + gather_plan->targetlist = partial_agg_plan->targetlist; + + /* + * Look up the grouping columns' resnos in the partial aggregate's tlist + */ + topgroupColIdx = get_grpColIdx_from_subPlan(root, partial_agg_tlist); + + /* Make FinalizeHashAgg plan node */ + result_plan = (Plan *) make_agg(root, + tlist, + qual, + AGG_HASHED, + agg_costs, + numGroupCols, + topgroupColIdx, + extract_grouping_ops(parse->groupClause), + NIL, + numGroups, + true, + true, + gather_plan); + + return result_plan; +} + +/* + * This function build a group parallelagg plan as result_plan as following : + * Finalize Group Aggregate + * -> Sort + * -> Gather + * -> Partial Group Aggregate + * -> Sort + * -> Any partial plan + * The input result_plan will be + * Gather + * -> Any partial plan + * So this function will do the following steps: + * 1. Move up the Gather node and change its targetlist + * 2. Change the Group Aggregate to be Partial Group Aggregate + * 3. 
Add Finalize Group Aggregate and Sort node + */ +static Plan * +make_group_agg(PlannerInfo *root, + Query *parse, + List *tlist, + bool need_sort_for_grouping, + List *rollup_groupclauses, + List *rollup_lists, + AttrNumber *groupColIdx, + AggClauseCosts *agg_costs, + long numGroups, + bool parallel_agg, + Plan *result_plan) +{ + Plan *partial_agg = NULL; + Plan *gather_plan = NULL; + List *qual = (List*)parse->havingQual; + List *partial_agg_tlist = NIL; + AttrNumber *topgroupColIdx = NULL; + + if (!parallel_agg || !IsA(result_plan, Gather)) + { + result_plan = build_grouping_chain(root, + parse, + tlist, + need_sort_for_grouping, + rollup_groupclauses, + rollup_lists, + groupColIdx, + agg_costs, + numGroups, + false, + true, + result_plan); + return result_plan; + } + + Assert(IsA(result_plan, Gather)); + gather_plan = result_plan; + + /* + * The underlying Agg targetlist should be a flat tlist of all Vars and Aggs + * needed to evaluate the expressions and final values of aggregates present + * in the main target list. The quals also should be included. 
+ */ + partial_agg_tlist = make_partial_agg_tlist(add_qual_in_tlist(tlist, qual), + llast(rollup_groupclauses)); + + /* Add PartialAgg and Sort node */ + partial_agg = build_grouping_chain(root, + parse, + partial_agg_tlist, + need_sort_for_grouping, + rollup_groupclauses, + rollup_lists, + groupColIdx, + agg_costs, + numGroups, + false, + false, + gather_plan->lefttree); + + /* NOTE(review): the rollup lists are passed to build_grouping_chain again below; confirm the first call leaves them usable */ + + /* Make the Gather node the parent of the partial_agg node */ + gather_plan->targetlist = partial_agg->targetlist; + gather_plan->lefttree = partial_agg; + + /* + * Look up the grouping columns' resnos in the partial aggregate's tlist + */ + topgroupColIdx = get_grpColIdx_from_subPlan(root, partial_agg_tlist); + + /* Make the Finalize Group Aggregate node */ + result_plan = build_grouping_chain(root, + parse, + tlist, + need_sort_for_grouping, + rollup_groupclauses, + rollup_lists, + topgroupColIdx, + agg_costs, + numGroups, + true, + true, + gather_plan); + + return result_plan; +} + +/* Map each GROUP BY column to its resno in tlist; returns NULL when there is no GROUP BY */ +static AttrNumber* +get_grpColIdx_from_subPlan(PlannerInfo *root, List *tlist) +{ + Query *parse = root->parse; + int numCols; + + AttrNumber *grpColIdx = NULL; + + numCols = list_length(parse->groupClause); + if (numCols > 0) + { + ListCell *tl; + + grpColIdx = (AttrNumber *) palloc0(sizeof(AttrNumber) * numCols); + + foreach(tl, tlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(tl); + int colno; + + colno = get_grouping_column_index(parse, tle); + if (colno >= 0) + { + Assert(grpColIdx[colno] == 0); /* no dups expected */ + grpColIdx[colno] = tle->resno; + } + } + } + + return grpColIdx; +} + +/* + * make_partial_agg_tlist + * Generate appropriate Agg node target list for input to ParallelAgg nodes. + * + * The initial target list passed to ParallelAgg node from the parser contains + * aggregates and GROUP BY columns. For the underlying agg node, we want to + * generate a tlist containing bare aggregate references (Aggref) and GROUP BY + * expressions. 
So we flatten all expressions except GROUP BY items into their + * component variables. + * For example, given a query like + * SELECT a+b, 2 * SUM(c+d) , AVG(d)+SUM(c+d) FROM table GROUP BY a+b; + * we want to pass this targetlist to the Agg plan: + * a+b, SUM(c+d), AVG(d) + * where the a+b target will be used by the Sort/Group steps, and the + * other targets will be used for computing the final results. + * Note that we don't flatten Aggref's , since those are to be computed + * by the underlying Agg node, and they will be referenced like Vars above it. + * + * 'tlist' is the ParallelAgg's final target list. + * + * The result is the targetlist to be computed by the Agg node below the + * ParallelAgg node. + */ +static List * +make_partial_agg_tlist(List *tlist,List *groupClause) +{ + Bitmapset *sgrefs; + List *new_tlist; + List *flattenable_cols; + List *flattenable_vars; + ListCell *lc; + + /* + * Collect the sortgroupref numbers of GROUP BY clauses + * into a bitmapset for convenient reference below. + */ + sgrefs = NULL; + + /* Add in sortgroupref numbers of GROUP BY clauses */ + foreach(lc, groupClause) + { + SortGroupClause *grpcl = (SortGroupClause *) lfirst(lc); + + sgrefs = bms_add_member(sgrefs, grpcl->tleSortGroupRef); + } + + /* + * Construct a tlist containing all the non-flattenable tlist items, and + * save aside the others for a moment. + */ + new_tlist = NIL; + flattenable_cols = NIL; + + foreach(lc, tlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + + /* Don't want to deconstruct GROUP BY items. 
*/ + if (tle->ressortgroupref != 0 && + bms_is_member(tle->ressortgroupref, sgrefs)) + { + /* Don't want to deconstruct this value, so add to new_tlist */ + TargetEntry *newtle; + + newtle = makeTargetEntry(tle->expr, + list_length(new_tlist) + 1, + NULL, + false); + /* Preserve its sortgroupref marking, in case it's volatile */ + newtle->ressortgroupref = tle->ressortgroupref; + new_tlist = lappend(new_tlist, newtle); + } + else + { + /* + * Column is to be flattened, so just remember the expression for + * later call to pull_var_clause. There's no need for + * pull_var_clause to examine the TargetEntry node itself. + */ + flattenable_cols = lappend(flattenable_cols, tle->expr); + } + } + + /* + * Pull out all the Vars and Aggrefs mentioned in flattenable columns, and + * add them to the result tlist if not already present. (Some might be + * there already because they're used directly as group clauses.) + * + * Note: it's essential to use PVC_INCLUDE_AGGREGATES here, so that the + * Aggrefs are placed in the Agg node's tlist and not left to be computed + * at higher levels. 
+ */ + flattenable_vars = pull_var_clause((Node *) flattenable_cols, + PVC_INCLUDE_AGGREGATES, + PVC_INCLUDE_PLACEHOLDERS); + new_tlist = add_to_flat_tlist(new_tlist, flattenable_vars); + + /* clean up cruft */ + list_free(flattenable_vars); + list_free(flattenable_cols); + + return new_tlist; +} + +/* + * add_qual_in_tlist + * Append the Aggrefs found in qual to a copy of targetlist (returned as-is if there is no qual) + */ +static List* +add_qual_in_tlist(List *targetlist, List *qual) +{ + AddQualInTListExprContext context; + + if (qual == NIL) + return targetlist; + + context.targetlist = copyObject(targetlist); + context.resno = list_length(context.targetlist) + 1; + + add_qual_in_tlist_walker((Node*)qual, &context); + + return context.targetlist; +} + +/* + * add_qual_in_tlist_walker + * Walk the qual and append a target entry for each Aggref to context->targetlist + */ +static bool +add_qual_in_tlist_walker (Node *node, AddQualInTListExprContext *context) +{ + if (node == NULL) + return false; + + if (IsA(node, Aggref)) + { + TargetEntry *te; + + te = makeTargetEntry((Expr *) node, + context->resno++, + NULL, + false); + + /* store lappend's result in the context: the list may have been NIL */ + context->targetlist = lappend(context->targetlist, te); + } + else + return expression_tree_walker(node, add_qual_in_tlist_walker, context); + + return false; +} diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 615f3a2..85b649e 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -15,7 +15,9 @@ */ #include "postgres.h" +#include "access/htup_details.h" #include "access/transam.h" +#include "catalog/pg_aggregate.h" #include "catalog/pg_type.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" @@ -65,6 +67,7 @@ typedef struct indexed_tlist *subplan_itlist; Index newvarno; int rtoffset; + bool partial_agg; } fix_upper_expr_context; /* @@ -104,6 +107,8 @@ static Node *fix_scan_expr_mutator(Node *node, fix_scan_expr_context *context); static bool 
fix_scan_expr_walker(Node *node, fix_scan_expr_context *context); static void set_join_references(PlannerInfo *root, Join *join, int rtoffset); static void set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset); +static void set_agg_references(PlannerInfo *root, Plan *plan, int rtoffset); +static void set_partialagg_aggref_types(PlannerInfo *root, Plan *plan); static void set_dummy_tlist_references(Plan *plan, int rtoffset); static indexed_tlist *build_tlist_index(List *tlist); static Var *search_indexed_tlist_for_var(Var *var, @@ -128,7 +133,8 @@ static Node *fix_upper_expr(PlannerInfo *root, Node *node, indexed_tlist *subplan_itlist, Index newvarno, - int rtoffset); + int rtoffset, + bool partial_agg); static Node *fix_upper_expr_mutator(Node *node, fix_upper_expr_context *context); static List *set_returning_clause_references(PlannerInfo *root, @@ -140,6 +146,7 @@ static bool fix_opfuncids_walker(Node *node, void *context); static bool extract_query_dependencies_walker(Node *node, PlannerInfo *context); + /***************************************************************************** * * SUBPLAN REFERENCES @@ -668,7 +675,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) } break; case T_Agg: - set_upper_references(root, plan, rtoffset); + set_agg_references(root, plan, rtoffset); break; case T_Group: set_upper_references(root, plan, rtoffset); @@ -943,13 +950,15 @@ set_indexonlyscan_references(PlannerInfo *root, (Node *) plan->scan.plan.targetlist, index_itlist, INDEX_VAR, - rtoffset); + rtoffset, + false); plan->scan.plan.qual = (List *) fix_upper_expr(root, (Node *) plan->scan.plan.qual, index_itlist, INDEX_VAR, - rtoffset); + rtoffset, + false); /* indexqual is already transformed to reference index columns */ plan->indexqual = fix_scan_list(root, plan->indexqual, rtoffset); /* indexorderby is already transformed to reference index columns */ @@ -1116,25 +1125,29 @@ set_foreignscan_references(PlannerInfo *root, (Node *) 
fscan->scan.plan.targetlist, itlist, INDEX_VAR, - rtoffset); + rtoffset, + false); fscan->scan.plan.qual = (List *) fix_upper_expr(root, (Node *) fscan->scan.plan.qual, itlist, INDEX_VAR, - rtoffset); + rtoffset, + false); fscan->fdw_exprs = (List *) fix_upper_expr(root, (Node *) fscan->fdw_exprs, itlist, INDEX_VAR, - rtoffset); + rtoffset, + false); fscan->fdw_recheck_quals = (List *) fix_upper_expr(root, (Node *) fscan->fdw_recheck_quals, itlist, INDEX_VAR, - rtoffset); + rtoffset, + false); pfree(itlist); /* fdw_scan_tlist itself just needs fix_scan_list() adjustments */ fscan->fdw_scan_tlist = @@ -1190,19 +1203,22 @@ set_customscan_references(PlannerInfo *root, (Node *) cscan->scan.plan.targetlist, itlist, INDEX_VAR, - rtoffset); + rtoffset, + false); cscan->scan.plan.qual = (List *) fix_upper_expr(root, (Node *) cscan->scan.plan.qual, itlist, INDEX_VAR, - rtoffset); + rtoffset, + false); cscan->custom_exprs = (List *) fix_upper_expr(root, (Node *) cscan->custom_exprs, itlist, INDEX_VAR, - rtoffset); + rtoffset, + false); pfree(itlist); /* custom_scan_tlist itself just needs fix_scan_list() adjustments */ cscan->custom_scan_tlist = @@ -1524,7 +1540,8 @@ set_join_references(PlannerInfo *root, Join *join, int rtoffset) (Node *) nlp->paramval, outer_itlist, OUTER_VAR, - rtoffset); + rtoffset, + false); /* Check we replaced any PlaceHolderVar with simple Var */ if (!(IsA(nlp->paramval, Var) && nlp->paramval->varno == OUTER_VAR)) @@ -1648,14 +1665,16 @@ set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset) (Node *) tle->expr, subplan_itlist, OUTER_VAR, - rtoffset); + rtoffset, + false); } else newexpr = fix_upper_expr(root, (Node *) tle->expr, subplan_itlist, OUTER_VAR, - rtoffset); + rtoffset, + false); tle = flatCopyTargetEntry(tle); tle->expr = (Expr *) newexpr; output_targetlist = lappend(output_targetlist, tle); @@ -1667,7 +1686,8 @@ set_upper_references(PlannerInfo *root, Plan *plan, int rtoffset) (Node *) plan->qual, subplan_itlist, OUTER_VAR, - 
rtoffset); + rtoffset, + false); pfree(subplan_itlist); } @@ -2121,7 +2141,8 @@ fix_upper_expr(PlannerInfo *root, Node *node, indexed_tlist *subplan_itlist, Index newvarno, - int rtoffset) + int rtoffset, + bool partial_agg) { fix_upper_expr_context context; @@ -2129,6 +2150,7 @@ fix_upper_expr(PlannerInfo *root, context.subplan_itlist = subplan_itlist; context.newvarno = newvarno; context.rtoffset = rtoffset; + context.partial_agg = partial_agg; return fix_upper_expr_mutator(node, &context); } @@ -2151,6 +2173,36 @@ fix_upper_expr_mutator(Node *node, fix_upper_expr_context *context) elog(ERROR, "variable not found in subplan target list"); return (Node *) newvar; } + if (IsA(node, Aggref) && context->partial_agg) + { + TargetEntry *tle; + Aggref *aggref = (Aggref*)node; + List *args = NIL; + + tle = tlist_member(node, context->subplan_itlist->tlist); + if (tle) + { + /* Found a matching subplan output expression */ + Var *newvar; + TargetEntry *newtle; + + newvar = makeVarFromTargetEntry(context->newvarno, tle); + newvar->varnoold = 0; /* wasn't ever a plain Var */ + newvar->varoattno = 0; + + /* makeTargetEntry ,always set resno to one for finialize agg */ + newtle = makeTargetEntry((Expr*)newvar,1,NULL,false); + args = lappend(args,newtle); + + /* + * Updated the args, let the newvar refer to the right position of + * the agg function in the subplan + */ + aggref->args = args; + + return (Node *) aggref; + } + } if (IsA(node, PlaceHolderVar)) { PlaceHolderVar *phv = (PlaceHolderVar *) node; @@ -2432,3 +2484,123 @@ extract_query_dependencies_walker(Node *node, PlannerInfo *context) return expression_tree_walker(node, extract_query_dependencies_walker, (void *) context); } + +/* + * set_agg_references + * Update the targetlist and quals of an upper-level plan node + * to refer to the tuples returned by its lefttree subplan. + * Also perform opcode lookup for these expressions, and + * add regclass OIDs to root->glob->relationOids. 
+ * + * This is used for single-input plan types like Agg, Group, Result. + * + * In most cases, we have to match up individual Vars in the tlist and + * qual expressions with elements of the subplan's tlist (which was + * generated by flatten_tlist() from these selfsame expressions, so it + * should have all the required variables). There is an important exception, + * however: GROUP BY and ORDER BY expressions will have been pushed into the + * subplan tlist unflattened. If these values are also needed in the output + * then we want to reference the subplan tlist element rather than recomputing + * the expression. + */ +static void +set_agg_references(PlannerInfo *root, Plan *plan, int rtoffset) +{ + Agg *agg = (Agg*)plan; + Plan *subplan = plan->lefttree; + indexed_tlist *subplan_itlist; + List *output_targetlist; + ListCell *l; + + /* + * A non-finalizing Agg has its Aggrefs retyped to the transition type. Do + * it before the early return: first-stage partial Aggs do not combine. + */ + if (!agg->finalizeAggs) + set_partialagg_aggref_types(root, plan); + + if (!agg->combineStates) + { + set_upper_references(root, plan, rtoffset); + return; + } + + subplan_itlist = build_tlist_index(subplan->targetlist); + + output_targetlist = NIL; + + foreach(l, plan->targetlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(l); + Node *newexpr; + + /* If it's a non-Var sort/group item, first try to match by sortref */ + if (tle->ressortgroupref != 0 && !IsA(tle->expr, Var)) + { + newexpr = (Node *) + search_indexed_tlist_for_sortgroupref((Node *) tle->expr, + tle->ressortgroupref, + subplan_itlist, + OUTER_VAR); + if (!newexpr) + newexpr = fix_upper_expr(root, + (Node *) tle->expr, + subplan_itlist, + OUTER_VAR, + rtoffset, + true); + } + else + newexpr = fix_upper_expr(root, + (Node *) tle->expr, + subplan_itlist, + OUTER_VAR, + rtoffset, + true); + tle = flatCopyTargetEntry(tle); + tle->expr = (Expr *) newexpr; + output_targetlist = lappend(output_targetlist, tle); + } + + plan->targetlist = output_targetlist; + + 
plan->qual = (List *) + fix_upper_expr(root, + (Node *) plan->qual, + subplan_itlist, + OUTER_VAR, + rtoffset, + true); + + pfree(subplan_itlist); +} + +/* XXX is this really the best place and way to do this? */ +static void +set_partialagg_aggref_types(PlannerInfo *root, Plan *plan) +{ + ListCell *l; + + foreach(l, plan->targetlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(l); + + if (IsA(tle->expr, Aggref)) + { + Aggref *aggref = (Aggref *) tle->expr; + HeapTuple aggTuple; + Form_pg_aggregate aggform; + + aggTuple = SearchSysCache1(AGGFNOID, + ObjectIdGetDatum(aggref->aggfnoid)); + if (!HeapTupleIsValid(aggTuple)) + elog(ERROR, "cache lookup failed for aggregate %u", + aggref->aggfnoid); + aggform = (Form_pg_aggregate) GETSTRUCT(aggTuple); + + aggref->aggtype = aggform->aggtranstype; + + ReleaseSysCache(aggTuple); + } + } +} diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index ace8b38..a00259b 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -93,6 +93,7 @@ typedef struct bool allow_restricted; } has_parallel_hazard_arg; +static bool partial_aggregate_walker(Node *node, void *context); static bool contain_agg_clause_walker(Node *node, void *context); static bool count_agg_clauses_walker(Node *node, count_agg_clauses_context *context); @@ -400,6 +401,64 @@ make_ands_implicit(Expr *clause) *****************************************************************************/ /* + * aggregates_allow_partial + * Recursively search for Aggref clauses and determine if each of them + * support partial aggregation. Partial aggregation requires that the + * aggregate does not have a DISTINCT or ORDER BY clause, and that it also + * has a combine function set. Returns true if all found Aggrefs support + * partial aggregation and false if any don't. 
+ */ +bool +aggregates_allow_partial(Node *clause) +{ + bool found_unsupported = partial_aggregate_walker(clause, NULL); + + return !found_unsupported; +} + +/* + * partial_aggregate_walker + * Tree walker behind aggregates_allow_partial. Yields true as soon as + * an aggregate that cannot be partially aggregated is met, else false. + */ +static bool +partial_aggregate_walker(Node *node, void *context) +{ + if (node == NULL) + return false; + if (IsA(node, Aggref)) + { + Aggref *agg = (Aggref *) node; + HeapTuple tuple; + Oid combinefn; + Form_pg_aggregate form; + + Assert(agg->agglevelsup == 0); + + /* DISTINCT or ORDER BY aggregates cannot be combined */ + if (agg->aggdistinct || agg->aggorder) + return true; /* abort search */ + + tuple = SearchSysCache1(AGGFNOID, + ObjectIdGetDatum(agg->aggfnoid)); + if (!HeapTupleIsValid(tuple)) + elog(ERROR, "cache lookup failed for aggregate %u", + agg->aggfnoid); + form = (Form_pg_aggregate) GETSTRUCT(tuple); + combinefn = form->aggcombinefn; + ReleaseSysCache(tuple); + + /* + * Partial aggregation needs a combine function: abort the search + * when there is none, otherwise carry on with the next node. + */ + return !OidIsValid(combinefn); + } + return expression_tree_walker(node, partial_aggregate_walker, + (void *) context); +} + +/* * contain_agg_clause * Recursively search for Aggref/GroupingFunc nodes within a clause. 
* diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 38ba82f..51400b2 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -828,6 +828,15 @@ static struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, { + {"enable_parallelagg", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of parallel agg plans."), + NULL + }, + &enable_parallelagg, + true, + NULL, NULL, NULL + }, + { {"enable_material", PGC_USERSET, QUERY_TUNING_METHOD, gettext_noop("Enables the planner's use of materialization."), NULL diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h index 3b3fd0f..fc86b38 100644 --- a/src/include/optimizer/clauses.h +++ b/src/include/optimizer/clauses.h @@ -47,6 +47,7 @@ extern Node *make_and_qual(Node *qual1, Node *qual2); extern Expr *make_ands_explicit(List *andclauses); extern List *make_ands_implicit(Expr *clause); +extern bool aggregates_allow_partial(Node *clause); extern bool contain_agg_clause(Node *clause); extern void count_agg_clauses(PlannerInfo *root, Node *clause, AggClauseCosts *costs); diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 78c7cae..0ab043a 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -62,6 +62,7 @@ extern bool enable_bitmapscan; extern bool enable_tidscan; extern bool enable_sort; extern bool enable_hashagg; +extern bool enable_parallelagg; extern bool enable_nestloop; extern bool enable_material; extern bool enable_mergejoin;