From 766703de5f03bf9d2bda73a0bb703f1c49bb2178 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Wed, 12 Apr 2023 18:16:06 -0700 Subject: [PATCH v5 07/10] WIP: Use streaming reads in heapam scans. XXX Cherry-picked from https://github.com/anarazel/postgres/tree/aio and lightly modified by TM, for demonstration purposes. Author: Andres Freund --- src/backend/access/heap/heapam.c | 205 +++++++++++++++++++++-- src/backend/access/heap/heapam_handler.c | 2 +- src/include/access/heapam.h | 5 +- 3 files changed, 192 insertions(+), 20 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 707460a536..76a53f68b6 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -65,6 +65,7 @@ #include "storage/smgr.h" #include "storage/spin.h" #include "storage/standby.h" +#include "storage/streaming_read.h" #include "utils/datum.h" #include "utils/inval.h" #include "utils/lsyscache.h" @@ -225,6 +226,89 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] = * ---------------------------------------------------------------- */ +static BlockNumber +heap_pgsr_next_single(PgStreamingRead *pgsr, void *pgsr_private, + void *per_buffer_data) +{ /* Block-number callback for serial (non-parallel) heap scans: each call hands out the next block to read, beginning at rs_startblock and wrapping to block 0 at rs_nblocks; it returns InvalidBlockNumber once the walk is back at rs_startblock or the rs_numblocks limit is used up. pgsr and per_buffer_data are not used. */ + HeapScanDesc scan = (HeapScanDesc) pgsr_private; /* the private pointer is the scan descriptor */ + BlockNumber blockno; + + Assert(!scan->rs_base.rs_parallel); /* parallel scans go through heap_pgsr_next_parallel */ + Assert(scan->rs_nblocks > 0); + + if (scan->rs_prefetch_block == InvalidBlockNumber) /* nothing has been handed out yet */ + { + scan->rs_prefetch_block = blockno = scan->rs_startblock; + } + else + { + blockno = ++scan->rs_prefetch_block; /* rs_prefetch_block tracks the last block handed out */ + + /* wrap back to the start of the heap */ + if (blockno >= scan->rs_nblocks) + scan->rs_prefetch_block = blockno = 0; + + /* we're done if we're back at where we started */ + if (blockno == scan->rs_startblock) + return InvalidBlockNumber; + + /* check if the limit imposed by heap_setscanlimits() is met */ + if (scan->rs_numblocks != InvalidBlockNumber) + { + if (--scan->rs_numblocks == 0) /* the limit is consumed in the scan descriptor itself, as blocks are handed out; NOTE(review): a further call after this point would decrement rs_numblocks past zero - confirm the stream stops calling once it has seen InvalidBlockNumber */ + return InvalidBlockNumber; + } + } + + return blockno; +} +
+static BlockNumber +heap_pgsr_next_parallel(PgStreamingRead *pgsr, void *pgsr_private, + void *per_buffer_data) +{ /* Block-number callback for parallel heap scans: returns whatever block table_block_parallelscan_nextpage() assigns to this worker. pgsr and per_buffer_data are not used. */ + HeapScanDesc scan = (HeapScanDesc) pgsr_private; /* the private pointer is the scan descriptor */ + ParallelBlockTableScanDesc pbscan = + (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + ParallelBlockTableScanWorker pbscanwork = + scan->rs_parallelworkerdata; + BlockNumber blockno; + + Assert(scan->rs_base.rs_parallel); /* serial scans go through heap_pgsr_next_single */ + Assert(scan->rs_nblocks > 0); + + /* Note that other processes might have already finished the scan */ + blockno = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, + pbscanwork, pbscan); /* may be InvalidBlockNumber when no blocks remain; passed through unchanged */ + + return blockno; +} + +static PgStreamingRead * +heap_pgsr_single_alloc(HeapScanDesc scan) +{ /* Builds the streaming-read object for a serial scan of the main fork of scan->rs_base.rs_rd, using the scan buffer strategy and heap_pgsr_next_single as the block-number callback. */ + return pg_streaming_read_buffer_alloc(PGSR_FLAG_DEFAULT, /* NOTE(review): the serial path passes PGSR_FLAG_DEFAULT while the parallel path passes PGSR_FLAG_SEQUENTIAL - confirm this is intended */ + scan, /* arrives in the callback as pgsr_private */ + 0, /* NOTE(review): meaning of this argument is not visible here - presumably the per-buffer data size; confirm */ + scan->rs_strategy, + BMR_REL(scan->rs_base.rs_rd), + MAIN_FORKNUM, + heap_pgsr_next_single); +} + +static PgStreamingRead * +heap_pgsr_parallel_alloc(HeapScanDesc scan) +{ /* Parallel-scan counterpart of heap_pgsr_single_alloc: same arguments except for the flag and the heap_pgsr_next_parallel callback. */ + return pg_streaming_read_buffer_alloc(PGSR_FLAG_SEQUENTIAL, + scan, /* arrives in the callback as pgsr_private */ + 0, /* NOTE(review): presumably the per-buffer data size - confirm */ + scan->rs_strategy, + BMR_REL(scan->rs_base.rs_rd), + MAIN_FORKNUM, + heap_pgsr_next_parallel); +} + + /* ---------------- * initscan - scan code common to heap_beginscan and heap_rescan * ---------------- @@ -342,6 +426,26 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) */ if (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN) pgstat_count_heap_scan(scan->rs_base.rs_rd); + + scan->rs_prefetch_block = InvalidBlockNumber; /* makes heap_pgsr_next_single start again from rs_startblock */ + if (scan->pgsr) + { + pg_streaming_read_free(scan->pgsr); /* discard a stream left over from an earlier pass through initscan */ + scan->pgsr = NULL; + } + + /* + * FIXME: This probably should be done in the !rs_inited blocks instead. 
+ */ + scan->pgsr = NULL; + if (!RelationUsesLocalBuffers(scan->rs_base.rs_rd) && + (scan->rs_base.rs_flags & SO_TYPE_SEQSCAN)) + { + if (scan->rs_base.rs_parallel) + scan->pgsr = heap_pgsr_parallel_alloc(scan); + else + scan->pgsr = heap_pgsr_single_alloc(scan); + } } /* @@ -374,7 +478,7 @@ heap_setscanlimits(TableScanDesc sscan, BlockNumber startBlk, BlockNumber numBlk * which tuples on the page are visible. */ void -heapgetpage(TableScanDesc sscan, BlockNumber block) +heapgetpage(TableScanDesc sscan, BlockNumber block, Buffer pgsr_buffer) { HeapScanDesc scan = (HeapScanDesc) sscan; Buffer buffer; @@ -401,9 +505,20 @@ heapgetpage(TableScanDesc sscan, BlockNumber block) */ CHECK_FOR_INTERRUPTS(); - /* read page using selected strategy */ - scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, block, - RBM_NORMAL, scan->rs_strategy); + if (BufferIsValid(pgsr_buffer)) + { + Assert(scan->pgsr); + Assert(BufferGetBlockNumber(pgsr_buffer) == block); + scan->rs_cbuf = pgsr_buffer; + } + else + { + Assert(!scan->pgsr); + + /* read page using selected strategy */ + scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, block, + RBM_NORMAL, scan->rs_strategy); + } scan->rs_cblock = block; if (!(scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE)) @@ -490,7 +605,7 @@ heapgetpage(TableScanDesc sscan, BlockNumber block) * of the pages before we can get a chance to get our first page. 
*/ static BlockNumber -heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir) +heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir, Buffer *pgsr_buf) { Assert(!scan->rs_inited); @@ -500,16 +615,25 @@ heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir) if (ScanDirectionIsForward(dir)) { + if (scan->rs_base.rs_parallel != NULL) + table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, + scan->rs_parallelworkerdata, + (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel); + + /* FIXME: Integrate more neatly */ + if (scan->pgsr) + { + *pgsr_buf = pg_streaming_read_buffer_get_next(scan->pgsr, NULL); + if (*pgsr_buf == InvalidBuffer) + return InvalidBlockNumber; + return BufferGetBlockNumber(*pgsr_buf); + } + /* serial scan */ if (scan->rs_base.rs_parallel == NULL) return scan->rs_startblock; else { - /* parallel scan */ - table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, - scan->rs_parallelworkerdata, - (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel); - /* may return InvalidBlockNumber if there are no more blocks */ return table_block_parallelscan_nextpage(scan->rs_base.rs_rd, scan->rs_parallelworkerdata, @@ -529,6 +653,12 @@ heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir) */ scan->rs_base.rs_flags &= ~SO_ALLOW_SYNC; + if (scan->pgsr) + { + pg_streaming_read_free(scan->pgsr); + scan->pgsr = NULL; + } + /* * Start from last page of the scan. Ensure we take into account * rs_numblocks if it's been adjusted by heap_setscanlimits(). @@ -630,11 +760,33 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft, * heap_setscanlimits(). 
*/ static inline BlockNumber -heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir) +heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir, + Buffer *pgsr_buf) { if (ScanDirectionIsForward(dir)) { - if (scan->rs_base.rs_parallel == NULL) + if (scan->pgsr) + { +#ifdef USE_ASSERT_CHECKING + block++; + + /* wrap back to the start of the heap */ + if (block >= scan->rs_nblocks) + block = 0; + + /* we're done if we're back at where we started */ + if (block == scan->rs_startblock) + block = InvalidBlockNumber; +#endif + *pgsr_buf = pg_streaming_read_buffer_get_next(scan->pgsr, NULL); + if (*pgsr_buf == InvalidBuffer) + return InvalidBlockNumber; + + Assert(scan->rs_base.rs_parallel || + block == BufferGetBlockNumber(*pgsr_buf)); + return BufferGetBlockNumber(*pgsr_buf); + } + else if (scan->rs_base.rs_parallel == NULL) { block++; @@ -679,6 +831,12 @@ heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir } else { + if (scan->pgsr) + { + pg_streaming_read_free(scan->pgsr); + scan->pgsr = NULL; + } + /* we're done if the last block is the start position */ if (block == scan->rs_startblock) return InvalidBlockNumber; @@ -728,13 +886,14 @@ heapgettup(HeapScanDesc scan, { HeapTuple tuple = &(scan->rs_ctup); BlockNumber block; + Buffer pgsr_buf = InvalidBuffer; Page page; OffsetNumber lineoff; int linesleft; if (unlikely(!scan->rs_inited)) { - block = heapgettup_initial_block(scan, dir); + block = heapgettup_initial_block(scan, dir, &pgsr_buf); /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */ Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); scan->rs_inited = true; @@ -755,7 +914,7 @@ heapgettup(HeapScanDesc scan, */ while (block != InvalidBlockNumber) { - heapgetpage((TableScanDesc) scan, block); + heapgetpage((TableScanDesc) scan, block, pgsr_buf); LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); page = heapgettup_start_page(scan, dir, &linesleft, &lineoff); 
continue_page: @@ -807,9 +966,10 @@ continue_page: * it's time to move to the next. */ LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); + pgsr_buf = InvalidBuffer; /* get the BlockNumber to scan next */ - block = heapgettup_advance_block(scan, block, dir); + block = heapgettup_advance_block(scan, block, dir, &pgsr_buf); } /* end of scan */ @@ -843,13 +1003,14 @@ heapgettup_pagemode(HeapScanDesc scan, { HeapTuple tuple = &(scan->rs_ctup); BlockNumber block; + Buffer pgsr_buf = InvalidBuffer; Page page; int lineindex; int linesleft; if (unlikely(!scan->rs_inited)) { - block = heapgettup_initial_block(scan, dir); + block = heapgettup_initial_block(scan, dir, &pgsr_buf); /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */ Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); scan->rs_inited = true; @@ -876,7 +1037,7 @@ heapgettup_pagemode(HeapScanDesc scan, */ while (block != InvalidBlockNumber) { - heapgetpage((TableScanDesc) scan, block); + heapgetpage((TableScanDesc) scan, block, pgsr_buf); page = BufferGetPage(scan->rs_cbuf); linesleft = scan->rs_ntuples; lineindex = ScanDirectionIsForward(dir) ? 0 : linesleft - 1; @@ -908,7 +1069,7 @@ continue_page: } /* get the BlockNumber to scan next */ - block = heapgettup_advance_block(scan, block, dir); + block = heapgettup_advance_block(scan, block, dir, &pgsr_buf); } /* end of scan */ @@ -956,6 +1117,8 @@ heap_beginscan(Relation relation, Snapshot snapshot, scan->rs_base.rs_parallel = parallel_scan; scan->rs_strategy = NULL; /* set in initscan */ + scan->pgsr = NULL; + /* * Disable page-at-a-time mode if it's not a MVCC-safe snapshot. 
*/ @@ -1062,6 +1225,12 @@ heap_endscan(TableScanDesc sscan) if (BufferIsValid(scan->rs_cbuf)) ReleaseBuffer(scan->rs_cbuf); + if (scan->pgsr) + { + pg_streaming_read_free(scan->pgsr); + scan->pgsr = NULL; + } + /* * decrement relation reference count and free scan descriptor storage */ diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index d15a02b2be..6127f9d75a 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -2335,7 +2335,7 @@ heapam_scan_sample_next_block(TableScanDesc scan, SampleScanState *scanstate) return false; } - heapgetpage(scan, blockno); + heapgetpage(scan, blockno, InvalidBuffer); hscan->rs_inited = true; return true; diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 932ec0d6f2..bc53f9f4d2 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -59,6 +59,7 @@ typedef struct HeapScanDescData bool rs_inited; /* false = scan not init'd yet */ OffsetNumber rs_coffset; /* current offset # in non-page-at-a-time mode */ BlockNumber rs_cblock; /* current block # in scan, if any */ + BlockNumber rs_prefetch_block; /* block being prefetched */ Buffer rs_cbuf; /* current buffer in scan, if any */ /* NB: if rs_cbuf is not InvalidBuffer, we hold a pin on that buffer */ @@ -72,6 +73,8 @@ typedef struct HeapScanDescData */ ParallelBlockTableScanWorkerData *rs_parallelworkerdata; + struct PgStreamingRead *pgsr; + /* these fields only used in page-at-a-time mode and for bitmap scans */ int rs_cindex; /* current tuple's index in vistuples */ int rs_ntuples; /* number of visible tuples on page */ @@ -246,7 +249,7 @@ extern TableScanDesc heap_beginscan(Relation relation, Snapshot snapshot, uint32 flags); extern void heap_setscanlimits(TableScanDesc sscan, BlockNumber startBlk, BlockNumber numBlks); -extern void heapgetpage(TableScanDesc sscan, BlockNumber block); +extern void heapgetpage(TableScanDesc sscan, BlockNumber 
block, Buffer buffer); extern void heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params, bool allow_strat, bool allow_sync, bool allow_pagemode); extern void heap_endscan(TableScanDesc sscan); -- 2.39.2