From a3176b7a8fce30097d8452bbb6d89a5d87a0b764 Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Mon, 29 Jan 2024 11:50:01 -0500 Subject: [PATCH v1 2/4] Replace blocks with buffers in heapgettup control flow Future commits will introduce the streaming read API and the sequential scan streaming read API user. Streaming read API users implement a callback which returns the next block to read. Sequential scans previously looped through the blocks in the relation, synchronously reading in a block and then processing it. An InvalidBlockNumber returned by heapgettup_advance_block() meant that the relation was exhausted and all blocks had been processed. The streaming read API may exhaust the blocks in a relation (having read all of them into buffers) before they have all been processed by the sequential scan. As such, the sequential scan should continue processing blocks until heapfetchbuf() returns InvalidBuffer. Note that this commit does not implement the streaming read API user. It simply restructures heapgettup() and heapgettup_pagemode() to use buffers instead of blocks for control flow. Not all sequential scans will support streaming reads. As such, this code will remain for compatability even after sequential scans support streaming reads. --- src/backend/access/heap/heapam.c | 75 ++++++++++++++------------------ 1 file changed, 33 insertions(+), 42 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 608d43c4efd..9e3e6d8b52b 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -87,6 +87,9 @@ static Bitmapset *HeapDetermineColumnsInfo(Relation relation, static bool heap_acquire_tuplock(Relation relation, ItemPointer tid, LockTupleMode mode, LockWaitPolicy wait_policy, bool *have_tuple_lock); +static inline BlockNumber heapgettup_advance_block(HeapScanDesc scan, + BlockNumber block, ScanDirection dir); +static inline BlockNumber heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir); static void compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask, uint16 old_infomask2, TransactionId add_to_xmax, LockTupleMode mode, bool is_update, @@ -467,12 +470,10 @@ heapbuildvis(TableScanDesc sscan) * returns with that pinned buffer saved in the scan descriptor. */ static inline void -heapfetchbuf(TableScanDesc sscan, BlockNumber block) +heapfetchbuf(TableScanDesc sscan, ScanDirection dir) { HeapScanDesc scan = (HeapScanDesc) sscan; - Assert(block < scan->rs_nblocks); - /* release previous scan buffer, if any */ if (BufferIsValid(scan->rs_cbuf)) { @@ -487,10 +488,19 @@ heapfetchbuf(TableScanDesc sscan, BlockNumber block) */ CHECK_FOR_INTERRUPTS(); - /* read page using selected strategy */ - scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, block, - RBM_NORMAL, scan->rs_strategy); - scan->rs_cblock = block; + if (!scan->rs_inited) + { + scan->rs_cblock = heapgettup_initial_block(scan, dir); + Assert(scan->rs_cblock != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); + scan->rs_inited = true; + } + else + scan->rs_cblock = heapgettup_advance_block(scan, scan->rs_cblock, dir); + + /* read block if valid */ + if (BlockNumberIsValid(scan->rs_cblock)) + scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, + scan->rs_cblock, RBM_NORMAL, scan->rs_strategy); } /* @@ -500,7 +510,7 @@ heapfetchbuf(TableScanDesc sscan, BlockNumber block) * occur with empty tables and in parallel scans when parallel workers get all * of the pages before we can get a chance to get our first page. */ -static BlockNumber +BlockNumber heapgettup_initial_block(HeapScanDesc scan, ScanDirection dir) { Assert(!scan->rs_inited); @@ -640,7 +650,7 @@ heapgettup_continue_page(HeapScanDesc scan, ScanDirection dir, int *linesleft, * This also adjusts rs_numblocks when a limit has been imposed by * heap_setscanlimits(). */ -static inline BlockNumber +BlockNumber heapgettup_advance_block(HeapScanDesc scan, BlockNumber block, ScanDirection dir) { if (ScanDirectionIsForward(dir)) @@ -738,23 +748,13 @@ heapgettup(HeapScanDesc scan, ScanKey key) { HeapTuple tuple = &(scan->rs_ctup); - BlockNumber block; Page page; OffsetNumber lineoff; int linesleft; - if (unlikely(!scan->rs_inited)) - { - block = heapgettup_initial_block(scan, dir); - /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */ - Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); - scan->rs_inited = true; - } - else + if (scan->rs_inited) { /* continue from previously returned page/tuple */ - block = scan->rs_cblock; - LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); page = heapgettup_continue_page(scan, dir, &linesleft, &lineoff); goto continue_page; @@ -764,9 +764,12 @@ heapgettup(HeapScanDesc scan, * advance the scan until we find a qualifying tuple or run out of stuff * to scan */ - while (block != InvalidBlockNumber) + while (true) { - heapfetchbuf((TableScanDesc) scan, block); + heapfetchbuf((TableScanDesc) scan, dir); + if (!BufferIsValid(scan->rs_cbuf)) + break; + Assert(BufferGetBlockNumber(scan->rs_cbuf) == scan->rs_cblock); LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); page = heapgettup_start_page(scan, dir, &linesleft, &lineoff); continue_page: @@ -788,7 +791,7 @@ continue_page: tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp); tuple->t_len = ItemIdGetLength(lpp); - ItemPointerSet(&(tuple->t_self), block, lineoff); + ItemPointerSet(&(tuple->t_self), scan->rs_cblock, lineoff); visible = HeapTupleSatisfiesVisibility(tuple, scan->rs_base.rs_snapshot, @@ -818,9 +821,6 @@ continue_page: * it's time to move to the next. */ LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); - - /* get the BlockNumber to scan next */ - block = heapgettup_advance_block(scan, block, dir); } /* end of scan */ @@ -853,22 +853,13 @@ heapgettup_pagemode(HeapScanDesc scan, ScanKey key) { HeapTuple tuple = &(scan->rs_ctup); - BlockNumber block; Page page; int lineindex; int linesleft; - if (unlikely(!scan->rs_inited)) - { - block = heapgettup_initial_block(scan, dir); - /* ensure rs_cbuf is invalid when we get InvalidBlockNumber */ - Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); - scan->rs_inited = true; - } - else + if (scan->rs_inited) { /* continue from previously returned page/tuple */ - block = scan->rs_cblock; /* current page */ page = BufferGetPage(scan->rs_cbuf); lineindex = scan->rs_cindex + dir; @@ -885,9 +876,12 @@ heapgettup_pagemode(HeapScanDesc scan, * advance the scan until we find a qualifying tuple or run out of stuff * to scan */ - while (block != InvalidBlockNumber) + while (true) { - heapfetchbuf((TableScanDesc) scan, block); + heapfetchbuf((TableScanDesc) scan, dir); + if (!BufferIsValid(scan->rs_cbuf)) + break; + Assert(BufferGetBlockNumber(scan->rs_cbuf) == scan->rs_cblock); heapbuildvis((TableScanDesc) scan); page = BufferGetPage(scan->rs_cbuf); linesleft = scan->rs_ntuples; @@ -907,7 +901,7 @@ continue_page: tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp); tuple->t_len = ItemIdGetLength(lpp); - ItemPointerSet(&(tuple->t_self), block, lineoff); + ItemPointerSet(&(tuple->t_self), scan->rs_cblock, lineoff); /* skip any tuples that don't match the scan key */ if (key != NULL && @@ -918,9 +912,6 @@ continue_page: scan->rs_cindex = lineindex; return; } - - /* get the BlockNumber to scan next */ - block = heapgettup_advance_block(scan, block, dir); } /* end of scan */ -- 2.37.2