From 351ce81b4d1b2a61fcfcc9902d7122e5410c8b63 Mon Sep 17 00:00:00 2001 From: Thomas Munro Date: Thu, 27 Oct 2022 12:59:45 +1300 Subject: [PATCH 6/8] Use SetLatches() for synchronous replication wakeups. Don't issue SetLatch() system calls while holding the central synchronous replication lock. --- src/backend/replication/syncrep.c | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 1a022b11a0..31cf05ed2f 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -100,7 +100,7 @@ static int SyncRepWaitMode = SYNC_REP_NO_WAIT; static void SyncRepQueueInsert(int mode); static void SyncRepCancelWait(void); -static int SyncRepWakeQueue(bool all, int mode); +static int SyncRepWakeQueue(bool all, int mode, LatchBatch *wakeups); static bool SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, @@ -449,6 +449,7 @@ SyncRepReleaseWaiters(void) int numwrite = 0; int numflush = 0; int numapply = 0; + LatchBatch wakeups = {0}; /* * If this WALSender is serving a standby that is not on the list of @@ -518,21 +519,23 @@ SyncRepReleaseWaiters(void) if (walsndctl->lsn[SYNC_REP_WAIT_WRITE] < writePtr) { walsndctl->lsn[SYNC_REP_WAIT_WRITE] = writePtr; - numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE); + numwrite = SyncRepWakeQueue(false, SYNC_REP_WAIT_WRITE, &wakeups); } if (walsndctl->lsn[SYNC_REP_WAIT_FLUSH] < flushPtr) { walsndctl->lsn[SYNC_REP_WAIT_FLUSH] = flushPtr; - numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH); + numflush = SyncRepWakeQueue(false, SYNC_REP_WAIT_FLUSH, &wakeups); } if (walsndctl->lsn[SYNC_REP_WAIT_APPLY] < applyPtr) { walsndctl->lsn[SYNC_REP_WAIT_APPLY] = applyPtr; - numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY); + numapply = SyncRepWakeQueue(false, SYNC_REP_WAIT_APPLY, &wakeups); } LWLockRelease(SyncRepLock); + SetLatches(&wakeups); + elog(DEBUG3, "released %d procs up to write %X/%X, %d procs up to flush %X/%X, %d procs up to apply %X/%X", numwrite, LSN_FORMAT_ARGS(writePtr), numflush, LSN_FORMAT_ARGS(flushPtr), @@ -876,7 +879,7 @@ SyncRepGetStandbyPriority(void) * The caller must hold SyncRepLock in exclusive mode. */ static int -SyncRepWakeQueue(bool all, int mode) +SyncRepWakeQueue(bool all, int mode, LatchBatch *wakeups) { volatile WalSndCtlData *walsndctl = WalSndCtl; PGPROC *proc = NULL; @@ -929,7 +932,7 @@ SyncRepWakeQueue(bool all, int mode) /* * Wake only when we have set state and removed from queue. */ - SetLatch(&(thisproc->procLatch)); + AddLatch(wakeups, &thisproc->procLatch); numprocs++; } @@ -951,6 +954,8 @@ SyncRepUpdateSyncStandbysDefined(void) if (sync_standbys_defined != WalSndCtl->sync_standbys_defined) { + LatchBatch wakeups = {0}; + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); /* @@ -963,7 +968,7 @@ SyncRepUpdateSyncStandbysDefined(void) int i; for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) - SyncRepWakeQueue(true, i); + SyncRepWakeQueue(true, i, &wakeups); } /* @@ -976,6 +981,8 @@ SyncRepUpdateSyncStandbysDefined(void) WalSndCtl->sync_standbys_defined = sync_standbys_defined; LWLockRelease(SyncRepLock); + + SetLatches(&wakeups); } } -- 2.35.1