From 47eb8392b8cf75a4e21dc50414af4857a5eade56 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Fri, 5 Apr 2024 13:32:14 +1300 Subject: [PATCH v11 1/4] Use streaming IO in heapam sequential and TID range scans Instead of calling ReadBuffer() for each block heap sequential scans and TID range scans now use the streaming read API introduced in b5a9b18cd0. Author: Melanie Plageman Reviewed-by: Thomas Munro Discussion: https://postgr.es/m/flat/CAAKRu_YtXJiYKQvb5JsA2SkwrsizYLugs4sSOZh3EAjKUg%3DgEQ%40mail.gmail.com --- src/backend/access/heap/heapam.c | 98 ++++++++++++++++++++++++++------ src/include/access/heapam.h | 15 +++++ 2 files changed, 95 insertions(+), 18 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index dada2ecd1e3..bafe023bce8 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -223,6 +223,25 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] = * ---------------------------------------------------------------- */ +static BlockNumber +heap_scan_stream_read_next(ReadStream *pgsr, void *private_data, + void *per_buffer_data) +{ + HeapScanDesc scan = (HeapScanDesc) private_data; + + if (unlikely(!scan->rs_inited)) + { + scan->rs_prefetch_block = heapgettup_initial_block(scan, scan->rs_dir); + scan->rs_inited = true; + } + else + scan->rs_prefetch_block = heapgettup_advance_block(scan, + scan->rs_prefetch_block, + scan->rs_dir); + + return scan->rs_prefetch_block; +} + /* ---------------- * initscan - scan code common to heap_beginscan and heap_rescan * ---------------- @@ -325,6 +344,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + /* + * Initialize to ForwardScanDirection because it is most common and heap + * scans usually must go forwards before going backward. + */ + scan->rs_dir = ForwardScanDirection; + scan->rs_prefetch_block = InvalidBlockNumber; + /* page-at-a-time fields are always invalid when not rs_inited */ /* @@ -462,12 +488,14 @@ heap_prepare_pagescan(TableScanDesc sscan) /* * heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM. * - * Read the next block of the scan relation into a buffer and pin that buffer - * before saving it in the scan descriptor. + * Read the next block of the scan relation from the read stream and pin that + * buffer before saving it in the scan descriptor. */ static inline void heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir) { + Assert(scan->rs_read_stream); + /* release previous scan buffer, if any */ if (BufferIsValid(scan->rs_cbuf)) { @@ -482,25 +510,23 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir) */ CHECK_FOR_INTERRUPTS(); - if (unlikely(!scan->rs_inited)) + /* + * If the scan direction is changing, reset the prefetch block to the + * current block. Otherwise, we will incorrectly prefetch the blocks + * between the prefetch block and the current block again before + * prefetching blocks in the new, correct scan direction. + */ + if (unlikely(scan->rs_dir != dir)) { - scan->rs_cblock = heapgettup_initial_block(scan, dir); + scan->rs_prefetch_block = scan->rs_cblock; + read_stream_reset(scan->rs_read_stream); + } - /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */ - Assert(scan->rs_cblock != InvalidBlockNumber || - !BufferIsValid(scan->rs_cbuf)); + scan->rs_dir = dir; - scan->rs_inited = true; - } - else - scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, - dir); - - /* read block if valid */ - if (BlockNumberIsValid(scan->rs_cblock)) - scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, - scan->rs_cblock, RBM_NORMAL, - scan->rs_strategy); + scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL); + if (BufferIsValid(scan->rs_cbuf)) + scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf); } /* @@ -833,6 +859,7 @@ continue_page: scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + scan->rs_prefetch_block = InvalidBlockNumber; tuple->t_data = NULL; scan->rs_inited = false; } @@ -928,6 +955,7 @@ continue_page: ReleaseBuffer(scan->rs_cbuf); scan->rs_cbuf = InvalidBuffer; scan->rs_cblock = InvalidBlockNumber; + scan->rs_prefetch_block = InvalidBlockNumber; tuple->t_data = NULL; scan->rs_inited = false; } @@ -1021,6 +1049,26 @@ heap_beginscan(Relation relation, Snapshot snapshot, initscan(scan, key, false); + scan->rs_read_stream = NULL; + + /* + * Set up a read stream for sequential scans and TID range scans. This + * should be done after initscan() because initscan() allocates the + * BufferAccessStrategy object passed to the streaming read API. + */ + if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN || + scan->rs_base.rs_flags & SO_TYPE_TIDRANGESCAN) + { + scan->rs_read_stream = read_stream_begin_relation(READ_STREAM_SEQUENTIAL, + scan->rs_strategy, + scan->rs_base.rs_rd, + MAIN_FORKNUM, + heap_scan_stream_read_next, + scan, + 0); + } + + return (TableScanDesc) scan; } @@ -1055,6 +1103,14 @@ heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params, if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); + /* + * The read stream is reset on rescan. This must be done before + * initscan(), as some state referred to by read_stream_reset() is reset + * in initscan(). + */ + if (scan->rs_read_stream) + read_stream_reset(scan->rs_read_stream); + /* * reinitialize scan descriptor */ @@ -1074,6 +1130,12 @@ heap_endscan(TableScanDesc sscan) if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); + /* + * Must free the read stream before freeing the BufferAccessStrategy. + */ + if (scan->rs_read_stream) + read_stream_end(scan->rs_read_stream); + /* * decrement relation reference count and free scan descriptor storage */ diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 2765efc4e5e..332a7faa8d1 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -25,6 +25,7 @@ #include "storage/bufpage.h" #include "storage/dsm.h" #include "storage/lockdefs.h" +#include "storage/read_stream.h" #include "storage/shm_toc.h" #include "utils/relcache.h" #include "utils/snapshot.h" @@ -70,6 +71,20 @@ typedef struct HeapScanDescData HeapTupleData rs_ctup; /* current tuple in scan, if any */ + /* For scans that stream reads */ + ReadStream *rs_read_stream; + + /* + * For sequential scans and TID range scans to stream reads. The read + * stream is allocated at the beginning of the scan and reset on rescan or + * when the scan direction changes. The scan direction is saved each time + * a new page is requested. If the scan direction changes from one page to + * the next, the read stream releases all previously pinned buffers and + * resets the prefetch block. + */ + ScanDirection rs_dir; + BlockNumber rs_prefetch_block; + /* * For parallel scans to store page allocation data. NULL when not * performing a parallel scan. -- 2.40.1