From 6b2f662dfe0794dce613c33a21f8f740cf8229e3 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Mon, 30 Oct 2023 11:06:12 +0530 Subject: [PATCH v3 3/5] Bank wise slru locks The previous patch has divided SLRU buffer pool into associative banks. And this patch is further optimizing it by introducing bank wise slru locks instead of a common centralized lock this will reduce the contention on the slru control lock. Dilip Kumar with some design inpur from Robert Haas and review by Alvaro Herrera --- src/backend/access/transam/clog.c | 114 ++++++++++----- src/backend/access/transam/commit_ts.c | 43 +++--- src/backend/access/transam/multixact.c | 177 ++++++++++++++++------- src/backend/access/transam/slru.c | 148 +++++++++++++++---- src/backend/access/transam/subtrans.c | 58 ++++++-- src/backend/commands/async.c | 32 ++-- src/backend/storage/lmgr/lwlock.c | 14 ++ src/backend/storage/lmgr/lwlocknames.txt | 14 +- src/backend/storage/lmgr/predicate.c | 33 +++-- src/include/access/slru.h | 32 +++- src/include/storage/lwlock.h | 7 + src/test/modules/test_slru/test_slru.c | 32 ++-- 12 files changed, 494 insertions(+), 210 deletions(-) diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c index 6ef9aacb0e..830d8bcdf5 100644 --- a/src/backend/access/transam/clog.c +++ b/src/backend/access/transam/clog.c @@ -274,14 +274,19 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids, XLogRecPtr lsn, int pageno, bool all_xact_same_page) { + LWLock *lock; + /* Can't use group update when PGPROC overflows. */ StaticAssertDecl(THRESHOLD_SUBTRANS_CLOG_OPT <= PGPROC_MAX_CACHED_SUBXIDS, "group clog threshold less than PGPROC cached subxids"); + /* Get the SLRU bank lock w.r.t. the page we are going to access. */ + lock = SimpleLruGetSLRUBankLock(XactCtl, pageno); + /* - * When there is contention on XactSLRULock, we try to group multiple + * When there is contention on SLRU lock, we try to group multiple * updates; a single leader process will perform transaction status - * updates for multiple backends so that the number of times XactSLRULock + * updates for multiple backends so that the number of times the SLRU lock * needs to be acquired is reduced. * * For this optimization to be safe, the XID and subxids in MyProc must be @@ -300,17 +305,17 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids, nsubxids * sizeof(TransactionId)) == 0)) { /* - * If we can immediately acquire XactSLRULock, we update the status of + * If we can immediately acquire SLRU lock, we update the status of * our own XID and release the lock. If not, try use group XID * update. If that doesn't work out, fall back to waiting for the * lock to perform an update for this transaction only. */ - if (LWLockConditionalAcquire(XactSLRULock, LW_EXCLUSIVE)) + if (LWLockConditionalAcquire(lock, LW_EXCLUSIVE)) { /* Got the lock without waiting! Do the update. */ TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status, lsn, pageno); - LWLockRelease(XactSLRULock); + LWLockRelease(lock); return; } else if (TransactionGroupUpdateXidStatus(xid, status, lsn, pageno)) @@ -323,10 +328,10 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids, } /* Group update not applicable, or couldn't accept this page number. */ - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status, lsn, pageno); - LWLockRelease(XactSLRULock); + LWLockRelease(lock); } /* @@ -345,7 +350,8 @@ TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids, Assert(status == TRANSACTION_STATUS_COMMITTED || status == TRANSACTION_STATUS_ABORTED || (status == TRANSACTION_STATUS_SUB_COMMITTED && !TransactionIdIsValid(xid))); - Assert(LWLockHeldByMeInMode(XactSLRULock, LW_EXCLUSIVE)); + Assert(LWLockHeldByMeInMode(SimpleLruGetSLRUBankLock(XactCtl, pageno), + LW_EXCLUSIVE)); /* * If we're doing an async commit (ie, lsn is valid), then we must wait @@ -396,14 +402,13 @@ TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids, } /* - * When we cannot immediately acquire XactSLRULock in exclusive mode at + * When we cannot immediately acquire SLRU bank lock in exclusive mode at * commit time, add ourselves to a list of processes that need their XIDs * status update. The first process to add itself to the list will acquire - * XactSLRULock in exclusive mode and set transaction status as required - * on behalf of all group members. This avoids a great deal of contention - * around XactSLRULock when many processes are trying to commit at once, - * since the lock need not be repeatedly handed off from one committing - * process to the next. + * the lock in exclusive mode and set transaction status as required on behalf + * of all group members. This avoids a great deal of contention when many + * processes are trying to commit at once, since the lock need not be + * repeatedly handed off from one committing process to the next. * * Returns true when transaction status has been updated in clog; returns * false if we decided against applying the optimization because the page @@ -417,6 +422,8 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, PGPROC *proc = MyProc; uint32 nextidx; uint32 wakeidx; + int prevpageno; + LWLock *prevlock = NULL; /* We should definitely have an XID whose status needs to be updated. */ Assert(TransactionIdIsValid(xid)); @@ -497,13 +504,10 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, return true; } - /* We are the leader. Acquire the lock on behalf of everyone. */ - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); - /* - * Now that we've got the lock, clear the list of processes waiting for - * group XID status update, saving a pointer to the head of the list. - * Trying to pop elements one at a time could lead to an ABA problem. + * We are leader so clear the list of processes waiting for group XID + * status update, saving a pointer to the head of the list. Trying to pop + * elements one at a time could lead to an ABA problem. */ nextidx = pg_atomic_exchange_u32(&procglobal->clogGroupFirst, INVALID_PGPROCNO); @@ -511,10 +515,38 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, /* Remember head of list so we can perform wakeups after dropping lock. */ wakeidx = nextidx; + /* Acquire the SLRU bank lock w.r.t. the first page in the group. */ + prevpageno = ProcGlobal->allProcs[nextidx].clogGroupMemberPage; + prevlock = SimpleLruGetSLRUBankLock(XactCtl, prevpageno); + LWLockAcquire(prevlock, LW_EXCLUSIVE); + /* Walk the list and update the status of all XIDs. */ while (nextidx != INVALID_PGPROCNO) { PGPROC *nextproc = &ProcGlobal->allProcs[nextidx]; + int thispageno = nextproc->clogGroupMemberPage; + + /* + * Although we are trying our best to keep same page in a group, there + * are cases where we might get different pages as well for detail + * refer comment in above while loop where we are adding this process + * for group update. So if the current page we are going to access is + * not in the same slru bank in which we updated the last page then we + * need to release the lock on the previous bank and acquire lock on + * the bank w.r.t. the page we are going to update now. + */ + if (thispageno != prevpageno) + { + LWLock *lock = SimpleLruGetSLRUBankLock(XactCtl, thispageno); + + if (prevlock != lock) + { + LWLockRelease(prevlock); + LWLockAcquire(lock, LW_EXCLUSIVE); + } + prevlock = lock; + prevpageno = thispageno; + } /* * Transactions with more than THRESHOLD_SUBTRANS_CLOG_OPT sub-XIDs @@ -534,7 +566,8 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, } /* We're done with the lock now. */ - LWLockRelease(XactSLRULock); + if (prevlock != NULL) + LWLockRelease(prevlock); /* * Now that we've released the lock, go back and wake everybody up. We @@ -563,10 +596,11 @@ TransactionGroupUpdateXidStatus(TransactionId xid, XidStatus status, /* * Sets the commit status of a single transaction. * - * Must be called with XactSLRULock held + * Must be called with slot specific SLRU bank's lock held */ static void -TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, int slotno) +TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, + int slotno) { int byteno = TransactionIdToByte(xid); int bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT; @@ -655,7 +689,7 @@ TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn) lsnindex = GetLSNIndex(slotno, xid); *lsn = XactCtl->shared->group_lsn[lsnindex]; - LWLockRelease(XactSLRULock); + LWLockRelease(SimpleLruGetSLRUBankLock(XactCtl, pageno)); return status; } @@ -689,8 +723,8 @@ CLOGShmemInit(void) { XactCtl->PagePrecedes = CLOGPagePrecedes; SimpleLruInit(XactCtl, "Xact", CLOGShmemBuffers(), CLOG_LSNS_PER_PAGE, - XactSLRULock, "pg_xact", LWTRANCHE_XACT_BUFFER, - SYNC_HANDLER_CLOG); + "pg_xact", LWTRANCHE_XACT_BUFFER, + LWTRANCHE_XACT_SLRU, SYNC_HANDLER_CLOG); SlruPagePrecedesUnitTests(XactCtl, CLOG_XACTS_PER_PAGE); } @@ -704,8 +738,9 @@ void BootStrapCLOG(void) { int slotno; + LWLock *lock = SimpleLruGetSLRUBankLock(XactCtl, 0); - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Create and zero the first page of the commit log */ slotno = ZeroCLOGPage(0, false); @@ -714,7 +749,7 @@ BootStrapCLOG(void) SimpleLruWritePage(XactCtl, slotno); Assert(!XactCtl->shared->page_dirty[slotno]); - LWLockRelease(XactSLRULock); + LWLockRelease(lock); } /* @@ -749,14 +784,10 @@ StartupCLOG(void) TransactionId xid = XidFromFullTransactionId(ShmemVariableCache->nextXid); int pageno = TransactionIdToPage(xid); - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); - /* * Initialize our idea of the latest page number. */ - XactCtl->shared->latest_page_number = pageno; - - LWLockRelease(XactSLRULock); + pg_atomic_init_u32(&XactCtl->shared->latest_page_number, pageno); } /* @@ -767,8 +798,9 @@ TrimCLOG(void) { TransactionId xid = XidFromFullTransactionId(ShmemVariableCache->nextXid); int pageno = TransactionIdToPage(xid); + LWLock *lock = SimpleLruGetSLRUBankLock(XactCtl, pageno); - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* * Zero out the remainder of the current clog page. Under normal @@ -800,7 +832,7 @@ TrimCLOG(void) XactCtl->shared->page_dirty[slotno] = true; } - LWLockRelease(XactSLRULock); + LWLockRelease(lock); } /* @@ -832,6 +864,7 @@ void ExtendCLOG(TransactionId newestXact) { int pageno; + LWLock *lock; /* * No work except at first XID of a page. But beware: just after @@ -842,13 +875,14 @@ ExtendCLOG(TransactionId newestXact) return; pageno = TransactionIdToPage(newestXact); + lock = SimpleLruGetSLRUBankLock(XactCtl, pageno); - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Zero the page and make an XLOG entry about it */ ZeroCLOGPage(pageno, true); - LWLockRelease(XactSLRULock); + LWLockRelease(lock); } @@ -986,16 +1020,18 @@ clog_redo(XLogReaderState *record) { int pageno; int slotno; + LWLock *lock; memcpy(&pageno, XLogRecGetData(record), sizeof(int)); - LWLockAcquire(XactSLRULock, LW_EXCLUSIVE); + lock = SimpleLruGetSLRUBankLock(XactCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = ZeroCLOGPage(pageno, false); SimpleLruWritePage(XactCtl, slotno); Assert(!XactCtl->shared->page_dirty[slotno]); - LWLockRelease(XactSLRULock); + LWLockRelease(lock); } else if (info == CLOG_TRUNCATE) { diff --git a/src/backend/access/transam/commit_ts.c b/src/backend/access/transam/commit_ts.c index 48826672ea..204341da53 100644 --- a/src/backend/access/transam/commit_ts.c +++ b/src/backend/access/transam/commit_ts.c @@ -218,8 +218,9 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids, { int slotno; int i; + LWLock *lock = SimpleLruGetSLRUBankLock(CommitTsCtl, pageno); - LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid); @@ -229,13 +230,13 @@ SetXidCommitTsInPage(TransactionId xid, int nsubxids, CommitTsCtl->shared->page_dirty[slotno] = true; - LWLockRelease(CommitTsSLRULock); + LWLockRelease(lock); } /* * Sets the commit timestamp of a single transaction. * - * Must be called with CommitTsSLRULock held + * Must be called with slot specific SLRU bank's Lock held */ static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts, @@ -336,7 +337,7 @@ TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts, if (nodeid) *nodeid = entry.nodeid; - LWLockRelease(CommitTsSLRULock); + LWLockRelease(SimpleLruGetSLRUBankLock(CommitTsCtl, pageno)); return *ts != 0; } @@ -526,9 +527,8 @@ CommitTsShmemInit(void) CommitTsCtl->PagePrecedes = CommitTsPagePrecedes; SimpleLruInit(CommitTsCtl, "CommitTs", CommitTsShmemBuffers(), 0, - CommitTsSLRULock, "pg_commit_ts", - LWTRANCHE_COMMITTS_BUFFER, - SYNC_HANDLER_COMMIT_TS); + "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER, + LWTRANCHE_COMMITTS_SLRU, SYNC_HANDLER_COMMIT_TS); SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE); commitTsShared = ShmemInitStruct("CommitTs shared", @@ -684,9 +684,7 @@ ActivateCommitTs(void) /* * Re-Initialize our idea of the latest page number. */ - LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); - CommitTsCtl->shared->latest_page_number = pageno; - LWLockRelease(CommitTsSLRULock); + pg_atomic_write_u32(&CommitTsCtl->shared->latest_page_number, pageno); /* * If CommitTs is enabled, but it wasn't in the previous server run, we @@ -713,12 +711,13 @@ ActivateCommitTs(void) if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno)) { int slotno; + LWLock *lock = SimpleLruGetSLRUBankLock(CommitTsCtl, pageno); - LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = ZeroCommitTsPage(pageno, false); SimpleLruWritePage(CommitTsCtl, slotno); Assert(!CommitTsCtl->shared->page_dirty[slotno]); - LWLockRelease(CommitTsSLRULock); + LWLockRelease(lock); } /* Change the activation status in shared memory. */ @@ -767,9 +766,9 @@ DeactivateCommitTs(void) * be overwritten anyway when we wrap around, but it seems better to be * tidy.) */ - LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); + SimpleLruAcquireAllBankLock(CommitTsCtl, LW_EXCLUSIVE); (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL); - LWLockRelease(CommitTsSLRULock); + SimpleLruReleaseAllBankLock(CommitTsCtl); } /* @@ -801,6 +800,7 @@ void ExtendCommitTs(TransactionId newestXact) { int pageno; + LWLock *lock; /* * Nothing to do if module not enabled. Note we do an unlocked read of @@ -821,12 +821,14 @@ ExtendCommitTs(TransactionId newestXact) pageno = TransactionIdToCTsPage(newestXact); - LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); + lock = SimpleLruGetSLRUBankLock(CommitTsCtl, pageno); + + LWLockAcquire(lock, LW_EXCLUSIVE); /* Zero the page and make an XLOG entry about it */ ZeroCommitTsPage(pageno, !InRecovery); - LWLockRelease(CommitTsSLRULock); + LWLockRelease(lock); } /* @@ -980,16 +982,18 @@ commit_ts_redo(XLogReaderState *record) { int pageno; int slotno; + LWLock *lock; memcpy(&pageno, XLogRecGetData(record), sizeof(int)); + lock = SimpleLruGetSLRUBankLock(CommitTsCtl, pageno); - LWLockAcquire(CommitTsSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = ZeroCommitTsPage(pageno, false); SimpleLruWritePage(CommitTsCtl, slotno); Assert(!CommitTsCtl->shared->page_dirty[slotno]); - LWLockRelease(CommitTsSLRULock); + LWLockRelease(lock); } else if (info == COMMIT_TS_TRUNCATE) { @@ -1001,7 +1005,8 @@ commit_ts_redo(XLogReaderState *record) * During XLOG replay, latest_page_number isn't set up yet; insert a * suitable value to bypass the sanity test in SimpleLruTruncate. */ - CommitTsCtl->shared->latest_page_number = trunc->pageno; + pg_atomic_write_u32(&CommitTsCtl->shared->latest_page_number, + trunc->pageno); SimpleLruTruncate(CommitTsCtl, trunc->pageno); } diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c index 62709fcd07..3284900e02 100644 --- a/src/backend/access/transam/multixact.c +++ b/src/backend/access/transam/multixact.c @@ -192,10 +192,10 @@ static SlruCtlData MultiXactMemberCtlData; /* * MultiXact state shared across all backends. All this state is protected - * by MultiXactGenLock. (We also use MultiXactOffsetSLRULock and - * MultiXactMemberSLRULock to guard accesses to the two sets of SLRU - * buffers. For concurrency's sake, we avoid holding more than one of these - * locks at a time.) + * by MultiXactGenLock. (We also use SLRU bank's lock of MultiXactOffset and + * MultiXactMember to guard accesses to the two sets of SLRU buffers. For + * concurrency's sake, we avoid holding more than one of these locks at a + * time.) */ typedef struct MultiXactStateData { @@ -870,12 +870,15 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset, int slotno; MultiXactOffset *offptr; int i; - - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); + LWLock *lock; + LWLock *prevlock = NULL; pageno = MultiXactIdToOffsetPage(multi); entryno = MultiXactIdToOffsetEntry(multi); + lock = SimpleLruGetSLRUBankLock(MultiXactOffsetCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); + /* * Note: we pass the MultiXactId to SimpleLruReadPage as the "transaction" * to complain about if there's any I/O error. This is kinda bogus, but @@ -891,10 +894,8 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset, MultiXactOffsetCtl->shared->page_dirty[slotno] = true; - /* Exchange our lock */ - LWLockRelease(MultiXactOffsetSLRULock); - - LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); + /* Release MultiXactOffset SLRU lock. */ + LWLockRelease(lock); prev_pageno = -1; @@ -916,6 +917,20 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset, if (pageno != prev_pageno) { + /* + * MultiXactMember SLRU page is changed so check if this new page + * fall into the different SLRU bank then release the old bank's + * lock and acquire lock on the new bank. + */ + lock = SimpleLruGetSLRUBankLock(MultiXactMemberCtl, pageno); + if (lock != prevlock) + { + if (prevlock != NULL) + LWLockRelease(prevlock); + + LWLockAcquire(lock, LW_EXCLUSIVE); + prevlock = lock; + } slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, multi); prev_pageno = pageno; } @@ -936,7 +951,8 @@ RecordNewMultiXact(MultiXactId multi, MultiXactOffset offset, MultiXactMemberCtl->shared->page_dirty[slotno] = true; } - LWLockRelease(MultiXactMemberSLRULock); + if (prevlock != NULL) + LWLockRelease(prevlock); } /* @@ -1239,6 +1255,8 @@ GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members, MultiXactId tmpMXact; MultiXactOffset nextOffset; MultiXactMember *ptr; + LWLock *lock; + LWLock *prevlock = NULL; debug_elog3(DEBUG2, "GetMembers: asked for %u", multi); @@ -1342,11 +1360,23 @@ GetMultiXactIdMembers(MultiXactId multi, MultiXactMember **members, * time on every multixact creation. */ retry: - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); - pageno = MultiXactIdToOffsetPage(multi); entryno = MultiXactIdToOffsetEntry(multi); + /* + * If the page is on the different SLRU bank then release the lock on the + * previous bank if we are already holding one and acquire the lock on the + * new bank. + */ + lock = SimpleLruGetSLRUBankLock(MultiXactOffsetCtl, pageno); + if (lock != prevlock) + { + if (prevlock != NULL) + LWLockRelease(prevlock); + LWLockAcquire(lock, LW_EXCLUSIVE); + prevlock = lock; + } + slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, multi); offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; offptr += entryno; @@ -1379,7 +1409,22 @@ retry: entryno = MultiXactIdToOffsetEntry(tmpMXact); if (pageno != prev_pageno) + { + /* + * SLRU pageno is changed so check whether this page is falling in + * the different slru bank than on which we are already holding + * the lock and if so release the lock on the old bank and acquire + * that on the new bank. + */ + lock = SimpleLruGetSLRUBankLock(MultiXactOffsetCtl, pageno); + if (prevlock != lock) + { + LWLockRelease(prevlock); + LWLockAcquire(lock, LW_EXCLUSIVE); + prevlock = lock; + } slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, tmpMXact); + } offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; offptr += entryno; @@ -1388,7 +1433,8 @@ retry: if (nextMXOffset == 0) { /* Corner case 2: next multixact is still being filled in */ - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(prevlock); + prevlock = NULL; CHECK_FOR_INTERRUPTS(); pg_usleep(1000L); goto retry; @@ -1397,13 +1443,11 @@ retry: length = nextMXOffset - offset; } - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(prevlock); + prevlock = NULL; ptr = (MultiXactMember *) palloc(length * sizeof(MultiXactMember)); - /* Now get the members themselves. */ - LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); - truelength = 0; prev_pageno = -1; for (i = 0; i < length; i++, offset++) @@ -1419,6 +1463,20 @@ retry: if (pageno != prev_pageno) { + /* + * MultiXactMember SLRU page is changed so check if this new page + * fall into the different SLRU bank then release the old bank's + * lock and acquire lock on the new bank. + */ + lock = SimpleLruGetSLRUBankLock(MultiXactMemberCtl, pageno); + if (lock != prevlock) + { + if (prevlock) + LWLockRelease(prevlock); + LWLockAcquire(lock, LW_EXCLUSIVE); + prevlock = lock; + } + slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, multi); prev_pageno = pageno; } @@ -1442,7 +1500,8 @@ retry: truelength++; } - LWLockRelease(MultiXactMemberSLRULock); + if (prevlock) + LWLockRelease(prevlock); /* A multixid with zero members should not happen */ Assert(truelength > 0); @@ -1852,14 +1911,14 @@ MultiXactShmemInit(void) SimpleLruInit(MultiXactOffsetCtl, "MultiXactOffset", multixact_offsets_buffers, 0, - MultiXactOffsetSLRULock, "pg_multixact/offsets", - LWTRANCHE_MULTIXACTOFFSET_BUFFER, + "pg_multixact/offsets", LWTRANCHE_MULTIXACTOFFSET_BUFFER, + LWTRANCHE_MULTIXACTOFFSET_SLRU, SYNC_HANDLER_MULTIXACT_OFFSET); SlruPagePrecedesUnitTests(MultiXactOffsetCtl, MULTIXACT_OFFSETS_PER_PAGE); SimpleLruInit(MultiXactMemberCtl, "MultiXactMember", multixact_members_buffers, 0, - MultiXactMemberSLRULock, "pg_multixact/members", - LWTRANCHE_MULTIXACTMEMBER_BUFFER, + "pg_multixact/members", LWTRANCHE_MULTIXACTMEMBER_BUFFER, + LWTRANCHE_MULTIXACTMEMBER_SLRU, SYNC_HANDLER_MULTIXACT_MEMBER); /* doesn't call SimpleLruTruncate() or meet criteria for unit tests */ @@ -1894,8 +1953,10 @@ void BootStrapMultiXact(void) { int slotno; + LWLock *lock; - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); + lock = SimpleLruGetSLRUBankLock(MultiXactOffsetCtl, 0); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Create and zero the first page of the offsets log */ slotno = ZeroMultiXactOffsetPage(0, false); @@ -1904,9 +1965,10 @@ BootStrapMultiXact(void) SimpleLruWritePage(MultiXactOffsetCtl, slotno); Assert(!MultiXactOffsetCtl->shared->page_dirty[slotno]); - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(lock); - LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); + lock = SimpleLruGetSLRUBankLock(MultiXactMemberCtl, 0); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Create and zero the first page of the members log */ slotno = ZeroMultiXactMemberPage(0, false); @@ -1915,7 +1977,7 @@ BootStrapMultiXact(void) SimpleLruWritePage(MultiXactMemberCtl, slotno); Assert(!MultiXactMemberCtl->shared->page_dirty[slotno]); - LWLockRelease(MultiXactMemberSLRULock); + LWLockRelease(lock); } /* @@ -1975,10 +2037,12 @@ static void MaybeExtendOffsetSlru(void) { int pageno; + LWLock *lock; pageno = MultiXactIdToOffsetPage(MultiXactState->nextMXact); + lock = SimpleLruGetSLRUBankLock(MultiXactOffsetCtl, pageno); - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); if (!SimpleLruDoesPhysicalPageExist(MultiXactOffsetCtl, pageno)) { @@ -1993,7 +2057,7 @@ MaybeExtendOffsetSlru(void) SimpleLruWritePage(MultiXactOffsetCtl, slotno); } - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(lock); } /* @@ -2015,13 +2079,15 @@ StartupMultiXact(void) * Initialize offset's idea of the latest page number. */ pageno = MultiXactIdToOffsetPage(multi); - MultiXactOffsetCtl->shared->latest_page_number = pageno; + pg_atomic_init_u32(&MultiXactOffsetCtl->shared->latest_page_number, + pageno); /* * Initialize member's idea of the latest page number. */ pageno = MXOffsetToMemberPage(offset); - MultiXactMemberCtl->shared->latest_page_number = pageno; + pg_atomic_init_u32(&MultiXactMemberCtl->shared->latest_page_number, + pageno); } /* @@ -2046,13 +2112,13 @@ TrimMultiXact(void) LWLockRelease(MultiXactGenLock); /* Clean up offsets state */ - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); /* * (Re-)Initialize our idea of the latest page number for offsets. */ pageno = MultiXactIdToOffsetPage(nextMXact); - MultiXactOffsetCtl->shared->latest_page_number = pageno; + pg_atomic_write_u32(&MultiXactOffsetCtl->shared->latest_page_number, + pageno); /* * Zero out the remainder of the current offsets page. See notes in @@ -2067,7 +2133,9 @@ TrimMultiXact(void) { int slotno; MultiXactOffset *offptr; + LWLock *lock = SimpleLruGetSLRUBankLock(MultiXactOffsetCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = SimpleLruReadPage(MultiXactOffsetCtl, pageno, true, nextMXact); offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; offptr += entryno; @@ -2075,18 +2143,17 @@ TrimMultiXact(void) MemSet(offptr, 0, BLCKSZ - (entryno * sizeof(MultiXactOffset))); MultiXactOffsetCtl->shared->page_dirty[slotno] = true; + LWLockRelease(lock); } - LWLockRelease(MultiXactOffsetSLRULock); - /* And the same for members */ - LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); /* * (Re-)Initialize our idea of the latest page number for members. */ pageno = MXOffsetToMemberPage(offset); - MultiXactMemberCtl->shared->latest_page_number = pageno; + pg_atomic_write_u32(&MultiXactMemberCtl->shared->latest_page_number, + pageno); /* * Zero out the remainder of the current members page. See notes in @@ -2098,7 +2165,9 @@ TrimMultiXact(void) int slotno; TransactionId *xidptr; int memberoff; + LWLock *lock = SimpleLruGetSLRUBankLock(MultiXactMemberCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); memberoff = MXOffsetToMemberOffset(offset); slotno = SimpleLruReadPage(MultiXactMemberCtl, pageno, true, offset); xidptr = (TransactionId *) @@ -2113,10 +2182,9 @@ TrimMultiXact(void) */ MultiXactMemberCtl->shared->page_dirty[slotno] = true; + LWLockRelease(lock); } - LWLockRelease(MultiXactMemberSLRULock); - /* signal that we're officially up */ LWLockAcquire(MultiXactGenLock, LW_EXCLUSIVE); MultiXactState->finishedStartup = true; @@ -2404,6 +2472,7 @@ static void ExtendMultiXactOffset(MultiXactId multi) { int pageno; + LWLock *lock; /* * No work except at first MultiXactId of a page. But beware: just after @@ -2414,13 +2483,14 @@ ExtendMultiXactOffset(MultiXactId multi) return; pageno = MultiXactIdToOffsetPage(multi); + lock = SimpleLruGetSLRUBankLock(MultiXactOffsetCtl, pageno); - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Zero the page and make an XLOG entry about it */ ZeroMultiXactOffsetPage(pageno, true); - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(lock); } /* @@ -2453,15 +2523,17 @@ ExtendMultiXactMember(MultiXactOffset offset, int nmembers) if (flagsoff == 0 && flagsbit == 0) { int pageno; + LWLock *lock; pageno = MXOffsetToMemberPage(offset); + lock = SimpleLruGetSLRUBankLock(MultiXactMemberCtl, pageno); - LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Zero the page and make an XLOG entry about it */ ZeroMultiXactMemberPage(pageno, true); - LWLockRelease(MultiXactMemberSLRULock); + LWLockRelease(lock); } /* @@ -2759,7 +2831,7 @@ find_multixact_start(MultiXactId multi, MultiXactOffset *result) offptr = (MultiXactOffset *) MultiXactOffsetCtl->shared->page_buffer[slotno]; offptr += entryno; offset = *offptr; - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(SimpleLruGetSLRUBankLock(MultiXactOffsetCtl, pageno)); *result = offset; return true; @@ -3241,31 +3313,33 @@ multixact_redo(XLogReaderState *record) { int pageno; int slotno; + LWLock *lock; memcpy(&pageno, XLogRecGetData(record), sizeof(int)); - - LWLockAcquire(MultiXactOffsetSLRULock, LW_EXCLUSIVE); + lock = SimpleLruGetSLRUBankLock(MultiXactOffsetCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = ZeroMultiXactOffsetPage(pageno, false); SimpleLruWritePage(MultiXactOffsetCtl, slotno); Assert(!MultiXactOffsetCtl->shared->page_dirty[slotno]); - LWLockRelease(MultiXactOffsetSLRULock); + LWLockRelease(lock); } else if (info == XLOG_MULTIXACT_ZERO_MEM_PAGE) { int pageno; int slotno; + LWLock *lock; memcpy(&pageno, XLogRecGetData(record), sizeof(int)); - - LWLockAcquire(MultiXactMemberSLRULock, LW_EXCLUSIVE); + lock = SimpleLruGetSLRUBankLock(MultiXactMemberCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = ZeroMultiXactMemberPage(pageno, false); SimpleLruWritePage(MultiXactMemberCtl, slotno); Assert(!MultiXactMemberCtl->shared->page_dirty[slotno]); - LWLockRelease(MultiXactMemberSLRULock); + LWLockRelease(lock); } else if (info == XLOG_MULTIXACT_CREATE_ID) { @@ -3331,7 +3405,8 @@ multixact_redo(XLogReaderState *record) * SimpleLruTruncate. */ pageno = MultiXactIdToOffsetPage(xlrec.endTruncOff); - MultiXactOffsetCtl->shared->latest_page_number = pageno; + pg_atomic_write_u32(&MultiXactOffsetCtl->shared->latest_page_number, + pageno); PerformOffsetsTruncation(xlrec.startTruncOff, xlrec.endTruncOff); LWLockRelease(MultiXactTruncationLock); diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c index c339e0a7e4..cf215627ea 100644 --- a/src/backend/access/transam/slru.c +++ b/src/backend/access/transam/slru.c @@ -159,7 +159,7 @@ static int SlruSelectLRUPage(SlruCtl ctl, int pageno); static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int segpage, void *data); static void SlruInternalDeleteSegment(SlruCtl ctl, int segno); -static void SlruAdjustNSlots(int *nslots, int *banksize, int *bankmask); +static int SlruAdjustNSlots(int *nslots, int *banksize, int *bankmask); /* * Initialization of shared memory @@ -171,8 +171,9 @@ SimpleLruShmemSize(int nslots, int nlsns) Size sz; int bankmask_ignore; int banksize_ignore; + int nbanks; - SlruAdjustNSlots(&nslots, &banksize_ignore, &bankmask_ignore); + nbanks = SlruAdjustNSlots(&nslots, &banksize_ignore, &bankmask_ignore); /* we assume nslots isn't so large as to risk overflow */ sz = MAXALIGN(sizeof(SlruSharedData)); @@ -182,6 +183,7 @@ SimpleLruShmemSize(int nslots, int nlsns) sz += MAXALIGN(nslots * sizeof(int)); /* page_number[] */ sz += MAXALIGN(nslots * sizeof(int)); /* page_lru_count[] */ sz += MAXALIGN(nslots * sizeof(LWLockPadded)); /* buffer_locks[] */ + sz += MAXALIGN(nbanks * sizeof(LWLockPadded)); /* bank_locks[] */ if (nlsns > 0) sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr)); /* group_lsn[] */ @@ -198,20 +200,22 @@ SimpleLruShmemSize(int nslots, int nlsns) * nlsns: number of LSN groups per page (set to zero if not relevant). * ctllock: LWLock to use to control access to the shared control structure. * subdir: PGDATA-relative subdirectory that will contain the files. - * tranche_id: LWLock tranche ID to use for the SLRU's per-buffer LWLocks. + * buffer_tranche_id: tranche ID to use for the SLRU's per-buffer LWLocks. + * bank_tranche_id: tranche ID to use for the SLRU's per-bank LWLocks. * sync_handler: which set of functions to use to handle sync requests */ void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, - LWLock *ctllock, const char *subdir, int tranche_id, + const char *subdir, int buffer_tranche_id, int bank_tranche_id, SyncRequestHandler sync_handler) { SlruShared shared; bool found; int bankmask; int banksize; + int nbanks; - SlruAdjustNSlots(&nslots, &banksize, &bankmask); + nbanks = SlruAdjustNSlots(&nslots, &banksize, &bankmask); shared = (SlruShared) ShmemInitStruct(name, SimpleLruShmemSize(nslots, nlsns), @@ -223,13 +227,12 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, char *ptr; Size offset; int slotno; + int bankno; Assert(!found); memset(shared, 0, sizeof(SlruSharedData)); - shared->ControlLock = ctllock; - shared->num_slots = nslots; shared->lsn_groups_per_page = nlsns; @@ -255,6 +258,8 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, /* Initialize LWLocks */ shared->buffer_locks = (LWLockPadded *) (ptr + offset); offset += MAXALIGN(nslots * sizeof(LWLockPadded)); + shared->bank_locks = (LWLockPadded *) (ptr + offset); + offset += MAXALIGN(nbanks * sizeof(LWLockPadded)); if (nlsns > 0) { @@ -266,7 +271,7 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, for (slotno = 0; slotno < nslots; slotno++) { LWLockInitialize(&shared->buffer_locks[slotno].lock, - tranche_id); + buffer_tranche_id); shared->page_buffer[slotno] = ptr; shared->page_status[slotno] = SLRU_PAGE_EMPTY; @@ -274,6 +279,10 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, shared->page_lru_count[slotno] = 0; ptr += BLCKSZ; } + /* Initialize bank locks for each buffer bank. */ + for (bankno = 0; bankno < nbanks; bankno++) + LWLockInitialize(&shared->bank_locks[bankno].lock, + bank_tranche_id); /* Should fit to estimated shmem size */ Assert(ptr - (char *) shared <= SimpleLruShmemSize(nslots, nlsns)); @@ -329,7 +338,7 @@ SimpleLruZeroPage(SlruCtl ctl, int pageno) SimpleLruZeroLSNs(ctl, slotno); /* Assume this page is now the latest active page */ - shared->latest_page_number = pageno; + pg_atomic_write_u32(&shared->latest_page_number, pageno); /* update the stats counter of zeroed pages */ pgstat_count_slru_page_zeroed(shared->slru_stats_idx); @@ -368,12 +377,13 @@ static void SimpleLruWaitIO(SlruCtl ctl, int slotno) { SlruShared shared = ctl->shared; + int bankno = slotno / ctl->bank_size; /* See notes at top of file */ - LWLockRelease(shared->ControlLock); + LWLockRelease(&shared->bank_locks[bankno].lock); LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED); LWLockRelease(&shared->buffer_locks[slotno].lock); - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE); /* * If the slot is still in an io-in-progress state, then either someone @@ -428,6 +438,7 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, for (;;) { int slotno; + int bankno; bool ok; /* See if page already is in memory; if not, pick victim slot */ @@ -470,9 +481,10 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, /* Acquire per-buffer lock (cannot deadlock, see notes at top) */ LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE); + bankno = slotno / ctl->bank_size; /* Release control lock while doing I/O */ - LWLockRelease(shared->ControlLock); + LWLockRelease(&shared->bank_locks[bankno].lock); /* Do the read */ ok = SlruPhysicalReadPage(ctl, pageno, slotno); @@ -481,7 +493,7 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, SimpleLruZeroLSNs(ctl, slotno); /* Re-acquire control lock and update page state */ - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE); Assert(shared->page_number[slotno] == pageno && shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS && @@ -523,11 +535,12 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid) { SlruShared shared = ctl->shared; int slotno; - int bankstart = (pageno & ctl->bank_mask) * ctl->bank_size; + int bankno = pageno & ctl->bank_mask; + int bankstart = bankno * ctl->bank_size; int bankend = bankstart + ctl->bank_size; /* Try to find the page while holding only shared lock */ - LWLockAcquire(shared->ControlLock, LW_SHARED); + LWLockAcquire(&shared->bank_locks[bankno].lock, LW_SHARED); /* See if page is already in a buffer */ for (slotno = bankstart; slotno < bankend; slotno++) @@ -547,8 +560,8 @@ SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid) } /* No luck, so switch to normal exclusive lock and do regular read */ - LWLockRelease(shared->ControlLock); - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + LWLockRelease(&shared->bank_locks[bankno].lock); + LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE); return SimpleLruReadPage(ctl, pageno, true, xid); } @@ -570,6 +583,7 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata) SlruShared shared = ctl->shared; int pageno = shared->page_number[slotno]; bool ok; + int bankno = slotno / ctl->bank_size; /* If a write is in progress, wait for it to finish */ while (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS && @@ -598,7 +612,7 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata) LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE); /* Release control lock while doing I/O */ - LWLockRelease(shared->ControlLock); + LWLockRelease(&shared->bank_locks[bankno].lock); /* Do the write */ ok = SlruPhysicalWritePage(ctl, pageno, slotno, fdata); @@ -613,7 +627,7 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata) } /* Re-acquire control lock and update page state */ - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE); Assert(shared->page_number[slotno] == pageno && shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS); @@ -1118,7 +1132,7 @@ SlruSelectLRUPage(SlruCtl ctl, int pageno) this_delta = 0; } this_page_number = shared->page_number[slotno]; - if (this_page_number == shared->latest_page_number) + if (this_page_number == pg_atomic_read_u32(&shared->latest_page_number)) continue; if (shared->page_status[slotno] == SLRU_PAGE_VALID) { @@ -1192,6 +1206,7 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied) int slotno; int pageno = 0; int i; + int lastbankno = 0; bool ok; /* update the stats counter of flushes */ @@ -1202,10 +1217,19 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied) */ fdata.num_files = 0; - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + LWLockAcquire(&shared->bank_locks[0].lock, LW_EXCLUSIVE); for (slotno = 0; slotno < shared->num_slots; slotno++) { + int curbankno = slotno / ctl->bank_size; + + if (curbankno != lastbankno) + { + LWLockRelease(&shared->bank_locks[lastbankno].lock); + LWLockAcquire(&shared->bank_locks[curbankno].lock, LW_EXCLUSIVE); + lastbankno = curbankno; + } + SlruInternalWritePage(ctl, slotno, &fdata); /* @@ -1219,7 +1243,7 @@ SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied) !shared->page_dirty[slotno])); } - LWLockRelease(shared->ControlLock); + LWLockRelease(&shared->bank_locks[lastbankno].lock); /* * Now close any files that were open @@ -1259,6 +1283,7 @@ SimpleLruTruncate(SlruCtl ctl, int cutoffPage) { SlruShared shared = ctl->shared; int slotno; + int prevbankno; /* update the stats counter of truncates */ pgstat_count_slru_truncate(shared->slru_stats_idx); @@ -1269,25 +1294,38 @@ SimpleLruTruncate(SlruCtl ctl, int cutoffPage) * or just after a checkpoint, any dirty pages should have been flushed * already ... we're just being extra careful here.) */ - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); - restart: /* * While we are holding the lock, make an important safety check: the * current endpoint page must not be eligible for removal. */ - if (ctl->PagePrecedes(shared->latest_page_number, cutoffPage)) + if (ctl->PagePrecedes(pg_atomic_read_u32(&shared->latest_page_number), + cutoffPage)) { - LWLockRelease(shared->ControlLock); ereport(LOG, (errmsg("could not truncate directory \"%s\": apparent wraparound", ctl->Dir))); return; } + prevbankno = 0; + LWLockAcquire(&shared->bank_locks[prevbankno].lock, LW_EXCLUSIVE); for (slotno = 0; slotno < shared->num_slots; slotno++) { + int curbankno = slotno / ctl->bank_size; + + /* + * If the curbankno is not same as prevbankno then release the lock on + * the prevbankno and acquire the lock on the curbankno. + */ + if (curbankno != prevbankno) + { + LWLockRelease(&shared->bank_locks[prevbankno].lock); + LWLockAcquire(&shared->bank_locks[curbankno].lock, LW_EXCLUSIVE); + prevbankno = curbankno; + } + if (shared->page_status[slotno] == SLRU_PAGE_EMPTY) continue; if (!ctl->PagePrecedes(shared->page_number[slotno], cutoffPage)) @@ -1317,10 +1355,12 @@ restart: SlruInternalWritePage(ctl, slotno, NULL); else SimpleLruWaitIO(ctl, slotno); + + LWLockRelease(&shared->bank_locks[prevbankno].lock); goto restart; } - LWLockRelease(shared->ControlLock); + LWLockRelease(&shared->bank_locks[prevbankno].lock); /* Now we can remove the old segment(s) */ (void) SlruScanDirectory(ctl, SlruScanDirCbDeleteCutoff, &cutoffPage); @@ -1361,15 +1401,31 @@ SlruDeleteSegment(SlruCtl ctl, int segno) SlruShared shared = ctl->shared; int slotno; bool did_write; + int prevbankno = 0; /* Clean out any possibly existing references to the segment. */ - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + LWLockAcquire(&shared->bank_locks[prevbankno].lock, LW_EXCLUSIVE); restart: did_write = false; for (slotno = 0; slotno < shared->num_slots; slotno++) { - int pagesegno = shared->page_number[slotno] / SLRU_PAGES_PER_SEGMENT; + int pagesegno; + int curbankno; + + curbankno = slotno / ctl->bank_size; + + /* + * If the curbankno is not same as prevbankno then release the lock on + * the prevbankno and acquire the lock on the curbankno. + */ + if (curbankno != prevbankno) + { + LWLockRelease(&shared->bank_locks[prevbankno].lock); + LWLockAcquire(&shared->bank_locks[curbankno].lock, LW_EXCLUSIVE); + prevbankno = curbankno; + } + pagesegno = shared->page_number[slotno] / SLRU_PAGES_PER_SEGMENT; if (shared->page_status[slotno] == SLRU_PAGE_EMPTY) continue; @@ -1403,7 +1459,7 @@ restart: SlruInternalDeleteSegment(ctl, segno); - LWLockRelease(shared->ControlLock); + LWLockRelease(&shared->bank_locks[prevbankno].lock); } /* @@ -1651,7 +1707,7 @@ SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path) * We expect the bank number to be picked from the lowest bits of the requested * pageno. Thus we want the number of banks to be the power of 2. */ -static void +static int SlruAdjustNSlots(int *nslots, int *banksize, int *bankmask) { int nbanks = 1; @@ -1677,4 +1733,34 @@ SlruAdjustNSlots(int *nslots, int *banksize, int *bankmask) *nslots = *banksize * nbanks; *bankmask = (*nslots / *banksize) - 1; + + return nbanks; +} + +/* + * Function to acquire all bank's lock of the given SlruCtl + */ +void +SimpleLruAcquireAllBankLock(SlruCtl ctl, LWLockMode mode) +{ + SlruShared shared = ctl->shared; + int bankno; + int nbanks = shared->num_slots / ctl->bank_size; + + for (bankno = 0; bankno < nbanks; bankno++) + LWLockAcquire(&shared->bank_locks[bankno].lock, mode); +} + +/* + * Function to release all bank's lock of the given SlruCtl + */ +void +SimpleLruReleaseAllBankLock(SlruCtl ctl) +{ + SlruShared shared = ctl->shared; + int bankno; + int nbanks = shared->num_slots / ctl->bank_size; + + for (bankno = 0; bankno < nbanks; bankno++) + LWLockRelease(&shared->bank_locks[bankno].lock); } diff --git a/src/backend/access/transam/subtrans.c b/src/backend/access/transam/subtrans.c index 0dd48f40f3..4e3fc5fc51 100644 --- a/src/backend/access/transam/subtrans.c +++ b/src/backend/access/transam/subtrans.c @@ -77,12 +77,14 @@ SubTransSetParent(TransactionId xid, TransactionId parent) int pageno = TransactionIdToPage(xid); int entryno = TransactionIdToEntry(xid); int slotno; + LWLock *lock; TransactionId *ptr; Assert(TransactionIdIsValid(parent)); Assert(TransactionIdFollows(xid, parent)); - LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE); + lock = SimpleLruGetSLRUBankLock(SubTransCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = SimpleLruReadPage(SubTransCtl, pageno, true, xid); ptr = (TransactionId *) SubTransCtl->shared->page_buffer[slotno]; @@ -100,7 +102,7 @@ SubTransSetParent(TransactionId xid, TransactionId parent) SubTransCtl->shared->page_dirty[slotno] = true; } - LWLockRelease(SubtransSLRULock); + LWLockRelease(lock); } /* @@ -130,7 +132,7 @@ SubTransGetParent(TransactionId xid) parent = *ptr; - LWLockRelease(SubtransSLRULock); + LWLockRelease(SimpleLruGetSLRUBankLock(SubTransCtl, pageno)); return parent; } @@ -193,8 +195,9 @@ SUBTRANSShmemInit(void) { SubTransCtl->PagePrecedes = SubTransPagePrecedes; SimpleLruInit(SubTransCtl, "Subtrans", subtrans_buffers, 0, - SubtransSLRULock, "pg_subtrans", - LWTRANCHE_SUBTRANS_BUFFER, SYNC_HANDLER_NONE); + "pg_subtrans", LWTRANCHE_SUBTRANS_BUFFER, + LWTRANCHE_SUBTRANS_SLRU, + SYNC_HANDLER_NONE); SlruPagePrecedesUnitTests(SubTransCtl, SUBTRANS_XACTS_PER_PAGE); } @@ -212,8 +215,9 @@ void BootStrapSUBTRANS(void) { int slotno; + LWLock *lock = SimpleLruGetSLRUBankLock(SubTransCtl, 0); - LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Create and zero the first page of the subtrans log */ slotno = ZeroSUBTRANSPage(0); @@ -222,7 +226,7 @@ BootStrapSUBTRANS(void) SimpleLruWritePage(SubTransCtl, slotno); Assert(!SubTransCtl->shared->page_dirty[slotno]); - LWLockRelease(SubtransSLRULock); + LWLockRelease(lock); } /* @@ -252,6 +256,8 @@ StartupSUBTRANS(TransactionId oldestActiveXID) FullTransactionId nextXid; int startPage; int endPage; + LWLock *prevlock; + LWLock *lock; /* * Since we don't expect pg_subtrans to be valid across crashes, we @@ -259,23 +265,47 @@ StartupSUBTRANS(TransactionId oldestActiveXID) * Whenever we advance into a new page, ExtendSUBTRANS will likewise zero * the new page without regard to whatever was previously on disk. */ - LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE); - startPage = TransactionIdToPage(oldestActiveXID); nextXid = ShmemVariableCache->nextXid; endPage = TransactionIdToPage(XidFromFullTransactionId(nextXid)); + prevlock = SimpleLruGetSLRUBankLock(SubTransCtl, startPage); + LWLockAcquire(prevlock, LW_EXCLUSIVE); while (startPage != endPage) { + lock = SimpleLruGetSLRUBankLock(SubTransCtl, startPage); + + /* + * Check if we need to acquire the lock on the new bank then release + * the lock on the old bank and acquire on the new bank. + */ + if (prevlock != lock) + { + LWLockRelease(prevlock); + LWLockAcquire(lock, LW_EXCLUSIVE); + prevlock = lock; + } + (void) ZeroSUBTRANSPage(startPage); startPage++; /* must account for wraparound */ if (startPage > TransactionIdToPage(MaxTransactionId)) startPage = 0; } - (void) ZeroSUBTRANSPage(startPage); - LWLockRelease(SubtransSLRULock); + lock = SimpleLruGetSLRUBankLock(SubTransCtl, startPage); + + /* + * Check if we need to acquire the lock on the new bank then release the + * lock on the old bank and acquire on the new bank. + */ + if (prevlock != lock) + { + LWLockRelease(prevlock); + LWLockAcquire(lock, LW_EXCLUSIVE); + } + (void) ZeroSUBTRANSPage(startPage); + LWLockRelease(lock); } /* @@ -309,6 +339,7 @@ void ExtendSUBTRANS(TransactionId newestXact) { int pageno; + LWLock *lock; /* * No work except at first XID of a page. But beware: just after @@ -320,12 +351,13 @@ ExtendSUBTRANS(TransactionId newestXact) pageno = TransactionIdToPage(newestXact); - LWLockAcquire(SubtransSLRULock, LW_EXCLUSIVE); + lock = SimpleLruGetSLRUBankLock(SubTransCtl, pageno); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Zero the page */ ZeroSUBTRANSPage(pageno); - LWLockRelease(SubtransSLRULock); + LWLockRelease(lock); } diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4bdbbe5cc0..9f14faed78 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -267,9 +267,10 @@ typedef struct QueueBackendStatus * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends * can change the tail pointers. * - * NotifySLRULock is used as the control lock for the pg_notify SLRU buffers. + * SLRU buffer pool is divided in banks and bank wise SLRU lock is used as + * the control lock for the pg_notify SLRU buffers. * In order to avoid deadlocks, whenever we need multiple locks, we first get - * NotifyQueueTailLock, then NotifyQueueLock, and lastly NotifySLRULock. + * NotifyQueueTailLock, then NotifyQueueLock, and lastly SLRU bank lock. * * Each backend uses the backend[] array entry with index equal to its * BackendId (which can range from 1 to MaxBackends). We rely on this to make @@ -570,7 +571,7 @@ AsyncShmemInit(void) */ NotifyCtl->PagePrecedes = asyncQueuePagePrecedes; SimpleLruInit(NotifyCtl, "Notify", notify_buffers, 0, - NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER, + "pg_notify", LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU, SYNC_HANDLER_NONE); if (!found) @@ -1402,7 +1403,7 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) * Eventually we will return NULL indicating all is done. * * We are holding NotifyQueueLock already from the caller and grab - * NotifySLRULock locally in this function. + * page specific SLRU bank lock locally in this function. */ static ListCell * asyncQueueAddEntries(ListCell *nextNotify) @@ -1412,9 +1413,7 @@ asyncQueueAddEntries(ListCell *nextNotify) int pageno; int offset; int slotno; - - /* We hold both NotifyQueueLock and NotifySLRULock during this operation */ - LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE); + LWLock *lock; /* * We work with a local copy of QUEUE_HEAD, which we write back to shared @@ -1438,6 +1437,11 @@ asyncQueueAddEntries(ListCell *nextNotify) * wrapped around, but re-zeroing the page is harmless in that case.) */ pageno = QUEUE_POS_PAGE(queue_head); + lock = SimpleLruGetSLRUBankLock(NotifyCtl, pageno); + + /* We hold both NotifyQueueLock and SLRU bank lock during this operation */ + LWLockAcquire(lock, LW_EXCLUSIVE); + if (QUEUE_POS_IS_ZERO(queue_head)) slotno = SimpleLruZeroPage(NotifyCtl, pageno); else @@ -1509,7 +1513,7 @@ asyncQueueAddEntries(ListCell *nextNotify) /* Success, so update the global QUEUE_HEAD */ QUEUE_HEAD = queue_head; - LWLockRelease(NotifySLRULock); + LWLockRelease(lock); return nextNotify; } @@ -1988,9 +1992,9 @@ asyncQueueReadAllNotifications(void) /* * We copy the data from SLRU into a local buffer, so as to avoid - * holding the NotifySLRULock while we are examining the entries - * and possibly transmitting them to our frontend. Copy only the - * part of the page we will actually inspect. + * holding the SLRU lock while we are examining the entries and + * possibly transmitting them to our frontend. Copy only the part + * of the page we will actually inspect. */ slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, InvalidTransactionId); @@ -2010,7 +2014,7 @@ asyncQueueReadAllNotifications(void) NotifyCtl->shared->page_buffer[slotno] + curoffset, copysize); /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ - LWLockRelease(NotifySLRULock); + LWLockRelease(SimpleLruGetSLRUBankLock(NotifyCtl, curpage)); /* * Process messages up to the stop position, end of page, or an @@ -2051,7 +2055,7 @@ asyncQueueReadAllNotifications(void) * * The current page must have been fetched into page_buffer from shared * memory. (We could access the page right in shared memory, but that - * would imply holding the NotifySLRULock throughout this routine.) + * would imply holding the SLRU bank lock throughout this routine.) * * We stop if we reach the "stop" position, or reach a notification from an * uncommitted transaction, or reach the end of the page. @@ -2204,7 +2208,7 @@ asyncQueueAdvanceTail(void) if (asyncQueuePagePrecedes(oldtailpage, boundary)) { /* - * SimpleLruTruncate() will ask for NotifySLRULock but will also + * SimpleLruTruncate() will ask for SLRU bank locks but will also * release the lock again. */ SimpleLruTruncate(NotifyCtl, newtailpage); diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 315a78cda9..1261af0548 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -190,6 +190,20 @@ static const char *const BuiltinTrancheNames[] = { "LogicalRepLauncherDSA", /* LWTRANCHE_LAUNCHER_HASH: */ "LogicalRepLauncherHash", + /* LWTRANCHE_XACT_SLRU: */ + "XactSLRU", + /* LWTRANCHE_COMMITTS_SLRU: */ + "CommitTSSLRU", + /* LWTRANCHE_SUBTRANS_SLRU: */ + "SubtransSLRU", + /* LWTRANCHE_MULTIXACTOFFSET_SLRU: */ + "MultixactOffsetSLRU", + /* LWTRANCHE_MULTIXACTMEMBER_SLRU: */ + "MultixactMemberSLRU", + /* LWTRANCHE_NOTIFY_SLRU: */ + "NotifySLRU", + /* LWTRANCHE_SERIAL_SLRU: */ + "SerialSLRU" }; StaticAssertDecl(lengthof(BuiltinTrancheNames) == diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index f72f2906ce..9e66ecd1ed 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -16,11 +16,11 @@ WALBufMappingLock 7 WALWriteLock 8 ControlFileLock 9 # 10 was CheckpointLock -XactSLRULock 11 -SubtransSLRULock 12 +# 11 was XactSLRULock +# 12 was SubtransSLRULock MultiXactGenLock 13 -MultiXactOffsetSLRULock 14 -MultiXactMemberSLRULock 15 +# 14 was MultiXactOffsetSLRULock +# 15 was MultiXactMemberSLRULock RelCacheInitLock 16 CheckpointerCommLock 17 TwoPhaseStateLock 18 @@ -31,19 +31,19 @@ AutovacuumLock 22 AutovacuumScheduleLock 23 SyncScanLock 24 RelationMappingLock 25 -NotifySLRULock 26 +#26 was NotifySLRULock NotifyQueueLock 27 SerializableXactHashLock 28 SerializableFinishedListLock 29 SerializablePredicateListLock 30 -SerialSLRULock 31 +SerialControlLock 31 SyncRepLock 32 BackgroundWorkerLock 33 DynamicSharedMemoryControlLock 34 AutoFileLock 35 ReplicationSlotAllocationLock 36 ReplicationSlotControlLock 37 -CommitTsSLRULock 38 +#38 was CommitTsSLRULock CommitTsLock 39 ReplicationOriginLock 40 MultiXactTruncationLock 41 diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c index 18ea18316d..4098a056e5 100644 --- a/src/backend/storage/lmgr/predicate.c +++ b/src/backend/storage/lmgr/predicate.c @@ -808,8 +808,9 @@ SerialInit(void) */ SerialSlruCtl->PagePrecedes = SerialPagePrecedesLogically; SimpleLruInit(SerialSlruCtl, "Serial", - serial_buffers, 0, SerialSLRULock, "pg_serial", - LWTRANCHE_SERIAL_BUFFER, SYNC_HANDLER_NONE); + serial_buffers, 0, "pg_serial", + LWTRANCHE_SERIAL_BUFFER, LWTRANCHE_SERIAL_SLRU, + SYNC_HANDLER_NONE); #ifdef USE_ASSERT_CHECKING SerialPagePrecedesLogicallyUnitTests(); #endif @@ -846,12 +847,14 @@ SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo) int slotno; int firstZeroPage; bool isNewPage; + LWLock *lock; Assert(TransactionIdIsValid(xid)); targetPage = SerialPage(xid); + lock = SimpleLruGetSLRUBankLock(SerialSlruCtl, targetPage); - LWLockAcquire(SerialSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* * If no serializable transactions are active, there shouldn't be anything @@ -901,7 +904,7 @@ SerialAdd(TransactionId xid, SerCommitSeqNo minConflictCommitSeqNo) SerialValue(slotno, xid) = minConflictCommitSeqNo; SerialSlruCtl->shared->page_dirty[slotno] = true; - LWLockRelease(SerialSLRULock); + LWLockRelease(lock); } /* @@ -919,10 +922,10 @@ SerialGetMinConflictCommitSeqNo(TransactionId xid) Assert(TransactionIdIsValid(xid)); - LWLockAcquire(SerialSLRULock, LW_SHARED); + LWLockAcquire(SerialControlLock, LW_SHARED); headXid = serialControl->headXid; tailXid = serialControl->tailXid; - LWLockRelease(SerialSLRULock); + LWLockRelease(SerialControlLock); if (!TransactionIdIsValid(headXid)) return 0; @@ -934,13 +937,13 @@ SerialGetMinConflictCommitSeqNo(TransactionId xid) return 0; /* - * The following function must be called without holding SerialSLRULock, + * The following function must be called without holding SLRU bank lock, * but will return with that lock held, which must then be released. */ slotno = SimpleLruReadPage_ReadOnly(SerialSlruCtl, SerialPage(xid), xid); val = SerialValue(slotno, xid); - LWLockRelease(SerialSLRULock); + LWLockRelease(SimpleLruGetSLRUBankLock(SerialSlruCtl, SerialPage(xid))); return val; } @@ -953,7 +956,7 @@ SerialGetMinConflictCommitSeqNo(TransactionId xid) static void SerialSetActiveSerXmin(TransactionId xid) { - LWLockAcquire(SerialSLRULock, LW_EXCLUSIVE); + LWLockAcquire(SerialControlLock, LW_EXCLUSIVE); /* * When no sxacts are active, nothing overlaps, set the xid values to @@ -965,7 +968,7 @@ SerialSetActiveSerXmin(TransactionId xid) { serialControl->tailXid = InvalidTransactionId; serialControl->headXid = InvalidTransactionId; - LWLockRelease(SerialSLRULock); + LWLockRelease(SerialControlLock); return; } @@ -983,7 +986,7 @@ SerialSetActiveSerXmin(TransactionId xid) { serialControl->tailXid = xid; } - LWLockRelease(SerialSLRULock); + LWLockRelease(SerialControlLock); return; } @@ -992,7 +995,7 @@ SerialSetActiveSerXmin(TransactionId xid) serialControl->tailXid = xid; - LWLockRelease(SerialSLRULock); + LWLockRelease(SerialControlLock); } /* @@ -1006,12 +1009,12 @@ CheckPointPredicate(void) { int truncateCutoffPage; - LWLockAcquire(SerialSLRULock, LW_EXCLUSIVE); + LWLockAcquire(SerialControlLock, LW_EXCLUSIVE); /* Exit quickly if the SLRU is currently not in use. */ if (serialControl->headPage < 0) { - LWLockRelease(SerialSLRULock); + LWLockRelease(SerialControlLock); return; } @@ -1071,7 +1074,7 @@ CheckPointPredicate(void) serialControl->headPage = -1; } - LWLockRelease(SerialSLRULock); + LWLockRelease(SerialControlLock); /* Truncate away pages that are no longer required */ SimpleLruTruncate(SerialSlruCtl, truncateCutoffPage); diff --git a/src/include/access/slru.h b/src/include/access/slru.h index c3fd58185a..f3545d5f5d 100644 --- a/src/include/access/slru.h +++ b/src/include/access/slru.h @@ -57,8 +57,6 @@ typedef enum */ typedef struct SlruSharedData { - LWLock *ControlLock; - /* Number of buffers managed by this SLRU structure */ int num_slots; @@ -73,6 +71,13 @@ typedef struct SlruSharedData int *page_lru_count; LWLockPadded *buffer_locks; + /* + * Locks to protect the in memory buffer slot access in per SLRU bank. The + * buffer_locks protects the I/O on each buffer slots whereas this lock + * protect the in memory operation on the buffer within one SLRU bank. + */ + LWLockPadded *bank_locks; + /* * Optional array of WAL flush LSNs associated with entries in the SLRU * pages. If not zero/NULL, we must flush WAL before writing pages (true @@ -100,7 +105,7 @@ typedef struct SlruSharedData * this is not critical data, since we use it only to avoid swapping out * the latest page. */ - int latest_page_number; + pg_atomic_uint32 latest_page_number; /* SLRU's index for statistics purposes (might not be unique) */ int slru_stats_idx; @@ -149,11 +154,24 @@ typedef struct SlruCtlData typedef SlruCtlData *SlruCtl; +/* + * Get the SLRU bank lock for given SlruCtl and the pageno. + * + * This lock needs to be acquire in order to access the slru buffer slots in + * the respective bank. For more details refer comments in SlruSharedData. + */ +static inline LWLock * +SimpleLruGetSLRUBankLock(SlruCtl ctl, int pageno) +{ + int bankno = (pageno & ctl->bank_mask); + + return &(ctl->shared->bank_locks[bankno].lock); +} extern Size SimpleLruShmemSize(int nslots, int nlsns); extern void SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, - LWLock *ctllock, const char *subdir, int tranche_id, - SyncRequestHandler sync_handler); + const char *subdir, int buffer_tranche_id, + int bank_tranche_id, SyncRequestHandler sync_handler); extern int SimpleLruZeroPage(SlruCtl ctl, int pageno); extern int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid); @@ -181,5 +199,7 @@ extern bool SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int segpage, void *data); extern bool SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int segpage, void *data); - +extern LWLock *SimpleLruGetSLRUBankLock(SlruCtl ctl, int pageno); +extern void SimpleLruAcquireAllBankLock(SlruCtl ctl, LWLockMode mode); +extern void SimpleLruReleaseAllBankLock(SlruCtl ctl); #endif /* SLRU_H */ diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index b038e599c0..87cb812b84 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -207,6 +207,13 @@ typedef enum BuiltinTrancheIds LWTRANCHE_PGSTATS_DATA, LWTRANCHE_LAUNCHER_DSA, LWTRANCHE_LAUNCHER_HASH, + LWTRANCHE_XACT_SLRU, + LWTRANCHE_COMMITTS_SLRU, + LWTRANCHE_SUBTRANS_SLRU, + LWTRANCHE_MULTIXACTOFFSET_SLRU, + LWTRANCHE_MULTIXACTMEMBER_SLRU, + LWTRANCHE_NOTIFY_SLRU, + LWTRANCHE_SERIAL_SLRU, LWTRANCHE_FIRST_USER_DEFINED, } BuiltinTrancheIds; diff --git a/src/test/modules/test_slru/test_slru.c b/src/test/modules/test_slru/test_slru.c index ae21444c47..9a02f33933 100644 --- a/src/test/modules/test_slru/test_slru.c +++ b/src/test/modules/test_slru/test_slru.c @@ -40,10 +40,6 @@ PG_FUNCTION_INFO_V1(test_slru_delete_all); /* Number of SLRU page slots */ #define NUM_TEST_BUFFERS 16 -/* SLRU control lock */ -LWLock TestSLRULock; -#define TestSLRULock (&TestSLRULock) - static SlruCtlData TestSlruCtlData; #define TestSlruCtl (&TestSlruCtlData) @@ -63,9 +59,9 @@ test_slru_page_write(PG_FUNCTION_ARGS) int pageno = PG_GETARG_INT32(0); char *data = text_to_cstring(PG_GETARG_TEXT_PP(1)); int slotno; + LWLock *lock = SimpleLruGetSLRUBankLock(TestSlruCtl, pageno); - LWLockAcquire(TestSLRULock, LW_EXCLUSIVE); - + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = SimpleLruZeroPage(TestSlruCtl, pageno); /* these should match */ @@ -80,7 +76,7 @@ test_slru_page_write(PG_FUNCTION_ARGS) BLCKSZ - 1); SimpleLruWritePage(TestSlruCtl, slotno); - LWLockRelease(TestSLRULock); + LWLockRelease(lock); PG_RETURN_VOID(); } @@ -99,13 +95,14 @@ test_slru_page_read(PG_FUNCTION_ARGS) bool write_ok = PG_GETARG_BOOL(1); char *data = NULL; int slotno; + LWLock *lock = SimpleLruGetSLRUBankLock(TestSlruCtl, pageno); /* find page in buffers, reading it if necessary */ - LWLockAcquire(TestSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); slotno = SimpleLruReadPage(TestSlruCtl, pageno, write_ok, InvalidTransactionId); data = (char *) TestSlruCtl->shared->page_buffer[slotno]; - LWLockRelease(TestSLRULock); + LWLockRelease(lock); PG_RETURN_TEXT_P(cstring_to_text(data)); } @@ -116,14 +113,15 @@ test_slru_page_readonly(PG_FUNCTION_ARGS) int pageno = PG_GETARG_INT32(0); char *data = NULL; int slotno; + LWLock *lock = SimpleLruGetSLRUBankLock(TestSlruCtl, pageno); /* find page in buffers, reading it if necessary */ slotno = SimpleLruReadPage_ReadOnly(TestSlruCtl, pageno, InvalidTransactionId); - Assert(LWLockHeldByMe(TestSLRULock)); + Assert(LWLockHeldByMe(lock)); data = (char *) TestSlruCtl->shared->page_buffer[slotno]; - LWLockRelease(TestSLRULock); + LWLockRelease(lock); PG_RETURN_TEXT_P(cstring_to_text(data)); } @@ -133,10 +131,11 @@ test_slru_page_exists(PG_FUNCTION_ARGS) { int pageno = PG_GETARG_INT32(0); bool found; + LWLock *lock = SimpleLruGetSLRUBankLock(TestSlruCtl, pageno); - LWLockAcquire(TestSLRULock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); found = SimpleLruDoesPhysicalPageExist(TestSlruCtl, pageno); - LWLockRelease(TestSLRULock); + LWLockRelease(lock); PG_RETURN_BOOL(found); } @@ -215,6 +214,7 @@ test_slru_shmem_startup(void) { const char slru_dir_name[] = "pg_test_slru"; int test_tranche_id; + int test_buffer_tranche_id; if (prev_shmem_startup_hook) prev_shmem_startup_hook(); @@ -228,11 +228,13 @@ test_slru_shmem_startup(void) /* initialize the SLRU facility */ test_tranche_id = LWLockNewTrancheId(); LWLockRegisterTranche(test_tranche_id, "test_slru_tranche"); - LWLockInitialize(TestSLRULock, test_tranche_id); + + test_buffer_tranche_id = LWLockNewTrancheId(); + LWLockRegisterTranche(test_tranche_id, "test_buffer_tranche"); TestSlruCtl->PagePrecedes = test_slru_page_precedes_logically; SimpleLruInit(TestSlruCtl, "TestSLRU", - NUM_TEST_BUFFERS, 0, TestSLRULock, slru_dir_name, + NUM_TEST_BUFFERS, 0, slru_dir_name, test_buffer_tranche_id, test_tranche_id, SYNC_HANDLER_NONE); } -- 2.39.2 (Apple Git-143)