diff --git a/src/backend/commands/vacuumlazy.c b/src/backend/commands/vacuumlazy.c new file mode 100644 index 5ec65ea..1e10488 *** a/src/backend/commands/vacuumlazy.c --- b/src/backend/commands/vacuumlazy.c *************** static void lazy_cleanup_index(Relation *** 142,148 **** IndexBulkDeleteResult *stats, LVRelStats *vacrelstats); static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer, ! int tupindex, LVRelStats *vacrelstats); static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats); static BlockNumber count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats); --- 142,148 ---- IndexBulkDeleteResult *stats, LVRelStats *vacrelstats); static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer, ! int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer); static void lazy_truncate_heap(Relation onerel, LVRelStats *vacrelstats); static BlockNumber count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats); *************** static void lazy_record_dead_tuple(LVRel *** 151,156 **** --- 151,157 ---- ItemPointer itemptr); static bool lazy_tid_reaped(ItemPointer itemptr, void *state); static int vac_cmp_itemptr(const void *left, const void *right); + static bool heap_page_is_all_visible(Buffer buf, TransactionId *visibility_cutoff_xid); /* *************** lazy_scan_heap(Relation onerel, LVRelSta *** 697,702 **** --- 698,708 ---- hastup = false; prev_dead_count = vacrelstats->num_dead_tuples; maxoff = PageGetMaxOffsetNumber(page); + + /* + * Note: If you change anything in the loop below, also look at + * heap_page_is_all_visible to see if that needs to be changed + */ for (offnum = FirstOffsetNumber; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) *************** lazy_scan_heap(Relation onerel, LVRelSta *** 881,887 **** vacrelstats->num_dead_tuples > 0) { /* Remove tuples from heap */ ! lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats); /* * Forget the now-vacuumed tuples, and press on, but be careful --- 887,893 ---- vacrelstats->num_dead_tuples > 0) { /* Remove tuples from heap */ ! lazy_vacuum_page(onerel, blkno, buf, 0, vacrelstats, &vmbuffer); /* * Forget the now-vacuumed tuples, and press on, but be careful *************** lazy_vacuum_heap(Relation onerel, LVRelS *** 1051,1056 **** --- 1057,1063 ---- int tupindex; int npages; PGRUsage ru0; + Buffer vmbuffer = InvalidBuffer; pg_rusage_init(&ru0); npages = 0; *************** lazy_vacuum_heap(Relation onerel, LVRelS *** 1074,1080 **** ++tupindex; continue; } ! tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats); /* Now that we've compacted the page, record its available space */ page = BufferGetPage(buf); --- 1081,1088 ---- ++tupindex; continue; } ! tupindex = lazy_vacuum_page(onerel, tblk, buf, tupindex, vacrelstats, ! &vmbuffer); /* Now that we've compacted the page, record its available space */ page = BufferGetPage(buf); *************** lazy_vacuum_heap(Relation onerel, LVRelS *** 1085,1090 **** --- 1093,1104 ---- npages++; } + if (BufferIsValid(vmbuffer)) + { + ReleaseBuffer(vmbuffer); + vmbuffer = InvalidBuffer; + } + ereport(elevel, (errmsg("\"%s\": removed %d row versions in %d pages", RelationGetRelationName(onerel), *************** lazy_vacuum_heap(Relation onerel, LVRelS *** 1105,1115 **** */ static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer, ! int tupindex, LVRelStats *vacrelstats) { Page page = BufferGetPage(buffer); OffsetNumber unused[MaxOffsetNumber]; int uncnt = 0; START_CRIT_SECTION(); --- 1119,1130 ---- */ static int lazy_vacuum_page(Relation onerel, BlockNumber blkno, Buffer buffer, ! int tupindex, LVRelStats *vacrelstats, Buffer *vmbuffer) { Page page = BufferGetPage(buffer); OffsetNumber unused[MaxOffsetNumber]; int uncnt = 0; + TransactionId visibility_cutoff_xid; START_CRIT_SECTION(); *************** lazy_vacuum_page(Relation onerel, BlockN *** 1130,1135 **** --- 1145,1163 ---- PageRepairFragmentation(page); + /* + * Now that we have removed the dead tuples from the page, once again check + * if the page has become all-visible. + */ + if (!visibilitymap_test(onerel, blkno, vmbuffer) && + heap_page_is_all_visible(buffer, &visibility_cutoff_xid)) + { + Assert(BufferIsValid(*vmbuffer)); + PageSetAllVisible(page); + visibilitymap_set(onerel, blkno, InvalidXLogRecPtr, *vmbuffer, + visibility_cutoff_xid); + } + MarkBufferDirty(buffer); /* XLOG stuff */ *************** vac_cmp_itemptr(const void *left, const *** 1633,1635 **** --- 1661,1760 ---- return 0; } + + /* + * Check if every tuple in the given page is visible to all current and future + * transactions. Also return the visibility_cutoff_xid which is the highest + * xmin amongst the visible tuples + */ + static bool + heap_page_is_all_visible(Buffer buf, TransactionId *visibility_cutoff_xid) + { + Page page = BufferGetPage(buf); + OffsetNumber offnum, + maxoff; + bool all_visible = true; + + *visibility_cutoff_xid = InvalidTransactionId; + + /* + * This is strip down version of the line pointer scan in lazy_scan_heap(). + * So if you change anything there, also check this code + */ + maxoff = PageGetMaxOffsetNumber(page); + for (offnum = FirstOffsetNumber; + offnum <= maxoff && all_visible; + offnum = OffsetNumberNext(offnum)) + { + ItemId itemid; + HeapTupleData tuple; + + itemid = PageGetItemId(page, offnum); + + /* Unused or redirect line pointers are of no interest */ + if (!ItemIdIsUsed(itemid) || ItemIdIsRedirected(itemid)) + continue; + + ItemPointerSet(&(tuple.t_self), BufferGetBlockNumber(buf), offnum); + + /* + * Dead line pointers can have index pointers pointing to them. So they + * can't be treated as visible + */ + if (ItemIdIsDead(itemid)) + { + all_visible = false; + break; + } + + Assert(ItemIdIsNormal(itemid)); + + tuple.t_data = (HeapTupleHeader) PageGetItem(page, itemid); + + switch (HeapTupleSatisfiesVacuum(tuple.t_data, OldestXmin, buf)) + { + case HEAPTUPLE_LIVE: + { + TransactionId xmin; + + /* Check comments in lazy_scan_heap */ + if (!(tuple.t_data->t_infomask & HEAP_XMIN_COMMITTED)) + { + all_visible = false; + break; + } + + /* + * The inserter definitely committed. But is it old + * enough that everyone sees it as committed? + */ + xmin = HeapTupleHeaderGetXmin(tuple.t_data); + if (!TransactionIdPrecedes(xmin, OldestXmin)) + { + all_visible = false; + break; + } + + /* Track newest xmin on page. */ + if (TransactionIdFollows(xmin, *visibility_cutoff_xid)) + *visibility_cutoff_xid = xmin; + } + break; + + case HEAPTUPLE_DEAD: + /* fall through */ + case HEAPTUPLE_RECENTLY_DEAD: + /* fall through */ + case HEAPTUPLE_INSERT_IN_PROGRESS: + /* fall through */ + case HEAPTUPLE_DELETE_IN_PROGRESS: + all_visible = false; + break; + default: + elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result"); + break; + } + } /* scan along page */ + + return all_visible; + }