From 7ee180fd289863086e1923ae82730410493f8486 Mon Sep 17 00:00:00 2001 From: Melanie Plageman Date: Wed, 30 Nov 2022 15:38:44 -0500 Subject: [PATCH v4 6/6] Refactor heapgettup() and heapgettup_pagemode() --- src/backend/access/heap/heapam.c | 252 ++++++++++++------------------- 1 file changed, 93 insertions(+), 159 deletions(-) diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 39b12b198d..a06c870ecc 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -732,13 +732,10 @@ heapgettup(HeapScanDesc scan, ScanKey key) { HeapTuple tuple = &(scan->rs_ctup); - Snapshot snapshot = scan->rs_base.rs_snapshot; - bool backward = ScanDirectionIsBackward(dir); BlockNumber block; Page page; OffsetNumber lineoff; int linesleft; - ItemId lpp; if (unlikely(ScanDirectionIsNoMovement(dir))) { @@ -750,32 +747,32 @@ heapgettup(HeapScanDesc scan, { block = heapgettup_initial_page(scan, dir); - if (block == InvalidBlockNumber) - { - Assert(!BufferIsValid(scan->rs_cbuf)); - tuple->t_data = NULL; - return; - } - - heapgetpage((TableScanDesc) scan, block); - LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); - page = heapgettup_start_page(scan, block, dir, &linesleft, &lineoff); + /* + * If parallel and other processes have already finished the scan, the + * returned block is expected to be InvalidBlockNumber. In this case, + * ensure that the backend is not sitting on a valid buffer. + */ + Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); } else { block = scan->rs_cblock; /* current page */ - LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); page = heapgettup_continue_page(scan, block, dir, &linesleft, &lineoff); + goto continue_page; } /* * advance the scan until we find a qualifying tuple or run out of stuff * to scan */ - lpp = PageGetItemId(page, lineoff); - for (;;) + while (block != InvalidBlockNumber) { + heapgetpage((TableScanDesc) scan, block); + LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); + page = heapgettup_start_page(scan, block, dir, &linesleft, &lineoff); + continue_page: + /* * Only continue scanning the page while we have lines left. * @@ -783,53 +780,40 @@ heapgettup(HeapScanDesc scan, * PageGetMaxOffsetNumber(); both for forward scans when we resume the * table scan, and for when we start scanning a new page. */ - while (linesleft > 0) + for (; linesleft > 0; linesleft--, lineoff += dir) { - if (ItemIdIsNormal(lpp)) - { - bool valid; + bool visible; + ItemId lpp = PageGetItemId(page, lineoff); - tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp); - tuple->t_len = ItemIdGetLength(lpp); - ItemPointerSet(&(tuple->t_self), block, lineoff); - - /* - * if current tuple qualifies, return it. - */ - valid = HeapTupleSatisfiesVisibility(tuple, - snapshot, - scan->rs_cbuf); + if (!ItemIdIsNormal(lpp)) + continue; - HeapCheckForSerializableConflictOut(valid, scan->rs_base.rs_rd, - tuple, scan->rs_cbuf, - snapshot); + tuple->t_data = (HeapTupleHeader) PageGetItem((Page) page, lpp); + tuple->t_len = ItemIdGetLength(lpp); + ItemPointerSet(&(tuple->t_self), scan->rs_cblock, lineoff); - if (valid && key != NULL) - valid = HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd), - nkeys, key); + /* + * if current tuple qualifies, return it. + * otherwise move to the next item on the block + */ + visible = HeapTupleSatisfiesVisibility(tuple, + scan->rs_base.rs_snapshot, + scan->rs_cbuf); + + HeapCheckForSerializableConflictOut(visible, scan->rs_base.rs_rd, + tuple, scan->rs_cbuf, + scan->rs_base.rs_snapshot); + + if (!visible) + continue; - if (valid) - { - LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); - scan->rs_cindex = lineoff; - return; - } - } + if (key && !HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd), + nkeys, key)) + continue; - /* - * otherwise move to the next item on the page - */ - --linesleft; - if (backward) - { - --lpp; /* move back in this page's ItemId array */ - --lineoff; - } - else - { - ++lpp; /* move forward in this page's ItemId array */ - ++lineoff; - } + LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); + scan->rs_cindex = lineoff; + return; } /* @@ -839,39 +823,16 @@ heapgettup(HeapScanDesc scan, LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); block = heapgettup_advance_page(scan, block, dir); + } - /* - * return NULL if we've exhausted all the pages - */ - if (block == InvalidBlockNumber) - { - if (BufferIsValid(scan->rs_cbuf)) - ReleaseBuffer(scan->rs_cbuf); - scan->rs_cbuf = InvalidBuffer; - scan->rs_cblock = InvalidBlockNumber; - tuple->t_data = NULL; - scan->rs_inited = false; - return; - } - - heapgetpage((TableScanDesc) scan, block); - - LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); + /* end of scan */ + if (BufferIsValid(scan->rs_cbuf)) + ReleaseBuffer(scan->rs_cbuf); - page = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, page); - linesleft = PageGetMaxOffsetNumber(page); - if (backward) - { - lineoff = linesleft; - lpp = PageGetItemId(page, linesleft); - } - else - { - lineoff = FirstOffsetNumber; - lpp = PageGetItemId(page, FirstOffsetNumber); - } - } + scan->rs_cbuf = InvalidBuffer; + scan->rs_cblock = InvalidBlockNumber; + tuple->t_data = NULL; + scan->rs_inited = false; } /* ---------------- @@ -894,13 +855,10 @@ heapgettup_pagemode(HeapScanDesc scan, ScanKey key) { HeapTuple tuple = &(scan->rs_ctup); - bool backward = ScanDirectionIsBackward(dir); BlockNumber block; Page page; int lineindex; - OffsetNumber lineoff; int linesleft; - ItemId lpp; if (unlikely(ScanDirectionIsNoMovement(dir))) { @@ -912,18 +870,12 @@ heapgettup_pagemode(HeapScanDesc scan, { block = heapgettup_initial_page(scan, dir); - if (block == InvalidBlockNumber) - { - Assert(!BufferIsValid(scan->rs_cbuf)); - tuple->t_data = NULL; - return; - } - - heapgetpage((TableScanDesc) scan, block); - page = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page); - linesleft = scan->rs_ntuples; - lineindex = ScanDirectionIsForward(dir) ? 0 : linesleft - 1; + /* + * If parallel and other processes have already finished the scan, the + * returned block is expected to be InvalidBlockNumber. In this case, + * ensure that the backend is not sitting on a valid buffer. + */ + Assert(block != InvalidBlockNumber || !BufferIsValid(scan->rs_cbuf)); } else { @@ -938,84 +890,66 @@ heapgettup_pagemode(HeapScanDesc scan, linesleft = scan->rs_ntuples - lineindex; else linesleft = scan->rs_cindex; - } - /* - * block and lineindex now reference the next or previous visible tid - */ + /* block and lineindex now reference the next or previous visible tid */ + goto continue_page; + } /* * advance the scan until we find a qualifying tuple or run out of stuff * to scan */ - for (;;) + while (block != InvalidBlockNumber) { - while (linesleft > 0) + heapgetpage((TableScanDesc) scan, block); + page = BufferGetPage(scan->rs_cbuf); + TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page); + linesleft = scan->rs_ntuples; + lineindex = ScanDirectionIsForward(dir) ? 0 : linesleft - 1; + + /* block and lineindex now reference the next or previous visible tid */ + + continue_page: + + for (; linesleft > 0; linesleft--, lineindex += dir) { + ItemId lpp; + OffsetNumber lineoff; + lineoff = scan->rs_vistuples[lineindex]; lpp = PageGetItemId(page, lineoff); Assert(ItemIdIsNormal(lpp)); - tuple->t_data = (HeapTupleHeader) PageGetItem(page, lpp); + tuple->t_data = (HeapTupleHeader) PageGetItem((Page) page, lpp); tuple->t_len = ItemIdGetLength(lpp); ItemPointerSet(&(tuple->t_self), block, lineoff); /* - * if current tuple qualifies, return it. - */ - if (key != NULL) - { - bool valid; - - valid = HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd), - nkeys, key); - if (valid) - { - scan->rs_cindex = lineindex; - return; - } - } - else - { - scan->rs_cindex = lineindex; - return; - } + * if current tuple qualifies, return it. + * otherwise move to the next item on the block + */ + if (key && !HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd), + nkeys, key)) + continue; - /* - * otherwise move to the next item on the page - */ - --linesleft; - if (backward) - --lineindex; - else - ++lineindex; + scan->rs_cindex = lineindex; + return; } - block = heapgettup_advance_page(scan, block, dir); /* - * return NULL if we've exhausted all the pages + * if we get here, it means we've exhausted the items on this page and + * it's time to move to the next. */ - if (block == InvalidBlockNumber) - { - if (BufferIsValid(scan->rs_cbuf)) - ReleaseBuffer(scan->rs_cbuf); - scan->rs_cbuf = InvalidBuffer; - scan->rs_cblock = InvalidBlockNumber; - tuple->t_data = NULL; - scan->rs_inited = false; - return; - } - - heapgetpage((TableScanDesc) scan, block); - - page = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, page); - linesleft = scan->rs_ntuples; - if (backward) - lineindex = linesleft - 1; - else - lineindex = 0; + block = heapgettup_advance_page(scan, block, dir); } + + /* end of scan */ + if (BufferIsValid(scan->rs_cbuf)) + ReleaseBuffer(scan->rs_cbuf); + scan->rs_cbuf = InvalidBuffer; + scan->rs_cblock = InvalidBlockNumber; + tuple->t_data = NULL; + scan->rs_inited = false; } -- 2.38.1