diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index a21f7d3347..c11945d1c2 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -149,6 +149,13 @@ SyncRepWaitForLSN(XLogRecPtr lsn, bool commit) const char *old_status; int mode; + /* + * This should be called while holding interrupts during a transaction + * commit to prevent the follow-up shared memory queue cleanups to be + * influenced by external interruptions. + */ + Assert(InterruptHoldoffCount > 0); + /* Cap the level for anything other than commit to remote flush only. */ if (commit) mode = SyncRepWaitMode; @@ -361,12 +368,10 @@ SyncRepCancelWait(void) void SyncRepCleanupAtProcExit(void) { + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); if (!SHMQueueIsDetached(&(MyProc->syncRepLinks))) - { - LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); SHMQueueDelete(&(MyProc->syncRepLinks)); - LWLockRelease(SyncRepLock); - } + LWLockRelease(SyncRepLock); } /* @@ -508,6 +513,8 @@ SyncRepReleaseWaiters(void) /* * Calculate the synced Write, Flush and Apply positions among sync standbys. * + * The caller must hold SyncRepLock. + * * Return false if the number of sync standbys is less than * synchronous_standby_names specifies. Otherwise return true and * store the positions into *writePtr, *flushPtr and *applyPtr. @@ -521,6 +528,8 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, { List *sync_standbys; + Assert(LWLockHeldByMe(SyncRepLock)); + *writePtr = InvalidXLogRecPtr; *flushPtr = InvalidXLogRecPtr; *applyPtr = InvalidXLogRecPtr; @@ -680,6 +689,8 @@ cmp_lsn(const void *a, const void *b) List * SyncRepGetSyncStandbys(bool *am_sync) { + Assert(LWLockHeldByMe(SyncRepLock)); + /* Set default result */ if (am_sync != NULL) *am_sync = false; @@ -984,7 +995,7 @@ SyncRepGetStandbyPriority(void) * Pass all = true to wake whole queue; otherwise, just wake up to * the walsender's LSN. * - * Must hold SyncRepLock. + * The caller must hold SyncRepLock in exclusive mode. */ static int SyncRepWakeQueue(bool all, int mode) @@ -995,6 +1006,7 @@ SyncRepWakeQueue(bool all, int mode) int numprocs = 0; Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); + Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE)); Assert(SyncRepQueueIsOrderedByLSN(mode)); proc = (PGPROC *) SHMQueueNext(&(WalSndCtl->SyncRepQueue[mode]),