diff --git a/src/backend/utils/adt/numeric.c b/src/backend/utils/adt/numeric.c new file mode 100644 index d0f0923..f81bdd3 --- a/src/backend/utils/adt/numeric.c +++ b/src/backend/utils/adt/numeric.c @@ -101,6 +101,8 @@ typedef signed char NumericDigit; typedef int16 NumericDigit; #endif +#define NBASE_SQR (NBASE * NBASE) + /* * The Numeric type as stored on disk. * @@ -558,8 +560,9 @@ static void sub_var(const NumericVar *va static void mul_var(const NumericVar *var1, const NumericVar *var2, NumericVar *result, int rscale); -static void mul_var_short(const NumericVar *var1, const NumericVar *var2, - NumericVar *result); +static pg_noinline void mul_var_short(const NumericVar *var1, + const NumericVar *var2, + NumericVar *result); static void div_var(const NumericVar *var1, const NumericVar *var2, NumericVar *result, int rscale, bool round); @@ -8674,17 +8677,23 @@ mul_var(const NumericVar *var1, const Nu int rscale) { int res_ndigits; + int res_ndigitpairs; int res_sign; int res_weight; + int pair_offset; int maxdigits; - int *dig; - int carry; - int maxdig; - int newdig; + int maxdigitpairs; + uint64 *dig; + uint64 carry; + uint64 maxdig; + uint64 newdig; int var1ndigits; int var2ndigits; + int var1ndigitpairs; + int var2ndigitpairs; NumericDigit *var1digits; NumericDigit *var2digits; + uint32 *var2digitpairs; NumericDigit *res_digits; int i, i1, @@ -8729,86 +8738,139 @@ mul_var(const NumericVar *var1, const Nu return; } - /* Determine result sign and (maximum possible) weight */ + /* Determine result sign */ if (var1->sign == var2->sign) res_sign = NUMERIC_POS; else res_sign = NUMERIC_NEG; - res_weight = var1->weight + var2->weight + 2; /* - * Determine the number of result digits to compute. If the exact result - * would have more than rscale fractional digits, truncate the computation - * with MUL_GUARD_DIGITS guard digits, i.e., ignore input digits that - * would only contribute to the right of that. (This will give the exact + * Determine the number of result digits to compute and the (maximum + * possible) result weight. If the exact result would have more than + * rscale fractional digits, truncate the computation with + * MUL_GUARD_DIGITS guard digits, i.e., ignore input digits that would + * only contribute to the right of that. (This will give the exact * rounded-to-rscale answer unless carries out of the ignored positions * would have propagated through more than MUL_GUARD_DIGITS digits.) * * Note: an exact computation could not produce more than var1ndigits + - * var2ndigits digits, but we allocate one extra output digit in case - * rscale-driven rounding produces a carry out of the highest exact digit. + * var2ndigits digits, but we allocate at least one extra output digit in + * case rscale-driven rounding produces a carry out of the highest exact + * digit. + * + * To speed up the computation, we process the digits of each input in + * pairs, converting them to base NBASE^2, and producing a base-NBASE^2 + * intermediate result. */ - res_ndigits = var1ndigits + var2ndigits + 1; + /* digit pairs in each input */ + var1ndigitpairs = (var1ndigits + 1) / 2; + var2ndigitpairs = (var2ndigits + 1) / 2; + + /* digits in exact result */ + res_ndigits = var1ndigits + var2ndigits; + + /* digit pairs in exact result with at least one extra output digit */ + res_ndigitpairs = res_ndigits / 2 + 1; + + /* pair offset to align output to end of dig[] */ + pair_offset = res_ndigitpairs - var1ndigitpairs - var2ndigitpairs + 1; + + /* maximum possible result weight */ + res_weight = var1->weight + var2->weight + 1 + 2 * res_ndigitpairs - + res_ndigits; + + /* truncate computation based on requested rscale */ maxdigits = res_weight + 1 + (rscale + DEC_DIGITS - 1) / DEC_DIGITS + MUL_GUARD_DIGITS; - res_ndigits = Min(res_ndigits, maxdigits); + maxdigitpairs = (maxdigits + 1) / 2; + res_ndigitpairs = Min(res_ndigitpairs, maxdigitpairs); + res_ndigits = 2 * res_ndigitpairs; - if (res_ndigits < 3) + /* + * In the computation below, digit pair i1 of var1 and digit pair i2 of + * var2 are multiplied and added to digit i1+i2+pair_offset of dig[]. Thus + * input digit pairs with index >= res_ndigitpairs - pair_offset don't + * contribute to the result, and can be ignored. + */ + if (res_ndigitpairs <= pair_offset) { /* All input digits will be ignored; so result is zero */ zero_var(result); result->dscale = rscale; return; } + var1ndigitpairs = Min(var1ndigitpairs, res_ndigitpairs - pair_offset); + var2ndigitpairs = Min(var2ndigitpairs, res_ndigitpairs - pair_offset); /* - * We do the arithmetic in an array "dig[]" of signed int's. Since - * INT_MAX is noticeably larger than NBASE*NBASE, this gives us headroom - * to avoid normalizing carries immediately. + * We do the arithmetic in an array "dig[]" of unsigned 64-bit integers. + * Since PG_UINT64_MAX is noticeably larger than NBASE^4, this gives us + * headroom to avoid normalizing carries immediately. * * maxdig tracks the maximum possible value of any dig[] entry; when this - * threatens to exceed INT_MAX, we take the time to propagate carries. - * Furthermore, we need to ensure that overflow doesn't occur during the - * carry propagation passes either. The carry values could be as much as - * INT_MAX/NBASE, so really we must normalize when digits threaten to - * exceed INT_MAX - INT_MAX/NBASE. + * threatens to exceed PG_UINT64_MAX, we take the time to propagate + * carries. Furthermore, we need to ensure that overflow doesn't occur + * during the carry propagation passes either. The carry values could be + * as much as PG_UINT64_MAX/NBASE^2, so really we must normalize when + * digits threaten to exceed PG_UINT64_MAX - PG_UINT64_MAX/NBASE^2. * - * To avoid overflow in maxdig itself, it actually represents the max - * possible value divided by NBASE-1, ie, at the top of the loop it is - * known that no dig[] entry exceeds maxdig * (NBASE-1). + * To avoid overflow in maxdig itself, it actually represents the maximum + * possible value divided by NBASE^2-1, i.e., at the top of the loop it is + * known that no dig[] entry exceeds maxdig * (NBASE^2-1). + * + * The conversion of var1 to base NBASE^2 is done on the fly, as each new + * digit is required. The digits of var2 are converted upfront, and + * stored at the end of dig[]. */ - dig = (int *) palloc0(res_ndigits * sizeof(int)); + dig = (uint64 *) palloc(res_ndigitpairs * sizeof(uint64) + + var2ndigitpairs * sizeof(uint32)); + + /* zero the result digits */ + MemSetAligned(dig, 0, res_ndigitpairs * sizeof(uint64)); maxdig = 0; + /* convert var2 to base NBASE^2, shifting up if length is odd */ + var2digitpairs = (uint32 *) (dig + res_ndigitpairs); + for (i1 = i2 = 0; i1 < var2ndigitpairs - (var2ndigits & 1); i1++, i2 += 2) + var2digitpairs[i1] = var2digits[i2] * NBASE + var2digits[i2 + 1]; + if ((var2ndigits & 1) != 0) + { + var2digitpairs[i1] = var2digits[i2] * NBASE; + if (i2 + 1 < var2ndigits) + var2digitpairs[i1] += var2digits[i2 + 1]; + } + /* - * The least significant digits of var1 should be ignored if they don't - * contribute directly to the first res_ndigits digits of the result that - * we are computing. - * - * Digit i1 of var1 and digit i2 of var2 are multiplied and added to digit - * i1+i2+2 of the accumulator array, so we need only consider digits of - * var1 for which i1 <= res_ndigits - 3. + * Compute the base-NBASE^2 result in dig[]. The adjustment made to + * var1ndigitpairs above ensures that this loop only considers var1 digits + * that actually contribute to the result. */ - for (i1 = Min(var1ndigits - 1, res_ndigits - 3); i1 >= 0; i1--) + for (i1 = 0; i1 < var1ndigitpairs; i1++) { - NumericDigit var1digit = var1digits[i1]; + uint32 var1digitpair; - if (var1digit == 0) + /* Next base-NBASE^2 digit from var1 */ + if (2 * i1 + 1 < var1ndigits) + var1digitpair = var1digits[2 * i1] * NBASE + var1digits[2 * i1 + 1]; + else + var1digitpair = var1digits[2 * i1] * NBASE; + + if (var1digitpair == 0) continue; /* Time to normalize? */ - maxdig += var1digit; - if (maxdig > (INT_MAX - INT_MAX / NBASE) / (NBASE - 1)) + maxdig += var1digitpair; + if (maxdig > (PG_UINT64_MAX - PG_UINT64_MAX / NBASE_SQR) / (NBASE_SQR - 1)) { - /* Yes, do it */ + /* Yes, do it (to base NBASE^2) */ carry = 0; - for (i = res_ndigits - 1; i >= 0; i--) + for (i = res_ndigitpairs - 1; i >= 0; i--) { newdig = dig[i] + carry; - if (newdig >= NBASE) + if (newdig >= NBASE_SQR) { - carry = newdig / NBASE; - newdig -= carry * NBASE; + carry = newdig / NBASE_SQR; + newdig -= carry * NBASE_SQR; } else carry = 0; @@ -8816,14 +8878,15 @@ mul_var(const NumericVar *var1, const Nu } Assert(carry == 0); /* Reset maxdig to indicate new worst-case */ - maxdig = 1 + var1digit; + maxdig = 1 + var1digitpair; } /* * Add the appropriate multiple of var2 into the accumulator. * - * As above, digits of var2 can be ignored if they don't contribute, - * so we only include digits for which i1+i2+2 < res_ndigits. + * This must only include digits pairs of var2 that contribute to the + * first res_ndigitpairs of the result, so we only include digit pairs + * for which i1+i2+pair_offset < res_ndigitpairs. * * This inner loop is the performance bottleneck for multiplication, * so we want to keep it simple enough so that it can be @@ -8833,42 +8896,46 @@ mul_var(const NumericVar *var1, const Nu * not matter. */ { - int i2limit = Min(var2ndigits, res_ndigits - i1 - 2); - int *dig_i1_2 = &dig[i1 + 2]; + int i2limit = Min(var2ndigitpairs, + res_ndigitpairs - i1 - pair_offset); + uint64 *dig_i1_off = &dig[i1 + pair_offset]; for (i2 = 0; i2 < i2limit; i2++) - dig_i1_2[i2] += var1digit * var2digits[i2]; + dig_i1_off[i2] += (uint64) var1digitpair * var2digitpairs[i2]; } } /* - * Now we do a final carry propagation pass to normalize the result, which - * we combine with storing the result digits into the output. Note that - * this is still done at full precision w/guard digits. + * Now we do a final carry propagation pass to normalize the base-NBASE^2 + * result, and convert it back to base NBASE, storing the result digits + * into the output. Note that this is still done at full precision w/guard + * digits. */ alloc_var(result, res_ndigits); res_digits = result->digits; carry = 0; - for (i = res_ndigits - 1; i >= 0; i--) + for (i1 = res_ndigitpairs - 1, i2 = res_ndigits - 1; i1 >= 0; i1--) { - newdig = dig[i] + carry; - if (newdig >= NBASE) + newdig = dig[i1] + carry; + if (newdig >= NBASE_SQR) { - carry = newdig / NBASE; - newdig -= carry * NBASE; + carry = newdig / NBASE_SQR; + newdig -= carry * NBASE_SQR; } else carry = 0; - res_digits[i] = newdig; + res_digits[i2--] = (NumericDigit) (newdig % NBASE); + res_digits[i2--] = (NumericDigit) (newdig / NBASE); } Assert(carry == 0); pfree(dig); /* - * Finally, round the result to the requested precision. + * Adjust the weight, if the inputs were shifted up during base + * conversion, and round the result to the requested precision. */ - result->weight = res_weight; + result->weight = res_weight - (var1ndigits & 1) - (var2ndigits & 1); result->sign = res_sign; /* Round to target rscale (and set result->dscale) */ @@ -8886,7 +8953,7 @@ mul_var(const NumericVar *var1, const Nu * has at least as many digits as var1, and the exact product var1 * var2 is * requested. */ -static void +static pg_noinline void mul_var_short(const NumericVar *var1, const NumericVar *var2, NumericVar *result) {