diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c index d8e5d68..cb837a8 100644 --- a/src/backend/utils/sort/tuplesort.c +++ b/src/backend/utils/sort/tuplesort.c @@ -442,6 +442,7 @@ struct Tuplesortstate elog(ERROR, "unexpected end of data"); \ } while(0) +#define HEAP_ANCESTOR(slot, levels) ((((slot)+1)>>(levels))-1) static Tuplesortstate *tuplesort_begin_common(int workMem, bool randomAccess); static void puttuple_common(Tuplesortstate *state, SortTuple *tuple); @@ -2549,31 +2550,74 @@ tuplesort_heap_siftup(Tuplesortstate *state, bool checkIndex) SortTuple *memtuples = state->memtuples; SortTuple *tuple; int i, - n; + n, + levels = 0, + l; if (--state->memtupcount <= 0) return; CHECK_FOR_INTERRUPTS(); - n = state->memtupcount; - tuple = &memtuples[n]; /* tuple that must be reinserted */ - i = 0; /* i is where the "hole" is */ - for (;;) + tuple = &memtuples[state->memtupcount]; /* tuple that must be reinserted */ + n = (state->memtupcount - 1) >> 1; /* first tuple without two children */ + + /* Descend heap, following lesser child at each level. */ + i = 0; + while (i < n) { int j = 2 * i + 1; - if (j >= n) - break; - if (j + 1 < n && - HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0) + if (HEAPCOMPARE(&memtuples[j], &memtuples[j + 1]) > 0) j++; - if (HEAPCOMPARE(tuple, &memtuples[j]) <= 0) - break; - memtuples[i] = memtuples[j]; i = j; + ++levels; + } + + /* + * If the heap is of even size, we might have landed on the tuple that has + * just a single child. In that case, we don't need to compare the children + * to determine which is smaller, but we do need to descend to the child. + */ + if (i == n && (state->memtupcount & 1) == 0) + { + i = 2 * i + 1; + ++levels; + } + Assert(i < state->memtupcount); + Assert(HEAP_ANCESTOR(i, levels) == 0); + + /* + * Search path of least children to find the insert position. + * + * From the heap property, we know that the path of least children is + * sorted; that is, memtuples[i] is greater than its parent, which in + * turn is greater than its own parent, and so on. We'd like to find the + * smallest number of levels we must travel up the heap to find a tuple + * less than or equal to the one we're reinserting; if there is no + * such shift, then we reinsert at the top of the heap. + * + * The element we're reinserting here came from the bottom of the heap, + * so it's likely to be large. Accordingly, we perform a linear search. + * A binary search would be better if the insertion position were just + * as likely to be at the top of the heap as at the bottom, but in reality + * the bottom is much more likely. + */ + for (l = 0; l < levels; ++l) + if (HEAPCOMPARE(tuple, &memtuples[HEAP_ANCESTOR(i, l)]) > 0) + break; + + /* + * Move the hole down the tree until it reaches the correct position, and + * then stick the re-inserted tuple into it. + */ + while (levels > l) + { + memtuples[HEAP_ANCESTOR(i, levels)] = + memtuples[HEAP_ANCESTOR(i, levels - 1)]; + --levels; } - memtuples[i] = *tuple; + memtuples[HEAP_ANCESTOR(i, l)] = *tuple; }