>From 8189689b71e5cd8129587345aa6e7b6e0231c6f8 Mon Sep 17 00:00:00 2001
From: Andres Freund
Date: Tue, 24 Sep 2013 03:04:44 +0200
Subject: [PATCH] Wait-free LW_SHARED lwlock acquisition

---
 src/backend/access/transam/xlog.c |   9 +-
 src/backend/storage/lmgr/lwlock.c | 863 +++++++++++++++++++++++++++-----------
 2 files changed, 617 insertions(+), 255 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index fc495d6..07e510b 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1620,7 +1620,8 @@ WALInsertSlotAcquireOne(int slotno)
 			break;				/* got the lock */
 
 		Assert(slot->owner != MyProc);
-
+		Assert(!proc->lwWaiting);
+		Assert(proc->lwWaitLink == NULL);
 		/*
 		 * Add myself to wait queue.
 		 */
@@ -1630,7 +1631,10 @@ WALInsertSlotAcquireOne(int slotno)
 		if (slot->head == NULL)
 			slot->head = proc;
 		else
+		{
+			Assert(slot->tail->lwWaitLink == NULL);
 			slot->tail->lwWaitLink = proc;
+		}
 		slot->tail = proc;
 
 		/* Can release the mutex now */
@@ -1741,6 +1745,9 @@ WaitOnSlot(volatile XLogInsertSlot *slot, XLogRecPtr waitptr)
 
 		Assert(slot->owner != MyProc);
 
+		Assert(!proc->lwWaiting);
+		Assert(proc->lwWaitLink == NULL);
+
 		/*
 		 * Add myself to wait queue.
 		 */
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 4f88d3f..c7fffec 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -17,6 +17,14 @@
  * IDENTIFICATION
  *	  src/backend/storage/lmgr/lwlock.c
  *
+ * TODO:
+ * - convert lwWaitLink to ilist.h slist or even dlist
+ * - remove LWLockWakeup dealing with MyProc
+ * - revive pure-spinlock implementation
+ * - abstract away atomic ops, we really only need a few:
+ *   - CAS
+ *   - LOCK XADD
+ * - overhaul the mask offsets, make SHARED/EXCLUSIVE_LOCK_MASK wider than MAX_BACKENDS
 *-------------------------------------------------------------------------
 */
 #include "postgres.h"
@@ -27,22 +35,29 @@
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
+#include "storage/barrier.h"
 #include "storage/ipc.h"
 #include "storage/predicate.h"
 #include "storage/proc.h"
 #include "storage/spin.h"
 
-
 /* We use the ShmemLock spinlock to protect LWLockAssign */
 extern slock_t *ShmemLock;
 
+#define EXCLUSIVE_LOCK_FINALIZED ((uint32)1<<31)
+#define EXCLUSIVE_LOCK_MASK (~(uint32)((1<<16)-1))
+/* XXX: must be bigger than MAX_BACKENDS */
+#define SHARED_LOCK_MASK ((uint32)((1<<16)-1))
 
 typedef struct LWLock
 {
 	slock_t		mutex;			/* Protects LWLock and queue of PGPROCs */
+	uint32		lockcount;		/* exclusive bit | # of shared holders */
 	bool		releaseOK;		/* T if ok to release waiters */
-	char		exclusive;		/* # of exclusive holders (0 or 1) */
-	int			shared;			/* # of shared holders (0..MaxBackends) */
+	uint32		waiters;		/* # of (possibly not yet queued) waiters */
+#ifdef LWLOCK_DEBUG
+	PGPROC	   *owner;
+#endif
 	PGPROC	   *head;			/* head of list of waiting PGPROCs */
 	PGPROC	   *tail;			/* tail of list of waiting PGPROCs */
 	/* tail is undefined when head is NULL */
@@ -60,7 +75,7 @@ typedef struct LWLock
  * LWLock is between 16 and 32 bytes on all known platforms, so these two
 * cases are sufficient.
 */
-#define LWLOCK_PADDED_SIZE	(sizeof(LWLock) <= 16 ? 16 : 32)
+#define LWLOCK_PADDED_SIZE	64
 
 typedef union LWLockPadded
 {
@@ -75,7 +90,6 @@ typedef union LWLockPadded
 */
 NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
 
-
 /*
 * We use this structure to keep track of locked LWLocks for release
 * during error recovery.  The maximum size could be determined at runtime
@@ -84,12 +98,21 @@ NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
 */
#define MAX_SIMUL_LWLOCKS	100
 
+struct LWLockHandle
+{
+	LWLockId	lock;
+	LWLockMode	mode;
+};
+
 static int	num_held_lwlocks = 0;
-static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
+static struct LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
 
 static int	lock_addin_request = 0;
 static bool lock_addin_request_allowed = true;
 
+/* read value from memory, does *not* have any barrier semantics */
+#define pg_atomic_read(atomic) (*(volatile uint32 *)&(atomic))
+
 #ifdef LWLOCK_STATS
 static int	counts_for_pid = 0;
 static int *sh_acquire_counts;
@@ -102,24 +125,27 @@ static int *spin_delay_counts;
 bool		Trace_lwlocks = false;
 
 inline static void
-PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
+PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock, LWLockMode mode)
 {
 	if (Trace_lwlocks)
-		elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
-			 where, (int) lockid,
-			 (int) lock->exclusive, lock->shared, lock->head,
+		elog(LOG, "%s(%d)%u: excl %u shared %u head %p waiters %u rOK %d",
+			 where, (int) lockid, mode,
+			 (lock->lockcount & EXCLUSIVE_LOCK_MASK) >> 31,
+			 lock->lockcount & SHARED_LOCK_MASK,
+			 lock->head,
+			 lock->waiters,
 			 (int) lock->releaseOK);
 }
 
 inline static void
-LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
+LOG_LWDEBUG(const char *where, LWLockId lockid, LWLockMode mode, const char *msg)
 {
 	if (Trace_lwlocks)
-		elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
+		elog(LOG, "%s(%d)%u: %s", where, (int) lockid, mode, msg);
 }
 #else							/* not LOCK_DEBUG */
-#define PRINT_LWDEBUG(a,b,c)
-#define LOG_LWDEBUG(a,b,c)
+#define PRINT_LWDEBUG(a,b,c,d) (void)0
+#define LOG_LWDEBUG(a,b,c,d) (void)0
 #endif   /* LOCK_DEBUG */
 
 #ifdef LWLOCK_STATS
@@ -285,8 +311,8 @@ CreateLWLocks(void)
 	{
 		SpinLockInit(&lock->lock.mutex);
 		lock->lock.releaseOK = true;
-		lock->lock.exclusive = 0;
-		lock->lock.shared = 0;
+		lock->lock.lockcount = 0;
+		lock->lock.waiters = 0;
 		lock->lock.head = NULL;
 		lock->lock.tail = NULL;
 	}
@@ -329,6 +355,364 @@ LWLockAssign(void)
 	return result;
 }
 
+/*
+ * Internal function handling the atomic manipulation of lock->lockcount.
+ *
+ * 'double_check' = true means that we try to check more carefully
+ * against causing somebody else to spuriously believe the lock is
+ * already taken, although we're just about to back out of it.
+ */
+static inline bool
+LWLockAttemptLock(volatile LWLock* lock, LWLockMode mode, bool double_check, bool *potentially_spurious)
+{
+	bool		mustwait;
+	uint32		oldstate;
+
+	if (mode == LW_EXCLUSIVE)
+	{
+		pg_read_barrier();
+		/* check without CAS, way cheaper, frequently locked otherwise */
+		if (pg_atomic_read(lock->lockcount) != 0)
+			mustwait = true;
+		/*
+		 * ok, no can do. Between the pg_atomic_read() above and the
+		 * CAS somebody else acquired the lock.
+		 */
+		else if (__sync_val_compare_and_swap(&lock->lockcount,
+											 0, EXCLUSIVE_LOCK_FINALIZED) != 0)
+		{
+			mustwait = true;
+		}
+		/* yipeyyahee */
+		else
+		{
+			mustwait = false;
+#ifdef LWLOCK_DEBUG
+			lock->owner = MyProc;
+#endif
+		}
+	}
+	else
+	{
+		/*
+		 * Do an unlocked check first, useful if potentially spurious
+		 * results have a noticeable cost.
+		 */
+		if (double_check)
+		{
+			pg_read_barrier();
+			if (pg_atomic_read(lock->lockcount) >= EXCLUSIVE_LOCK_FINALIZED)
+			{
+				mustwait = true;
+				goto out;
+			}
+		}
+
+		/*
+		 * Acquire the share lock unconditionally using an atomic
+		 * addition.
+		 * We might have to back out again if it turns out
+		 * somebody else has an exclusive lock.
+		 */
+		oldstate = __sync_fetch_and_add(&lock->lockcount, 1);
+
+		/*
+		 * If somebody else holds the lock exclusively we need to back
+		 * away from the shared lock, since we don't actually hold it
+		 * right now.  Since there's a window between lockcount += 1
+		 * and lockcount -= 1, the previous exclusive locker could
+		 * have released and another exclusive locker could have seen
+		 * our +1.  We need to signal that to the upper layers so they
+		 * can deal with the race condition.
+		 */
+		if (oldstate >= EXCLUSIVE_LOCK_FINALIZED)
+		{
+			/*
+			 * FIXME: check return value if (double_check), it's not
+			 * spurious if still exclusively locked.
+			 */
+			__sync_fetch_and_sub(&lock->lockcount, 1);
+			mustwait = true;
+			*potentially_spurious = true;
+		}
+		/* yipeyyahee */
+		else
+			mustwait = false;
+	}
+
+out:
+	return mustwait;
+}
+
+/*
+ * Wake up all the lockers that currently have a chance to run.
+ */
+static void
+LWLockWakeup(volatile LWLock* lock, LWLockId lockid, LWLockMode mode)
+{
+	/*
+	 * Remove the to-be-awakened PGPROCs from the queue.
+	 */
+	bool		releaseOK = true;
+	PGPROC	   *cur;
+	PGPROC	   *last = NULL;
+	PGPROC	   *next = NULL;
+	PGPROC	  **wakeup;
+	size_t		nwakeup = 0;
+	size_t		wakeup_slots;
+	bool		woke_up_shared = false;
+	int			i;
+
+	/* Acquire mutex.  Time spent holding mutex should be short! */
+	SpinLockAcquire(&lock->mutex);
+
+	Assert((lock->head != NULL && lock->tail != NULL) ||
+		   lock->head == NULL);
+
+	cur = lock->head;
+
+	if (cur == NULL)
+	{
+#ifdef LOCK_DEBUG
+		/*
+		 * This can legally happen since we increase lock->waiters in
+		 * LWLockQueueSelf() before queueing, but it can also indicate
+		 * a bug, so it's useful to display this when tracing.
+		 */
+		if (pg_atomic_read(lock->waiters) > 0)
+		{
+			PRINT_LWDEBUG("LWLockWakeup - no head, but waiters", lockid, lock, 0);
+		}
+#endif
+		/* nothing to do, can't be out of date because of the spinlock */
+		goto unlock;
+	}
+
+	wakeup_slots = pg_atomic_read(lock->waiters);
+	wakeup = alloca(wakeup_slots * sizeof(PGPROC *));
+
+	while (cur != NULL)
+	{
+		/* panic because others will be stuck in a semaphore */
+		if (nwakeup >= wakeup_slots)
+			elog(PANIC, "too many waiters");
+
+		/* wake up all shared lockers we find, unless we've found an exclusive lock first */
+		if (cur != MyProc && cur->lwWaitMode == LW_SHARED)
+		{
+			woke_up_shared = true;
+			wakeup[nwakeup++] = cur;
+			/*
+			 * don't want to wake up again before any of the woken up
+			 * entries got scheduled
+			 */
+			releaseOK = false;
+		}
+		/*
+		 * Wake up all UNTIL_FREE entries we come across, even if
+		 * shared lockers were found before them.
+		 */
+		else if (cur != MyProc && cur->lwWaitMode == LW_WAIT_UNTIL_FREE)
+		{
+			wakeup[nwakeup++] = cur;
+		}
+		/*
+		 * Exclusive lock.  Wake this one up, but no further ones since
+		 * they wouldn't be able to acquire the lock anyway until the
+		 * exclusive lock releases.
+		 */
+		else if (cur != MyProc && cur->lwWaitMode == LW_EXCLUSIVE && !woke_up_shared)
+		{
+			wakeup[nwakeup++] = cur;
+
+			/* repoint lock->head since we're removing the current one */
+			if (last == NULL)
+				lock->head = cur->lwWaitLink;
+			/* unlink current node from chain */
+			else
+				last->lwWaitLink = cur->lwWaitLink;
+
+			/*
+			 * When removing the last node we need to repoint ->tail,
+			 * otherwise appending won't work.
+			 */
+			if (cur == lock->tail)
+				lock->tail = last;
+
+			Assert((lock->head != NULL && lock->tail != NULL) ||
+				   lock->head == NULL);
+
+			cur->lwWaitLink = NULL;
+			/*
+			 * don't want to wake up again before any of the woken up
+			 * entries got scheduled
+			 */
+			releaseOK = false;
+			break;
+		}
+		/* won't be woken up, but we want to look at the later nodes */
+		else
+		{
+			last = cur;
+			cur = cur->lwWaitLink;
+			continue;
+		}
+
+		/* remove current from linked list */
+
+		/* current is the first node */
+		if (last == NULL)
+			lock->head = cur->lwWaitLink;
+		else
+			last->lwWaitLink = cur->lwWaitLink;
+
+		/* current is the end of the list, repoint tail */
+		if (cur == lock->tail)
+			lock->tail = last;
+
+		next = cur->lwWaitLink;
+		/* mark us as not being part of the list, but do not yet wake up */
+		cur->lwWaitLink = NULL;
+		cur = next;
+	}
+
+	Assert((lock->head != NULL && lock->tail != NULL) ||
+		   lock->head == NULL);
+
+unlock:
+	lock->releaseOK = releaseOK;
+
+	/* We are done updating shared state of the lock queue. */
+	SpinLockRelease(&lock->mutex);
+
+	/*
+	 * Awaken any waiters I removed from the queue.
+	 */
+	for (i = 0; i < nwakeup; i++)
+	{
+		PGPROC	   *proc = wakeup[i];
+
+		Assert(proc != MyProc);
+		Assert(lock->head != proc);
+		Assert(lock->tail != proc);
+
+		/*
+		 * Unset waiting; doing so only now allows some more error
+		 * checking to be done elsewhere.
+		 */
+		proc->lwWaiting = false;
+		PGSemaphoreUnlock(&proc->sem);
+	}
+}
+
+/*
+ * Add ourselves to the end of the queue.  Mode can also be
+ * LW_WAIT_UNTIL_FREE here!
+ */
+static inline void
+LWLockQueueSelf(volatile LWLock *lock, LWLockMode mode)
+{
+	/*
+	 * If we don't have a PGPROC structure, there's no way to wait.  This
+	 * should never occur, since MyProc should only be null during shared
+	 * memory initialization.
+	 */
+	if (MyProc == NULL)
+		elog(PANIC, "cannot wait without a PGPROC structure");
+
+	__sync_fetch_and_add(&lock->waiters, 1);
+
+	SpinLockAcquire(&lock->mutex);
+
+	/* consistent list state */
+	Assert((lock->head != NULL && lock->tail != NULL) ||
+		   lock->head == NULL);
+	/* we can only be part of one queue at a time */
+	Assert(!MyProc->lwWaiting);
+	/* quick and dirty check for repeated queueing */
+	Assert(lock->head != MyProc);
+	Assert(lock->tail != MyProc);
+
+	MyProc->lwWaiting = true;
+	MyProc->lwWaitMode = mode;
+	MyProc->lwWaitLink = NULL;
+
+	/* no list there yet */
+	if (lock->head == NULL)
+		lock->head = MyProc;
+	/* to the end with it */
+	else
+	{
+		Assert(lock->tail != NULL);
+		Assert(lock->tail->lwWaitLink == NULL);
+
+		lock->tail->lwWaitLink = MyProc;
+		Assert(lock->tail->lwWaitLink != lock->tail);
+	}
+	lock->tail = MyProc;
+
+	/* Can release the mutex now */
+	SpinLockRelease(&lock->mutex);
+}
+
+/*
+ * Remove ourselves from the waitlist after discovering, on further
+ * checking, that we don't actually need to be on it.  Somebody else
+ * might already have woken us up; in that case return false.
+ */
+static inline bool
+LWLockUnqueueSelf(volatile LWLock *lock)
+{
+	PGPROC	   *proc;
+	PGPROC	   *prev = NULL;
+	bool		found = false;
+
+	SpinLockAcquire(&lock->mutex);
+	proc = lock->head;
+
+	while (proc != NULL)
+	{
+		if (proc == MyProc)
+		{
+			/* removing first element */
+			if (prev == NULL)
+				lock->head = proc->lwWaitLink;
+			/* any other */
+			else
+			{
+				prev->lwWaitLink = proc->lwWaitLink;
+				Assert(prev->lwWaitLink != prev);
+			}
+
+			/* removing last element, repoint tail */
+			if (proc == lock->tail)
+				lock->tail = prev;
+
+			proc->lwWaitLink = NULL;
+			proc->lwWaiting = false;
+			found = true;
+			break;
+		}
+		prev = proc;
+		proc = proc->lwWaitLink;
+	}
+
+	Assert((lock->head != NULL && lock->tail != NULL) ||
+		   lock->head == NULL);
+
+	SpinLockRelease(&lock->mutex);
+
+	/*
+	 * Even if somebody else already woke us up, we can no longer be
+	 * part of a queue; we may, however, still be marked as lwWaiting.
+	 */
+	Assert(MyProc->lwWaitLink == NULL);
+
+	if (found)
+		__sync_fetch_and_sub(&lock->waiters, 1);
+	return found;
+}
 
 /*
 * LWLockAcquire - acquire a lightweight lock in the specified mode
@@ -342,10 +726,11 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 {
 	volatile LWLock *lock = &(LWLockArray[lockid].lock);
 	PGPROC	   *proc = MyProc;
-	bool		retry = false;
 	int			extraWaits = 0;
+	bool		potentially_spurious;
+	uint32		iterations = 0;
 
-	PRINT_LWDEBUG("LWLockAcquire", lockid, lock);
+	PRINT_LWDEBUG("LWLockAcquire", lockid, lock, mode);
 
 #ifdef LWLOCK_STATS
 	/* Set up local count state first time through in a given process */
@@ -395,64 +780,69 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 	for (;;)
 	{
 		bool		mustwait;
-		/* Acquire mutex.  Time spent holding mutex should be short! */
 #ifdef LWLOCK_STATS
 		spin_delay_counts[lockid] += SpinLockAcquire(&lock->mutex);
 #else
-		SpinLockAcquire(&lock->mutex);
 #endif
-		/* If retrying, allow LWLockRelease to release waiters again */
-		if (retry)
-			lock->releaseOK = true;
+		/* try to grab the lock the first time, we're not in the waitqueue yet */
+		mustwait = LWLockAttemptLock(lock, mode, false, &potentially_spurious);
+
+		if (!mustwait)
+			break;				/* got the lock */
+
+		/*
+		 * Ok, at this point we couldn't grab the lock on the first
+		 * try.  We cannot simply queue ourselves to the end of the
+		 * list and wait to be woken up, because by now the lock could
+		 * long have been released.  Instead add ourselves to the
+		 * queue and try to grab the lock again.  If we succeed we
+		 * need to revert the queueing and be happy; otherwise we
+		 * recheck the lock.  If we still couldn't grab it, we know
+		 * that the other locker will see our queue entry when
+		 * releasing, since it existed before we checked for the lock.
+		 */
+
+		/* add to the queue */
+		LWLockQueueSelf(lock, mode);
+
+		/* we're now guaranteed to be woken up if necessary */
+		mustwait = LWLockAttemptLock(lock, mode, false, &potentially_spurious);
 
-		/* If I can get the lock, do so quickly. */
-		if (mode == LW_EXCLUSIVE)
+		/* ok, grabbed the lock the second time round, need to undo queueing */
+		if (!mustwait)
 		{
-			if (lock->exclusive == 0 && lock->shared == 0)
+			if (!LWLockUnqueueSelf(lock))
 			{
-				lock->exclusive++;
-				mustwait = false;
+				/*
+				 * Somebody else unqueued us and has or will wake us
+				 * up.  Wait for the correct wakeup; otherwise our
+				 * ->lwWaiting might get set at some inconvenient
+				 * point later, and releaseOK wouldn't be managed
+				 * correctly.
+				 */
+				for (;;)
+				{
+					PGSemaphoreLock(&proc->sem, false);
+					if (!proc->lwWaiting)
+						break;
+					extraWaits++;
+				}
+				lock->releaseOK = true;
 			}
-			else
-				mustwait = true;
+			PRINT_LWDEBUG("LWLockAcquire undo queue", lockid, lock, mode);
+			break;
 		}
 		else
 		{
-			if (lock->exclusive == 0)
-			{
-				lock->shared++;
-				mustwait = false;
-			}
-			else
-				mustwait = true;
+			PRINT_LWDEBUG("LWLockAcquire waiting 4", lockid, lock, mode);
 		}
 
-		if (!mustwait)
-			break;				/* got the lock */
-
 		/*
-		 * Add myself to wait queue.
-		 *
-		 * If we don't have a PGPROC structure, there's no way to wait. This
-		 * should never occur, since MyProc should only be null during shared
-		 * memory initialization.
+		 * NB: There's no need to deal with spurious lock attempts
+		 * here.  Anyone we prevented from acquiring the lock will
+		 * enqueue themselves using the same protocol we used here.
 		 */
-		if (proc == NULL)
-			elog(PANIC, "cannot wait without a PGPROC structure");
-
-		proc->lwWaiting = true;
-		proc->lwWaitMode = mode;
-		proc->lwWaitLink = NULL;
-		if (lock->head == NULL)
-			lock->head = proc;
-		else
-			lock->tail->lwWaitLink = proc;
-		lock->tail = proc;
-
-		/* Can release the mutex now */
-		SpinLockRelease(&lock->mutex);
 
 		/*
 		 * Wait until awakened.
@@ -466,7 +856,7 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 		 * so that the lock manager or signal manager will see the received
 		 * signal when it next waits.
 		 */
-		LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");
+		LOG_LWDEBUG("LWLockAcquire", lockid, mode, "waiting");
 
 #ifdef LWLOCK_STATS
 		block_counts[lockid]++;
@@ -482,28 +872,34 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode)
 				break;
 			extraWaits++;
 		}
+		lock->releaseOK = true;
 
 		TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
 
-		LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");
+		LOG_LWDEBUG("LWLockAcquire", lockid, mode, "awakened");
 
 		/* Now loop back and try to acquire lock again. */
-		retry = true;
+		__sync_fetch_and_sub(&lock->waiters, 1);
+		iterations++;
 	}
 
-	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
-
 	TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode);
 
 	/* Add lock to list of locks held by this backend */
-	held_lwlocks[num_held_lwlocks++] = lockid;
+	held_lwlocks[num_held_lwlocks].lock = lockid;
+	held_lwlocks[num_held_lwlocks++].mode = mode;
 
 	/*
 	 * Fix the process wait semaphore's count for any absorbed wakeups.
 	 */
 	while (extraWaits-- > 0)
		PGSemaphoreUnlock(&proc->sem);
+
+	/* some minor consistency checks */
+	Assert(MyProc == NULL || MyProc->lwWaitLink == NULL);
+	Assert(MyProc == NULL || !MyProc->lwWaiting);
+	Assert(MyProc == NULL || lock->head != MyProc);
+	Assert(MyProc == NULL || lock->tail != MyProc);
 }
 
 /*
@@ -518,8 +914,9 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
 {
 	volatile LWLock *lock = &(LWLockArray[lockid].lock);
 	bool		mustwait;
+	bool		potentially_spurious;
 
-	PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);
+	PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock, mode);
 
 	/* Ensure we will have room to remember the lock */
 	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
@@ -531,49 +928,51 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
 	 * manipulations of data structures in shared memory.
 	 */
 	HOLD_INTERRUPTS();
-
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&lock->mutex);
-
-	/* If I can get the lock, do so quickly. */
-	if (mode == LW_EXCLUSIVE)
-	{
-		if (lock->exclusive == 0 && lock->shared == 0)
-		{
-			lock->exclusive++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
-	else
-	{
-		if (lock->exclusive == 0)
-		{
-			lock->shared++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
-
-	/* We are done updating shared state of the lock itself. */
-	SpinLockRelease(&lock->mutex);
+retry:
+	/*
+	 * Pass 'true' to check more carefully, to avoid potential
+	 * spurious acquisitions.
+	 */
+	mustwait = LWLockAttemptLock(lock, mode, true, &potentially_spurious);
 
 	if (mustwait)
 	{
 		/* Failed to get lock, so release interrupt holdoff */
 		RESUME_INTERRUPTS();
-		LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
+		LOG_LWDEBUG("LWLockConditionalAcquire", lockid, mode, "failed");
 		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(lockid, mode);
+
+		/*
+		 * We ran into an exclusive lock and might have blocked
+		 * another exclusive locker from taking a shot, because it
+		 * took us some time to back off.  Retry until we are either
+		 * sure we didn't block somebody (because somebody else
+		 * securely has the lock) or until we got it.
+		 *
+		 * We cannot rely on the two-step lock acquisition protocol
+		 * as in LWLockAcquire because we're not using it.
+		 *
+		 * Retry until we're sure no spurious acquisition happened.
+		 */
+		if (potentially_spurious)
+		{
+			SPIN_DELAY();
+			goto retry;
+		}
 	}
 	else
 	{
 		/* Add lock to list of locks held by this backend */
-		held_lwlocks[num_held_lwlocks++] = lockid;
+		held_lwlocks[num_held_lwlocks].lock = lockid;
+		held_lwlocks[num_held_lwlocks++].mode = mode;
 		TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(lockid, mode);
 	}
 
+	Assert(MyProc == NULL || MyProc->lwWaitLink == NULL);
+	Assert(MyProc == NULL || !MyProc->lwWaiting);
+	Assert(MyProc == NULL || lock->head != MyProc);
+	Assert(MyProc == NULL || lock->tail != MyProc);
+
 	return !mustwait;
 }
 
@@ -598,8 +997,10 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
 	PGPROC	   *proc = MyProc;
 	bool		mustwait;
 	int			extraWaits = 0;
+	bool		potentially_spurious_first;
+	bool		potentially_spurious_second;
 
-	PRINT_LWDEBUG("LWLockAcquireOrWait", lockid, lock);
+	PRINT_LWDEBUG("LWLockAcquireOrWait", lockid, lock, mode);
 
 #ifdef LWLOCK_STATS
 	/* Set up local count state first time through in a given process */
@@ -618,84 +1019,66 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode)
 	 */
 	HOLD_INTERRUPTS();
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&lock->mutex);
-
-	/* If I can get the lock, do so quickly. */
-	if (mode == LW_EXCLUSIVE)
-	{
-		if (lock->exclusive == 0 && lock->shared == 0)
-		{
-			lock->exclusive++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
-	else
-	{
-		if (lock->exclusive == 0)
-		{
-			lock->shared++;
-			mustwait = false;
-		}
-		else
-			mustwait = true;
-	}
+	/*
+	 * NB: We're using nearly the same twice-in-a-row lock acquisition
+	 * protocol as LWLockAcquire().  Check its comments for details.
+	 */
+	mustwait = LWLockAttemptLock(lock, mode, false, &potentially_spurious_first);
 
 	if (mustwait)
 	{
-		/*
-		 * Add myself to wait queue.
-		 *
-		 * If we don't have a PGPROC structure, there's no way to wait. This
-		 * should never occur, since MyProc should only be null during shared
-		 * memory initialization.
- */ - if (proc == NULL) - elog(PANIC, "cannot wait without a PGPROC structure"); - - proc->lwWaiting = true; - proc->lwWaitMode = LW_WAIT_UNTIL_FREE; - proc->lwWaitLink = NULL; - if (lock->head == NULL) - lock->head = proc; - else - lock->tail->lwWaitLink = proc; - lock->tail = proc; + LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE); - /* Can release the mutex now */ - SpinLockRelease(&lock->mutex); + mustwait = LWLockAttemptLock(lock, mode, false, &potentially_spurious_second); - /* - * Wait until awakened. Like in LWLockAcquire, be prepared for bogus - * wakups, because we share the semaphore with ProcWaitForSignal. - */ - LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "waiting"); + if (mustwait) + { + /* + * Wait until awakened. Like in LWLockAcquire, be prepared for bogus + * wakups, because we share the semaphore with ProcWaitForSignal. + */ + LOG_LWDEBUG("LWLockAcquireOrWait", lockid, mode, "waiting"); #ifdef LWLOCK_STATS - block_counts[lockid]++; + block_counts[lockid]++; #endif + TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode); - TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode); + for (;;) + { + /* "false" means cannot accept cancel/die interrupt here. */ + PGSemaphoreLock(&proc->sem, false); + if (!proc->lwWaiting) + break; + extraWaits++; + } - for (;;) - { - /* "false" means cannot accept cancel/die interrupt here. */ - PGSemaphoreLock(&proc->sem, false); - if (!proc->lwWaiting) - break; - extraWaits++; - } + Assert(lock->head != MyProc); + Assert(lock->tail != MyProc); - TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode); + __sync_fetch_and_sub(&lock->waiters, 1); - LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "awakened"); - } - else - { - /* We are done updating shared state of the lock itself. */ - SpinLockRelease(&lock->mutex); + TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode); + + LOG_LWDEBUG("LWLockAcquireOrWait", lockid, mode, "awakened"); + } + else + { + /* got lock in the second attempt, undo queueing */ + if (!LWLockUnqueueSelf(lock)) + { + for (;;) + { + PGSemaphoreLock(&proc->sem, false); + if (!proc->lwWaiting) + break; + extraWaits++; + } + } + + /* FIXME: don't need that anymore? */ + //LWLockWakeup(lock, lockid, mode); + } } /* @@ -708,16 +1091,23 @@ LWLockAcquireOrWait(LWLockId lockid, LWLockMode mode) { /* Failed to get lock, so release interrupt holdoff */ RESUME_INTERRUPTS(); - LOG_LWDEBUG("LWLockAcquireOrWait", lockid, "failed"); + LOG_LWDEBUG("LWLockAcquireOrWait", lockid, mode, "failed"); TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(lockid, mode); } else { + LOG_LWDEBUG("LWLockAcquireOrWait", lockid, mode, "suceeded"); /* Add lock to list of locks held by this backend */ - held_lwlocks[num_held_lwlocks++] = lockid; + held_lwlocks[num_held_lwlocks].lock = lockid; + held_lwlocks[num_held_lwlocks++].mode = mode; TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(lockid, mode); } + Assert(MyProc == NULL || MyProc->lwWaitLink == NULL); + Assert(MyProc == NULL || !MyProc->lwWaiting); + Assert(MyProc == NULL || lock->head != MyProc); + Assert(MyProc == NULL || lock->tail != MyProc); + return !mustwait; } @@ -728,11 +1118,11 @@ void LWLockRelease(LWLockId lockid) { volatile LWLock *lock = &(LWLockArray[lockid].lock); - PGPROC *head; - PGPROC *proc; int i; - - PRINT_LWDEBUG("LWLockRelease", lockid, lock); + LWLockMode mode; + uint32 lockcount; + bool check_waiters; + bool have_waiters = false; /* * Remove lock from list of locks held. 
 	 * Remove lock from list of locks held.  Usually, but not always, it will
 	 * be the latest-acquired lock; so search array backwards.
 	 */
@@ -740,8 +1130,11 @@ LWLockRelease(LWLockId lockid)
 	for (i = num_held_lwlocks; --i >= 0;)
 	{
-		if (lockid == held_lwlocks[i])
+		if (lockid == held_lwlocks[i].lock)
+		{
+			mode = held_lwlocks[i].mode;
 			break;
+		}
 	}
 	if (i < 0)
 		elog(ERROR, "lock %d is not held", (int) lockid);
@@ -749,100 +1142,63 @@ LWLockRelease(LWLockId lockid)
 	for (; i < num_held_lwlocks; i++)
 		held_lwlocks[i] = held_lwlocks[i + 1];
 
-	/* Acquire mutex.  Time spent holding mutex should be short! */
-	SpinLockAcquire(&lock->mutex);
+	PRINT_LWDEBUG("LWLockRelease", lockid, lock, mode);
 
-	/* Release my hold on lock */
-	if (lock->exclusive > 0)
-		lock->exclusive--;
-	else
-	{
-		Assert(lock->shared > 0);
-		lock->shared--;
-	}
+	pg_read_barrier();
 
-	/*
-	 * See if I need to awaken any waiters.  If I released a non-last shared
-	 * hold, there cannot be anything to do.  Also, do not awaken any waiters
-	 * if someone has already awakened waiters that haven't yet acquired the
-	 * lock.
-	 */
-	head = lock->head;
-	if (head != NULL)
-	{
-		if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
-		{
-			/*
-			 * Remove the to-be-awakened PGPROCs from the queue.
-			 */
-			bool		releaseOK = true;
+#ifdef NOT_ANYMORE
+	if (pg_atomic_read(lock->waiters) > 0)
+		have_waiters = true;
+#endif
 
-			proc = head;
+	/* Release my hold on the lock; both operations are full barriers */
+	if (mode == LW_EXCLUSIVE)
+		lockcount = __sync_sub_and_fetch(&lock->lockcount, EXCLUSIVE_LOCK_FINALIZED);
+	else
+		lockcount = __sync_sub_and_fetch(&lock->lockcount, 1);
 
-			/*
-			 * First wake up any backends that want to be woken up without
-			 * acquiring the lock.
-			 */
-			while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink)
-				proc = proc->lwWaitLink;
+	/* nobody else can have that kind of lock */
+	Assert(lockcount < EXCLUSIVE_LOCK_FINALIZED);
 
-			/*
-			 * If the front waiter wants exclusive lock, awaken him only.
-			 * Otherwise awaken as many waiters as want shared access.
-			 */
-			if (proc->lwWaitMode != LW_EXCLUSIVE)
-			{
-				while (proc->lwWaitLink != NULL &&
-					   proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE)
-				{
-					if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE)
-						releaseOK = false;
-					proc = proc->lwWaitLink;
-				}
-			}
-			/* proc is now the last PGPROC to be released */
-			lock->head = proc->lwWaitLink;
-			proc->lwWaitLink = NULL;
+	/*
+	 * Anybody we need to wake up must have started queueing before we
+	 * released the lock, and the __sync_ operations above are full
+	 * barriers.
+	 */
 
-			/*
-			 * Prevent additional wakeups until retryer gets to run. Backends
-			 * that are just waiting for the lock to become free don't retry
-			 * automatically.
- */ - if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE) - releaseOK = false; + if (pg_atomic_read(lock->waiters) > 0) + have_waiters = true; + + /* we're still waiting for backends to get scheduled, don't release again */ + if (!lock->releaseOK) + check_waiters = false; + /* grant permission to run, even if a spurious share lock increases logcount */ + else if (mode == LW_EXCLUSIVE && have_waiters) + check_waiters = true; + /* nobody has this locked anymore, potential exclusive lockers get a chance */ + else if (lockcount == 0 && have_waiters) + check_waiters = true; + /* nobody queued or not free */ + else + check_waiters = false; - lock->releaseOK = releaseOK; - } - else - { - /* lock is still held, can't awaken anything */ - head = NULL; - } + if (check_waiters) + { + PRINT_LWDEBUG("LWLockRelease releasing", lockid, lock, mode); + LWLockWakeup(lock, lockid, mode); } - /* We are done updating shared state of the lock itself. */ - SpinLockRelease(&lock->mutex); - TRACE_POSTGRESQL_LWLOCK_RELEASE(lockid); /* - * Awaken any waiters I removed from the queue. - */ - while (head != NULL) - { - LOG_LWDEBUG("LWLockRelease", lockid, "release waiter"); - proc = head; - head = proc->lwWaitLink; - proc->lwWaitLink = NULL; - proc->lwWaiting = false; - PGSemaphoreUnlock(&proc->sem); - } - - /* * Now okay to allow cancel/die interrupts. */ RESUME_INTERRUPTS(); + + Assert(MyProc == NULL || MyProc->lwWaitLink == NULL); + Assert(MyProc == NULL || !MyProc->lwWaiting); + Assert(MyProc == NULL || lock->head != MyProc); + Assert(MyProc == NULL || lock->tail != MyProc); } @@ -862,7 +1218,7 @@ LWLockReleaseAll(void) { HOLD_INTERRUPTS(); /* match the upcoming RESUME_INTERRUPTS */ - LWLockRelease(held_lwlocks[num_held_lwlocks - 1]); + LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock); } } @@ -870,8 +1226,7 @@ LWLockReleaseAll(void) /* * LWLockHeldByMe - test whether my process currently holds a lock * - * This is meant as debug support only. We do not distinguish whether the - * lock is held shared or exclusive. + * This is meant as debug support only. */ bool LWLockHeldByMe(LWLockId lockid) @@ -880,7 +1235,7 @@ LWLockHeldByMe(LWLockId lockid) for (i = 0; i < num_held_lwlocks; i++) { - if (held_lwlocks[i] == lockid) + if (held_lwlocks[i].lock == lockid) return true; } return false; -- 1.8.4.21.g992c386.dirty