From 6cb85d8121caf8698948d4004e9639e1a1f232db Mon Sep 17 00:00:00 2001 From: Pengzhou Tang Date: Wed, 11 Mar 2020 23:13:44 -0400 Subject: [PATCH 4/5] Reorganise the aggregate phases MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit is a preparatory step to support parallel grouping sets. When planning, PG used to organize the grouping sets in [HASHED] -> [SORTED] order, which means HASHED aggregates were always located before SORTED aggregates. When initializing the AGG node, PG also organized the aggregate phases in [HASHED] -> [SORTED] order, and all HASHED grouping sets were squeezed into phase 0. When executing the AGG node with an AGG_SORTED or AGG_MIXED strategy, the executor would start from phase1 -> phase2 -> phase3, then go to phase0 if the strategy is AGG_MIXED. This is problematic when adding support for parallel grouping sets: firstly, we need complicated logic to locate the first sort rollup/phase and to handle the special ordering for each strategy in many places; secondly, squeezing all hashed grouping sets into phase 0 does not work for parallel grouping sets, because we cannot put all hash transition functions into one expression state in the final stage. This commit organizes the grouping sets in a more natural order, [SORTED] -> [HASHED], and the HASHED sets are no longer squeezed into a single phase; instead, we use another way to put all hash transitions into the first phase's expression state, and the executor now starts execution from phase0 for all strategies. This commit also moves 'sort_in' from AggState to the AggStatePerPhase* structure; this helps to handle more complicated cases when parallel grouping sets are introduced, as we might then need to add a tuplestore 'store_in' to store partial aggregate results for PLAIN sets. This commit also makes the hash spill refill logic clearer and avoids using a nullcheck when refilling the hashtable. 
--- contrib/postgres_fdw/expected/postgres_fdw.out | 4 +- src/backend/commands/explain.c | 2 +- src/backend/executor/execExpr.c | 57 +- src/backend/executor/execExprInterp.c | 30 +- src/backend/executor/nodeAgg.c | 946 +++++++++++----------- src/backend/jit/llvm/llvmjit_expr.c | 51 +- src/backend/optimizer/plan/createplan.c | 29 +- src/backend/optimizer/plan/planner.c | 9 +- src/backend/optimizer/util/pathnode.c | 65 +- src/include/executor/execExpr.h | 5 +- src/include/executor/executor.h | 2 +- src/include/executor/nodeAgg.h | 34 +- src/include/nodes/execnodes.h | 25 +- src/test/regress/expected/groupingsets.out | 40 +- src/test/regress/expected/partition_aggregate.out | 2 +- 15 files changed, 655 insertions(+), 646 deletions(-) diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 62c2697920..fc0ed2f4d5 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -3448,8 +3448,8 @@ select c2, sum(c1) from ft1 where c2 < 3 group by rollup(c2) order by 1 nulls la Sort Key: ft1.c2 -> MixedAggregate Output: c2, sum(c1) - Hash Key: ft1.c2 Group Key: () + Hash Key: ft1.c2 -> Foreign Scan on public.ft1 Output: c2, c1 Remote SQL: SELECT "C 1", c2 FROM "S 1"."T 1" WHERE ((c2 < 3)) @@ -3473,8 +3473,8 @@ select c2, sum(c1) from ft1 where c2 < 3 group by cube(c2) order by 1 nulls last Sort Key: ft1.c2 -> MixedAggregate Output: c2, sum(c1) - Hash Key: ft1.c2 Group Key: () + Hash Key: ft1.c2 -> Foreign Scan on public.ft1 Output: c2, c1 Remote SQL: SELECT "C 1", c2 FROM "S 1"."T 1" WHERE ((c2 < 3)) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 8c82d6ea95..4dec889f77 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -2319,7 +2319,7 @@ show_grouping_set_keys(PlanState *planstate, const char *keyname; const char *keysetname; - if (aggnode->aggstrategy == AGG_HASHED || aggnode->aggstrategy == 
AGG_MIXED) + if (aggnode->aggstrategy == AGG_HASHED) { keyname = "Hash Key"; keysetname = "Hash Keys"; diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c index 1370ffec50..3533f5ccc8 100644 --- a/src/backend/executor/execExpr.c +++ b/src/backend/executor/execExpr.c @@ -80,7 +80,7 @@ static void ExecInitCoerceToDomain(ExprEvalStep *scratch, CoerceToDomain *ctest, static void ExecBuildAggTransCall(ExprState *state, AggState *aggstate, ExprEvalStep *scratch, FunctionCallInfo fcinfo, AggStatePerTrans pertrans, - int transno, int setno, int setoff, bool ishash, + int transno, int setno, AggStatePerPhase perphase, bool nullcheck); @@ -2931,13 +2931,13 @@ ExecInitCoerceToDomain(ExprEvalStep *scratch, CoerceToDomain *ctest, * the array of AggStatePerGroup, and skip evaluation if so. */ ExprState * -ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase, - bool doSort, bool doHash, bool nullcheck) +ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase, bool nullcheck, bool allow_concurrent_hashing) { ExprState *state = makeNode(ExprState); PlanState *parent = &aggstate->ss.ps; ExprEvalStep scratch = {0}; bool isCombine = DO_AGGSPLIT_COMBINE(aggstate->aggsplit); + ListCell *lc; LastAttnumInfo deform = {0, 0, 0}; state->expr = (Expr *) aggstate; @@ -2978,6 +2978,7 @@ ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase, NullableDatum *strictargs = NULL; bool *strictnulls = NULL; int argno; + int setno; ListCell *bail; /* @@ -3155,37 +3156,27 @@ ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase, * grouping set). Do so for both sort and hash based computations, as * applicable. 
*/ - if (doSort) + for (setno = 0; setno < phase->numsets; setno++) { - int processGroupingSets = Max(phase->numsets, 1); - int setoff = 0; - - for (int setno = 0; setno < processGroupingSets; setno++) - { - ExecBuildAggTransCall(state, aggstate, &scratch, trans_fcinfo, - pertrans, transno, setno, setoff, false, - nullcheck); - setoff++; - } + ExecBuildAggTransCall(state, aggstate, &scratch, trans_fcinfo, + pertrans, transno, setno, phase, nullcheck); } - if (doHash) + /* + * Call transition function for HASHED aggs that can be + * advanced concurrently. + */ + if (allow_concurrent_hashing && + phase->concurrent_hashes) { - int numHashes = aggstate->num_hashes; - int setoff; - - /* in MIXED mode, there'll be preceding transition values */ - if (aggstate->aggstrategy != AGG_HASHED) - setoff = aggstate->maxsets; - else - setoff = 0; - - for (int setno = 0; setno < numHashes; setno++) + foreach(lc, phase->concurrent_hashes) { + AggStatePerPhaseHash perhash = (AggStatePerPhaseHash) lfirst(lc); + ExecBuildAggTransCall(state, aggstate, &scratch, trans_fcinfo, - pertrans, transno, setno, setoff, true, + pertrans, transno, 0, + (AggStatePerPhase) perhash, nullcheck); - setoff++; } } @@ -3234,14 +3225,17 @@ static void ExecBuildAggTransCall(ExprState *state, AggState *aggstate, ExprEvalStep *scratch, FunctionCallInfo fcinfo, AggStatePerTrans pertrans, - int transno, int setno, int setoff, bool ishash, + int transno, int setno, AggStatePerPhase perphase, bool nullcheck) { ExprContext *aggcontext; int adjust_jumpnull = -1; - if (ishash) + if (perphase->is_hashed) + { + Assert(setno == 0); aggcontext = aggstate->hashcontext; + } else aggcontext = aggstate->aggcontexts[setno]; @@ -3249,9 +3243,10 @@ ExecBuildAggTransCall(ExprState *state, AggState *aggstate, if (nullcheck) { scratch->opcode = EEOP_AGG_PLAIN_PERGROUP_NULLCHECK; - scratch->d.agg_plain_pergroup_nullcheck.setoff = setoff; + scratch->d.agg_plain_pergroup_nullcheck.pergroups = perphase->pergroups; /* adjust later */ 
scratch->d.agg_plain_pergroup_nullcheck.jumpnull = -1; + scratch->d.agg_plain_pergroup_nullcheck.setno = setno; ExprEvalPushStep(state, scratch); adjust_jumpnull = state->steps_len - 1; } @@ -3319,7 +3314,7 @@ ExecBuildAggTransCall(ExprState *state, AggState *aggstate, scratch->d.agg_trans.pertrans = pertrans; scratch->d.agg_trans.setno = setno; - scratch->d.agg_trans.setoff = setoff; + scratch->d.agg_trans.pergroups = perphase->pergroups; scratch->d.agg_trans.transno = transno; scratch->d.agg_trans.aggcontext = aggcontext; ExprEvalPushStep(state, scratch); diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c index 113ed1547c..b0dbba4e55 100644 --- a/src/backend/executor/execExprInterp.c +++ b/src/backend/executor/execExprInterp.c @@ -1610,9 +1610,9 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) EEO_CASE(EEOP_AGG_PLAIN_PERGROUP_NULLCHECK) { - AggState *aggstate = castNode(AggState, state->parent); - AggStatePerGroup pergroup_allaggs = aggstate->all_pergroups - [op->d.agg_plain_pergroup_nullcheck.setoff]; + AggStatePerGroup pergroup_allaggs = + op->d.agg_plain_pergroup_nullcheck.pergroups + [op->d.agg_plain_pergroup_nullcheck.setno]; if (pergroup_allaggs == NULL) EEO_JUMP(op->d.agg_plain_pergroup_nullcheck.jumpnull); @@ -1636,8 +1636,8 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) { AggState *aggstate = castNode(AggState, state->parent); AggStatePerTrans pertrans = op->d.agg_trans.pertrans; - AggStatePerGroup pergroup = &aggstate->all_pergroups - [op->d.agg_trans.setoff] + AggStatePerGroup pergroup = &op->d.agg_trans.pergroups + [op->d.agg_trans.setno] [op->d.agg_trans.transno]; Assert(pertrans->transtypeByVal); @@ -1665,8 +1665,8 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) { AggState *aggstate = castNode(AggState, state->parent); AggStatePerTrans pertrans = op->d.agg_trans.pertrans; - AggStatePerGroup pergroup = &aggstate->all_pergroups - 
[op->d.agg_trans.setoff] + AggStatePerGroup pergroup = &op->d.agg_trans.pergroups + [op->d.agg_trans.setno] [op->d.agg_trans.transno]; Assert(pertrans->transtypeByVal); @@ -1684,8 +1684,8 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) { AggState *aggstate = castNode(AggState, state->parent); AggStatePerTrans pertrans = op->d.agg_trans.pertrans; - AggStatePerGroup pergroup = &aggstate->all_pergroups - [op->d.agg_trans.setoff] + AggStatePerGroup pergroup = &op->d.agg_trans.pergroups + [op->d.agg_trans.setno] [op->d.agg_trans.transno]; Assert(pertrans->transtypeByVal); @@ -1702,8 +1702,8 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) { AggState *aggstate = castNode(AggState, state->parent); AggStatePerTrans pertrans = op->d.agg_trans.pertrans; - AggStatePerGroup pergroup = &aggstate->all_pergroups - [op->d.agg_trans.setoff] + AggStatePerGroup pergroup = &op->d.agg_trans.pergroups + [op->d.agg_trans.setno] [op->d.agg_trans.transno]; Assert(!pertrans->transtypeByVal); @@ -1724,8 +1724,8 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) { AggState *aggstate = castNode(AggState, state->parent); AggStatePerTrans pertrans = op->d.agg_trans.pertrans; - AggStatePerGroup pergroup = &aggstate->all_pergroups - [op->d.agg_trans.setoff] + AggStatePerGroup pergroup = &op->d.agg_trans.pergroups + [op->d.agg_trans.setno] [op->d.agg_trans.transno]; Assert(!pertrans->transtypeByVal); @@ -1742,8 +1742,8 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) { AggState *aggstate = castNode(AggState, state->parent); AggStatePerTrans pertrans = op->d.agg_trans.pertrans; - AggStatePerGroup pergroup = &aggstate->all_pergroups - [op->d.agg_trans.setoff] + AggStatePerGroup pergroup = &op->d.agg_trans.pergroups + [op->d.agg_trans.setno] [op->d.agg_trans.transno]; Assert(!pertrans->transtypeByVal); diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index 908c2980b8..8a8b49547b 
100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -250,6 +250,7 @@ #include "miscadmin.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" +#include "nodes/pathnodes.h" #include "optimizer/optimizer.h" #include "parser/parse_agg.h" #include "parser/parse_coerce.h" @@ -348,7 +349,7 @@ typedef struct HashAggSpill */ typedef struct HashAggBatch { - int setno; /* grouping set */ + int phaseidx; /* phase that own this batch */ int used_bits; /* number of bits of hash already used */ LogicalTapeSet *tapeset; /* borrowed reference to tape set */ int input_tapenum; /* input partition tape */ @@ -379,7 +380,7 @@ static void finalize_partialaggregate(AggState *aggstate, AggStatePerAgg peragg, AggStatePerGroup pergroupstate, Datum *resultVal, bool *resultIsNull); -static void prepare_hash_slot(AggState *aggstate); +static void prepare_hash_slot(AggState *aggstate, AggStatePerPhaseHash perhash); static void prepare_projection_slot(AggState *aggstate, TupleTableSlot *slot, int currentSet); @@ -390,9 +391,9 @@ static TupleTableSlot *project_aggregates(AggState *aggstate); static Bitmapset *find_unaggregated_cols(AggState *aggstate); static bool find_unaggregated_cols_walker(Node *node, Bitmapset **colnos); static void build_hash_tables(AggState *aggstate); -static void build_hash_table(AggState *aggstate, int setno, long nbuckets); -static void hashagg_recompile_expressions(AggState *aggstate, bool minslot, - bool nullcheck); +static void build_hash_table(AggState *aggstate, + AggStatePerPhaseHash perhash, long nbuckets); +static void hashagg_recompile_expressions(AggState *aggstate); static long hash_choose_num_buckets(double hashentrysize, long estimated_nbuckets, Size memory); @@ -400,12 +401,16 @@ static int hash_choose_num_partitions(uint64 input_groups, double hashentrysize, int used_bits, int *log2_npartittions); -static AggStatePerGroup lookup_hash_entry(AggState *aggstate, uint32 hash); -static void lookup_hash_entries(AggState 
*aggstate); +static AggStatePerGroup lookup_hash_entry(AggState *aggstate, + AggStatePerPhaseHash perhash, + uint32 hash); +static void lookup_hash_entries(AggState *aggstate, + AggStatePerPhaseHash perhash, + List *perhashes); static TupleTableSlot *agg_retrieve_direct(AggState *aggstate); -static void agg_sort_input(AggState *aggstate); static void agg_fill_hash_table(AggState *aggstate); static bool agg_refill_hash_table(AggState *aggstate); +static void agg_sort_input(AggState *aggstate); static TupleTableSlot *agg_retrieve_hash_table(AggState *aggstate); static TupleTableSlot *agg_retrieve_hash_table_in_memory(AggState *aggstate); static void hash_agg_check_limits(AggState *aggstate); @@ -415,7 +420,7 @@ static void hash_agg_update_metrics(AggState *aggstate, bool from_tape, static void hashagg_finish_initial_spills(AggState *aggstate); static void hashagg_reset_spill_state(AggState *aggstate); static HashAggBatch *hashagg_batch_new(LogicalTapeSet *tapeset, - int input_tapenum, int setno, + int input_tapenum, int phaseidx, int64 input_tuples, int used_bits); static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp); static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, @@ -424,7 +429,7 @@ static void hashagg_spill_init(HashAggSpill *spill, HashTapeInfo *tapeinfo, static Size hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot, uint32 hash); static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, - int setno); + int phaseidx); static void hashagg_tapeinfo_init(AggState *aggstate); static void hashagg_tapeinfo_assign(HashTapeInfo *tapeinfo, int *dest, int ndest); @@ -458,7 +463,10 @@ select_current_set(AggState *aggstate, int setno, bool is_hash) * ExecAggPlainTransByRef(). 
*/ if (is_hash) + { + Assert(setno == 0); aggstate->curaggcontext = aggstate->hashcontext; + } else aggstate->curaggcontext = aggstate->aggcontexts[setno]; @@ -466,72 +474,73 @@ select_current_set(AggState *aggstate, int setno, bool is_hash) } /* - * Switch to phase "newphase", which must either be 0 or 1 (to reset) or + * Switch to phase "newphase", which must either be 0 (to reset) or * current_phase + 1. Juggle the tuplesorts accordingly. - * - * Phase 0 is for hashing, which we currently handle last in the AGG_MIXED - * case, so when entering phase 0, all we need to do is drop open sorts. */ static void initialize_phase(AggState *aggstate, int newphase) { - Assert(newphase <= 1 || newphase == aggstate->current_phase + 1); + AggStatePerPhase current_phase; + AggStatePerPhaseSort persort; + + /* Don't use aggstate->phase here, it might not be initialized yet*/ + current_phase = aggstate->phases[aggstate->current_phase]; /* * Whatever the previous state, we're now done with whatever input - * tuplesort was in use. + * tuplesort was in use, cleanup them. + * + * Note: we keep the first tuplesort/tuplestore, this will benifit the + * rescan in some cases without resorting the input again. */ - if (aggstate->sort_in) + if (!current_phase->is_hashed && aggstate->current_phase > 0) { - tuplesort_end(aggstate->sort_in); - aggstate->sort_in = NULL; - } - - if (newphase <= 1) - { - /* - * Discard any existing output tuplesort. - */ - if (aggstate->sort_out) + persort = (AggStatePerPhaseSort) current_phase; + if (persort->sort_in) { - tuplesort_end(aggstate->sort_out); - aggstate->sort_out = NULL; + tuplesort_end(persort->sort_in); + persort->sort_in = NULL; } } - else - { - /* - * The old output tuplesort becomes the new input one, and this is the - * right time to actually sort it. 
- */ - aggstate->sort_in = aggstate->sort_out; - aggstate->sort_out = NULL; - Assert(aggstate->sort_in); - tuplesort_performsort(aggstate->sort_in); - } + + /* advance to next phase */ + aggstate->current_phase = newphase; + aggstate->phase = aggstate->phases[newphase]; + + if (aggstate->phase->is_hashed) + return; + + /* New phase is not hashed */ + persort = (AggStatePerPhaseSort) aggstate->phase; + + /* This is the right time to actually sort it. */ + if (persort->sort_in) + tuplesort_performsort(persort->sort_in); /* - * If this isn't the last phase, we need to sort appropriately for the + * If copy_out is set, we need to sort appropriately for the * next phase in sequence. */ - if (newphase > 0 && newphase < aggstate->numphases - 1) + if (persort->copy_out) { - Sort *sortnode = (Sort *) aggstate->phases[newphase + 1].aggnode->sortnode; - PlanState *outerNode = outerPlanState(aggstate); - TupleDesc tupDesc = ExecGetResultType(outerNode); - - aggstate->sort_out = tuplesort_begin_heap(tupDesc, - sortnode->numCols, - sortnode->sortColIdx, - sortnode->sortOperators, - sortnode->collations, - sortnode->nullsFirst, - work_mem, - NULL, false); + AggStatePerPhaseSort next = + (AggStatePerPhaseSort) aggstate->phases[newphase + 1]; + Sort *sortnode = (Sort *) next->phasedata.aggnode->sortnode; + PlanState *outerNode = outerPlanState(aggstate); + TupleDesc tupDesc = ExecGetResultType(outerNode); + + Assert(!next->phasedata.is_hashed); + + if (!next->sort_in) + next->sort_in = tuplesort_begin_heap(tupDesc, + sortnode->numCols, + sortnode->sortColIdx, + sortnode->sortOperators, + sortnode->collations, + sortnode->nullsFirst, + work_mem, + NULL, false); } - - aggstate->current_phase = newphase; - aggstate->phase = &aggstate->phases[newphase]; } /* @@ -546,12 +555,16 @@ static TupleTableSlot * fetch_input_tuple(AggState *aggstate) { TupleTableSlot *slot; + AggStatePerPhaseSort current_phase; + + Assert(!aggstate->phase->is_hashed); + current_phase = (AggStatePerPhaseSort) 
aggstate->phase; - if (aggstate->sort_in) + if (current_phase->sort_in) { /* make sure we check for interrupts in either path through here */ CHECK_FOR_INTERRUPTS(); - if (!tuplesort_gettupleslot(aggstate->sort_in, true, false, + if (!tuplesort_gettupleslot(current_phase->sort_in, true, false, aggstate->sort_slot, NULL)) return NULL; slot = aggstate->sort_slot; @@ -559,8 +572,13 @@ fetch_input_tuple(AggState *aggstate) else slot = ExecProcNode(outerPlanState(aggstate)); - if (!TupIsNull(slot) && aggstate->sort_out) - tuplesort_puttupleslot(aggstate->sort_out, slot); + if (!TupIsNull(slot) && current_phase->copy_out) + { + AggStatePerPhaseSort next = + (AggStatePerPhaseSort) aggstate->phases[aggstate->current_phase + 1]; + Assert(!next->phasedata.is_hashed); + tuplesort_puttupleslot(next->sort_in, slot); + } return slot; } @@ -666,7 +684,7 @@ initialize_aggregates(AggState *aggstate, int numReset) { int transno; - int numGroupingSets = Max(aggstate->phase->numsets, 1); + int numGroupingSets = aggstate->phase->numsets; int setno = 0; int numTrans = aggstate->numtrans; AggStatePerTrans transstates = aggstate->pertrans; @@ -1194,10 +1212,9 @@ finalize_partialaggregate(AggState *aggstate, * hashslot. This is necessary to compute the hash or perform a lookup. 
*/ static void -prepare_hash_slot(AggState *aggstate) +prepare_hash_slot(AggState *aggstate, AggStatePerPhaseHash perhash) { TupleTableSlot *inputslot = aggstate->tmpcontext->ecxt_outertuple; - AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set]; TupleTableSlot *hashslot = perhash->hashslot; int i; @@ -1431,29 +1448,33 @@ find_unaggregated_cols_walker(Node *node, Bitmapset **colnos) static void build_hash_tables(AggState *aggstate) { - int setno; + int phaseidx; - for (setno = 0; setno < aggstate->num_hashes; ++setno) + for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++) { - AggStatePerHash perhash = &aggstate->perhash[setno]; + AggStatePerPhaseHash perhash; + AggStatePerPhase phase = aggstate->phases[phaseidx]; long nbuckets; Size memory; + if (!phase->is_hashed) + continue; + + perhash = (AggStatePerPhaseHash) phase; + if (perhash->hashtable != NULL) { ResetTupleHashTable(perhash->hashtable); continue; } - Assert(perhash->aggnode->numGroups > 0); - memory = aggstate->hash_mem_limit / aggstate->num_hashes; /* choose reasonable number of buckets per hashtable */ nbuckets = hash_choose_num_buckets( - aggstate->hashentrysize, perhash->aggnode->numGroups, memory); + aggstate->hashentrysize, phase->aggnode->numGroups, memory); - build_hash_table(aggstate, setno, nbuckets); + build_hash_table(aggstate, perhash, nbuckets); } aggstate->hash_ngroups_current = 0; @@ -1463,9 +1484,8 @@ build_hash_tables(AggState *aggstate) * Build a single hashtable for this grouping set. 
*/ static void -build_hash_table(AggState *aggstate, int setno, long nbuckets) +build_hash_table(AggState *aggstate, AggStatePerPhaseHash perhash, long nbuckets) { - AggStatePerHash perhash = &aggstate->perhash[setno]; MemoryContext metacxt = aggstate->hash_metacxt; MemoryContext hashcxt = aggstate->hashcontext->ecxt_per_tuple_memory; MemoryContext tmpcxt = aggstate->tmpcontext->ecxt_per_tuple_memory; @@ -1489,7 +1509,7 @@ build_hash_table(AggState *aggstate, int setno, long nbuckets) perhash->hashGrpColIdxHash, perhash->eqfuncoids, perhash->hashfunctions, - perhash->aggnode->grpCollations, + perhash->phasedata.aggnode->grpCollations, nbuckets, additionalsize, metacxt, @@ -1528,23 +1548,29 @@ find_hash_columns(AggState *aggstate) { Bitmapset *base_colnos; List *outerTlist = outerPlanState(aggstate)->plan->targetlist; - int numHashes = aggstate->num_hashes; EState *estate = aggstate->ss.ps.state; int j; /* Find Vars that will be needed in tlist and qual */ base_colnos = find_unaggregated_cols(aggstate); - for (j = 0; j < numHashes; ++j) + for (j = 0; j < aggstate->numphases; ++j) { - AggStatePerHash perhash = &aggstate->perhash[j]; + AggStatePerPhase perphase = aggstate->phases[j]; + AggStatePerPhaseHash perhash; Bitmapset *colnos = bms_copy(base_colnos); - AttrNumber *grpColIdx = perhash->aggnode->grpColIdx; + Bitmapset *grouped_cols = perphase->grouped_cols[0]; + AttrNumber *grpColIdx = perphase->aggnode->grpColIdx; List *hashTlist = NIL; + ListCell *lc; TupleDesc hashDesc; int maxCols; int i; + if (!perphase->is_hashed) + continue; + + perhash = (AggStatePerPhaseHash) perphase; perhash->largestGrpColIdx = 0; /* @@ -1554,18 +1580,12 @@ find_hash_columns(AggState *aggstate) * there'd be no point storing them. Use prepare_projection_slot's * logic to determine which. 
*/ - if (aggstate->phases[0].grouped_cols) + foreach(lc, aggstate->all_grouped_cols) { - Bitmapset *grouped_cols = aggstate->phases[0].grouped_cols[j]; - ListCell *lc; + int attnum = lfirst_int(lc); - foreach(lc, aggstate->all_grouped_cols) - { - int attnum = lfirst_int(lc); - - if (!bms_is_member(attnum, grouped_cols)) - colnos = bms_del_member(colnos, attnum); - } + if (!bms_is_member(attnum, grouped_cols)) + colnos = bms_del_member(colnos, attnum); } /* @@ -1621,7 +1641,7 @@ find_hash_columns(AggState *aggstate) hashDesc = ExecTypeFromTL(hashTlist); execTuplesHashPrepare(perhash->numCols, - perhash->aggnode->grpOperators, + perphase->aggnode->grpOperators, &perhash->eqfuncoids, &perhash->hashfunctions); perhash->hashslot = @@ -1668,28 +1688,46 @@ hash_agg_entry_size(int numAggs, Size tupleWidth, Size transitionSpace) * expressions in the AggStatePerPhase, and reuse when appropriate. */ static void -hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck) +hashagg_recompile_expressions(AggState *aggstate) { - AggStatePerPhase phase; - int i = minslot ? 1 : 0; - int j = nullcheck ? 1 : 0; + AggStatePerPhase phase = aggstate->phase; Assert(aggstate->aggstrategy == AGG_HASHED || aggstate->aggstrategy == AGG_MIXED); - if (aggstate->aggstrategy == AGG_HASHED) - phase = &aggstate->phases[0]; - else /* AGG_MIXED */ - phase = &aggstate->phases[1]; - - if (phase->evaltrans_cache[i][j] == NULL) + if (phase->evaltrans_cache[aggstate->evaltrans_mode] == NULL) { const TupleTableSlotOps *outerops = aggstate->ss.ps.outerops; - bool outerfixed = aggstate->ss.ps.outeropsfixed; - bool dohash = true; - bool dosort; + bool outerfixed = aggstate->ss.ps.outeropsfixed; + bool minslot = false; + int nullcheck = false; + int allow_concurrent_hashing = true; - dosort = aggstate->aggstrategy == AGG_MIXED ? 
true : false; + /* + * we are refilling the hash table and we disallow concurrent hashing + * within transition expression because we refill the hash tables one + * set by one set, this can avoid unnecessary nullcheck, meanwhile, we + * get tuple from spill file, so it is a MinimalTuple. + */ + if (aggstate->evaltrans_mode == HASHREFILLMODE) + { + minslot = true; + nullcheck = false; + allow_concurrent_hashing = false; + } + /* + * we entred the spill mode, the concurrent hashing still works in this + * mode, but some grouping sets need to put the tuple into spill files + * and their pergroup states will be NULL, so we need add nullcheck. + * HASHFILLINGSPILL is only set in the first phase, so we used the + * outer slot and minslot should be false. + */ + else if (aggstate->evaltrans_mode == HASHSPILLMODE) + { + minslot = false; + nullcheck = true; + allow_concurrent_hashing = true; + } /* temporarily change the outerops while compiling the expression */ if (minslot) @@ -1698,15 +1736,15 @@ hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck) aggstate->ss.ps.outeropsfixed = true; } - phase->evaltrans_cache[i][j] = ExecBuildAggTrans( - aggstate, phase, dosort, dohash, nullcheck); + phase->evaltrans_cache[aggstate->evaltrans_mode] = + ExecBuildAggTrans(aggstate, phase, nullcheck, allow_concurrent_hashing); /* change back */ aggstate->ss.ps.outerops = outerops; aggstate->ss.ps.outeropsfixed = outerfixed; } - phase->evaltrans = phase->evaltrans_cache[i][j]; + phase->evaltrans = phase->evaltrans_cache[aggstate->evaltrans_mode]; } /* @@ -1803,29 +1841,22 @@ static void hash_agg_enter_spill_mode(AggState *aggstate) { aggstate->hash_spill_mode = true; - hashagg_recompile_expressions(aggstate, aggstate->table_filled, true); + + /* if table_filled is true, we must be refilling the hash table */ + if (aggstate->table_filled) + aggstate->evaltrans_mode = HASHREFILLMODE; + else + aggstate->evaltrans_mode = HASHSPILLMODE; + + 
hashagg_recompile_expressions(aggstate); if (!aggstate->hash_ever_spilled) { Assert(aggstate->hash_tapeinfo == NULL); - Assert(aggstate->hash_spills == NULL); aggstate->hash_ever_spilled = true; hashagg_tapeinfo_init(aggstate); - - aggstate->hash_spills = palloc( - sizeof(HashAggSpill) * aggstate->num_hashes); - - for (int setno = 0; setno < aggstate->num_hashes; setno++) - { - AggStatePerHash perhash = &aggstate->perhash[setno]; - HashAggSpill *spill = &aggstate->hash_spills[setno]; - - hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0, - perhash->aggnode->numGroups, - aggstate->hashentrysize); - } } } @@ -1977,9 +2008,8 @@ hash_choose_num_partitions(uint64 input_groups, double hashentrysize, * the current grouping set, return NULL and the caller will spill it to disk. */ static AggStatePerGroup -lookup_hash_entry(AggState *aggstate, uint32 hash) +lookup_hash_entry(AggState *aggstate, AggStatePerPhaseHash perhash, uint32 hash) { - AggStatePerHash perhash = &aggstate->perhash[aggstate->current_set]; TupleTableSlot *hashslot = perhash->hashslot; TupleHashEntryData *entry; bool isnew = false; @@ -2043,33 +2073,41 @@ lookup_hash_entry(AggState *aggstate, uint32 hash) * efficient. */ static void -lookup_hash_entries(AggState *aggstate) +lookup_hash_entries(AggState *aggstate, AggStatePerPhaseHash perhash, + List *concurrent_hashes) { - AggStatePerGroup *pergroup = aggstate->hash_pergroup; - int setno; + ListCell *lc; + List *all_hashes = perhash ? 
list_make1(perhash) : NIL; + + all_hashes = list_concat(all_hashes, concurrent_hashes); - for (setno = 0; setno < aggstate->num_hashes; setno++) + foreach (lc, all_hashes) { - AggStatePerHash perhash = &aggstate->perhash[setno]; uint32 hash; + AggStatePerPhaseHash perhash = (AggStatePerPhaseHash) lfirst(lc); - select_current_set(aggstate, setno, true); - prepare_hash_slot(aggstate); + select_current_set(aggstate, 0, true); + prepare_hash_slot(aggstate, perhash); hash = TupleHashTableHash(perhash->hashtable, perhash->hashslot); - pergroup[setno] = lookup_hash_entry(aggstate, hash); + perhash->phasedata.pergroups[0] = lookup_hash_entry(aggstate, perhash, hash); /* check to see if we need to spill the tuple for this grouping set */ - if (pergroup[setno] == NULL) + if (perhash->phasedata.pergroups[0] == NULL) { - HashAggSpill *spill = &aggstate->hash_spills[setno]; TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple; - if (spill->partitions == NULL) - hashagg_spill_init(spill, aggstate->hash_tapeinfo, 0, - perhash->aggnode->numGroups, + if (perhash->hash_spill == NULL) + perhash->hash_spill = palloc0(sizeof(HashAggSpill)); + + if (perhash->hash_spill->partitions == NULL) + hashagg_spill_init(perhash->hash_spill, + aggstate->hash_tapeinfo, 0, + perhash->phasedata.aggnode->numGroups, aggstate->hashentrysize); - hashagg_spill_tuple(spill, slot, hash); + hashagg_spill_tuple(perhash->hash_spill, + slot, + hash); } } } @@ -2103,12 +2141,11 @@ ExecAgg(PlanState *pstate) case AGG_HASHED: if (!node->table_filled) agg_fill_hash_table(node); - /* FALLTHROUGH */ - case AGG_MIXED: result = agg_retrieve_hash_table(node); break; case AGG_PLAIN: case AGG_SORTED: + case AGG_MIXED: if (!node->input_sorted) agg_sort_input(node); result = agg_retrieve_direct(node); @@ -2136,8 +2173,8 @@ agg_retrieve_direct(AggState *aggstate) TupleTableSlot *outerslot; TupleTableSlot *firstSlot; TupleTableSlot *result; - bool hasGroupingSets = aggstate->phase->numsets > 0; - int numGroupingSets = 
Max(aggstate->phase->numsets, 1); + bool hasGroupingSets = aggstate->phase->aggnode->groupingSets != NULL; + int numGroupingSets = aggstate->phase->numsets; int currentSet; int nextSetSize; int numReset; @@ -2154,7 +2191,7 @@ agg_retrieve_direct(AggState *aggstate) tmpcontext = aggstate->tmpcontext; peragg = aggstate->peragg; - pergroups = aggstate->pergroups; + pergroups = aggstate->phase->pergroups; firstSlot = aggstate->ss.ss_ScanTupleSlot; /* @@ -2212,25 +2249,35 @@ agg_retrieve_direct(AggState *aggstate) { if (aggstate->current_phase < aggstate->numphases - 1) { + /* Advance to the next phase */ initialize_phase(aggstate, aggstate->current_phase + 1); - aggstate->input_done = false; - aggstate->projected_set = -1; - numGroupingSets = Max(aggstate->phase->numsets, 1); - node = aggstate->phase->aggnode; - numReset = numGroupingSets; - } - else if (aggstate->aggstrategy == AGG_MIXED) - { - /* - * Mixed mode; we've output all the grouped stuff and have - * full hashtables, so switch to outputting those. - */ - initialize_phase(aggstate, 0); - aggstate->table_filled = true; - ResetTupleHashIterator(aggstate->perhash[0].hashtable, - &aggstate->perhash[0].hashiter); - select_current_set(aggstate, 0, true); - return agg_retrieve_hash_table(aggstate); + + /* Check whether new phase is an AGG_HASHED */ + if (!aggstate->phase->is_hashed) + { + aggstate->input_done = false; + aggstate->projected_set = -1; + numGroupingSets = aggstate->phase->numsets; + node = aggstate->phase->aggnode; + numReset = numGroupingSets; + pergroups = aggstate->phase->pergroups; + } + else + { + AggStatePerPhaseHash perhash = (AggStatePerPhaseHash) aggstate->phase; + /* finalize any spills */ + hashagg_finish_initial_spills(aggstate); + + + /* + * Mixed mode; we've output all the grouped stuff and have + * full hashtables, so switch to outputting those. 
+ */ + aggstate->table_filled = true; + ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter); + select_current_set(aggstate, 0, true); + return agg_retrieve_hash_table(aggstate); + } } else { @@ -2269,11 +2316,11 @@ agg_retrieve_direct(AggState *aggstate) */ tmpcontext->ecxt_innertuple = econtext->ecxt_outertuple; if (aggstate->input_done || - (node->aggstrategy != AGG_PLAIN && + (aggstate->phase->aggnode->numCols > 0 && aggstate->projected_set != -1 && aggstate->projected_set < (numGroupingSets - 1) && nextSetSize > 0 && - !ExecQualAndReset(aggstate->phase->eqfunctions[nextSetSize - 1], + !ExecQualAndReset(((AggStatePerPhaseSort) aggstate->phase)->eqfunctions[nextSetSize - 1], tmpcontext))) { aggstate->projected_set += 1; @@ -2376,13 +2423,13 @@ agg_retrieve_direct(AggState *aggstate) for (;;) { /* - * During phase 1 only of a mixed agg, we need to update - * hashtables as well in advance_aggregates. + * If current phase can do transition concurrently, we need + * to update hashtables as well in advance_aggregates. */ - if (aggstate->aggstrategy == AGG_MIXED && - aggstate->current_phase == 1) + if (aggstate->phase->concurrent_hashes) { - lookup_hash_entries(aggstate); + lookup_hash_entries(aggstate, NULL, + aggstate->phase->concurrent_hashes); } /* Advance the aggregates (or combine functions) */ @@ -2396,11 +2443,6 @@ agg_retrieve_direct(AggState *aggstate) { /* no more outer-plan tuples available */ - /* if we built hash tables, finalize any spills */ - if (aggstate->aggstrategy == AGG_MIXED && - aggstate->current_phase == 1) - hashagg_finish_initial_spills(aggstate); - if (hasGroupingSets) { aggstate->input_done = true; @@ -2419,10 +2461,10 @@ agg_retrieve_direct(AggState *aggstate) * If we are grouping, check whether we've crossed a group * boundary. 
*/ - if (node->aggstrategy != AGG_PLAIN) + if (aggstate->phase->aggnode->numCols > 0) { tmpcontext->ecxt_innertuple = firstSlot; - if (!ExecQual(aggstate->phase->eqfunctions[node->numCols - 1], + if (!ExecQual(((AggStatePerPhaseSort) aggstate->phase)->eqfunctions[node->numCols - 1], tmpcontext)) { aggstate->grp_firstTuple = ExecCopySlotHeapTuple(outerslot); @@ -2471,24 +2513,31 @@ agg_retrieve_direct(AggState *aggstate) static void agg_sort_input(AggState *aggstate) { - AggStatePerPhase phase = &aggstate->phases[1]; + AggStatePerPhase phase = aggstate->phases[0]; + AggStatePerPhaseSort persort = (AggStatePerPhaseSort) phase; TupleDesc tupDesc; Sort *sortnode; + bool randomAccess; Assert(!aggstate->input_sorted); + Assert(!phase->is_hashed); Assert(phase->aggnode->sortnode); sortnode = (Sort *) phase->aggnode->sortnode; tupDesc = ExecGetResultType(outerPlanState(aggstate)); - - aggstate->sort_in = tuplesort_begin_heap(tupDesc, - sortnode->numCols, - sortnode->sortColIdx, - sortnode->sortOperators, - sortnode->collations, - sortnode->nullsFirst, - work_mem, - NULL, false); + randomAccess = (aggstate->eflags & (EXEC_FLAG_REWIND | + EXEC_FLAG_BACKWARD | + EXEC_FLAG_MARK)) != 0; + + + persort->sort_in = tuplesort_begin_heap(tupDesc, + sortnode->numCols, + sortnode->sortColIdx, + sortnode->sortOperators, + sortnode->collations, + sortnode->nullsFirst, + work_mem, + NULL, randomAccess); for (;;) { TupleTableSlot *outerslot; @@ -2497,11 +2546,11 @@ agg_sort_input(AggState *aggstate) if (TupIsNull(outerslot)) break; - tuplesort_puttupleslot(aggstate->sort_in, outerslot); + tuplesort_puttupleslot(persort->sort_in, outerslot); } /* Sort the first phase */ - tuplesort_performsort(aggstate->sort_in); + tuplesort_performsort(persort->sort_in); /* Mark the input to be sorted */ aggstate->input_sorted = true; @@ -2513,8 +2562,14 @@ agg_sort_input(AggState *aggstate) static void agg_fill_hash_table(AggState *aggstate) { + AggStatePerPhaseHash currentphase; TupleTableSlot 
*outerslot; ExprContext *tmpcontext = aggstate->tmpcontext; + List *concurrent_hashes = aggstate->phase->concurrent_hashes; + + /* Current phase must be the first phase */ + Assert(aggstate->current_phase == 0); + currentphase = (AggStatePerPhaseHash) aggstate->phase; /* * Process each outer-plan tuple, and then fetch the next one, until we @@ -2522,7 +2577,7 @@ agg_fill_hash_table(AggState *aggstate) */ for (;;) { - outerslot = fetch_input_tuple(aggstate); + outerslot = ExecProcNode(outerPlanState(aggstate)); if (TupIsNull(outerslot)) break; @@ -2530,7 +2585,7 @@ agg_fill_hash_table(AggState *aggstate) tmpcontext->ecxt_outertuple = outerslot; /* Find or build hashtable entries */ - lookup_hash_entries(aggstate); + lookup_hash_entries(aggstate, currentphase, concurrent_hashes); /* Advance the aggregates (or combine functions) */ advance_aggregates(aggstate); @@ -2548,8 +2603,7 @@ agg_fill_hash_table(AggState *aggstate) aggstate->table_filled = true; /* Initialize to walk the first hash table */ select_current_set(aggstate, 0, true); - ResetTupleHashIterator(aggstate->perhash[0].hashtable, - &aggstate->perhash[0].hashiter); + ResetTupleHashIterator(currentphase->hashtable, ¤tphase->hashiter); } /* @@ -2567,6 +2621,7 @@ agg_fill_hash_table(AggState *aggstate) static bool agg_refill_hash_table(AggState *aggstate) { + AggStatePerPhaseHash perhash; HashAggBatch *batch; HashAggSpill spill; HashTapeInfo *tapeinfo = aggstate->hash_tapeinfo; @@ -2578,6 +2633,7 @@ agg_refill_hash_table(AggState *aggstate) batch = linitial(aggstate->hash_batches); aggstate->hash_batches = list_delete_first(aggstate->hash_batches); + perhash = (AggStatePerPhaseHash) aggstate->phases[batch->phaseidx]; /* * Estimate the number of groups for this batch as the total number of @@ -2592,32 +2648,15 @@ agg_refill_hash_table(AggState *aggstate) batch->used_bits, &aggstate->hash_mem_limit, &aggstate->hash_ngroups_limit, NULL); - /* there could be residual pergroup pointers; clear them */ - for (int 
setoff = 0; - setoff < aggstate->maxsets + aggstate->num_hashes; - setoff++) - aggstate->all_pergroups[setoff] = NULL; - /* free memory and reset hash tables */ ReScanExprContext(aggstate->hashcontext); - for (int setno = 0; setno < aggstate->num_hashes; setno++) - ResetTupleHashTable(aggstate->perhash[setno].hashtable); + ResetTupleHashTable(perhash->hashtable); aggstate->hash_ngroups_current = 0; - /* - * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output - * happens in phase 0. So, we switch to phase 1 when processing a batch, - * and back to phase 0 after the batch is done. - */ - Assert(aggstate->current_phase == 0); - if (aggstate->phase->aggstrategy == AGG_MIXED) - { - aggstate->current_phase = 1; - aggstate->phase = &aggstate->phases[aggstate->current_phase]; - } - - select_current_set(aggstate, batch->setno, true); + /* switch to the phase of current batch */ + initialize_phase(aggstate, batch->phaseidx); + select_current_set(aggstate, 0, true); /* * Spilled tuples are always read back as MinimalTuples, which may be @@ -2626,7 +2665,8 @@ agg_refill_hash_table(AggState *aggstate) * We still need the NULL check, because we are only processing one * grouping set at a time and the rest will be NULL. 
*/ - hashagg_recompile_expressions(aggstate, true, true); + aggstate->evaltrans_mode = HASHREFILLMODE; + hashagg_recompile_expressions(aggstate); LogicalTapeRewindForRead(tapeinfo->tapeset, batch->input_tapenum, HASHAGG_READ_BUFFER_SIZE); @@ -2644,10 +2684,11 @@ agg_refill_hash_table(AggState *aggstate) ExecStoreMinimalTuple(tuple, slot, true); aggstate->tmpcontext->ecxt_outertuple = slot; - prepare_hash_slot(aggstate); - aggstate->hash_pergroup[batch->setno] = lookup_hash_entry(aggstate, hash); + prepare_hash_slot(aggstate, perhash); + perhash->phasedata.pergroups[0] = + lookup_hash_entry(aggstate, perhash, hash); - if (aggstate->hash_pergroup[batch->setno] != NULL) + if (perhash->phasedata.pergroups[0] != NULL) { /* Advance the aggregates (or combine functions) */ advance_aggregates(aggstate); @@ -2677,14 +2718,10 @@ agg_refill_hash_table(AggState *aggstate) hashagg_tapeinfo_release(tapeinfo, batch->input_tapenum); - /* change back to phase 0 */ - aggstate->current_phase = 0; - aggstate->phase = &aggstate->phases[aggstate->current_phase]; - if (spill_initialized) { hash_agg_update_metrics(aggstate, true, spill.npartitions); - hashagg_spill_finish(aggstate, &spill, batch->setno); + hashagg_spill_finish(aggstate, &spill, batch->phaseidx); } else hash_agg_update_metrics(aggstate, true, 0); @@ -2692,9 +2729,7 @@ agg_refill_hash_table(AggState *aggstate) aggstate->hash_spill_mode = false; /* prepare to walk the first hash table */ - select_current_set(aggstate, batch->setno, true); - ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable, - &aggstate->perhash[batch->setno].hashiter); + ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter); pfree(batch); @@ -2742,7 +2777,7 @@ agg_retrieve_hash_table_in_memory(AggState *aggstate) TupleHashEntryData *entry; TupleTableSlot *firstSlot; TupleTableSlot *result; - AggStatePerHash perhash; + AggStatePerPhaseHash perhash; /* * get state info from node. 
@@ -2753,11 +2788,7 @@ agg_retrieve_hash_table_in_memory(AggState *aggstate) peragg = aggstate->peragg; firstSlot = aggstate->ss.ss_ScanTupleSlot; - /* - * Note that perhash (and therefore anything accessed through it) can - * change inside the loop, as we change between grouping sets. - */ - perhash = &aggstate->perhash[aggstate->current_set]; + perhash = (AggStatePerPhaseHash) aggstate->phase; /* * We loop retrieving groups until we find one satisfying @@ -2776,18 +2807,16 @@ agg_retrieve_hash_table_in_memory(AggState *aggstate) entry = ScanTupleHashTable(perhash->hashtable, &perhash->hashiter); if (entry == NULL) { - int nextset = aggstate->current_set + 1; - - if (nextset < aggstate->num_hashes) + if (aggstate->current_phase + 1 < aggstate->numphases && + aggstate->evaltrans_mode != HASHREFILLMODE) { /* * Switch to next grouping set, reinitialize, and restart the * loop. */ - select_current_set(aggstate, nextset, true); - - perhash = &aggstate->perhash[aggstate->current_set]; - + select_current_set(aggstate, 0, true); + initialize_phase(aggstate, aggstate->current_phase + 1); + perhash = (AggStatePerPhaseHash) aggstate->phase; ResetTupleHashIterator(perhash->hashtable, &perhash->hashiter); continue; @@ -2982,12 +3011,12 @@ hashagg_spill_tuple(HashAggSpill *spill, TupleTableSlot *slot, uint32 hash) * be done. 
*/ static HashAggBatch * -hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int setno, +hashagg_batch_new(LogicalTapeSet *tapeset, int tapenum, int phaseidx, int64 input_tuples, int used_bits) { HashAggBatch *batch = palloc0(sizeof(HashAggBatch)); - batch->setno = setno; + batch->phaseidx = phaseidx; batch->used_bits = used_bits; batch->tapeset = tapeset; batch->input_tapenum = tapenum; @@ -3053,25 +3082,31 @@ hashagg_batch_read(HashAggBatch *batch, uint32 *hashp) static void hashagg_finish_initial_spills(AggState *aggstate) { - int setno; + int phaseidx; int total_npartitions = 0; - if (aggstate->hash_spills != NULL) + for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++) { - for (setno = 0; setno < aggstate->num_hashes; setno++) + AggStatePerPhaseHash perhash; + AggStatePerPhase phase = aggstate->phases[phaseidx]; + + if (!phase->is_hashed) + continue; + + perhash = (AggStatePerPhaseHash) phase; + if (perhash->hash_spill) { - HashAggSpill *spill = &aggstate->hash_spills[setno]; - total_npartitions += spill->npartitions; - hashagg_spill_finish(aggstate, spill, setno); - } + total_npartitions += perhash->hash_spill->npartitions; + hashagg_spill_finish(aggstate, perhash->hash_spill, phase->phaseidx); - /* - * We're not processing tuples from outer plan any more; only - * processing batches of spilled tuples. The initial spill structures - * are no longer needed. - */ - pfree(aggstate->hash_spills); - aggstate->hash_spills = NULL; + /* + * We're not processing tuples from outer plan any more; only + * processing batches of spilled tuples. The initial spill structures + * are no longer needed. + */ + pfree(perhash->hash_spill); + perhash->hash_spill = NULL; + } } hash_agg_update_metrics(aggstate, false, total_npartitions); @@ -3084,7 +3119,7 @@ hashagg_finish_initial_spills(AggState *aggstate) * Transform spill partitions into new batches. 
*/ static void -hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno) +hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int phaseidx) { int i; int used_bits = 32 - spill->shift; @@ -3102,7 +3137,7 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno) continue; new_batch = hashagg_batch_new(aggstate->hash_tapeinfo->tapeset, - tapenum, setno, spill->ntuples[i], + tapenum, phaseidx, spill->ntuples[i], used_bits); aggstate->hash_batches = lcons(new_batch, aggstate->hash_batches); aggstate->hash_batches_used++; @@ -3118,21 +3153,25 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno) static void hashagg_reset_spill_state(AggState *aggstate) { - ListCell *lc; + ListCell *lc; + int phaseidx; /* free spills from initial pass */ - if (aggstate->hash_spills != NULL) + for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++) { - int setno; + AggStatePerPhaseHash perhash; + AggStatePerPhase phase = aggstate->phases[phaseidx]; - for (setno = 0; setno < aggstate->num_hashes; setno++) + if (!phase->is_hashed) + continue; + + perhash = (AggStatePerPhaseHash) phase; + + if (perhash->hash_spill) { - HashAggSpill *spill = &aggstate->hash_spills[setno]; - pfree(spill->ntuples); - pfree(spill->partitions); + pfree(perhash->hash_spill); + perhash->hash_spill = NULL; } - pfree(aggstate->hash_spills); - aggstate->hash_spills = NULL; } /* free batches */ @@ -3171,25 +3210,22 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) AggState *aggstate; AggStatePerAgg peraggs; AggStatePerTrans pertransstates; - AggStatePerGroup *pergroups; Plan *outerPlan; ExprContext *econtext; TupleDesc scanDesc; - Agg *firstSortAgg; int numaggs, transno, aggno; - int phase; int phaseidx; ListCell *l; Bitmapset *all_grouped_cols = NULL; int numGroupingSets = 1; - int numPhases; - int numHashes; int i = 0; int j = 0; + bool need_extra_slot = false; bool use_hashing = (node->aggstrategy == AGG_HASHED || node->aggstrategy == 
AGG_MIXED); + uint64 totalHashGroups = 0; /* check for unsupported flags */ Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); @@ -3216,24 +3252,15 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->curpertrans = NULL; aggstate->input_done = false; aggstate->agg_done = false; - aggstate->pergroups = NULL; aggstate->grp_firstTuple = NULL; - aggstate->sort_in = NULL; - aggstate->sort_out = NULL; aggstate->input_sorted = true; - - /* - * phases[0] always exists, but is dummy in sorted/plain mode - */ - numPhases = (use_hashing ? 1 : 2); - numHashes = (use_hashing ? 1 : 0); - - firstSortAgg = node->aggstrategy == AGG_SORTED ? node : NULL; + aggstate->eflags = eflags; + aggstate->num_hashes = 0; + aggstate->hash_spill_mode = HASHNORMALMODE; /* * Calculate the maximum number of grouping sets in any phase; this - * determines the size of some allocations. Also calculate the number of - * phases, since all hashed/mixed nodes contribute to only a single phase. + * determines the size of some allocations. */ if (node->groupingSets) { @@ -3246,31 +3273,19 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) numGroupingSets = Max(numGroupingSets, list_length(agg->groupingSets)); - /* - * additional AGG_HASHED aggs become part of phase 0, but all - * others add an extra phase. - */ if (agg->aggstrategy != AGG_HASHED) - { - ++numPhases; - - if (!firstSortAgg) - firstSortAgg = agg; - - } - else - ++numHashes; + need_extra_slot = true; } } aggstate->maxsets = numGroupingSets; - aggstate->numphases = numPhases; + aggstate->numphases = 1 + list_length(node->chain); /* - * The first SORTED phase is not sorted, agg need to do its own sort. See + * The first phase is not sorted, agg need to do its own sort. See * agg_sort_input(), this can only happen in groupingsets case. 
*/ - if (firstSortAgg && firstSortAgg->sortnode) + if (node->sortnode) aggstate->input_sorted = false; aggstate->aggcontexts = (ExprContext **) @@ -3331,11 +3346,10 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor; /* - * If there are more than two phases (including a potential dummy phase - * 0), input will be resorted using tuplesort. Need a slot for that. + * An extra slot is needed if 1) agg need to do its own sort 2) agg + * has more than one non-hashed phases */ - if (numPhases > 2 || - !aggstate->input_sorted) + if (node->sortnode || need_extra_slot) { aggstate->sort_slot = ExecInitExtraTupleSlot(estate, scanDesc, &TTSOpsMinimalTuple); @@ -3391,72 +3405,92 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * For each phase, prepare grouping set data and fmgr lookup data for * compare functions. Accumulate all_grouped_cols in passing. */ - aggstate->phases = palloc0(numPhases * sizeof(AggStatePerPhaseData)); + aggstate->phases = palloc0(aggstate->numphases * sizeof(AggStatePerPhase)); - aggstate->num_hashes = numHashes; - if (numHashes) - { - aggstate->perhash = palloc0(sizeof(AggStatePerHashData) * numHashes); - aggstate->phases[0].numsets = 0; - aggstate->phases[0].gset_lengths = palloc(numHashes * sizeof(int)); - aggstate->phases[0].grouped_cols = palloc(numHashes * sizeof(Bitmapset *)); - } - - phase = 0; - for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx) + for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++) { Agg *aggnode; + AggStatePerPhase phasedata = NULL; if (phaseidx > 0) aggnode = list_nth_node(Agg, node->chain, phaseidx - 1); else aggnode = node; - if (aggnode->aggstrategy == AGG_HASHED - || aggnode->aggstrategy == AGG_MIXED) + if (aggnode->aggstrategy == AGG_HASHED) { - AggStatePerPhase phasedata = &aggstate->phases[0]; - AggStatePerHash perhash; - Bitmapset *cols = NULL; + AggStatePerPhaseHash perhash; + Bitmapset *cols = NULL; - Assert(phase == 
0); - i = phasedata->numsets++; - perhash = &aggstate->perhash[i]; + aggstate->num_hashes++; + totalHashGroups += aggnode->numGroups; - /* phase 0 always points to the "real" Agg in the hash case */ - phasedata->aggnode = node; - phasedata->aggstrategy = node->aggstrategy; - - /* but the actual Agg node representing this hash is saved here */ - perhash->aggnode = aggnode; + perhash = (AggStatePerPhaseHash) palloc0(sizeof(AggStatePerPhaseHashData)); + phasedata = (AggStatePerPhase) perhash; + phasedata->is_hashed = true; + phasedata->aggnode = aggnode; + phasedata->aggstrategy = aggnode->aggstrategy; - phasedata->gset_lengths[i] = perhash->numCols = aggnode->numCols; + /* AGG_HASHED always has only one set */ + phasedata->numsets = 1; + phasedata->gset_lengths = palloc(sizeof(int)); + phasedata->gset_lengths[0] = perhash->numCols = aggnode->numCols; + phasedata->grouped_cols = palloc(sizeof(Bitmapset *)); for (j = 0; j < aggnode->numCols; ++j) cols = bms_add_member(cols, aggnode->grpColIdx[j]); - - phasedata->grouped_cols[i] = cols; + phasedata->grouped_cols[0] = cols; all_grouped_cols = bms_add_members(all_grouped_cols, cols); - continue; + + /* + * Initialize pergroup state. For AGG_HASHED, all groups do transition + * on the fly, all pergroup states are kept in hashtable, everytime + * a tuple is processed, lookup_hash_entry() choose one group and + * set phasedata->pergroups[0], then advance_aggregates can use it + * to do transition in this group. + * We do not need to allocate a real pergroup and set the pointer + * here, there are too many pergroup states, lookup_hash_entry() will + * allocate it. + */ + phasedata->pergroups = + (AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup)); + + /* + * Hash aggregate does not require the order of input tuples, so + * we can do the transition immediately when a tuple is fetched, + * which means we can do the transition concurrently with the + * first phase. 
+ */ + if (phaseidx > 0) + { + aggstate->phases[0]->concurrent_hashes = + lappend(aggstate->phases[0]->concurrent_hashes, perhash); + /* skip evaltrans for this phase */ + phasedata->skip_evaltrans = true; + } } else { - AggStatePerPhase phasedata = &aggstate->phases[++phase]; - int num_sets; + AggStatePerPhaseSort persort; - phasedata->numsets = num_sets = list_length(aggnode->groupingSets); + persort = (AggStatePerPhaseSort) palloc0(sizeof(AggStatePerPhaseSortData)); + phasedata = (AggStatePerPhase) persort; + phasedata->is_hashed = false; + phasedata->aggnode = aggnode; + phasedata->aggstrategy = aggnode->aggstrategy; - if (num_sets) + if (aggnode->groupingSets) { - phasedata->gset_lengths = palloc(num_sets * sizeof(int)); - phasedata->grouped_cols = palloc(num_sets * sizeof(Bitmapset *)); + phasedata->numsets = list_length(aggnode->groupingSets); + phasedata->gset_lengths = palloc(phasedata->numsets * sizeof(int)); + phasedata->grouped_cols = palloc(phasedata->numsets * sizeof(Bitmapset *)); i = 0; foreach(l, aggnode->groupingSets) { - int current_length = list_length(lfirst(l)); - Bitmapset *cols = NULL; + int current_length = list_length(lfirst(l)); + Bitmapset *cols = NULL; /* planner forces this to be correct */ for (j = 0; j < current_length; ++j) @@ -3473,37 +3507,49 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) } else { - Assert(phaseidx == 0); - + phasedata->numsets = 1; phasedata->gset_lengths = NULL; phasedata->grouped_cols = NULL; } + /* + * Initialize pergroup states for AGG_SORTED/AGG_PLAIN/AGG_MIXED + * phases, each set only have one group on the fly, all groups in + * a set can reuse a pergroup state. Unlike AGG_HASHED, we + * pre-allocate the pergroup states here. 
+ */ + phasedata->pergroups = + (AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup) * phasedata->numsets); + + for (i = 0; i < phasedata->numsets; i++) + { + phasedata->pergroups[i] = + (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) * numaggs); + } + /* * If we are grouping, precompute fmgr lookup data for inner loop. */ - if (aggnode->aggstrategy == AGG_SORTED) + if (aggnode->numCols > 0) { int i = 0; - Assert(aggnode->numCols > 0); - /* * Build a separate function for each subset of columns that * need to be compared. */ - phasedata->eqfunctions = + persort->eqfunctions = (ExprState **) palloc0(aggnode->numCols * sizeof(ExprState *)); /* for each grouping set */ - for (i = 0; i < phasedata->numsets; i++) + for (i = 0; i < phasedata->numsets && phasedata->gset_lengths; i++) { int length = phasedata->gset_lengths[i]; - if (phasedata->eqfunctions[length - 1] != NULL) + if (persort->eqfunctions[length - 1] != NULL) continue; - phasedata->eqfunctions[length - 1] = + persort->eqfunctions[length - 1] = execTuplesMatchPrepare(scanDesc, length, aggnode->grpColIdx, @@ -3513,9 +3559,9 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) } /* and for all grouped columns, unless already computed */ - if (phasedata->eqfunctions[aggnode->numCols - 1] == NULL) + if (persort->eqfunctions[aggnode->numCols - 1] == NULL) { - phasedata->eqfunctions[aggnode->numCols - 1] = + persort->eqfunctions[aggnode->numCols - 1] = execTuplesMatchPrepare(scanDesc, aggnode->numCols, aggnode->grpColIdx, @@ -3525,9 +3571,24 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) } } - phasedata->aggnode = aggnode; - phasedata->aggstrategy = aggnode->aggstrategy; + /* + * For non-first AGG_SORTED phase, it processes the same input + * tuples with previous phase except that it need to resort the + * input tuples. Tell the previous phase to copy out the tuples. 
+ */ + if (phaseidx > 0) + { + AggStatePerPhaseSort prev = + (AggStatePerPhaseSort) aggstate->phases[phaseidx - 1]; + + Assert(!prev->phasedata.is_hashed); + /* Tell the previous phase to copy the tuple to the sort_in */ + prev->copy_out = true; + } } + + phasedata->phaseidx = phaseidx; + aggstate->phases[phaseidx] = phasedata; } /* @@ -3551,51 +3612,9 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->peragg = peraggs; aggstate->pertrans = pertransstates; - - aggstate->all_pergroups = - (AggStatePerGroup *) palloc0(sizeof(AggStatePerGroup) - * (numGroupingSets + numHashes)); - pergroups = aggstate->all_pergroups; - - if (node->aggstrategy != AGG_HASHED) - { - for (i = 0; i < numGroupingSets; i++) - { - pergroups[i] = (AggStatePerGroup) palloc0(sizeof(AggStatePerGroupData) - * numaggs); - } - - aggstate->pergroups = pergroups; - pergroups += numGroupingSets; - } - - /* - * Hashing can only appear in the initial phase. - */ - if (use_hashing) - { - /* this is an array of pointers, not structures */ - aggstate->hash_pergroup = pergroups; - } - - /* - * Initialize current phase-dependent values to initial phase. The initial - * phase is 1 (first sort pass) for all strategies that use sorting (if - * hashing is being done too, then phase 0 is processed last); but if only - * hashing is being done, then phase 0 is all there is. 
- */ - if (node->aggstrategy == AGG_HASHED) - { - aggstate->current_phase = 0; - initialize_phase(aggstate, 0); - select_current_set(aggstate, 0, true); - } - else - { - aggstate->current_phase = 1; - initialize_phase(aggstate, 1); - select_current_set(aggstate, 0, false); - } + aggstate->current_phase = 0; + initialize_phase(aggstate, 0); + select_current_set(aggstate, 0, aggstate->aggstrategy == AGG_HASHED); /* ----------------- * Perform lookups of aggregate function info, and initialize the @@ -3931,12 +3950,12 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) (errcode(ERRCODE_GROUPING_ERROR), errmsg("aggregate function calls cannot be nested"))); - /* Initialize hash contexts and hash tables for hash aggregates */ + /* + * Initialize current phase-dependent values to initial phase. + */ if (use_hashing) { Plan *outerplan = outerPlan(node); - uint64 totalGroups = 0; - int i; aggstate->hash_metacxt = AllocSetContextCreate( aggstate->ss.ps.state->es_query_cxt, @@ -3954,10 +3973,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) * when there is more than one grouping set, but should still be * reasonable. 
*/ - for (i = 0; i < aggstate->num_hashes; i++) - totalGroups = aggstate->perhash[i].aggnode->numGroups; - - hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0, + hash_agg_set_limits(aggstate->hashentrysize, totalHashGroups, 0, &aggstate->hash_mem_limit, &aggstate->hash_ngroups_limit, &aggstate->hash_planned_partitions); @@ -3976,51 +3992,15 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) */ for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++) { - AggStatePerPhase phase = &aggstate->phases[phaseidx]; - bool dohash = false; - bool dosort = false; + AggStatePerPhase phase = aggstate->phases[phaseidx]; - /* phase 0 doesn't necessarily exist */ - if (!phase->aggnode) + if (phase->skip_evaltrans) continue; - if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 1) - { - /* - * Phase one, and only phase one, in a mixed agg performs both - * sorting and aggregation. - */ - dohash = true; - dosort = true; - } - else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0) - { - /* - * No need to compute a transition function for an AGG_MIXED phase - * 0 - the contents of the hashtables will have been computed - * during phase 1. 
- */ - continue; - } - else if (phase->aggstrategy == AGG_PLAIN || - phase->aggstrategy == AGG_SORTED) - { - dohash = false; - dosort = true; - } - else if (phase->aggstrategy == AGG_HASHED) - { - dohash = true; - dosort = false; - } - else - Assert(false); - - phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash, - false); + phase->evaltrans = ExecBuildAggTrans(aggstate, phase, false, true); /* cache compiled expression for outer slot without NULL check */ - phase->evaltrans_cache[0][0] = phase->evaltrans; + phase->evaltrans_cache[HASHNORMALMODE] = phase->evaltrans; } return aggstate; @@ -4506,13 +4486,21 @@ ExecEndAgg(AggState *node) int transno; int numGroupingSets = Max(node->maxsets, 1); int setno; + int phaseidx; /* Make sure we have closed any open tuplesorts */ + for (phaseidx = 0; phaseidx < node->numphases; phaseidx++) + { + AggStatePerPhase phase = node->phases[phaseidx]; + AggStatePerPhaseSort persort; - if (node->sort_in) - tuplesort_end(node->sort_in); - if (node->sort_out) - tuplesort_end(node->sort_out); + if (phase->is_hashed) + continue; + + persort = (AggStatePerPhaseSort) phase; + if (persort->sort_in) + tuplesort_end(persort->sort_in); + } hashagg_reset_spill_state(node); @@ -4562,6 +4550,7 @@ ExecReScanAgg(AggState *node) int transno; int numGroupingSets = Max(node->maxsets, 1); int setno; + int phaseidx; node->agg_done = false; @@ -4586,8 +4575,12 @@ ExecReScanAgg(AggState *node) if (outerPlan->chgParam == NULL && !node->hash_ever_spilled && !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams)) { - ResetTupleHashIterator(node->perhash[0].hashtable, - &node->perhash[0].hashiter); + AggStatePerPhaseHash perhash = (AggStatePerPhaseHash) node->phases[0]; + ResetTupleHashIterator(perhash->hashtable, + &perhash->hashiter); + + /* reset to phase 0 */ + initialize_phase(node, 0); select_current_set(node, 0, true); return; } @@ -4652,7 +4645,8 @@ ExecReScanAgg(AggState *node) node->table_filled = false; /* iterator will be reset when 
the table is filled */ - hashagg_recompile_expressions(node, false, false); + node->hash_spill_mode = HASHNORMALMODE; + hashagg_recompile_expressions(node); } if (node->aggstrategy != AGG_HASHED) @@ -4660,18 +4654,54 @@ ExecReScanAgg(AggState *node) /* * Reset the per-group state (in particular, mark transvalues null) */ - for (setno = 0; setno < numGroupingSets; setno++) + for (phaseidx = 0; phaseidx < node->numphases; phaseidx++) { - MemSet(node->pergroups[setno], 0, - sizeof(AggStatePerGroupData) * node->numaggs); + AggStatePerPhase phase = node->phases[phaseidx]; + + /* hash pergroups is reset by build_hash_tables */ + if (phase->is_hashed) + continue; + + for (setno = 0; setno < phase->numsets; setno++) + MemSet(phase->pergroups[setno], 0, + sizeof(AggStatePerGroupData) * node->numaggs); } - /* Reset input_sorted */ + /* + * The agg did its own first sort using tuplesort and the first + * tuplesort is kept (see initialize_phase), if the subplan does + * not have any parameter changes, and none of our own parameter + * changes affect input expressions of the aggregated functions, + * then we can just rescan the first tuplesort, no need to build + * it again. + * + * Note: agg only do its own sort for groupingsets now. 
+ */ if (aggnode->sortnode) - node->input_sorted = false; + { + AggStatePerPhaseSort firstphase = (AggStatePerPhaseSort) node->phases[0]; + bool randomAccess = (node->eflags & (EXEC_FLAG_REWIND | + EXEC_FLAG_BACKWARD | + EXEC_FLAG_MARK)) != 0; + if (firstphase->sort_in && + randomAccess && + outerPlan->chgParam == NULL && + !bms_overlap(node->ss.ps.chgParam, aggnode->aggParams)) + { + tuplesort_rescan(firstphase->sort_in); + node->input_sorted = true; + } + else + { + if (firstphase->sort_in) + tuplesort_end(firstphase->sort_in); + firstphase->sort_in = NULL; + node->input_sorted = false; + } + } - /* reset to phase 1 */ - initialize_phase(node, 1); + /* reset to phase 0 */ + initialize_phase(node, 0); node->input_done = false; node->projected_set = -1; diff --git a/src/backend/jit/llvm/llvmjit_expr.c b/src/backend/jit/llvm/llvmjit_expr.c index b855e73957..066cd59554 100644 --- a/src/backend/jit/llvm/llvmjit_expr.c +++ b/src/backend/jit/llvm/llvmjit_expr.c @@ -2049,30 +2049,26 @@ llvm_compile_expr(ExprState *state) case EEOP_AGG_PLAIN_PERGROUP_NULLCHECK: { int jumpnull; - LLVMValueRef v_aggstatep; - LLVMValueRef v_allpergroupsp; + LLVMValueRef v_pergroupsp; LLVMValueRef v_pergroup_allaggs; - LLVMValueRef v_setoff; + LLVMValueRef v_setno; jumpnull = op->d.agg_plain_pergroup_nullcheck.jumpnull; /* - * pergroup_allaggs = aggstate->all_pergroups - * [op->d.agg_plain_pergroup_nullcheck.setoff]; + * pergroup = + * &op->d.agg_plain_pergroup_nullcheck.pergroups + * [op->d.agg_plain_pergroup_nullcheck.setno]; */ - v_aggstatep = LLVMBuildBitCast( - b, v_parent, l_ptr(StructAggState), ""); + v_pergroupsp = + l_ptr_const(op->d.agg_plain_pergroup_nullcheck.pergroups, + l_ptr(l_ptr(StructAggStatePerGroupData))); - v_allpergroupsp = l_load_struct_gep( - b, v_aggstatep, - FIELDNO_AGGSTATE_ALL_PERGROUPS, - "aggstate.all_pergroups"); + v_setno = + l_int32_const(op->d.agg_plain_pergroup_nullcheck.setno); - v_setoff = l_int32_const( - op->d.agg_plain_pergroup_nullcheck.setoff); - - 
v_pergroup_allaggs = l_load_gep1( - b, v_allpergroupsp, v_setoff, ""); + v_pergroup_allaggs = + l_load_gep1(b, v_pergroupsp, v_setno, ""); LLVMBuildCondBr( b, @@ -2094,6 +2090,7 @@ llvm_compile_expr(ExprState *state) { AggState *aggstate; AggStatePerTrans pertrans; + AggStatePerGroup *pergroups; FunctionCallInfo fcinfo; LLVMValueRef v_aggstatep; @@ -2103,12 +2100,12 @@ llvm_compile_expr(ExprState *state) LLVMValueRef v_transvaluep; LLVMValueRef v_transnullp; - LLVMValueRef v_setoff; + LLVMValueRef v_setno; LLVMValueRef v_transno; LLVMValueRef v_aggcontext; - LLVMValueRef v_allpergroupsp; + LLVMValueRef v_pergroupsp; LLVMValueRef v_current_setp; LLVMValueRef v_current_pertransp; LLVMValueRef v_curaggcontext; @@ -2124,6 +2121,7 @@ llvm_compile_expr(ExprState *state) aggstate = castNode(AggState, state->parent); pertrans = op->d.agg_trans.pertrans; + pergroups = op->d.agg_trans.pergroups; fcinfo = pertrans->transfn_fcinfo; @@ -2133,19 +2131,18 @@ llvm_compile_expr(ExprState *state) l_ptr(StructAggStatePerTransData)); /* - * pergroup = &aggstate->all_pergroups - * [op->d.agg_strict_trans_check.setoff] - * [op->d.agg_init_trans_check.transno]; + * pergroup = &op->d.agg_trans.pergroups + * [op->d.agg_trans.setno] + * [op->d.agg_trans.transno]; */ - v_allpergroupsp = - l_load_struct_gep(b, v_aggstatep, - FIELDNO_AGGSTATE_ALL_PERGROUPS, - "aggstate.all_pergroups"); - v_setoff = l_int32_const(op->d.agg_trans.setoff); + v_pergroupsp = + l_ptr_const(pergroups, + l_ptr(l_ptr(StructAggStatePerGroupData))); + v_setno = l_int32_const(op->d.agg_trans.setno); v_transno = l_int32_const(op->d.agg_trans.transno); v_pergroupp = LLVMBuildGEP(b, - l_load_gep1(b, v_allpergroupsp, v_setoff, ""), + l_load_gep1(b, v_pergroupsp, v_setno, ""), &v_transno, 1, ""); diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 7c29f89cc3..e9ad5a98cb 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -2226,8 
+2226,6 @@ create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path) chain = NIL; if (list_length(rollups) > 1) { - bool is_first_sort = ((RollupData *) linitial(rollups))->is_hashed; - for_each_cell(lc, rollups, list_second_cell(rollups)) { RollupData *rollup = lfirst(lc); @@ -2245,24 +2243,17 @@ create_groupingsets_plan(PlannerInfo *root, GroupingSetsPath *best_path) */ if (!rollup->is_hashed) { - if (!is_first_sort || - (is_first_sort && !best_path->is_sorted)) - { - sort_plan = (Plan *) - make_sort_from_groupcols(rollup->groupClause, - new_grpColIdx, - subplan); - - /* - * Remove stuff we don't need to avoid bloating debug output. - */ - sort_plan->targetlist = NIL; - sort_plan->lefttree = NULL; - } - } + sort_plan = (Plan *) + make_sort_from_groupcols(rollup->groupClause, + new_grpColIdx, + subplan); - if (!rollup->is_hashed) - is_first_sort = false; + /* + * Remove stuff we don't need to avoid bloating debug output. + */ + sort_plan->targetlist = NIL; + sort_plan->lefttree = NULL; + } if (rollup->is_hashed) strat = AGG_HASHED; diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 6578b3fef0..f26e962ac9 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -4357,7 +4357,8 @@ consider_groupingsets_paths(PlannerInfo *root, if (unhashed_rollup) { - new_rollups = lappend(new_rollups, unhashed_rollup); + /* unhashed rollups always sit before hashed rollups */ + new_rollups = lcons(unhashed_rollup, new_rollups); strat = AGG_MIXED; } else if (empty_sets) @@ -4370,7 +4371,8 @@ consider_groupingsets_paths(PlannerInfo *root, rollup->numGroups = list_length(empty_sets); rollup->hashable = false; rollup->is_hashed = false; - new_rollups = lappend(new_rollups, rollup); + /* unhashed rollups always sit before hashed rollups */ + new_rollups = lcons(rollup, new_rollups); /* * The first non-hashed rollup is PLAIN AGG, is_sorted * should be true. 
@@ -4539,7 +4541,8 @@ consider_groupingsets_paths(PlannerInfo *root, rollup->numGroups = gs->numGroups; rollup->hashable = true; rollup->is_hashed = true; - rollups = lcons(rollup, rollups); + /* non-hashed rollup always sit before hashed rollup */ + rollups = lappend(rollups, rollup); } if (rollups) diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 6e8899227f..4578c3184b 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -3001,7 +3001,6 @@ create_groupingsets_path(PlannerInfo *root, PathTarget *target = rel->reltarget; ListCell *lc; bool is_first = true; - bool is_first_sort = true; /* The topmost generated Plan node will be an Agg */ pathnode->path.pathtype = T_Agg; @@ -3054,14 +3053,13 @@ create_groupingsets_path(PlannerInfo *root, int numGroupCols = list_length(linitial(gsets)); /* - * In AGG_SORTED or AGG_PLAIN mode, the first rollup takes the - * (already-sorted) input, and following ones do their own sort. + * In AGG_SORTED or AGG_PLAIN mode, the first rollup do its own + * sort if is_sorted is false, the following ones do their own sort. * * In AGG_HASHED mode, there is one rollup for each grouping set. * - * In AGG_MIXED mode, the first rollups are hashed, the first - * non-hashed one takes the (already-sorted) input, and following ones - * do their own sort. + * In AGG_MIXED mode, the first rollup do its own sort if is_sorted + * is false, the following non-hashed ones do their own sort. 
*/ if (is_first) { @@ -3095,33 +3093,21 @@ create_groupingsets_path(PlannerInfo *root, subpath->rows, subpath->pathtarget->width); is_first = false; - if (!rollup->is_hashed) - is_first_sort = false; } else { + AggStrategy rollup_strategy; Path sort_path; /* dummy for result of cost_sort */ Path agg_path; /* dummy for result of cost_agg */ - if (rollup->is_hashed || (is_first_sort && is_sorted)) - { - /* - * Account for cost of aggregation, but don't charge input - * cost again - */ - cost_agg(&agg_path, root, - rollup->is_hashed ? AGG_HASHED : AGG_SORTED, - agg_costs, - numGroupCols, - rollup->numGroups, - having_qual, - 0.0, 0.0, - subpath->rows, - subpath->pathtarget->width); - if (!rollup->is_hashed) - is_first_sort = false; - } - else + sort_path.startup_cost = 0; + sort_path.total_cost = 0; + sort_path.rows = subpath->rows; + + rollup_strategy = rollup->is_hashed ? + AGG_HASHED : (numGroupCols ? AGG_SORTED : AGG_PLAIN); + + if (!rollup->is_hashed && numGroupCols) { /* Account for cost of sort, but don't charge input cost again */ cost_sort(&sort_path, root, NIL, @@ -3131,21 +3117,20 @@ create_groupingsets_path(PlannerInfo *root, 0.0, work_mem, -1.0); - - /* Account for cost of aggregation */ - - cost_agg(&agg_path, root, - AGG_SORTED, - agg_costs, - numGroupCols, - rollup->numGroups, - having_qual, - sort_path.startup_cost, - sort_path.total_cost, - sort_path.rows, - subpath->pathtarget->width); } + /* Account for cost of aggregation */ + cost_agg(&agg_path, root, + rollup_strategy, + agg_costs, + numGroupCols, + rollup->numGroups, + having_qual, + sort_path.startup_cost, + sort_path.total_cost, + sort_path.rows, + subpath->pathtarget->width); + pathnode->path.total_cost += agg_path.total_cost; pathnode->path.rows += agg_path.rows; } diff --git a/src/include/executor/execExpr.h b/src/include/executor/execExpr.h index dbe8649a57..4ed5d0a7de 100644 --- a/src/include/executor/execExpr.h +++ b/src/include/executor/execExpr.h @@ -626,7 +626,8 @@ typedef struct 
ExprEvalStep /* for EEOP_AGG_PLAIN_PERGROUP_NULLCHECK */ struct { - int setoff; + AggStatePerGroup *pergroups; + int setno; int jumpnull; } agg_plain_pergroup_nullcheck; @@ -634,11 +635,11 @@ typedef struct ExprEvalStep /* for EEOP_AGG_ORDERED_TRANS_{DATUM,TUPLE} */ struct { + AggStatePerGroup *pergroups; AggStatePerTrans pertrans; ExprContext *aggcontext; int setno; int transno; - int setoff; } agg_trans; } d; } ExprEvalStep; diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 94890512dc..d3a56c068e 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -255,7 +255,7 @@ extern ExprState *ExecInitQual(List *qual, PlanState *parent); extern ExprState *ExecInitCheck(List *qual, PlanState *parent); extern List *ExecInitExprList(List *nodes, PlanState *parent); extern ExprState *ExecBuildAggTrans(AggState *aggstate, struct AggStatePerPhaseData *phase, - bool doSort, bool doHash, bool nullcheck); + bool nullcheck, bool allow_concurrent_hashing); extern ExprState *ExecBuildGroupingEqual(TupleDesc ldesc, TupleDesc rdesc, const TupleTableSlotOps *lops, const TupleTableSlotOps *rops, int numCols, diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h index 9e70bd8b84..1612b71e05 100644 --- a/src/include/executor/nodeAgg.h +++ b/src/include/executor/nodeAgg.h @@ -270,21 +270,33 @@ typedef struct AggStatePerGroupData */ typedef struct AggStatePerPhaseData { + int phaseidx; /* phaseidx */ + bool is_hashed; /* plan to do hash aggregate */ AggStrategy aggstrategy; /* strategy for this phase */ - int numsets; /* number of grouping sets (or 0) */ + int numsets; /* number of grouping sets */ int *gset_lengths; /* lengths of grouping sets */ Bitmapset **grouped_cols; /* column groupings for rollup */ - ExprState **eqfunctions; /* expression returning equality, indexed by - * nr of cols to compare */ Agg *aggnode; /* Agg node for phase data */ ExprState *evaltrans; /* evaluation of transition 
functions */ - /* cached variants of the compiled expression */ - ExprState *evaltrans_cache - [2] /* 0: outerops; 1: TTSOpsMinimalTuple */ - [2]; /* 0: no NULL check; 1: with NULL check */ + ExprState *evaltrans_cache[3]; + + List *concurrent_hashes; /* hash phases can do transition concurrently */ + AggStatePerGroup *pergroups; /* pergroup states for a phase */ + + bool skip_evaltrans; /* do not build evaltrans */ } AggStatePerPhaseData; +typedef struct AggStatePerPhaseSortData +{ + AggStatePerPhaseData phasedata; + Tuplesortstate *sort_in; /* sorted input to phases > 1 */ + Tuplestorestate *store_in; /* sorted input to phases > 1 */ + ExprState **eqfunctions; /* expression returning equality, indexed by + * nr of cols to compare */ + bool copy_out; /* hint for copy input tuples for next phase */ +} AggStatePerPhaseSortData; + /* * AggStatePerHashData - per-hashtable state * @@ -292,8 +304,9 @@ typedef struct AggStatePerPhaseData * grouping set. (When doing hashing without grouping sets, we have just one of * them.) */ -typedef struct AggStatePerHashData +typedef struct AggStatePerPhaseHashData { + AggStatePerPhaseData phasedata; TupleHashTable hashtable; /* hash table with one entry per group */ TupleHashIterator hashiter; /* for iterating through hash table */ TupleTableSlot *hashslot; /* slot for loading hash table */ @@ -304,9 +317,8 @@ typedef struct AggStatePerHashData int largestGrpColIdx; /* largest col required for hashing */ AttrNumber *hashGrpColIdxInput; /* hash col indices in input slot */ AttrNumber *hashGrpColIdxHash; /* indices in hash table tuples */ - Agg *aggnode; /* original Agg node, for numGroups etc. 
*/ -} AggStatePerHashData; - + struct HashAggSpill *hash_spill; /* HashAggSpill for current hash grouping set */ +} AggStatePerPhaseHashData; extern AggState *ExecInitAgg(Agg *node, EState *estate, int eflags); extern void ExecEndAgg(AggState *node); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 75a45b2549..788ddade64 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2036,7 +2036,8 @@ typedef struct AggStatePerAggData *AggStatePerAgg; typedef struct AggStatePerTransData *AggStatePerTrans; typedef struct AggStatePerGroupData *AggStatePerGroup; typedef struct AggStatePerPhaseData *AggStatePerPhase; -typedef struct AggStatePerHashData *AggStatePerHash; +typedef struct AggStatePerPhaseSortData *AggStatePerPhaseSort; +typedef struct AggStatePerPhaseHashData *AggStatePerPhaseHash; typedef struct AggState { @@ -2068,21 +2069,17 @@ typedef struct AggState List *all_grouped_cols; /* list of all grouped cols in DESC order */ /* These fields are for grouping set phase data */ int maxsets; /* The max number of sets in any phase */ - AggStatePerPhase phases; /* array of all phases */ + AggStatePerPhase *phases; /* array of all phases */ Tuplesortstate *sort_in; /* sorted input to phases > 1 */ Tuplesortstate *sort_out; /* input is copied here for next phase */ TupleTableSlot *sort_slot; /* slot for sort results */ /* these fields are used in AGG_PLAIN and AGG_SORTED modes: */ - AggStatePerGroup *pergroups; /* grouping set indexed array of per-group - * pointers */ HeapTuple grp_firstTuple; /* copy of first tuple of current group */ - /* these fields are used in AGG_HASHED and AGG_MIXED modes: */ + /* these fields are used in AGG_HASHED */ bool table_filled; /* hash table filled yet? 
*/ int num_hashes; MemoryContext hash_metacxt; /* memory for hash table itself */ struct HashTapeInfo *hash_tapeinfo; /* metadata for spill tapes */ - struct HashAggSpill *hash_spills; /* HashAggSpill for each grouping set, - exists only during first pass */ TupleTableSlot *hash_spill_slot; /* slot for reading from spill files */ List *hash_batches; /* hash batches remaining to be processed */ bool hash_ever_spilled; /* ever spilled during this execution? */ @@ -2098,18 +2095,16 @@ typedef struct AggState memory in all hash tables */ uint64 hash_disk_used; /* kB of disk space used */ int hash_batches_used; /* batches used during entire execution */ - - AggStatePerHash perhash; /* array of per-hashtable data */ - AggStatePerGroup *hash_pergroup; /* grouping set indexed array of - * per-group pointers */ +#define HASHNORMALMODE 0 /* normal mode: no minmal slot, no null check */ +#define HASHSPILLMODE 1 /* spill mode: no minmal slot, null check */ +#define HASHREFILLMODE 2 /* refill mode: minmal slot, no null check */ + int evaltrans_mode; /* these fields are used in AGG_SORTED and AGG_MIXED */ bool input_sorted; /* hash table filled yet? 
*/ + int eflags; /* eflags for the first sort */ + - /* support for evaluation of agg input expressions: */ -#define FIELDNO_AGGSTATE_ALL_PERGROUPS 50 - AggStatePerGroup *all_pergroups; /* array of first ->pergroups, than - * ->hash_pergroup */ ProjectionInfo *combinedproj; /* projection machinery */ } AggState; diff --git a/src/test/regress/expected/groupingsets.out b/src/test/regress/expected/groupingsets.out index 1acbbfad55..09f881d78a 100644 --- a/src/test/regress/expected/groupingsets.out +++ b/src/test/regress/expected/groupingsets.out @@ -1004,10 +1004,10 @@ explain (costs off) select a, b, grouping(a,b), sum(v), count(*), max(v) Sort Sort Key: (GROUPING("*VALUES*".column1, "*VALUES*".column2)), "*VALUES*".column1, "*VALUES*".column2 -> MixedAggregate + Group Key: () Hash Key: "*VALUES*".column1, "*VALUES*".column2 Hash Key: "*VALUES*".column1 Hash Key: "*VALUES*".column2 - Group Key: () -> Values Scan on "*VALUES*" (8 rows) @@ -1066,9 +1066,9 @@ explain (costs off) Sort Sort Key: (GROUPING(unhashable_col, unsortable_col)), (sum(v)) -> MixedAggregate - Hash Key: unsortable_col Sort Key: unhashable_col Group Key: unhashable_col + Hash Key: unsortable_col -> Seq Scan on gstest4 (7 rows) @@ -1108,9 +1108,9 @@ explain (costs off) Sort Sort Key: (GROUPING(unhashable_col, unsortable_col)), (sum(v)) -> MixedAggregate - Hash Key: v, unsortable_col Sort Key: v, unhashable_col Group Key: v, unhashable_col + Hash Key: v, unsortable_col -> Seq Scan on gstest4 (7 rows) @@ -1149,10 +1149,10 @@ explain (costs off) QUERY PLAN -------------------------------- MixedAggregate - Hash Key: a, b Group Key: () Group Key: () Group Key: () + Hash Key: a, b -> Seq Scan on gstest_empty (6 rows) @@ -1310,10 +1310,10 @@ explain (costs off) -> Sort Sort Key: a, b -> MixedAggregate + Group Key: () Hash Key: a, b Hash Key: a Hash Key: b - Group Key: () -> Seq Scan on gstest2 (11 rows) @@ -1345,10 +1345,10 @@ explain (costs off) Sort Sort Key: gstest_data.a, gstest_data.b -> MixedAggregate 
+ Group Key: () Hash Key: gstest_data.a, gstest_data.b Hash Key: gstest_data.a Hash Key: gstest_data.b - Group Key: () -> Nested Loop -> Values Scan on "*VALUES*" -> Function Scan on gstest_data @@ -1545,16 +1545,16 @@ explain (costs off) QUERY PLAN ---------------------------- MixedAggregate - Hash Key: two - Hash Key: four - Hash Key: ten - Hash Key: hundred Sort Key: unique1 Group Key: unique1 Sort Key: twothousand Group Key: twothousand Sort Key: thousand Group Key: thousand + Hash Key: hundred + Hash Key: ten + Hash Key: four + Hash Key: two -> Seq Scan on tenk1 (12 rows) @@ -1567,12 +1567,12 @@ explain (costs off) QUERY PLAN ------------------------- MixedAggregate - Hash Key: two - Hash Key: four - Hash Key: ten - Hash Key: hundred Sort Key: unique1 Group Key: unique1 + Hash Key: hundred + Hash Key: ten + Hash Key: four + Hash Key: two -> Seq Scan on tenk1 (8 rows) @@ -1586,15 +1586,15 @@ explain (costs off) QUERY PLAN ---------------------------- MixedAggregate - Hash Key: two - Hash Key: four - Hash Key: ten - Hash Key: hundred - Hash Key: thousand Sort Key: unique1 Group Key: unique1 Sort Key: twothousand Group Key: twothousand + Hash Key: thousand + Hash Key: hundred + Hash Key: ten + Hash Key: four + Hash Key: two -> Seq Scan on tenk1 (11 rows) @@ -1684,6 +1684,7 @@ group by cube (g1000,g100,g10); QUERY PLAN --------------------------------------------------- MixedAggregate + Group Key: () Hash Key: (g.g % 1000), (g.g % 100), (g.g % 10) Hash Key: (g.g % 1000), (g.g % 100) Hash Key: (g.g % 1000) @@ -1691,7 +1692,6 @@ group by cube (g1000,g100,g10); Hash Key: (g.g % 100) Hash Key: (g.g % 10), (g.g % 1000) Hash Key: (g.g % 10) - Group Key: () -> Function Scan on generate_series g (10 rows) diff --git a/src/test/regress/expected/partition_aggregate.out b/src/test/regress/expected/partition_aggregate.out index fbc8d3ac6c..7818f02032 100644 --- a/src/test/regress/expected/partition_aggregate.out +++ b/src/test/regress/expected/partition_aggregate.out @@ 
-340,8 +340,8 @@ SELECT c, sum(a) FROM pagg_tab GROUP BY rollup(c) ORDER BY 1, 2; Sort Sort Key: pagg_tab.c, (sum(pagg_tab.a)) -> MixedAggregate - Hash Key: pagg_tab.c Group Key: () + Hash Key: pagg_tab.c -> Append -> Seq Scan on pagg_tab_p1 pagg_tab_1 -> Seq Scan on pagg_tab_p2 pagg_tab_2 -- 2.14.1