>From 18eff6f07848d1e069e9552167285580b3005830 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Fri, 15 Nov 2013 19:00:33 +0100 Subject: [PATCH 3/4] Convert the PGPROC->lwWaitLink list into a dlist instead of open coding it. Besides being shorter and much easier to read it: * fixes a bug in xlog.c:WakeupWaiters() accidentally waking up exclusive lockers * changes the logic in LWLockRelease() to release all shared lockers when waking up any. * adds a memory pg_write_barrier() in the wakeup paths between dequeuing and unsetting ->lwWaiting. On a weakly ordered machine the current coding could lead to problems. --- src/backend/access/transam/twophase.c | 1 - src/backend/access/transam/xlog.c | 127 +++++++++++++--------------------- src/backend/storage/lmgr/lwlock.c | 106 +++++++++++----------------- src/backend/storage/lmgr/proc.c | 2 - src/include/storage/proc.h | 3 +- 5 files changed, 90 insertions(+), 149 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index e975f8d..7aeb6e8 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -326,7 +326,6 @@ MarkAsPreparing(TransactionId xid, const char *gid, proc->roleId = owner; proc->lwWaiting = false; proc->lwWaitMode = 0; - proc->lwWaitLink = NULL; proc->waitLock = NULL; proc->waitProcLock = NULL; for (i = 0; i < NUM_LOCK_PARTITIONS; i++) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index a95149b..ea2fb93 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -393,8 +393,7 @@ typedef struct bool releaseOK; /* T if ok to release waiters */ char exclusive; /* # of exclusive holders (0 or 1) */ - PGPROC *head; /* head of list of waiting PGPROCs */ - PGPROC *tail; /* tail of list of waiting PGPROCs */ + dlist_head waiters; /* list of waiting PGPROCs */ /* tail is undefined when head is NULL */ } XLogInsertSlot; @@ -1626,12 +1625,7 @@ WALInsertSlotAcquireOne(int slotno) */ proc->lwWaiting = true; proc->lwWaitMode = LW_EXCLUSIVE; - proc->lwWaitLink = NULL; - if (slot->head == NULL) - slot->head = proc; - else - slot->tail->lwWaitLink = proc; - slot->tail = proc; + dlist_push_tail((dlist_head *) &slot->waiters, &proc->lwWaitLink); /* Can release the mutex now */ SpinLockRelease(&slot->mutex); @@ -1746,13 +1740,8 @@ WaitOnSlot(volatile XLogInsertSlot *slot, XLogRecPtr waitptr) */ proc->lwWaiting = true; proc->lwWaitMode = LW_WAIT_UNTIL_FREE; - proc->lwWaitLink = NULL; - /* waiters are added to the front of the queue */ - proc->lwWaitLink = slot->head; - if (slot->head == NULL) - slot->tail = proc; - slot->head = proc; + dlist_push_head((dlist_head *) &slot->waiters, &proc->lwWaitLink); /* Can release the mutex now */ SpinLockRelease(&slot->mutex); @@ -1804,9 +1793,8 @@ static void WakeupWaiters(XLogRecPtr EndPos) { volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot; - PGPROC *head; - PGPROC *proc; - PGPROC *next; + dlist_head wakeup; + dlist_mutable_iter iter; /* * If we have already reported progress up to the same point, do nothing. @@ -1818,6 +1806,8 @@ WakeupWaiters(XLogRecPtr EndPos) /* xlogInsertingAt should not go backwards */ Assert(slot->xlogInsertingAt < EndPos); + dlist_init(&wakeup); + /* Acquire mutex. Time spent holding mutex should be short! */ SpinLockAcquire(&slot->mutex); @@ -1827,25 +1817,19 @@ WakeupWaiters(XLogRecPtr EndPos) slot->xlogInsertingAt = EndPos; /* - * See if there are any waiters that need to be woken up. + * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken + * up. */ - head = slot->head; - - if (head != NULL) + dlist_foreach_modify(iter, (dlist_head *) &slot->waiters) { - proc = head; + PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur); /* LW_WAIT_UNTIL_FREE waiters are always in the front of the queue */ - next = proc->lwWaitLink; - while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE) - { - proc = next; - next = next->lwWaitLink; - } + if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE) + break; - /* proc is now the last PGPROC to be released */ - slot->head = next; - proc->lwWaitLink = NULL; + dlist_delete(&waiter->lwWaitLink); + dlist_push_tail(&wakeup, &waiter->lwWaitLink); } /* We are done updating shared state of the lock itself. */ @@ -1854,13 +1838,13 @@ WakeupWaiters(XLogRecPtr EndPos) /* * Awaken any waiters I removed from the queue. */ - while (head != NULL) + dlist_foreach_modify(iter, (dlist_head *) &wakeup) { - proc = head; - head = proc->lwWaitLink; - proc->lwWaitLink = NULL; - proc->lwWaiting = false; - PGSemaphoreUnlock(&proc->sem); + PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur); + dlist_delete(&waiter->lwWaitLink); + pg_write_barrier(); + waiter->lwWaiting = false; + PGSemaphoreUnlock(&waiter->sem); } } @@ -1886,8 +1870,11 @@ static void WALInsertSlotReleaseOne(int slotno) { volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[slotno].slot; - PGPROC *head; - PGPROC *proc; + dlist_head wakeup; + dlist_mutable_iter iter; + bool releaseOK = true; + + dlist_init(&wakeup); /* Acquire mutex. Time spent holding mutex should be short! */ SpinLockAcquire(&slot->mutex); @@ -1904,43 +1891,28 @@ WALInsertSlotReleaseOne(int slotno) /* * See if I need to awaken any waiters.. */ - head = slot->head; - if (head != NULL) + dlist_foreach_modify(iter, (dlist_head *) &slot->waiters) { - if (slot->releaseOK) - { - /* - * Remove the to-be-awakened PGPROCs from the queue. - */ - bool releaseOK = true; + PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur); - proc = head; + dlist_delete(&waiter->lwWaitLink); + dlist_push_tail(&wakeup, &waiter->lwWaitLink); - /* - * First wake up any backends that want to be woken up without - * acquiring the lock. These are always in the front of the queue. - */ - while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink) - proc = proc->lwWaitLink; - - /* - * Awaken the first exclusive-waiter, if any. - */ - if (proc->lwWaitLink) - { - Assert(proc->lwWaitLink->lwWaitMode == LW_EXCLUSIVE); - proc = proc->lwWaitLink; - releaseOK = false; - } - /* proc is now the last PGPROC to be released */ - slot->head = proc->lwWaitLink; - proc->lwWaitLink = NULL; + /* + * First wake up any backends that want to be woken up without + * acquiring the lock. These are always in the front of the + * queue. Then wakeup the first exclusive-waiter, if any. + */ - slot->releaseOK = releaseOK; + /* LW_WAIT_UNTIL_FREE waiters are always in the front of the queue */ + if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE) + { + Assert(waiter->lwWaitMode == LW_EXCLUSIVE); + releaseOK = false; + break; } - else - head = NULL; } + slot->releaseOK = releaseOK; /* We are done updating shared state of the slot itself. */ SpinLockRelease(&slot->mutex); @@ -1948,13 +1920,13 @@ WALInsertSlotReleaseOne(int slotno) /* * Awaken any waiters I removed from the queue. */ - while (head != NULL) + dlist_foreach_modify(iter, (dlist_head *) &wakeup) { - proc = head; - head = proc->lwWaitLink; - proc->lwWaitLink = NULL; - proc->lwWaiting = false; - PGSemaphoreUnlock(&proc->sem); + PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur); + dlist_delete(&waiter->lwWaitLink); + pg_write_barrier(); + waiter->lwWaiting = false; + PGSemaphoreUnlock(&waiter->sem); } /* @@ -5104,8 +5076,7 @@ XLOGShmemInit(void) slot->releaseOK = true; slot->exclusive = 0; - slot->head = NULL; - slot->tail = NULL; + dlist_init(&slot->waiters); } SpinLockInit(&XLogCtl->Insert.insertpos_lck); diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 4f88d3f..732a5d2 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -25,6 +25,7 @@ #include "access/multixact.h" #include "access/subtrans.h" #include "commands/async.h" +#include "lib/ilist.h" #include "miscadmin.h" #include "pg_trace.h" #include "storage/ipc.h" @@ -43,8 +44,7 @@ typedef struct LWLock bool releaseOK; /* T if ok to release waiters */ char exclusive; /* # of exclusive holders (0 or 1) */ int shared; /* # of shared holders (0..MaxBackends) */ - PGPROC *head; /* head of list of waiting PGPROCs */ - PGPROC *tail; /* tail of list of waiting PGPROCs */ + dlist_head waiters; /* tail is undefined when head is NULL */ } LWLock; @@ -105,9 +105,9 @@ inline static void PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock) { if (Trace_lwlocks) - elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d", + elog(LOG, "%s(%d): excl %d shared %d rOK %d", where, (int) lockid, - (int) lock->exclusive, lock->shared, lock->head, + (int) lock->exclusive, lock->shared, (int) lock->releaseOK); } @@ -287,8 +287,7 @@ CreateLWLocks(void) lock->lock.releaseOK = true; lock->lock.exclusive = 0; lock->lock.shared = 0; - lock->lock.head = NULL; - lock->lock.tail = NULL; + dlist_init(&lock->lock.waiters); } /* @@ -444,12 +443,7 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) proc->lwWaiting = true; proc->lwWaitMode = mode; - proc->lwWaitLink = NULL; - if (lock->head == NULL) - lock->head = proc; - else - lock->tail->lwWaitLink = proc; - lock->tail = proc; + dlist_push_head((dlist_head *) &lock->waiters, &proc->lwWaitLink); /* Can release the mutex now */ SpinLockRelease(&lock->mutex); @@ -657,12 +651,7 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode) proc->lwWaiting = true; proc->lwWaitMode = LW_WAIT_UNTIL_FREE; - proc->lwWaitLink = NULL; - if (lock->head == NULL) - lock->head = proc; - else - lock->tail->lwWaitLink = proc; - lock->tail = proc; + dlist_push_head((dlist_head *) &lock->waiters, &proc->lwWaitLink); /* Can release the mutex now */ SpinLockRelease(&lock->mutex); @@ -728,10 +717,12 @@ void LWLockRelease(LWLockId lockid) { volatile LWLock *lock = &(LWLockArray[lockid].lock); - PGPROC *head; - PGPROC *proc; + dlist_head wakeup; + dlist_mutable_iter iter; int i; + dlist_init(&wakeup); + PRINT_LWDEBUG("LWLockRelease", lockid, lock); /* @@ -767,58 +758,39 @@ LWLockRelease(LWLockId lockid) * if someone has already awakened waiters that haven't yet acquired the * lock. */ - head = lock->head; - if (head != NULL) + if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK) { - if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK) + /* + * Remove the to-be-awakened PGPROCs from the queue. + */ + bool releaseOK = true; + bool wokeup_somebody = false; + + dlist_foreach_modify(iter, (dlist_head *) &lock->waiters) { - /* - * Remove the to-be-awakened PGPROCs from the queue. - */ - bool releaseOK = true; + PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur); - proc = head; + if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE) + continue; - /* - * First wake up any backends that want to be woken up without - * acquiring the lock. - */ - while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink) - proc = proc->lwWaitLink; + dlist_delete(&waiter->lwWaitLink); + dlist_push_tail(&wakeup, &waiter->lwWaitLink); /* - * If the front waiter wants exclusive lock, awaken him only. - * Otherwise awaken as many waiters as want shared access. + * Prevent additional wakeups until retryer gets to + * run. Backends that are just waiting for the lock to become + * free don't retry automatically. */ - if (proc->lwWaitMode != LW_EXCLUSIVE) + if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE) { - while (proc->lwWaitLink != NULL && - proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE) - { - if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE) - releaseOK = false; - proc = proc->lwWaitLink; - } - } - /* proc is now the last PGPROC to be released */ - lock->head = proc->lwWaitLink; - proc->lwWaitLink = NULL; - - /* - * Prevent additional wakeups until retryer gets to run. Backends - * that are just waiting for the lock to become free don't retry - * automatically. - */ - if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE) releaseOK = false; + wokeup_somebody = true; + } - lock->releaseOK = releaseOK; - } - else - { - /* lock is still held, can't awaken anything */ - head = NULL; + if(waiter->lwWaitMode == LW_EXCLUSIVE) + break; } + lock->releaseOK = releaseOK; } /* We are done updating shared state of the lock itself. */ @@ -829,14 +801,14 @@ LWLockRelease(LWLockId lockid) /* * Awaken any waiters I removed from the queue. */ - while (head != NULL) + dlist_foreach_modify(iter, (dlist_head *) &wakeup) { + PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur); LOG_LWDEBUG("LWLockRelease", lockid, "release waiter"); - proc = head; - head = proc->lwWaitLink; - proc->lwWaitLink = NULL; - proc->lwWaiting = false; - PGSemaphoreUnlock(&proc->sem); + dlist_delete(&waiter->lwWaitLink); + pg_write_barrier(); + waiter->lwWaiting = false; + PGSemaphoreUnlock(&waiter->sem); } /* diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 222251d..75bddcd 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -370,7 +370,6 @@ InitProcess(void) MyPgXact->vacuumFlags |= PROC_IS_AUTOVACUUM; MyProc->lwWaiting = false; MyProc->lwWaitMode = 0; - MyProc->lwWaitLink = NULL; MyProc->waitLock = NULL; MyProc->waitProcLock = NULL; #ifdef USE_ASSERT_CHECKING @@ -533,7 +532,6 @@ InitAuxiliaryProcess(void) MyPgXact->vacuumFlags = 0; MyProc->lwWaiting = false; MyProc->lwWaitMode = 0; - MyProc->lwWaitLink = NULL; MyProc->waitLock = NULL; MyProc->waitProcLock = NULL; #ifdef USE_ASSERT_CHECKING diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 3b04d3c..73cea1c 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -15,6 +15,7 @@ #define _PROC_H_ #include "access/xlogdefs.h" +#include "lib/ilist.h" #include "storage/latch.h" #include "storage/lock.h" #include "storage/pg_sema.h" @@ -101,7 +102,7 @@ struct PGPROC /* Info about LWLock the process is currently waiting for, if any. */ bool lwWaiting; /* true if waiting for an LW lock */ uint8 lwWaitMode; /* lwlock mode being waited for */ - struct PGPROC *lwWaitLink; /* next waiter for same LW lock */ + dlist_node lwWaitLink; /* next waiter for same LW lock */ /* Info about lock the process is currently waiting for, if any. */ /* waitLock and waitProcLock are NULL if not currently waiting. */ -- 1.8.5.rc1.dirty