From 2986764514f2310bfe2d1d7d2eacb4e4096e76f8 Mon Sep 17 00:00:00 2001
From: Sergey Soloviev
Date: Wed, 3 Dec 2025 16:41:58 +0300
Subject: [PATCH v2 2/4] introduce AGG_INDEX grouping strategy node

AGG_INDEX is a new grouping strategy that builds an in-memory index and
uses it for grouping.  The main advantage of this approach is that the
output is ordered by the grouping columns, and if an ORDER BY is
specified, it is used to build the grouping/sorting columns.

The index is the B+tree implemented in the previous commit, and overall
the implementation is very close to AGG_HASHED:

- maintain an in-memory grouping structure
- track memory consumption
- if the memory limit is reached, spill data to disk in batches (using
  a hash of the key columns)
- batches are processed one after another, and a new in-memory
  structure is filled for each batch

For this reason much of the code is generalized to support both the
index and hash implementations: functions are generalized using boolean
arguments (i.e. 'ishash'), spill logic members of AggState are renamed
with the prefix 'spill_' instead of 'hash_', etc.

Most of the differences are in the spill logic: to preserve sort order
in case of a disk spill, we must dump all indexes to disk to create
sorted runs and perform a final external merge.

One problem is the external merge.  It is adapted from tuplesort.c by
introducing a new operational mode - tuplemerge (with its own prefix).
Internally we just set up the state accordingly and proceed as before
without any significant code changes.

Another problem is which tuples to save into the sorted runs.  We
decided to store tuples after projection (when their aggregates are
finalized), because the internal transition info is represented by a
value/isnull/novalue triple (in AggStatePerGroupData), which is quite
hard to serialize and handle.  After projection all GROUP BY attributes
are saved, so we can access them during the merge.  Also, projection
applies the filter, so it can discard some tuples.
---
 src/backend/executor/execExpr.c            |   31 +-
 src/backend/executor/nodeAgg.c             | 1378 +++++++++++++++++---
 src/backend/utils/sort/tuplesort.c         |  209 ++-
 src/backend/utils/sort/tuplesortvariants.c |  105 ++
 src/include/executor/executor.h            |   10 +-
 src/include/executor/nodeAgg.h             |   33 +-
 src/include/nodes/execnodes.h              |   61 +-
 src/include/nodes/nodes.h                  |    1 +
 src/include/nodes/plannodes.h              |    2 +-
 src/include/utils/tuplesort.h              |   17 +-
 10 files changed, 1618 insertions(+), 229 deletions(-)

diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c
index c35744b105e..117d7ba31d0 100644
--- a/src/backend/executor/execExpr.c
+++ b/src/backend/executor/execExpr.c
@@ -94,7 +94,7 @@ static void ExecInitCoerceToDomain(ExprEvalStep *scratch, CoerceToDomain *ctest,
 static void ExecBuildAggTransCall(ExprState *state, AggState *aggstate,
 								  ExprEvalStep *scratch,
 								  FunctionCallInfo fcinfo, AggStatePerTrans pertrans,
-								  int transno, int setno, int setoff, bool ishash,
+								  int transno, int setno, int setoff, int strategy,
 								  bool nullcheck);
 static void ExecInitJsonExpr(JsonExpr *jsexpr, ExprState *state,
 							 Datum *resv, bool *resnull,
@@ -3667,7 +3667,7 @@ ExecInitCoerceToDomain(ExprEvalStep *scratch, CoerceToDomain *ctest,
  */
 ExprState *
 ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase,
-				  bool doSort, bool doHash, bool nullcheck)
+				  int groupStrategy, bool nullcheck)
 {
 	ExprState  *state = makeNode(ExprState);
 	PlanState  *parent = &aggstate->ss.ps;
@@ -3925,7 +3925,7 @@ ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase,
 	 * grouping set). 
Do so for both sort and hash based computations, as * applicable. */ - if (doSort) + if (groupStrategy & GROUPING_STRATEGY_SORT) { int processGroupingSets = Max(phase->numsets, 1); int setoff = 0; @@ -3933,13 +3933,13 @@ ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase, for (int setno = 0; setno < processGroupingSets; setno++) { ExecBuildAggTransCall(state, aggstate, &scratch, trans_fcinfo, - pertrans, transno, setno, setoff, false, - nullcheck); + pertrans, transno, setno, setoff, + GROUPING_STRATEGY_SORT, nullcheck); setoff++; } } - if (doHash) + if (groupStrategy & GROUPING_STRATEGY_HASH) { int numHashes = aggstate->num_hashes; int setoff; @@ -3953,12 +3953,19 @@ ExecBuildAggTrans(AggState *aggstate, AggStatePerPhase phase, for (int setno = 0; setno < numHashes; setno++) { ExecBuildAggTransCall(state, aggstate, &scratch, trans_fcinfo, - pertrans, transno, setno, setoff, true, - nullcheck); + pertrans, transno, setno, setoff, + GROUPING_STRATEGY_HASH, nullcheck); setoff++; } } + if (groupStrategy & GROUPING_STRATEGY_INDEX) + { + ExecBuildAggTransCall(state, aggstate, &scratch, trans_fcinfo, + pertrans, transno, 0, 0, + GROUPING_STRATEGY_INDEX, nullcheck); + } + /* adjust early bail out jump target(s) */ foreach(bail, adjust_bailout) { @@ -4011,16 +4018,18 @@ static void ExecBuildAggTransCall(ExprState *state, AggState *aggstate, ExprEvalStep *scratch, FunctionCallInfo fcinfo, AggStatePerTrans pertrans, - int transno, int setno, int setoff, bool ishash, + int transno, int setno, int setoff, int strategy, bool nullcheck) { ExprContext *aggcontext; int adjust_jumpnull = -1; - if (ishash) + if (strategy & GROUPING_STRATEGY_HASH) aggcontext = aggstate->hashcontext; - else + else if (strategy & GROUPING_STRATEGY_SORT) aggcontext = aggstate->aggcontexts[setno]; + else + aggcontext = aggstate->indexcontext; /* add check for NULL pointer? 
*/ if (nullcheck) diff --git a/src/backend/executor/nodeAgg.c b/src/backend/executor/nodeAgg.c index a18556f62ec..c5c6b7bfce9 100644 --- a/src/backend/executor/nodeAgg.c +++ b/src/backend/executor/nodeAgg.c @@ -364,7 +364,7 @@ typedef struct FindColsContext Bitmapset *unaggregated; /* other column references */ } FindColsContext; -static void select_current_set(AggState *aggstate, int setno, bool is_hash); +static void select_current_set(AggState *aggstate, int setno, int strategy); static void initialize_phase(AggState *aggstate, int newphase); static TupleTableSlot *fetch_input_tuple(AggState *aggstate); static void initialize_aggregates(AggState *aggstate, @@ -403,8 +403,8 @@ static void find_cols(AggState *aggstate, Bitmapset **aggregated, static bool find_cols_walker(Node *node, FindColsContext *context); static void build_hash_tables(AggState *aggstate); static void build_hash_table(AggState *aggstate, int setno, double nbuckets); -static void hashagg_recompile_expressions(AggState *aggstate, bool minslot, - bool nullcheck); +static void agg_recompile_expressions(AggState *aggstate, bool minslot, + bool nullcheck); static void hash_create_memory(AggState *aggstate); static double hash_choose_num_buckets(double hashentrysize, double ngroups, Size memory); @@ -431,13 +431,13 @@ static HashAggBatch *hashagg_batch_new(LogicalTape *input_tape, int setno, int64 input_tuples, double input_card, int used_bits); static MinimalTuple hashagg_batch_read(HashAggBatch *batch, uint32 *hashp); -static void hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, - int used_bits, double input_groups, - double hashentrysize); -static Size hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, - TupleTableSlot *inputslot, uint32 hash); -static void hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, - int setno); +static void agg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, + int used_bits, double input_groups, + double hashentrysize); +static Size agg_spill_tuple(AggState *aggstate, HashAggSpill *spill, + TupleTableSlot *inputslot, uint32 hash); +static void agg_spill_finish(AggState *aggstate, HashAggSpill *spill, + int setno); static Datum GetAggInitVal(Datum textInitVal, Oid transtype); static void build_pertrans_for_aggref(AggStatePerTrans pertrans, AggState *aggstate, EState *estate, @@ -446,21 +446,27 @@ static void build_pertrans_for_aggref(AggStatePerTrans pertrans, Oid aggdeserialfn, Datum initValue, bool initValueIsNull, Oid *inputTypes, int numArguments); - +static void agg_fill_index(AggState *state); +static TupleTableSlot *agg_retrieve_index(AggState *state); +static void lookup_index_entries(AggState *state); +static void indexagg_finish_initial_spills(AggState *aggstate); +static void index_agg_enter_spill_mode(AggState *aggstate); /* * Select the current grouping set; affects current_set and * curaggcontext. */ static void -select_current_set(AggState *aggstate, int setno, bool is_hash) +select_current_set(AggState *aggstate, int setno, int strategy) { /* * When changing this, also adapt ExecAggPlainTransByVal() and * ExecAggPlainTransByRef(). 
*/ - if (is_hash) + if (strategy == GROUPING_STRATEGY_HASH) aggstate->curaggcontext = aggstate->hashcontext; + else if (strategy == GROUPING_STRATEGY_INDEX) + aggstate->curaggcontext = aggstate->indexcontext; else aggstate->curaggcontext = aggstate->aggcontexts[setno]; @@ -680,7 +686,7 @@ initialize_aggregates(AggState *aggstate, { AggStatePerGroup pergroup = pergroups[setno]; - select_current_set(aggstate, setno, false); + select_current_set(aggstate, setno, GROUPING_STRATEGY_SORT); for (transno = 0; transno < numTrans; transno++) { @@ -1478,7 +1484,7 @@ build_hash_tables(AggState *aggstate) continue; } - memory = aggstate->hash_mem_limit / aggstate->num_hashes; + memory = aggstate->spill_mem_limit / aggstate->num_hashes; /* choose reasonable number of buckets per hashtable */ nbuckets = hash_choose_num_buckets(aggstate->hashentrysize, @@ -1496,7 +1502,7 @@ build_hash_tables(AggState *aggstate) build_hash_table(aggstate, setno, nbuckets); } - aggstate->hash_ngroups_current = 0; + aggstate->spill_ngroups_current = 0; } /* @@ -1728,7 +1734,7 @@ hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace) } /* - * hashagg_recompile_expressions() + * agg_recompile_expressions() * * Identifies the right phase, compiles the right expression given the * arguments, and then sets phase->evalfunc to that expression. @@ -1746,34 +1752,47 @@ hash_agg_entry_size(int numTrans, Size tupleWidth, Size transitionSpace) * expressions in the AggStatePerPhase, and reuse when appropriate. */ static void -hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck) +agg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck) { AggStatePerPhase phase; int i = minslot ? 1 : 0; int j = nullcheck ? 1 : 0; Assert(aggstate->aggstrategy == AGG_HASHED || - aggstate->aggstrategy == AGG_MIXED); + aggstate->aggstrategy == AGG_MIXED || + aggstate->aggstrategy == AGG_INDEX); - if (aggstate->aggstrategy == AGG_HASHED) - phase = &aggstate->phases[0]; - else /* AGG_MIXED */ + if (aggstate->aggstrategy == AGG_MIXED) phase = &aggstate->phases[1]; + else /* AGG_HASHED or AGG_INDEX */ + phase = &aggstate->phases[0]; if (phase->evaltrans_cache[i][j] == NULL) { const TupleTableSlotOps *outerops = aggstate->ss.ps.outerops; bool outerfixed = aggstate->ss.ps.outeropsfixed; - bool dohash = true; - bool dosort = false; + int strategy = 0; - /* - * If minslot is true, that means we are processing a spilled batch - * (inside agg_refill_hash_table()), and we must not advance the - * sorted grouping sets. - */ - if (aggstate->aggstrategy == AGG_MIXED && !minslot) - dosort = true; + switch (aggstate->aggstrategy) + { + case AGG_MIXED: + /* + * If minslot is true, that means we are processing a spilled batch + * (inside agg_refill_hash_table()), and we must not advance the + * sorted grouping sets. 
+ */ + if (!minslot) + strategy |= GROUPING_STRATEGY_SORT; + /* FALLTHROUGH */ + case AGG_HASHED: + strategy |= GROUPING_STRATEGY_HASH; + break; + case AGG_INDEX: + strategy |= GROUPING_STRATEGY_INDEX; + break; + default: + Assert(false); + } /* temporarily change the outerops while compiling the expression */ if (minslot) @@ -1783,8 +1802,7 @@ hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck) } phase->evaltrans_cache[i][j] = ExecBuildAggTrans(aggstate, phase, - dosort, dohash, - nullcheck); + strategy, nullcheck); /* change back */ aggstate->ss.ps.outerops = outerops; @@ -1803,9 +1821,9 @@ hashagg_recompile_expressions(AggState *aggstate, bool minslot, bool nullcheck) * substantially larger than the initial value. */ void -hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits, - Size *mem_limit, uint64 *ngroups_limit, - int *num_partitions) +agg_set_limits(double hashentrysize, double input_groups, int used_bits, + Size *mem_limit, uint64 *ngroups_limit, + int *num_partitions) { int npartitions; Size partition_mem; @@ -1853,6 +1871,18 @@ hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits, *ngroups_limit = 1; } +static inline bool +agg_spill_required(AggState *aggstate, Size total_mem) +{ + /* + * Don't spill unless there's at least one group in the hash table so we + * can be sure to make progress even in edge cases. + */ + return aggstate->spill_ngroups_current > 0 && + (total_mem > aggstate->spill_mem_limit || + aggstate->spill_ngroups_current > aggstate->spill_ngroups_limit); +} + /* * hash_agg_check_limits * @@ -1863,7 +1893,6 @@ hash_agg_set_limits(double hashentrysize, double input_groups, int used_bits, static void hash_agg_check_limits(AggState *aggstate) { - uint64 ngroups = aggstate->hash_ngroups_current; Size meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true); Size entry_mem = MemoryContextMemAllocated(aggstate->hash_tuplescxt, @@ -1874,7 +1903,7 @@ hash_agg_check_limits(AggState *aggstate) bool do_spill = false; #ifdef USE_INJECTION_POINTS - if (ngroups >= 1000) + if (aggstate->spill_ngroups_current >= 1000) { if (IS_INJECTION_POINT_ATTACHED("hash-aggregate-spill-1000")) { @@ -1888,9 +1917,7 @@ hash_agg_check_limits(AggState *aggstate) * Don't spill unless there's at least one group in the hash table so we * can be sure to make progress even in edge cases. 
*/ - if (aggstate->hash_ngroups_current > 0 && - (total_mem > aggstate->hash_mem_limit || - ngroups > aggstate->hash_ngroups_limit)) + if (agg_spill_required(aggstate, total_mem)) { do_spill = true; } @@ -1899,97 +1926,199 @@ hash_agg_check_limits(AggState *aggstate) hash_agg_enter_spill_mode(aggstate); } +static void +index_agg_check_limits(AggState *aggstate) +{ + Size meta_mem = MemoryContextMemAllocated(aggstate->index_metacxt, + true); + Size node_mem = MemoryContextMemAllocated(aggstate->index_nodecxt, + true); + Size entry_mem = MemoryContextMemAllocated(aggstate->index_entrycxt, + true); + Size tval_mem = MemoryContextMemAllocated(aggstate->indexcontext->ecxt_per_tuple_memory, + true); + Size total_mem = meta_mem + node_mem + entry_mem + tval_mem; + bool do_spill = false; + +#ifdef USE_INJECTION_POINTS + if (aggstate->spill_ngroups_current >= 1000) + { + if (IS_INJECTION_POINT_ATTACHED("index-aggregate-spill-1000")) + { + do_spill = true; + INJECTION_POINT_CACHED("index-aggregate-spill-1000", NULL); + } + } +#endif + + if (agg_spill_required(aggstate, total_mem)) + { + do_spill = true; + } + + if (do_spill) + index_agg_enter_spill_mode(aggstate); +} + /* * Enter "spill mode", meaning that no new groups are added to any of the hash * tables. Tuples that would create a new group are instead spilled, and * processed later. */ -static void -hash_agg_enter_spill_mode(AggState *aggstate) +static inline void +agg_enter_spill_mode(AggState *aggstate, bool ishash) { - INJECTION_POINT("hash-aggregate-enter-spill-mode", NULL); - aggstate->hash_spill_mode = true; - hashagg_recompile_expressions(aggstate, aggstate->table_filled, true); - - if (!aggstate->hash_ever_spilled) + if (ishash) { - Assert(aggstate->hash_tapeset == NULL); - Assert(aggstate->hash_spills == NULL); - - aggstate->hash_ever_spilled = true; - - aggstate->hash_tapeset = LogicalTapeSetCreate(true, NULL, -1); + INJECTION_POINT("hash-aggregate-enter-spill-mode", NULL); + aggstate->spill_mode = true; + agg_recompile_expressions(aggstate, aggstate->table_filled, true); + } + else + { + INJECTION_POINT("index-aggregate-enter-spill-mode", NULL); + aggstate->spill_mode = true; + agg_recompile_expressions(aggstate, aggstate->index_filled, true); + } + + if (!aggstate->spill_ever_happened) + { + Assert(aggstate->spill_tapeset == NULL); + Assert(aggstate->spills == NULL); - aggstate->hash_spills = palloc_array(HashAggSpill, aggstate->num_hashes); + aggstate->spill_ever_happened = true; + aggstate->spill_tapeset = LogicalTapeSetCreate(true, NULL, -1); - for (int setno = 0; setno < aggstate->num_hashes; setno++) + if (ishash) { - AggStatePerHash perhash = &aggstate->perhash[setno]; - HashAggSpill *spill = &aggstate->hash_spills[setno]; - - hashagg_spill_init(spill, aggstate->hash_tapeset, 0, + aggstate->spills = palloc_array(HashAggSpill, aggstate->num_hashes); + + for (int setno = 0; setno < aggstate->num_hashes; setno++) + { + AggStatePerHash perhash = &aggstate->perhash[setno]; + HashAggSpill *spill = &aggstate->spills[setno]; + + agg_spill_init(spill, aggstate->spill_tapeset, 0, perhash->aggnode->numGroups, aggstate->hashentrysize); + } + } + else + { + aggstate->spills = palloc(sizeof(HashAggSpill)); + agg_spill_init(aggstate->spills, aggstate->spill_tapeset, 0, + aggstate->perindex->aggnode->numGroups, + aggstate->hashentrysize); } } } +static void +hash_agg_enter_spill_mode(AggState *aggstate) +{ + agg_enter_spill_mode(aggstate, true); +} + +static void +index_agg_enter_spill_mode(AggState *aggstate) +{ + 
agg_enter_spill_mode(aggstate, false); +} + /* * Update metrics after filling the hash table. * * If reading from the outer plan, from_tape should be false; if reading from * another tape, from_tape should be true. */ -static void -hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions) +static inline void +agg_update_spill_metrics(AggState *aggstate, bool from_tape, int npartitions, bool ishash) { Size meta_mem; Size entry_mem; - Size hashkey_mem; + Size key_mem; Size buffer_mem; Size total_mem; if (aggstate->aggstrategy != AGG_MIXED && - aggstate->aggstrategy != AGG_HASHED) + aggstate->aggstrategy != AGG_HASHED && + aggstate->aggstrategy != AGG_INDEX) return; - /* memory for the hash table itself */ - meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true); - - /* memory for hash entries */ - entry_mem = MemoryContextMemAllocated(aggstate->hash_tuplescxt, true); - - /* memory for byref transition states */ - hashkey_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory, true); - + if (ishash) + { + /* memory for the hash table itself */ + meta_mem = MemoryContextMemAllocated(aggstate->hash_metacxt, true); + + /* memory for hash entries */ + entry_mem = MemoryContextMemAllocated(aggstate->hash_tuplescxt, true); + + /* memory for byref transition states */ + key_mem = MemoryContextMemAllocated(aggstate->hashcontext->ecxt_per_tuple_memory, true); + } + else + { + /* memory for the index itself */ + meta_mem = MemoryContextMemAllocated(aggstate->index_metacxt, true); + + /* memory for the index nodes */ + meta_mem += MemoryContextMemAllocated(aggstate->index_nodecxt, true); + + /* memory for index entries */ + entry_mem = MemoryContextMemAllocated(aggstate->index_entrycxt, true); + + /* memory for byref transition states */ + key_mem = MemoryContextMemAllocated(aggstate->indexcontext->ecxt_per_tuple_memory, true); + } /* memory for read/write tape buffers, if spilled */ buffer_mem = npartitions * HASHAGG_WRITE_BUFFER_SIZE; if (from_tape) buffer_mem += HASHAGG_READ_BUFFER_SIZE; /* update peak mem */ - total_mem = meta_mem + entry_mem + hashkey_mem + buffer_mem; - if (total_mem > aggstate->hash_mem_peak) - aggstate->hash_mem_peak = total_mem; + total_mem = meta_mem + entry_mem + key_mem + buffer_mem; + if (total_mem > aggstate->spill_mem_peak) + aggstate->spill_mem_peak = total_mem; /* update disk usage */ - if (aggstate->hash_tapeset != NULL) + if (aggstate->spill_tapeset != NULL) { - uint64 disk_used = LogicalTapeSetBlocks(aggstate->hash_tapeset) * (BLCKSZ / 1024); + uint64 disk_used = LogicalTapeSetBlocks(aggstate->spill_tapeset) * (BLCKSZ / 1024); - if (aggstate->hash_disk_used < disk_used) - aggstate->hash_disk_used = disk_used; + if (aggstate->spill_disk_used < disk_used) + aggstate->spill_disk_used = disk_used; } /* update hashentrysize estimate based on contents */ - if (aggstate->hash_ngroups_current > 0) + if (aggstate->spill_ngroups_current > 0) { - aggstate->hashentrysize = - TupleHashEntrySize() + - (hashkey_mem / (double) aggstate->hash_ngroups_current); + if (ishash) + { + aggstate->hashentrysize = + TupleHashEntrySize() + + (key_mem / (double) aggstate->spill_ngroups_current); + } + else + { + /* index stores MinimalTuples directly without any wrapper */ + aggstate->hashentrysize = + (key_mem / (double) aggstate->spill_ngroups_current); + } } } +static void +hash_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions) +{ + agg_update_spill_metrics(aggstate, from_tape, npartitions, true); +} + +static void 
+index_agg_update_metrics(AggState *aggstate, bool from_tape, int npartitions) +{ + agg_update_spill_metrics(aggstate, from_tape, npartitions, false); +} + /* * Create memory contexts used for hash aggregation. */ @@ -2048,6 +2177,33 @@ hash_create_memory(AggState *aggstate) } +/* + * Create memory contexts used for index aggregation. + */ +static void +index_create_memory(AggState *aggstate) +{ + Size maxBlockSize = ALLOCSET_DEFAULT_MAXSIZE; + + aggstate->indexcontext = CreateWorkExprContext(aggstate->ss.ps.state); + + aggstate->index_metacxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt, + "IndexAgg meta context", + ALLOCSET_DEFAULT_SIZES); + aggstate->index_nodecxt = BumpContextCreate(aggstate->ss.ps.state->es_query_cxt, + "IndexAgg node context", + ALLOCSET_SMALL_SIZES); + + maxBlockSize = pg_prevpower2_size_t(work_mem * (Size) 1024 / 16); + maxBlockSize = Min(maxBlockSize, ALLOCSET_DEFAULT_MAXSIZE); + maxBlockSize = Max(maxBlockSize, ALLOCSET_DEFAULT_INITSIZE); + aggstate->index_entrycxt = AllocSetContextCreate(aggstate->ss.ps.state->es_query_cxt, + "IndexAgg table context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + maxBlockSize); +} + /* * Choose a reasonable number of buckets for the initial hash table size. */ @@ -2141,7 +2297,7 @@ initialize_hash_entry(AggState *aggstate, TupleHashTable hashtable, AggStatePerGroup pergroup; int transno; - aggstate->hash_ngroups_current++; + aggstate->spill_ngroups_current++; hash_agg_check_limits(aggstate); /* no need to allocate or initialize per-group state */ @@ -2196,9 +2352,9 @@ lookup_hash_entries(AggState *aggstate) bool *p_isnew; /* if hash table already spilled, don't create new entries */ - p_isnew = aggstate->hash_spill_mode ? NULL : &isnew; + p_isnew = aggstate->spill_mode ? 
NULL : &isnew; - select_current_set(aggstate, setno, true); + select_current_set(aggstate, setno, GROUPING_STRATEGY_HASH); prepare_hash_slot(perhash, outerslot, hashslot); @@ -2214,15 +2370,15 @@ lookup_hash_entries(AggState *aggstate) } else { - HashAggSpill *spill = &aggstate->hash_spills[setno]; + HashAggSpill *spill = &aggstate->spills[setno]; TupleTableSlot *slot = aggstate->tmpcontext->ecxt_outertuple; if (spill->partitions == NULL) - hashagg_spill_init(spill, aggstate->hash_tapeset, 0, - perhash->aggnode->numGroups, - aggstate->hashentrysize); + agg_spill_init(spill, aggstate->spill_tapeset, 0, + perhash->aggnode->numGroups, + aggstate->hashentrysize); - hashagg_spill_tuple(aggstate, spill, slot, hash); + agg_spill_tuple(aggstate, spill, slot, hash); pergroup[setno] = NULL; } } @@ -2265,6 +2421,12 @@ ExecAgg(PlanState *pstate) case AGG_SORTED: result = agg_retrieve_direct(node); break; + case AGG_INDEX: + if (!node->index_filled) + agg_fill_index(node); + + result = agg_retrieve_index(node); + break; } if (!TupIsNull(result)) @@ -2381,7 +2543,7 @@ agg_retrieve_direct(AggState *aggstate) aggstate->table_filled = true; ResetTupleHashIterator(aggstate->perhash[0].hashtable, &aggstate->perhash[0].hashiter); - select_current_set(aggstate, 0, true); + select_current_set(aggstate, 0, GROUPING_STRATEGY_HASH); return agg_retrieve_hash_table(aggstate); } else @@ -2601,7 +2763,7 @@ agg_retrieve_direct(AggState *aggstate) prepare_projection_slot(aggstate, econtext->ecxt_outertuple, currentSet); - select_current_set(aggstate, currentSet, false); + select_current_set(aggstate, currentSet, GROUPING_STRATEGY_SORT); finalize_aggregates(aggstate, peragg, @@ -2683,19 +2845,19 @@ agg_refill_hash_table(AggState *aggstate) HashAggBatch *batch; AggStatePerHash perhash; HashAggSpill spill; - LogicalTapeSet *tapeset = aggstate->hash_tapeset; + LogicalTapeSet *tapeset = aggstate->spill_tapeset; bool spill_initialized = false; - if (aggstate->hash_batches == NIL) + if (aggstate->spill_batches == NIL) return false; /* hash_batches is a stack, with the top item at the end of the list */ - batch = llast(aggstate->hash_batches); - aggstate->hash_batches = list_delete_last(aggstate->hash_batches); + batch = llast(aggstate->spill_batches); + aggstate->spill_batches = list_delete_last(aggstate->spill_batches); - hash_agg_set_limits(aggstate->hashentrysize, batch->input_card, - batch->used_bits, &aggstate->hash_mem_limit, - &aggstate->hash_ngroups_limit, NULL); + agg_set_limits(aggstate->hashentrysize, batch->input_card, + batch->used_bits, &aggstate->spill_mem_limit, + &aggstate->spill_ngroups_limit, NULL); /* * Each batch only processes one grouping set; set the rest to NULL so @@ -2712,7 +2874,7 @@ agg_refill_hash_table(AggState *aggstate) for (int setno = 0; setno < aggstate->num_hashes; setno++) ResetTupleHashTable(aggstate->perhash[setno].hashtable); - aggstate->hash_ngroups_current = 0; + aggstate->spill_ngroups_current = 0; /* * In AGG_MIXED mode, hash aggregation happens in phase 1 and the output @@ -2726,7 +2888,7 @@ agg_refill_hash_table(AggState *aggstate) aggstate->phase = &aggstate->phases[aggstate->current_phase]; } - select_current_set(aggstate, batch->setno, true); + select_current_set(aggstate, batch->setno, GROUPING_STRATEGY_HASH); perhash = &aggstate->perhash[aggstate->current_set]; @@ -2737,19 +2899,19 @@ agg_refill_hash_table(AggState *aggstate) * We still need the NULL check, because we are only processing one * grouping set at a time and the rest will be NULL. 
*/ - hashagg_recompile_expressions(aggstate, true, true); + agg_recompile_expressions(aggstate, true, true); INJECTION_POINT("hash-aggregate-process-batch", NULL); for (;;) { - TupleTableSlot *spillslot = aggstate->hash_spill_rslot; + TupleTableSlot *spillslot = aggstate->spill_rslot; TupleTableSlot *hashslot = perhash->hashslot; TupleHashTable hashtable = perhash->hashtable; TupleHashEntry entry; MinimalTuple tuple; uint32 hash; bool isnew = false; - bool *p_isnew = aggstate->hash_spill_mode ? NULL : &isnew; + bool *p_isnew = aggstate->spill_mode ? NULL : &isnew; CHECK_FOR_INTERRUPTS(); @@ -2782,11 +2944,11 @@ agg_refill_hash_table(AggState *aggstate) * that we don't assign tapes that will never be used. */ spill_initialized = true; - hashagg_spill_init(&spill, tapeset, batch->used_bits, - batch->input_card, aggstate->hashentrysize); + agg_spill_init(&spill, tapeset, batch->used_bits, + batch->input_card, aggstate->hashentrysize); } /* no memory for a new group, spill */ - hashagg_spill_tuple(aggstate, &spill, spillslot, hash); + agg_spill_tuple(aggstate, &spill, spillslot, hash); aggstate->hash_pergroup[batch->setno] = NULL; } @@ -2806,16 +2968,16 @@ agg_refill_hash_table(AggState *aggstate) if (spill_initialized) { - hashagg_spill_finish(aggstate, &spill, batch->setno); + agg_spill_finish(aggstate, &spill, batch->setno); hash_agg_update_metrics(aggstate, true, spill.npartitions); } else hash_agg_update_metrics(aggstate, true, 0); - aggstate->hash_spill_mode = false; + aggstate->spill_mode = false; /* prepare to walk the first hash table */ - select_current_set(aggstate, batch->setno, true); + select_current_set(aggstate, batch->setno, GROUPING_STRATEGY_HASH); ResetTupleHashIterator(aggstate->perhash[batch->setno].hashtable, &aggstate->perhash[batch->setno].hashiter); @@ -2975,14 +3137,14 @@ agg_retrieve_hash_table_in_memory(AggState *aggstate) } /* - * hashagg_spill_init + * agg_spill_init * * Called after we determined that spilling is necessary. Chooses the number * of partitions to create, and initializes them. */ static void -hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits, - double input_groups, double hashentrysize) +agg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits, + double input_groups, double hashentrysize) { int npartitions; int partition_bits; @@ -3018,14 +3180,13 @@ hashagg_spill_init(HashAggSpill *spill, LogicalTapeSet *tapeset, int used_bits, } /* - * hashagg_spill_tuple + * agg_spill_tuple * - * No room for new groups in the hash table. Save for later in the appropriate - * partition. + * No room for new groups in memory. Save for later in the appropriate partition. 
*/ static Size -hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, - TupleTableSlot *inputslot, uint32 hash) +agg_spill_tuple(AggState *aggstate, HashAggSpill *spill, + TupleTableSlot *inputslot, uint32 hash) { TupleTableSlot *spillslot; int partition; @@ -3039,7 +3200,7 @@ hashagg_spill_tuple(AggState *aggstate, HashAggSpill *spill, /* spill only attributes that we actually need */ if (!aggstate->all_cols_needed) { - spillslot = aggstate->hash_spill_wslot; + spillslot = aggstate->spill_wslot; slot_getsomeattrs(inputslot, aggstate->max_colno_needed); ExecClearTuple(spillslot); for (int i = 0; i < spillslot->tts_tupleDescriptor->natts; i++) @@ -3167,14 +3328,14 @@ hashagg_finish_initial_spills(AggState *aggstate) int setno; int total_npartitions = 0; - if (aggstate->hash_spills != NULL) + if (aggstate->spills != NULL) { for (setno = 0; setno < aggstate->num_hashes; setno++) { - HashAggSpill *spill = &aggstate->hash_spills[setno]; + HashAggSpill *spill = &aggstate->spills[setno]; total_npartitions += spill->npartitions; - hashagg_spill_finish(aggstate, spill, setno); + agg_spill_finish(aggstate, spill, setno); } /* @@ -3182,21 +3343,21 @@ hashagg_finish_initial_spills(AggState *aggstate) * processing batches of spilled tuples. The initial spill structures * are no longer needed. */ - pfree(aggstate->hash_spills); - aggstate->hash_spills = NULL; + pfree(aggstate->spills); + aggstate->spills = NULL; } hash_agg_update_metrics(aggstate, false, total_npartitions); - aggstate->hash_spill_mode = false; + aggstate->spill_mode = false; } /* - * hashagg_spill_finish + * agg_spill_finish * * Transform spill partitions into new batches. */ static void -hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno) +agg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno) { int i; int used_bits = 32 - spill->shift; @@ -3223,8 +3384,8 @@ hashagg_spill_finish(AggState *aggstate, HashAggSpill *spill, int setno) new_batch = hashagg_batch_new(tape, setno, spill->ntuples[i], cardinality, used_bits); - aggstate->hash_batches = lappend(aggstate->hash_batches, new_batch); - aggstate->hash_batches_used++; + aggstate->spill_batches = lappend(aggstate->spill_batches, new_batch); + aggstate->spill_batches_used++; } pfree(spill->ntuples); @@ -3239,33 +3400,670 @@ static void hashagg_reset_spill_state(AggState *aggstate) { /* free spills from initial pass */ - if (aggstate->hash_spills != NULL) + if (aggstate->spills != NULL) { int setno; for (setno = 0; setno < aggstate->num_hashes; setno++) { - HashAggSpill *spill = &aggstate->hash_spills[setno]; + HashAggSpill *spill = &aggstate->spills[setno]; pfree(spill->ntuples); pfree(spill->partitions); } - pfree(aggstate->hash_spills); - aggstate->hash_spills = NULL; + pfree(aggstate->spills); + aggstate->spills = NULL; } /* free batches */ - list_free_deep(aggstate->hash_batches); - aggstate->hash_batches = NIL; + list_free_deep(aggstate->spill_batches); + aggstate->spill_batches = NIL; /* close tape set */ - if (aggstate->hash_tapeset != NULL) + if (aggstate->spill_tapeset != NULL) { - LogicalTapeSetClose(aggstate->hash_tapeset); - aggstate->hash_tapeset = NULL; + LogicalTapeSetClose(aggstate->spill_tapeset); + aggstate->spill_tapeset = NULL; } } +static void +agg_fill_index(AggState *aggstate) +{ + AggStatePerIndex perindex = aggstate->perindex; + ExprContext *tmpcontext = aggstate->tmpcontext; + + /* + * Process each outer-plan tuple, and then fetch the next one, until we + * exhaust the outer plan. 
+	 */
+	for (;;)
+	{
+		TupleTableSlot *outerslot;
+
+		outerslot = fetch_input_tuple(aggstate);
+		if (TupIsNull(outerslot))
+			break;
+
+		/* set up for lookup_index_entries and advance_aggregates */
+		tmpcontext->ecxt_outertuple = outerslot;
+		/* insert the input tuple into the index, possibly spilling to disk */
+		lookup_index_entries(aggstate);
+
+		/* Advance the aggregates (or combine functions) */
+		advance_aggregates(aggstate);
+
+		/*
+		 * Reset per-input-tuple context after each tuple, but note that the
+		 * index lookups do this too
+		 */
+		ResetExprContext(aggstate->tmpcontext);
+	}
+
+	/*
+	 * Mark the index as filled here, so that after recompilation the
+	 * expression will expect a MinimalTuple instead of the outer plan's
+	 * tuple type.
+	 */
+	aggstate->index_filled = true;
+
+	indexagg_finish_initial_spills(aggstate);
+
+	/*
+	 * This is only useful when no spill occurred and projection happens in
+	 * memory, but initialize it anyway.
+	 */
+	select_current_set(aggstate, 0, GROUPING_STRATEGY_INDEX);
+	InitTupleIndexIterator(perindex->index, &perindex->iter);
+}
+
+/*
+ * Extract the attributes that make up the grouping key into the
+ * indexslot. This is necessary to perform comparisons in the index.
+ */
+static void
+prepare_index_slot(AggStatePerIndex perindex,
+				   TupleTableSlot *inputslot,
+				   TupleTableSlot *indexslot)
+{
+	slot_getsomeattrs(inputslot, perindex->largestGrpColIdx);
+	ExecClearTuple(indexslot);
+
+	for (int i = 0; i < perindex->numCols; ++i)
+	{
+		int			varNumber = perindex->idxKeyColIdxInput[i] - 1;
+
+		indexslot->tts_values[i] = inputslot->tts_values[varNumber];
+		indexslot->tts_isnull[i] = inputslot->tts_isnull[varNumber];
+	}
+	ExecStoreVirtualTuple(indexslot);
+}
+
+static void
+indexagg_reset_spill_state(AggState *aggstate)
+{
+	/* free spills from initial pass */
+	if (aggstate->spills != NULL)
+	{
+		HashAggSpill *spill = &aggstate->spills[0];
+
+		pfree(spill->ntuples);
+		pfree(spill->partitions);
+		pfree(aggstate->spills);
+		aggstate->spills = NULL;
+	}
+
+	/* free batches */
+	list_free_deep(aggstate->spill_batches);
+	aggstate->spill_batches = NIL;
+
+	/* close tape set */
+	if (aggstate->spill_tapeset != NULL)
+	{
+		LogicalTapeSetClose(aggstate->spill_tapeset);
+		aggstate->spill_tapeset = NULL;
+	}
+}
+
+/*
+ * Initialize a freshly-created MinimalTuple in the index
+ */
+static void
+initialize_index_entry(AggState *aggstate, TupleIndex index, TupleIndexEntry entry)
+{
+	AggStatePerGroup pergroup;
+
+	aggstate->spill_ngroups_current++;
+	index_agg_check_limits(aggstate);
+
+	/* no need to allocate or initialize per-group state */
+	if (aggstate->numtrans == 0)
+		return;
+
+	pergroup = (AggStatePerGroup) TupleIndexEntryGetAdditional(index, entry);
+
+	/*
+	 * Initialize aggregates for the new tuple group; lookup_index_entries()
+	 * has already selected the relevant grouping set.
+	 */
+	for (int transno = 0; transno < aggstate->numtrans; ++transno)
+	{
+		AggStatePerTrans pertrans = &aggstate->pertrans[transno];
+		AggStatePerGroup pergroupstate = &pergroup[transno];
+
+		initialize_aggregate(aggstate, pertrans, pergroupstate);
+	}
+}
+
+/*
+ * Create a new sorted run from the index currently stored in memory.
+ */ +static void +indexagg_save_index_run(AggState *aggstate) +{ + AggStatePerIndex perindex = aggstate->perindex; + ExprContext *econtext; + TupleIndexIteratorData iter; + AggStatePerAgg peragg; + TupleTableSlot *firstSlot; + TupleIndexEntry entry; + TupleTableSlot *indexslot; + AggStatePerGroup pergroup; + + econtext = aggstate->ss.ps.ps_ExprContext; + firstSlot = aggstate->ss.ss_ScanTupleSlot; + peragg = aggstate->peragg; + indexslot = perindex->indexslot; + + InitTupleIndexIterator(perindex->index, &iter); + + tuplemerge_start_run(aggstate->mergestate); + + while ((entry = TupleIndexIteratorNext(&iter)) != NULL) + { + MinimalTuple tuple = TupleIndexEntryGetMinimalTuple(entry); + TupleTableSlot *output; + + ResetExprContext(econtext); + ExecStoreMinimalTuple(tuple, indexslot, false); + slot_getallattrs(indexslot); + + ExecClearTuple(firstSlot); + memset(firstSlot->tts_isnull, true, + firstSlot->tts_tupleDescriptor->natts * sizeof(bool)); + + for (int i = 0; i < perindex->numCols; i++) + { + int varNumber = perindex->idxKeyColIdxInput[i] - 1; + + firstSlot->tts_values[varNumber] = indexslot->tts_values[i]; + firstSlot->tts_isnull[varNumber] = indexslot->tts_isnull[i]; + } + ExecStoreVirtualTuple(firstSlot); + + pergroup = (AggStatePerGroup) TupleIndexEntryGetAdditional(perindex->index, entry); + + econtext->ecxt_outertuple = firstSlot; + prepare_projection_slot(aggstate, + econtext->ecxt_outertuple, + aggstate->current_set); + finalize_aggregates(aggstate, peragg, pergroup); + output = project_aggregates(aggstate); + if (output) + tuplemerge_puttupleslot(aggstate->mergestate, output); + } + + tuplemerge_end_run(aggstate->mergestate); +} + +/* + * Fill in index with tuples in given batch. + */ +static void +indexagg_refill_batch(AggState *aggstate, HashAggBatch *batch) +{ + AggStatePerIndex perindex = aggstate->perindex; + TupleTableSlot *spillslot = aggstate->spill_rslot; + TupleTableSlot *indexslot = perindex->indexslot; + TupleIndex index = perindex->index; + LogicalTapeSet *tapeset = aggstate->spill_tapeset; + HashAggSpill spill; + bool spill_initialized = false; + int nspill = 0; + + agg_set_limits(aggstate->hashentrysize, batch->input_card, batch->used_bits, + &aggstate->spill_mem_limit, &aggstate->spill_ngroups_limit, NULL); + + ReScanExprContext(aggstate->indexcontext); + + MemoryContextReset(aggstate->index_entrycxt); + MemoryContextReset(aggstate->index_nodecxt); + ResetTupleIndex(perindex->index); + + aggstate->spill_ngroups_current = 0; + + select_current_set(aggstate, batch->setno, GROUPING_STRATEGY_INDEX); + + agg_recompile_expressions(aggstate, true, true); + + for (;;) + { + MinimalTuple tuple; + TupleIndexEntry entry; + bool isnew = false; + bool *p_isnew; + uint32 hash; + + CHECK_FOR_INTERRUPTS(); + + tuple = hashagg_batch_read(batch, &hash); + if (tuple == NULL) + break; + + ExecStoreMinimalTuple(tuple, spillslot, true); + aggstate->tmpcontext->ecxt_outertuple = spillslot; + + prepare_index_slot(perindex, spillslot, indexslot); + + p_isnew = aggstate->spill_mode ? 
NULL : &isnew;
+		entry = TupleIndexLookup(index, indexslot, p_isnew);
+
+		if (entry != NULL)
+		{
+			if (isnew)
+				initialize_index_entry(aggstate, index, entry);
+
+			aggstate->all_pergroups[batch->setno] = TupleIndexEntryGetAdditional(index, entry);
+			advance_aggregates(aggstate);
+		}
+		else
+		{
+			if (!spill_initialized)
+			{
+				spill_initialized = true;
+				agg_spill_init(&spill, tapeset, batch->used_bits,
+							   batch->input_card, aggstate->hashentrysize);
+			}
+			nspill++;
+
+			agg_spill_tuple(aggstate, &spill, spillslot, hash);
+			aggstate->all_pergroups[batch->setno] = NULL;
+		}
+
+		ResetExprContext(aggstate->tmpcontext);
+	}
+
+	LogicalTapeClose(batch->input_tape);
+
+	if (spill_initialized)
+	{
+		agg_spill_finish(aggstate, &spill, 0);
+		index_agg_update_metrics(aggstate, true, spill.npartitions);
+	}
+	else
+		index_agg_update_metrics(aggstate, true, 0);
+
+	aggstate->spill_mode = false;
+	select_current_set(aggstate, batch->setno, GROUPING_STRATEGY_INDEX);
+
+	pfree(batch);
+}
+
+static void
+indexagg_finish_initial_spills(AggState *aggstate)
+{
+	HashAggSpill *spill;
+	AggStatePerIndex perindex;
+	Sort	   *sort;
+
+	if (!aggstate->spill_ever_happened)
+		return;
+
+	Assert(aggstate->spills != NULL);
+
+	spill = aggstate->spills;
+	agg_spill_finish(aggstate, aggstate->spills, 0);
+
+	index_agg_update_metrics(aggstate, false, spill->npartitions);
+	aggstate->spill_mode = false;
+
+	pfree(aggstate->spills);
+	aggstate->spills = NULL;
+
+	perindex = aggstate->perindex;
+	sort = aggstate->index_sort;
+	aggstate->mergestate = tuplemerge_begin_heap(aggstate->ss.ps.ps_ResultTupleDesc,
+												 perindex->numKeyCols,
+												 perindex->idxKeyColIdxTL,
+												 sort->sortOperators,
+												 sort->collations,
+												 sort->nullsFirst,
+												 work_mem, NULL);
+
+	/*
+	 * Some data was spilled. The index aggregate requires the output to be
+	 * sorted, so now we must process all remaining spilled data and produce
+	 * sorted runs for the external merge. The first saved run is the
+	 * currently open index.
+	 */
+	indexagg_save_index_run(aggstate);
+
+	while (aggstate->spill_batches != NIL)
+	{
+		HashAggBatch *batch = llast(aggstate->spill_batches);
+
+		aggstate->spill_batches = list_delete_last(aggstate->spill_batches);
+
+		indexagg_refill_batch(aggstate, batch);
+		indexagg_save_index_run(aggstate);
+	}
+
+	tuplemerge_performmerge(aggstate->mergestate);
+}
+
+static uint32
+index_calculate_input_slot_hash(AggState *aggstate,
+								TupleTableSlot *inputslot)
+{
+	AggStatePerIndex perindex = aggstate->perindex;
+	MemoryContext oldcxt;
+	uint32		hash;
+	bool		isnull;
+
+	oldcxt = MemoryContextSwitchTo(aggstate->tmpcontext->ecxt_per_tuple_memory);
+
+	perindex->exprcontext->ecxt_innertuple = inputslot;
+	hash = DatumGetUInt32(ExecEvalExpr(perindex->indexhashexpr,
+									   perindex->exprcontext,
+									   &isnull));
+
+	MemoryContextSwitchTo(oldcxt);
+
+	return hash;
+}
+
+/*
+ * lookup_index_entries
+ *
+ * Insert input tuples into the in-memory index.
+ */
+static void
+lookup_index_entries(AggState *aggstate)
+{
+	int			numGroupingSets = Max(aggstate->maxsets, 1);
+	AggStatePerGroup *pergroup = aggstate->all_pergroups;
+	TupleTableSlot *outerslot = aggstate->tmpcontext->ecxt_outertuple;
+
+	for (int setno = 0; setno < numGroupingSets; ++setno)
+	{
+		AggStatePerIndex perindex = &aggstate->perindex[setno];
+		TupleIndex	index = perindex->index;
+		TupleTableSlot *indexslot = perindex->indexslot;
+		TupleIndexEntry entry;
+		bool		isnew = false;
+		bool	   *p_isnew;
+
+		p_isnew = aggstate->spill_mode ? NULL : &isnew;
+		select_current_set(aggstate, setno, GROUPING_STRATEGY_INDEX);
+
+		prepare_index_slot(perindex, outerslot, indexslot);
+
+		/* Look up the entry in the btree */
+		entry = TupleIndexLookup(perindex->index, indexslot, p_isnew);
+
+		/* a NULL entry means the index is in spill mode and the tuple must spill */
+		if (entry != NULL)
+		{
+			/* Initialize its trans state if just created */
+			if (isnew)
+				initialize_index_entry(aggstate, index, entry);
+
+			pergroup[setno] = TupleIndexEntryGetAdditional(index, entry);
+		}
+		else
+		{
+			HashAggSpill *spill = &aggstate->spills[setno];
+			uint32		hash;
+
+			if (spill->partitions == NULL)
+			{
+				agg_spill_init(spill, aggstate->spill_tapeset, 0,
+							   perindex->aggnode->numGroups,
+							   aggstate->hashentrysize);
+			}
+
+			hash = index_calculate_input_slot_hash(aggstate, indexslot);
+			agg_spill_tuple(aggstate, spill, outerslot, hash);
+			pergroup[setno] = NULL;
+		}
+	}
+}
+
+static TupleTableSlot *
+agg_retrieve_index_in_memory(AggState *aggstate)
+{
+	ExprContext *econtext;
+	TupleTableSlot *firstSlot;
+	AggStatePerIndex perindex;
+	AggStatePerAgg peragg;
+	AggStatePerGroup pergroup;
+	TupleTableSlot *result;
+
+	econtext = aggstate->ss.ps.ps_ExprContext;
+	firstSlot = aggstate->ss.ss_ScanTupleSlot;
+	peragg = aggstate->peragg;
+	perindex = &aggstate->perindex[aggstate->current_set];
+
+	for (;;)
+	{
+		TupleIndexEntry entry;
+		TupleTableSlot *indexslot = perindex->indexslot;
+
+		CHECK_FOR_INTERRUPTS();
+
+		entry = TupleIndexIteratorNext(&perindex->iter);
+		if (entry == NULL)
+			return NULL;
+
+		ResetExprContext(econtext);
+		ExecStoreMinimalTuple(TupleIndexEntryGetMinimalTuple(entry), indexslot, false);
+		slot_getallattrs(indexslot);
+
+		ExecClearTuple(firstSlot);
+		memset(firstSlot->tts_isnull, true,
+			   firstSlot->tts_tupleDescriptor->natts * sizeof(bool));
+
+		for (int i = 0; i < perindex->numCols; i++)
+		{
+			int			varNumber = perindex->idxKeyColIdxInput[i] - 1;
+
+			firstSlot->tts_values[varNumber] = indexslot->tts_values[i];
+			firstSlot->tts_isnull[varNumber] = indexslot->tts_isnull[i];
+		}
+		ExecStoreVirtualTuple(firstSlot);
+
+		pergroup = (AggStatePerGroup) TupleIndexEntryGetAdditional(perindex->index, entry);
+
+		econtext->ecxt_outertuple = firstSlot;
+		prepare_projection_slot(aggstate,
+								econtext->ecxt_outertuple,
+								aggstate->current_set);
+		finalize_aggregates(aggstate, peragg, pergroup);
+		result = project_aggregates(aggstate);
+		if (result)
+			return result;
+	}
+
+	/* no more groups */
+	return NULL;
+}
+
+static TupleTableSlot *
+agg_retrieve_index_merge(AggState *aggstate)
+{
+	AggStatePerIndex perindex = aggstate->perindex;
+	TupleTableSlot *slot = perindex->mergeslot;
+	TupleTableSlot *resultslot = aggstate->ss.ps.ps_ResultTupleSlot;
+
+	ExecClearTuple(slot);
+
+	if (!tuplesort_gettupleslot(aggstate->mergestate, true, true, slot, NULL))
+		return NULL;
+
+	slot_getallattrs(slot);
+	ExecClearTuple(resultslot);
+
+	for (int i = 0; i < resultslot->tts_tupleDescriptor->natts; ++i)
+	{
+		resultslot->tts_values[i] = slot->tts_values[i];
+		resultslot->tts_isnull[i] = slot->tts_isnull[i];
+	}
+	ExecStoreVirtualTuple(resultslot);
+
+	return resultslot;
+}
+
+static TupleTableSlot *
+agg_retrieve_index(AggState *aggstate)
+{
+	if (aggstate->spill_ever_happened)
+		return agg_retrieve_index_merge(aggstate);
+	else
+		return agg_retrieve_index_in_memory(aggstate);
+}
+
+static void
+build_index(AggState *aggstate)
+{
+	AggStatePerIndex perindex = aggstate->perindex;
+	MemoryContext metacxt = aggstate->index_metacxt;
+	MemoryContext entrycxt = aggstate->index_entrycxt;
+	MemoryContext nodecxt = aggstate->index_nodecxt;
+	MemoryContext oldcxt;
+	Size		additionalsize;
+	Oid		   *eqfuncoids;
+	Sort	   *sort;
+
+	Assert(aggstate->aggstrategy == AGG_INDEX);
+
+	additionalsize = aggstate->numtrans * sizeof(AggStatePerGroupData);
+	sort = aggstate->index_sort;
+
+	/* in-memory index */
+	perindex->index = BuildTupleIndex(perindex->indexslot->tts_tupleDescriptor,
+									  perindex->numKeyCols,
+									  perindex->idxKeyColIdxIndex,
+									  sort->sortOperators,
+									  sort->collations,
+									  sort->nullsFirst,
+									  additionalsize,
+									  metacxt,
+									  entrycxt,
+									  nodecxt);
+
+	/* disk spill logic */
+	oldcxt = MemoryContextSwitchTo(metacxt);
+	execTuplesHashPrepare(perindex->numKeyCols, perindex->aggnode->grpOperators,
+						  &eqfuncoids, &perindex->hashfunctions);
+	perindex->indexhashexpr =
+		ExecBuildHash32FromAttrs(perindex->indexslot->tts_tupleDescriptor,
+								 perindex->indexslot->tts_ops,
+								 perindex->hashfunctions,
+								 perindex->aggnode->grpCollations,
+								 perindex->numKeyCols,
+								 perindex->idxKeyColIdxIndex,
+								 &aggstate->ss.ps,
+								 0);
+	perindex->exprcontext = CreateStandaloneExprContext();
+	MemoryContextSwitchTo(oldcxt);
+}
+
+static void
+find_index_columns(AggState *aggstate)
+{
+	Bitmapset  *base_colnos;
+	Bitmapset  *aggregated_colnos;
+	TupleDesc	scanDesc = aggstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
+	List	   *outerTlist = outerPlanState(aggstate)->plan->targetlist;
+	EState	   *estate = aggstate->ss.ps.state;
+	AggStatePerIndex perindex;
+	Bitmapset  *colnos;
+	AttrNumber *sortColIdx;
+	List	   *indexTlist = NIL;
+	TupleDesc	indexDesc;
+	int			maxCols;
+	int			i;
+
+	find_cols(aggstate, &aggregated_colnos, &base_colnos);
+	aggstate->colnos_needed = bms_union(base_colnos, aggregated_colnos);
+	aggstate->max_colno_needed = 0;
+	aggstate->all_cols_needed = true;
+
+	for (i = 0; i < scanDesc->natts; i++)
+	{
+		int			colno = i + 1;
+
+		if (bms_is_member(colno, aggstate->colnos_needed))
+			aggstate->max_colno_needed = colno;
+		else
+			aggstate->all_cols_needed = false;
+	}
+
+	perindex = aggstate->perindex;
+	colnos = bms_copy(base_colnos);
+
+	if (aggstate->phases[0].grouped_cols)
+	{
+		Bitmapset  *grouped_cols = aggstate->phases[0].grouped_cols[0];
+		ListCell   *lc;
+
+		foreach(lc, aggstate->all_grouped_cols)
+		{
+			int			attnum = lfirst_int(lc);
+
+			if (!bms_is_member(attnum, grouped_cols))
+				colnos = bms_del_member(colnos, attnum);
+		}
+	}
+
+	maxCols = bms_num_members(colnos) + perindex->numKeyCols;
+
+	perindex->idxKeyColIdxInput = palloc(maxCols * sizeof(AttrNumber));
+	perindex->idxKeyColIdxIndex = palloc(perindex->numKeyCols * sizeof(AttrNumber));
+
+	/* Add all the sorting/grouping columns to colnos */
+	sortColIdx = aggstate->index_sort->sortColIdx;
+	for (i = 0; i < perindex->numKeyCols; i++)
+		colnos = bms_add_member(colnos, sortColIdx[i]);
+
+	for (i = 0; i < perindex->numKeyCols; i++)
+	{
+		perindex->idxKeyColIdxInput[i] = sortColIdx[i];
+		perindex->idxKeyColIdxIndex[i] = i + 1;
+
+		perindex->numCols++;
+		/* delete already mapped columns */
+		colnos = bms_del_member(colnos, sortColIdx[i]);
+	}
+
+	/* and the remaining columns */
+	i = -1;
+	while ((i = bms_next_member(colnos, i)) >= 0)
+	{
+		perindex->idxKeyColIdxInput[perindex->numCols] = i;
+		perindex->numCols++;
+	}
+
+	/* build tuple descriptor for the index */
+	perindex->largestGrpColIdx = 0;
+	for (i = 0; i < perindex->numCols; i++)
+	{
+		int			varNumber = perindex->idxKeyColIdxInput[i] - 1;
+
+		indexTlist = lappend(indexTlist, list_nth(outerTlist, varNumber));
+		perindex->largestGrpColIdx = Max(varNumber + 1, perindex->largestGrpColIdx);
+	}
+
+	indexDesc = ExecTypeFromTL(indexTlist);
+	perindex->indexslot = ExecAllocTableSlot(&estate->es_tupleTable, indexDesc,
+											 &TTSOpsMinimalTuple);
+	list_free(indexTlist);
+	bms_free(colnos);
+
+	bms_free(base_colnos);
+}
 /* -----------------
  * ExecInitAgg
@@ -3297,10 +4095,12 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 	int			numGroupingSets = 1;
 	int			numPhases;
 	int			numHashes;
+	int			numIndexes;
 	int			i = 0;
 	int			j = 0;
 	bool		use_hashing = (node->aggstrategy == AGG_HASHED ||
 							   node->aggstrategy == AGG_MIXED);
+	bool		use_index = (node->aggstrategy == AGG_INDEX);
 
 	/* check for unsupported flags */
 	Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
@@ -3337,6 +4137,7 @@
 	 */
 	numPhases = (use_hashing ? 1 : 2);
 	numHashes = (use_hashing ? 1 : 0);
+	numIndexes = (use_index ? 1 : 0);
 
 	/*
 	 * Calculate the maximum number of grouping sets in any phase; this
@@ -3356,7 +4157,8 @@
 
 			/*
 			 * additional AGG_HASHED aggs become part of phase 0, but all
-			 * others add an extra phase.
+			 * others add an extra phase. AGG_INDEX does not support grouping
+			 * sets, so the else branch must be AGG_SORTED or AGG_MIXED.
 			 */
 			if (agg->aggstrategy != AGG_HASHED)
 				++numPhases;
@@ -3395,6 +4197,8 @@
 	if (use_hashing)
 		hash_create_memory(aggstate);
+	else if (use_index)
+		index_create_memory(aggstate);
 
 	ExecAssignExprContext(estate, &aggstate->ss.ps);
 
@@ -3501,6 +4305,13 @@
 		aggstate->phases[0].gset_lengths = palloc_array(int, numHashes);
 		aggstate->phases[0].grouped_cols = palloc_array(Bitmapset *, numHashes);
 	}
+	else if (numIndexes)
+	{
+		aggstate->perindex = palloc0(sizeof(AggStatePerIndexData) * numIndexes);
+		aggstate->phases[0].numsets = 0;
+		aggstate->phases[0].gset_lengths = palloc(numIndexes * sizeof(int));
+		aggstate->phases[0].grouped_cols = palloc(numIndexes * sizeof(Bitmapset *));
+	}
 
 	phase = 0;
 	for (phaseidx = 0; phaseidx <= list_length(node->chain); ++phaseidx)
@@ -3513,6 +4324,18 @@
 			aggnode = list_nth_node(Agg, node->chain, phaseidx - 1);
 			sortnode = castNode(Sort, outerPlan(aggnode));
 		}
+		else if (use_index)
+		{
+			Assert(list_length(node->chain) == 1);
+
+			aggnode = node;
+			sortnode = castNode(Sort, linitial(node->chain));
+			/*
+			 * The chain contains a single element, so advance the loop
+			 * variable here so that there is only a single iteration.
+ */ + phaseidx++; + } else { aggnode = node; @@ -3549,6 +4372,35 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) all_grouped_cols = bms_add_members(all_grouped_cols, cols); continue; } + else if (aggnode->aggstrategy == AGG_INDEX) + { + AggStatePerPhase phasedata = &aggstate->phases[0]; + AggStatePerIndex perindex; + Bitmapset *cols; + + Assert(phase == 0); + Assert(sortnode); + + i = phasedata->numsets++; + + /* phase 0 always points to the "real" Agg in the index case */ + phasedata->aggnode = node; + phasedata->aggstrategy = node->aggstrategy; + phasedata->sortnode = sortnode; + + perindex = &aggstate->perindex[i]; + perindex->aggnode = aggnode; + aggstate->index_sort = sortnode; + + phasedata->gset_lengths[i] = perindex->numKeyCols = aggnode->numCols; + + cols = NULL; + for (j = 0; j < aggnode->numCols; ++j) + cols = bms_add_member(cols, aggnode->grpColIdx[j]); + + phasedata->grouped_cols[i] = cols; + all_grouped_cols = bms_add_members(all_grouped_cols, cols); + } else { AggStatePerPhase phasedata = &aggstate->phases[++phase]; @@ -3666,7 +4518,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) aggstate->all_pergroups = palloc0_array(AggStatePerGroup, numGroupingSets + numHashes); pergroups = aggstate->all_pergroups; - if (node->aggstrategy != AGG_HASHED) + if (node->aggstrategy != AGG_HASHED && node->aggstrategy != AGG_INDEX) { for (i = 0; i < numGroupingSets; i++) { @@ -3680,18 +4532,15 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) /* * Hashing can only appear in the initial phase. */ - if (use_hashing) + if (use_hashing || use_index) { Plan *outerplan = outerPlan(node); double totalGroups = 0; - aggstate->hash_spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc, - &TTSOpsMinimalTuple); - aggstate->hash_spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc, - &TTSOpsVirtual); - - /* this is an array of pointers, not structures */ - aggstate->hash_pergroup = pergroups; + aggstate->spill_rslot = ExecInitExtraTupleSlot(estate, scanDesc, + &TTSOpsMinimalTuple); + aggstate->spill_wslot = ExecInitExtraTupleSlot(estate, scanDesc, + &TTSOpsVirtual); aggstate->hashentrysize = hash_agg_entry_size(aggstate->numtrans, outerplan->plan_width, @@ -3706,20 +4555,115 @@ ExecInitAgg(Agg *node, EState *estate, int eflags) for (int k = 0; k < aggstate->num_hashes; k++) totalGroups += aggstate->perhash[k].aggnode->numGroups; - hash_agg_set_limits(aggstate->hashentrysize, totalGroups, 0, - &aggstate->hash_mem_limit, - &aggstate->hash_ngroups_limit, - &aggstate->hash_planned_partitions); - find_hash_columns(aggstate); + agg_set_limits(aggstate->hashentrysize, totalGroups, 0, + &aggstate->spill_mem_limit, + &aggstate->spill_ngroups_limit, + &aggstate->spill_planned_partitions); - /* Skip massive memory allocation if we are just doing EXPLAIN */ - if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY)) - build_hash_tables(aggstate); + if (use_hashing) + { + /* this is an array of pointers, not structures */ + aggstate->hash_pergroup = pergroups; + + find_hash_columns(aggstate); + + /* Skip massive memory allocation if we are just doing EXPLAIN */ + if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY)) + build_hash_tables(aggstate); + aggstate->table_filled = false; + } + else + { + find_index_columns(aggstate); + + if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY)) + build_index(aggstate); + aggstate->index_filled = false; + } - aggstate->table_filled = false; /* Initialize this to 1, meaning nothing spilled, yet */ - aggstate->hash_batches_used = 1; + aggstate->spill_batches_used = 1; + } + + /* + * For index merge disk spill 
may be required, and we perform an external
+	 * merge for this purpose. The stored tuples are already projected, so
+	 * they have a different TupleDesc than the one used in memory (inputDesc
+	 * and indexDesc).
+	 */
+	if (use_index)
+	{
+		AggStatePerIndex perindex = aggstate->perindex;
+		ListCell   *lc;
+		List	   *targetlist = aggstate->ss.ps.plan->targetlist;
+		AttrNumber *attr_mapping_tl =
+			palloc0(sizeof(AttrNumber) * list_length(targetlist));
+		AttrNumber *keyColIdxResult;
+
+		/*
+		 * Build the grouping column attribute mapping and store it in
+		 * attr_mapping_tl. If there is no such mapping (the column was
+		 * projected away), InvalidAttrNumber is set; otherwise, the index of
+		 * the indexDesc column storing this attribute.
+		 */
+		foreach (lc, targetlist)
+		{
+			TargetEntry *te = (TargetEntry *) lfirst(lc);
+			Var		   *group_var;
+
+			/* All grouping expressions in the targetlist are stored as OUTER Vars */
+			if (!IsA(te->expr, Var))
+				continue;
+
+			group_var = (Var *) te->expr;
+			if (group_var->varno != OUTER_VAR)
+				continue;
+
+			attr_mapping_tl[foreach_current_index(lc)] = group_var->varattno;
+		}
+
+		/* The mapping is built; now create the reverse mapping */
+		keyColIdxResult = palloc0(sizeof(AttrNumber) * list_length(outerPlan(node)->targetlist));
+		for (i = 0; i < list_length(targetlist); ++i)
+		{
+			AttrNumber	outer_attno = attr_mapping_tl[i];
+			AttrNumber	existingIdx;
+
+			if (!AttributeNumberIsValid(outer_attno))
+				continue;
+
+			existingIdx = keyColIdxResult[outer_attno - 1];
+
+			/* attnumbers can be duplicated, so use the first one */
+			if (AttributeNumberIsValid(existingIdx) && existingIdx <= outer_attno)
+				continue;
+
+			/*
+			 * A column can be referenced in the query, but the planner can
+			 * decide to remove it from the grouping.
+			 */
+			if (!bms_is_member(outer_attno, all_grouped_cols))
+				continue;
+
+			keyColIdxResult[outer_attno - 1] = i + 1;
+		}
+
+		perindex->idxKeyColIdxTL = palloc(sizeof(AttrNumber) * perindex->numKeyCols);
+		for (i = 0; i < perindex->numKeyCols; ++i)
+		{
+			AttrNumber	attno = keyColIdxResult[perindex->idxKeyColIdxInput[i] - 1];
+
+			if (!AttributeNumberIsValid(attno))
+				elog(ERROR, "could not locate group by attributes in targetlist for index mapping");
+
+			perindex->idxKeyColIdxTL[i] = attno;
+		}
+
+		pfree(attr_mapping_tl);
+		pfree(keyColIdxResult);
+
+		perindex->mergeslot = ExecInitExtraTupleSlot(estate,
+													 aggstate->ss.ps.ps_ResultTupleDesc,
+													 &TTSOpsMinimalTuple);
+	}
 
 	/*
@@ -3732,13 +4676,19 @@
 	{
 		aggstate->current_phase = 0;
 		initialize_phase(aggstate, 0);
-		select_current_set(aggstate, 0, true);
+		select_current_set(aggstate, 0, GROUPING_STRATEGY_HASH);
+	}
+	else if (node->aggstrategy == AGG_INDEX)
+	{
+		aggstate->current_phase = 0;
+		initialize_phase(aggstate, 0);
+		select_current_set(aggstate, 0, GROUPING_STRATEGY_INDEX);
 	}
 	else
 	{
 		aggstate->current_phase = 1;
 		initialize_phase(aggstate, 1);
-		select_current_set(aggstate, 0, false);
+		select_current_set(aggstate, 0, GROUPING_STRATEGY_SORT);
 	}
 
 	/*
@@ -4066,8 +5016,7 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 	for (phaseidx = 0; phaseidx < aggstate->numphases; phaseidx++)
 	{
 		AggStatePerPhase phase = &aggstate->phases[phaseidx];
-		bool		dohash = false;
-		bool		dosort = false;
+		int			strategy;
 
 		/* phase 0 doesn't necessarily exist */
 		if (!phase->aggnode)
 			continue;
@@ -4079,8 +5028,7 @@
 			 * Phase one, and only phase one, in a mixed agg performs both
 			 * sorting and aggregation. 
 			 */
-			dohash = true;
-			dosort = true;
+			strategy = GROUPING_STRATEGY_HASH | GROUPING_STRATEGY_SORT;
 		}
 		else if (aggstate->aggstrategy == AGG_MIXED && phaseidx == 0)
 		{
@@ -4094,19 +5042,20 @@ ExecInitAgg(Agg *node, EState *estate, int eflags)
 		else if (phase->aggstrategy == AGG_PLAIN ||
 				 phase->aggstrategy == AGG_SORTED)
 		{
-			dohash = false;
-			dosort = true;
+			strategy = GROUPING_STRATEGY_SORT;
 		}
 		else if (phase->aggstrategy == AGG_HASHED)
 		{
-			dohash = true;
-			dosort = false;
+			strategy = GROUPING_STRATEGY_HASH;
+		}
+		else if (phase->aggstrategy == AGG_INDEX)
+		{
+			strategy = GROUPING_STRATEGY_INDEX;
 		}
 		else
 			Assert(false);
 
-		phase->evaltrans = ExecBuildAggTrans(aggstate, phase, dosort, dohash,
-											 false);
+		phase->evaltrans = ExecBuildAggTrans(aggstate, phase, strategy, false);
 
 		/* cache compiled expression for outer slot without NULL check */
 		phase->evaltrans_cache[0][0] = phase->evaltrans;
@@ -4409,9 +5358,9 @@ ExecEndAgg(AggState *node)
 		Assert(ParallelWorkerNumber <= node->shared_info->num_workers);
 
 		si = &node->shared_info->sinstrument[ParallelWorkerNumber];
-		si->hash_batches_used = node->hash_batches_used;
-		si->hash_disk_used = node->hash_disk_used;
-		si->hash_mem_peak = node->hash_mem_peak;
+		si->hash_batches_used = node->spill_batches_used;
+		si->hash_disk_used = node->spill_disk_used;
+		si->hash_mem_peak = node->spill_mem_peak;
 	}
 
 	/* Make sure we have closed any open tuplesorts */
@@ -4421,7 +5370,10 @@ ExecEndAgg(AggState *node)
 	if (node->sort_out)
 		tuplesort_end(node->sort_out);
 
-	hashagg_reset_spill_state(node);
+	if (node->aggstrategy == AGG_INDEX)
+		indexagg_reset_spill_state(node);
+	else
+		hashagg_reset_spill_state(node);
 
 	/* Release hash tables too */
 	if (node->hash_metacxt != NULL)
@@ -4434,6 +5386,26 @@ ExecEndAgg(AggState *node)
 		MemoryContextDelete(node->hash_tuplescxt);
 		node->hash_tuplescxt = NULL;
 	}
+	if (node->index_metacxt != NULL)
+	{
+		MemoryContextDelete(node->index_metacxt);
+		node->index_metacxt = NULL;
+	}
+	if (node->index_entrycxt != NULL)
+	{
+		MemoryContextDelete(node->index_entrycxt);
+		node->index_entrycxt = NULL;
+	}
+	if (node->index_nodecxt != NULL)
+	{
+		MemoryContextDelete(node->index_nodecxt);
+		node->index_nodecxt = NULL;
+	}
+	if (node->mergestate)
+	{
+		tuplesort_end(node->mergestate);
+		node->mergestate = NULL;
+	}
 
 	for (transno = 0; transno < node->numtrans; transno++)
 	{
@@ -4451,6 +5423,8 @@ ExecEndAgg(AggState *node)
 		ReScanExprContext(node->aggcontexts[setno]);
 	if (node->hashcontext)
 		ReScanExprContext(node->hashcontext);
+	if (node->indexcontext)
+		ReScanExprContext(node->indexcontext);
 
 	outerPlan = outerPlanState(node);
 	ExecEndNode(outerPlan);
@@ -4486,12 +5460,27 @@ ExecReScanAgg(AggState *node)
 		 * we can just rescan the existing hash table; no need to build it
 		 * again.
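+		 *
+		 * The AGG_INDEX branch below applies the same test to the in-memory
+		 * index: if nothing has spilled and no parameters changed, it only
+		 * resets the index iterator.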
 		 */
-		if (outerPlan->chgParam == NULL && !node->hash_ever_spilled &&
+		if (outerPlan->chgParam == NULL && !node->spill_ever_happened &&
 			!bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
 		{
 			ResetTupleHashIterator(node->perhash[0].hashtable,
 								   &node->perhash[0].hashiter);
-			select_current_set(node, 0, true);
+			select_current_set(node, 0, GROUPING_STRATEGY_HASH);
+			return;
+		}
+	}
+
+	if (node->aggstrategy == AGG_INDEX)
+	{
+		if (!node->index_filled)
+			return;
+
+		if (outerPlan->chgParam == NULL && !node->spill_ever_happened &&
+			!bms_overlap(node->ss.ps.chgParam, aggnode->aggParams))
+		{
+			AggStatePerIndex perindex = node->perindex;
+
+			ResetTupleIndexIterator(perindex->index, &perindex->iter);
+			select_current_set(node, 0, GROUPING_STRATEGY_INDEX);
 			return;
 		}
 	}
@@ -4545,9 +5534,9 @@ ExecReScanAgg(AggState *node)
 	{
 		hashagg_reset_spill_state(node);
 
-		node->hash_ever_spilled = false;
-		node->hash_spill_mode = false;
-		node->hash_ngroups_current = 0;
+		node->spill_ever_happened = false;
+		node->spill_mode = false;
+		node->spill_ngroups_current = 0;
 
 		ReScanExprContext(node->hashcontext);
 
 		/* Rebuild empty hash table(s) */
 		build_hash_tables(node);
 		node->table_filled = false;
 
 		/* iterator will be reset when the table is filled */
 
-		hashagg_recompile_expressions(node, false, false);
+		agg_recompile_expressions(node, false, false);
 	}
 
-	if (node->aggstrategy != AGG_HASHED)
+	if (node->aggstrategy == AGG_INDEX)
+	{
+		indexagg_reset_spill_state(node);
+
+		node->spill_ever_happened = false;
+		node->spill_mode = false;
+		node->spill_ngroups_current = 0;
+
+		ReScanExprContext(node->indexcontext);
+		MemoryContextReset(node->index_entrycxt);
+		MemoryContextReset(node->index_nodecxt);
+
+		build_index(node);
+		node->index_filled = false;
+
+		agg_recompile_expressions(node, false, false);
+
+		if (node->mergestate)
+		{
+			tuplesort_end(node->mergestate);
+			node->mergestate = NULL;
+		}
+	}
+	else if (node->aggstrategy != AGG_HASHED)
 	{
 		/*
 		 * Reset the per-group state (in particular, mark transvalues null)
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c
index 88ae529e843..fc349707778 100644
--- a/src/backend/utils/sort/tuplesort.c
+++ b/src/backend/utils/sort/tuplesort.c
@@ -1900,6 +1900,7 @@ static void
 inittapestate(Tuplesortstate *state, int maxTapes)
 {
 	int64		tapeSpace;
+	Size		memtuplesSize;
 
 	/*
 	 * Decrease availMem to reflect the space needed for tape buffers; but
@@ -1912,7 +1913,16 @@ inittapestate(Tuplesortstate *state, int maxTapes)
 	 */
 	tapeSpace = (int64) maxTapes * TAPE_BUFFER_OVERHEAD;
 
-	if (tapeSpace + GetMemoryChunkSpace(state->memtuples) < state->allowedMem)
+	/*
+	 * In merge mode, during initial run creation, we do not use the
+	 * in-memory tuples array; tuples are written directly to the tapes.
+	 */
+	if (state->memtuples != NULL)
+		memtuplesSize = GetMemoryChunkSpace(state->memtuples);
+	else
+		memtuplesSize = 0;
+
+	if (tapeSpace + memtuplesSize < state->allowedMem)
 		USEMEM(state, tapeSpace);
 
 	/*
@@ -2031,11 +2041,14 @@ mergeruns(Tuplesortstate *state)
 
 	/*
 	 * We no longer need a large memtuples array. (We will allocate a smaller
-	 * one for the heap later.)
+	 * one for the heap later.) Note that in merge mode this array can be
+	 * NULL.
 	 */
-	FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
-	pfree(state->memtuples);
-	state->memtuples = NULL;
+	if (state->memtuples)
+	{
+		FREEMEM(state, GetMemoryChunkSpace(state->memtuples));
+		pfree(state->memtuples);
+		state->memtuples = NULL;
+	}
 
 	/*
 	 * Initialize the slab allocator.  We need one slab slot per input tape,
@@ -3157,3 +3170,189 @@ ssup_datum_int32_cmp(Datum x, Datum y, SortSupport ssup)
 	else
 		return 0;
 }
+
+/*
+ * tuplemerge_begin_common
+ *
+ * Create a new Tuplesortstate that performs a merge only. This is used
+ * when we know that the input is already sorted but is stored on multiple
+ * tapes, so we only have to perform the merge.
+ *
+ * Unlike tuplesort_begin_common, it does not accept sortopt, because none
+ * of the current options (random access and bounded sort) is supported by
+ * merge.
+ */
+Tuplesortstate *
+tuplemerge_begin_common(int workMem, SortCoordinate coordinate)
+{
+	Tuplesortstate *state;
+	MemoryContext maincontext;
+	MemoryContext sortcontext;
+	MemoryContext oldcontext;
+
+	/*
+	 * Memory context surviving tuplesort_reset. This memory context holds
+	 * data which is useful to keep while sorting multiple similar batches.
+	 */
+	maincontext = AllocSetContextCreate(CurrentMemoryContext,
+										"TupleMerge main",
+										ALLOCSET_DEFAULT_SIZES);
+
+	/*
+	 * Create a working memory context for one sort operation. The content of
+	 * this context is deleted by tuplesort_reset.
+	 */
+	sortcontext = AllocSetContextCreate(maincontext,
+										"TupleMerge merge",
+										ALLOCSET_DEFAULT_SIZES);
+
+	/*
+	 * Make the Tuplesortstate within the per-sortstate context. This way, we
+	 * don't need a separate pfree() operation for it at shutdown.
+	 */
+	oldcontext = MemoryContextSwitchTo(maincontext);
+
+	state = (Tuplesortstate *) palloc0(sizeof(Tuplesortstate));
+
+	if (trace_sort)
+		pg_rusage_init(&state->ru_start);
+
+	state->base.sortopt = TUPLESORT_NONE;
+	state->base.tuples = true;
+	state->abbrevNext = 10;
+
+	/*
+	 * workMem is forced to be at least 64KB, the current minimum valid value
+	 * for the work_mem GUC. This is a defense against parallel sort callers
+	 * that divide out memory among many workers in a way that leaves each
+	 * with very little memory.
+	 */
+	state->allowedMem = Max(workMem, 64) * (int64) 1024;
+	state->base.sortcontext = sortcontext;
+	state->base.maincontext = maincontext;
+
+	/*
+	 * After all of the other non-parallel-related state, we set up all of
+	 * the state needed for each batch.
+	 */
+
+	/*
+	 * Merging does not accept RANDOMACCESS, so the only possible tuple
+	 * context is Bump, which saves some cycles.
+	 */
+	state->base.tuplecontext = BumpContextCreate(state->base.sortcontext,
+												 "Caller tuples",
+												 ALLOCSET_DEFAULT_SIZES);
+
+	state->status = TSS_BUILDRUNS;
+	state->bounded = false;
+	state->boundUsed = false;
+	state->availMem = state->allowedMem;
+
+	/*
+	 * When performing a merge we do not need the in-memory array for
+	 * sorting, so memtuples is left NULL. memtupcount, memtupsize and
+	 * growmemtuples are still initialized to sane values, so that common
+	 * code inspecting them in merge mode does not misbehave.
+	 */
+	state->memtuples = NULL;
+	state->memtupcount = 0;
+	state->memtupsize = INITIAL_MEMTUPSIZE;
+	state->growmemtuples = true;
+	state->slabAllocatorUsed = false;
+
+	/*
+	 * Tape variables (inputTapes, outputTapes, etc.) are initialized by
+	 * inittapes(), called below.
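+	 *
+	 * Note that, unlike a regular sort, there is no in-memory (TSS_INITIAL)
+	 * phase here: the state starts directly in TSS_BUILDRUNS and every
+	 * incoming tuple is written straight to the current destination tape.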
+	 */
+	state->result_tape = NULL;	/* flag that result tape has not been formed */
+	state->tapeset = NULL;
+
+	inittapes(state, true);
+
+	/*
+	 * Initialize parallel-related state based on coordination information
+	 * from caller
+	 */
+	if (!coordinate)
+	{
+		/* Serial sort */
+		state->shared = NULL;
+		state->worker = -1;
+		state->nParticipants = -1;
+	}
+	else if (coordinate->isWorker)
+	{
+		/* Parallel worker produces exactly one final run from all input */
+		state->shared = coordinate->sharedsort;
+		state->worker = worker_get_identifier(state);
+		state->nParticipants = -1;
+	}
+	else
+	{
+		/* Parallel leader state only used for final merge */
+		state->shared = coordinate->sharedsort;
+		state->worker = -1;
+		state->nParticipants = coordinate->nParticipants;
+		Assert(state->nParticipants >= 1);
+	}
+
+	MemoryContextSwitchTo(oldcontext);
+
+	return state;
+}
+
+/*
+ * tuplemerge_start_run
+ *
+ * Begin a new sorted run. If the previous run received any tuples, switch
+ * to a new destination tape and reset the per-run tuple counter.
+ */
+void
+tuplemerge_start_run(Tuplesortstate *state)
+{
+	if (state->memtupcount == 0)
+		return;
+
+	selectnewtape(state);
+	state->memtupcount = 0;
+}
+
+/*
+ * tuplemerge_performmerge
+ *
+ * All runs have been written; perform the final merge and reset the read
+ * position so that the caller can start fetching tuples.
+ */
+void
+tuplemerge_performmerge(Tuplesortstate *state)
+{
+	if (state->memtupcount == 0)
+	{
+		/*
+		 * We have started a new run, but no tuples were written to it.
+		 * mergeruns expects each run to contain at least one tuple,
+		 * otherwise it will fail to even fill the initial merge heap.
+		 */
+		state->nOutputRuns--;
+	}
+	else
+		state->memtupcount = 0;
+
+	mergeruns(state);
+
+	state->current = 0;
+	state->eof_reached = false;
+	state->markpos_block = 0L;
+	state->markpos_offset = 0;
+	state->markpos_eof = false;
+}
+
+/*
+ * tuplemerge_puttuple_common
+ *
+ * Append one tuple to the current run, writing it directly to the
+ * destination tape. memtupcount tracks how many tuples the current run
+ * has received.
+ */
+void
+tuplemerge_puttuple_common(Tuplesortstate *state, SortTuple *tuple, Size tuplen)
+{
+	MemoryContext oldcxt = MemoryContextSwitchTo(state->base.sortcontext);
+
+	Assert(state->destTape);
+	WRITETUP(state, state->destTape, tuple);
+
+	MemoryContextSwitchTo(oldcxt);
+
+	state->memtupcount++;
+}
+
+/*
+ * tuplemerge_end_run
+ *
+ * Finish the current run by writing an end-of-run marker, unless the run
+ * is empty.
+ */
+void
+tuplemerge_end_run(Tuplesortstate *state)
+{
+	if (state->memtupcount != 0)
+		markrunend(state->destTape);
+}
diff --git a/src/backend/utils/sort/tuplesortvariants.c b/src/backend/utils/sort/tuplesortvariants.c
index 079a51c474d..5f8afa8a17a 100644
--- a/src/backend/utils/sort/tuplesortvariants.c
+++ b/src/backend/utils/sort/tuplesortvariants.c
@@ -2071,3 +2071,108 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup,
 	if (base->sortopt & TUPLESORT_RANDOMACCESS) /* need trailing length word? */
 		LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen));
 }
+
+/*
+ * tuplemerge_begin_heap
+ *
+ * Merge-only counterpart of tuplesort_begin_heap: set up the comparison
+ * and I/O routines for heap tuples on top of tuplemerge_begin_common.
+ */
+Tuplesortstate *
+tuplemerge_begin_heap(TupleDesc tupDesc,
+					  int nkeys, AttrNumber *attNums,
+					  Oid *sortOperators, Oid *sortCollations,
+					  bool *nullsFirstFlags,
+					  int workMem, SortCoordinate coordinate)
+{
+	Tuplesortstate *state = tuplemerge_begin_common(workMem, coordinate);
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	MemoryContext oldcontext;
+	int			i;
+
+	oldcontext = MemoryContextSwitchTo(base->maincontext);
+
+	Assert(nkeys > 0);
+
+	if (trace_sort)
+		elog(LOG,
+			 "begin tuple merge: nkeys = %d, workMem = %d", nkeys, workMem);
+
+	base->nKeys = nkeys;
+
+	TRACE_POSTGRESQL_SORT_START(HEAP_SORT,
+								false,	/* no unique check */
+								nkeys,
+								workMem,
+								false,
+								PARALLEL_SORT(coordinate));
+
+	base->removeabbrev = removeabbrev_heap;
+	base->comparetup = comparetup_heap;
+	base->comparetup_tiebreak = comparetup_heap_tiebreak;
+	base->writetup = writetup_heap;
+	base->readtup = readtup_heap;
+	base->haveDatum1 = true;
+	base->arg = tupDesc;		/* assume we need not copy tupDesc */
+
+	/* Prepare SortSupport data for each column */
+	base->sortKeys = (SortSupport) palloc0(nkeys * sizeof(SortSupportData));
+
+	for (i = 0; i < nkeys; i++)
+	{
+		SortSupport sortKey = base->sortKeys + i;
+
+		Assert(attNums[i] != 0);
+		Assert(sortOperators[i] != 0);
+
+		sortKey->ssup_cxt = CurrentMemoryContext;
+		sortKey->ssup_collation = sortCollations[i];
+		sortKey->ssup_nulls_first = nullsFirstFlags[i];
+		sortKey->ssup_attno = attNums[i];
+		/* Convey if abbreviation optimization is applicable in principle */
+		sortKey->abbreviate = (i == 0 && base->haveDatum1);
+
+		PrepareSortSupportFromOrderingOp(sortOperators[i], sortKey);
+	}
+
+	/*
+	 * The "onlyKey" optimization cannot be used with abbreviated keys, since
+	 * tie-breaker comparisons may be required. Typically, the optimization
+	 * is only of value to pass-by-value types anyway, whereas abbreviated
+	 * keys are typically only of value to pass-by-reference types.
+	 */
+	if (nkeys == 1 && !base->sortKeys->abbrev_converter)
+		base->onlyKey = base->sortKeys;
+
+	MemoryContextSwitchTo(oldcontext);
+
+	return state;
+}
+
+void
+tuplemerge_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot)
+{
+	TuplesortPublic *base = TuplesortstateGetPublic(state);
+	MemoryContext oldcontext = MemoryContextSwitchTo(base->tuplecontext);
+	TupleDesc	tupDesc = (TupleDesc) base->arg;
+	SortTuple	stup;
+	MinimalTuple tuple;
+	HeapTupleData htup;
+	Size		tuplen;
+
+	/* copy the tuple into sort storage */
+	tuple = ExecCopySlotMinimalTuple(slot);
+	stup.tuple = tuple;
+	/* set up first-column key value */
+	htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET;
+	htup.t_data = (HeapTupleHeader) ((char *) tuple - MINIMAL_TUPLE_OFFSET);
+	stup.datum1 = heap_getattr(&htup,
+							   base->sortKeys[0].ssup_attno,
+							   tupDesc,
+							   &stup.isnull1);
+
+	/* GetMemoryChunkSpace is not supported for bump contexts */
+	if (TupleSortUseBumpTupleCxt(base->sortopt))
+		tuplen = MAXALIGN(tuple->t_len);
+	else
+		tuplen = GetMemoryChunkSpace(tuple);
+
+	tuplemerge_puttuple_common(state, &stup, tuplen);
+
+	MemoryContextSwitchTo(oldcontext);
+}
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 6192cc8d143..7c9efe77ab9 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -393,8 +393,16 @@ extern ExprState *ExecInitExprWithParams(Expr *node, ParamListInfo ext_params);
 extern ExprState *ExecInitQual(List *qual, PlanState *parent);
 extern ExprState *ExecInitCheck(List *qual, PlanState *parent);
 extern List *ExecInitExprList(List *nodes, PlanState *parent);
+
+/*
+ * Which strategy to use for aggregation/grouping
+ */
+#define GROUPING_STRATEGY_SORT	1
+#define GROUPING_STRATEGY_HASH	(1 << 1)
+#define GROUPING_STRATEGY_INDEX	(1 << 2)
+
 extern ExprState *ExecBuildAggTrans(AggState *aggstate,
 									struct AggStatePerPhaseData *phase,
-									bool doSort, bool doHash, bool nullcheck);
+									int groupStrategy, bool nullcheck);
 extern ExprState *ExecBuildHash32FromAttrs(TupleDesc desc,
 										   const TupleTableSlotOps *ops,
 										   FmgrInfo *hashfunctions,
diff --git a/src/include/executor/nodeAgg.h b/src/include/executor/nodeAgg.h
index 6c4891bbaeb..8361d000878 100644
--- a/src/include/executor/nodeAgg.h
+++ b/src/include/executor/nodeAgg.h
@@ -321,6 +321,33 @@ typedef struct AggStatePerHashData
 	Agg		   *aggnode;		/* original Agg node, for numGroups etc. */
 } AggStatePerHashData;
 
+/*
+ * AggStatePerIndexData - per-index state
+ *
+ * The logic is the same as for AggStatePerHashData: one of these exists
+ * for each grouping set.
+ */
+typedef struct AggStatePerIndexData
+{
+	TupleIndex	index;			/* current in-memory index data */
+	MemoryContext metacxt;		/* memory context containing TupleIndex */
+	MemoryContext tempctx;		/* short-lived context */
+	TupleTableSlot *indexslot;	/* slot for loading index */
+	int			numCols;		/* total number of columns in index tuple */
+	int			numKeyCols;		/* number of key columns in index tuple */
+	int			largestGrpColIdx;	/* largest col required for comparison */
+	AttrNumber *idxKeyColIdxInput;	/* key column indices in input slot */
+	AttrNumber *idxKeyColIdxIndex;	/* key column indices in index tuples */
+	TupleIndexIteratorData iter;	/* iterator state for index */
+	Agg		   *aggnode;		/* original Agg node, for numGroups etc. */
+
+	/* state used only for spill mode */
+	AttrNumber *idxKeyColIdxTL; /* key column indices in target list */
+	FmgrInfo   *hashfunctions;	/* tuple hashing function */
+	ExprState  *indexhashexpr;	/* ExprState for hashing index datatype(s) */
+	ExprContext *exprcontext;	/* expression context */
+	TupleTableSlot *mergeslot;	/* slot for loading tuple during merge */
+} AggStatePerIndexData;
 
 extern AggState *ExecInitAgg(Agg *node, EState *estate, int eflags);
 extern void ExecEndAgg(AggState *node);
@@ -328,9 +355,9 @@ extern void ExecReScanAgg(AggState *node);
 
 extern Size hash_agg_entry_size(int numTrans, Size tupleWidth,
 								Size transitionSpace);
-extern void hash_agg_set_limits(double hashentrysize, double input_groups,
-								int used_bits, Size *mem_limit,
-								uint64 *ngroups_limit, int *num_partitions);
+extern void agg_set_limits(double hashentrysize, double input_groups,
+						   int used_bits, Size *mem_limit,
+						   uint64 *ngroups_limit, int *num_partitions);
 
 /* parallel instrumentation support */
 extern void ExecAggEstimate(AggState *node, ParallelContext *pcxt);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 99ee472b51f..3bba2359e11 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -2613,6 +2613,7 @@ typedef struct AggStatePerTransData *AggStatePerTrans;
 typedef struct AggStatePerGroupData *AggStatePerGroup;
 typedef struct AggStatePerPhaseData *AggStatePerPhase;
 typedef struct AggStatePerHashData *AggStatePerHash;
+typedef struct AggStatePerIndexData *AggStatePerIndex;
 
 typedef struct AggState
 {
@@ -2628,17 +2629,18 @@ typedef struct AggState
 	AggStatePerAgg peragg;		/* per-Aggref information */
 	AggStatePerTrans pertrans;	/* per-Trans state information */
 	ExprContext *hashcontext;	/* econtexts for long-lived data (hashtable) */
+	ExprContext *indexcontext;	/* econtexts for long-lived data (index) */
 	ExprContext **aggcontexts;	/* econtexts for long-lived data (per GS) */
 	ExprContext *tmpcontext;	/* econtext for input expressions */
-#define FIELDNO_AGGSTATE_CURAGGCONTEXT 14
+#define FIELDNO_AGGSTATE_CURAGGCONTEXT 15
 	ExprContext *curaggcontext; /* currently active aggcontext */
 	AggStatePerAgg curperagg;	/* currently active aggregate, if any */
-#define FIELDNO_AGGSTATE_CURPERTRANS 16
+#define FIELDNO_AGGSTATE_CURPERTRANS 17
 	AggStatePerTrans curpertrans;	/* currently active trans state, if any */
 	bool		input_done;		/* indicates end of input */
 	bool		agg_done;		/* indicates completion of Agg scan */
 	int			projected_set;	/* The last projected grouping set */
-#define FIELDNO_AGGSTATE_CURRENT_SET 20
+#define FIELDNO_AGGSTATE_CURRENT_SET 21
 	int			current_set;	/* The current grouping set being evaluated */
 	Bitmapset  *grouped_cols;	/* grouped cols in current projection */
 	List	   *all_grouped_cols;	/* list of all grouped cols in DESC order */
@@ -2660,32 +2662,43 @@ typedef struct AggState
 	int			num_hashes;
 	MemoryContext hash_metacxt; /* memory for hash table bucket array */
 	MemoryContext hash_tuplescxt;	/* memory for hash table tuples */
-	struct LogicalTapeSet *hash_tapeset;	/* tape set for hash spill tapes */
-	struct HashAggSpill *hash_spills;	/* HashAggSpill for each grouping set,
-										 * exists only during first pass */
-	TupleTableSlot *hash_spill_rslot;	/* for reading spill files */
-	TupleTableSlot *hash_spill_wslot;	/* for writing spill files */
-	List	   *hash_batches;	/* hash batches remaining to be processed */
-	bool		hash_ever_spilled;	/* ever spilled during this execution? */
-	bool		hash_spill_mode;	/* we hit a limit during the current batch
-									 * and we must not create new groups */
-	Size		hash_mem_limit; /* limit before spilling hash table */
-	uint64		hash_ngroups_limit; /* limit before spilling hash table */
-	int			hash_planned_partitions;	/* number of partitions planned
-											 * for first pass */
-	double		hashentrysize;	/* estimate revised during execution */
-	Size		hash_mem_peak;	/* peak hash table memory usage */
-	uint64		hash_ngroups_current;	/* number of groups currently in
-										 * memory in all hash tables */
-	uint64		hash_disk_used; /* kB of disk space used */
-	int			hash_batches_used;	/* batches used during entire execution */
-	AggStatePerHash perhash;	/* array of per-hashtable data */
 	AggStatePerGroup *hash_pergroup;	/* grouping set indexed array of
 										 * per-group pointers */
 
+	/* Fields used for managing spill mode in hash and index aggs */
+	struct LogicalTapeSet *spill_tapeset;	/* tape set for hash spill tapes */
+	struct HashAggSpill *spills;	/* HashAggSpill for each grouping set,
+									 * exists only during first pass */
+	TupleTableSlot *spill_rslot;	/* for reading spill files */
+	TupleTableSlot *spill_wslot;	/* for writing spill files */
+	List	   *spill_batches;	/* hash batches remaining to be processed */
+
+	bool		spill_ever_happened;	/* ever spilled during this
+										 * execution? */
+	bool		spill_mode;		/* we hit a limit during the current batch
+								 * and we must not create new groups */
+	Size		spill_mem_limit;	/* limit before spilling hash table or
+									 * index */
+	uint64		spill_ngroups_limit;	/* limit before spilling hash table
+										 * or index */
+	int			spill_planned_partitions;	/* number of partitions planned
+											 * for first pass */
+	double		hashentrysize;	/* estimate revised during execution */
+	Size		spill_mem_peak; /* peak memory usage of hash table or index */
+	uint64		spill_ngroups_current;	/* number of groups currently in
+										 * memory in all hash tables or the
+										 * index */
+	uint64		spill_disk_used;	/* kB of disk space used */
+	int			spill_batches_used; /* batches used during entire execution */
+
+	/* these fields are used in AGG_INDEX mode: */
+	AggStatePerIndex perindex;	/* pointer to per-index state data */
+	bool		index_filled;	/* index filled yet? */
+	MemoryContext index_metacxt;	/* memory for index structure */
+	MemoryContext index_nodecxt;	/* memory for index nodes */
+	MemoryContext index_entrycxt;	/* memory for index entries */
+	Sort	   *index_sort;		/* ordering information for index */
+	Tuplesortstate *mergestate; /* state for merging projected tuples if
+								 * spill occurred */
 
 	/* support for evaluation of agg input expressions: */
-#define FIELDNO_AGGSTATE_ALL_PERGROUPS 54
+#define FIELDNO_AGGSTATE_ALL_PERGROUPS 62
 	AggStatePerGroup *all_pergroups;	/* array of first ->pergroups, than
 										 * ->hash_pergroup */
 	SharedAggInfo *shared_info; /* one entry per worker */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index fb3957e75e5..b0e2d781c01 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -365,6 +365,7 @@ typedef enum AggStrategy
 	AGG_SORTED,					/* grouped agg, input must be sorted */
 	AGG_HASHED,					/* grouped agg, use internal hashtable */
 	AGG_MIXED,					/* grouped agg, hash and sort both used */
+	AGG_INDEX,					/* grouped agg, build index for input */
 } AggStrategy;
 
 /*
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index c4393a94321..b19dacf5de4 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -1219,7 +1219,7 @@ typedef struct Agg
 	/* grouping sets to use */
 	List	   *groupingSets;
 
-	/* chained Agg/Sort nodes */
+	/* chained Agg/Sort nodes; for AGG_INDEX this contains a single Sort node */
 	List	   *chain;
 } Agg;
 
diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h
index 0bf55902aa1..f372c3e7e0a 100644
--- a/src/include/utils/tuplesort.h
+++ b/src/include/utils/tuplesort.h
@@ -475,6 +475,21 @@ extern GinTuple *tuplesort_getgintuple(Tuplesortstate *state, Size *len,
 									   bool forward);
 extern bool tuplesort_getdatum(Tuplesortstate *state, bool forward,
 							   bool copy, Datum *val, bool *isNull,
 							   Datum *abbrev);
 
+/*
+ * Entry points for merge-only mode.
+ */
+extern Tuplesortstate *tuplemerge_begin_common(int workMem,
+											   SortCoordinate coordinate);
+extern Tuplesortstate *tuplemerge_begin_heap(TupleDesc tupDesc,
+											 int nkeys, AttrNumber *attNums,
+											 Oid *sortOperators, Oid *sortCollations,
+											 bool *nullsFirstFlags,
+											 int workMem, SortCoordinate coordinate);
+extern void tuplemerge_start_run(Tuplesortstate *state);
+extern void tuplemerge_end_run(Tuplesortstate *state);
+extern void tuplemerge_puttuple_common(Tuplesortstate *state, SortTuple *tuple,
+									   Size tuplen);
+extern void tuplemerge_puttupleslot(Tuplesortstate *state, TupleTableSlot *slot);
+extern void tuplemerge_performmerge(Tuplesortstate *state);
 
 #endif							/* TUPLESORT_H */
-- 
2.43.0
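
For reviewers, a minimal sketch of how the tuplemerge_* entry points declared
above are meant to be driven. The driver function and its helpers below are
hypothetical: merge_presorted_runs(), fetch_run_tuple() and nruns are
illustrative names only, not part of the patch; only the tuplemerge_* and
tuplesort_* calls are real API.

/* Hypothetical illustration only; not part of the patch. */
#include "postgres.h"

#include "executor/tuptable.h"
#include "miscadmin.h"
#include "utils/tuplesort.h"

/* assumed caller-side reader for run 'runno'; returns false at end of run */
extern bool fetch_run_tuple(int runno, TupleTableSlot *slot);

static void
merge_presorted_runs(TupleDesc tupdesc, int nkeys, AttrNumber *attNums,
					 Oid *sortOps, Oid *collations, bool *nullsFirst,
					 int nruns, TupleTableSlot *slot)
{
	Tuplesortstate *ms;

	ms = tuplemerge_begin_heap(tupdesc, nkeys, attNums, sortOps,
							   collations, nullsFirst, work_mem, NULL);

	for (int runno = 0; runno < nruns; runno++)
	{
		/* switches to a fresh tape if the previous run received tuples */
		tuplemerge_start_run(ms);

		/* tuples must be fed in each run's sort order */
		while (fetch_run_tuple(runno, slot))
			tuplemerge_puttupleslot(ms, slot);

		/* writes the end-of-run marker for a non-empty run */
		tuplemerge_end_run(ms);
	}

	/* external merge across all runs; tuples can be fetched afterwards */
	tuplemerge_performmerge(ms);

	while (tuplesort_gettupleslot(ms, true, false, slot, NULL))
	{
		/* consume tuples in global sort order */
	}

	tuplesort_end(ms);
}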