From 44ab42cb839660b94a26eced5f5f9366a65c7289 Mon Sep 17 00:00:00 2001
From: Melanie Plageman
Date: Sat, 11 Jan 2020 16:57:34 -0800
Subject: [PATCH v5 3/5] Address barrier wait deadlock hazard

Previously, the chunk phase machine in ExecParallelHashJoinNewChunk()
reused a single chunk barrier for all chunks: it looped through the
phases, then set the phase back to the initial phase and explicitly
jumped there. Now we initialize an array of chunk barriers, one per
chunk, and use a fresh chunk barrier for each chunk.

After finishing probing a chunk, upon re-entering
ExecParallelHashJoinNewChunk(), workers wait on the chunk barrier for
all participants to arrive. This is safe because the barrier is
advanced to its final phase as part of that wait (per the comment in
nodeHashJoin.c about the deadlock risk of waiting on barriers after
emitting tuples). The last worker to arrive increments the chunk
number. All workers then detach from the chunk barrier they are
attached to and select the next chunk barrier.

The hashtable is now reset in the first phase of the chunk phase
machine, PHJ_CHUNK_ELECTING. Note that this causes an unnecessary
hashtable reset for the first chunk. The loading and probing phases of
the chunk phase machine stay the same. If a worker joins in the
PHJ_CHUNK_DONE phase, it simply detaches from the chunk barrier and
moves on to the next barrier in the array of chunk barriers.

To mitigate the other cause of deadlock hazard (workers waiting on the
batch barrier after emitting tuples), ExecParallelHashJoinNewBatch()
now has all workers detach from the batch barrier and end their scan of
the batch whenever they are attached to the batch barrier of a fallback
batch. The last worker to detach combines the outer match status files,
then detaches from the batch, cleans up the hashtable, and ends its
scan of the inner side. It then returns and proceeds to emit unmatched
outer tuples.

In PHJ_BATCH_ELECTING, the worker that allocates the hashtable also
initializes the chunk barriers.

Also, this commit moves combined_bitmap from hjstate to the batch
accessor. It will be moved into the SharedBits store once that API is
added.
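Illustrative note (not part of the patch): the following standalone
sketch models the per-chunk barrier scheme with POSIX thread barriers
(available on Linux) instead of PostgreSQL's Barrier API. One barrier
per chunk; pthread_barrier_wait() returning PTHREAD_BARRIER_SERIAL_THREAD
stands in for BarrierArriveAndWait() returning true to the elected
worker, which advances the shared chunk counter; every worker then
moves on to the next chunk's barrier. Worker and chunk counts are made
up, and the real code reads current_chunk under the batch's LWLock.

#include <pthread.h>
#include <stdio.h>

#define NWORKERS     3
#define TOTAL_CHUNKS 4

static pthread_barrier_t chunk_barriers[TOTAL_CHUNKS]; /* one per chunk */
static int current_chunk = 1;                          /* shared, 1-based */

static void *
worker(void *arg)
{
    int id = *(int *) arg;

    for (int chunk = 1; chunk <= TOTAL_CHUNKS; chunk++)
    {
        /* load and probe this chunk (omitted) ... */

        /* wait for every worker to finish probing this chunk */
        if (pthread_barrier_wait(&chunk_barriers[chunk - 1]) ==
            PTHREAD_BARRIER_SERIAL_THREAD)
        {
            /* the "last to arrive" advances the shared chunk number */
            current_chunk++;
            printf("worker %d advanced current_chunk to %d\n",
                   id, current_chunk);
        }
        /* everyone detaches and moves on to the next chunk's barrier */
    }
    return NULL;
}

int
main(void)
{
    pthread_t   threads[NWORKERS];
    int         ids[NWORKERS];

    for (int i = 0; i < TOTAL_CHUNKS; i++)
        pthread_barrier_init(&chunk_barriers[i], NULL, NWORKERS);

    for (int i = 0; i < NWORKERS; i++)
    {
        ids[i] = i;
        pthread_create(&threads[i], NULL, worker, &ids[i]);
    }
    for (int i = 0; i < NWORKERS; i++)
        pthread_join(threads[i], NULL);

    for (int i = 0; i < TOTAL_CHUNKS; i++)
        pthread_barrier_destroy(&chunk_barriers[i]);
    return 0;
}

The unmatched-outer pass after combining the match status files can be
sketched the same way: one bit per outer tuple, indexed by tupleid, and
any clear bit yields a NULL-extended row. The sketch below uses plain
stdio instead of BufFileSeek()/BufFileRead(), and outer_match_bit() is
a hypothetical helper name, not a function in the patch.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Returns true if tupleid's bit is set in the combined match bitmap. */
static bool
outer_match_bit(FILE *bitmap, uint32_t tupleid)
{
    unsigned char byte_to_check;

    if (fseek(bitmap, tupleid / 8, SEEK_SET) != 0 ||
        fread(&byte_to_check, 1, 1, bitmap) != 1)
        return false;           /* the real code raises an error instead */
    return (byte_to_check >> (tupleid % 8)) & 1;
}

int
main(void)
{
    /* bitmap for 16 outer tuples: only tuples 1 and 9 matched */
    unsigned char bytes[2] = {0x02, 0x02};
    FILE       *bitmap = tmpfile();

    fwrite(bytes, 1, sizeof(bytes), bitmap);
    fflush(bitmap);

    for (uint32_t tupleid = 0; tupleid < 16; tupleid++)
        if (!outer_match_bit(bitmap, tupleid))
            printf("tuple %u is unmatched -> emit NULL-extended row\n",
                   tupleid);

    fclose(bitmap);
    return 0;
}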
Co-authored-by: Jesse Zhang --- src/backend/executor/adaptiveHashjoin.c | 368 +++++++++++++++--------- src/backend/executor/nodeHash.c | 40 +-- src/backend/executor/nodeHashjoin.c | 116 +++++--- src/backend/postmaster/pgstat.c | 3 + src/include/executor/adaptiveHashjoin.h | 1 - src/include/executor/hashjoin.h | 15 +- src/include/executor/nodeHash.h | 1 + src/include/nodes/execnodes.h | 1 - src/include/pgstat.h | 1 + 9 files changed, 331 insertions(+), 215 deletions(-) diff --git a/src/backend/executor/adaptiveHashjoin.c b/src/backend/executor/adaptiveHashjoin.c index 64af2a24f3..20678ad9ff 100644 --- a/src/backend/executor/adaptiveHashjoin.c +++ b/src/backend/executor/adaptiveHashjoin.c @@ -24,7 +24,9 @@ ExecParallelHashJoinNewChunk(HashJoinState *hjstate, bool advance_from_probing) ParallelHashJoinBatch *phj_batch; SharedTuplestoreAccessor *outer_tuples; SharedTuplestoreAccessor *inner_tuples; + Barrier *barriers; Barrier *chunk_barrier; + Barrier *old_chunk_barrier; hashtable = hjstate->hj_HashTable; batchno = hashtable->curbatch; @@ -33,10 +35,13 @@ ExecParallelHashJoinNewChunk(HashJoinState *hjstate, bool advance_from_probing) inner_tuples = hashtable->batches[batchno].inner_tuples; /* - * This chunk_barrier is initialized in the ELECTING phase when this + * These chunk_barriers are initialized in the ELECTING phase when this * worker attached to the batch in ExecParallelHashJoinNewBatch() */ - chunk_barrier = &hashtable->batches[batchno].shared->chunk_barrier; + barriers = dsa_get_address(hashtable->area, hashtable->batches[batchno].shared->chunk_barriers); + LWLockAcquire(&phj_batch->lock, LW_SHARED); + old_chunk_barrier = &(barriers[phj_batch->current_chunk - 1]); + LWLockRelease(&phj_batch->lock); /* * If this worker just came from probing (from HJ_SCAN_BUCKET) we need to @@ -49,14 +54,21 @@ ExecParallelHashJoinNewChunk(HashJoinState *hjstate, bool advance_from_probing) * The current chunk number can't be incremented if *any* worker isn't * done yet (otherwise they might access the wrong data structure!) */ - if (BarrierArriveAndWait(chunk_barrier, + if (BarrierArriveAndWait(old_chunk_barrier, WAIT_EVENT_HASH_CHUNK_PROBING)) phj_batch->current_chunk++; - + BarrierDetach(old_chunk_barrier); /* Once the barrier is advanced we'll be in the DONE phase */ } - else - BarrierAttach(chunk_barrier); + /* TODO: definitely seems like a race condition around value of current_chunk */ + LWLockAcquire(&phj_batch->lock, LW_SHARED); + if (phj_batch->current_chunk > phj_batch->total_chunks) + { + LWLockRelease(&phj_batch->lock); + return false; + } + chunk_barrier = &(barriers[phj_batch->current_chunk - 1]); + LWLockRelease(&phj_batch->lock); /* * The outer side is exhausted and either 1) the current chunk of the @@ -64,105 +76,190 @@ ExecParallelHashJoinNewChunk(HashJoinState *hjstate, bool advance_from_probing) * chunk of the inner side is exhausted and it is time to advance the * batch */ - switch (BarrierPhase(chunk_barrier)) + + for (;;) { - /* - * TODO: remove this phase and coordinate access to hashtable - * above goto and after incrementing current_chunk - */ - case PHJ_CHUNK_ELECTING: - phj_chunk_electing: - BarrierArriveAndWait(chunk_barrier, - WAIT_EVENT_HASH_CHUNK_ELECTING); - /* Fall through. */ + switch (BarrierAttach(chunk_barrier)) + { + case PHJ_CHUNK_ELECTING: + if (BarrierArriveAndWait(chunk_barrier, + WAIT_EVENT_HASH_CHUNK_ELECTING)) + { - case PHJ_CHUNK_LOADING: - /* Start (or join in) loading the next chunk of inner tuples. 
*/ - sts_begin_parallel_scan(inner_tuples); + sts_reinitialize(outer_tuples); - MinimalTuple tuple; - tupleMetadata metadata; + /* + * reset inner's hashtable and recycle the existing bucket + * array. + * TODO: this will unnecessarily reset the hashtable for the + * first stripe. fix this? + */ + dsa_pointer_atomic *buckets = (dsa_pointer_atomic *) + dsa_get_address(hashtable->area, phj_batch->buckets); - while ((tuple = sts_parallel_scan_next(inner_tuples, &metadata))) - { - if (metadata.chunk != phj_batch->current_chunk) - continue; + for (size_t i = 0; i < hashtable->nbuckets; ++i) + dsa_pointer_atomic_write(&buckets[i], InvalidDsaPointer); - ExecForceStoreMinimalTuple(tuple, - hjstate->hj_HashTupleSlot, - false); + /* + * TODO: this will unfortunately rescan all inner tuples + * in the batch for each chunk + */ - ExecParallelHashTableInsertCurrentBatch( - hashtable, - hjstate->hj_HashTupleSlot, - metadata.hashvalue); - } - sts_end_parallel_scan(inner_tuples); - BarrierArriveAndWait(chunk_barrier, - WAIT_EVENT_HASH_CHUNK_LOADING); - /* Fall through. */ + /* + * should be able to save the block in the file which + * starts the next chunk instead + */ + sts_reinitialize(inner_tuples); + } + /* Fall through. */ + case PHJ_CHUNK_RESETTING: + BarrierArriveAndWait(chunk_barrier, WAIT_EVENT_HASH_CHUNK_RESETTING); + case PHJ_CHUNK_LOADING: + /* Start (or join in) loading the next chunk of inner tuples. */ + sts_begin_parallel_scan(inner_tuples); + + MinimalTuple tuple; + tupleMetadata metadata; + + while ((tuple = sts_parallel_scan_next(inner_tuples, &metadata))) + { + if (metadata.chunk != phj_batch->current_chunk) + continue; + + ExecForceStoreMinimalTuple(tuple, + hjstate->hj_HashTupleSlot, + false); + + ExecParallelHashTableInsertCurrentBatch( + hashtable, + hjstate->hj_HashTupleSlot, + metadata.hashvalue); + } + sts_end_parallel_scan(inner_tuples); + BarrierArriveAndWait(chunk_barrier, + WAIT_EVENT_HASH_CHUNK_LOADING); + /* Fall through. */ + + case PHJ_CHUNK_PROBING: + /* + * TODO: Is it a race condition where a worker enters here + * and starts probing before the hashtable is fully loaded? + */ + sts_begin_parallel_scan(outer_tuples); + return true; - case PHJ_CHUNK_PROBING: - sts_begin_parallel_scan(outer_tuples); - return true; + case PHJ_CHUNK_DONE: + LWLockAcquire(&phj_batch->lock, LW_SHARED); + if (phj_batch->current_chunk > phj_batch->total_chunks) + { + LWLockRelease(&phj_batch->lock); + return false; + } + LWLockRelease(&phj_batch->lock); + /* TODO: exercise this somehow (ideally, in a test) */ + BarrierDetach(chunk_barrier); + if (chunk_barrier < barriers + phj_batch->total_chunks) + { + ++chunk_barrier; + continue; + } + else + return false; - case PHJ_CHUNK_DONE: + default: + elog(ERROR, "unexpected chunk phase %d. pid %i. batch %i.", + BarrierPhase(chunk_barrier), MyProcPid, batchno); + } + } - BarrierArriveAndWait(chunk_barrier, WAIT_EVENT_HASH_CHUNK_DONE); + return false; +} - if (phj_batch->current_chunk > phj_batch->total_chunks) - { - BarrierDetach(chunk_barrier); - return false; - } - /* - * Otherwise it is time for the next chunk. 
One worker should - * reset the hashtable - */ - if (BarrierArriveExplicitAndWait(chunk_barrier, PHJ_CHUNK_ELECTING, WAIT_EVENT_HASH_ADVANCE_CHUNK)) - { - /* - * rewind/reset outer tuplestore and rewind outer match status - * files - */ - sts_reinitialize(outer_tuples); +static void +ExecHashTableLoopDetachBatchForChosen(HashJoinTable hashtable) +{ + if (hashtable->parallel_state != NULL && + hashtable->curbatch >= 0) + { + int curbatch = hashtable->curbatch; + ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; - /* - * reset inner's hashtable and recycle the existing bucket - * array. - */ - dsa_pointer_atomic *buckets = (dsa_pointer_atomic *) - dsa_get_address(hashtable->area, phj_batch->buckets); + /* Make sure any temporary files are closed. */ + sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); - for (size_t i = 0; i < hashtable->nbuckets; ++i) - dsa_pointer_atomic_write(&buckets[i], InvalidDsaPointer); + /* Detach from the batch we were last working on. */ - /* - * TODO: this will unfortunately rescan all inner tuples in - * the batch for each chunk - */ + /* + * Technically we shouldn't access the barrier because we're no longer + * attached, but since there is no way it's moving after this point it + * seems safe to make the following assertion. + */ + Assert(BarrierPhase(&batch->batch_barrier) == PHJ_BATCH_DONE); - /* - * should be able to save the block in the file which starts - * the next chunk instead - */ - sts_reinitialize(inner_tuples); - } - goto phj_chunk_electing; + /* Free shared chunks and buckets. */ + while (DsaPointerIsValid(batch->chunks)) + { + HashMemoryChunk chunk = + dsa_get_address(hashtable->area, batch->chunks); + dsa_pointer next = chunk->next.shared; - case PHJ_CHUNK_FINAL: - BarrierDetach(chunk_barrier); - return false; + dsa_free(hashtable->area, batch->chunks); + batch->chunks = next; + } + if (DsaPointerIsValid(batch->buckets)) + { + dsa_free(hashtable->area, batch->buckets); + batch->buckets = InvalidDsaPointer; + } - default: - elog(ERROR, "unexpected chunk phase %d. pid %i. batch %i.", - BarrierPhase(chunk_barrier), MyProcPid, batchno); - } + /* + * Free chunk barrier + */ + /* TODO: why is this NULL check needed? */ + if (DsaPointerIsValid(batch->chunk_barriers)) + { + dsa_free(hashtable->area, batch->chunk_barriers); + batch->chunk_barriers = InvalidDsaPointer; + } - return false; + /* + * Track the largest batch we've been attached to. Though each + * backend might see a different subset of batches, explain.c will + * scan the results from all backends to find the largest value. + */ + hashtable->spacePeak = + Max(hashtable->spacePeak, + batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets); + + } } +static void +ExecHashTableLoopDetachBatchForOthers(HashJoinTable hashtable) +{ + if (hashtable->parallel_state != NULL && + hashtable->curbatch >= 0) + { + int curbatch = hashtable->curbatch; + ParallelHashJoinBatch *batch = hashtable->batches[curbatch].shared; + + sts_end_parallel_scan(hashtable->batches[curbatch].inner_tuples); + sts_end_parallel_scan(hashtable->batches[curbatch].outer_tuples); + + /* + * Track the largest batch we've been attached to. Though each + * backend might see a different subset of batches, explain.c will + * scan the results from all backends to find the largest value. + */ + hashtable->spacePeak = + Max(hashtable->spacePeak, + batch->size + sizeof(dsa_pointer_atomic) * hashtable->nbuckets); + + /* Remember that we are not attached to a batch. 
*/ + hashtable->curbatch = -1; + } +} /* * Choose a batch to work on, and attach to it. Returns true if successful, @@ -183,18 +280,6 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) if (hashtable->batches == NULL) return false; - /* - * For hashloop fallback only Only the elected worker who was chosen to - * combine the outer match status bitmaps should reach here. This worker - * must do some final cleanup and then detach from the batch - */ - if (hjstate->combined_bitmap != NULL) - { - BufFileClose(hjstate->combined_bitmap); - hjstate->combined_bitmap = NULL; - hashtable->batches[hashtable->curbatch].done = true; - ExecHashTableDetachBatch(hashtable); - } /* * If we were already attached to a batch, remember not to bother checking @@ -211,41 +296,62 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) ParallelHashJoinBatchAccessor *accessor = hashtable->batches + hashtable->curbatch; ParallelHashJoinBatch *batch = accessor->shared; - /* - * End the parallel scan on the outer tuples before we arrive at the - * next barrier so that the last worker to arrive at that barrier can - * reinitialize the SharedTuplestore for another parallel scan. - */ - if (!batch->parallel_hashloop_fallback) - BarrierArriveAndWait(&batch->batch_barrier, - WAIT_EVENT_HASH_BATCH_PROBING); + { + hashtable->batches[hashtable->curbatch].done = true; + ExecHashTableDetachBatch(hashtable); + } + + else if (accessor->combined_bitmap != NULL) + { + BufFileClose(accessor->combined_bitmap); + accessor->combined_bitmap = NULL; + accessor->done = true; + + /* + * though we have already de-commissioned the shared area of the + * hashtable the curbatch is backend-local and should still be + * valid + */ + sts_end_parallel_scan(hashtable->batches[hashtable->curbatch].outer_tuples); + hashtable->curbatch = -1; + } + else { sts_close_outer_match_status_file(accessor->outer_tuples); /* * If all workers (including this one) have finished probing the - * batch, one worker is elected to Combine all the outer match - * status files from the workers who were attached to this batch - * Loop through the outer match status files from all workers that - * were attached to this batch Combine them into one bitmap Use - * the bitmap, loop through the outer batch file again, and emit - * unmatched tuples + * batch, one worker is elected to Loop through the outer match + * status files from all workers that were attached to this batch + * Combine them into one bitmap Use the bitmap, loop through the + * outer batch file again, and emit unmatched tuples All workers + * will detach from the batch barrier and the last worker will + * clean up the hashtable. All workers except the last worker will + * end their scans of the outer and inner side The last worker + * will end its scan of the inner side */ - - if (BarrierArriveAndWait(&batch->batch_barrier, - WAIT_EVENT_HASH_BATCH_PROBING)) + if (BarrierArriveAndDetach(&batch->batch_barrier)) { - hjstate->combined_bitmap = sts_combine_outer_match_status_files(accessor->outer_tuples); + /* + * For hashloop fallback only Only the elected worker who was + * chosen to combine the outer match status bitmaps should + * reach here. 
This worker must do some final cleanup and then + * detach from the batch + */ + accessor->combined_bitmap = sts_combine_outer_match_status_files(accessor->outer_tuples); + ExecHashTableLoopDetachBatchForChosen(hashtable); hjstate->last_worker = true; return true; } + /* the elected combining worker should not reach here */ + else + { + hashtable->batches[hashtable->curbatch].done = true; + ExecHashTableLoopDetachBatchForOthers(hashtable); + } } - - /* the elected combining worker should not reach here */ - hashtable->batches[hashtable->curbatch].done = true; - ExecHashTableDetachBatch(hashtable); } /* @@ -272,11 +378,16 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) WAIT_EVENT_HASH_BATCH_ELECTING)) { ExecParallelHashTableAlloc(hashtable, batchno); - Barrier *chunk_barrier = - &hashtable->batches[batchno].shared->chunk_barrier; + ParallelHashJoinBatch *phj_batch = hashtable->batches[batchno].shared; + + phj_batch->chunk_barriers = dsa_allocate(hashtable->area, phj_batch->total_chunks * sizeof(Barrier)); + Barrier *barriers = dsa_get_address(hashtable->area, phj_batch->chunk_barriers); - BarrierInit(chunk_barrier, 0); - hashtable->batches[batchno].shared->current_chunk = 1; + for (int i = 0; i < phj_batch->total_chunks; i++) + { + BarrierInit(&(barriers[i]), 0); + } + phj_batch->current_chunk = 1; } /* Fall through. */ @@ -314,17 +425,6 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate) return true; - case PHJ_BATCH_OUTER_MATCH_STATUS_PROCESSING: - - /* - * The batch isn't done but this worker can't contribute - * anything to it so it might as well be done from this - * worker's perspective. (Only one worker can do work in - * this phase). - */ - - /* Fall through. */ - case PHJ_BATCH_DONE: /* diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index cb2f95ac0a..86f3aaff82 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -1225,7 +1225,19 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) ExecParallelHashEnsureBatchAccessors(hashtable); ExecParallelHashTableSetCurrentBatch(hashtable, 0); - LWLockAcquire(&pstate->lock, LW_EXCLUSIVE); + /* + * Currently adaptive hashjoin keeps track of the global + * (HashJoin global) number of increases to nbatches. + * If the number of increases exceeds a fixed amount, + * any subsequent batch exceeding the space_allowed + * (one which triggers disabling growth in nbatches) + * afterward will have parallel_hashloop_fallback set + * + * Note that stripe numbers were already being added to tuples + * going into the STS, so, even batches that do not fall back + * might have more than one stripe. + */ + LWLockAcquire(&pstate->lock, LW_SHARED); if (pstate->batch_increases >= 2) excessive_batch_num_increases = true; LWLockRelease(&pstate->lock); @@ -1242,33 +1254,10 @@ ExecParallelHashIncreaseNumBatches(HashJoinTable hashtable) space_exhausted = true; - /* - * only once we've increased the number of batches - * overall many times should we start setting - */ - - /* - * some batches to use the fallback strategy. 
Those - * that are still too big will have this option set - */ - /* * we better not repartition again (growth should be - * disabled), so that we don't overwrite this value - */ - - /* - * this tells us if we have set fallback to true or - * not and how many chunks -- useful for seeing how - * many chunks - */ - - /* - * we can get to before setting it to true (since we - * still mark chunks (work_mem sized chunks)) in - * batches even if we don't fall back + * disabled), so that we don't overwrite this flag */ - /* same for below but opposite */ if (excessive_batch_num_increases == true) batch->parallel_hashloop_fallback = true; @@ -3173,6 +3162,7 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable) accessor->shared = shared; accessor->preallocated = 0; accessor->done = false; + accessor->combined_bitmap = NULL; accessor->inner_tuples = sts_attach(ParallelHashJoinBatchInner(shared), ParallelWorkerNumber + 1, diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 91c0170e40..ca09012d17 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -905,9 +905,10 @@ ExecParallelHashJoin(PlanState *pstate) */ ParallelHashJoinBatch *phj_batch = node->hj_HashTable->batches[node->hj_HashTable->curbatch].shared; - + LWLockAcquire(&phj_batch->lock, LW_SHARED); if (!phj_batch->parallel_hashloop_fallback || phj_batch->current_chunk == 1) node->hj_MatchedOuter = false; + LWLockRelease(&phj_batch->lock); node->hj_JoinState = HJ_SCAN_BUCKET; /* FALL THRU */ @@ -1029,13 +1030,12 @@ ExecParallelHashJoin(PlanState *pstate) case HJ_NEED_NEW_BATCH: - phj_batch = hashtable->batches[hashtable->curbatch].shared; - /* * Try to advance to next batch. Done if there are no more. */ if (!ExecParallelHashJoinNewBatch(node)) return NULL; /* end of parallel-aware join */ + phj_batch = hashtable->batches[hashtable->curbatch].shared; if (node->last_worker && HJ_FILL_OUTER(node) && phj_batch->parallel_hashloop_fallback) @@ -1073,6 +1073,11 @@ ExecParallelHashJoin(PlanState *pstate) case HJ_ADAPTIVE_EMIT_UNMATCHED_OUTER_INIT: outer_acc = hashtable->batches[hashtable->curbatch].outer_tuples; + /* + * This should only ever be called by one worker. + * It is not protected by a barrier explicitly here. 
However, + * more than one worker should never enter this state for a batch + */ sts_reinitialize(outer_acc); sts_begin_parallel_scan(outer_acc); @@ -1080,57 +1085,75 @@ ExecParallelHashJoin(PlanState *pstate) /* FALL THRU */ case HJ_ADAPTIVE_EMIT_UNMATCHED_OUTER: - - Assert(node->combined_bitmap != NULL); - - outer_acc = node->hj_HashTable->batches[node->hj_HashTable->curbatch].outer_tuples; - - MinimalTuple tuple; - - do { - tupleMetadata metadata; + ParallelHashJoinBatchAccessor *batch_accessor = + &node->hj_HashTable->batches[node->hj_HashTable->curbatch]; - if ((tuple = sts_parallel_scan_next(outer_acc, &metadata)) == NULL) - break; + Assert(batch_accessor->combined_bitmap != NULL); - uint32 bytenum = metadata.tupleid / 8; - unsigned char bit = metadata.tupleid % 8; - unsigned char byte_to_check = 0; - - /* seek to byte to check */ - if (BufFileSeek(node->combined_bitmap, 0, bytenum, SEEK_SET)) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not rewind shared outer temporary file: %m"))); - /* read byte containing ntuple bit */ - if (BufFileRead(node->combined_bitmap, &byte_to_check, 1) == 0) - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read byte in outer match status bitmap: %m."))); - /* if bit is set */ - bool match = ((byte_to_check) >> bit) & 1; - - if (!match) - break; - } while (1); + /* + * TODO: there should be a way to know the current batch + * for the purposes of getting the outer tuplestore without + * needing curbatch from the hashtable so we can detach + * from the batch (ExecHashTableDetachBatch) + */ + outer_acc = + batch_accessor->outer_tuples; + MinimalTuple tuple; - if (tuple == NULL) - { - sts_end_parallel_scan(outer_acc); - node->hj_JoinState = HJ_NEED_NEW_BATCH; - break; - } + do + { + tupleMetadata metadata; + + if ((tuple = + sts_parallel_scan_next(outer_acc, &metadata)) == + NULL) + break; + + uint32 bytenum = metadata.tupleid / 8; + unsigned char bit = metadata.tupleid % 8; + unsigned char byte_to_check = 0; + + /* seek to byte to check */ + if (BufFileSeek(batch_accessor->combined_bitmap, + 0, + bytenum, + SEEK_SET)) + ereport(ERROR, + (errcode_for_file_access(), + errmsg( + "could not rewind shared outer temporary file: %m"))); + /* read byte containing ntuple bit */ + if (BufFileRead(batch_accessor->combined_bitmap, &byte_to_check, 1) == + 0) + ereport(ERROR, + (errcode_for_file_access(), + errmsg( + "could not read byte in outer match status bitmap: %m."))); + /* if bit is set */ + bool match = ((byte_to_check) >> bit) & 1; + + if (!match) + break; + } + while (1); - /* Emit the unmatched tuple */ - ExecForceStoreMinimalTuple(tuple, - econtext->ecxt_outertuple, - false); - econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot; + if (tuple == NULL) + { + sts_end_parallel_scan(outer_acc); + node->hj_JoinState = HJ_NEED_NEW_BATCH; + break; + } - return ExecProject(node->js.ps.ps_ProjInfo); + /* Emit the unmatched tuple */ + ExecForceStoreMinimalTuple(tuple, + econtext->ecxt_outertuple, + false); + econtext->ecxt_innertuple = node->hj_NullInnerTupleSlot; + return ExecProject(node->js.ps.ps_ProjInfo); + } default: elog(ERROR, "unrecognized hashjoin state: %d", (int) node->hj_JoinState); @@ -1182,7 +1205,6 @@ ExecInitHashJoin(HashJoin *node, EState *estate, int eflags) hjstate->hj_InnerExhausted = false; hjstate->last_worker = false; - hjstate->combined_bitmap = NULL; /* * Miscellaneous initialization diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index eeddf0009c..8aced92b31 100644 --- 
a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3812,6 +3812,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_HASH_CHUNK_ELECTING: event_name = "Hash/Chunk/Electing"; break; + case WAIT_EVENT_HASH_CHUNK_RESETTING: + event_name = "Hash/Chunk/Resetting"; + break; case WAIT_EVENT_HASH_CHUNK_LOADING: event_name = "Hash/Chunk/Loading"; break; diff --git a/src/include/executor/adaptiveHashjoin.h b/src/include/executor/adaptiveHashjoin.h index 030a04c5c0..135aed0b19 100644 --- a/src/include/executor/adaptiveHashjoin.h +++ b/src/include/executor/adaptiveHashjoin.h @@ -5,5 +5,4 @@ extern bool ExecParallelHashJoinNewChunk(HashJoinState *hjstate, bool advance_from_probing); extern bool ExecParallelHashJoinNewBatch(HashJoinState *hjstate); - #endif /* ADAPTIVE_HASHJOIN_H */ diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index e5a00f84e3..b2cc12dc19 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -166,7 +166,7 @@ typedef struct ParallelHashJoinBatch int total_chunks; int current_chunk; size_t estimated_chunk_size; - Barrier chunk_barrier; + dsa_pointer chunk_barriers; LWLock lock; dsa_pointer chunks; /* chunks of tuples loaded */ @@ -221,6 +221,7 @@ typedef struct ParallelHashJoinBatchAccessor bool at_least_one_chunk; /* has this backend allocated a chunk? */ bool done; /* flag to remember that a batch is done */ + BufFile *combined_bitmap; /* for Adaptive Hashjoin only */ SharedTuplestoreAccessor *inner_tuples; SharedTuplestoreAccessor *outer_tuples; } ParallelHashJoinBatchAccessor; @@ -282,14 +283,14 @@ typedef struct ParallelHashJoinState #define PHJ_BATCH_ELECTING 0 #define PHJ_BATCH_ALLOCATING 1 #define PHJ_BATCH_CHUNKING 2 -#define PHJ_BATCH_OUTER_MATCH_STATUS_PROCESSING 3 -#define PHJ_BATCH_DONE 4 +#define PHJ_BATCH_DONE 3 #define PHJ_CHUNK_ELECTING 0 -#define PHJ_CHUNK_LOADING 1 -#define PHJ_CHUNK_PROBING 2 -#define PHJ_CHUNK_DONE 3 -#define PHJ_CHUNK_FINAL 4 +#define PHJ_CHUNK_RESETTING 1 +#define PHJ_CHUNK_LOADING 2 +#define PHJ_CHUNK_PROBING 3 +#define PHJ_CHUNK_DONE 4 +#define PHJ_CHUNK_FINAL 5 /* The phases of batch growth while hashing, for grow_batches_barrier. 
*/ #define PHJ_GROW_BATCHES_ELECTING 0 diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index dfc221e6a1..f6d5b47708 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -31,6 +31,7 @@ extern void ExecParallelHashTableAlloc(HashJoinTable hashtable, extern void ExecHashTableDestroy(HashJoinTable hashtable); extern void ExecHashTableDetach(HashJoinTable hashtable); extern void ExecHashTableDetachBatch(HashJoinTable hashtable); + extern void ExecParallelHashTableSetCurrentBatch(HashJoinTable hashtable, int batchno); diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 93fe6dddb2..32f0dd8cfe 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1968,7 +1968,6 @@ typedef struct HashJoinState /* parallel hashloop fallback outer side */ bool last_worker; - BufFile *combined_bitmap; } HashJoinState; diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 340086a7e7..dd2e8bd655 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -843,6 +843,7 @@ typedef enum WAIT_EVENT_HASH_GROW_BUCKETS_ELECTING, WAIT_EVENT_HASH_GROW_BUCKETS_REINSERTING, WAIT_EVENT_HASH_CHUNK_ELECTING, + WAIT_EVENT_HASH_CHUNK_RESETTING, WAIT_EVENT_HASH_CHUNK_LOADING, WAIT_EVENT_HASH_CHUNK_PROBING, WAIT_EVENT_HASH_CHUNK_DONE, -- 2.20.1 (Apple Git-117)