diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 723051e..85997f6 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -210,6 +210,10 @@ static bool FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode); static bool FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag, uint32 hashcode); static PROCLOCK *FastPathGetRelationLockEntry(LOCALLOCK *locallock); +static bool GroupLockShouldJumpQueue(LockMethod lockMethodTable, + LOCKMODE lockmode, + LOCK *lock, + PROCLOCK *proclock); /* * To make the fast-path lock mechanism work, we must have some way of @@ -339,7 +343,8 @@ PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP) static uint32 proclock_hash(const void *key, Size keysize); static void RemoveLocalLock(LOCALLOCK *locallock); static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, - const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode); + PGPROC *leader, const LOCKTAG *locktag, uint32 hashcode, + LOCKMODE lockmode); static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner); static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode); static void FinishStrongLockAcquire(void); @@ -894,8 +899,8 @@ LockAcquireExtended(const LOCKTAG *locktag, * away anytime. So we have to use SetupLockInTable() to recompute the * lock and proclock pointers, even if they're already set. */ - proclock = SetupLockInTable(lockMethodTable, MyProc, locktag, - hashcode, lockmode); + proclock = SetupLockInTable(lockMethodTable, MyProc, LockGroupLeader, + locktag, hashcode, lockmode); if (!proclock) { AbortStrongLockAcquire(); @@ -914,18 +919,27 @@ LockAcquireExtended(const LOCKTAG *locktag, /* * If lock requested conflicts with locks requested by waiters, must join - * wait queue. Otherwise, check for conflict with already-held locks. - * (That's last because most complex check.) 
+ * wait queue (except for certain cases involving group locking, where + * new lockers must sometimes jump the entire wait queue to avoid + * deadlock). Otherwise, we can grant ourselves the lock if there are + * no conflicts. */ if (lockMethodTable->conflictTab[lockmode] & lock->waitMask) - status = STATUS_FOUND; + { + if (proclock->groupLeader != NULL && + GroupLockShouldJumpQueue(lockMethodTable, lockmode, lock, + proclock)) + status = STATUS_OK; + else + status = STATUS_FOUND; + } else status = LockCheckConflicts(lockMethodTable, lockmode, lock, proclock); if (status == STATUS_OK) { - /* No conflict with held or previously requested locks */ + /* We can and should grant ourselves the lock at once */ GrantLock(lock, proclock, lockmode); GrantLockLocal(locallock, owner); } @@ -1053,7 +1067,7 @@ LockAcquireExtended(const LOCKTAG *locktag, * held at exit. */ static PROCLOCK * -SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, +SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, PGPROC *leader, const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode) { LOCK *lock; @@ -1141,6 +1155,7 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, { uint32 partition = LockHashPartition(hashcode); + proclock->groupLeader = leader; proclock->holdMask = 0; proclock->releaseMask = 0; /* Add proclock to appropriate lists */ @@ -1258,9 +1273,10 @@ RemoveLocalLock(LOCALLOCK *locallock) * NOTES: * Here's what makes this complicated: one process's locks don't * conflict with one another, no matter what purpose they are held for - * (eg, session and transaction locks do not conflict). - * So, we must subtract off our own locks when determining whether the - * requested new lock conflicts with those already held. + * (eg, session and transaction locks do not conflict). Nor do the locks + * of one process in a lock group conflict with those of another process in + * the same group. 
So, we must subtract off these locks when determining + * whether the requested new lock conflicts with those already held. */ int LockCheckConflicts(LockMethod lockMethodTable, @@ -1270,8 +1286,12 @@ LockCheckConflicts(LockMethod lockMethodTable, { int numLockModes = lockMethodTable->numLockModes; LOCKMASK myLocks; - LOCKMASK otherLocks; + int conflictMask = lockMethodTable->conflictTab[lockmode]; + int conflictsRemaining[MAX_LOCKMODES]; + int totalConflictsRemaining = 0; int i; + SHM_QUEUE *procLocks; + PROCLOCK *otherproclock; /* * first check for global conflicts: If no locks conflict with my request, @@ -1282,44 +1302,215 @@ LockCheckConflicts(LockMethod lockMethodTable, * type of lock that conflicts with request. Bitwise compare tells if * there is a conflict. */ - if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask)) + if (!(conflictMask & lock->grantMask)) { PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock); return STATUS_OK; } /* - * Rats. Something conflicts. But it could still be my own lock. We have - * to construct a conflict mask that does not reflect our own locks, but - * only lock types held by other processes. + * Rats. Something conflicts. But it could still be my own lock, or + * a lock held by another member of my locking group. First, figure out + * how many conflicts remain after subtracting out any locks I hold + * myself. */ myLocks = proclock->holdMask; - otherLocks = 0; for (i = 1; i <= numLockModes; i++) { - int myHolding = (myLocks & LOCKBIT_ON(i)) ? 1 : 0; + if ((conflictMask & LOCKBIT_ON(i)) == 0) + { + conflictsRemaining[i] = 0; + continue; + } + conflictsRemaining[i] = lock->granted[i]; + if (myLocks & LOCKBIT_ON(i)) + --conflictsRemaining[i]; + totalConflictsRemaining += conflictsRemaining[i]; + } - if (lock->granted[i] > myHolding) - otherLocks |= LOCKBIT_ON(i); + /* If no conflicts remain, we get the lock. 
*/ + if (totalConflictsRemaining == 0) + { + PROCLOCK_PRINT("LockCheckConflicts: resolved (simple)", proclock); + return STATUS_OK; + } + + /* If we're not using group locking, this is definitely a conflict. */ + if (proclock->groupLeader == NULL) + { + PROCLOCK_PRINT("LockCheckConflicts: conflicting (simple)", proclock); + return STATUS_FOUND; + } + + /* Important special case: we're the only member of a lock group. */ + if (proclock->groupLeader == MyProc && MyProc->lockGroupMembers < 2) + { + Assert(proclock->tag.myProc == MyProc); + Assert(MyProc->lockGroupMembers == 1); + PROCLOCK_PRINT("LockCheckConflicts: conflicting (trivial group)", + proclock); + return STATUS_FOUND; } /* - * now check again for conflicts. 'otherLocks' describes the types of - * locks held by other processes. If one of these conflicts with the kind - * of lock that I want, there is a conflict and I have to sleep. + * Locks held in conflicting modes by members of our own lock group are + * not real conflicts; we can subtract those out and see if we still have + * a conflict. This is O(N) in the number of processes holding or awaiting + * locks on this object. We could improve that by making the shared memory + * state more complex (and larger) but it doesn't seem worth it. */ - if (!(lockMethodTable->conflictTab[lockmode] & otherLocks)) + procLocks = &(lock->procLocks); + otherproclock = (PROCLOCK *) + SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink)); + while (otherproclock != NULL) { - /* no conflict. 
OK to get the lock */ - PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock); - return STATUS_OK; + if (proclock != otherproclock && + proclock->groupLeader == otherproclock->groupLeader && + (otherproclock->holdMask & conflictMask) != 0) + { + int intersectMask = otherproclock->holdMask & conflictMask; + + for (i = 1; i <= numLockModes; i++) + { + if ((intersectMask & LOCKBIT_ON(i)) != 0) + { + if (conflictsRemaining[i] <= 0) + elog(PANIC, "proclocks held do not match lock"); + conflictsRemaining[i]--; + totalConflictsRemaining--; + } + } + + if (totalConflictsRemaining == 0) + { + PROCLOCK_PRINT("LockCheckConflicts: resolved (group)", + proclock); + return STATUS_OK; + } + } + otherproclock = (PROCLOCK *) + SHMQueueNext(procLocks, &otherproclock->lockLink, + offsetof(PROCLOCK, lockLink)); } - PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock); + /* Nope, it's a real conflict. */ + PROCLOCK_PRINT("LockCheckConflicts: conflicting (group)", proclock); return STATUS_FOUND; } /* + * GroupLockShouldJumpQueue -- should a group lock be granted without + * waiting, despite the presence of conflicting waiters? + * + * If some member of our locking group already holds a lock on the object, + * then we should skip the wait queue and grant ourselves the lock immediately. + * This is because we presume lock group members will eventually wait for + * each other; thus, if we didn't do this, such situations would result in + * an eventual deadlock. However, if a conflicting lock is present that is + * not held by another member of our lock group, then we can't do this. + * In that case we'll have to wait despite the deadlock risk and hope for + * the best. 
+ */ +static bool +GroupLockShouldJumpQueue(LockMethod lockMethodTable, + LOCKMODE lockmode, + LOCK *lock, + PROCLOCK *proclock) +{ + int numLockModes = lockMethodTable->numLockModes; + LOCKMASK myLocks; + int conflictMask = lockMethodTable->conflictTab[lockmode]; + int conflictsRemaining[MAX_LOCKMODES]; + int totalConflictsRemaining = 0; + int i; + SHM_QUEUE *procLocks; + PROCLOCK *otherproclock; + + /* + * If we're the only member of the lock group, then clearly no other + * member holds a lock. We should NOT jump the queue. + */ + if (proclock->groupLeader == MyProc && MyProc->lockGroupMembers < 2) + { + Assert(proclock->tag.myProc == MyProc); + Assert(MyProc->lockGroupMembers == 1); + PROCLOCK_PRINT("GroupLockShouldJumpQueue: trivial group", proclock); + return false; + } + + /* Count the number of lock conflicts, excluding my own locks. */ + myLocks = proclock->holdMask; + for (i = 1; i <= numLockModes; i++) + { + if ((conflictMask & LOCKBIT_ON(i)) == 0) + { + conflictsRemaining[i] = 0; + continue; + } + conflictsRemaining[i] = lock->granted[i]; + if (myLocks & LOCKBIT_ON(i)) + --conflictsRemaining[i]; + totalConflictsRemaining += conflictsRemaining[i]; + } + + /* + * Search for locks held by other group members. Even if there are + * no conflicts, we can't exit early yet, because we don't know whether + * any group member actually holds a lock. + */ + procLocks = &(lock->procLocks); + otherproclock = (PROCLOCK *) + SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink)); + while (otherproclock != NULL) + { + if (proclock != otherproclock && + proclock->groupLeader == otherproclock->groupLeader && + otherproclock->holdMask != 0) + { + int intersectMask = otherproclock->holdMask & conflictMask; + + /* + * Does the group member hold a lock in 1 or more conflicting + * modes? If so, reduce the count of remaining conflicts by the + * number of such modes. 
*/ + if (intersectMask != 0) + { + for (i = 1; i <= numLockModes; i++) + { + if ((intersectMask & LOCKBIT_ON(i)) != 0) + { + if (conflictsRemaining[i] <= 0) + elog(PANIC, "proclocks held do not match lock"); + conflictsRemaining[i]--; + totalConflictsRemaining--; + } + } + } + + /* + * Whether there were any conflicting modes here or not, the fact + * that the lock is held at all makes us eligible to jump the + * queue. But we can only do that once the absence of conflicts + * is established. + */ + if (totalConflictsRemaining == 0) + { + PROCLOCK_PRINT("GroupLockShouldJumpQueue: jump", proclock); + return true; + } + } + otherproclock = (PROCLOCK *) + SHMQueueNext(procLocks, &otherproclock->lockLink, + offsetof(PROCLOCK, lockLink)); + } + + /* Either no group members hold locks, or there are conflicts. */ + PROCLOCK_PRINT("GroupLockShouldJumpQueue: fallthrough", proclock); + return false; +} + +/* * GrantLock -- update the lock and proclock data structures to show * the lock request has been granted. * @@ -2534,7 +2725,8 @@ FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag if (!FAST_PATH_CHECK_LOCKMODE(proc, f, lockmode)) continue; - proclock = SetupLockInTable(lockMethodTable, proc, locktag, + proclock = SetupLockInTable(lockMethodTable, proc, + proc->lockGroupLeader, locktag, hashcode, lockmode); if (!proclock) { @@ -2590,8 +2782,8 @@ FastPathGetRelationLockEntry(LOCALLOCK *locallock) /* Find or create lock object. */ LWLockAcquire(partitionLock, LW_EXCLUSIVE); - proclock = SetupLockInTable(lockMethodTable, MyProc, locktag, - locallock->hashcode, lockmode); + proclock = SetupLockInTable(lockMethodTable, MyProc, LockGroupLeader, + locktag, locallock->hashcode, lockmode); if (!proclock) { LWLockRelease(partitionLock); @@ -3094,6 +3286,9 @@ PostPrepare_Locks(TransactionId xid) PROCLOCKTAG proclocktag; int partition; + /* Can't prepare a lock group follower. 
*/ + Assert(LockGroupLeader == MyProc || LockGroupLeader == NULL); + /* This is a critical section: any error means big trouble */ START_CRIT_SECTION(); @@ -3195,6 +3390,19 @@ PostPrepare_Locks(TransactionId xid) Assert(proclock->tag.myProc == MyProc); + /* + * We shouldn't be in a lock group, except for a single-entry + * group for which we're the leader, which is OK. We need to + * clear the groupLeader pointer in that case, so that the dummy + * PGPROC doesn't end up pointing back to our PGPROC. + */ + if (proclock->groupLeader != NULL) + { + Assert(proclock->groupLeader == MyProc); + Assert(MyProc->lockGroupMembers == 1); + proclock->groupLeader = NULL; + } + lock = proclock->tag.myLock; /* Ignore VXID locks */ @@ -3784,6 +3992,7 @@ lock_twophase_recover(TransactionId xid, uint16 info, */ if (!found) { + proclock->groupLeader = NULL; proclock->holdMask = 0; proclock->releaseMask = 0; /* Add proclock to appropriate lists */ @@ -4058,7 +4267,8 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait) LWLockAcquire(partitionLock, LW_EXCLUSIVE); proclock = SetupLockInTable(LockMethods[DEFAULT_LOCKMETHOD], proc, - &tag, hashcode, ExclusiveLock); + proc->lockGroupLeader, &tag, hashcode, + ExclusiveLock); if (!proclock) { LWLockRelease(partitionLock); diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index ea88a24..1af9851 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -64,6 +64,13 @@ PGPROC *MyProc = NULL; PGXACT *MyPgXact = NULL; /* + * If we're not in a lock group, LockGroupLeader will be NULL. Otherwise, + * it should be set to the leader of the lock group of which we're a + * member. This will be the same as MyProc iff we're the group leader. + */ +PGPROC *LockGroupLeader = NULL; + +/* * This spinlock protects the freelist of recycled PGPROC structures. * We cannot use an LWLock because the LWLock manager depends on already * having a PGPROC and a wait semaphore! 
But these structures are touched @@ -240,18 +247,21 @@ InitProcGlobal(void) /* PGPROC for normal backend, add to freeProcs list */ procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs; ProcGlobal->freeProcs = &procs[i]; + procs[i].procgloballist = &ProcGlobal->freeProcs; } else if (i < MaxConnections + autovacuum_max_workers + 1) { /* PGPROC for AV launcher/worker, add to autovacFreeProcs list */ procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs; ProcGlobal->autovacFreeProcs = &procs[i]; + procs[i].procgloballist = &ProcGlobal->autovacFreeProcs; } else if (i < MaxBackends) { /* PGPROC for bgworker, add to bgworkerFreeProcs list */ procs[i].links.next = (SHM_QUEUE *) ProcGlobal->bgworkerFreeProcs; ProcGlobal->bgworkerFreeProcs = &procs[i]; + procs[i].procgloballist = &ProcGlobal->bgworkerFreeProcs; } /* Initialize myProcLocks[] shared memory queues. */ @@ -279,6 +289,7 @@ InitProcess(void) { /* use volatile pointer to prevent code rearrangement */ volatile PROC_HDR *procglobal = ProcGlobal; + PGPROC * volatile * procgloballist; /* * ProcGlobal should be set up already (if we are a backend, we inherit @@ -297,9 +308,17 @@ InitProcess(void) */ InitializeLatchSupport(); + /* Decide which list should supply our PGPROC. */ + if (IsAnyAutoVacuumProcess()) + procgloballist = &procglobal->autovacFreeProcs; + else if (IsBackgroundWorker) + procgloballist = &procglobal->bgworkerFreeProcs; + else + procgloballist = &procglobal->freeProcs; + /* - * Try to get a proc struct from the free list. If this fails, we must be - * out of PGPROC structures (not to mention semaphores). + * Try to get a proc struct from the appropriate free list. If this + * fails, we must be out of PGPROC structures (not to mention semaphores). * * While we are holding the ProcStructLock, also copy the current shared * estimate of spins_per_delay to local storage. 
@@ -308,21 +327,11 @@ InitProcess(void) set_spins_per_delay(procglobal->spins_per_delay); - if (IsAnyAutoVacuumProcess()) - MyProc = procglobal->autovacFreeProcs; - else if (IsBackgroundWorker) - MyProc = procglobal->bgworkerFreeProcs; - else - MyProc = procglobal->freeProcs; + MyProc = *procgloballist; if (MyProc != NULL) { - if (IsAnyAutoVacuumProcess()) - procglobal->autovacFreeProcs = (PGPROC *) MyProc->links.next; - else if (IsBackgroundWorker) - procglobal->bgworkerFreeProcs = (PGPROC *) MyProc->links.next; - else - procglobal->freeProcs = (PGPROC *) MyProc->links.next; + *procgloballist = (PGPROC *) MyProc->links.next; SpinLockRelease(ProcStructLock); } else @@ -341,6 +350,12 @@ InitProcess(void) MyPgXact = &ProcGlobal->allPgXact[MyProc->pgprocno]; /* + * Cross-check that the PGPROC is of the type we expect; if this were + * not the case, it would get returned to the wrong list. + */ + Assert(MyProc->procgloballist == procgloballist); + + /* * Now that we have a PGPROC, mark ourselves as an active postmaster * child; this is so that the postmaster can detect it if we exit without * cleaning up. (XXX autovac launcher currently doesn't participate in @@ -375,6 +390,7 @@ InitProcess(void) MyProc->lwWaitLink = NULL; MyProc->waitLock = NULL; MyProc->waitProcLock = NULL; + Assert(MyProc->lockGroupLeader == NULL); #ifdef USE_ASSERT_CHECKING { int i; @@ -538,6 +554,7 @@ InitAuxiliaryProcess(void) MyProc->lwWaitLink = NULL; MyProc->waitLock = NULL; MyProc->waitProcLock = NULL; + Assert(MyProc->lockGroupLeader == NULL); #ifdef USE_ASSERT_CHECKING { int i; @@ -780,16 +797,6 @@ ProcKill(int code, Datum arg) /* Make sure we're out of the sync rep lists */ SyncRepCleanupAtProcExit(); -#ifdef USE_ASSERT_CHECKING - { - int i; - - /* Last process should have released all locks. */ - for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i]))); - } -#endif - /* * Release any LW locks I am holding. 
There really shouldn't be any, but * it's cheap to check again before we cut the knees off the LWLock @@ -801,6 +808,50 @@ ProcKill(int code, Datum arg) if (MyReplicationSlot != NULL) ReplicationSlotRelease(); + /* If we're a lock group member, detach from the lock group. */ + if (LockGroupLeader != NULL && LockGroupLeader != MyProc) + { + int members; + + LWLockAcquire(LockGroupLeader->backendLock, LW_EXCLUSIVE); + members = --LockGroupLeader->lockGroupMembers; + LWLockRelease(LockGroupLeader->backendLock); + + LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE); + MyProc->lockGroupLeader = NULL; + LWLockRelease(MyProc->backendLock); + + /* If we're the last member of the lock group, detach the PGPROC. */ + if (members == 0) + { + PGPROC * volatile * procgloballist; + +#ifdef USE_ASSERT_CHECKING + { + int i; + + /* Last process should have released all locks. */ + for (i = 0; i < NUM_LOCK_PARTITIONS; i++) + Assert(SHMQueueEmpty(&(LockGroupLeader->myProcLocks[i]))); + } +#endif + + procgloballist = LockGroupLeader->procgloballist; + + SpinLockAcquire(ProcStructLock); + + /* Return PGPROC structure to appropriate freelist */ + LockGroupLeader->links.next = (SHM_QUEUE *) *procgloballist; + *procgloballist = LockGroupLeader; + + /* Update shared estimate of spins_per_delay */ + procglobal->spins_per_delay = + update_spins_per_delay(procglobal->spins_per_delay); + + SpinLockRelease(ProcStructLock); + } + } + /* * Clear MyProc first; then disown the process latch. 
This is so that * signal handlers won't try to clear the process latch after it's no @@ -810,29 +861,55 @@ ProcKill(int code, Datum arg) MyProc = NULL; DisownLatch(&proc->procLatch); - SpinLockAcquire(ProcStructLock); - - /* Return PGPROC structure (and semaphore) to appropriate freelist */ - if (IsAnyAutoVacuumProcess()) - { - proc->links.next = (SHM_QUEUE *) procglobal->autovacFreeProcs; - procglobal->autovacFreeProcs = proc; - } - else if (IsBackgroundWorker) + /* + * If we are a lock group leader, we need to check whether any other + * group members are active. If not, we can declare ourselves to no longer + * be a lock group leader, allowing our PGPROC to be recycled + * immediately. + */ + if (LockGroupLeader == proc) { - proc->links.next = (SHM_QUEUE *) procglobal->bgworkerFreeProcs; - procglobal->bgworkerFreeProcs = proc; + int members; + + LWLockAcquire(proc->backendLock, LW_EXCLUSIVE); + members = --proc->lockGroupMembers; + LWLockRelease(proc->backendLock); + + if (members == 0) LockGroupLeader = NULL; } - else + + /* + * If we were never a lock group leader or have managed to give up that + * designation, then we can immediately release our PGPROC. If not, then + * the last group member will do that on exit. + */ + if (LockGroupLeader == NULL) { - proc->links.next = (SHM_QUEUE *) procglobal->freeProcs; - procglobal->freeProcs = proc; - } + PGPROC * volatile * procgloballist; - /* Update shared estimate of spins_per_delay */ - procglobal->spins_per_delay = update_spins_per_delay(procglobal->spins_per_delay); +#ifdef USE_ASSERT_CHECKING + { + int i; - SpinLockRelease(ProcStructLock); + /* Last process should have released all locks. 
*/ + for (i = 0; i < NUM_LOCK_PARTITIONS; i++) + Assert(SHMQueueEmpty(&(proc->myProcLocks[i]))); + } +#endif + + procgloballist = proc->procgloballist; + SpinLockAcquire(ProcStructLock); + + /* Return PGPROC structure (and semaphore) to appropriate freelist */ + proc->links.next = (SHM_QUEUE *) *procgloballist; + *procgloballist = proc; + + /* Update shared estimate of spins_per_delay */ + procglobal->spins_per_delay = + update_spins_per_delay(procglobal->spins_per_delay); + + SpinLockRelease(ProcStructLock); + } /* * This process is no longer present in shared memory in any meaningful @@ -960,18 +1037,53 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) bool early_deadlock = false; bool allow_autovacuum_cancel = true; int myWaitStatus; - PGPROC *proc; + PGPROC *proc = NULL; int i; + PGPROC *groupLeader = LockGroupLeader; + + /* + * Ignore trivial lock groups. + * + * We read MyProc->lockGroupMembers here without a lock. The read itself + * is atomic; while the value could be changing under us, it can't change + * from a value < 2 to a value >= 2 while any group locks are actually + * present. Similarly, when iterating over the wait queue, we needn't + * worry that the lock group membership of a process will change under us: + * that's not allowed while a process holds any locks. + */ + if (MyProc == groupLeader && MyProc->lockGroupMembers < 2) + groupLeader = NULL; /* * Determine where to add myself in the wait queue. * - * Normally I should go at the end of the queue. 
However, if I'm a + member of a lock group, and some other member of the lock group is + already waiting for a lock, then add myself just after the + existing waiters. This is necessary for correctness; any code that + scans the wait queue is entitled to assume that lockers from the same + group are in consecutive positions in the queue. + */ + if (groupLeader != NULL) + { + PGPROC *cproc = (PGPROC *) waitQueue->links.next; + + for (i = 0; i < waitQueue->size; i++) + { + if (cproc->lockGroupLeader == groupLeader) + proc = cproc; + else if (proc != NULL) + break; + cproc = (PGPROC *) cproc->links.next; + } + } + + /* + * If I already hold locks that conflict with the request of any previous + * waiter, put myself in the queue just in front of the first such waiter. + * This is not a necessary step, since deadlock detection would move me + * to before that waiter anyway; but it's relatively cheap to detect such + * a conflict immediately, and avoid delaying till deadlock timeout. * * Special case: if I find I should go in front of some waiter, check to * see if I conflict with already-held locks or the requests before that @@ -982,16 +1094,24 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) */ if (myHeldLocks != 0) { + PGPROC *cproc = (PGPROC *) waitQueue->links.next; LOCKMASK aheadRequests = 0; - proc = (PGPROC *) waitQueue->links.next; for (i = 0; i < waitQueue->size; i++) { + /* + * If we reached our own lock group in the wait queue without + * finding a conflict, we aren't going to find one at all prior + * to the insertion point, so bail out. + */ + if (groupLeader != NULL && cproc->lockGroupLeader == groupLeader) + break; + /* Must he wait for me? */ - if (lockMethodTable->conflictTab[proc->waitLockMode] & myHeldLocks) + if (lockMethodTable->conflictTab[cproc->waitLockMode] & myHeldLocks) { /* Must I wait for him ? 
*/ - if (lockMethodTable->conflictTab[lockmode] & proc->heldLocks) + if (lockMethodTable->conflictTab[lockmode] & cproc->heldLocks) { /* * Yes, so we have a deadlock. Easiest way to clean up @@ -1000,7 +1120,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) * a flag to check below, and break out of loop. Also, * record deadlock info for later message. */ - RememberSimpleDeadLock(MyProc, lockmode, lock, proc); + RememberSimpleDeadLock(MyProc, lockmode, lock, cproc); early_deadlock = true; break; } @@ -1016,22 +1136,19 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) GrantAwaitedLock(); return STATUS_OK; } - /* Break out of loop to put myself before him */ + /* Break out of loop and put myself before him */ + proc = cproc; break; } /* Nope, so advance to next waiter */ - aheadRequests |= LOCKBIT_ON(proc->waitLockMode); - proc = (PGPROC *) proc->links.next; + aheadRequests |= LOCKBIT_ON(cproc->waitLockMode); + cproc = (PGPROC *) cproc->links.next; } - - /* - * If we fall out of loop normally, proc points to waitQueue head, so - * we will insert at tail of queue as desired. - */ } - else + + if (proc == NULL) { - /* I hold no locks, so I can't push in front of anyone. */ + /* No special case applies, so I can't push in front of anyone. */ proc = (PGPROC *) &(waitQueue->links); } @@ -1614,6 +1731,71 @@ check_done: LWLockRelease(LockHashPartitionLockByIndex(i)); } +/* + * BecomeLockGroupLeader - designate process as lock group leader + * + * Once this function has returned, other processes can join the lock group + * by calling BecomeLockGroupFollower. + */ +void +BecomeLockGroupLeader(void) +{ + /* Can't be leader and follower. */ + Assert(LockGroupLeader == NULL || LockGroupLeader == MyProc); + + /* This can be called more than once; but we must not redo the work. 
*/ + if (LockGroupLeader == NULL) + { + LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE); + Assert(MyProc->lockGroupMembers == 0); + Assert(MyProc->lockGroupLeader == NULL); + MyProc->lockGroupLeader = MyProc; + MyProc->lockGroupLeaderIdentifier = MyProcPid; + MyProc->lockGroupMembers = 1; + LWLockRelease(MyProc->backendLock); + } +} + +/* + * BecomeLockGroupFollower - designate process as lock group follower + * + * This is pretty straightforward except for the possibility that the leader + * whose group we're trying to join might exit before we manage to do so; + * and the PGPROC might get recycled for an unrelated process. To avoid + * that, we require the caller to pass the PID of the intended PGPROC as + * an interlock. Returns true if we successfully join the intended lock + * group, and false if not. + */ +bool +BecomeLockGroupFollower(PGPROC *leader, int pid) +{ + bool ok = false; + + /* Can't become a follower if we're already in a lock group. */ + Assert(LockGroupLeader == NULL); + + /* Can't follow ourselves. */ + Assert(MyProc != leader); + + /* Try to join the group. */ + LWLockAcquire(leader->backendLock, LW_EXCLUSIVE); + if (leader->lockGroupMembers > 0 && + leader->lockGroupLeaderIdentifier == pid) + { + ok = true; + leader->lockGroupMembers++; + LockGroupLeader = leader; + } + LWLockRelease(leader->backendLock); + + /* On success, advertise our new leader. */ + LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE); + if (ok) MyProc->lockGroupLeader = leader; + LWLockRelease(MyProc->backendLock); + + return ok; +} + /* * ProcWaitForSignal - wait for a signal from another backend. 
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index 4c49e3c..b15addb 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -362,6 +362,7 @@ typedef struct PROCLOCK PROCLOCKTAG tag; /* unique identifier of proclock object */ /* data */ + PGPROC *groupLeader; /* group leader, or NULL if no lock group */ LOCKMASK holdMask; /* bitmask for lock types currently held */ LOCKMASK releaseMask; /* bitmask for lock types to be released */ SHM_QUEUE lockLink; /* list link in LOCK's list of proclocks */ @@ -473,7 +474,6 @@ typedef enum * worker */ } DeadLockState; - /* * The lockmgr's shared hash tables are partitioned to reduce contention. * To determine which partition a given locktag belongs to, compute the tag's diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index c23f4da..6f842f7 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -77,6 +77,7 @@ struct PGPROC { /* proc->links MUST BE FIRST IN STRUCT (see ProcSleep,ProcWakeup,etc) */ SHM_QUEUE links; /* list link if process is in a list */ + PGPROC **procgloballist; /* procglobal list that owns this PGPROC */ PGSemaphoreData sem; /* ONE semaphore to sleep on */ int waitStatus; /* STATUS_WAITING, STATUS_OK or STATUS_ERROR */ @@ -142,6 +143,11 @@ struct PGPROC bool fpVXIDLock; /* are we holding a fast-path VXID lock? */ LocalTransactionId fpLocalTransactionId; /* lxid for fast-path VXID * lock */ + + /* Support for lock groups. */ + int lockGroupMembers; /* 0 if not a lock group leader */ + int lockGroupLeaderIdentifier; /* MyProcPid, if I'm a leader */ + PGPROC *lockGroupLeader; /* lock group leader, if I'm a follower */ }; /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. 
*/ @@ -149,6 +155,7 @@ struct PGPROC extern PGDLLIMPORT PGPROC *MyProc; extern PGDLLIMPORT struct PGXACT *MyPgXact; +extern PGDLLIMPORT PGPROC *LockGroupLeader; /* * Prior to PostgreSQL 9.2, the fields below were stored as part of the @@ -253,6 +260,8 @@ extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock); extern void CheckDeadLock(void); extern bool IsWaitingForLock(void); extern void LockErrorCleanup(void); +extern void BecomeLockGroupLeader(void); +extern bool BecomeLockGroupFollower(PGPROC *leader, int pid); extern void ProcWaitForSignal(void); extern void ProcSendSignal(int pid);