diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index a6fda81feb6..9b99e5e3ce6 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -736,7 +736,7 @@ GetLWLockIdentifier(uint32 classId, uint16 eventId) * Returns true if the lock isn't free and we need to wait. */ static bool -LWLockAttemptLock(LWLock *lock, LWLockMode mode) +LWLockAttemptLock(LWLock *lock, LWLockMode mode, bool wakeup) { uint32 old_state; @@ -764,7 +764,10 @@ LWLockAttemptLock(LWLock *lock, LWLockMode mode) } else { - lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0; + if (wakeup) + lock_free = (old_state & LW_VAL_EXCLUSIVE) == 0; + else + lock_free = (old_state & (LW_VAL_EXCLUSIVE | LW_FLAG_HAS_WAITERS)) == 0; if (lock_free) desired_state += LW_VAL_SHARED; } @@ -978,9 +981,11 @@ LWLockWakeup(LWLock *lock) * * NB: Mode can be LW_WAIT_UNTIL_FREE here! */ -static void +static bool LWLockQueueSelf(LWLock *lock, LWLockMode mode) { + bool first; + /* * If we don't have a PGPROC structure, there's no way to wait. This * should never occur, since MyProc should only be null during shared @@ -1002,9 +1007,15 @@ LWLockQueueSelf(LWLock *lock, LWLockMode mode) /* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */ if (mode == LW_WAIT_UNTIL_FREE) + { + first = true; proclist_push_head(&lock->waiters, MyProc->pgprocno, lwWaitLink); + } else + { + first = proclist_is_empty(&lock->waiters); proclist_push_tail(&lock->waiters, MyProc->pgprocno, lwWaitLink); + } /* Can release the mutex now */ LWLockWaitListUnlock(lock); @@ -1013,6 +1024,7 @@ LWLockQueueSelf(LWLock *lock, LWLockMode mode) pg_atomic_fetch_add_u32(&lock->nwaiters, 1); #endif + return first; } /* @@ -1177,13 +1189,13 @@ LWLockAcquire(LWLock *lock, LWLockMode mode) */ for (;;) { - bool mustwait; + bool mustwait, first; /* * Try to grab the lock the first time, we're not in the waitqueue * yet/anymore. */ - mustwait = LWLockAttemptLock(lock, mode); + mustwait = LWLockAttemptLock(lock, mode, !result); if (!mustwait) { @@ -1203,10 +1215,10 @@ LWLockAcquire(LWLock *lock, LWLockMode mode) */ /* add to the queue */ - LWLockQueueSelf(lock, mode); + first = LWLockQueueSelf(lock, mode); /* we're now guaranteed to be woken up if necessary */ - mustwait = LWLockAttemptLock(lock, mode); + mustwait = LWLockAttemptLock(lock, mode, (!result) || first); /* ok, grabbed the lock the second time round, need to undo queueing */ if (!mustwait) @@ -1310,7 +1322,7 @@ LWLockConditionalAcquire(LWLock *lock, LWLockMode mode) HOLD_INTERRUPTS(); /* Check for the lock */ - mustwait = LWLockAttemptLock(lock, mode); + mustwait = LWLockAttemptLock(lock, mode, false); if (mustwait) { @@ -1375,13 +1387,13 @@ LWLockAcquireOrWait(LWLock *lock, LWLockMode mode) * NB: We're using nearly the same twice-in-a-row lock acquisition * protocol as LWLockAcquire(). Check its comments for details. */ - mustwait = LWLockAttemptLock(lock, mode); + mustwait = LWLockAttemptLock(lock, mode, true); if (mustwait) { LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE); - mustwait = LWLockAttemptLock(lock, mode); + mustwait = LWLockAttemptLock(lock, mode, true); if (mustwait) {