From 46b05c9ac73ab661332a77fd4bd90238cb652836 Mon Sep 17 00:00:00 2001
From: Melanie Plageman
Date: Tue, 11 Feb 2020 11:11:38 -0800
Subject: [PATCH v5 5/5] Avoid rescanning inner tuples per stripe

Instead of reinitializing the SharedTuplestore for the inner side for
each stripe during fallback, each participant's read_page is set to the
beginning of the SharedTuplestoreChunk which contains the end of one
stripe and the beginning of another. Previously, all inner tuples were
scanned for every stripe and only tuples from the current stripe were
loaded.

Each SharedTuplestoreAccessor now has a variable start_page, which is
initialized when it is assigned its read_page (which will always be the
beginning of a SharedTuplestoreChunk). While loading tuples into the
hashtable, if a tuple is from a past stripe, the worker skips it (that
will happen when a stripe straddles two SharedTuplestoreChunks). If a
tuple is from a future stripe, the worker backs that
SharedTuplestoreChunk out and sets the shared read_page (in the shared
SharedTuplestoreParticipant) back to its start_page.

A couple of mechanisms provide the synchronization needed, each
addressing a specific race condition:

Scenario 1:
- given a batch which has multiple stripes and has chosen the fallback
  strategy, two workers have each started reading from a single
  participant file
- worker0 is assigned pages 0-4
- worker1 is assigned pages 4-8
- stripe 1 starts on page 0 and ends on page 3
- worker0 sees that a tuple on page 3 is from stripe 2, so it proceeds
  to back out the read_page for this participant from 8 to its
  start_page of 0
- read_page is now 0
- worker1 was descheduled or distracted and starts reading a bit later.
  It sees that the very first tuple on page 4 is from a future stripe,
  so it wants to back out read_page to its start_page: 4
- if worker1 were allowed to do this, read_page would incorrectly be 4,
  and tuples from stripe 2 on pages 3 and 4 would not be loaded into
  the hashtable

To handle this, a worker may only set read_page to a start_page which
is less than the current value of read_page.

Scenario 2:
- given a batch which has multiple stripes and has chosen the fallback
  strategy, worker0 reads from participant file 0 and worker1 reads
  from participant file 1
- worker0 is assigned pages 0-4 in file 0
- stripe 1 starts on page 0 and ends on page 3
- the current stripe is stripe 1
- worker0 sees on page 3 that stripe 2 starts, so it backs out the
  read_page to 0
- worker1 finishes participant file 1 and proceeds to read from
  participant file 0
- worker1 opens the file and goes to get read_page
- read_page is 0, so worker1 loads tuples from stripe 1 in pages 0-3
- now both workers have loaded the same tuples into the hashtable

To handle this scenario, the participant has a rewound flag, which
indicates whether this participant has been rewound while loading the
current stripe. If it has, a worker cannot be assigned a
SharedTuplestoreChunk. The flag is reset when loading of the next
stripe begins (see sts_reset_rewound()).
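The rule from Scenario 1 reduces to a guarded rewind taken under the
participant's lock. The following is a condensed excerpt of
sts_backout_chunk() from the diff below, with illustrative comments
added (no new names are introduced):

    LWLockAcquire(&p->lock, LW_EXCLUSIVE);
    /* Rewind only if nobody has already rewound to an earlier page. */
    if (accessor->start_page < p->read_page)
    {
        p->read_page = accessor->start_page;    /* back this chunk out */
        p->rewound = true;                      /* see Scenario 2 */
    }
    LWLockRelease(&p->lock);

Because read_page only ever moves backward during a rewind, the
smallest applicable start_page wins, and the straddling tuples from the
next stripe cannot be skipped.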
In this patch, Hashjoin makes an unacceptable intrusion into the
SharedTuplestore API. I am looking for feedback on how to solve this.
Basically, because the SharedTuplestore does not know about stripes or
about HashJoin, the logic to decide whether a tuple should be loaded
into the hashtable lives in the stripe phase machine where tuples are
loaded. So, to ensure that workers have read from all participant files
before assuming all tuples from a stripe are loaded, I have duplicated
the logic from sts_parallel_scan_next(), which has workers try the next
participant, in the body of the tuple-loading loop in the stripe phase
machine (see sts_ready_for_next_stripe() and
sts_seen_all_participants()). This clearly needs to be fixed, and it is
arguable that there are other intrusions into the SharedTuplestore API
in these patches. One option is to write each stripe for each
participant to a different file, preserving the idea that a worker is
done with a read_file when it is at EOF.
---
 src/backend/executor/adaptiveHashjoin.c   |  37 +++++---
 src/backend/utils/sort/sharedtuplestore.c | 108 +++++++++++++++++++++-
 src/include/utils/sharedtuplestore.h      |   9 ++
 3 files changed, 140 insertions(+), 14 deletions(-)

diff --git a/src/backend/executor/adaptiveHashjoin.c b/src/backend/executor/adaptiveHashjoin.c
index 696bfc1c79..bc0eb46b90 100644
--- a/src/backend/executor/adaptiveHashjoin.c
+++ b/src/backend/executor/adaptiveHashjoin.c
@@ -85,6 +85,9 @@ ExecParallelHashJoinNewChunk(HashJoinState *hjstate, bool advance_from_probing)
 
 				sts_reinitialize(outer_tuples);
 
+				/* set the rewound flag back to false to prepare for the next stripe */
+				sts_reset_rewound(inner_tuples);
+
 				/*
 				 * reset inner's hashtable and recycle the existing bucket
 				 * array.
@@ -96,33 +99,37 @@ ExecParallelHashJoinNewChunk(HashJoinState *hjstate, bool advance_from_probing)
 				for (size_t i = 0; i < hashtable->nbuckets; ++i)
 					dsa_pointer_atomic_write(&buckets[i],
 											 InvalidDsaPointer);
-
-				/*
-				 * TODO: this will unfortunately rescan all inner tuples
-				 * in the batch for each chunk
-				 */
-
-				/*
-				 * should be able to save the block in the file which
-				 * starts the next chunk instead
-				 */
-				sts_reinitialize(inner_tuples);
 			}
 			/* Fall through. */
 		case PHJ_CHUNK_RESETTING:
 			BarrierArriveAndWait(chunk_barrier, WAIT_EVENT_HASH_CHUNK_RESETTING);
 		case PHJ_CHUNK_LOADING:
 			/* Start (or join in) loading the next chunk of inner tuples. */
-			sts_begin_parallel_scan(inner_tuples);
+			sts_resume_parallel_scan(inner_tuples);
 
 			MinimalTuple tuple;
 			tupleMetadata metadata;
 
 			while ((tuple = sts_parallel_scan_next(inner_tuples, &metadata)))
 			{
-				if (metadata.chunk != phj_batch->current_chunk)
+				int			current_stripe;
+				LWLockAcquire(&phj_batch->lock, LW_SHARED);
+				current_stripe = phj_batch->current_chunk;
+				LWLockRelease(&phj_batch->lock);
+
+				/* tuple from a past stripe: skip it */
+				if (metadata.chunk < current_stripe)
 					continue;
+				/* tuple from a future stripe: end of this stripe, back out read_page */
+				else if (metadata.chunk > current_stripe)
+				{
+					sts_backout_chunk(inner_tuples);
+					if (sts_seen_all_participants(inner_tuples))
+						break;
+					sts_ready_for_next_stripe(inner_tuples);
+					continue;
+				}
 
 				ExecForceStoreMinimalTuple(tuple,
 										   hjstate->hj_HashTupleSlot,
 										   false);
@@ -384,6 +391,8 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 				BarrierInit(&(barriers[i]), 0);
 			}
 			phj_batch->current_chunk = 1;
+			/* one worker needs to zero out the read_pages of all the participants in the new batch */
+			sts_reinitialize(hashtable->batches[batchno].inner_tuples);
 		}
 		/* Fall through. */
 
@@ -410,6 +419,8 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
 		if (batchno == 0)
 			sts_begin_parallel_scan(hashtable->batches[batchno].outer_tuples);
 
+		sts_begin_parallel_scan(hashtable->batches[batchno].inner_tuples);
+
 		/*
 		 * Create an outer match status file for this batch for
 		 * this worker This file must be accessible to the other
diff --git a/src/backend/utils/sort/sharedtuplestore.c b/src/backend/utils/sort/sharedtuplestore.c
index 045b8eca80..a45f86bdd2 100644
--- a/src/backend/utils/sort/sharedtuplestore.c
+++ b/src/backend/utils/sort/sharedtuplestore.c
@@ -52,6 +52,7 @@ typedef struct SharedTuplestoreParticipant
 {
 	LWLock		lock;
 	BlockNumber read_page;		/* Page number for next read. */
+	bool		rewound;
 	BlockNumber npages;			/* Number of pages written. */
 	bool		writing;		/* Used only for assertions. */
 } SharedTuplestoreParticipant;
@@ -91,6 +92,7 @@ struct SharedTuplestoreAccessor
 	char	   *read_buffer;	/* A buffer for loading tuples. */
 	size_t		read_buffer_size;
 	BlockNumber read_next_page; /* Lowest block we'll consider reading. */
+	BlockNumber start_page;		/* page to reset p->read_page to if a back out is required */
 
 	/* State for writing. */
 	SharedTuplestoreChunk *write_chunk; /* Buffer for writing. */
@@ -103,6 +105,21 @@ struct SharedTuplestoreAccessor
 static void sts_filename(char *name, SharedTuplestoreAccessor *accessor,
 						 int participant);
 
+bool
+sts_seen_all_participants(SharedTuplestoreAccessor *accessor)
+{
+	accessor->read_participant = (accessor->read_participant + 1) %
+		accessor->sts->nparticipants;
+	return accessor->read_participant == accessor->participant;
+}
+void
+sts_ready_for_next_stripe(SharedTuplestoreAccessor *accessor)
+{
+	accessor->read_next_page = 0;
+	BufFileClose(accessor->read_file);
+	accessor->read_file = NULL;
+}
+
 /*
  * Return the amount of shared memory required to hold SharedTuplestore for a
  * given number of participants.
@@ -166,6 +183,7 @@ sts_initialize(SharedTuplestore *sts, int participants,
 						 LWTRANCHE_SHARED_TUPLESTORE);
 		sts->participants[i].read_page = 0;
 		sts->participants[i].writing = false;
+		sts->participants[i].rewound = false;
 	}
 
 	accessor = palloc0(sizeof(SharedTuplestoreAccessor));
@@ -284,6 +302,47 @@ sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor)
 	 * be some caching locality advantage to that.
 	 */
 	accessor->read_participant = accessor->participant;
	accessor->read_file = NULL;
 	accessor->read_next_page = 0;
+	/*
+	 * As long as all code paths go through the Stripe Phase Machine and the
+	 * Batch Phase Machine, it is not required to zero out start_page here.
+	 * Do it anyway, for now.
+	 */
+	accessor->start_page = 0;
+}
+
+void
+sts_resume_parallel_scan(SharedTuplestoreAccessor *accessor)
+{
+	int			i PG_USED_FOR_ASSERTS_ONLY;
+
+	/* End any existing scan that was in progress. */
+	sts_end_parallel_scan(accessor);
+
+	/*
+	 * Any backend that might have written into this shared tuplestore must
+	 * have called sts_end_write(), so that all buffers are flushed and the
+	 * files have stopped growing.
+	 */
+	for (i = 0; i < accessor->sts->nparticipants; ++i)
+		Assert(!accessor->sts->participants[i].writing);
+
+	/*
+	 * We will start out reading the file that THIS backend wrote. There may
+	 * be some caching locality advantage to that.
+	 */
+	/*
+	 * TODO: does this still apply in the multi-stripe case?
+	 * It seems like if a participant file is exhausted for the current stripe,
+	 * it might be better to remember that.
+	 */
+	accessor->read_participant = accessor->participant;
+	accessor->read_file = NULL;
+	SharedTuplestoreParticipant *p = &accessor->sts->participants[accessor->read_participant];
+
+	LWLockAcquire(&p->lock, LW_SHARED);
+	accessor->start_page = accessor->sts->participants[accessor->read_participant].read_page;
+	LWLockRelease(&p->lock);
+	accessor->read_next_page = 0;
 }
 
 /*
@@ -302,6 +361,30 @@ sts_end_parallel_scan(SharedTuplestoreAccessor *accessor)
 		BufFileClose(accessor->read_file);
 		accessor->read_file = NULL;
 	}
+	/* It is probably not required to zero out start_page here */
+	accessor->start_page = 0;
+}
+
+void
+sts_backout_chunk(SharedTuplestoreAccessor *accessor)
+{
+	SharedTuplestoreParticipant *p = &accessor->sts->participants[accessor->read_participant];
+
+	LWLockAcquire(&p->lock, LW_EXCLUSIVE);
+	/*
+	 * Only set the read_page back to the start of the sts_chunk this worker
+	 * was reading if some other worker has not already done so. It could be
+	 * that this worker saw a tuple from a future stripe and another worker
+	 * did as well in its sts_chunk and already set read_page to its
+	 * start_page. If so, we want to set read_page to the lowest value to
+	 * ensure that we read all tuples from the stripe (don't miss tuples).
+	 */
+	if (accessor->start_page < p->read_page)
+	{
+		p->read_page = accessor->start_page;
+		p->rewound = true;
+	}
+	LWLockRelease(&p->lock);
 }
 
 /*
@@ -526,6 +609,17 @@ sts_read_tuple(SharedTuplestoreAccessor *accessor, void *meta_data)
 	return tuple;
 }
 
+void
+sts_reset_rewound(SharedTuplestoreAccessor *accessor)
+{
+	SharedTuplestoreParticipant *p;
+	for (int i = 0; i < accessor->sts->nparticipants; ++i)
+	{
+		p = &accessor->sts->participants[i];
+		p->rewound = false;
+	}
+}
+
 /*
  * Get the next tuple in the current parallel scan.
  */
@@ -539,7 +633,12 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
 	for (;;)
 	{
 		/* Can we read more tuples from the current chunk? */
-		if (accessor->read_ntuples < accessor->read_ntuples_available)
+		/*
+		 * Added a check for accessor->read_file being present here, as it
+		 * became relevant for adaptive hashjoin. Not sure if this has
+		 * other consequences for correctness.
+		 */
+		if (accessor->read_ntuples < accessor->read_ntuples_available && accessor->read_file)
 			return sts_read_tuple(accessor, meta_data);
 
 		/* Find the location of a new chunk to read. */
@@ -552,11 +651,18 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
 		eof = p->read_page >= p->npages;
 		if (!eof)
 		{
+			if (p->rewound)
+			{
+				LWLockRelease(&p->lock);
+				return NULL;
+			}
 			/* Claim the next chunk. */
 			read_page = p->read_page;
 			/* Advance the read head for the next reader. */
 			p->read_page += STS_CHUNK_PAGES;
 			accessor->read_next_page = p->read_page;
+			/* initialize start_page to the read_page this worker will start reading from */
+			accessor->start_page = read_page;
 		}
 		LWLockRelease(&p->lock);
 
diff --git a/src/include/utils/sharedtuplestore.h b/src/include/utils/sharedtuplestore.h
index 5e78f4bb15..fadf0232d0 100644
--- a/src/include/utils/sharedtuplestore.h
+++ b/src/include/utils/sharedtuplestore.h
@@ -60,8 +60,16 @@ extern void sts_reinitialize(SharedTuplestoreAccessor *accessor);
 
 extern void sts_begin_parallel_scan(SharedTuplestoreAccessor *accessor);
 
+extern void sts_resume_parallel_scan(SharedTuplestoreAccessor *accessor);
+
 extern void sts_end_parallel_scan(SharedTuplestoreAccessor *accessor);
 
+extern void sts_backout_chunk(SharedTuplestoreAccessor *accessor);
+
+extern bool sts_seen_all_participants(SharedTuplestoreAccessor *accessor);
+
+extern void sts_ready_for_next_stripe(SharedTuplestoreAccessor *accessor);
+
 extern void sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
 						 MinimalTuple tuple);
 
@@ -69,6 +77,7 @@ extern void sts_puttuple(SharedTuplestoreAccessor *accessor,
 extern MinimalTuple sts_parallel_scan_next(SharedTuplestoreAccessor *accessor,
 										   void *meta_data);
 
+extern void sts_reset_rewound(SharedTuplestoreAccessor *accessor);
 extern uint32 sts_increment_tuplenum(SharedTuplestoreAccessor *accessor);
 extern uint32 sts_get_tuplenum(SharedTuplestoreAccessor *accessor);
-- 
2.20.1 (Apple Git-117)
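
P.S. For reviewers: the calling protocol implied by the new sts_*
functions, condensed from the PHJ_CHUNK_LOADING arm above into a
standalone sketch. This is an illustration, not part of the patch;
load_into_hashtable() is a hypothetical stand-in for the
ExecForceStoreMinimalTuple() path, and the locking around
current_stripe is omitted for brevity.

    /* Loading one stripe of the fallback batch, as seen by one worker. */
    MinimalTuple tuple;
    tupleMetadata metadata;

    sts_resume_parallel_scan(inner_tuples);     /* pick up at shared read_page */
    while ((tuple = sts_parallel_scan_next(inner_tuples, &metadata)))
    {
        if (metadata.chunk < current_stripe)
            continue;                           /* past stripe straddling this chunk */
        if (metadata.chunk > current_stripe)
        {
            sts_backout_chunk(inner_tuples);    /* rewind read_page, set rewound */
            if (sts_seen_all_participants(inner_tuples))
                break;                          /* every file visited: stripe loaded */
            sts_ready_for_next_stripe(inner_tuples);    /* close file, try next one */
            continue;
        }
        load_into_hashtable(tuple);             /* hypothetical stand-in */
    }

As the commit message notes, the participant-advancing calls in the
middle of this loop duplicate logic that sts_parallel_scan_next()
already owns, which is the API intrusion on which feedback is sought.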