src/backend/access/transam/clog.c | 52 ++++++++------- src/backend/access/transam/commit_ts.c | 26 +++++--- src/backend/access/transam/multixact.c | 59 ++++++++++------- src/backend/access/transam/slru.c | 113 +++++++++++---------------------- src/backend/access/transam/subtrans.c | 12 ++-- src/backend/commands/async.c | 27 ++++---- src/backend/storage/lmgr/predicate.c | 16 +++-- src/backend/storage/page/bufpage.c | 25 ++++++++ src/bin/pg_checksums/pg_checksums.c | 9 +++ src/bin/pg_resetwal/t/001_basic.pl | 6 +- src/include/access/slru.h | 15 +---- src/include/storage/bufpage.h | 7 ++ src/test/modules/test_slru/test_slru.c | 13 ++-- 13 files changed, 197 insertions(+), 183 deletions(-) diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c index cc60eab1e2..639bb90a1c 100644 --- a/src/backend/access/transam/clog.c +++ b/src/backend/access/transam/clog.c @@ -41,6 +41,7 @@ #include "miscadmin.h" #include "pg_trace.h" #include "pgstat.h" +#include "storage/bufpage.h" #include "storage/proc.h" #include "storage/sync.h" @@ -59,7 +60,7 @@ /* We need two bits per xact, so four xacts fit in a byte */ #define CLOG_BITS_PER_XACT 2 #define CLOG_XACTS_PER_BYTE 4 -#define CLOG_XACTS_PER_PAGE (BLCKSZ * CLOG_XACTS_PER_BYTE) +#define CLOG_XACTS_PER_PAGE (CapacityOfPageContents * CLOG_XACTS_PER_BYTE) #define CLOG_XACT_BITMASK ((1 << CLOG_BITS_PER_XACT) - 1) @@ -77,13 +78,6 @@ TransactionIdToPage(TransactionId xid) #define TransactionIdToByte(xid) (TransactionIdToPgIndex(xid) / CLOG_XACTS_PER_BYTE) #define TransactionIdToBIndex(xid) ((xid) % (TransactionId) CLOG_XACTS_PER_BYTE) -/* We store the latest async LSN for each group of transactions */ -#define CLOG_XACTS_PER_LSN_GROUP 32 /* keep this a power of 2 */ -#define CLOG_LSNS_PER_PAGE (CLOG_XACTS_PER_PAGE / CLOG_XACTS_PER_LSN_GROUP) - -#define GetLSNIndex(slotno, xid) ((slotno) * CLOG_LSNS_PER_PAGE + \ - ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_LSN_GROUP) - /* * The number of subtransactions below which we consider to apply clog group * update optimization. Testing reveals that the number higher than this can @@ -101,7 +95,7 @@ static SlruCtlData XactCtlData; static int ZeroCLOGPage(int64 pageno, bool writeXlog); static bool CLOGPagePrecedes(int64 page1, int64 page2); -static void WriteZeroPageXlogRec(int64 pageno); +static XLogRecPtr WriteZeroPageXlogRec(int64 pageno); static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXact, Oid oldestXactDb); static void TransactionIdSetPageStatus(TransactionId xid, int nsubxids, @@ -583,8 +577,9 @@ TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, i char *byteptr; char byteval; char curval; + Page page = XactCtl->shared->page_buffer[slotno]; - byteptr = XactCtl->shared->page_buffer[slotno] + byteno; + byteptr = PageGetContents(page) + byteno; curval = (*byteptr >> bshift) & CLOG_XACT_BITMASK; /* @@ -613,7 +608,7 @@ TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, i *byteptr = byteval; /* - * Update the group LSN if the transaction completion LSN is higher. + * Update the page LSN if the transaction completion LSN is higher. * * Note: lsn will be invalid when supplied during InRecovery processing, * so we don't need to do anything special to avoid LSN updates during @@ -622,10 +617,8 @@ TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, i */ if (!XLogRecPtrIsInvalid(lsn)) { - int lsnindex = GetLSNIndex(slotno, xid); - - if (XactCtl->shared->group_lsn[lsnindex] < lsn) - XactCtl->shared->group_lsn[lsnindex] = lsn; + if (PageGetLSN(page) < lsn) + PageSetLSN(page, lsn); } } @@ -651,19 +644,19 @@ TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn) int byteno = TransactionIdToByte(xid); int bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT; int slotno; - int lsnindex; + Page page; char *byteptr; XidStatus status; /* lock is acquired by SimpleLruReadPage_ReadOnly */ slotno = SimpleLruReadPage_ReadOnly(XactCtl, pageno, xid); - byteptr = XactCtl->shared->page_buffer[slotno] + byteno; + page = XactCtl->shared->page_buffer[slotno]; + byteptr = PageGetContents(page) + byteno; status = (*byteptr >> bshift) & CLOG_XACT_BITMASK; - lsnindex = GetLSNIndex(slotno, xid); - *lsn = XactCtl->shared->group_lsn[lsnindex]; + *lsn = PageGetLSN(page); LWLockRelease(XactSLRULock); @@ -698,14 +691,14 @@ CLOGShmemBuffers(void) Size CLOGShmemSize(void) { - return SimpleLruShmemSize(CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE); + return SimpleLruShmemSize(CLOGShmemBuffers()); } void CLOGShmemInit(void) { XactCtl->PagePrecedes = CLOGPagePrecedes; - SimpleLruInit(XactCtl, "Xact", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE, + SimpleLruInit(XactCtl, "Xact", CLOGShmemBuffers(), XactSLRULock, "pg_xact", LWTRANCHE_XACT_BUFFER, SYNC_HANDLER_CLOG, false); SlruPagePrecedesUnitTests(XactCtl, CLOG_XACTS_PER_PAGE); @@ -747,11 +740,17 @@ static int ZeroCLOGPage(int64 pageno, bool writeXlog) { int slotno; + Page page; + XLogRecPtr lsn = 0; slotno = SimpleLruZeroPage(XactCtl, pageno); + page = XactCtl->shared->page_buffer[slotno]; if (writeXlog) - WriteZeroPageXlogRec(pageno); + { + lsn = WriteZeroPageXlogRec(pageno); + PageSetLSN(page, lsn); + } return slotno; } @@ -807,12 +806,12 @@ TrimCLOG(void) char *byteptr; slotno = SimpleLruReadPage(XactCtl, pageno, false, xid); - byteptr = XactCtl->shared->page_buffer[slotno] + byteno; + byteptr = PageGetContents(XactCtl->shared->page_buffer[slotno]) + byteno; /* Zero so-far-unused positions in the current byte */ *byteptr &= (1 << bshift) - 1; /* Zero the rest of the page */ - MemSet(byteptr + 1, 0, BLCKSZ - byteno - 1); + MemSet(byteptr + 1, 0, CapacityOfPageContents - byteno - 1); XactCtl->shared->page_dirty[slotno] = true; } @@ -836,7 +835,6 @@ CheckPointCLOG(void) TRACE_POSTGRESQL_CLOG_CHECKPOINT_DONE(true); } - /* * Make sure that CLOG has room for a newly-allocated XID. * @@ -958,12 +956,12 @@ CLOGPagePrecedes(int64 page1, int64 page2) /* * Write a ZEROPAGE xlog record */ -static void +static XLogRecPtr WriteZeroPageXlogRec(int64 pageno) { XLogBeginInsert(); XLogRegisterData((char *) (&pageno), sizeof(pageno)); - (void) XLogInsert(RM_CLOG_ID, CLOG_ZEROPAGE); + return XLogInsert(RM_CLOG_ID, CLOG_ZEROPAGE); } /* diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c index 7c642f7b59..3f01f10df5 100644 --- a/src/backend/access/transam/commit_ts.c +++ b/src/backend/access/transam/commit_ts.c @@ -31,6 +31,7 @@ #include "funcapi.h" #include "miscadmin.h" #include "pg_trace.h" +#include "storage/bufpage.h" #include "storage/shmem.h" #include "utils/builtins.h" #include "utils/snapmgr.h" @@ -63,7 +64,7 @@ typedef struct CommitTimestampEntry sizeof(RepOriginId)) #define COMMIT_TS_XACTS_PER_PAGE \ - (BLCKSZ / SizeOfCommitTimestampEntry) + (CapacityOfPageContents / SizeOfCommitTimestampEntry) /* @@ -120,7 +121,7 @@ static int ZeroCommitTsPage(int64 pageno, bool writeXlog); static bool CommitTsPagePrecedes(int64 page1, int64 page2); static void ActivateCommitTs(void); static void DeactivateCommitTs(void); -static void WriteZeroPageXlogRec(int64 pageno); +static XLogRecPtr WriteZeroPageXlogRec(int64 pageno); static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid); /* @@ -254,11 +255,12 @@ TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, CommitTimestampEntry entry; Assert(TransactionIdIsNormal(xid)); + Assert(xid == slotno * COMMIT_TS_XACTS_PER_PAGE + entryno); entry.time = ts; entry.nodeid = nodeid; - memcpy(CommitTsCtl->shared->page_buffer[slotno] + + memcpy(PageGetContents(CommitTsCtl->shared->page_buffer[slotno]) + SizeOfCommitTimestampEntry * entryno, &entry, SizeOfCommitTimestampEntry); } @@ -337,7 +339,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, /* lock is acquired by SimpleLruReadPage_ReadOnly */ slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid); memcpy(&entry, - CommitTsCtl->shared->page_buffer[slotno] + + PageGetContents(CommitTsCtl->shared->page_buffer[slotno]) + SizeOfCommitTimestampEntry * entryno, SizeOfCommitTimestampEntry); @@ -515,7 +517,7 @@ CommitTsShmemBuffers(void) Size CommitTsShmemSize(void) { - return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) + + return SimpleLruShmemSize(CommitTsShmemBuffers()) + sizeof(CommitTimestampShared); } @@ -529,7 +531,7 @@ CommitTsShmemInit(void) bool found; CommitTsCtl->PagePrecedes = CommitTsPagePrecedes; - SimpleLruInit(CommitTsCtl, "CommitTs", CommitTsShmemBuffers(), 0, + SimpleLruInit(CommitTsCtl, "CommitTs", CommitTsShmemBuffers(), CommitTsSLRULock, "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER, SYNC_HANDLER_COMMIT_TS, @@ -582,11 +584,17 @@ static int ZeroCommitTsPage(int64 pageno, bool writeXlog) { int slotno; + Page page; + XLogRecPtr lsn = 0; slotno = SimpleLruZeroPage(CommitTsCtl, pageno); + page = CommitTsCtl->shared->page_buffer[slotno]; if (writeXlog) - WriteZeroPageXlogRec(pageno); + { + lsn = WriteZeroPageXlogRec(pageno); + PageSetLSN(page, lsn); + } return slotno; } @@ -946,12 +954,12 @@ CommitTsPagePrecedes(int64 page1, int64 page2) /* * Write a ZEROPAGE xlog record */ -static void +static XLogRecPtr WriteZeroPageXlogRec(int64 pageno) { XLogBeginInsert(); XLogRegisterData((char *) (&pageno), sizeof(pageno)); - (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE); + return XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE); } /* diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c index db3423f12e..edc56a763e 100644 --- a/src/backend/access/transam/multixact.c +++ b/src/backend/access/transam/multixact.c @@ -83,6 +83,7 @@ #include "miscadmin.h" #include "pg_trace.h" #include "postmaster/autovacuum.h" +#include "storage/bufpage.h" #include "storage/lmgr.h" #include "storage/pmsignal.h" #include "storage/proc.h" @@ -106,7 +107,7 @@ */ /* We need four bytes per offset */ -#define MULTIXACT_OFFSETS_PER_PAGE (BLCKSZ / sizeof(MultiXactOffset)) +#define MULTIXACT_OFFSETS_PER_PAGE (CapacityOfPageContents / sizeof(MultiXactOffset)) #define MultiXactIdToOffsetPage(xid) \ ((xid) / (MultiXactOffset) MULTIXACT_OFFSETS_PER_PAGE) @@ -138,7 +139,7 @@ /* size in bytes of a complete group */ #define MULTIXACT_MEMBERGROUP_SIZE \ (sizeof(TransactionId) * MULTIXACT_MEMBERS_PER_MEMBERGROUP + MULTIXACT_FLAGBYTES_PER_GROUP) -#define MULTIXACT_MEMBERGROUPS_PER_PAGE (BLCKSZ / MULTIXACT_MEMBERGROUP_SIZE) +#define MULTIXACT_MEMBERGROUPS_PER_PAGE (CapacityOfPageContents / MULTIXACT_MEMBERGROUP_SIZE) #define MULTIXACT_MEMBERS_PER_PAGE \ (MULTIXACT_MEMBERGROUPS_PER_PAGE * MULTIXACT_MEMBERS_PER_MEMBERGROUP) @@ -366,7 +367,7 @@ static bool MultiXactOffsetWouldWrap(MultiXactOffset boundary, MultiXactOffset start, uint32 distance); static bool SetOffsetVacuumLimit(bool is_startup); static bool find_multixact_start(MultiXactId multi, MultiXactOffset *result); -static void WriteMZeroPageXlogRec(int64 pageno, uint8 info); +static XLogRecPtr WriteMZeroPageXlogRec(int64 pageno, uint8 info); static void WriteMTruncateXlogRec(Oid oldestMultiDB, MultiXactId startTruncOff, MultiXactId endTruncOff, @@ -884,7 +885,7 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset, * take the trouble to generalize the slru.c error reporting code. */ slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, multi); - offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; + offptr = (MultiXactOffset *) PageGetContents(MultiXactOffsetCtl->shared->page_buffer[slotno]); offptr += entryno; *offptr = offset; @@ -921,12 +922,12 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset, } memberptr = (TransactionId *) - (MultiXactMemberCtl->shared->page_buffer[slotno] + memberoff); + (PageGetContents(MultiXactMemberCtl->shared->page_buffer[slotno]) + memberoff); *memberptr = members[i].xid; flagsptr = (uint32 *) - (MultiXactMemberCtl->shared->page_buffer[slotno] + flagsoff); + (PageGetContents(MultiXactMemberCtl->shared->page_buffer[slotno]) + flagsoff); flagsval = *flagsptr; flagsval &= ~(((1 << MXACT_MEMBER_BITS_PER_XACT) - 1) << bshift); @@ -1348,7 +1349,7 @@ retry: entryno = MultiXactIdToOffsetEntry(multi); slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, multi); - offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; + offptr = (MultiXactOffset *) PageGetContents(MultiXactOffsetCtl->shared->page_buffer[slotno]); offptr += entryno; offset = *offptr; @@ -1381,7 +1382,7 @@ retry: if (pageno != prev_pageno) slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, tmpMXact); - offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; + offptr = (MultiXactOffset *) PageGetContents(MultiXactOffsetCtl->shared->page_buffer[slotno]); offptr += entryno; nextMXOffset = *offptr; @@ -1424,7 +1425,7 @@ retry: } xactptr = (TransactionId *) - (MultiXactMemberCtl->shared->page_buffer[slotno] + memberoff); + (PageGetContents(MultiXactMemberCtl->shared->page_buffer[slotno]) + memberoff); if (!TransactionIdIsValid(*xactptr)) { @@ -1435,7 +1436,7 @@ retry: flagsoff = MXOffsetToFlagsOffset(offset); bshift = MXOffsetToFlagsBitShift(offset); - flagsptr = (uint32 *) (MultiXactMemberCtl->shared->page_buffer[slotno] + flagsoff); + flagsptr = (uint32 *) (PageGetContents(MultiXactMemberCtl->shared->page_buffer[slotno]) + flagsoff); ptr[truelength].xid = *xactptr; ptr[truelength].status = (*flagsptr >> bshift) & MXACT_MEMBER_XACT_BITMASK; @@ -1834,8 +1835,8 @@ MultiXactShmemSize(void) mul_size(sizeof(MultiXactId) * 2, MaxOldestSlot)) size = SHARED_MULTIXACT_STATE_SIZE; - size = add_size(size, SimpleLruShmemSize(NUM_MULTIXACTOFFSET_BUFFERS, 0)); - size = add_size(size, SimpleLruShmemSize(NUM_MULTIXACTMEMBER_BUFFERS, 0)); + size = add_size(size, SimpleLruShmemSize(NUM_MULTIXACTOFFSET_BUFFERS)); + size = add_size(size, SimpleLruShmemSize(NUM_MULTIXACTMEMBER_BUFFERS)); return size; } @@ -1851,14 +1852,14 @@ MultiXactShmemInit(void) MultiXactMemberCtl->PagePrecedes = MultiXactMemberPagePrecedes; SimpleLruInit(MultiXactOffsetCtl, - "MultiXactOffset", NUM_MULTIXACTOFFSET_BUFFERS, 0, + "MultiXactOffset", NUM_MULTIXACTOFFSET_BUFFERS, MultiXactOffsetSLRULock, "pg_multixact/offsets", LWTRANCHE_MULTIXACTOFFSET_BUFFER, SYNC_HANDLER_MULTIXACT_OFFSET, false); SlruPagePrecedesUnitTests(MultiXactOffsetCtl, MULTIXACT_OFFSETS_PER_PAGE); SimpleLruInit(MultiXactMemberCtl, - "MultiXactMember", NUM_MULTIXACTMEMBER_BUFFERS, 0, + "MultiXactMember", NUM_MULTIXACTMEMBER_BUFFERS, MultiXactMemberSLRULock, "pg_multixact/members", LWTRANCHE_MULTIXACTMEMBER_BUFFER, SYNC_HANDLER_MULTIXACT_MEMBER, @@ -1933,11 +1934,17 @@ static int ZeroMultiXactOffsetPage(int64 pageno, bool writeXlog) { int slotno; + Page page; + XLogRecPtr lsn = 0; slotno = SimpleLruZeroPage(MultiXactOffsetCtl, pageno); + page = MultiXactOffsetCtl->shared->page_buffer[slotno]; if (writeXlog) - WriteMZeroPageXlogRec(pageno, XLOG_MULTIXACT_ZERO_OFF_PAGE); + { + lsn = WriteMZeroPageXlogRec(pageno, XLOG_MULTIXACT_ZERO_OFF_PAGE); + PageSetLSN(page, lsn); + } return slotno; } @@ -1949,11 +1956,17 @@ static int ZeroMultiXactMemberPage(int64 pageno, bool writeXlog) { int slotno; + Page page; + XLogRecPtr lsn = 0; slotno = SimpleLruZeroPage(MultiXactMemberCtl, pageno); + page = MultiXactMemberCtl->shared->page_buffer[slotno]; if (writeXlog) - WriteMZeroPageXlogRec(pageno, XLOG_MULTIXACT_ZERO_MEM_PAGE); + { + lsn = WriteMZeroPageXlogRec(pageno, XLOG_MULTIXACT_ZERO_MEM_PAGE); + PageSetLSN(page, lsn); + } return slotno; } @@ -2071,10 +2084,10 @@ TrimMultiXact(void) MultiXactOffset *offptr; slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, nextMXact); - offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; + offptr = (MultiXactOffset *) PageGetContents(MultiXactOffsetCtl->shared->page_buffer[slotno]); offptr += entryno; - MemSet(offptr, 0, BLCKSZ - (entryno * sizeof(MultiXactOffset))); + MemSet(offptr, 0, CapacityOfPageContents - (entryno * sizeof(MultiXactOffset))); MultiXactOffsetCtl->shared->page_dirty[slotno] = true; } @@ -2104,9 +2117,9 @@ TrimMultiXact(void) memberoff = MXOffsetToMemberOffset(offset); slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, offset); xidptr = (TransactionId *) - (MultiXactMemberCtl->shared->page_buffer[slotno] + memberoff); + (PageGetContents(MultiXactMemberCtl->shared->page_buffer[slotno]) + memberoff); - MemSet(xidptr, 0, BLCKSZ - memberoff); + MemSet(xidptr, 0, CapacityOfPageContents - memberoff); /* * Note: we don't need to zero out the flag bits in the remaining @@ -2758,7 +2771,7 @@ find_multixact_start(MultiXactId multi, MultiXactOffset *result) /* lock is acquired by SimpleLruReadPage_ReadOnly */ slotno = SimpleLruReadPage_ReadOnly(MultiXactOffsetCtl, pageno, multi); - offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; + offptr = (MultiXactOffset *) PageGetContents(MultiXactOffsetCtl->shared->page_buffer[slotno]); offptr += entryno; offset = *offptr; LWLockRelease(MultiXactOffsetSLRULock); @@ -3192,12 +3205,12 @@ MultiXactOffsetPrecedes(MultiXactOffset offset1, MultiXactOffset offset2) * Write an xlog record reflecting the zeroing of either a MEMBERs or * OFFSETs page (info shows which) */ -static void +static XLogRecPtr WriteMZeroPageXlogRec(int64 pageno, uint8 info) { XLogBeginInsert(); XLogRegisterData((char *) (&pageno), sizeof(pageno)); - (void) XLogInsert(RM_MULTIXACT_ID, info); + return XLogInsert(RM_MULTIXACT_ID, info); } /* diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c index 7a371d9034..d7f3deea7d 100644 --- a/src/backend/access/transam/slru.c +++ b/src/backend/access/transam/slru.c @@ -57,6 +57,7 @@ #include "access/xlogutils.h" #include "miscadmin.h" #include "pgstat.h" +#include "storage/bufpage.h" #include "storage/fd.h" #include "storage/shmem.h" @@ -154,13 +155,13 @@ typedef enum SLRU_WRITE_FAILED, SLRU_FSYNC_FAILED, SLRU_CLOSE_FAILED, + SLRU_DATA_CORRUPTED, } SlruErrorCause; static SlruErrorCause slru_errcause; static int slru_errno; -static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno); static void SimpleLruWaitIO(SlruCtl ctl, int slotno); static void SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata); static bool SlruPhysicalReadPage(SlruCtl ctl, int64 pageno, int slotno); @@ -179,7 +180,7 @@ static void SlruInternalDeleteSegment(SlruCtl ctl, int64 segno); */ Size -SimpleLruShmemSize(int nslots, int nlsns) +SimpleLruShmemSize(int nslots) { Size sz; @@ -192,9 +193,6 @@ SimpleLruShmemSize(int nslots, int nlsns) sz += MAXALIGN(nslots * sizeof(int)); /* page_lru_count[] */ sz += MAXALIGN(nslots * sizeof(LWLockPadded)); /* buffer_locks[] */ - if (nlsns > 0) - sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr)); /* group_lsn[] */ - return BUFFERALIGN(sz) + BLCKSZ * nslots; } @@ -204,14 +202,13 @@ SimpleLruShmemSize(int nslots, int nlsns) * ctl: address of local (unshared) control structure. * name: name of SLRU. (This is user-visible, pick with care!) * nslots: number of page slots to use. - * nlsns: number of LSN groups per page (set to zero if not relevant). * ctllock: LWLock to use to control access to the shared control structure. * subdir: PGDATA-relative subdirectory that will contain the files. * tranche_id: LWLock tranche ID to use for the SLRU's per-buffer LWLocks. * sync_handler: which set of functions to use to handle sync requests */ void -SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, +SimpleLruInit(SlruCtl ctl, const char *name, int nslots, LWLock *ctllock, const char *subdir, int tranche_id, SyncRequestHandler sync_handler, bool long_segment_names) { @@ -219,7 +216,7 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, bool found; shared = (SlruShared) ShmemInitStruct(name, - SimpleLruShmemSize(nslots, nlsns), + SimpleLruShmemSize(nslots), &found); if (!IsUnderPostmaster) @@ -236,7 +233,6 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, shared->ControlLock = ctllock; shared->num_slots = nslots; - shared->lsn_groups_per_page = nlsns; shared->cur_lru_count = 0; @@ -261,12 +257,6 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, shared->buffer_locks = (LWLockPadded *) (ptr + offset); offset += MAXALIGN(nslots * sizeof(LWLockPadded)); - if (nlsns > 0) - { - shared->group_lsn = (XLogRecPtr *) (ptr + offset); - offset += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr)); - } - ptr += BUFFERALIGN(offset); for (slotno = 0; slotno < nslots; slotno++) { @@ -281,7 +271,7 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, } /* Should fit to estimated shmem size */ - Assert(ptr - (char *) shared <= SimpleLruShmemSize(nslots, nlsns)); + Assert(ptr - (char *) shared <= SimpleLruShmemSize(nslots)); } else Assert(found); @@ -323,11 +313,8 @@ SimpleLruZeroPage(SlruCtl ctl, int64 pageno) shared->page_dirty[slotno] = true; SlruRecentlyUsed(shared, slotno); - /* Set the buffer to zeroes */ - MemSet(shared->page_buffer[slotno], 0, BLCKSZ); - - /* Set the LSNs for this new page to zero */ - SimpleLruZeroLSNs(ctl, slotno); + /* Initialize the page. */ + PageInitSLRU(shared->page_buffer[slotno], BLCKSZ, 0); /* Assume this page is now the latest active page */ shared->latest_page_number = pageno; @@ -338,26 +325,6 @@ SimpleLruZeroPage(SlruCtl ctl, int64 pageno) return slotno; } -/* - * Zero all the LSNs we store for this slru page. - * - * This should be called each time we create a new page, and each time we read - * in a page from disk into an existing buffer. (Such an old page cannot - * have any interesting LSNs, since we'd have flushed them before writing - * the page in the first place.) - * - * This assumes that InvalidXLogRecPtr is bitwise-all-0. - */ -static void -SimpleLruZeroLSNs(SlruCtl ctl, int slotno) -{ - SlruShared shared = ctl->shared; - - if (shared->lsn_groups_per_page > 0) - MemSet(&shared->group_lsn[slotno * shared->lsn_groups_per_page], 0, - shared->lsn_groups_per_page * sizeof(XLogRecPtr)); -} - /* * Wait for any active I/O on a page slot to finish. (This does not * guarantee that new I/O hasn't been started before we return, though. @@ -478,9 +445,6 @@ SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok, /* Do the read */ ok = SlruPhysicalReadPage(ctl, pageno, slotno); - /* Set the LSNs for this newly read-in page to zero */ - SimpleLruZeroLSNs(ctl, slotno); - /* Re-acquire control lock and update page state */ LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); @@ -740,7 +704,7 @@ SlruPhysicalReadPage(SlruCtl ctl, int64 pageno, int slotno) ereport(LOG, (errmsg("file \"%s\" doesn't exist, reading as zeroes", path))); - MemSet(shared->page_buffer[slotno], 0, BLCKSZ); + PageInitSLRU(shared->page_buffer[slotno], BLCKSZ, 0); return true; } @@ -763,6 +727,13 @@ SlruPhysicalReadPage(SlruCtl ctl, int64 pageno, int slotno) return false; } + if (!PageIsVerifiedExtended(shared->page_buffer[slotno], pageno, PIV_REPORT_STAT)) + { + slru_errcause = SLRU_DATA_CORRUPTED; + slru_errno = 0; + return false; + } + return true; } @@ -789,6 +760,8 @@ SlruPhysicalWritePage(SlruCtl ctl, int64 pageno, int slotno, SlruWriteAll fdata) off_t offset = rpageno * BLCKSZ; char path[MAXPGPATH]; int fd = -1; + Page page = shared->page_buffer[slotno]; + XLogRecPtr lsn; /* update the stats counter of written pages */ pgstat_count_slru_page_written(shared->slru_stats_idx); @@ -798,41 +771,18 @@ SlruPhysicalWritePage(SlruCtl ctl, int64 pageno, int slotno, SlruWriteAll fdata) * write out data before associated WAL records. This is the same action * performed during FlushBuffer() in the main buffer manager. */ - if (shared->group_lsn != NULL) + lsn = PageGetLSN(page); + if (!XLogRecPtrIsInvalid(lsn)) { /* - * We must determine the largest async-commit LSN for the page. This - * is a bit tedious, but since this entire function is a slow path - * anyway, it seems better to do this here than to maintain a per-page - * LSN variable (which'd need an extra comparison in the - * transaction-commit path). + * As noted above, elog(ERROR) is not acceptable here, so if + * XLogFlush were to fail, we must PANIC. This isn't much of a + * restriction because XLogFlush is just about all critical + * section anyway, but let's make sure. */ - XLogRecPtr max_lsn; - int lsnindex, - lsnoff; - - lsnindex = slotno * shared->lsn_groups_per_page; - max_lsn = shared->group_lsn[lsnindex++]; - for (lsnoff = 1; lsnoff < shared->lsn_groups_per_page; lsnoff++) - { - XLogRecPtr this_lsn = shared->group_lsn[lsnindex++]; - - if (max_lsn < this_lsn) - max_lsn = this_lsn; - } - - if (!XLogRecPtrIsInvalid(max_lsn)) - { - /* - * As noted above, elog(ERROR) is not acceptable here, so if - * XLogFlush were to fail, we must PANIC. This isn't much of a - * restriction because XLogFlush is just about all critical - * section anyway, but let's make sure. - */ - START_CRIT_SECTION(); - XLogFlush(max_lsn); - END_CRIT_SECTION(); - } + START_CRIT_SECTION(); + XLogFlush(lsn); + END_CRIT_SECTION(); } /* @@ -899,6 +849,8 @@ SlruPhysicalWritePage(SlruCtl ctl, int64 pageno, int slotno, SlruWriteAll fdata) } } + PageSetChecksumInplace(shared->page_buffer[slotno], pageno); + errno = 0; pgstat_report_wait_start(WAIT_EVENT_SLRU_WRITE); if (pg_pwrite(fd, shared->page_buffer[slotno], BLCKSZ, offset) != BLCKSZ) @@ -1019,6 +971,13 @@ SlruReportIOError(SlruCtl ctl, int64 pageno, TransactionId xid) errdetail("Could not close file \"%s\": %m.", path))); break; + case SLRU_DATA_CORRUPTED: + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not access status of transaction %u", xid), + errdetail("Invalid page from file \"%s\" at offset %d.", + path, offset))); + break; default: /* can't get here, we trust */ elog(ERROR, "unrecognized SimpleLru error cause: %d", diff --git a/src/backend/access/transam/subtrans.c b/src/backend/access/transam/subtrans.c index 64673eaef6..0e07281979 100644 --- a/src/backend/access/transam/subtrans.c +++ b/src/backend/access/transam/subtrans.c @@ -32,6 +32,7 @@ #include "access/subtrans.h" #include "access/transam.h" #include "pg_trace.h" +#include "storage/bufpage.h" #include "utils/snapmgr.h" @@ -49,7 +50,7 @@ */ /* We need four bytes per xact */ -#define SUBTRANS_XACTS_PER_PAGE (BLCKSZ / sizeof(TransactionId)) +#define SUBTRANS_XACTS_PER_PAGE (CapacityOfPageContents / sizeof(TransactionId)) /* * Although we return an int64 the actual value can't currently exceed @@ -93,7 +94,7 @@ SubTransSetParent(TransactionId xid, TransactionId parent) LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE); slotno = SimpleLruReadPage(SubTransCtl, pageno, true, xid); - ptr = (TransactionId *) SubTransCtl->shared->page_buffer[slotno]; + ptr = (TransactionId *) PageGetContents(SubTransCtl->shared->page_buffer[slotno]); ptr += entryno; /* @@ -133,7 +134,7 @@ SubTransGetParent(TransactionId xid) /* lock is acquired by SimpleLruReadPage_ReadOnly */ slotno = SimpleLruReadPage_ReadOnly(SubTransCtl, pageno, xid); - ptr = (TransactionId *) SubTransCtl->shared->page_buffer[slotno]; + ptr = (TransactionId *) PageGetContents(SubTransCtl->shared->page_buffer[slotno]); ptr += entryno; parent = *ptr; @@ -193,14 +194,14 @@ SubTransGetTopmostTransaction(TransactionId xid) Size SUBTRANSShmemSize(void) { - return SimpleLruShmemSize(NUM_SUBTRANS_BUFFERS, 0); + return SimpleLruShmemSize(NUM_SUBTRANS_BUFFERS); } void SUBTRANSShmemInit(void) { SubTransCtl->PagePrecedes = SubTransPagePrecedes; - SimpleLruInit(SubTransCtl, "Subtrans", NUM_SUBTRANS_BUFFERS, 0, + SimpleLruInit(SubTransCtl, "Subtrans", NUM_SUBTRANS_BUFFERS, SubtransSLRULock, "pg_subtrans", LWTRANCHE_SUBTRANS_BUFFER, SYNC_HANDLER_NONE, false); @@ -305,7 +306,6 @@ CheckPointSUBTRANS(void) TRACE_POSTGRESQL_SUBTRANS_CHECKPOINT_DONE(true); } - /* * Make sure that SUBTRANS has room for a newly-allocated XID. * diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 264f25a8f9..6b1ff0f515 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -140,6 +140,7 @@ #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "miscadmin.h" +#include "storage/bufpage.h" #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/proc.h" @@ -162,7 +163,7 @@ * than that, so changes in that data structure won't affect user-visible * restrictions. */ -#define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128) +#define NOTIFY_PAYLOAD_MAX_LENGTH (CapacityOfPageContents - NAMEDATALEN - 128) /* * Struct representing an entry in the global notify queue @@ -311,7 +312,7 @@ static SlruCtlData NotifyCtlData; #define NotifyCtl (&NotifyCtlData) #define QUEUE_PAGESIZE BLCKSZ - +#define QUEUE_PAGE_CAPACITY (QUEUE_PAGESIZE - MAXALIGN(SizeOfPageHeaderData)) #define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */ /* @@ -492,7 +493,7 @@ AsyncShmemSize(void) size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus)); size = add_size(size, offsetof(AsyncQueueControl, backend)); - size = add_size(size, SimpleLruShmemSize(NUM_NOTIFY_BUFFERS, 0)); + size = add_size(size, SimpleLruShmemSize(NUM_NOTIFY_BUFFERS)); return size; } @@ -541,7 +542,7 @@ AsyncShmemInit(void) * names are used in order to avoid wraparound. */ NotifyCtl->PagePrecedes = asyncQueuePagePrecedes; - SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0, + SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER, SYNC_HANDLER_NONE, true); @@ -1301,14 +1302,14 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength) * written or read. */ offset += entryLength; - Assert(offset <= QUEUE_PAGESIZE); + Assert(offset <= QUEUE_PAGE_CAPACITY); /* * In a second step check if another entry can possibly be written to the * page. If so, stay here, we have reached the next position. If not, then * we need to move on to the next page. */ - if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE) + if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGE_CAPACITY) { pageno++; offset = 0; @@ -1408,7 +1409,7 @@ asyncQueueAddEntries(ListCell *nextNotify) offset = QUEUE_POS_OFFSET(queue_head); /* Check whether the entry really fits on the current page */ - if (offset + qe.length <= QUEUE_PAGESIZE) + if (offset + qe.length <= QUEUE_PAGE_CAPACITY) { /* OK, so advance nextNotify past this item */ nextNotify = lnext(pendingNotifies->events, nextNotify); @@ -1420,14 +1421,14 @@ asyncQueueAddEntries(ListCell *nextNotify) * only check dboid and since it won't match any reader's database * OID, they will ignore this entry and move on. */ - qe.length = QUEUE_PAGESIZE - offset; + qe.length = QUEUE_PAGE_CAPACITY - offset; qe.dboid = InvalidOid; qe.data[0] = '\0'; /* empty channel */ qe.data[1] = '\0'; /* empty payload */ } /* Now copy qe into the shared buffer page */ - memcpy(NotifyCtl->shared->page_buffer[slotno] + offset, + memcpy(PageGetContents(NotifyCtl->shared->page_buffer[slotno]) + offset, &qe, qe.length); @@ -1947,10 +1948,10 @@ asyncQueueReadAllNotifications(void) else { /* fetch all the rest of the page */ - copysize = QUEUE_PAGESIZE - curoffset; + copysize = QUEUE_PAGE_CAPACITY - curoffset; } - memcpy(page_buffer.buf + curoffset, - NotifyCtl->shared->page_buffer[slotno] + curoffset, + memcpy(PageGetContents(page_buffer.buf) + curoffset, + PageGetContents(NotifyCtl->shared->page_buffer[slotno]) + curoffset, copysize); /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ LWLockRelease(NotifySLRULock); @@ -2021,7 +2022,7 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, if (QUEUE_POS_EQUAL(thisentry, stop)) break; - qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry)); + qe = (AsyncQueueEntry *) (PageGetContents(page_buffer) + QUEUE_POS_OFFSET(thisentry)); /* * Advance *current over this message, possibly to the next page. As diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index ff8df7c0bc..39129bce98 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -204,6 +204,7 @@ #include "pgstat.h" #include "port/pg_lfind.h" #include "storage/bufmgr.h" +#include "storage/bufpage.h" #include "storage/predicate.h" #include "storage/predicate_internals.h" #include "storage/proc.h" @@ -322,8 +323,8 @@ static SlruCtlData SerialSlruCtlData; #define SerialSlruCtl (&SerialSlruCtlData) #define SERIAL_PAGESIZE BLCKSZ -#define SERIAL_ENTRYSIZE sizeof(SerCommitSeqNo) -#define SERIAL_ENTRIESPERPAGE (SERIAL_PAGESIZE / SERIAL_ENTRYSIZE) +#define SERIAL_ENTRYSIZE sizeof(SerCommitSeqNo) +#define SERIAL_ENTRIESPERPAGE (SERIAL_PAGESIZE - MAXALIGN(SizeOfPageHeaderData) / SERIAL_ENTRYSIZE) /* * Set maximum pages based on the number needed to track all transactions. @@ -333,7 +334,7 @@ static SlruCtlData SerialSlruCtlData; #define SerialNextPage(page) (((page) >= SERIAL_MAX_PAGE) ? 0 : (page) + 1) #define SerialValue(slotno, xid) (*((SerCommitSeqNo *) \ - (SerialSlruCtl->shared->page_buffer[slotno] + \ + (PageGetContents(SerialSlruCtl->shared->page_buffer[slotno]) + \ ((((uint32) (xid)) % SERIAL_ENTRIESPERPAGE) * SERIAL_ENTRYSIZE)))) #define SerialPage(xid) (((uint32) (xid)) / SERIAL_ENTRIESPERPAGE) @@ -785,10 +786,13 @@ SerialPagePrecedesLogicallyUnitTests(void) * requires burning ~2B XIDs in single-user mode, a negligible * possibility. Moreover, if it does happen, the consequence would be * mild, namely a new transaction failing in SimpleLruReadPage(). + * + * NOTE: After adding the page header, the defect affects two pages. + * We now assert correct treatment of its second to prior page. */ headPage = oldestPage; targetPage = newestPage; - Assert(SerialPagePrecedesLogically(headPage, targetPage - 1)); + Assert(SerialPagePrecedesLogically(headPage, targetPage - 2)); #if 0 Assert(SerialPagePrecedesLogically(headPage, targetPage)); #endif @@ -808,7 +812,7 @@ SerialInit(void) */ SerialSlruCtl->PagePrecedes = SerialPagePrecedesLogically; SimpleLruInit(SerialSlruCtl, "Serial", - NUM_SERIAL_BUFFERS, 0, SerialSLRULock, "pg_serial", + NUM_SERIAL_BUFFERS, SerialSLRULock, "pg_serial", LWTRANCHE_SERIAL_BUFFER, SYNC_HANDLER_NONE, false); #ifdef USE_ASSERT_CHECKING @@ -1348,7 +1352,7 @@ PredicateLockShmemSize(void) /* Shared memory structures for SLRU tracking of old committed xids. */ size = add_size(size, sizeof(SerialControlData)); - size = add_size(size, SimpleLruShmemSize(NUM_SERIAL_BUFFERS, 0)); + size = add_size(size, SimpleLruShmemSize(NUM_SERIAL_BUFFERS)); return size; } diff --git a/src/backend/storage/page/bufpage.c b/src/backend/storage/page/bufpage.c index 9a302ddc30..4c002490a2 100644 --- a/src/backend/storage/page/bufpage.c +++ b/src/backend/storage/page/bufpage.c @@ -59,6 +59,31 @@ PageInit(Page page, Size pageSize, Size specialSize) /* p->pd_prune_xid = InvalidTransactionId; done by above MemSet */ } +/* + * PageInitSLRU + * Initializes the contents of an SLRU page. + * Note that we don't calculate an initial checksum here; that's not done + * until it's time to write. + */ +void +PageInitSLRU(Page page, Size pageSize, Size specialSize) +{ + PageHeader p = (PageHeader) page; + + specialSize = MAXALIGN(specialSize); + + Assert(pageSize == BLCKSZ); + Assert(pageSize > specialSize + SizeOfPageHeaderData); + + /* Make sure all fields of page are zero, as well as unused space */ + MemSet(p, 0, pageSize); + + p->pd_flags = 0; + p->pd_lower = SizeOfPageHeaderData; + p->pd_upper = pageSize - specialSize; + p->pd_special = pageSize - specialSize; + PageSetPageSizeAndVersion(page, pageSize, PG_METAPAGE_LAYOUT_VERSION); +} /* * PageIsVerifiedExtended diff --git a/src/bin/pg_checksums/pg_checksums.c b/src/bin/pg_checksums/pg_checksums.c index 6543d9ce08..cfbc239843 100644 --- a/src/bin/pg_checksums/pg_checksums.c +++ b/src/bin/pg_checksums/pg_checksums.c @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -589,12 +590,20 @@ main(int argc, char *argv[]) { total_size = scan_directory(DataDir, "global", true); total_size += scan_directory(DataDir, "base", true); + total_size += scan_directory(DataDir, "pg_commit_ts", true); + total_size += scan_directory(DataDir, "pg_multixact", true); + total_size += scan_directory(DataDir, "pg_serial", true); total_size += scan_directory(DataDir, "pg_tblspc", true); + total_size += scan_directory(DataDir, "pg_xact", true); } (void) scan_directory(DataDir, "global", false); (void) scan_directory(DataDir, "base", false); + (void) scan_directory(DataDir, "pg_commit_ts", false); + (void) scan_directory(DataDir, "pg_multixact", false); + (void) scan_directory(DataDir, "pg_serial", false); (void) scan_directory(DataDir, "pg_tblspc", false); + (void) scan_directory(DataDir, "pg_xact", false); if (showprogress) progress_report(true); diff --git a/src/bin/pg_resetwal/t/001_basic.pl b/src/bin/pg_resetwal/t/001_basic.pl index 18d0882cb1..ae74828e44 100644 --- a/src/bin/pg_resetwal/t/001_basic.pl +++ b/src/bin/pg_resetwal/t/001_basic.pl @@ -206,7 +206,7 @@ push @cmd, sprintf("%d,%d", hex($files[0]) == 0 ? 3 : hex($files[0]), hex($files[-1])); @files = get_slru_files('pg_multixact/offsets'); -$mult = 32 * $blcksz / 4; +$mult = 32 * ($blcksz - 24) / 4; # -m argument is "new,old" push @cmd, '-m', sprintf("%d,%d", @@ -214,11 +214,11 @@ push @cmd, '-m', hex($files[0]) == 0 ? 1 : hex($files[0] * $mult)); @files = get_slru_files('pg_multixact/members'); -$mult = 32 * int($blcksz / 20) * 4; +$mult = 32 * int(($blcksz - 24) / 20) * 4; push @cmd, '-O', (hex($files[-1]) + 1) * $mult; @files = get_slru_files('pg_xact'); -$mult = 32 * $blcksz * 4; +$mult = 32 * ($blcksz - 24) * 4; push @cmd, '-u', (hex($files[0]) == 0 ? 3 : hex($files[0]) * $mult), '-x', ((hex($files[-1]) + 1) * $mult); diff --git a/src/include/access/slru.h b/src/include/access/slru.h index 091e2202c9..24733166b8 100644 --- a/src/include/access/slru.h +++ b/src/include/access/slru.h @@ -68,17 +68,6 @@ typedef struct SlruSharedData int *page_lru_count; LWLockPadded *buffer_locks; - /* - * Optional array of WAL flush LSNs associated with entries in the SLRU - * pages. If not zero/NULL, we must flush WAL before writing pages (true - * for pg_xact, false for multixact, pg_subtrans, pg_notify). group_lsn[] - * has lsn_groups_per_page entries per buffer slot, each containing the - * highest LSN known for a contiguous group of SLRU entries on that slot's - * page. - */ - XLogRecPtr *group_lsn; - int lsn_groups_per_page; - /*---------- * We mark a page "most recently used" by setting * page_lru_count[slotno] = ++cur_lru_count; @@ -147,8 +136,8 @@ typedef struct SlruCtlData typedef SlruCtlData *SlruCtl; -extern Size SimpleLruShmemSize(int nslots, int nlsns); -extern void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, +extern Size SimpleLruShmemSize(int nslots); +extern void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, LWLock *ctllock, const char *subdir, int tranche_id, SyncRequestHandler sync_handler, bool long_segment_names); diff --git a/src/include/storage/bufpage.h b/src/include/storage/bufpage.h index 424ecba028..251d9523fa 100644 --- a/src/include/storage/bufpage.h +++ b/src/include/storage/bufpage.h @@ -201,6 +201,7 @@ typedef PageHeaderData *PageHeader; * handling pages. */ #define PG_PAGE_LAYOUT_VERSION 4 +#define PG_SLRU_PAGE_LAYOUT_VERSION 1 #define PG_DATA_CHECKSUM_VERSION 1 /* ---------------------------------------------------------------- @@ -257,6 +258,11 @@ PageGetContents(Page page) return (char *) page + MAXALIGN(SizeOfPageHeaderData); } +/* + * Space available for storing page contents. + */ +#define SizeOfPageContents (BLCKSZ - MAXALIGN(SizeOfPageHeaderData)) + /* ---------------- * functions to access page size info * ---------------- @@ -486,6 +492,7 @@ StaticAssertDecl(BLCKSZ == ((BLCKSZ / sizeof(size_t)) * sizeof(size_t)), "BLCKSZ has to be a multiple of sizeof(size_t)"); extern void PageInit(Page page, Size pageSize, Size specialSize); +extern void PageInitSLRU(Page page, Size pageSize, Size specialSize); extern bool PageIsVerifiedExtended(Page page, BlockNumber blkno, int flags); extern OffsetNumber PageAddItemExtended(Page page, Item item, Size size, OffsetNumber offsetNumber, int flags); diff --git a/src/test/modules/test_slru/test_slru.c b/src/test/modules/test_slru/test_slru.c index d0fb9444e8..fea12e0080 100644 --- a/src/test/modules/test_slru/test_slru.c +++ b/src/test/modules/test_slru/test_slru.c @@ -17,6 +17,7 @@ #include "access/slru.h" #include "access/transam.h" #include "miscadmin.h" +#include "storage/bufpage.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/shmem.h" @@ -76,8 +77,8 @@ test_slru_page_write(PG_FUNCTION_ARGS) TestSlruCtl->shared->page_status[slotno] = SLRU_PAGE_VALID; /* write given data to the page, up to the limit of the page */ - strncpy(TestSlruCtl->shared->page_buffer[slotno], data, - BLCKSZ - 1); + strncpy(PageGetContents(TestSlruCtl->shared->page_buffer[slotno]), data, + CapacityOfPageContents - 1); SimpleLruWritePage(TestSlruCtl, slotno); LWLockRelease(TestSLRULock); @@ -104,7 +105,7 @@ test_slru_page_read(PG_FUNCTION_ARGS) LWLockAcquire(TestSLRULock, LW_EXCLUSIVE); slotno = SimpleLruReadPage(TestSlruCtl, pageno, write_ok, InvalidTransactionId); - data = (char *) TestSlruCtl->shared->page_buffer[slotno]; + data = (char *) PageGetContents(TestSlruCtl->shared->page_buffer[slotno]); LWLockRelease(TestSLRULock); PG_RETURN_TEXT_P(cstring_to_text(data)); @@ -122,7 +123,7 @@ test_slru_page_readonly(PG_FUNCTION_ARGS) pageno, InvalidTransactionId); Assert(LWLockHeldByMe(TestSLRULock)); - data = (char *) TestSlruCtl->shared->page_buffer[slotno]; + data = (char *) PageGetContents(TestSlruCtl->shared->page_buffer[slotno]); LWLockRelease(TestSLRULock); PG_RETURN_TEXT_P(cstring_to_text(data)); @@ -202,7 +203,7 @@ test_slru_shmem_request(void) prev_shmem_request_hook(); /* reserve shared memory for the test SLRU */ - RequestAddinShmemSpace(SimpleLruShmemSize(NUM_TEST_BUFFERS, 0)); + RequestAddinShmemSpace(SimpleLruShmemSize(NUM_TEST_BUFFERS)); } static bool @@ -238,7 +239,7 @@ test_slru_shmem_startup(void) TestSlruCtl->PagePrecedes = test_slru_page_precedes_logically; SimpleLruInit(TestSlruCtl, "TestSLRU", - NUM_TEST_BUFFERS, 0, TestSLRULock, slru_dir_name, + NUM_TEST_BUFFERS, TestSLRULock, slru_dir_name, test_tranche_id, SYNC_HANDLER_NONE, long_segment_names); }