From f0a3bbed9c80ad304f6cea9ace33534be4f4c3cd Mon Sep 17 00:00:00 2001 From: David Kimura Date: Wed, 29 Apr 2020 16:54:36 +0000 Subject: [PATCH v6 2/2] Implement fallback of batch 0 for serial adaptive hash join There is some fuzziness around the concerns of different functions, specifically ExecHashTableInsert() and ExecHashIncreaseNumBatches(). The existing model allows the insert to succeed and only later adjusts the number of batches or falls back. But this doesn't address exceeding work_mem until after the fact. Instead, this change decides whether to insert into the hashtable or a batch file when relocating tuples between batches inside ExecHashIncreaseNumBatches(). --- src/backend/executor/nodeHash.c | 43 +++++++++++++++++++++-------- src/backend/executor/nodeHashjoin.c | 17 ++++++++++++ 2 files changed, 48 insertions(+), 12 deletions(-) diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 6ecbc76ab5..9340db9fb7 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -183,12 +183,36 @@ MultiExecPrivateHash(HashState *node) else { /* Not subject to skew optimization, so insert normally */ - ExecHashTableInsert(hashtable, slot, hashvalue); + int bucketno; + int batchno; + bool shouldFree; + MinimalTuple tuple = ExecFetchSlotMinimalTuple(slot, &shouldFree); + + ExecHashGetBucketAndBatch(hashtable, hashvalue, + &bucketno, &batchno); + if (hashtable->hashloop_fallback && hashtable->hashloop_fallback[0]) + ExecHashJoinSaveTuple(tuple, + hashvalue, + &hashtable->innerBatchFile[batchno]); + else + ExecHashTableInsert(hashtable, slot, hashvalue); + + if (shouldFree) + heap_free_minimal_tuple(tuple); + } hashtable->totalTuples += 1; } } + if (hashtable->innerBatchFile && hashtable->innerBatchFile[0]) + { + if (BufFileSeek(hashtable->innerBatchFile[0], 0, 0L, SEEK_SET)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rewind hash-join temporary file: %m"))); + } + /* resize the hash table if 
needed (NTUP_PER_BUCKET exceeded) */ if (hashtable->nbuckets != hashtable->nbuckets_optimal) ExecHashIncreaseNumBuckets(hashtable); @@ -925,6 +949,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) int childbatch_outgoing_tuples; int target_batch; FallbackBatchStats *fallback_batch_stats; + size_t currentBatchSize = 0; if (hashtable->hashloop_fallback && hashtable->hashloop_fallback[curbatch]) return; @@ -1029,7 +1054,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, &bucketno, &batchno); - if (batchno == curbatch) + if (batchno == curbatch && (curbatch != 0 || currentBatchSize + hashTupleSize < hashtable->spaceAllowed)) { /* keep tuple in memory - copy it into the new chunk */ HashJoinTuple copyTuple; @@ -1041,11 +1066,12 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) copyTuple->next.unshared = hashtable->buckets.unshared[bucketno]; hashtable->buckets.unshared[bucketno] = copyTuple; curbatch_outgoing_tuples++; + currentBatchSize += hashTupleSize; } else { /* dump it out */ - Assert(batchno > curbatch); + Assert(batchno > curbatch || currentBatchSize + hashTupleSize >= hashtable->spaceAllowed); ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple), hashTuple->hashvalue, &hashtable->innerBatchFile[batchno]); @@ -1081,13 +1107,6 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) hashtable, nfreed, ninmemory, hashtable->spaceUsed); #endif - /* - * For now we do not support fallback in batch 0 as it is a special case - * and assumed to fit in hashtable. 
- */ - if (curbatch == 0) - return; - /* * The same batch should not be marked to fall back more than once */ @@ -1097,9 +1116,9 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) if ((curbatch_outgoing_tuples / (float) ninmemory) >= 0.8) printf("curbatch %i targeted to fallback.", curbatch); #endif - if ((childbatch_outgoing_tuples / (float) ninmemory) >= MAX_RELOCATION && childbatch > 0) + if ((childbatch_outgoing_tuples / (float) ninmemory) >= MAX_RELOCATION) target_batch = childbatch; - else if ((curbatch_outgoing_tuples / (float) ninmemory) >= MAX_RELOCATION && curbatch > 0) + else if ((curbatch_outgoing_tuples / (float) ninmemory) >= MAX_RELOCATION) target_batch = curbatch; else return; diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 516067f176..8f3f4d4b44 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -507,6 +507,23 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) /* Loop around, staying in HJ_NEED_NEW_OUTER state */ continue; } + if (batchno == 0 && node->hj_HashTable->curstripe == 0 && IsHashloopFallback(hashtable)) + { + bool shouldFree; + MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot, + &shouldFree); + + /* + * Need to save this outer tuple to a batch since batch 0 + * is fallback and we must later rewind. + */ + Assert(parallel_state == NULL); + ExecHashJoinSaveTuple(mintuple, hashvalue, + &hashtable->outerBatchFile[batchno]); + + if (shouldFree) + heap_free_minimal_tuple(mintuple); + } /* * While probing the phantom stripe, don't increment -- 2.17.0