From a894ce38c488df6546392b9f3bd894b67edf951e Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Mon, 31 Oct 2022 13:40:29 -0400 Subject: [PATCH v1 3/3] Refactor heapgettup* and heapgetpage Simplify heapgettup(), heapgettup_pagemode(), and heapgetpage(). All three contained several unnecessary local variables, duplicate code, and nested if statements. Streamlining these improves readability and extensibility. --- src/backend/access/heap/heapam.c | 874 ++++++++++++------------------- 1 file changed, 342 insertions(+), 532 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 1c995faa12..aee4dd3a41 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -379,7 +379,6 @@ heapgetpage(TableScanDesc sscan, BlockNumber page) int lines; int ntup; OffsetNumber lineoff; - ItemId lpp; bool all_visible; Assert(page < scan->rs_nblocks); @@ -448,31 +447,31 @@ heapgetpage(TableScanDesc sscan, BlockNumber page) */ all_visible = PageIsAllVisible(dp) && !snapshot->takenDuringRecovery; - for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff); - lineoff <= lines; - lineoff++, lpp++) + for (lineoff = FirstOffsetNumber; lineoff <= lines; lineoff++) { - if (ItemIdIsNormal(lpp)) - { - HeapTupleData loctup; - bool valid; + HeapTupleData loctup; + bool valid; - loctup.t_tableOid = RelationGetRelid(scan->rs_base.rs_rd); - loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); - loctup.t_len = ItemIdGetLength(lpp); - ItemPointerSet(&(loctup.t_self), page, lineoff); + ItemId lpp = PageGetItemId(dp, lineoff); - if (all_visible) - valid = true; - else - valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer); + if (!ItemIdIsNormal(lpp)) + continue; - HeapCheckForSerializableConflictOut(valid, scan->rs_base.rs_rd, - &loctup, buffer, snapshot); + loctup.t_tableOid = RelationGetRelid(scan->rs_base.rs_rd); + loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); + loctup.t_len = ItemIdGetLength(lpp); + ItemPointerSet(&(loctup.t_self), page, lineoff); - if (valid) - scan->rs_vistuples[ntup++] = lineoff; - } + if (all_visible) + valid = true; + else + valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer); + + HeapCheckForSerializableConflictOut(valid, scan->rs_base.rs_rd, + &loctup, buffer, snapshot); + + if (valid) + scan->rs_vistuples[ntup++] = lineoff; } LockBuffer(buffer, BUFFER_LOCK_UNLOCK); @@ -481,6 +480,207 @@ heapgetpage(TableScanDesc sscan, BlockNumber page) scan->rs_ntuples = ntup; } + +static inline void heapgettup_no_movement(HeapScanDesc scan) +{ + ItemId lpp; + OffsetNumber lineoff; + BlockNumber page; + Page dp; + HeapTuple tuple = &(scan->rs_ctup); + /* + * ``no movement'' scan direction: refetch prior tuple + */ + + /* Since the tuple was previously fetched, needn't lock page here */ + if (!scan->rs_inited) + { + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + return; + } + + page = ItemPointerGetBlockNumber(&(tuple->t_self)); + if (page != scan->rs_cblock) + heapgetpage((TableScanDesc) scan, page); + + /* Since the tuple was previously fetched, needn't lock page here */ + dp = BufferGetPage(scan->rs_cbuf); + TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp); + lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self)); + lpp = PageGetItemId(dp, lineoff); + Assert(ItemIdIsNormal(lpp)); + + tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); + tuple->t_len = ItemIdGetLength(lpp); + + /* check that rs_cindex is in sync if in pagemode */ + Assert(!(scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE) || + (scan->rs_cindex < scan->rs_ntuples)); + + Assert(!(scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE) || + (lineoff == scan->rs_vistuples[scan->rs_cindex])); + + return; +} + +static inline Page heapgettup_continue_page(HeapScanDesc scan, BlockNumber page, ScanDirection dir, + int *linesleft, OffsetNumber *lineoff) +{ + HeapTuple tuple = &(scan->rs_ctup); + Page dp = BufferGetPage(scan->rs_cbuf); + TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp); + + if (ScanDirectionIsForward(dir)) + { + *lineoff = OffsetNumberNext(ItemPointerGetOffsetNumber(&(tuple->t_self))); + *linesleft = PageGetMaxOffsetNumber(dp) - (*lineoff) + 1; + } + else + { + /* + * The previous returned tuple may have been vacuumed since the + * previous scan when we use a non-MVCC snapshot, so we must + * re-establish the lineoff <= PageGetMaxOffsetNumber(dp) + * invariant + */ + *lineoff = Min(PageGetMaxOffsetNumber(dp), + OffsetNumberPrev(ItemPointerGetOffsetNumber(&(tuple->t_self)))); + *linesleft = *lineoff; + } + /* page and lineoff now reference the physically next tid */ + return dp; +} + +static inline Page heapgettup_start_page(HeapScanDesc scan, + BlockNumber page, ScanDirection dir, int *linesleft, OffsetNumber *lineoff) +{ + Page dp = BufferGetPage(scan->rs_cbuf); + TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp); + + *linesleft = PageGetMaxOffsetNumber((Page) dp) - FirstOffsetNumber + 1; + + if (ScanDirectionIsForward(dir)) + *lineoff = FirstOffsetNumber; + else + *lineoff = (OffsetNumber) (*linesleft); + + return dp; +} + +static inline BlockNumber heapgettup_initial_page(HeapScanDesc scan, ScanDirection dir) +{ + Assert(!ScanDirectionIsNoMovement(dir)); + Assert(!scan->rs_inited); + + /* forward and serial */ + if (ScanDirectionIsForward(dir) && scan->rs_base.rs_parallel == NULL) + return scan->rs_startblock; + + /* forward and parallel */ + if (ScanDirectionIsForward(dir)) + { + table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, + scan->rs_parallelworkerdata, + (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel); + + return table_block_parallelscan_nextpage(scan->rs_base.rs_rd, + scan->rs_parallelworkerdata, + (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel); + } + + /* backward parallel scan not supported */ + Assert(scan->rs_base.rs_parallel == NULL); + + /* + * Disable reporting to syncscan logic in a backwards scan; it's + * not very likely anyone else is doing the same thing at the same + * time, and much more likely that we'll just bollix things for + * forward scanners. + */ + scan->rs_base.rs_flags &= ~SO_ALLOW_SYNC; + + /* + * Start from last page of the scan. Ensure we take into account + * rs_numblocks if it's been adjusted by heap_setscanlimits(). + */ + if (scan->rs_numblocks != InvalidBlockNumber) + return (scan->rs_startblock + scan->rs_numblocks - 1) % scan->rs_nblocks; + + if (scan->rs_startblock > 0) + return scan->rs_startblock - 1; + + return scan->rs_nblocks - 1; +} + +static inline BlockNumber heapgettup_advance_page(HeapScanDesc scan, BlockNumber page, ScanDirection dir) +{ + if (ScanDirectionIsBackward(dir)) + { + if (page == scan->rs_startblock) + return InvalidBlockNumber; + + if (scan->rs_numblocks != InvalidBlockNumber) + { + if (--scan->rs_numblocks == 0) + return InvalidBlockNumber; + } + + if (page == 0) + page = scan->rs_nblocks; + + page--; + + return page; + } + else if (scan->rs_base.rs_parallel != NULL) + { + Assert(ScanDirectionIsForward(dir)); + + page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, + scan->rs_parallelworkerdata, + (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel); + + return page; + } + else + { + Assert(ScanDirectionIsForward(dir)); + + page++; + + if (page >= scan->rs_nblocks) + page = 0; + + if (page == scan->rs_startblock) + return InvalidBlockNumber; + + if (scan->rs_numblocks != InvalidBlockNumber) + { + if (--scan->rs_numblocks == 0) + return InvalidBlockNumber; + } + + /* + * Report our new scan position for synchronization purposes. We + * don't do that when moving backwards, however. That would just + * mess up any other forward-moving scanners. + * + * Note: we do this before checking for end of scan so that the + * final state of the position hint is back at the start of the + * rel. That's not strictly necessary, but otherwise when you run + * the same query multiple times the starting position would shift + * a little bit backwards on every invocation, which is confusing. + * We don't guarantee any specific ordering in general, though. + */ + if (scan->rs_base.rs_flags & SO_ALLOW_SYNC) + ss_report_location(scan->rs_base.rs_rd, page); + + return page; + } +} + + /* ---------------- * heapgettup - fetch next heap tuple * @@ -511,182 +711,55 @@ heapgettup(HeapScanDesc scan, ScanKey key) { HeapTuple tuple = &(scan->rs_ctup); - Snapshot snapshot = scan->rs_base.rs_snapshot; - bool backward = ScanDirectionIsBackward(dir); BlockNumber page; - bool finished; Page dp; - int lines; OffsetNumber lineoff; int linesleft; - ItemId lpp; + + if (ScanDirectionIsNoMovement(dir)) + return heapgettup_no_movement(scan); /* - * calculate next starting lineoff, given scan direction + * return null immediately if relation is empty */ - if (ScanDirectionIsForward(dir)) + if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0) { - if (!scan->rs_inited) - { - /* - * return null immediately if relation is empty - */ - if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0) - { - Assert(!BufferIsValid(scan->rs_cbuf)); - tuple->t_data = NULL; - return; - } - if (scan->rs_base.rs_parallel != NULL) - { - ParallelBlockTableScanDesc pbscan = - (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; - ParallelBlockTableScanWorker pbscanwork = - scan->rs_parallelworkerdata; - - table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, - pbscanwork, pbscan); - - page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - pbscanwork, pbscan); - - /* Other processes might have already finished the scan. */ - if (page == InvalidBlockNumber) - { - Assert(!BufferIsValid(scan->rs_cbuf)); - tuple->t_data = NULL; - return; - } - } - else - page = scan->rs_startblock; /* first page */ - heapgetpage((TableScanDesc) scan, page); - lineoff = FirstOffsetNumber; /* first offnum */ - scan->rs_inited = true; - } - else - { - /* continue from previously returned page/tuple */ - page = scan->rs_cblock; /* current page */ - lineoff = /* next offnum */ - OffsetNumberNext(ItemPointerGetOffsetNumber(&(tuple->t_self))); - } - - LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); - - dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp); - lines = PageGetMaxOffsetNumber(dp); - /* page and lineoff now reference the physically next tid */ - - linesleft = lines - lineoff + 1; + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + return; } - else if (backward) - { - /* backward parallel scan not supported */ - Assert(scan->rs_base.rs_parallel == NULL); - - if (!scan->rs_inited) - { - /* - * return null immediately if relation is empty - */ - if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0) - { - Assert(!BufferIsValid(scan->rs_cbuf)); - tuple->t_data = NULL; - return; - } - - /* - * Disable reporting to syncscan logic in a backwards scan; it's - * not very likely anyone else is doing the same thing at the same - * time, and much more likely that we'll just bollix things for - * forward scanners. - */ - scan->rs_base.rs_flags &= ~SO_ALLOW_SYNC; - - /* - * Start from last page of the scan. Ensure we take into account - * rs_numblocks if it's been adjusted by heap_setscanlimits(). - */ - if (scan->rs_numblocks != InvalidBlockNumber) - page = (scan->rs_startblock + scan->rs_numblocks - 1) % scan->rs_nblocks; - else if (scan->rs_startblock > 0) - page = scan->rs_startblock - 1; - else - page = scan->rs_nblocks - 1; - heapgetpage((TableScanDesc) scan, page); - } - else - { - /* continue from previously returned page/tuple */ - page = scan->rs_cblock; /* current page */ - } - - LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); - dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp); - lines = PageGetMaxOffsetNumber(dp); - - if (!scan->rs_inited) - { - lineoff = lines; /* final offnum */ - scan->rs_inited = true; - } - else - { - /* - * The previous returned tuple may have been vacuumed since the - * previous scan when we use a non-MVCC snapshot, so we must - * re-establish the lineoff <= PageGetMaxOffsetNumber(dp) - * invariant - */ - lineoff = /* previous offnum */ - Min(lines, - OffsetNumberPrev(ItemPointerGetOffsetNumber(&(tuple->t_self)))); - } - /* page and lineoff now reference the physically previous tid */ + if (!scan->rs_inited) + { + page = heapgettup_initial_page(scan, dir); + scan->rs_inited = true; - linesleft = lineoff; + /* + * If parallel and other processes have already finished the scan, the + * returned page is expected to be InvalidBlockNumber. In this case, + * ensure that the backend is not sitting on a valid buffer. + */ + Assert(page != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); } else { - /* - * ``no movement'' scan direction: refetch prior tuple - */ - if (!scan->rs_inited) - { - Assert(!BufferIsValid(scan->rs_cbuf)); - tuple->t_data = NULL; - return; - } - - page = ItemPointerGetBlockNumber(&(tuple->t_self)); - if (page != scan->rs_cblock) - heapgetpage((TableScanDesc) scan, page); - - /* Since the tuple was previously fetched, needn't lock page here */ - dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp); - lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self)); - lpp = PageGetItemId(dp, lineoff); - Assert(ItemIdIsNormal(lpp)); - - tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); - tuple->t_len = ItemIdGetLength(lpp); - - return; + page = scan->rs_cblock; + LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); + dp = heapgettup_continue_page(scan, page, dir, &linesleft, &lineoff); + goto continue_page; } /* * advance the scan until we find a qualifying tuple or run out of stuff * to scan */ - lpp = PageGetItemId(dp, lineoff); - for (;;) + while (page != InvalidBlockNumber) { + heapgetpage((TableScanDesc) scan, page); + LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); + dp = heapgettup_start_page(scan, page, dir, &linesleft, &lineoff); + continue_page: + /* * Only continue scanning the page while we have lines left. * @@ -694,54 +767,39 @@ heapgettup(HeapScanDesc scan, * PageGetMaxOffsetNumber(); both for forward scans when we resume the * table scan, and for when we start scanning a new page. */ - while (linesleft > 0) + for (; linesleft > 0; linesleft--, lineoff += dir) { - if (ItemIdIsNormal(lpp)) - { - bool valid; - - tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); - tuple->t_len = ItemIdGetLength(lpp); - ItemPointerSet(&(tuple->t_self), page, lineoff); + bool visible; + ItemId lpp = PageGetItemId(dp, lineoff); - /* - * if current tuple qualifies, return it. - */ - valid = HeapTupleSatisfiesVisibility(tuple, - snapshot, - scan->rs_cbuf); + if (!ItemIdIsNormal(lpp)) + continue; - HeapCheckForSerializableConflictOut(valid, scan->rs_base.rs_rd, - tuple, scan->rs_cbuf, - snapshot); + tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); + tuple->t_len = ItemIdGetLength(lpp); + ItemPointerSet(&(tuple->t_self), scan->rs_cblock, lineoff); - if (valid && key != NULL) - { - valid = HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd), - nkeys, key); - } + /* + * if current tuple qualifies, return it. + * otherwise move to the next item on the page + */ + visible = HeapTupleSatisfiesVisibility(tuple, + scan->rs_base.rs_snapshot, + scan->rs_cbuf); + + HeapCheckForSerializableConflictOut(visible, scan->rs_base.rs_rd, + tuple, scan->rs_cbuf, + scan->rs_base.rs_snapshot); + + if (!visible) + continue; - if (valid) - { - LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); - return; - } - } + if (key && !HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd), + nkeys, key)) + continue; - /* - * otherwise move to the next item on the page - */ - --linesleft; - if (backward) - { - --lpp; /* move back in this page's ItemId array */ - --lineoff; - } - else - { - ++lpp; /* move forward in this page's ItemId array */ - ++lineoff; - } + LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); + return; } /* @@ -750,85 +808,17 @@ heapgettup(HeapScanDesc scan, */ LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); - /* - * advance to next/prior page and detect end of scan - */ - if (backward) - { - finished = (page == scan->rs_startblock) || - (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false); - if (page == 0) - page = scan->rs_nblocks; - page--; - } - else if (scan->rs_base.rs_parallel != NULL) - { - ParallelBlockTableScanDesc pbscan = - (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; - ParallelBlockTableScanWorker pbscanwork = - scan->rs_parallelworkerdata; - - page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - pbscanwork, pbscan); - finished = (page == InvalidBlockNumber); - } - else - { - page++; - if (page >= scan->rs_nblocks) - page = 0; - finished = (page == scan->rs_startblock) || - (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false); - - /* - * Report our new scan position for synchronization purposes. We - * don't do that when moving backwards, however. That would just - * mess up any other forward-moving scanners. - * - * Note: we do this before checking for end of scan so that the - * final state of the position hint is back at the start of the - * rel. That's not strictly necessary, but otherwise when you run - * the same query multiple times the starting position would shift - * a little bit backwards on every invocation, which is confusing. - * We don't guarantee any specific ordering in general, though. - */ - if (scan->rs_base.rs_flags & SO_ALLOW_SYNC) - ss_report_location(scan->rs_base.rs_rd, page); - } - - /* - * return NULL if we've exhausted all the pages - */ - if (finished) - { - if (BufferIsValid(scan->rs_cbuf)) - ReleaseBuffer(scan->rs_cbuf); - scan->rs_cbuf = InvalidBuffer; - scan->rs_cblock = InvalidBlockNumber; - tuple->t_data = NULL; - scan->rs_inited = false; - return; - } - - heapgetpage((TableScanDesc) scan, page); + page = heapgettup_advance_page(scan, page, dir); + } - LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); + /* end of scan */ + if (BufferIsValid(scan->rs_cbuf)) + ReleaseBuffer(scan->rs_cbuf); - dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp); - lines = PageGetMaxOffsetNumber((Page) dp); - linesleft = lines; - if (backward) - { - lineoff = lines; - lpp = PageGetItemId(dp, lines); - } - else - { - lineoff = FirstOffsetNumber; - lpp = PageGetItemId(dp, FirstOffsetNumber); - } - } + scan->rs_cbuf = InvalidBuffer; + scan->rs_cblock = InvalidBlockNumber; + tuple->t_data = NULL; + scan->rs_inited = false; } /* ---------------- @@ -851,174 +841,71 @@ heapgettup_pagemode(HeapScanDesc scan, ScanKey key) { HeapTuple tuple = &(scan->rs_ctup); - bool backward = ScanDirectionIsBackward(dir); BlockNumber page; - bool finished; Page dp; - int lines; int lineindex; - OffsetNumber lineoff; int linesleft; - ItemId lpp; + + if (ScanDirectionIsNoMovement(dir)) + return heapgettup_no_movement(scan); /* - * calculate next starting lineindex, given scan direction + * return null immediately if relation is empty */ - if (ScanDirectionIsForward(dir)) + if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0) { - if (!scan->rs_inited) - { - /* - * return null immediately if relation is empty - */ - if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0) - { - Assert(!BufferIsValid(scan->rs_cbuf)); - tuple->t_data = NULL; - return; - } - if (scan->rs_base.rs_parallel != NULL) - { - ParallelBlockTableScanDesc pbscan = - (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; - ParallelBlockTableScanWorker pbscanwork = - scan->rs_parallelworkerdata; - - table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, - pbscanwork, pbscan); - - page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - pbscanwork, pbscan); - - /* Other processes might have already finished the scan. */ - if (page == InvalidBlockNumber) - { - Assert(!BufferIsValid(scan->rs_cbuf)); - tuple->t_data = NULL; - return; - } - } - else - page = scan->rs_startblock; /* first page */ - heapgetpage((TableScanDesc) scan, page); - lineindex = 0; - scan->rs_inited = true; - } - else - { - /* continue from previously returned page/tuple */ - page = scan->rs_cblock; /* current page */ - lineindex = scan->rs_cindex + 1; - } - - dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp); - lines = scan->rs_ntuples; - /* page and lineindex now reference the next visible tid */ - - linesleft = lines - lineindex; + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + return; } - else if (backward) - { - /* backward parallel scan not supported */ - Assert(scan->rs_base.rs_parallel == NULL); - - if (!scan->rs_inited) - { - /* - * return null immediately if relation is empty - */ - if (scan->rs_nblocks == 0 || scan->rs_numblocks == 0) - { - Assert(!BufferIsValid(scan->rs_cbuf)); - tuple->t_data = NULL; - return; - } - - /* - * Disable reporting to syncscan logic in a backwards scan; it's - * not very likely anyone else is doing the same thing at the same - * time, and much more likely that we'll just bollix things for - * forward scanners. - */ - scan->rs_base.rs_flags &= ~SO_ALLOW_SYNC; - /* - * Start from last page of the scan. Ensure we take into account - * rs_numblocks if it's been adjusted by heap_setscanlimits(). - */ - if (scan->rs_numblocks != InvalidBlockNumber) - page = (scan->rs_startblock + scan->rs_numblocks - 1) % scan->rs_nblocks; - else if (scan->rs_startblock > 0) - page = scan->rs_startblock - 1; - else - page = scan->rs_nblocks - 1; - heapgetpage((TableScanDesc) scan, page); - } - else - { - /* continue from previously returned page/tuple */ - page = scan->rs_cblock; /* current page */ - } - - dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp); - lines = scan->rs_ntuples; - - if (!scan->rs_inited) - { - lineindex = lines - 1; - scan->rs_inited = true; - } - else - { - lineindex = scan->rs_cindex - 1; - } - /* page and lineindex now reference the previous visible tid */ - - linesleft = lineindex + 1; - } - else + if (!scan->rs_inited) { + page = heapgettup_initial_page(scan, dir); + scan->rs_inited = true; /* - * ``no movement'' scan direction: refetch prior tuple + * If parallel and other processes have already finished the scan, the + * returned page is expected to be InvalidBlockNumber. In this case, + * ensure that the backend is not sitting on a valid buffer. */ - if (!scan->rs_inited) - { - Assert(!BufferIsValid(scan->rs_cbuf)); - tuple->t_data = NULL; - return; - } - - page = ItemPointerGetBlockNumber(&(tuple->t_self)); - if (page != scan->rs_cblock) - heapgetpage((TableScanDesc) scan, page); - - /* Since the tuple was previously fetched, needn't lock page here */ + Assert(page != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); + } + else + { + page = scan->rs_cblock; dp = BufferGetPage(scan->rs_cbuf); TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp); - lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self)); - lpp = PageGetItemId(dp, lineoff); - Assert(ItemIdIsNormal(lpp)); - tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); - tuple->t_len = ItemIdGetLength(lpp); - - /* check that rs_cindex is in sync */ - Assert(scan->rs_cindex < scan->rs_ntuples); - Assert(lineoff == scan->rs_vistuples[scan->rs_cindex]); + lineindex = scan->rs_cindex + dir; + if (ScanDirectionIsForward(dir)) + linesleft = scan->rs_ntuples - lineindex; + else + linesleft = scan->rs_cindex; - return; + /* page and lineindex now reference the next visible tid */ + goto continue_page; } /* * advance the scan until we find a qualifying tuple or run out of stuff * to scan */ - for (;;) + while (page != InvalidBlockNumber) { - while (linesleft > 0) + heapgetpage((TableScanDesc) scan, page); + dp = BufferGetPage(scan->rs_cbuf); + TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp); + linesleft = scan->rs_ntuples; + lineindex = ScanDirectionIsForward(dir) ? 0 : linesleft - 1; + /* page and lineindex now reference the previous visible tid */ + + continue_page: + + for (; linesleft > 0; linesleft--, lineindex += dir) { + ItemId lpp; + OffsetNumber lineoff; + lineoff = scan->rs_vistuples[lineindex]; lpp = PageGetItemId(dp, lineoff); Assert(ItemIdIsNormal(lpp)); @@ -1028,108 +915,31 @@ heapgettup_pagemode(HeapScanDesc scan, ItemPointerSet(&(tuple->t_self), page, lineoff); /* - * if current tuple qualifies, return it. - */ - if (key != NULL) - { - bool valid; - - valid = HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd), - nkeys, key); - if (valid) - { - scan->rs_cindex = lineindex; - return; - } - } - else - { - scan->rs_cindex = lineindex; - return; - } + * if current tuple qualifies, return it. + * otherwise move to the next item on the page + */ + if (key && !HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd), + nkeys, key)) + continue; - /* - * otherwise move to the next item on the page - */ - --linesleft; - if (backward) - --lineindex; - else - ++lineindex; + scan->rs_cindex = lineindex; + return; } /* * if we get here, it means we've exhausted the items on this page and * it's time to move to the next. */ - if (backward) - { - finished = (page == scan->rs_startblock) || - (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false); - if (page == 0) - page = scan->rs_nblocks; - page--; - } - else if (scan->rs_base.rs_parallel != NULL) - { - ParallelBlockTableScanDesc pbscan = - (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; - ParallelBlockTableScanWorker pbscanwork = - scan->rs_parallelworkerdata; - - page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, - pbscanwork, pbscan); - finished = (page == InvalidBlockNumber); - } - else - { - page++; - if (page >= scan->rs_nblocks) - page = 0; - finished = (page == scan->rs_startblock) || - (scan->rs_numblocks != InvalidBlockNumber ? --scan->rs_numblocks == 0 : false); - - /* - * Report our new scan position for synchronization purposes. We - * don't do that when moving backwards, however. That would just - * mess up any other forward-moving scanners. - * - * Note: we do this before checking for end of scan so that the - * final state of the position hint is back at the start of the - * rel. That's not strictly necessary, but otherwise when you run - * the same query multiple times the starting position would shift - * a little bit backwards on every invocation, which is confusing. - * We don't guarantee any specific ordering in general, though. - */ - if (scan->rs_base.rs_flags & SO_ALLOW_SYNC) - ss_report_location(scan->rs_base.rs_rd, page); - } - - /* - * return NULL if we've exhausted all the pages - */ - if (finished) - { - if (BufferIsValid(scan->rs_cbuf)) - ReleaseBuffer(scan->rs_cbuf); - scan->rs_cbuf = InvalidBuffer; - scan->rs_cblock = InvalidBlockNumber; - tuple->t_data = NULL; - scan->rs_inited = false; - return; - } - - heapgetpage((TableScanDesc) scan, page); - - dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp); - lines = scan->rs_ntuples; - linesleft = lines; - if (backward) - lineindex = lines - 1; - else - lineindex = 0; + page = heapgettup_advance_page(scan, page, dir); } + + /* end of scan */ + if (BufferIsValid(scan->rs_cbuf)) + ReleaseBuffer(scan->rs_cbuf); + scan->rs_cbuf = InvalidBuffer; + scan->rs_cblock = InvalidBlockNumber; + tuple->t_data = NULL; + scan->rs_inited = false; } -- 2.37.0