From a352a861372aa0738746fce0b5a5dc962bb87d95 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Wed, 10 Apr 2019 15:19:23 +1200 Subject: [PATCH] Experimental ExecScrollSlot() for hash join prefetch. Incomplet, inkorrect, proof-of-concept code only. Problems with ExecScrollSlot(): * needs a configure test * not preserving deform state when scrolling back and forth * should ExecScrollSlot(n) be absolute or relative? * how could we have a "consuming" mode, so that has build could use a tight inner loop over all the tuples in a page? * look-ahead size of just 1 for now! * only works with page-mode BufferHeapTupleTableSlot with no key test * ... Problems with hash join prefetch: * anti-joins are broken (!), see make check, haven't looked into why yet * ignoring skew hash table * not prefetching in hash build phase * need parallel hash join support * ... Author: Thomas Munro Discussion: https://postgr.es/m/flat/CA%2Bq6zcXg5-Rc4k0JY%2B7%3DgEDGWjCVp0X9t7JdnCuaAfeNmtTEZQ%40mail.gmail.com#35d34634efe8315587efd0b0da9775fc --- src/backend/access/heap/heapam.c | 21 +++++++++++++- src/backend/access/heap/heapam_handler.c | 10 +++---- src/backend/executor/execTuples.c | 36 +++++++++++++++++++++-- src/backend/executor/nodeHash.c | 37 ++++++++++++++++++++++++ src/backend/executor/nodeHashjoin.c | 4 +++ src/include/access/heapam.h | 1 + src/include/executor/hashjoin.h | 5 ++++ src/include/executor/nodeHash.h | 6 ++++ src/include/executor/tuptable.h | 26 ++++++++++++++++- 9 files changed, 136 insertions(+), 10 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index a05b6a07ad..6e66acb688 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -292,6 +292,7 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) scan->rs_numblocks = InvalidBlockNumber; scan->rs_inited = false; scan->rs_ctup.t_data = NULL; + scan->rs_ntup.t_data = NULL; ItemPointerSetInvalid(&scan->rs_ctup.t_self); scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; @@ -493,6 +494,8 @@ heapgettup(HeapScanDesc scan, int linesleft; ItemId lpp; + scan->rs_ntup.t_data = NULL; + /* * calculate next starting lineoff, given scan direction */ @@ -807,6 +810,8 @@ heapgettup_pagemode(HeapScanDesc scan, int linesleft; ItemId lpp; + scan->rs_ntup.t_data = NULL; + /* * calculate next starting lineindex, given scan direction */ @@ -983,6 +988,18 @@ heapgettup_pagemode(HeapScanDesc scan, else { scan->rs_cindex = lineindex; + + /* Allow executor nodes to scroll to the next tuple. */ + if (linesleft > 1) + { + lineoff = scan->rs_vistuples[lineindex + (backward ? -1 : 1)]; + lpp = PageGetItemId(dp, lineoff); + Assert(ItemIdIsNormal(lpp)); + tuple = &scan->rs_ntup; + tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); + tuple->t_len = ItemIdGetLength(lpp); + ItemPointerSet(&(tuple->t_self), page, lineoff); + } return; } @@ -1356,7 +1373,9 @@ heap_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *s pgstat_count_heap_getnext(scan->rs_base.rs_rd); - ExecStoreBufferHeapTuple(&scan->rs_ctup, slot, + ExecStoreBufferHeapTuple(&scan->rs_ctup, + scan->rs_ntup.t_data ? &scan->rs_ntup : NULL, + slot, scan->rs_cbuf); return true; } diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 1d8ca2429f..8c1a3c6564 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -163,7 +163,7 @@ heapam_index_fetch_tuple(struct IndexFetchTableData *scan, *call_again = !IsMVCCSnapshot(snapshot); slot->tts_tableOid = RelationGetRelid(scan->rel); - ExecStoreBufferHeapTuple(&bslot->base.tupdata, slot, hscan->xs_cbuf); + ExecStoreBufferHeapTuple(&bslot->base.tupdata, NULL, slot, hscan->xs_cbuf); } else { @@ -1104,7 +1104,7 @@ heapam_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin, if (sample_it) { - ExecStoreBufferHeapTuple(targtuple, slot, hscan->rs_cbuf); + ExecStoreBufferHeapTuple(targtuple, NULL, slot, hscan->rs_cbuf); hscan->rs_cindex++; /* note that we leave the buffer locked here! */ @@ -1578,7 +1578,7 @@ heapam_index_build_range_scan(Relation heapRelation, MemoryContextReset(econtext->ecxt_per_tuple_memory); /* Set up for predicate or expression evaluation */ - ExecStoreBufferHeapTuple(heapTuple, slot, hscan->rs_cbuf); + ExecStoreBufferHeapTuple(heapTuple, NULL, slot, hscan->rs_cbuf); /* * In a partial index, discard tuples that don't satisfy the @@ -2220,7 +2220,7 @@ heapam_scan_bitmap_next_tuple(TableScanDesc scan, * Set up the result slot to point to this tuple. Note that the slot * acquires a pin on the buffer. */ - ExecStoreBufferHeapTuple(&hscan->rs_ctup, + ExecStoreBufferHeapTuple(&hscan->rs_ctup, NULL, slot, hscan->rs_cbuf); @@ -2374,7 +2374,7 @@ heapam_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate, if (!pagemode) LockBuffer(hscan->rs_cbuf, BUFFER_LOCK_UNLOCK); - ExecStoreBufferHeapTuple(tuple, slot, hscan->rs_cbuf); + ExecStoreBufferHeapTuple(tuple, NULL, slot, hscan->rs_cbuf); /* Count successfully-fetched tuples as heap fetches */ pgstat_count_heap_getnext(scan->rs_rd); diff --git a/src/backend/executor/execTuples.c b/src/backend/executor/execTuples.c index 41fa374b6f..715b7dcb7d 100644 --- a/src/backend/executor/execTuples.c +++ b/src/backend/executor/execTuples.c @@ -76,6 +76,7 @@ slot_deform_heap_tuple(TupleTableSlot *slot, HeapTuple tuple, uint32 *offp, int natts); static inline void tts_buffer_heap_store_tuple(TupleTableSlot *slot, HeapTuple tuple, + HeapTuple next, Buffer buffer, bool transfer_pin); static void tts_heap_store_tuple(TupleTableSlot *slot, HeapTuple tuple, bool shouldFree); @@ -664,6 +665,7 @@ tts_buffer_heap_clear(TupleTableSlot *slot) ItemPointerSetInvalid(&slot->tts_tid); bslot->base.tuple = NULL; bslot->base.off = 0; + bslot->ntuples = 0; bslot->buffer = InvalidBuffer; } @@ -737,6 +739,9 @@ tts_buffer_heap_materialize(TupleTableSlot *slot) */ bslot->base.off = 0; slot->tts_nvalid = 0; + + bslot->ntuples = 1; + bslot->tuples[0] = bslot->base.tuple; } static void @@ -767,7 +772,7 @@ tts_buffer_heap_copyslot(TupleTableSlot *dstslot, TupleTableSlot *srcslot) { Assert(BufferIsValid(bsrcslot->buffer)); - tts_buffer_heap_store_tuple(dstslot, bsrcslot->base.tuple, + tts_buffer_heap_store_tuple(dstslot, bsrcslot->base.tuple, NULL, bsrcslot->buffer, false); /* @@ -779,6 +784,24 @@ tts_buffer_heap_copyslot(TupleTableSlot *dstslot, TupleTableSlot *srcslot) memcpy(&bdstslot->base.tupdata, bdstslot->base.tuple, sizeof(HeapTupleData)); bdstslot->base.tuple = &bdstslot->base.tupdata; } + bdstslot->tuples[0] = bdstslot->base.tuple; + bdstslot->ntuples = 1; +} + +static bool +tts_buffer_heap_scrollslot(TupleTableSlot *slot, int n) +{ + BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot; + + if (n >= bslot->ntuples) + return false; + + bslot->base.tuple = bslot->tuples[n]; + bslot->base.off = 0; + slot->tts_nvalid = 0; + slot->tts_tid = bslot->base.tuple->t_self; + + return true; } static HeapTuple @@ -822,6 +845,7 @@ tts_buffer_heap_copy_minimal_tuple(TupleTableSlot *slot) static inline void tts_buffer_heap_store_tuple(TupleTableSlot *slot, HeapTuple tuple, + HeapTuple next_tuple, Buffer buffer, bool transfer_pin) { BufferHeapTupleTableSlot *bslot = (BufferHeapTupleTableSlot *) slot; @@ -838,6 +862,9 @@ tts_buffer_heap_store_tuple(TupleTableSlot *slot, HeapTuple tuple, slot->tts_flags &= ~TTS_FLAG_EMPTY; slot->tts_nvalid = 0; bslot->base.tuple = tuple; + bslot->tuples[0] = tuple; + bslot->tuples[1] = next_tuple; + bslot->ntuples = next_tuple ? 2 : 1; bslot->base.off = 0; slot->tts_tid = tuple->t_self; @@ -1050,6 +1077,7 @@ const TupleTableSlotOps TTSOpsBufferHeapTuple = { .getsysattr = tts_buffer_heap_getsysattr, .materialize = tts_buffer_heap_materialize, .copyslot = tts_buffer_heap_copyslot, + .scrollslot= tts_buffer_heap_scrollslot, .get_heap_tuple = tts_buffer_heap_get_heap_tuple, /* A buffer heap tuple table slot can not "own" a minimal tuple. */ @@ -1339,6 +1367,7 @@ ExecStoreHeapTuple(HeapTuple tuple, * into a specified slot in the tuple table. * * tuple: tuple to store + * next_tuple: if available, the next tuple of a scan, in the same buffer * slot: TTSOpsBufferHeapTuple type slot to store it in * buffer: disk buffer if tuple is in a disk page, else InvalidBuffer * @@ -1353,6 +1382,7 @@ ExecStoreHeapTuple(HeapTuple tuple, */ TupleTableSlot * ExecStoreBufferHeapTuple(HeapTuple tuple, + HeapTuple next_tuple, TupleTableSlot *slot, Buffer buffer) { @@ -1366,7 +1396,7 @@ ExecStoreBufferHeapTuple(HeapTuple tuple, if (unlikely(!TTS_IS_BUFFERTUPLE(slot))) elog(ERROR, "trying to store an on-disk heap tuple into wrong type of slot"); - tts_buffer_heap_store_tuple(slot, tuple, buffer, false); + tts_buffer_heap_store_tuple(slot, tuple, next_tuple, buffer, false); slot->tts_tableOid = tuple->t_tableOid; @@ -1392,7 +1422,7 @@ ExecStorePinnedBufferHeapTuple(HeapTuple tuple, if (unlikely(!TTS_IS_BUFFERTUPLE(slot))) elog(ERROR, "trying to store an on-disk heap tuple into wrong type of slot"); - tts_buffer_heap_store_tuple(slot, tuple, buffer, true); + tts_buffer_heap_store_tuple(slot, tuple, NULL, buffer, true); slot->tts_tableOid = tuple->t_tableOid; diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c index 64eec91f8b..cdf5f4db1b 100644 --- a/src/backend/executor/nodeHash.c +++ b/src/backend/executor/nodeHash.c @@ -510,6 +510,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations, hashtable->parallel_state = state->parallel_state; hashtable->area = state->ps.state->es_query_dsa; hashtable->batches = NULL; + hashtable->have_prefetch = false; #ifdef HJDEBUG printf("Hashjoin %p: initial nbatch = %d, nbuckets = %d\n", @@ -1769,6 +1770,34 @@ ExecParallelHashTableInsertCurrentBatch(HashJoinTable hashtable, heap_free_minimal_tuple(tuple); } +void +ExecHashPrefetch(HashJoinTable hashtable, + TupleTableSlot *slot, + ExprContext *econtext, + List *hashkeys, + bool outer_tuple, + bool keep_nulls) +{ + hashtable->have_prefetch = false; + + if (ExecScrollSlot(slot, 1)) + { + uint32 next_bucket; + + hashtable->prefetch_result = + ExecHashGetHashValue(hashtable, + econtext, + hashkeys, + outer_tuple, + keep_nulls, + &hashtable->prefetch_hash); + hashtable->have_prefetch = true; + next_bucket = hashtable->prefetch_hash % hashtable->nbuckets; + __builtin_prefetch(&hashtable->buckets.unshared[next_bucket]); + ExecScrollSlot(slot, 0); + } +} + /* * ExecHashGetHashValue * Compute the hash value for a tuple @@ -1796,6 +1825,14 @@ ExecHashGetHashValue(HashJoinTable hashtable, int i = 0; MemoryContext oldContext; + /* Do we have a pre-computed result from ExecHashPrefetch()? */ + if (hashtable->have_prefetch) + { + hashtable->have_prefetch = false; + *hashvalue = hashtable->prefetch_hash; + return hashtable->prefetch_result; + } + /* * We reset the eval context each time to reclaim any memory leaked in the * hashkey expressions. diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index aa43296e26..853eff4682 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -855,6 +855,10 @@ ExecHashJoinOuterGetTuple(PlanState *outerNode, /* remember outer relation is not empty for possible rescan */ hjstate->hj_OuterNotEmpty = true; + ExecHashPrefetch(hashtable, slot, econtext, + hjstate->hj_OuterHashKeys, + true, /* outer tuple */ + HJ_FILL_OUTER(hjstate)); return slot; } diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 77e5e603b0..1b2cd06dc5 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -63,6 +63,7 @@ typedef struct HeapScanDescData BufferAccessStrategy rs_strategy; /* access strategy for reads */ HeapTupleData rs_ctup; /* current tuple in scan, if any */ + HeapTupleData rs_ntup; /* next tuple in scan, if any */ /* these fields only used in page-at-a-time mode and for bitmap scans */ int rs_cindex; /* current tuple's index in vistuples */ diff --git a/src/include/executor/hashjoin.h b/src/include/executor/hashjoin.h index 2c94b926d3..dd0d3ded3d 100644 --- a/src/include/executor/hashjoin.h +++ b/src/include/executor/hashjoin.h @@ -329,6 +329,11 @@ typedef struct HashJoinTableData BufFile **innerBatchFile; /* buffered virtual temp file per batch */ BufFile **outerBatchFile; /* buffered virtual temp file per batch */ + /* State used for prefetching cache lines for future tuples. */ + bool have_prefetch; + uint32 prefetch_hash; + bool prefetch_result; + /* * Info about the datatype-specific hash functions for the datatypes being * hashed. These are arrays of the same length as the number of hash join diff --git a/src/include/executor/nodeHash.h b/src/include/executor/nodeHash.h index 1233766023..8dab0c4e99 100644 --- a/src/include/executor/nodeHash.h +++ b/src/include/executor/nodeHash.h @@ -49,6 +49,12 @@ extern bool ExecHashGetHashValue(HashJoinTable hashtable, bool outer_tuple, bool keep_nulls, uint32 *hashvalue); +extern void ExecHashPrefetch(HashJoinTable hashtable, + TupleTableSlot *slot, + ExprContext *econtext, + List *hashkeys, + bool outer_tuple, + bool keep_nulls); extern void ExecHashGetBucketAndBatch(HashJoinTable hashtable, uint32 hashvalue, int *bucketno, diff --git a/src/include/executor/tuptable.h b/src/include/executor/tuptable.h index b0561ebe29..951ade0de8 100644 --- a/src/include/executor/tuptable.h +++ b/src/include/executor/tuptable.h @@ -179,6 +179,11 @@ struct TupleTableSlotOps */ void (*copyslot) (TupleTableSlot *dstslot, TupleTableSlot *srcslot); + /* + * Scroll forward to a future tuple, if the slot allows it. + */ + bool (*scrollslot) (TupleTableSlot *slot, int n); + /* * Return a heap tuple "owned" by the slot. It is slot's responsibility to * free the memory consumed by the heap tuple. If the slot can not "own" a @@ -258,6 +263,12 @@ typedef struct BufferHeapTupleTableSlot { HeapTupleTableSlot base; +#define BUFFERHEAPTUPLE_SCROLL_SIZE 2 + /* Buffer to support scrolling. */ + HeapTuple tuples[BUFFERHEAPTUPLE_SCROLL_SIZE]; + int ntuples; + int ctuple; + /* * If buffer is not InvalidBuffer, then the slot is holding a pin on the * indicated buffer page; drop the pin when we release the slot's @@ -265,7 +276,7 @@ typedef struct BufferHeapTupleTableSlot * false in such a case, since presumably tts_tuple is pointing at the * buffer page.) */ - Buffer buffer; /* tuple's buffer, or InvalidBuffer */ + Buffer buffer; /* tuples' buffer, or InvalidBuffer */ } BufferHeapTupleTableSlot; typedef struct MinimalTupleTableSlot @@ -308,6 +319,7 @@ extern TupleTableSlot *ExecStoreHeapTuple(HeapTuple tuple, bool shouldFree); extern void ExecForceStoreHeapTuple(HeapTuple tuple, TupleTableSlot *slot); extern TupleTableSlot *ExecStoreBufferHeapTuple(HeapTuple tuple, + HeapTuple peek_next, TupleTableSlot *slot, Buffer buffer); extern TupleTableSlot *ExecStorePinnedBufferHeapTuple(HeapTuple tuple, @@ -480,6 +492,18 @@ ExecCopySlot(TupleTableSlot *dstslot, TupleTableSlot *srcslot) return dstslot; } +/* + * Try to scroll to another tuple. + */ +static inline bool +ExecScrollSlot(TupleTableSlot *slot, int n) +{ + Assert(!TTS_EMPTY(slot)); + + return slot->tts_ops->scrollslot && slot->tts_ops->scrollslot(slot, n); +} + + #endif /* FRONTEND */ #endif /* TUPTABLE_H */ -- 2.21.0