From 26fcccf150b0a7c1f210b331a47fba5ae302130d Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Sat, 11 Jan 2020 16:57:34 -0800 Subject: [PATCH v4 3/4] Address barrier wait deadlock hazard Previously, in the chunk phase machine in ExecParallelHashJoinNewChunk(), we reused one chunk barrier for all chunks, looped through the phases, and then set the phase back to the initial phase and explicitly jumped there. Now, we initialize an array of chunk barriers, one per chunk, and use a new chunk barrier for each chunk. After finishing probing a chunk, upon re-entering ExecParallelHashJoinNewChunk(), workers will wait on the chunk barrier for all participants to arrive. This is okay because the barrier is advanced to the final phase as part of this wait (per the comment in nodeHashjoin.c about deadlock risk when waiting on barriers after emitting tuples). The last worker to arrive will increment the chunk number. All workers detach from the chunk barrier they are attached to and select the next chunk barrier. The hashtable is now reset in the first phase of the chunk phase machine, PHJ_CHUNK_ELECTING. Note that this will cause an unnecessary hashtable reset for the first chunk. The loading and probing phases of the chunk phase machine stay the same. If a worker joins in the PHJ_CHUNK_DONE phase, it will simply detach from the chunk barrier and move on to the next chunk barrier in the array of chunk barriers. In order to mitigate the other cause of deadlock hazard (workers waiting on the batch barrier after emitting tuples), now, in ExecParallelHashJoinNewBatch(), if we are attached to a batch barrier and it is a fallback batch, all workers will detach from the batch barrier and then end their scan of that batch. The last worker to detach will combine the outer match status files; then it will detach from the batch, clean up the hashtable, and end its scan of the inner side. Then it will return and proceed to emit unmatched outer tuples. 
In PHJ_BATCH_ELECTING, the worker that ends up allocating the hashtable will also initialize the chunk barriers. Also, this commit moves combined_bitmap to batch from hjstate. It will be moved into the SharedBits store once that API is added. Co-authored-by: Jesse Zhang --- src/backend/executor/adaptiveHashjoin.c | 357 +++++++++++++++--------- src/backend/executor/nodeHash.c | 1 + src/backend/executor/nodeHashjoin.c | 107 ++++--- src/backend/postmaster/pgstat.c | 3 + src/include/executor/adaptiveHashjoin.h | 1 - src/include/executor/hashjoin.h | 15 +- src/include/executor/nodeHash.h | 1 + src/include/nodes/execnodes.h | 1 - src/include/pgstat.h | 1 + 9 files changed, 301 insertions(+), 186 deletions(-) diff --git a/src/backend/executor/adaptiveHashjoin.c b/src/backend/executor/adaptiveHashjoin.c index 64af2a24f346..45846a076916 100644 --- a/src/backend/executor/adaptiveHashjoin.c +++ b/src/backend/executor/adaptiveHashjoin.c @@ -24,7 +24,9 @@ ExecParallelHashJoinNewChunk(HashJoinState *hjstate, bool advance_from_probing) ParallelHashJoinBatch *phj_batch; SharedTuplestoreAccessor *outer_tuples; SharedTuplestoreAccessor *inner_tuples; + Barrier *barriers; Barrier *chunk_barrier; + Barrier *old_chunk_barrier; hashtable = hjstate->hj_HashTable; batchno = hashtable->curbatch; @@ -33,10 +35,11 @@ ExecParallelHashJoinNewChunk(HashJoinState *hjstate, bool advance_from_probing) inner_tuples = hashtable->batches[batchno].inner_tuples; /* - * This chunk_barrier is initialized in the ELECTING phase when this + * These chunk_barriers are initialized in the ELECTING phase when this * worker attached to the batch in ExecParallelHashJoinNewBatch() */ - chunk_barrier = &hashtable->batches[batchno].shared->chunk_barrier; + barriers = dsa_get_address(hashtable->area, hashtable->batches[batchno].shared->chunk_barriers); + old_chunk_barrier = &(barriers[phj_batch->current_chunk - 1]); /* * If this worker just came from probing (from HJ_SCAN_BUCKET) we need to @@ -49,14 +52,16 @@ 
ExecParallelHashJoinNewChunk(HashJoinState *hjstate, bool advance_from_probing) * The current chunk number can't be incremented if *any* worker isn't * done yet (otherwise they might access the wrong data structure!) */ - if (BarrierArriveAndWait(chunk_barrier, + if (BarrierArriveAndWait(old_chunk_barrier, WAIT_EVENT_HASH_CHUNK_PROBING)) phj_batch->current_chunk++; - + BarrierDetach(old_chunk_barrier); /* Once the barrier is advanced we'll be in the DONE phase */ } - else - BarrierAttach(chunk_barrier); + if (phj_batch->current_chunk > phj_batch->total_chunks) + return false; + chunk_barrier = &(barriers[phj_batch->current_chunk - 1]); + /* is this a race condition ? */ /* * The outer side is exhausted and either 1) the current chunk of the @@ -64,105 +69,186 @@ ExecParallelHashJoinNewChunk(HashJoinState *hjstate, bool advance_from_probing) * chunk of the inner side is exhausted and it is time to advance the * batch */ - switch (BarrierPhase(chunk_barrier)) + + for (;;) { - /* - * TODO: remove this phase and coordinate access to hashtable - * above goto and after incrementing current_chunk - */ - case PHJ_CHUNK_ELECTING: - phj_chunk_electing: - BarrierArriveAndWait(chunk_barrier, - WAIT_EVENT_HASH_CHUNK_ELECTING); - /* Fall through. */ + switch (BarrierAttach(chunk_barrier)) + { + case PHJ_CHUNK_ELECTING: + if (BarrierArriveAndWait(chunk_barrier, + WAIT_EVENT_HASH_CHUNK_ELECTING)) + { + /* + * TODO: this will unnecessarily reset the hashtable for + * the first chunk. fix this? + */ + /* + * rewind/reset outer tuplestore and rewind outer match + * status files + */ + sts_reinitialize(outer_tuples); - case PHJ_CHUNK_LOADING: - /* Start (or join in) loading the next chunk of inner tuples. */ - sts_begin_parallel_scan(inner_tuples); + /* + * reset inner's hashtable and recycle the existing bucket + * array. 
+ */ + dsa_pointer_atomic *buckets = (dsa_pointer_atomic *) + dsa_get_address(hashtable->area, phj_batch->buckets); - MinimalTuple tuple; - tupleMetadata metadata; + for (size_t i = 0; i < hashtable->nbuckets; ++i) + dsa_pointer_atomic_write(&buckets[i], InvalidDsaPointer); - while ((tuple = sts_parallel_scan_next(inner_tuples, &metadata))) - { - if (metadata.chunk != phj_batch->current_chunk) - continue; + /* + * TODO: this will unfortunately rescan all inner tuples + * in the batch for each chunk + */ - ExecForceStoreMinimalTuple(tuple, - hjstate->hj_HashTupleSlot, - false); + /* + * should be able to save the block in the file which + * starts the next chunk instead + */ + sts_reinitialize(inner_tuples); + } + /* Fall through. */ + case PHJ_CHUNK_RESETTING: + BarrierArriveAndWait(chunk_barrier, WAIT_EVENT_HASH_CHUNK_RESETTING); + case PHJ_CHUNK_LOADING: + /* Start (or join in) loading the next chunk of inner tuples. */ + sts_begin_parallel_scan(inner_tuples); + + MinimalTuple tuple; + tupleMetadata metadata; + + while ((tuple = sts_parallel_scan_next(inner_tuples, &metadata))) + { + if (metadata.chunk != phj_batch->current_chunk) + continue; + + ExecForceStoreMinimalTuple(tuple, + hjstate->hj_HashTupleSlot, + false); + + ExecParallelHashTableInsertCurrentBatch( + hashtable, + hjstate->hj_HashTupleSlot, + metadata.hashvalue); + } + sts_end_parallel_scan(inner_tuples); + BarrierArriveAndWait(chunk_barrier, + WAIT_EVENT_HASH_CHUNK_LOADING); + /* Fall through. */ + + case PHJ_CHUNK_PROBING: + sts_begin_parallel_scan(outer_tuples); + return true; - ExecParallelHashTableInsertCurrentBatch( - hashtable, - hjstate->hj_HashTupleSlot, - metadata.hashvalue); - } - sts_end_parallel_scan(inner_tuples); - BarrierArriveAndWait(chunk_barrier, - WAIT_EVENT_HASH_CHUNK_LOADING); - /* Fall through. 
*/ + case PHJ_CHUNK_DONE: + if (phj_batch->current_chunk > phj_batch->total_chunks) + return false; + /* TODO: exercise this somehow (ideally, in a test) */ + BarrierDetach(chunk_barrier); + if (chunk_barrier < barriers + phj_batch->total_chunks) + { + ++chunk_barrier; + continue; + } + else + return false; - case PHJ_CHUNK_PROBING: - sts_begin_parallel_scan(outer_tuples); - return true; + default: + elog(ERROR, "unexpected chunk phase %d. pid %i. batch %i.", + BarrierPhase(chunk_barrier), MyProcPid, batchno); + } + } - case PHJ_CHUNK_DONE: + return false; +} - BarrierArriveAndWait(chunk_barrier, WAIT_EVENT_HASH_CHUNK_DONE); - if (phj_batch->current_chunk > phj_batch->total_chunks) - { - BarrierDetach(chunk_barrier); - return false; - } +static void +ExecHashTableLoopDetachBatchForChosen(HashJoinTable hashtable) +{ + if (hashtable->parallel_state != NULL && + hashtable->curbatch >= 0) + { + int curbatch = hashtable->curbatch; + ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; - /* - * Otherwise it is time for the next chunk. One worker should - * reset the hashtable - */ - if (BarrierArriveExplicitAndWait(chunk_barrier, PHJ_CHUNK_ELECTING, WAIT_EVENT_HASH_ADVANCE_CHUNK)) - { - /* - * rewind/reset outer tuplestore and rewind outer match status - * files - */ - sts_reinitialize(outer_tuples); + /* Make sure any temporary files are closed. */ + sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); - /* - * reset inner's hashtable and recycle the existing bucket - * array. - */ - dsa_pointer_atomic *buckets = (dsa_pointer_atomic *) - dsa_get_address(hashtable->area, phj_batch->buckets); + /* Detach from the batch we were last working on. 
*/ - for (size_t i = 0; i < hashtable->nbuckets; ++i) - dsa_pointer_atomic_write(&buckets[i], InvalidDsaPointer); + /* + * Technically we shouldn't access the barrier because we're no longer + * attached, but since there is no way it's moving after this point it + * seems safe to make the following assertion. + */ + Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE); - /* - * TODO: this will unfortunately rescan all inner tuples in - * the batch for each chunk - */ + /* Free shared chunks and buckets. */ + while (DsaPointerIsValid(batch->chunks)) + { + HashMemoryChunk chunk = + dsa_get_address(hashtable->area, batch->chunks); + dsa_pointer next = chunk->next.shared; - /* - * should be able to save the block in the file which starts - * the next chunk instead - */ - sts_reinitialize(inner_tuples); - } - goto phj_chunk_electing; + dsa_free(hashtable->area, batch->chunks); + batch->chunks = next; + } + if (DsaPointerIsValid(batch->buckets)) + { + dsa_free(hashtable->area, batch->buckets); + batch->buckets = InvalidDsaPointer; + } - case PHJ_CHUNK_FINAL: - BarrierDetach(chunk_barrier); - return false; + /* + * Free chunk barrier + */ + /* TODO: why is this NULL check needed? */ + if (DsaPointerIsValid(batch->chunk_barriers)) + { + dsa_free(hashtable->area, batch->chunk_barriers); + batch->chunk_barriers = InvalidDsaPointer; + } - default: - elog(ERROR, "unexpected chunk phase %d. pid %i. batch %i.", - BarrierPhase(chunk_barrier), MyProcPid, batchno); - } + /* + * Track the largest batch we've been attached to. Though each + * backend might see a different subset of batches, explain.c will + * scan the results from all backends to find the largest value. 
+ */ + hashtable->spacePeak = + Max(hashtable->spacePeak, + batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets); - return false; + } } +static void +ExecHashTableLoopDetachBatchForOthers(HashJoinTable hashtable) +{ + if (hashtable->parallel_state != NULL && + hashtable->curbatch >= 0) + { + int curbatch = hashtable->curbatch; + ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; + + sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); + sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); + + /* + * Track the largest batch we've been attached to. Though each + * backend might see a different subset of batches, explain.c will + * scan the results from all backends to find the largest value. + */ + hashtable->spacePeak = + Max(hashtable->spacePeak, + batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets); + + /* Remember that we are not attached to a batch. */ + hashtable->curbatch = -1; + } +} /* * Choose a batch to work on, and attach to it. Returns true if successful, @@ -183,18 +269,6 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) if (hashtable->batches == NULL) return false; - /* - * For hashloop fallback only Only the elected worker who was chosen to - * combine the outer match status bitmaps should reach here. 
This worker - * must do some final cleanup and then detach from the batch - */ - if (hjstate->combined_bitmap != NULL) - { - BufFileClose(hjstate->combined_bitmap); - hjstate->combined_bitmap = NULL; - hashtable->batches[hashtable->curbatch].done = true; - ExecHashTableDetachBatch(hashtable); - } /* * If we were already attached to a batch, remember not to bother checking @@ -211,41 +285,62 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) ParallelHashJoinBatchAccessor *accessor = hashtable->batches + hashtable->curbatch; ParallelHashJoinBatch *batch = accessor->shared; - /* - * End the parallel scan on the outer tuples before we arrive at the - * next barrier so that the last worker to arrive at that barrier can - * reinitialize the SharedTuplestore for another parallel scan. - */ - if (!batch->parallel_hashloop_fallback) - BarrierArriveAndWait(&batch->batch_barrier, - WAIT_EVENT_HASH_BATCH_PROBING); + { + hashtable->batches[hashtable->curbatch].done = true; + ExecHashTableDetachBatch(hashtable); + } + + else if (accessor->combined_bitmap != NULL) + { + BufFileClose(accessor->combined_bitmap); + accessor->combined_bitmap = NULL; + accessor->done = true; + + /* + * though we have already de-commissioned the shared area of the + * hashtable the curbatch is backend-local and should still be + * valid + */ + sts_end_parallel_scan(hashtable->batches[hashtable->curbatch].outer_tuples); + hashtable->curbatch = -1; + } + else { sts_close_outer_match_status_file(accessor->outer_tuples); /* * If all workers (including this one) have finished probing the - * batch, one worker is elected to Combine all the outer match - * status files from the workers who were attached to this batch - * Loop through the outer match status files from all workers that - * were attached to this batch Combine them into one bitmap Use - * the bitmap, loop through the outer batch file again, and emit - * unmatched tuples + * batch, one worker is elected to Loop through the outer match + * 
status files from all workers that were attached to this batch + * Combine them into one bitmap Use the bitmap, loop through the + * outer batch file again, and emit unmatched tuples All workers + * will detach from the batch barrier and the last worker will + * clean up the hashtable. All workers except the last worker will + * end their scans of the outer and inner side The last worker + * will end its scan of the inner side */ - - if (BarrierArriveAndWait(&batch->batch_barrier, - WAIT_EVENT_HASH_BATCH_PROBING)) + if (BarrierArriveAndDetach(&batch->batch_barrier)) { - hjstate->combined_bitmap = sts_combine_outer_match_status_files(accessor->outer_tuples); + /* + * For hashloop fallback only Only the elected worker who was + * chosen to combine the outer match status bitmaps should + * reach here. This worker must do some final cleanup and then + * detach from the batch + */ + accessor->combined_bitmap = sts_combine_outer_match_status_files(accessor->outer_tuples); + ExecHashTableLoopDetachBatchForChosen(hashtable); hjstate->last_worker = true; return true; } + /* the elected combining worker should not reach here */ + else + { + hashtable->batches[hashtable->curbatch].done = true; + ExecHashTableLoopDetachBatchForOthers(hashtable); + } } - - /* the elected combining worker should not reach here */ - hashtable->batches[hashtable->curbatch].done = true; - ExecHashTableDetachBatch(hashtable); } /* @@ -272,11 +367,16 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) WAIT_EVENT_HASH_BATCH_ELECTING)) { ExecParallelHashTableAlloc(hashtable, batchno); - Barrier *chunk_barrier = - &hashtable->batches[batchno].shared->chunk_barrier; + ParallelHashJoinBatch *phj_batch = hashtable->batches[batchno].shared; + + phj_batch->chunk_barriers = dsa_allocate(hashtable->area, phj_batch->total_chunks * sizeof(Barrier)); + Barrier *barriers = dsa_get_address(hashtable->area, phj_batch->chunk_barriers); - BarrierInit(chunk_barrier, 0); - 
hashtable->batches[batchno].shared->current_chunk = 1; + for (int i = 0; i < phj_batch->total_chunks; i++) + { + BarrierInit(&(barriers[i]), 0); + } + phj_batch->current_chunk = 1; } /* Fall through. */ @@ -314,17 +414,6 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) return true; - case PHJ_BATCH_OUTER_MATCH_STATUS_PROCESSING: - - /* - * The batch isn't done but this worker can't contribute - * anything to it so it might as well be done from this - * worker's perspective. (Only one worker can do work in - * this phase). - */ - - /* Fall through. */ - case PHJ_BATCH_DONE: /* diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index cb2f95ac0a76..afdc31a3b30c 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -3173,6 +3173,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable) accessor->shared = shared; accessor->preallocated = 0; accessor->done = false; + accessor->combined_bitmap = NULL; accessor->inner_tuples = sts_attach(ParallelHashJoinBatchInner(shared), ParallelWorkerNumber + 1, diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 6a8efc0765a4..a454cba54543 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -1068,57 +1068,79 @@ ExecParallelHashJoin(PlanState *pstate) /* FALL THRU */ case HJ_ADAPTIVE_EMIT_UNMATCHED_OUTER: + { + ParallelHashJoinBatchAccessor *batch_accessor = + &node->hj_HashTable->batches[node->hj_HashTable->curbatch]; - Assert(node->combined_bitmap != NULL); - - outer_acc = node->hj_HashTable->batches[node->hj_HashTable->curbatch].outer_tuples; + Assert(batch_accessor->combined_bitmap != NULL); - MinimalTuple tuple; + /* + * TODO: there should be a way to know the current batch + * for the purposes of getting + */ - do - { - tupleMetadata metadata; + /* + * the outer tuplestore without needing curbatch from the + * hashtable so we can detach + */ + /* from the batch 
(ExecHashTableDetachBatch) */ + outer_acc = + batch_accessor->outer_tuples; + MinimalTuple tuple; - if ((tuple = sts_parallel_scan_next(outer_acc, &metadata)) == NULL) - break; + do + { + tupleMetadata metadata; + + if ((tuple = + sts_parallel_scan_next(outer_acc, &metadata)) == + NULL) + break; + + uint32 bytenum = metadata.tupleid / 8; + unsigned char bit = metadata.tupleid % 8; + unsigned char byte_to_check = 0; + + /* seek to byte to check */ + if (BufFileSeek(batch_accessor->combined_bitmap, + 0, + bytenum, + SEEK_SET)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg( + "could not rewind shared outer temporary file: %m"))); + /* read byte containing ntuple bit */ + if (BufFileRead(batch_accessor->combined_bitmap, &byte_to_check, 1) == + 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg( + "could not read byte in outer match status bitmap: %m."))); + /* if bit is set */ + bool match = ((byte_to_check) >> bit) & 1; + + if (!match) + break; + } + while (1); - uint32 bytenum = metadata.tupleid / 8; - unsigned char bit = metadata.tupleid % 8; - unsigned char byte_to_check = 0; - - /* seek to byte to check */ - if (BufFileSeek(node->combined_bitmap, 0, bytenum, SEEK_SET)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not rewind shared outer temporary file: %m"))); - /* read byte containing ntuple bit */ - if (BufFileRead(node->combined_bitmap, &byte_to_check, 1) == 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read byte in outer match status bitmap: %m."))); - /* if bit is set */ - bool match = ((byte_to_check) >> bit) & 1; - - if (!match) + if (tuple == NULL) + { + sts_end_parallel_scan(outer_acc); + node->hj_JoinState = HJ_NEED_NEW_BATCH; break; - } while (1); - - if (tuple == NULL) - { - sts_end_parallel_scan(outer_acc); - node->hj_JoinState = HJ_NEED_NEW_BATCH; - break; - } - - /* Emit the unmatched tuple */ - ExecForceStoreMinimalTuple(tuple, - econtext->ecxt_outertuple, - false); - 
econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot; + } - return ExecProject(node->js.ps.ps_ProjInfo); + /* Emit the unmatched tuple */ + ExecForceStoreMinimalTuple(tuple, + econtext->ecxt_outertuple, + false); + econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot; + return ExecProject(node->js.ps.ps_ProjInfo); + } default: elog(ERROR, "unrecognized hashjoin state: %d", (int) node->hj_JoinState); @@ -1170,7 +1192,6 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags) hjstate->hj_InnerExhausted = false; hjstate->last_worker = false; - hjstate->combined_bitmap = NULL; /* * Miscellaneous initialization diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index e582365e8409..887e3fa75022 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3812,6 +3812,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_HASH_CHUNK_ELECTING: event_name = "Hash/Chunk/Electing"; break; + case WAIT_EVENT_HASH_CHUNK_RESETTING: + event_name = "Hash/Chunk/Resetting"; + break; case WAIT_EVENT_HASH_CHUNK_LOADING: event_name = "Hash/Chunk/Loading"; break; diff --git a/src/include/executor/adaptiveHashjoin.h b/src/include/executor/adaptiveHashjoin.h index 030a04c5c005..135aed0b199c 100644 --- a/src/include/executor/adaptiveHashjoin.h +++ b/src/include/executor/adaptiveHashjoin.h @@ -5,5 +5,4 @@ extern bool ExecParallelHashJoinNewChunk(HashJoinState *hjstate, bool advance_from_probing); extern bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate); - #endif /* ADAPTIVE_HASHJOIN_H */ diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index e5a00f84e321..b2cc12dc19be 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -166,7 +166,7 @@ typedef struct ParallelHashJoinBatch int total_chunks; int current_chunk; size_t estimated_chunk_size; - Barrier chunk_barrier; + dsa_pointer chunk_barriers; LWLock lock; dsa_pointer chunks; /* chunks of tuples loaded */ 
@@ -221,6 +221,7 @@ typedef struct ParallelHashJoinBatchAccessor bool at_least_one_chunk; /* has this backend allocated a chunk? */ bool done; /* flag to remember that a batch is done */ + BufFile *combined_bitmap; /* for Adaptive Hashjoin only */ SharedTuplestoreAccessor *inner_tuples; SharedTuplestoreAccessor *outer_tuples; } ParallelHashJoinBatchAccessor; @@ -282,14 +283,14 @@ typedef struct ParallelHashJoinState #define PHJ_BATCH_ELECTING 0 #define PHJ_BATCH_ALLOCATING 1 #define PHJ_BATCH_CHUNKING 2 -#define PHJ_BATCH_OUTER_MATCH_STATUS_PROCESSING 3 -#define PHJ_BATCH_DONE 4 +#define PHJ_BATCH_DONE 3 #define PHJ_CHUNK_ELECTING 0 -#define PHJ_CHUNK_LOADING 1 -#define PHJ_CHUNK_PROBING 2 -#define PHJ_CHUNK_DONE 3 -#define PHJ_CHUNK_FINAL 4 +#define PHJ_CHUNK_RESETTING 1 +#define PHJ_CHUNK_LOADING 2 +#define PHJ_CHUNK_PROBING 3 +#define PHJ_CHUNK_DONE 4 +#define PHJ_CHUNK_FINAL 5 /* The phases of batch growth while hashing, for grow_batches_barrier. */ #define PHJ_GROW_BATCHES_ELECTING 0 diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index dfc221e6a111..f6d5b477085e 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -31,6 +31,7 @@ extern void ExecParallelHashTableAlloc(HashJoinTable hashtable, extern void ExecHashTableDestroy(HashJoinTable hashtable); extern void ExecHashTableDetach(HashJoinTable hashtable); extern void ExecHashTableDetachBatch(HashJoinTable hashtable); + extern void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index b4f5f0357cb7..21e682334b15 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1967,7 +1967,6 @@ typedef struct HashJoinState /* parallel hashloop fallback outer side */ bool last_worker; - BufFile *combined_bitmap; } HashJoinState; diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 340086a7e77c..dd2e8bd655d5 100644 --- 
a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -843,6 +843,7 @@ typedef enum WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING, WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING, WAIT_EVENT_HASH_CHUNK_ELECTING, + WAIT_EVENT_HASH_CHUNK_RESETTING, WAIT_EVENT_HASH_CHUNK_LOADING, WAIT_EVENT_HASH_CHUNK_PROBING, WAIT_EVENT_HASH_CHUNK_DONE, -- 2.25.0