From 4cc9fd429a073cccf648fdb457ed9999534aa58e Mon Sep 17 00:00:00 2001
From: Andres Freund
Date: Tue, 24 Sep 2013 03:04:44 +0200
Subject: [PATCH 4/4] Wait-free LW_SHARED lwlock acquisition

---
 src/backend/storage/lmgr/lwlock.c | 754 ++++++++++++++++++++++++++------------
 1 file changed, 523 insertions(+), 231 deletions(-)

diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 732a5d2..fe8858f 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -17,6 +17,82 @@
  *	  IDENTIFICATION
  *	  src/backend/storage/lmgr/lwlock.c
  *
+ * NOTES:
+ *
+ * This used to be a pretty straightforward reader-writer lock
+ * implementation, in which the internal state was protected by a
+ * spinlock. Unfortunately the overhead of taking the spinlock proved to be
+ * too high for workloads/locks that were locked in shared mode very
+ * frequently.
+ * Thus a new implementation was devised that provides wait-free shared lock
+ * acquisition for locks that aren't exclusively locked.
+ *
+ * The basic idea is to have a single atomic variable 'lockcount' instead of
+ * the formerly separate shared and exclusive counters and to use an atomic
+ * increment to acquire the lock. That's fairly easy to do for rw-spinlocks,
+ * but a lot harder for something like LWLocks that want to wait in the OS.
+ *
+ * For exclusive lock acquisition we use an atomic compare-and-exchange on the
+ * lockcount variable, swapping in EXCLUSIVE_LOCK/1<<31/0x80000000 if and only
+ * if the current value of lockcount is 0. If the swap was not successful, we
+ * have to wait.
+ *
+ * For shared lock acquisition we use an atomic add (lock xadd) to the
+ * lockcount variable to add 1. If the resulting value is bigger than
+ * EXCLUSIVE_LOCK we know that somebody actually has an exclusive lock, and we
+ * back out by atomically decrementing by 1 again. In that case we have to
+ * wait for the exclusive locker to release the lock.
+ *
+ * To release the lock we use an atomic decrement. If the new value is zero
+ * (we get that back atomically), we know we have to release waiters.
+ *
+ * The attentive reader might have noticed that naively doing the above has
+ * two glaring race conditions:
+ *
+ * 1) too-quick-for-queueing: We try to lock using the atomic operations and
+ * notice that we have to wait. Unfortunately, by the time we have finished
+ * queuing, the former locker very well might have already finished its work
+ * and released the lock, so nobody will ever wake us up. That's problematic
+ * because we're now stuck waiting inside the OS.
+ *
+ * 2) spurious failed locks: Due to the logic of backing out of shared
+ * locks after we unconditionally added 1 to lockcount, we might have
+ * prevented another exclusive locker from getting the lock:
+ *   1) Session A: LWLockAcquire(LW_EXCLUSIVE) - success
+ *   2) Session B: LWLockAcquire(LW_SHARED) - lockcount += 1
+ *   3) Session B: LWLockAcquire(LW_SHARED) - oops, bigger than EXCLUSIVE_LOCK
+ *   4) Session A: LWLockRelease()
+ *   5) Session C: LWLockAcquire(LW_EXCLUSIVE) - check if lockcount = 0, no. wait.
+ *   6) Session B: LWLockAcquire(LW_SHARED) - lockcount -= 1
+ *   7) Session B: LWLockAcquire(LW_SHARED) - wait
+ *
+ * So we'd now have both B and C waiting on a lock that nobody is holding
+ * anymore. Not good.
+ *
+ * To mitigate those races we use a two-phased attempt at locking:
+ *   Phase 1: Try to do it atomically; if we succeed, nice
+ *   Phase 2: Add ourselves to the waitqueue of the lock
+ *   Phase 3: Try to grab the lock again; if we succeed, remove ourselves from
+ *            the queue
+ *   Phase 4: Sleep till wakeup, goto Phase 1
+ *
+ * This protects us against both problems from above:
+ * 1) Nobody can release too quickly, before we're queued, since after Phase 2
+ *    we're already queued.
+ * 2) If somebody spuriously got blocked from acquiring the lock, they will
+ *    get queued in Phase 2 and we can wake them up if necessary, or they will
+ *    have gotten the lock in Phase 3.
+ *
+ * The above algorithm only works for LWLockAcquire, not directly for
+ * LWLockConditionalAcquire, where we don't want to wait. In that case we just
+ * need to retry acquiring the lock until we're sure we didn't disturb anybody
+ * in doing so.
+ *
+ * TODO:
+ * - decide if we need a spinlock fallback
+ * - expand documentation
+ * - make LWLOCK_STATS do something sensible again
+ * - make LOCK_DEBUG output nicer
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -28,6 +104,7 @@
 #include "lib/ilist.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
+#include "storage/atomics.h"
 #include "storage/ipc.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
@@ -37,15 +114,20 @@
 /* We use the ShmemLock spinlock to protect LWLockAssign */
 extern slock_t *ShmemLock;
 
+#define EXCLUSIVE_LOCK ((uint32) 1 << 31)
+/* must be greater than MAX_BACKENDS */
+#define SHARED_LOCK_MASK (~EXCLUSIVE_LOCK)
 
 typedef struct LWLock
 {
-	slock_t		mutex;			/* Protects LWLock and queue of PGPROCs */
-	bool		releaseOK;		/* T if ok to release waiters */
-	char		exclusive;		/* # of exclusive holders (0 or 1) */
-	int			shared;			/* # of shared holders (0..MaxBackends) */
-	dlist_head	waiters;
-	/* tail is undefined when head is NULL */
+	slock_t		mutex;			/* Protects LWLock and queue of PGPROCs */
+	bool		releaseOK;		/* T if ok to release waiters */
+	dlist_head	waiters;		/* list of waiters */
+	pg_atomic_uint32 lockcount;	/* state of exclusive/nonexclusive lockers */
+	pg_atomic_uint32 nwaiters;	/* number of waiters */
+#ifdef LWLOCK_DEBUG
+	PGPROC	   *owner;
+#endif
 } LWLock;
 
 /*
@@ -60,7 +142,7 @@ typedef struct LWLock
  * LWLock is between 16 and 32 bytes on all known platforms, so these two
  * cases are sufficient.
  */
-#define LWLOCK_PADDED_SIZE	(sizeof(LWLock) <= 16 ? 16 : 32)
+#define LWLOCK_PADDED_SIZE	64
 
 typedef union LWLockPadded
 {
@@ -75,7 +157,6 @@ typedef union LWLockPadded
  */
 NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
 
-
 /*
  * We use this structure to keep track of locked LWLocks for release
  * during error recovery.
The maximum size could be determined at runtime @@ -84,8 +165,14 @@ NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL; */ #define MAX_SIMUL_LWLOCKS 100 +typedef struct LWLockHandle +{ + LWLockId lock; + LWLockMode mode; +} LWLockHandle; + static int num_held_lwlocks = 0; -static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS]; +static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS]; static int lock_addin_request = 0; static bool lock_addin_request_allowed = true; @@ -102,24 +189,29 @@ static int *spin_delay_counts; bool Trace_lwlocks = false; inline static void -PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock) +PRINT_LWDEBUG(const char *where, LWLockId lockid, volatile LWLock *lock, LWLockMode mode) { if (Trace_lwlocks) - elog(LOG, "%s(%d): excl %d shared %d rOK %d", - where, (int) lockid, - (int) lock->exclusive, lock->shared, + { + uint32 lockcount = pg_atomic_read_u32(&lock->lockcount); + elog(LOG, "%s(%d)%u: excl %u shared %u waiters %u rOK %d", + where, (int) lockid, mode, + (lockcount & EXCLUSIVE_LOCK) >> 31, + (lockcount & SHARED_LOCK_MASK), + pg_atomic_read_u32(&lock->nwaiters), (int) lock->releaseOK); + } } inline static void -LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg) +LOG_LWDEBUG(const char *where, LWLockId lockid, LWLockMode mode, const char *msg) { if (Trace_lwlocks) - elog(LOG, "%s(%d): %s", where, (int) lockid, msg); + elog(LOG, "%s(%d)%u: %s", where, (int) lockid, mode, msg); } #else /* not LOCK_DEBUG */ -#define PRINT_LWDEBUG(a,b,c) -#define LOG_LWDEBUG(a,b,c) +#define PRINT_LWDEBUG(a,b,c,d) ((void)0) +#define LOG_LWDEBUG(a,b,c,d) ((void)0) #endif /* LOCK_DEBUG */ #ifdef LWLOCK_STATS @@ -285,8 +377,8 @@ CreateLWLocks(void) { SpinLockInit(&lock->lock.mutex); lock->lock.releaseOK = true; - lock->lock.exclusive = 0; - lock->lock.shared = 0; + pg_atomic_init_u32(&lock->lock.lockcount, 0); + pg_atomic_init_u32(&lock->lock.nwaiters, 0); dlist_init(&lock->lock.waiters); } @@ -328,6 +420,231 @@ LWLockAssign(void) return result; } +/* + * Internal function handling the atomic manipulation of lock->lockcount. + * + * 'double_check' = true means that we try to check more carefully + * against causing somebody else to spuriously believe the lock is + * already taken, although we're just about to back out of it. + */ +static inline bool +LWLockAttemptLock(volatile LWLock* lock, LWLockMode mode, bool double_check, bool *potentially_spurious) +{ + bool mustwait; + uint32 oldstate; + + Assert(mode == LW_EXCLUSIVE || mode == LW_SHARED); + + *potentially_spurious = false; + + if (mode == LW_EXCLUSIVE) + { + uint32 expected = 0; + pg_read_barrier(); + + /* check without CAS first; it's way cheaper, frequently locked otherwise */ + if (pg_atomic_read_u32(&lock->lockcount) != 0) + mustwait = true; + else if (!pg_atomic_compare_exchange_u32(&lock->lockcount, + &expected, EXCLUSIVE_LOCK)) + { + /* + * ok, no can do. Between the pg_atomic_read() above and the + * CAS somebody else acquired the lock. + */ + mustwait = true; + } + else + { + /* yipeyyahee */ + mustwait = false; +#ifdef LWLOCK_DEBUG + lock->owner = MyProc; +#endif + } + } + else + { + /* + * If requested by caller, do an unlocked check first. This is useful + * if potentially spurious results have a noticeable cost. + */ + if (double_check) + { + pg_read_barrier(); + if (pg_atomic_read_u32(&lock->lockcount) >= EXCLUSIVE_LOCK) + { + mustwait = true; + goto out; + } + } + + /* + * Acquire the share lock unconditionally using an atomic addition. 
We + * might have to back out again if it turns out somebody else has an + * exclusive lock. + */ + oldstate = pg_atomic_fetch_add_u32(&lock->lockcount, 1); + + if (oldstate >= EXCLUSIVE_LOCK) + { + /* + * Ok, somebody else holds the lock exclusively. We need to back + * away from the shared lock, since we don't actually hold it right + * now. Since there's a window between lockcount += 1 and lockcount + * -= 1, the previous exclusive locker could have released and + * another exclusive locker could have seen our +1. We need to + * signal that to the upper layers so they can deal with the race + * condition. + */ + + /* + * FIXME: check return value if (double_check), it's not + * spurious if still exclusively locked. + */ + pg_atomic_fetch_sub_u32(&lock->lockcount, 1); + + + mustwait = true; + *potentially_spurious = true; + } + else + { + /* yipeyyahee */ + mustwait = false; + } + } + +out: + return mustwait; +} + +/* + * Wakeup all the lockers that currently have a chance to run. + */ +static void +LWLockWakeup(volatile LWLock *lock, LWLockId lockid, LWLockMode mode) +{ + bool releaseOK; + bool wokeup_somebody = false; + dlist_head wakeup; + dlist_mutable_iter iter; + + dlist_init(&wakeup); + + /* remove the to-be-awakened PGPROCs from the queue */ + releaseOK = true; + + /* Acquire mutex. Time spent holding mutex should be short! */ + SpinLockAcquire(&lock->mutex); + + dlist_foreach_modify(iter, (dlist_head *) &lock->waiters) + { + PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur); + + if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE) + continue; + + dlist_delete(&waiter->lwWaitLink); + dlist_push_tail(&wakeup, &waiter->lwWaitLink); + + if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE) + { + /* + * Prevent additional wakeups until retryer gets to run. Backends + * that are just waiting for the lock to become free don't retry + * automatically. + */ + releaseOK = false; + /* + * Don't wakeup (further) exclusive locks. + */ + wokeup_somebody = true; + } + + /* + * Once we've woken up an exclusive lock, there's no point in waking + * up anybody else. + */ + if(waiter->lwWaitMode == LW_EXCLUSIVE) + break; + } + lock->releaseOK = releaseOK; + + /* We are done updating shared state of the lock queue. */ + SpinLockRelease(&lock->mutex); + + /* + * Awaken any waiters I removed from the queue. + */ + dlist_foreach_modify(iter, (dlist_head *) &wakeup) + { + PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur); + LOG_LWDEBUG("LWLockRelease", lockid, mode, "release waiter"); + dlist_delete(&waiter->lwWaitLink); + pg_write_barrier(); + waiter->lwWaiting = false; + PGSemaphoreUnlock(&waiter->sem); + } +} + +/* + * Add ourselves to the end of the queue. Mode can be LW_WAIT_UNTIL_FREE here! + */ +static inline void +LWLockQueueSelf(volatile LWLock *lock, LWLockMode mode) +{ + /* + * If we don't have a PGPROC structure, there's no way to wait. This + * should never occur, since MyProc should only be null during shared + * memory initialization. + */ + if (MyProc == NULL) + elog(PANIC, "cannot wait without a PGPROC structure"); + + pg_atomic_fetch_add_u32(&lock->nwaiters, 1); + + SpinLockAcquire(&lock->mutex); + MyProc->lwWaiting = true; + MyProc->lwWaitMode = mode; + dlist_push_tail((dlist_head *) &lock->waiters, &MyProc->lwWaitLink); + + /* Can release the mutex now */ + SpinLockRelease(&lock->mutex); +} + +/* + * Remove ourselves from the waitlist. 
This is used if we queued ourselves + * because we thought we needed to sleep but, after further checking, we + * discover that we don't actually need to do so. Somebody else might have + * already woken us up, in that case return false. + */ +static inline bool +LWLockDequeueSelf(volatile LWLock *lock) +{ + bool found = false; + dlist_mutable_iter iter; + + SpinLockAcquire(&lock->mutex); + + /* need to iterate, somebody else could have unqueued us */ + dlist_foreach_modify(iter, (dlist_head *) &lock->waiters) + { + PGPROC *proc = dlist_container(PGPROC, lwWaitLink, iter.cur); + if (proc == MyProc) + { + found = true; + dlist_delete(&proc->lwWaitLink); + break; + } + } + + SpinLockRelease(&lock->mutex); + + if (found) + pg_atomic_fetch_sub_u32(&lock->nwaiters, 1); + return found; +} /* * LWLockAcquire - acquire a lightweight lock in the specified mode @@ -341,10 +658,13 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) { volatile LWLock *lock = &(LWLockArray[lockid].lock); PGPROC *proc = MyProc; - bool retry = false; int extraWaits = 0; + bool potentially_spurious; + uint32 iterations = 0; - PRINT_LWDEBUG("LWLockAcquire", lockid, lock); + AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE); + + PRINT_LWDEBUG("LWLockAcquire", lockid, lock, mode); #ifdef LWLOCK_STATS /* Set up local count state first time through in a given process */ @@ -395,58 +715,71 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) { bool mustwait; - /* Acquire mutex. Time spent holding mutex should be short! */ #ifdef LWLOCK_STATS + /* Acquire mutex. Time spent holding mutex should be short! */ + /* FIXME this stuff is completely useless now. Should consider a + * different way to do accounting -- perhaps at LWLockAttemptLock? */ spin_delay_counts[lockid] += SpinLockAcquire(&lock->mutex); -#else - SpinLockAcquire(&lock->mutex); #endif - /* If retrying, allow LWLockRelease to release waiters again */ - if (retry) - lock->releaseOK = true; + /* + * try to grab the lock the first time, we're not in the waitqueue yet. + */ + mustwait = LWLockAttemptLock(lock, mode, false, &potentially_spurious); + + if (!mustwait) + break; /* got the lock */ + + /* + * Ok, at this point we couldn't grab the lock on the first try. We + * cannot simply queue ourselves to the end of the list and wait to be + * woken up because by now the lock could long have been released. + * Instead add us to the queue and try to grab the lock again. If we + * suceed we need to revert the queuing and be happy, otherwise we + * recheck the lock. If we still couldn't grab it, we know that the + * other lock will see our queue entries when releasing since they + * existed before we checked for the lock. + */ + + /* add to the queue */ + LWLockQueueSelf(lock, mode); - /* If I can get the lock, do so quickly. */ - if (mode == LW_EXCLUSIVE) + /* we're now guaranteed to be woken up if necessary */ + mustwait = LWLockAttemptLock(lock, mode, false, &potentially_spurious); + + /* ok, grabbed the lock the second time round, need to undo queueing */ + if (!mustwait) { - if (lock->exclusive == 0 && lock->shared == 0) + if (!LWLockDequeueSelf(lock)) { - lock->exclusive++; - mustwait = false; + /* + * Somebody else dequeued us and has or will wake us up. Wait + * for the correct wakeup, otherwise our ->lwWaiting would get + * reset at some inconvenient point later, and releaseOk + * wouldn't be managed correctly. 
+ */ + for (;;) + { + PGSemaphoreLock(&proc->sem, false); + if (!proc->lwWaiting) + break; + extraWaits++; + } + lock->releaseOK = true; } - else - mustwait = true; + PRINT_LWDEBUG("LWLockAcquire undo queue", lockid, lock, mode); + break; } else { - if (lock->exclusive == 0) - { - lock->shared++; - mustwait = false; - } - else - mustwait = true; + PRINT_LWDEBUG("LWLockAcquire waiting 4", lockid, lock, mode); } - if (!mustwait) - break; /* got the lock */ - /* - * Add myself to wait queue. - * - * If we don't have a PGPROC structure, there's no way to wait. This - * should never occur, since MyProc should only be null during shared - * memory initialization. + * NB: There's no need to deal with spurious lock attempts + * here. Anyone we prevented from acquiring the lock will + * enqueue themselves using the same protocol we used here. */ - if (proc == NULL) - elog(PANIC, "cannot wait without a PGPROC structure"); - - proc->lwWaiting = true; - proc->lwWaitMode = mode; - dlist_push_head((dlist_head *) &lock->waiters, &proc->lwWaitLink); - - /* Can release the mutex now */ - SpinLockRelease(&lock->mutex); /* * Wait until awakened. @@ -460,7 +793,7 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) * so that the lock manager or signal manager will see the received * signal when it next waits. */ - LOG_LWDEBUG("LWLockAcquire", lockid, "waiting"); + LOG_LWDEBUG("LWLockAcquire", lockid, mode, "waiting"); #ifdef LWLOCK_STATS block_counts[lockid]++; @@ -476,22 +809,22 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) break; extraWaits++; } + lock->releaseOK = true; TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode); - LOG_LWDEBUG("LWLockAcquire", lockid, "awakened"); + LOG_LWDEBUG("LWLockAcquire", lockid, mode, "awakened"); /* Now loop back and try to acquire lock again. */ - retry = true; + pg_atomic_fetch_sub_u32(&lock->nwaiters, 1); + iterations++; } - /* We are done updating shared state of the lock itself. */ - SpinLockRelease(&lock->mutex); - TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode); /* Add lock to list of locks held by this backend */ - held_lwlocks[num_held_lwlocks++] = lockid; + held_lwlocks[num_held_lwlocks].lock = lockid; + held_lwlocks[num_held_lwlocks++].mode = mode; /* * Fix the process wait semaphore's count for any absorbed wakeups. @@ -512,8 +845,11 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode) { volatile LWLock *lock = &(LWLockArray[lockid].lock); bool mustwait; + bool potentially_spurious; + + AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE); - PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock); + PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock, mode); /* Ensure we will have room to remember the lock */ if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS) @@ -526,48 +862,42 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode) */ HOLD_INTERRUPTS(); - /* Acquire mutex. Time spent holding mutex should be short! */ - SpinLockAcquire(&lock->mutex); - - /* If I can get the lock, do so quickly. */ - if (mode == LW_EXCLUSIVE) - { - if (lock->exclusive == 0 && lock->shared == 0) - { - lock->exclusive++; - mustwait = false; - } - else - mustwait = true; - } - else - { - if (lock->exclusive == 0) - { - lock->shared++; - mustwait = false; - } - else - mustwait = true; - } - - /* We are done updating shared state of the lock itself. 
*/ - SpinLockRelease(&lock->mutex); +retry: + /* + * passing 'true' to check more carefully to avoid potential + * spurious acquisitions + */ + mustwait = LWLockAttemptLock(lock, mode, true, &potentially_spurious); if (mustwait) { /* Failed to get lock, so release interrupt holdoff */ RESUME_INTERRUPTS(); - LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed"); + LOG_LWDEBUG("LWLockConditionalAcquire", lockid, mode, "failed"); TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(lockid, mode); + + /* + * We ran into an exclusive lock and might have blocked another + * exclusive lock from taking a shot because it took a time to back + * off. Retry till we are either sure we didn't block somebody (because + * somebody else certainly has the lock) or till we got it. + * + * We cannot rely on the two-step lock-acquisition protocol as in + * LWLockAcquire because we're not using it. + */ + if (potentially_spurious) + { + SPIN_DELAY(); + goto retry; + } } else { /* Add lock to list of locks held by this backend */ - held_lwlocks[num_held_lwlocks++] = lockid; + held_lwlocks[num_held_lwlocks].lock = lockid; + held_lwlocks[num_held_lwlocks++].mode = mode; TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(lockid, mode); } - return !mustwait; } @@ -592,8 +922,11 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode) PGPROC *proc = MyProc; bool mustwait; int extraWaits = 0; + bool potentially_spurious_first; + bool potentially_spurious_second; - PRINT_LWDEBUG("LWLockAcquireOrWait", lockid, lock); + Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE); + PRINT_LWDEBUG("LWLockAcquireOrWait", lockid, lock, mode); #ifdef LWLOCK_STATS /* Set up local count state first time through in a given process */ @@ -612,79 +945,64 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode) */ HOLD_INTERRUPTS(); - /* Acquire mutex. Time spent holding mutex should be short! */ - SpinLockAcquire(&lock->mutex); - - /* If I can get the lock, do so quickly. */ - if (mode == LW_EXCLUSIVE) - { - if (lock->exclusive == 0 && lock->shared == 0) - { - lock->exclusive++; - mustwait = false; - } - else - mustwait = true; - } - else - { - if (lock->exclusive == 0) - { - lock->shared++; - mustwait = false; - } - else - mustwait = true; - } + /* + * NB: We're using nearly the same twice-in-a-row lock acquisition + * protocol as LWLockAcquire(). Check its comments for details. + */ + mustwait = LWLockAttemptLock(lock, mode, false, &potentially_spurious_first); if (mustwait) { - /* - * Add myself to wait queue. - * - * If we don't have a PGPROC structure, there's no way to wait. This - * should never occur, since MyProc should only be null during shared - * memory initialization. - */ - if (proc == NULL) - elog(PANIC, "cannot wait without a PGPROC structure"); + LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE); - proc->lwWaiting = true; - proc->lwWaitMode = LW_WAIT_UNTIL_FREE; - dlist_push_head((dlist_head *) &lock->waiters, &proc->lwWaitLink); + mustwait = LWLockAttemptLock(lock, mode, false, &potentially_spurious_second); - /* Can release the mutex now */ - SpinLockRelease(&lock->mutex); - - /* - * Wait until awakened. Like in LWLockAcquire, be prepared for bogus - * wakups, because we share the semaphore with ProcWaitForSignal. - */ - LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "waiting"); + if (mustwait) + { + /* + * Wait until awakened. Like in LWLockAcquire, be prepared for bogus + * wakups, because we share the semaphore with ProcWaitForSignal. 
+ */ + LOG_LWDEBUG("LWLockAcquireOrWait", lockid, mode, "waiting"); #ifdef LWLOCK_STATS - block_counts[lockid]++; + block_counts[lockid]++; #endif + TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode); - TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode); + for (;;) + { + /* "false" means cannot accept cancel/die interrupt here. */ + PGSemaphoreLock(&proc->sem, false); + if (!proc->lwWaiting) + break; + extraWaits++; + } + pg_atomic_fetch_sub_u32(&lock->nwaiters, 1); - for (;;) - { - /* "false" means cannot accept cancel/die interrupt here. */ - PGSemaphoreLock(&proc->sem, false); - if (!proc->lwWaiting) - break; - extraWaits++; - } + TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode); - TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode); + LOG_LWDEBUG("LWLockAcquireOrWait", lockid, mode, "awakened"); + } + else + { + /* got lock in the second attempt, undo queueing */ + if (!LWLockDequeueSelf(lock)) + { + for (;;) + { + PGSemaphoreLock(&proc->sem, false); + if (!proc->lwWaiting) + break; + extraWaits++; + } + } - LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "awakened"); - } - else - { - /* We are done updating shared state of the lock itself. */ - SpinLockRelease(&lock->mutex); + /* FIXME: don't need that anymore? */ +#if 0 + LWLockWakeup(lock, lockid, mode); +#endif + } } /* @@ -697,13 +1015,15 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode) { /* Failed to get lock, so release interrupt holdoff */ RESUME_INTERRUPTS(); - LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "failed"); + LOG_LWDEBUG("LWLockAcquireOrWait", lockid, mode, "failed"); TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(lockid, mode); } else { + LOG_LWDEBUG("LWLockAcquireOrWait", lockid, mode, "suceeded"); /* Add lock to list of locks held by this backend */ - held_lwlocks[num_held_lwlocks++] = lockid; + held_lwlocks[num_held_lwlocks].lock = lockid; + held_lwlocks[num_held_lwlocks++].mode = mode; TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(lockid, mode); } @@ -717,13 +1037,11 @@ void LWLockRelease(LWLockId lockid) { volatile LWLock *lock = &(LWLockArray[lockid].lock); - dlist_head wakeup; - dlist_mutable_iter iter; int i; - - dlist_init(&wakeup); - - PRINT_LWDEBUG("LWLockRelease", lockid, lock); + LWLockMode mode; + uint32 lockcount; + bool check_waiters; + bool have_waiters = false; /* * Remove lock from list of locks held. Usually, but not always, it will @@ -731,8 +1049,11 @@ LWLockRelease(LWLockId lockid) */ for (i = num_held_lwlocks; --i >= 0;) { - if (lockid == held_lwlocks[i]) + if (lockid == held_lwlocks[i].lock) + { + mode = held_lwlocks[i].mode; break; + } } if (i < 0) elog(ERROR, "lock %d is not held", (int) lockid); @@ -740,78 +1061,50 @@ LWLockRelease(LWLockId lockid) for (; i < num_held_lwlocks; i++) held_lwlocks[i] = held_lwlocks[i + 1]; - /* Acquire mutex. Time spent holding mutex should be short! */ - SpinLockAcquire(&lock->mutex); + PRINT_LWDEBUG("LWLockRelease", lockid, lock, mode); - /* Release my hold on lock */ - if (lock->exclusive > 0) - lock->exclusive--; + pg_read_barrier(); + + /* Release my hold on lock, both are a full barrier */ + if (mode == LW_EXCLUSIVE) + lockcount = pg_atomic_sub_fetch_u32(&lock->lockcount, EXCLUSIVE_LOCK); else - { - Assert(lock->shared > 0); - lock->shared--; - } + lockcount = pg_atomic_sub_fetch_u32(&lock->lockcount, 1); + + /* nobody else can have that kind of lock */ + Assert(lockcount < EXCLUSIVE_LOCK); /* - * See if I need to awaken any waiters. If I released a non-last shared - * hold, there cannot be anything to do. 
Also, do not awaken any waiters - * if someone has already awakened waiters that haven't yet acquired the - * lock. + * Anybody we need to wakeup needs to have started queueing before + * we removed ourselves from the queue and the __sync_ operations + * above are full barriers. */ - if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK) - { - /* - * Remove the to-be-awakened PGPROCs from the queue. - */ - bool releaseOK = true; - bool wokeup_somebody = false; - - dlist_foreach_modify(iter, (dlist_head *) &lock->waiters) - { - PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur); - - if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE) - continue; - - dlist_delete(&waiter->lwWaitLink); - dlist_push_tail(&wakeup, &waiter->lwWaitLink); - /* - * Prevent additional wakeups until retryer gets to - * run. Backends that are just waiting for the lock to become - * free don't retry automatically. - */ - if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE) - { - releaseOK = false; - wokeup_somebody = true; - } + if (pg_atomic_read_u32(&lock->nwaiters) > 0) + have_waiters = true; + + /* we're still waiting for backends to get scheduled, don't release again */ + if (!lock->releaseOK) + check_waiters = false; + /* grant permission to run, even if a spurious share lock increases lockcount */ + else if (mode == LW_EXCLUSIVE && have_waiters) + check_waiters = true; + /* nobody has this locked anymore, potential exclusive lockers get a chance */ + else if (lockcount == 0 && have_waiters) + check_waiters = true; + /* nobody queued or not free */ + else + check_waiters = false; - if(waiter->lwWaitMode == LW_EXCLUSIVE) - break; - } - lock->releaseOK = releaseOK; + if (check_waiters) + { + PRINT_LWDEBUG("LWLockRelease releasing", lockid, lock, mode); + LWLockWakeup(lock, lockid, mode); } - /* We are done updating shared state of the lock itself. */ - SpinLockRelease(&lock->mutex); - TRACE_POSTGRESQL_LWLOCK_RELEASE(lockid); /* - * Awaken any waiters I removed from the queue. - */ - dlist_foreach_modify(iter, (dlist_head *) &wakeup) - { - PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur); - LOG_LWDEBUG("LWLockRelease", lockid, "release waiter"); - dlist_delete(&waiter->lwWaitLink); - pg_write_barrier(); - waiter->lwWaiting = false; - PGSemaphoreUnlock(&waiter->sem); - } - - /* * Now okay to allow cancel/die interrupts. */ RESUME_INTERRUPTS(); @@ -834,7 +1127,7 @@ LWLockReleaseAll(void) { HOLD_INTERRUPTS(); /* match the upcoming RESUME_INTERRUPTS */ - LWLockRelease(held_lwlocks[num_held_lwlocks - 1]); + LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock); } } @@ -842,8 +1135,7 @@ LWLockReleaseAll(void) /* * LWLockHeldByMe - test whether my process currently holds a lock * - * This is meant as debug support only. We do not distinguish whether the - * lock is held shared or exclusive. + * This is meant as debug support only. */ bool LWLockHeldByMe(LWLockId lockid) @@ -852,7 +1144,7 @@ LWLockHeldByMe(LWLockId lockid) for (i = 0; i < num_held_lwlocks; i++) { - if (held_lwlocks[i] == lockid) + if (held_lwlocks[i].lock == lockid) return true; } return false; -- 1.8.5.rc1.dirty
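
To make the lockcount scheme from the NOTES section easier to experiment with outside the server, here is a minimal standalone sketch. It uses C11 <stdatomic.h> rather than the patch's pg_atomic_* wrappers, the demo_* names are invented for illustration, and it models only the atomic fast path (Phase 1) plus the shared-lock back-out that produces the "potentially spurious" case; the wait queue, semaphores and releaseOK handling of the real patch are deliberately omitted.

/*
 * Standalone illustration of the 'lockcount' idea: high bit = exclusive
 * holder, low bits = number of shared holders.  Not the patch itself.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define DEMO_EXCLUSIVE_LOCK ((uint32_t) 1 << 31)	/* high bit set = exclusive */

typedef struct demo_lwlock
{
	_Atomic uint32_t lockcount;	/* 0..N shared holders, or DEMO_EXCLUSIVE_LOCK */
} demo_lwlock;

/* Phase 1, exclusive: CAS lockcount from 0 to DEMO_EXCLUSIVE_LOCK. */
static bool
demo_try_exclusive(demo_lwlock *lock)
{
	uint32_t	expected = 0;

	/* cheap pre-check first, a CAS that is bound to fail is expensive */
	if (atomic_load(&lock->lockcount) != 0)
		return false;

	return atomic_compare_exchange_strong(&lock->lockcount, &expected,
										  DEMO_EXCLUSIVE_LOCK);
}

/*
 * Phase 1, shared: unconditionally add 1, back out again if somebody holds
 * the lock exclusively.  *potentially_spurious reports that our transient +1
 * may have made a concurrent exclusive attempt fail, which is exactly the
 * race the patch resolves via its wait queue.
 */
static bool
demo_try_shared(demo_lwlock *lock, bool *potentially_spurious)
{
	uint32_t	oldcount = atomic_fetch_add(&lock->lockcount, 1);

	*potentially_spurious = false;
	if (oldcount >= DEMO_EXCLUSIVE_LOCK)
	{
		atomic_fetch_sub(&lock->lockcount, 1);	/* undo our increment */
		*potentially_spurious = true;
		return false;
	}
	return true;
}

/* Release: subtract what we added; the real patch then considers wakeups. */
static void
demo_release(demo_lwlock *lock, bool exclusive)
{
	atomic_fetch_sub(&lock->lockcount, exclusive ? DEMO_EXCLUSIVE_LOCK : 1);
}

int
main(void)
{
	demo_lwlock lock;
	bool		spurious;

	atomic_init(&lock.lockcount, 0);

	if (demo_try_exclusive(&lock))
		printf("exclusive acquired, lockcount = %u\n",
			   (unsigned) atomic_load(&lock.lockcount));

	if (!demo_try_shared(&lock, &spurious))
		printf("shared attempt failed, potentially spurious = %d\n", spurious);

	demo_release(&lock, true);

	if (demo_try_shared(&lock, &spurious))
		printf("shared acquired, lockcount = %u\n",
			   (unsigned) atomic_load(&lock.lockcount));
	demo_release(&lock, false);

	return 0;
}

Note how the shared path never touches a spinlock when the lock is uncontended, which is the point of the patch: in the real implementation the spinlock survives only to protect the wait queue manipulated by LWLockQueueSelf, LWLockDequeueSelf and LWLockWakeup.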