diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c index 2634476..2a99d6c 100644 --- a/src/backend/access/transam/clog.c +++ b/src/backend/access/transam/clog.c @@ -71,6 +71,12 @@ #define GetLSNIndex(slotno, xid) ((slotno) * CLOG_LSNS_PER_PAGE + \ ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_LSN_GROUP) +/* + * (Un-)defining USE_CONTENT_LOCK selects between two locking models for clog + * pages. With USE_CONTENT_LOCK defined we use per-page content locks, without + * it every status modification is done using cmpxchg, without a content lock. + */ +#define USE_CONTENT_LOCK /* * Link to shared-memory data structures for CLOG control @@ -262,9 +268,16 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids, status == TRANSACTION_STATUS_ABORTED || (status == TRANSACTION_STATUS_SUB_COMMITTED && !TransactionIdIsValid(xid))); - LWLockAcquire(CLogControlLock, LW_EXCLUSIVE); - /* + * We acquire CLogCtl in share mode (via SimpleLruReadPage_optShared), to + * prevent pages from being replaced. + * + * This is safe because all callers of these routines always check the xid + * is complete first, either by checking TransactionIdIsInProgress() or by + * waiting for the transaction to complete via the lock manager. As a + * result, anybody checking visibility of our xids will never get as far + * as checking the status here at the same time we are setting it. + * * If we're doing an async commit (ie, lsn is valid), then we must wait * for any active write on the page slot to complete. Otherwise our * update could reach disk in that write, which will not do since we @@ -273,7 +286,18 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids, * write-busy, since we don't care if the update reaches disk sooner than * we think. */ - slotno = SimpleLruReadPage(ClogCtl, pageno, XLogRecPtrIsInvalid(lsn), xid); + slotno = SimpleLruReadPage_optShared(ClogCtl, pageno, XLogRecPtrIsInvalid(lsn), xid); + + /* + * We might have problems with concurrent writes, but we solve that by + * acquiring an exclusive page lock to serialize commits and ensure there + * is a memory barrier between commit writes. Note that we acquire and + * release the lock for each page, to ensure that transactions with huge + * numbers of subxacts don't hold up everyone else. + */ +#ifdef USE_CONTENT_LOCK + LWLockAcquire(&ClogCtl->shared->buffer_content_locks[slotno].lock, LW_EXCLUSIVE); +#endif /* * Set the main transaction id, if any. @@ -309,15 +333,24 @@ TransactionIdSetPageStatus(TransactionId xid, int nsubxids, TransactionIdSetStatusBit(subxids[i], status, lsn, slotno); } +#ifndef USE_CONTENT_LOCK + /* prevent compiler from moving store up */ + pg_compiler_barrier(); +#endif + ClogCtl->shared->page_dirty[slotno] = true; +#ifdef USE_CONTENT_LOCK + LWLockRelease(&ClogCtl->shared->buffer_content_locks[slotno].lock); +#endif LWLockRelease(CLogControlLock); } /* * Sets the commit status of a single transaction. * - * Must be called with CLogControlLock held + * Must be called with CLogControlLock held in (at least) share mode, and an + * exclusive lock on the page's content lock. */ static void TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, int slotno) @@ -350,12 +383,34 @@ TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, i status != TRANSACTION_STATUS_IN_PROGRESS) || curval == status); - /* note this assumes exclusive access to the clog page */ +#ifdef USE_CONTENT_LOCK + /* note this assumes exclusive write access to the clog page */ byteval = *byteptr; + pg_compiler_barrier(); /* prevent modification being done in-place */ byteval &= ~(((1 << CLOG_BITS_PER_XACT) - 1) << bshift); byteval |= (status << bshift); + pg_compiler_barrier(); /* prevent modification being done in-place */ *byteptr = byteval; - +#else + /* + * Modify only the relevant bits, retry if byte has concurrently been + * modified. + * + * XXX: Obviously this'd require some abstraction. + */ + while (true) + { + char origval; + origval = byteval = *((volatile char *) byteptr); + pg_compiler_barrier(); + byteval &= ~(((1 << CLOG_BITS_PER_XACT) - 1) << bshift); + byteval |= (status << bshift); + *byteptr = byteval; + if (__atomic_compare_exchange_n(byteptr, &origval, byteval, false, + __ATOMIC_SEQ_CST, __ATOMIC_SEQ_CST)) + break; + } +#endif /* * Update the group LSN if the transaction completion LSN is higher. * @@ -367,9 +422,23 @@ TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, i if (!XLogRecPtrIsInvalid(lsn)) { int lsnindex = GetLSNIndex(slotno, xid); + pg_atomic_uint64 *group_lsn = &ClogCtl->shared->group_lsn[lsnindex]; + +#ifdef USE_CONTENT_LOCK + /* we hold an exclusive write lock on the page */ + if ((XLogRecPtr) pg_atomic_read_u64(group_lsn) < lsn) + pg_atomic_write_u64(group_lsn, (uint64) lsn); +#else + uint64 old = pg_atomic_read_u64(group_lsn); - if (ClogCtl->shared->group_lsn[lsnindex] < lsn) - ClogCtl->shared->group_lsn[lsnindex] = lsn; + while (true) + { + if (old >= lsn) + break; + if (pg_atomic_compare_exchange_u64(group_lsn, &old, lsn)) + break; + } +#endif } } @@ -407,7 +476,7 @@ TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn) status = (*byteptr >> bshift) & CLOG_XACT_BITMASK; lsnindex = GetLSNIndex(slotno, xid); - *lsn = ClogCtl->shared->group_lsn[lsnindex]; + *lsn = pg_atomic_read_u64(&ClogCtl->shared->group_lsn[lsnindex]); LWLockRelease(CLogControlLock); diff --git a/src/backend/access/transam/slru.c b/src/backend/access/transam/slru.c index bbae584..4505010 100644 --- a/src/backend/access/transam/slru.c +++ b/src/backend/access/transam/slru.c @@ -152,10 +152,11 @@ SimpleLruShmemSize(int nslots, int nlsns) sz += MAXALIGN(nslots * sizeof(bool)); /* page_dirty[] */ sz += MAXALIGN(nslots * sizeof(int)); /* page_number[] */ sz += MAXALIGN(nslots * sizeof(int)); /* page_lru_count[] */ - sz += MAXALIGN(nslots * sizeof(LWLockPadded)); /* buffer_locks[] */ + sz += MAXALIGN(nslots * sizeof(LWLockPadded)); /* buffer_io_locks[] */ + sz += MAXALIGN(nslots * sizeof(LWLockPadded)); /* buffer_content_locks[] */ if (nlsns > 0) - sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr)); /* group_lsn[] */ + sz += MAXALIGN(nslots * nlsns * sizeof(pg_atomic_uint64)); /* group_lsn[] */ return BUFFERALIGN(sz) + BLCKSZ * nslots; } @@ -206,25 +207,42 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, if (nlsns > 0) { - shared->group_lsn = (XLogRecPtr *) (ptr + offset); + int i; + + shared->group_lsn = (pg_atomic_uint64 *) (ptr + offset); offset += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr)); + + for (i = 0; i < nslots * nlsns; i++) + { + pg_atomic_init_u64(&shared->group_lsn[i], 0); + } } /* Initialize LWLocks */ - shared->buffer_locks = (LWLockPadded *) ShmemAlloc(sizeof(LWLockPadded) * nslots); + shared->buffer_io_locks = (LWLockPadded *) ShmemAlloc(sizeof(LWLockPadded) * nslots); Assert(strlen(name) + 1 < SLRU_MAX_NAME_LENGTH); - strlcpy(shared->lwlock_tranche_name, name, SLRU_MAX_NAME_LENGTH); - shared->lwlock_tranche_id = tranche_id; - shared->lwlock_tranche.name = shared->lwlock_tranche_name; - shared->lwlock_tranche.array_base = shared->buffer_locks; - shared->lwlock_tranche.array_stride = sizeof(LWLockPadded); + strlcpy(shared->lwlock_io_tranche_name, name, SLRU_MAX_NAME_LENGTH); + shared->lwlock_io_tranche_id = tranche_id; + shared->lwlock_io_tranche.name = shared->lwlock_io_tranche_name; + shared->lwlock_io_tranche.array_base = shared->buffer_io_locks; + shared->lwlock_io_tranche.array_stride = sizeof(LWLockPadded); + + shared->buffer_content_locks = (LWLockPadded *) ShmemAlloc(sizeof(LWLockPadded) * nslots); + Assert(strlen(name) + 1 < SLRU_MAX_NAME_LENGTH); + strlcpy(shared->lwlock_content_tranche_name, name, SLRU_MAX_NAME_LENGTH); + shared->lwlock_content_tranche_id = tranche_id; + shared->lwlock_content_tranche.name = shared->lwlock_content_tranche_name; + shared->lwlock_content_tranche.array_base = shared->buffer_content_locks; + shared->lwlock_content_tranche.array_stride = sizeof(LWLockPadded); ptr += BUFFERALIGN(offset); for (slotno = 0; slotno < nslots; slotno++) { - LWLockInitialize(&shared->buffer_locks[slotno].lock, - shared->lwlock_tranche_id); + LWLockInitialize(&shared->buffer_io_locks[slotno].lock, + shared->lwlock_io_tranche_id); + LWLockInitialize(&shared->buffer_content_locks[slotno].lock, + shared->lwlock_content_tranche_id); shared->page_buffer[slotno] = ptr; shared->page_status[slotno] = SLRU_PAGE_EMPTY; @@ -237,7 +255,8 @@ SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns, Assert(found); /* Register SLRU tranche in the main tranches array */ - LWLockRegisterTranche(shared->lwlock_tranche_id, &shared->lwlock_tranche); + LWLockRegisterTranche(shared->lwlock_io_tranche_id, &shared->lwlock_io_tranche); + LWLockRegisterTranche(shared->lwlock_content_tranche_id, &shared->lwlock_content_tranche); /* * Initialize the unshared control struct, including directory path. We @@ -303,8 +322,16 @@ SimpleLruZeroLSNs(SlruCtl ctl, int slotno) SlruShared shared = ctl->shared; if (shared->lsn_groups_per_page > 0) - MemSet(&shared->group_lsn[slotno * shared->lsn_groups_per_page], 0, - shared->lsn_groups_per_page * sizeof(XLogRecPtr)); + { + int i; + + for (i = slotno * shared->lsn_groups_per_page; + i < (slotno + 1) * shared->lsn_groups_per_page; + i++) + { + pg_atomic_init_u64(&shared->group_lsn[i], 0); + } + } } /* @@ -321,8 +348,8 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno) /* See notes at top of file */ LWLockRelease(shared->ControlLock); - LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED); - LWLockRelease(&shared->buffer_locks[slotno].lock); + LWLockAcquire(&shared->buffer_io_locks[slotno].lock, LW_SHARED); + LWLockRelease(&shared->buffer_io_locks[slotno].lock); LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); /* @@ -336,7 +363,7 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno) if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS || shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS) { - if (LWLockConditionalAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED)) + if (LWLockConditionalAcquire(&shared->buffer_io_locks[slotno].lock, LW_SHARED)) { /* indeed, the I/O must have failed */ if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS) @@ -346,7 +373,7 @@ SimpleLruWaitIO(SlruCtl ctl, int slotno) shared->page_status[slotno] = SLRU_PAGE_VALID; shared->page_dirty[slotno] = true; } - LWLockRelease(&shared->buffer_locks[slotno].lock); + LWLockRelease(&shared->buffer_io_locks[slotno].lock); } } } @@ -415,7 +442,7 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, shared->page_dirty[slotno] = false; /* Acquire per-buffer lock (cannot deadlock, see notes at top) */ - LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE); + LWLockAcquire(&shared->buffer_io_locks[slotno].lock, LW_EXCLUSIVE); /* Release control lock while doing I/O */ LWLockRelease(shared->ControlLock); @@ -435,7 +462,7 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, shared->page_status[slotno] = ok ? SLRU_PAGE_VALID : SLRU_PAGE_EMPTY; - LWLockRelease(&shared->buffer_locks[slotno].lock); + LWLockRelease(&shared->buffer_io_locks[slotno].lock); /* Now it's okay to ereport if we failed */ if (!ok) @@ -457,36 +484,51 @@ SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, * Return value is the shared-buffer slot number now holding the page. * The buffer's LRU access info is updated. * - * Control lock must NOT be held at entry, but will be held at exit. - * It is unspecified whether the lock will be shared or exclusive. + * Control lock must NOT be held at entry, but will be held in share mode at + * exit. */ int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid) { + return SimpleLruReadPage_optShared(ctl, pageno, true, xid); +} + +int +SimpleLruReadPage_optShared(SlruCtl ctl, int pageno, bool write_ok, + TransactionId xid) +{ SlruShared shared = ctl->shared; int slotno; - /* Try to find the page while holding only shared lock */ - LWLockAcquire(shared->ControlLock, LW_SHARED); - - /* See if page is already in a buffer */ - for (slotno = 0; slotno < shared->num_slots; slotno++) + while (true) { - if (shared->page_number[slotno] == pageno && - shared->page_status[slotno] != SLRU_PAGE_EMPTY && - shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS) + /* Try to find the page while holding only shared lock */ + LWLockAcquire(shared->ControlLock, LW_SHARED); + + /* See if page is already in a buffer */ + for (slotno = 0; slotno < shared->num_slots; slotno++) { - /* See comments for SlruRecentlyUsed macro */ - SlruRecentlyUsed(shared, slotno); - return slotno; + if (shared->page_number[slotno] == pageno && + shared->page_status[slotno] != SLRU_PAGE_EMPTY && + shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS && + (write_ok || + shared->page_status[slotno] != SLRU_PAGE_WRITE_IN_PROGRESS)) + { + /* See comments for SlruRecentlyUsed macro */ + SlruRecentlyUsed(shared, slotno); + return slotno; + } } - } - /* No luck, so switch to normal exclusive lock and do regular read */ - LWLockRelease(shared->ControlLock); - LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + /* No luck, so switch to normal exclusive lock and do regular read */ + LWLockRelease(shared->ControlLock); + LWLockAcquire(shared->ControlLock, LW_EXCLUSIVE); + + SimpleLruReadPage(ctl, pageno, write_ok, xid); - return SimpleLruReadPage(ctl, pageno, true, xid); + /* release lock and try again, to avoid holding the lock exclusively */ + LWLockRelease(shared->ControlLock); + } } /* @@ -531,7 +573,7 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata) shared->page_dirty[slotno] = false; /* Acquire per-buffer lock (cannot deadlock, see notes at top) */ - LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE); + LWLockAcquire(&shared->buffer_io_locks[slotno].lock, LW_EXCLUSIVE); /* Release control lock while doing I/O */ LWLockRelease(shared->ControlLock); @@ -560,7 +602,7 @@ SlruInternalWritePage(SlruCtl ctl, int slotno, SlruFlush fdata) shared->page_status[slotno] = SLRU_PAGE_VALID; - LWLockRelease(&shared->buffer_locks[slotno].lock); + LWLockRelease(&shared->buffer_io_locks[slotno].lock); /* Now it's okay to ereport if we failed */ if (!ok) @@ -738,11 +780,11 @@ SlruPhysicalWritePage(SlruCtl ctl, int pageno, int slotno, SlruFlush fdata) lsnoff; lsnindex = slotno * shared->lsn_groups_per_page; - max_lsn = shared->group_lsn[lsnindex++]; + max_lsn = (XLogRecPtr) pg_atomic_read_u64(&shared->group_lsn[lsnindex++]); for (lsnoff = 1; lsnoff < shared->lsn_groups_per_page; lsnoff++) { - XLogRecPtr this_lsn = shared->group_lsn[lsnindex++]; - + XLogRecPtr this_lsn = (XLogRecPtr) + pg_atomic_read_u64(&shared->group_lsn[lsnindex++]); if (max_lsn < this_lsn) max_lsn = this_lsn; } diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index f8996cd..8201384 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -47,3 +47,4 @@ CommitTsLock 39 ReplicationOriginLock 40 MultiXactTruncationLock 41 OldSnapshotTimeMapLock 42 +CLogModifyLock 43 diff --git a/src/include/access/slru.h b/src/include/access/slru.h index 5fcebc5..fb8a325 100644 --- a/src/include/access/slru.h +++ b/src/include/access/slru.h @@ -14,6 +14,7 @@ #define SLRU_H #include "access/xlogdefs.h" +#include "port/atomics.h" #include "storage/lwlock.h" @@ -81,7 +82,7 @@ typedef struct SlruSharedData * highest LSN known for a contiguous group of SLRU entries on that slot's * page. */ - XLogRecPtr *group_lsn; + pg_atomic_uint64 *group_lsn; int lsn_groups_per_page; /*---------- @@ -103,10 +104,14 @@ typedef struct SlruSharedData int latest_page_number; /* LWLocks */ - int lwlock_tranche_id; - LWLockTranche lwlock_tranche; - char lwlock_tranche_name[SLRU_MAX_NAME_LENGTH]; - LWLockPadded *buffer_locks; + int lwlock_io_tranche_id; + LWLockTranche lwlock_io_tranche; + char lwlock_io_tranche_name[SLRU_MAX_NAME_LENGTH]; + LWLockPadded *buffer_io_locks; + int lwlock_content_tranche_id; + LWLockTranche lwlock_content_tranche; + char lwlock_content_tranche_name[SLRU_MAX_NAME_LENGTH]; + LWLockPadded *buffer_content_locks; } SlruSharedData; typedef SlruSharedData *SlruShared; @@ -150,6 +155,8 @@ extern int SimpleLruReadPage(SlruCtl ctl, int pageno, bool write_ok, TransactionId xid); extern int SimpleLruReadPage_ReadOnly(SlruCtl ctl, int pageno, TransactionId xid); +extern int SimpleLruReadPage_optShared(SlruCtl ctl, int pageno, + bool write_ok, TransactionId xid); extern void SimpleLruWritePage(SlruCtl ctl, int slotno); extern void SimpleLruFlush(SlruCtl ctl, bool allow_redirtied); extern void SimpleLruTruncate(SlruCtl ctl, int cutoffPage);