From eb8a463f9c952cb17a88d1666ab4dc2ccefa1b44 Mon Sep 17 00:00:00 2001 From: David Kimura Date: Wed, 29 Apr 2020 16:54:36 +0000 Subject: [PATCH v6 2/2] Implement fallback of batch 0 for serial adaptive hash join There is some fuzziness around the concerns of different functions, specifically ExecHashTableInsert() and ExecHashIncreaseNumBatches(). The existing model allows the insert to succeed and then later adjusts the number of batches or falls back. But this doesn't address exceeding work_mem until after the fact. Instead, this change decides whether to insert into the hashtable or the batch file when relocating tuples between batches inside ExecHashIncreaseNumBatches(). --- src/backend/executor/nodeHash.c | 17 ++++++----------- src/backend/executor/nodeHashjoin.c | 24 ++++++++++++++++++++++++ 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 6ecbc76ab5..ca8d8f475a 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -925,6 +925,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) int childbatch_outgoing_tuples; int target_batch; FallbackBatchStats *fallback_batch_stats; + size_t currentBatchSize = 0; if (hashtable->hashloop_fallback && hashtable->hashloop_fallback[curbatch]) return; @@ -1029,7 +1030,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) ExecHashGetBucketAndBatch(hashtable, hashTuple->hashvalue, &bucketno, &batchno); - if (batchno == curbatch) + if (batchno == curbatch && (curbatch != 0 || currentBatchSize + hashTupleSize < hashtable->spaceAllowed)) { /* keep tuple in memory - copy it into the new chunk */ HashJoinTuple copyTuple; @@ -1041,11 +1042,12 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) copyTuple->next.unshared = hashtable->buckets.unshared[bucketno]; hashtable->buckets.unshared[bucketno] = copyTuple; curbatch_outgoing_tuples++; + currentBatchSize += hashTupleSize; } else { /* dump it out */ - Assert(batchno > 
curbatch); + Assert(batchno > curbatch || currentBatchSize + hashTupleSize >= hashtable->spaceAllowed); ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple), hashTuple->hashvalue, &hashtable->innerBatchFile[batchno]); @@ -1081,13 +1083,6 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) hashtable, nfreed, ninmemory, hashtable->spaceUsed); #endif - /* - * For now we do not support fallback in batch 0 as it is a special case - * and assumed to fit in hashtable. - */ - if (curbatch == 0) - return; - /* * The same batch should not be marked to fall back more than once */ @@ -1097,9 +1092,9 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) if ((curbatch_outgoing_tuples / (float) ninmemory) >= 0.8) printf("curbatch %i targeted to fallback.", curbatch); #endif - if ((childbatch_outgoing_tuples / (float) ninmemory) >= MAX_RELOCATION && childbatch > 0) + if ((childbatch_outgoing_tuples / (float) ninmemory) >= MAX_RELOCATION) target_batch = childbatch; - else if ((curbatch_outgoing_tuples / (float) ninmemory) >= MAX_RELOCATION && curbatch > 0) + else if ((curbatch_outgoing_tuples / (float) ninmemory) >= MAX_RELOCATION) target_batch = curbatch; else return; diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 516067f176..735677ba81 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -507,6 +507,23 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) /* Loop around, staying in HJ_NEED_NEW_OUTER state */ continue; } + if (batchno == 0 && node->hj_HashTable->curstripe == 0 && IsHashloopFallback(hashtable)) + { + bool shouldFree; + MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot, + &shouldFree); + + /* + * Need to save this outer tuple to a batch since batch 0 + * is fallback and we must later rewind. 
+ */ + Assert(parallel_state == NULL); + ExecHashJoinSaveTuple(mintuple, hashvalue, + &hashtable->outerBatchFile[batchno]); + + if (shouldFree) + heap_free_minimal_tuple(mintuple); + } /* * While probing the phantom stripe, don't increment @@ -1255,6 +1272,13 @@ ExecHashJoinLoadStripe(HashJoinState *hjstate) (errcode_for_file_access(), errmsg("could not rewind hash-join temporary file: %m"))); } + if (hashtable->innerBatchFile && hashtable->innerBatchFile[curbatch] && hashtable->curbatch == 0 && hashtable->curstripe == 0) + { + if (BufFileSeek(hashtable->innerBatchFile[curbatch], 0, 0L, SEEK_SET)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not rewind hash-join temporary file: %m"))); + } hashtable->curstripe++; -- 2.17.1