From a0dd541ad4e47735fc388d0c553e935f0956f254 Mon Sep 17 00:00:00 2001 From: Jehan-Guillaume de Rorthais Date: Mon, 27 Mar 2023 15:54:39 +0200 Subject: [PATCH] Allocate hash batches related BufFile in a dedicated context --- src/backend/executor/nodeHash.c | 39 +++++++++++++++++++++++++++-- src/backend/executor/nodeHashjoin.c | 14 ++++++++--- src/include/executor/hashjoin.h | 1 + 3 files changed, 49 insertions(+), 5 deletions(-) diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 748c9b0024..3f1ae9755e 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -484,7 +484,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, * * The hashtable control block is just palloc'd from the executor's * per-query memory context. Everything else should be kept inside the - * subsidiary hashCxt or batchCxt. + * subsidiary hashCxt, batchCxt or fileCxt. */ hashtable = palloc_object(HashJoinTableData); hashtable->nbuckets = nbuckets; @@ -538,6 +538,10 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, "HashBatchContext", ALLOCSET_DEFAULT_SIZES); + hashtable->fileCxt = AllocSetContextCreate(CurrentMemoryContext, + "HashBatchFiles", + ALLOCSET_DEFAULT_SIZES); + /* Allocate data that will live for the life of the hashjoin */ oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); @@ -570,15 +574,21 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, if (nbatch > 1 && hashtable->parallel_state == NULL) { + MemoryContext oldctx; + /* * allocate and initialize the file arrays in hashCxt (not needed for * parallel case which uses shared tuplestores instead of raw files) */ + oldctx = MemoryContextSwitchTo(hashtable->fileCxt); + hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch); hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch); /* The files will not be opened until needed... */ /* ... 
but make sure we have temp tablespaces established for them */ PrepareTempTablespaces(); + + MemoryContextSwitchTo(oldctx); } MemoryContextSwitchTo(oldcxt); @@ -934,7 +944,7 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) hashtable, nbatch, hashtable->spaceUsed); #endif - oldcxt = MemoryContextSwitchTo(hashtable->hashCxt); + oldcxt = MemoryContextSwitchTo(hashtable->fileCxt); if (hashtable->innerBatchFile == NULL) { @@ -1020,12 +1030,19 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) } else { + MemoryContext oldctx; + /* dump it out */ Assert(batchno > curbatch); + + oldctx = MemoryContextSwitchTo(hashtable->fileCxt); + ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple), hashTuple->hashvalue, &hashtable->innerBatchFile[batchno]); + MemoryContextSwitchTo(oldctx); + hashtable->spaceUsed -= hashTupleSize; nfreed++; } @@ -1042,6 +1059,9 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable) oldchunks = nextchunk; } + if (hashtable->fileCxt->mem_allocated > hashtable->spaceAllowed) + elog(WARNING, "Growing number of hash batch is exhausting memory"); + #ifdef HJDEBUG printf("Hashjoin %p: freed %ld of %ld tuples, space now %zu\n", hashtable, nfreed, ninmemory, hashtable->spaceUsed); @@ -1677,13 +1697,20 @@ ExecHashTableInsert(HashJoinTable hashtable, } else { + MemoryContext oldctx; + /* * put the tuple into a temp file for later batches */ Assert(batchno > hashtable->curbatch); + + oldctx = MemoryContextSwitchTo(hashtable->fileCxt); + ExecHashJoinSaveTuple(tuple, hashvalue, &hashtable->innerBatchFile[batchno]); + + MemoryContextSwitchTo(oldctx); } if (shouldFree) @@ -2532,10 +2559,18 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable) } else { + MemoryContext oldctx; + /* Put the tuple into a temp file for later batches */ Assert(batchno > hashtable->curbatch); + + oldctx = MemoryContextSwitchTo(hashtable->fileCxt); + ExecHashJoinSaveTuple(tuple, hashvalue, &hashtable->innerBatchFile[batchno]); + + MemoryContextSwitchTo(oldctx); + pfree(hashTuple); 
hashtable->spaceUsed -= tupleSize; hashtable->spaceUsedSkew -= tupleSize; diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index f189fb4d28..ba1f27c2c4 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -422,6 +422,7 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) if (batchno != hashtable->curbatch && node->hj_CurSkewBucketNo == INVALID_SKEW_BUCKET_NO) { + MemoryContext oldctx; bool shouldFree; MinimalTuple mintuple = ExecFetchSlotMinimalTuple(outerTupleSlot, &shouldFree); @@ -432,9 +433,14 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel) */ Assert(parallel_state == NULL); Assert(batchno > hashtable->curbatch); + + oldctx = MemoryContextSwitchTo(hashtable->fileCxt); + ExecHashJoinSaveTuple(mintuple, hashvalue, &hashtable->outerBatchFile[batchno]); + MemoryContextSwitchTo(oldctx); + if (shouldFree) heap_free_minimal_tuple(mintuple); @@ -1234,9 +1240,9 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) * The data recorded in the file for each tuple is its hash value, * then the tuple in MinimalTuple format. * - * Note: it is important always to call this in the regular executor - * context, not in a shorter-lived context; else the temp file buffers - * will get messed up. + * Note: it is important always to call this in the HashBatchFiles context, + * not in a shorter-lived context; else the temp file buffers will get messed + * up. */ void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, @@ -1246,6 +1252,8 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue, if (file == NULL) { + Assert(strcmp(CurrentMemoryContext->name, "HashBatchFiles") == 0); + /* First write to this batch file, so open it. 
*/ file = BufFileCreateTemp(false); *fileptr = file; diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index acb7592ca0..d36beb7229 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -348,6 +348,7 @@ typedef struct HashJoinTableData MemoryContext hashCxt; /* context for whole-hash-join storage */ MemoryContext batchCxt; /* context for this-batch-only storage */ + MemoryContext fileCxt; /* context for BufFile-related storage */ /* used for dense allocation of tuples (into linked chunks) */ HashMemoryChunk chunks; /* one list for the whole batch */ -- 2.39.2