diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index ad213fc454..5ab5e95fa9 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -409,15 +409,23 @@ void SyncRepReleaseWaiters(void) { volatile WalSndCtlData *walsndctl = WalSndCtl; + WalSnd *walsnd = MyWalSnd; XLogRecPtr writePtr; XLogRecPtr flushPtr; XLogRecPtr applyPtr; + XLogRecPtr flush; + WalSndState state; bool got_recptr; bool am_sync; int numwrite = 0; int numflush = 0; int numapply = 0; + SpinLockAcquire(&walsnd->mutex); + flush = walsnd->flush; + state = walsnd->state; + SpinLockRelease(&walsnd->mutex); + /* * If this WALSender is serving a standby that is not on the list of * potential sync standbys then we have nothing to do. If we are still @@ -425,8 +433,8 @@ SyncRepReleaseWaiters(void) * still invalid, then leave quickly also. */ if (MyWalSnd->sync_standby_priority == 0 || - MyWalSnd->state < WALSNDSTATE_STREAMING || - XLogRecPtrIsInvalid(MyWalSnd->flush)) + state < WALSNDSTATE_STREAMING || + XLogRecPtrIsInvalid(flush)) { announce_next_takeover = true; return; @@ -711,14 +719,24 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) for (i = 0; i < max_wal_senders; i++) { + XLogRecPtr flush; + WalSndState state; + int pid; + walsnd = &WalSndCtl->walsnds[i]; + SpinLockAcquire(&walsnd->mutex); + pid = walsnd->pid; + flush = walsnd->flush; + state = walsnd->state; + SpinLockRelease(&walsnd->mutex); + /* Must be active */ - if (walsnd->pid == 0) + if (pid == 0) continue; /* Must be streaming */ - if (walsnd->state != WALSNDSTATE_STREAMING) + if (state != WALSNDSTATE_STREAMING) continue; /* Must be synchronous */ @@ -726,7 +744,7 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync) continue; /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(walsnd->flush)) + if (XLogRecPtrIsInvalid(flush)) continue; /* @@ -780,14 +798,24 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) */ for (i = 0; i < max_wal_senders; i++) { + XLogRecPtr flush; + WalSndState state; + int pid; + walsnd = &WalSndCtl->walsnds[i]; + SpinLockAcquire(&walsnd->mutex); + pid = walsnd->pid; + flush = walsnd->flush; + state = walsnd->state; + SpinLockRelease(&walsnd->mutex); + /* Must be active */ - if (walsnd->pid == 0) + if (pid == 0) continue; /* Must be streaming */ - if (walsnd->state != WALSNDSTATE_STREAMING) + if (state != WALSNDSTATE_STREAMING) continue; /* Must be synchronous */ @@ -796,7 +824,7 @@ SyncRepGetSyncStandbysPriority(bool *am_sync) continue; /* Must have a valid flush position */ - if (XLogRecPtrIsInvalid(walsnd->flush)) + if (XLogRecPtrIsInvalid(flush)) continue; /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 49cce38880..a04be8039d 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2874,10 +2874,12 @@ WalSndRqstFileReload(void) { WalSnd *walsnd = &WalSndCtl->walsnds[i]; + SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) + { + SpinLockRelease(&walsnd->mutex); continue; - - SpinLockAcquire(&walsnd->mutex); + } walsnd->needreload = true; SpinLockRelease(&walsnd->mutex); } @@ -3190,14 +3192,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) TimeOffset flushLag; TimeOffset applyLag; int priority; + int pid; WalSndState state; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; + SpinLockAcquire(&walsnd->mutex); if (walsnd->pid == 0) + { + SpinLockRelease(&walsnd->mutex); continue; - - SpinLockAcquire(&walsnd->mutex); + } + pid = walsnd->pid; sentPtr = walsnd->sentPtr; state = walsnd->state; write = walsnd->write; @@ -3210,7 +3216,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) SpinLockRelease(&walsnd->mutex); memset(nulls, 0, sizeof(nulls)); - values[0] = Int32GetDatum(walsnd->pid); + values[0] = Int32GetDatum(pid); if (!superuser()) {