From eef1a00ebc208b8d6c6d02eb37325cd7602fa707 Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Tue, 16 Jan 2024 18:34:09 +0100 Subject: [PATCH v11] nbtree: merge array scankeys's arrays more efficiently Instead of n * log(m), we use mergejoin to merge the arrays in O(n + m) time. We further use exponential search to improve mergejoin's O(n+m) complexity to O(log(n | m)) in cases where one array's data range is completely disjunct from the other. While technically this last case could be further improved to O(1), it'd be only a marginal speedup when compared to this one's. --- src/backend/access/nbtree/nbtutils.c | 207 ++++++++++++++++++++++++++- 1 file changed, 205 insertions(+), 2 deletions(-) diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c index 78d16d3330..7013bf7315 100644 --- a/src/backend/access/nbtree/nbtutils.c +++ b/src/backend/access/nbtree/nbtutils.c @@ -48,6 +48,9 @@ static int _bt_sort_array_elements(IndexScanDesc scan, ScanKey skey, static int _bt_merge_arrays(IndexScanDesc scan, ScanKey skey, bool reverse, Datum *elems_orig, int nelems_orig, Datum *elems_next, int nelems_next); +static int _bt_merge_arrays_search_next(Datum *elems, int nelems, int start_index, + Datum *key, BTSortArrayContext *cxt, + int *compare_result); static int _bt_compare_array_elements(const void *a, const void *b, void *arg); static inline int32 _bt_compare_array_skey(FmgrInfo *orderproc, Datum tupdatum, bool tupnull, @@ -644,8 +647,9 @@ _bt_merge_arrays(IndexScanDesc scan, ScanKey skey, bool reverse, { BTScanOpaque so = (BTScanOpaque) scan->opaque; BTSortArrayContext cxt; - Datum *merged = palloc(sizeof(Datum) * Min(nelems_orig, nelems_next)); + Datum *merged PG_USED_FOR_ASSERTS_ONLY; int merged_nelems = 0; + int merged_nelems_check PG_USED_FOR_ASSERTS_ONLY = 0; /* * Incrementally copy the original array into a temp buffer, skipping over @@ -654,25 +658,224 @@ _bt_merge_arrays(IndexScanDesc scan, ScanKey skey, bool reverse, cxt.orderproc = &so->orderProcs[skey->sk_attno - 1]; cxt.collation = skey->sk_collation; cxt.reverse = reverse; + + /* + * When assertions are enabled, use binary searches to create an array of + * matches that we'll use to validate our merge join + exponential search + * algorithm below. + * + * Note that this scratch space is only used in assert-enabled builds; we + * write directly to elems_orig when we don't have assertions enabled, + * saving one palloc/pfree. + */ +#ifdef USE_ASSERT_CHECKING + merged = palloc(sizeof(Datum) * Min(nelems_orig, nelems_next)); + for (int i = 0; i < nelems_orig; i++) { Datum *elem = elems_orig + i; if (bsearch_arg(elem, elems_next, nelems_next, sizeof(Datum), _bt_compare_array_elements, &cxt)) - merged[merged_nelems++] = *elem; + merged[merged_nelems_check++] = *elem; + } +#endif + + for (int i = 0, j = 0; i < nelems_orig && j < nelems_next;) + { + Datum *orig = &elems_orig[i]; + Datum *next = &elems_next[j]; + int res; + + res = _bt_compare_array_elements(orig, next, &cxt); + + /* + * Ratchet each array forward until we find a match. + */ + do + { + if (res < 0) + { + int prev_i = i; + /* + * Find the next element in elems_orig that is >= next, + * storing the compare result in &res + */ + i = _bt_merge_arrays_search_next(elems_orig, nelems_orig, i, + next, &cxt, &res); + Assert(i > prev_i); + + /* + * i is now either out of bounds, or has progressed to the + * first offset that is >= next. + */ + if (i != nelems_orig) + { + orig = &elems_orig[i]; + Assert(_bt_compare_array_elements(orig, next, &cxt) == res); + Assert(res >= 0); + } + } + else if (res > 0) + { + int prev_j = j; + /* + * Find the next element in elems_next that is >= next, + * storing the compare result in &res + * + * Note that this does compare(array_elem, key), so the + * compare result in &res must be reversed before use in this + * case. + */ + j = _bt_merge_arrays_search_next(elems_next, nelems_next, j, + orig, &cxt, &res); + res = -res; + Assert(j > prev_j); + + /* + * j is now either out of bounds, or has progressed to the + * first offset that is >= orig. + */ + if (j != nelems_next) + { + next = &elems_next[j]; + Assert(_bt_compare_array_elements(orig, next, &cxt) == res); + Assert(res <= 0); + } + } + } while (res != 0 && i < nelems_orig && j < nelems_next); + + /* + * We are either at the end of one of the input arrays, or both of + * the current array indexes are equal in this array entry. + */ + if (res != 0) + { + Assert(i == nelems_orig || j == nelems_next); + } + else /* res == 0 */ + { +#ifdef USE_ASSERT_CHECKING + /* + * Make sure that the current index in elems_orig is not smaller + * than the number of merged elements: if that were the case, we'd + * have double-counted at least one element, which would break + * the assumption we use in non-assertion builds to directly write + * to elems_orig. + */ + Assert(merged_nelems <= i); + Assert(_bt_compare_array_elements(&merged[merged_nelems++], orig, &cxt) == 0); +#else + /* Move the element to the merged section, if needed */ + if (merged_nelems != i) + elems_orig[merged_nelems++] = *orig; +#endif + i++; + j++; + } } + Assert(merged_nelems == merged_nelems_check); + +#ifdef USE_ASSERT_CHECKING /* * Overwrite the original array with temp buffer so that we're only left * with intersecting array elements */ memcpy(elems_orig, merged, merged_nelems * sizeof(Datum)); pfree(merged); +#endif return merged_nelems; } +/* + * Find the next element in the array that is >= key, using cxt with + * _bt_compare_array_elements as sort operator. + * + * Will always return a value > start_index. + * + * Takes O(1) if the next element is a neighbour, up to worst case + * O(log(n)) for n remaining entries. + * + * The function assumes that the input array is sorted. + */ +static int +_bt_merge_arrays_search_next(Datum *elems, int nelems, int start_index, + Datum *key, BTSortArrayContext *cxt, + int *compare_result) +{ + int step = 1; + int min = start_index + 1, + max = start_index, + compare, + max_compare = -1; + + /* + * Exponential search forward to find the first element + * + * We use exponential search forward to make sure we only need to do one + * compare in many cases. + * + */ + for(;;) + { + max += step; + + if (max >= nelems) + { + max = nelems; + compare = -1; + break; + } + + compare = _bt_compare_array_elements(&elems[max], key, cxt); + + step = step * 2; + + if (compare < 0) + min = max + 1; + else + break; + } + + max_compare = compare; + + /* if we happened to land on an equal tuple we return early */ + if (compare == 0) + { + *compare_result = compare; + return max; + } + + /* Now do a binary search to get to the boundary position */ + while (max > min) + { + int mid = min + ((max - min) / 2); + + compare = _bt_compare_array_elements(&elems[mid], key, cxt); + + if (compare < 0) + { + min = mid + 1; + } + else if (compare > 0) + { + max = mid; + max_compare = compare; + } + else if (compare == 0) + { + max = mid; + max_compare = compare; + break; + } + } + + *compare_result = max_compare; + return max; +} + /* * qsort_arg comparator for sorting array elements */ -- 2.40.1