*** a/src/backend/storage/ipc/sinvaladt.c --- b/src/backend/storage/ipc/sinvaladt.c *************** *** 140,146 **** typedef struct ProcState /* procPid is zero in an inactive ProcState array entry. */ pid_t procPid; /* PID of backend, for signaling */ /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */ ! int nextMsgNum; /* next message number to read */ bool resetState; /* backend needs to reset its state */ bool signaled; /* backend has been sent catchup signal */ --- 140,146 ---- /* procPid is zero in an inactive ProcState array entry. */ pid_t procPid; /* PID of backend, for signaling */ /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */ ! uint64 nextMsgNum; /* next message number to read */ bool resetState; /* backend needs to reset its state */ bool signaled; /* backend has been sent catchup signal */ *************** *** 167,180 **** typedef struct SISeg /* * General state information */ ! int minMsgNum; /* oldest message still needed */ ! int maxMsgNum; /* next message number to be assigned */ ! int nextThreshold; /* # of messages to call SICleanupQueue */ int lastBackend; /* index of last active procState entry, +1 */ int maxBackends; /* size of procState array */ - slock_t msgnumLock; /* spinlock protecting maxMsgNum */ - /* * Circular buffer holding shared-inval messages */ --- 167,178 ---- /* * General state information */ ! uint64 minMsgNum; /* oldest message still needed */ ! uint64 maxMsgNum; /* next message number to be assigned */ ! uint64 nextThreshold; /* # of messages to call SICleanupQueue */ int lastBackend; /* index of last active procState entry, +1 */ int maxBackends; /* size of procState array */ /* * Circular buffer holding shared-inval messages */ *************** *** 237,243 **** CreateSharedInvalidationState(void) shmInvalBuffer->nextThreshold = CLEANUP_MIN; shmInvalBuffer->lastBackend = 0; shmInvalBuffer->maxBackends = MaxBackends; - SpinLockInit(&shmInvalBuffer->msgnumLock); /* The buffer[] array is initially all unused, so we need not fill it */ --- 235,240 ---- *************** *** 414,422 **** SIInsertDataEntries(const SharedInvalidationMessage *data, int n) */ while (n > 0) { ! int nthistime = Min(n, WRITE_QUANTUM); ! int numMsgs; ! int max; n -= nthistime; --- 411,419 ---- */ while (n > 0) { ! uint64 nthistime = Min(n, WRITE_QUANTUM); ! uint64 numMsgs; ! uint64 max; n -= nthistime; *************** *** 454,462 **** SIInsertDataEntries(const SharedInvalidationMessage *data, int n) /* use volatile pointer to prevent code rearrangement */ volatile SISeg *vsegP = segP; - SpinLockAcquire(&vsegP->msgnumLock); vsegP->maxMsgNum = max; - SpinLockRelease(&vsegP->msgnumLock); } LWLockRelease(SInvalWriteLock); --- 451,457 ---- *************** *** 495,517 **** int SIGetDataEntries(SharedInvalidationMessage *data, int datasize) { SISeg *segP; ! ProcState *stateP; ! int max; int n; - LWLockAcquire(SInvalReadLock, LW_SHARED); - segP = shmInvalBuffer; stateP = &segP->procState[MyBackendId - 1]; ! /* Fetch current value of maxMsgNum using spinlock */ { /* use volatile pointer to prevent code rearrangement */ volatile SISeg *vsegP = segP; - SpinLockAcquire(&vsegP->msgnumLock); max = vsegP->maxMsgNum; - SpinLockRelease(&vsegP->msgnumLock); } if (stateP->resetState) --- 490,508 ---- SIGetDataEntries(SharedInvalidationMessage *data, int datasize) { SISeg *segP; ! volatile ProcState *stateP; ! uint64 max; int n; segP = shmInvalBuffer; stateP = &segP->procState[MyBackendId - 1]; ! /* Fetch current value of maxMsgNum */ { /* use volatile pointer to prevent code rearrangement */ volatile SISeg *vsegP = segP; max = vsegP->maxMsgNum; } if (stateP->resetState) *************** *** 524,530 **** SIGetDataEntries(SharedInvalidationMessage *data, int datasize) stateP->nextMsgNum = max; stateP->resetState = false; stateP->signaled = false; - LWLockRelease(SInvalReadLock); return -1; } --- 515,520 ---- *************** *** 544,555 **** SIGetDataEntries(SharedInvalidationMessage *data, int datasize) } /* * Reset our "signaled" flag whenever we have caught up completely. */ if (stateP->nextMsgNum >= max) stateP->signaled = false; - LWLockRelease(SInvalReadLock); return n; } --- 534,560 ---- } /* + * Recheck whether we've been reset. + */ + if (stateP->resetState) + { + /* + * Force reset. We can say we have dealt with any messages added + * since the reset, as well; and that means we should clear the + * signaled flag, too. + */ + stateP->nextMsgNum = max; + stateP->resetState = false; + stateP->signaled = false; + return -1; + } + + /* * Reset our "signaled" flag whenever we have caught up completely. */ if (stateP->nextMsgNum >= max) stateP->signaled = false; return n; } *************** *** 573,589 **** void SICleanupQueue(bool callerHasWriteLock, int minFree) { SISeg *segP = shmInvalBuffer; ! int min, minsig, lowbound, ! numMsgs, ! i; ProcState *needSig = NULL; /* Lock out all writers and readers */ if (!callerHasWriteLock) LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); - LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE); /* * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the --- 578,593 ---- SICleanupQueue(bool callerHasWriteLock, int minFree) { SISeg *segP = shmInvalBuffer; ! uint64 min, minsig, lowbound, ! numMsgs; ! int i; ProcState *needSig = NULL; /* Lock out all writers and readers */ if (!callerHasWriteLock) LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE); /* * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the *************** *** 630,651 **** SICleanupQueue(bool callerHasWriteLock, int minFree) segP->minMsgNum = min; /* - * When minMsgNum gets really large, decrement all message counters so as - * to forestall overflow of the counters. This happens seldom enough that - * folding it into the previous loop would be a loser. - */ - if (min >= MSGNUMWRAPAROUND) - { - segP->minMsgNum -= MSGNUMWRAPAROUND; - segP->maxMsgNum -= MSGNUMWRAPAROUND; - for (i = 0; i < segP->lastBackend; i++) - { - /* we don't bother skipping inactive entries here */ - segP->procState[i].nextMsgNum -= MSGNUMWRAPAROUND; - } - } - - /* * Determine how many messages are still in the queue, and set the * threshold at which we should repeat SICleanupQueue(). */ --- 634,639 ---- *************** *** 666,672 **** SICleanupQueue(bool callerHasWriteLock, int minFree) BackendId his_backendId = (needSig - &segP->procState[0]) + 1; needSig->signaled = true; - LWLockRelease(SInvalReadLock); LWLockRelease(SInvalWriteLock); elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid); SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_backendId); --- 654,659 ---- *************** *** 675,681 **** SICleanupQueue(bool callerHasWriteLock, int minFree) } else { - LWLockRelease(SInvalReadLock); if (!callerHasWriteLock) LWLockRelease(SInvalWriteLock); } --- 662,667 ---- *** a/src/include/storage/lwlock.h --- b/src/include/storage/lwlock.h *************** *** 51,57 **** typedef enum LWLockId OidGenLock, XidGenLock, ProcArrayLock, - SInvalReadLock, SInvalWriteLock, WALInsertLock, WALWriteLock, --- 51,56 ----