From 737317370de0b41883551bf1d470f5d647d6117b Mon Sep 17 00:00:00 2001
From: Melanie Plageman
Date: Tue, 7 Jan 2020 16:28:32 -0800
Subject: [PATCH v4 2/4] Fixup tupleMetadata struct issues

Remove __attribute__((packed)) from tupleMetadata. It is not needed
since sizeof(struct tupleMetadata) is used consistently.

Change tupleMetadata to contain an anonymous union for the
tupleid/chunk number: the member is the tupleid on the outer side and
the chunk number on the inner side. A union is used because the two
have different types.

Also, fix the signedness and type issues in code using it.

For now, the tuple counter is a 32-bit integer, since it is accessed
atomically and 64-bit atomic operations are not supported on all
architecture/OS combinations. It remains a TODO to make this counter
backend-local and combine the per-backend values to reduce the amount
of synchronization needed.

Additionally, the tupleid/chunk number member should not be written
for non-fallback batches, as it bloats the tuplestore.

Also, this patch contains assorted updates to variable names and TODOs.
---
 src/backend/executor/adaptiveHashjoin.c   | 10 +++----
 src/backend/executor/nodeHash.c           | 30 +++++++++++++++------
 src/backend/executor/nodeHashjoin.c       | 25 ++++++++++-------
 src/backend/utils/sort/sharedtuplestore.c | 33 ++++++++++++-----------
 src/include/executor/hashjoin.h           |  4 +--
 src/include/utils/sharedtuplestore.h      | 16 +++++------
 6 files changed, 70 insertions(+), 48 deletions(-)

diff --git a/src/backend/executor/adaptiveHashjoin.c b/src/backend/executor/adaptiveHashjoin.c
index dff5b38d38f8..64af2a24f346 100644
--- a/src/backend/executor/adaptiveHashjoin.c
+++ b/src/backend/executor/adaptiveHashjoin.c
@@ -51,7 +51,7 @@ ExecParallelHashJoinNewChunk(HashJoinState *hjstate, bool advance_from_probing)
 		 */
 		if (BarrierArriveAndWait(chunk_barrier,
 								 WAIT_EVENT_HASH_CHUNK_PROBING))
-			phj_batch->current_chunk_num++;
+			phj_batch->current_chunk++;
 
 		/* Once the barrier is advanced we'll be in the DONE phase */
 	}
@@ -68,7 +68,7 @@ ExecParallelHashJoinNewChunk(HashJoinState *hjstate, bool advance_from_probing)
 	{
 			/*
 			 * TODO: remove this phase and coordinate access to hashtable
-			 * above goto and after incrementing current_chunk_num
+			 * above goto and after incrementing current_chunk
 			 */
 		case PHJ_CHUNK_ELECTING:
 	phj_chunk_electing:
@@ -85,7 +85,7 @@ ExecParallelHashJoinNewChunk(HashJoinState *hjstate, bool advance_from_probing)
 
 				while ((tuple = sts_parallel_scan_next(inner_tuples, &metadata)))
 				{
-					if (metadata.tupleid != phj_batch->current_chunk_num)
+					if (metadata.chunk != phj_batch->current_chunk)
 						continue;
 
 					ExecForceStoreMinimalTuple(tuple,
@@ -110,7 +110,7 @@ ExecParallelHashJoinNewChunk(HashJoinState *hjstate, bool advance_from_probing)
 				BarrierArriveAndWait(chunk_barrier,
 									 WAIT_EVENT_HASH_CHUNK_DONE);
 
-				if (phj_batch->current_chunk_num > phj_batch->total_num_chunks)
+				if (phj_batch->current_chunk > phj_batch->total_chunks)
 				{
 					BarrierDetach(chunk_barrier);
 					return false;
@@ -276,7 +276,7 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 				&hashtable->batches[batchno].shared->chunk_barrier;
 
 				BarrierInit(chunk_barrier, 0);
-				hashtable->batches[batchno].shared->current_chunk_num = 1;
+				hashtable->batches[batchno].shared->current_chunk = 1;
 			}
 
 			/* Fall through. */
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index c5420b169e6c..cb2f95ac0a76 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -1362,7 +1362,7 @@ ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
 					/* TODO: should I check batch estimated size here at all? */
 					if (phj_batch->parallel_hashloop_fallback == true && (phj_batch->estimated_chunk_size + tuple_size > hashtable->parallel_state->space_allowed))
 					{
-						phj_batch->total_num_chunks++;
+						phj_batch->total_chunks++;
 						phj_batch->estimated_chunk_size = tuple_size;
 					}
 					else
@@ -1371,10 +1371,15 @@ ExecParallelHashRepartitionFirst(HashJoinTable hashtable)
 					tupleMetadata metadata;
 
 					metadata.hashvalue = hashTuple->hashvalue;
-					metadata.tupleid = phj_batch->total_num_chunks;
+					metadata.chunk = phj_batch->total_chunks;
 					LWLockRelease(&phj_batch->lock);
 
 					hashtable->batches[batchno].estimated_size += tuple_size;
+
+					/*
+					 * TODO: only put the chunk num if it is a fallback batch
+					 * (avoid bloating the metadata written to the file)
+					 */
 					sts_puttuple(hashtable->batches[batchno].inner_tuples,
 								 &metadata, tuple);
 				}
@@ -1451,14 +1456,19 @@ ExecParallelHashRepartitionRest(HashJoinTable hashtable)
 			/* TODO: should I check batch estimated size here at all? */
 			if (phj_batch->parallel_hashloop_fallback == true && (phj_batch->estimated_chunk_size + tuple_size > pstate->space_allowed))
 			{
-				phj_batch->total_num_chunks++;
+				phj_batch->total_chunks++;
 				phj_batch->estimated_chunk_size = tuple_size;
 			}
 			else
 				phj_batch->estimated_chunk_size += tuple_size;
-			metadata.tupleid = phj_batch->total_num_chunks;
+			metadata.chunk = phj_batch->total_chunks;
 			LWLockRelease(&phj_batch->lock);
 
 			/* Store the tuple its new batch. */
+
+			/*
+			 * TODO: only put the chunk num if it is a fallback batch (avoid
+			 * bloating the metadata written to the file)
+			 */
 			sts_puttuple(hashtable->batches[batchno].inner_tuples,
 						 &metadata, tuple);
@@ -1821,7 +1831,7 @@ retry:
 		 */
 		if (phj_batch->parallel_hashloop_fallback == true && (phj_batch->estimated_chunk_size + tuple_size > pstate->space_allowed))
 		{
-			phj_batch->total_num_chunks++;
+			phj_batch->total_chunks++;
 			phj_batch->estimated_chunk_size = tuple_size;
 		}
 		else
@@ -1830,9 +1840,13 @@ retry:
 			tupleMetadata metadata;
 
 			metadata.hashvalue = hashvalue;
-			metadata.tupleid = phj_batch->total_num_chunks;
+			metadata.chunk = phj_batch->total_chunks;
 			LWLockRelease(&phj_batch->lock);
 
+			/*
+			 * TODO: only put the chunk num if it is a fallback batch (avoid
+			 * bloating the metadata written to the file)
+			 */
 			sts_puttuple(hashtable->batches[batchno].inner_tuples, &metadata,
 						 tuple);
 		}
@@ -3043,8 +3057,8 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
 			shared->parallel_hashloop_fallback = false;
 			LWLockInitialize(&shared->lock,
 							 LWTRANCHE_PARALLEL_HASH_JOIN_BATCH);
-			shared->current_chunk_num = 0;
-			shared->total_num_chunks = 1;
+			shared->current_chunk = 0;
+			shared->total_chunks = 1;
 			shared->estimated_chunk_size = 0;
 
 			/*
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 39a03000f8da..6a8efc0765a4 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -435,9 +435,8 @@ ExecHashJoin(PlanState *pstate)
 				{
 					/*
 					 * The current outer tuple has run out of matches, so
-					 * check whether to emit a dummy outer-join tuple.
-					 * Whether we emit one or not, the next state is
-					 * NEED_NEW_OUTER.
+					 * check whether to emit a dummy outer-join tuple. Whether
+					 * we emit one or not, the next state is NEED_NEW_OUTER.
 					 */
 					node->hj_JoinState = HJ_NEED_NEW_OUTER;
 					if (!node->hashloop_fallback || node->hj_HashTable->curbatch == 0)
@@ -902,7 +901,7 @@ ExecParallelHashJoin(PlanState *pstate)
 				ParallelHashJoinBatch *phj_batch =
 				node->hj_HashTable->batches[node->hj_HashTable->curbatch].shared;
 
-				if (!phj_batch->parallel_hashloop_fallback || phj_batch->current_chunk_num == 1)
+				if (!phj_batch->parallel_hashloop_fallback || phj_batch->current_chunk == 1)
 					node->hj_MatchedOuter = false;
 
 				node->hj_JoinState = HJ_SCAN_BUCKET;
@@ -919,9 +918,8 @@ ExecParallelHashJoin(PlanState *pstate)
 				{
 					/*
 					 * The current outer tuple has run out of matches, so
-					 * check whether to emit a dummy outer-join tuple.
-					 * Whether we emit one or not, the next state is
-					 * NEED_NEW_OUTER.
+					 * check whether to emit a dummy outer-join tuple. Whether
+					 * we emit one or not, the next state is NEED_NEW_OUTER.
 					 */
 					node->hj_JoinState = HJ_NEED_NEW_OUTER;
 					if (!phj_batch->parallel_hashloop_fallback)
@@ -1084,7 +1082,7 @@ ExecParallelHashJoin(PlanState *pstate)
 				if ((tuple = sts_parallel_scan_next(outer_acc, &metadata)) == NULL)
 					break;
 
-				int			bytenum = metadata.tupleid / 8;
+				uint32		bytenum = metadata.tupleid / 8;
 				unsigned char bit = metadata.tupleid % 8;
 				unsigned char byte_to_check = 0;
 
@@ -1477,7 +1475,7 @@ ExecParallelHashJoinOuterGetTuple(PlanState *outerNode,
 	{
 		MinimalTuple tuple;
 		tupleMetadata metadata;
-		int			tupleid;
+		uint32		tupleid;
 
 		tuple = sts_parallel_scan_next(hashtable->batches[curbatch].outer_tuples,
 									   &metadata);
@@ -1894,7 +1892,16 @@ ExecParallelHashJoinPartitionOuter(HashJoinState *hjstate)
 			metadata.hashvalue = hashvalue;
 			SharedTuplestoreAccessor *accessor =
 			hashtable->batches[batchno].outer_tuples;
+			/*
+			 * TODO: add a comment that this means the order is not
+			 * deterministic so don't count on it
+			 */
 			metadata.tupleid = sts_increment_tuplenum(accessor);
+
+			/*
+			 * TODO: only add the tupleid when it is a fallback batch to avoid
+			 * bloating of the sharedtuplestore
+			 */
 			sts_puttuple(accessor, &metadata, mintup);
 
 			if (shouldFree)
diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c
index 3cd2ec2e2eb6..0e5e9db82034 100644
--- a/src/backend/utils/sort/sharedtuplestore.c
+++ b/src/backend/utils/sort/sharedtuplestore.c
@@ -57,11 +57,15 @@ typedef struct SharedTuplestoreParticipant
 } SharedTuplestoreParticipant;
 
 /* The control object that lives in shared memory. */
+/*
+ * TODO: ntuples as a 32-bit atomic is iffy (64-bit atomics may not be portable
+ * everywhere).  Reducing synchronization instead, e.g. by using the worker
+ * number to unique-ify the tuple number, seems possible.
+ */
 struct SharedTuplestore
 {
 	int			nparticipants;	/* Number of participants that can write. */
 	pg_atomic_uint32 ntuples;
-	//TODO:does this belong elsewhere
+	/* TODO: does this belong elsewhere? */
 	int			flags;			/* Flag bits from SHARED_TUPLESTORE_XXX */
 	size_t		meta_data_size; /* Size of per-tuple header. */
 	char		name[NAMEDATALEN];	/* A name for this tuplestore. */
@@ -631,8 +635,7 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
 	return NULL;
 }
 
-/* TODO: fix signedness */
-int
+uint32
 sts_increment_tuplenum(SharedTuplestoreAccessor *accessor)
 {
 	return pg_atomic_fetch_add_u32(&accessor->sts->ntuples, 1);
@@ -719,21 +722,21 @@ sts_combine_outer_match_status_files(SharedTuplestoreAccessor *accessor)
 	BufFile    *combined_bitmap_file = BufFileCreateTemp(false);
 
 	for (int64 cur = 0; cur < BufFileSize(statuses[0]); cur++)
-		//make it while not EOF
-		{
-			unsigned char combined_byte = 0;
-
-			for (int i = 0; i < statuses_length; i++)
-			{
-				unsigned char read_byte;
-
-				BufFileRead(statuses[i], &read_byte, 1);
-				combined_byte |= read_byte;
-			}
-
-			BufFileWrite(combined_bitmap_file, &combined_byte, 1);
-		}
+	/* make it while not EOF */
+	{
+		unsigned char combined_byte = 0;
+
+		for (int i = 0; i < statuses_length; i++)
+		{
+			unsigned char read_byte;
+
+			BufFileRead(statuses[i], &read_byte, 1);
+			combined_byte |= read_byte;
+		}
+
+		BufFileWrite(combined_bitmap_file, &combined_byte, 1);
+	}
 
 	if (BufFileSeek(combined_bitmap_file, 0, 0L, SEEK_SET))
 		ereport(ERROR,
 				(errcode_for_file_access(),
diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h
index 3e4f4bd5747a..e5a00f84e321 100644
--- a/src/include/executor/hashjoin.h
+++ b/src/include/executor/hashjoin.h
@@ -163,8 +163,8 @@ typedef struct ParallelHashJoinBatch
 	 * and does not require a lock to read
 	 */
 	bool		parallel_hashloop_fallback;
-	int			total_num_chunks;
-	int			current_chunk_num;
+	int			total_chunks;
+	int			current_chunk;
 	size_t		estimated_chunk_size;
 	Barrier		chunk_barrier;
 	LWLock		lock;
diff --git a/src/include/utils/sharedtuplestore.h b/src/include/utils/sharedtuplestore.h
index 6152ac163da2..8b2433e5c4b0 100644
--- a/src/include/utils/sharedtuplestore.h
+++ b/src/include/utils/sharedtuplestore.h
@@ -24,17 +24,15 @@ struct SharedTuplestoreAccessor;
 typedef struct SharedTuplestoreAccessor SharedTuplestoreAccessor;
 struct tupleMetadata;
 typedef struct tupleMetadata tupleMetadata;
-
-/* TODO: conflicting types for tupleid with accessor->sts->ntuples (uint32) */
-/* TODO: use a union for tupleid (uint32) (make this a uint64) and chunk number (int) */
 struct tupleMetadata
 {
 	uint32		hashvalue;
-	int			tupleid;		/* tuple id on outer side and chunk number for
-								 * inner side */
-} __attribute__((packed));
-
-/* TODO: make sure I can get rid of packed now that using sizeof(struct) */
+	union
+	{
+		uint32		tupleid;	/* tuple number or id on the outer side */
+		int			chunk;		/* chunk number for inner side */
+	};
+};
 
 /*
  * A flag indicating that the tuplestore will only be scanned once, so backing
@@ -72,7 +70,7 @@ extern MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor,
 											   void *meta_data);
 
-extern int	sts_increment_tuplenum(SharedTuplestoreAccessor *accessor);
+extern uint32 sts_increment_tuplenum(SharedTuplestoreAccessor *accessor);
 extern void sts_make_outer_match_status_file(SharedTuplestoreAccessor *accessor);
 extern void sts_set_outer_match_status(SharedTuplestoreAccessor *accessor,
 									   uint32 tuplenum);
 
-- 
2.25.0
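
Aside, not part of the patch: a minimal standalone sketch of the metadata
layout and the atomic tuple-numbering scheme the commit message describes.
It uses C11 stdatomic.h in place of PostgreSQL's pg_atomic_uint32, and every
name in it is illustrative rather than one of the patch's actual symbols.

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Mirrors tupleMetadata: an anonymous union overlays the outer side's tuple
 * number with the inner side's chunk number, since any one tuple needs only
 * one of the two.  No packing attribute is required because writers and
 * readers both size the per-tuple header with sizeof(), so any compiler
 * padding is accounted for consistently on both sides.
 */
struct metadata
{
	uint32_t	hashvalue;
	union
	{
		uint32_t	tupleid;	/* outer side: index into match bitmap */
		int			chunk;		/* inner side: hash-loop chunk number */
	};
};

/*
 * Stand-in for the shared counter (pg_atomic_uint32 in the patch); it is
 * 32-bit because 64-bit atomics are not portable to every platform.
 */
static atomic_uint ntuples;

static uint32_t
increment_tuplenum(void)
{
	/*
	 * fetch-and-add hands out unique ids, but workers race on the counter,
	 * so the ids imply no deterministic ordering (hence the patch's TODO).
	 */
	return atomic_fetch_add(&ntuples, 1);
}

int
main(void)
{
	struct metadata meta = {.hashvalue = 0x2a};

	meta.tupleid = increment_tuplenum();
	printf("header size %zu, tupleid %u\n",
		   sizeof(struct metadata), meta.tupleid);
	return 0;
}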
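
In the same spirit, a hedged sketch of the outer-match bitmap arithmetic that
the nodeHashjoin.c hunk relies on (bytenum = tupleid / 8, bit = tupleid % 8)
and of the byte-wise OR that sts_combine_outer_match_status_files performs
over the per-worker status files. The helper names are hypothetical, and
plain in-memory buffers stand in for the BufFiles.

#include <stdint.h>
#include <stdio.h>

/* Set the bit for tupleid, as a worker does when an outer tuple matches. */
static void
set_match(unsigned char *bitmap, uint32_t tupleid)
{
	bitmap[tupleid / 8] |= (unsigned char) (1 << (tupleid % 8));
}

/* Check the bit for tupleid, as the unmatched-outer scan does. */
static int
is_match(const unsigned char *bitmap, uint32_t tupleid)
{
	return (bitmap[tupleid / 8] >> (tupleid % 8)) & 1;
}

/*
 * Byte-wise OR of one worker's bitmap into the combined one, standing in
 * for the BufFileRead/BufFileWrite loop over the per-worker status files.
 */
static void
combine_match_status(unsigned char *combined, const unsigned char *worker,
					 size_t len)
{
	for (size_t i = 0; i < len; i++)
		combined[i] |= worker[i];
}

int
main(void)
{
	unsigned char combined[4] = {0};
	unsigned char worker[4] = {0};

	set_match(combined, 3);		/* one worker matched tuple 3 */
	set_match(worker, 17);		/* another worker matched tuple 17 */
	combine_match_status(combined, worker, sizeof(combined));
	printf("tuple 17 matched: %d\n", is_match(combined, 17));
	return 0;
}

Because each tupleid is unique across workers, ORing the per-worker bitmaps
loses no information: a tuple is unmatched only if its bit is zero in every
worker's file.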