From 387e5d15f89cdff505c28d61f4d89b6dd5f4dbcb Mon Sep 17 00:00:00 2001 From: reVInotip Date: Sat, 18 Oct 2025 10:45:08 +0700 Subject: [PATCH 2/4] Refactor syncrep The code has been refactored to preserve the semantic sections in syncrep.c. Several functions have been reorganized and moved between different code sections to better align with their usage patterns and improve logical grouping. --- src/backend/replication/syncrep.c | 492 +++++++++++++++++++------------------- 1 file changed, 243 insertions(+), 249 deletions(-) diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index 1be667c637d..4390fb7a998 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -640,6 +640,249 @@ SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, return true; } +/* + * Check if we are in the list of sync standbys, and if so, determine + * priority sequence. Return priority if set, or zero to indicate that + * we are not a potential sync standby. + * + * Compare the parameter SyncRepStandbyNames against the application_name + * for this WALSender, or allow any name if we find a wildcard "*". + */ +static int +SyncRepGetStandbyPriority(void) +{ + const char *standby_name; + int priority; + bool found = false; + + if (!SyncStandbysDefined() || SyncRepConfig == NULL) + return 0; + + standby_name = SyncRepConfig->member_names; + for (priority = 1; priority <= SyncRepConfig->nmembers; priority++) + { + if (pg_strcasecmp(standby_name, application_name) == 0 || + strcmp(standby_name, "*") == 0) + { + found = true; + break; + } + standby_name += strlen(standby_name) + 1; + } + + if (!found) + return 0; + + /* + * In quorum-based sync replication, all the standbys in the list have the + * same priority, one. + */ + return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1; +} + +/* + * Walk the specified queue from head. Set the state of any backends that + * need to be woken, remove them from the queue, and then wake them. + * Pass all = true to wake whole queue; otherwise, just wake up to + * the walsender's LSN. + * + * The caller must hold SyncRepLock in exclusive mode. + */ +static int +SyncRepWakeQueue(bool all, int mode) +{ + volatile WalSndCtlData *walsndctl = WalSndCtl; + int numprocs = 0; + dlist_mutable_iter iter; + + Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); + Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE)); + Assert(SyncRepQueueIsOrderedByLSN(mode)); + + dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode]) + { + PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur); + + /* + * Assume the queue is ordered by LSN + */ + if (!all && walsndctl->lsn[mode] < proc->waitLSN) + return numprocs; + + /* + * Remove from queue. + */ + dlist_delete_thoroughly(&proc->syncRepLinks); + + /* + * SyncRepWaitForLSN() reads syncRepState without holding the lock, so + * make sure that it sees the queue link being removed before the + * syncRepState change. + */ + pg_write_barrier(); + + /* + * Set state to complete; see SyncRepWaitForLSN() for discussion of + * the various states. + */ + proc->syncRepState = SYNC_REP_WAIT_COMPLETE; + + /* + * Wake only when we have set state and removed from queue. + */ + SetLatch(&(proc->procLatch)); + + numprocs++; + } + + return numprocs; +} + +/* + * The checkpointer calls this as needed to update the shared + * sync_standbys_status flag, so that backends don't remain permanently wedged + * if synchronous_standby_names is unset. It's safe to check the current value + * without the lock, because it's only ever updated by one process. But we + * must take the lock to change it. + */ +void +SyncRepUpdateSyncStandbysDefined(void) +{ + bool sync_standbys_defined = SyncStandbysDefined(); + + if (sync_standbys_defined != + ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) != 0)) + { + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + + /* + * If synchronous_standby_names has been reset to empty, it's futile + * for backends to continue waiting. Since the user no longer wants + * synchronous replication, we'd better wake them up. + */ + if (!sync_standbys_defined) + { + int i; + + for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) + SyncRepWakeQueue(true, i); + } + + /* + * Only allow people to join the queue when there are synchronous + * standbys defined. Without this interlock, there's a race + * condition: we might wake up all the current waiters; then, some + * backend that hasn't yet reloaded its config might go to sleep on + * the queue (and never wake up). This prevents that. + */ + WalSndCtl->sync_standbys_status = SYNC_STANDBY_INIT | + (sync_standbys_defined ? SYNC_STANDBY_DEFINED : 0); + + LWLockRelease(SyncRepLock); + } + else if ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_INIT) == 0) + { + LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); + + /* + * Note that there is no need to wake up the queues here. We would + * reach this path only if SyncStandbysDefined() returns false, or it + * would mean that some backends are waiting with the GUC set. See + * SyncRepWaitForLSN(). + */ + Assert(!SyncStandbysDefined()); + + /* + * Even if there is no sync standby defined, let the readers of this + * information know that the sync standby data has been initialized. + * This can just be done once, hence the previous check on + * SYNC_STANDBY_INIT to avoid useless work. + */ + WalSndCtl->sync_standbys_status |= SYNC_STANDBY_INIT; + + LWLockRelease(SyncRepLock); + } +} + +#ifdef USE_ASSERT_CHECKING +static bool +SyncRepQueueIsOrderedByLSN(int mode) +{ + XLogRecPtr lastLSN; + dlist_iter iter; + + Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); + + lastLSN = 0; + + dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode]) + { + PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur); + + /* + * Check the queue is ordered by LSN and that multiple procs don't + * have matching LSNs + */ + if (proc->waitLSN <= lastLSN) + return false; + + lastLSN = proc->waitLSN; + } + + return true; +} +#endif + +/* + * =========================================================== + * Synchronous Replication functions for wal receiver and wal sender processes + * =========================================================== + */ + +/* + * Calculates the Write, Flush, and Apply positions for synchronous + * standbys using the synchronous replication method. Returns true if the calculation is successful, + * otherwise false. + */ +static bool +SyncRepGetSyncRecPtrBySyncRepMethod(XLogRecPtr *writePtr, + XLogRecPtr *flushPtr, + XLogRecPtr *applyPtr, + SyncRepStandbyData *sync_standbys, + int num_standbys) +{ + /* Quick out if not even configured to be synchronous */ + if (SyncRepConfig == NULL) + return false; + + /* + * In a priority-based sync replication, the synced positions are the + * oldest ones among sync standbys. In a quorum-based, they are the Nth + * latest ones. + * + * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest + * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation + * because it's a bit more efficient. + * + * XXX If the numbers of current and requested sync standbys are the same, + * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced + * positions even in a quorum-based sync replication. + */ + if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) + { + SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, + sync_standbys, num_standbys); + } + else + { + SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, + sync_standbys, num_standbys, + SyncRepConfig->num_sync); + } + + return true; +} + /* * Calculate the oldest Write, Flush and Apply positions among sync standbys. */ @@ -834,205 +1077,6 @@ standby_priority_comparator(const void *a, const void *b) return sa->walsnd_index - sb->walsnd_index; } - -/* - * Check if we are in the list of sync standbys, and if so, determine - * priority sequence. Return priority if set, or zero to indicate that - * we are not a potential sync standby. - * - * Compare the parameter SyncRepStandbyNames against the application_name - * for this WALSender, or allow any name if we find a wildcard "*". - */ -static int -SyncRepGetStandbyPriority(void) -{ - const char *standby_name; - int priority; - bool found = false; - - /* - * Since synchronous cascade replication is not allowed, we always set the - * priority of cascading walsender to zero. - */ - - if (!SyncStandbysDefined() || SyncRepConfig == NULL) - return 0; - - standby_name = SyncRepConfig->member_names; - for (priority = 1; priority <= SyncRepConfig->nmembers; priority++) - { - if (pg_strcasecmp(standby_name, application_name) == 0 || - strcmp(standby_name, "*") == 0) - { - found = true; - break; - } - standby_name += strlen(standby_name) + 1; - } - - if (!found) - return 0; - - /* - * In quorum-based sync replication, all the standbys in the list have the - * same priority, one. - */ - return (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) ? priority : 1; -} - -/* - * Walk the specified queue from head. Set the state of any backends that - * need to be woken, remove them from the queue, and then wake them. - * Pass all = true to wake whole queue; otherwise, just wake up to - * the walsender's LSN. - * - * The caller must hold SyncRepLock in exclusive mode. - */ -static int -SyncRepWakeQueue(bool all, int mode) -{ - volatile WalSndCtlData *walsndctl = WalSndCtl; - int numprocs = 0; - dlist_mutable_iter iter; - - Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); - Assert(LWLockHeldByMeInMode(SyncRepLock, LW_EXCLUSIVE)); - Assert(SyncRepQueueIsOrderedByLSN(mode)); - - dlist_foreach_modify(iter, &WalSndCtl->SyncRepQueue[mode]) - { - PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur); - - /* - * Assume the queue is ordered by LSN - */ - if (!all && walsndctl->lsn[mode] < proc->waitLSN) - return numprocs; - - /* - * Remove from queue. - */ - dlist_delete_thoroughly(&proc->syncRepLinks); - - /* - * SyncRepWaitForLSN() reads syncRepState without holding the lock, so - * make sure that it sees the queue link being removed before the - * syncRepState change. - */ - pg_write_barrier(); - - /* - * Set state to complete; see SyncRepWaitForLSN() for discussion of - * the various states. - */ - proc->syncRepState = SYNC_REP_WAIT_COMPLETE; - - /* - * Wake only when we have set state and removed from queue. - */ - SetLatch(&(proc->procLatch)); - - numprocs++; - } - - return numprocs; -} - -/* - * The checkpointer calls this as needed to update the shared - * sync_standbys_status flag, so that backends don't remain permanently wedged - * if synchronous_standby_names is unset. It's safe to check the current value - * without the lock, because it's only ever updated by one process. But we - * must take the lock to change it. - */ -void -SyncRepUpdateSyncStandbysDefined(void) -{ - bool sync_standbys_defined = SyncStandbysDefined(); - - if (sync_standbys_defined != - ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) != 0)) - { - LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); - - /* - * If synchronous_standby_names has been reset to empty, it's futile - * for backends to continue waiting. Since the user no longer wants - * synchronous replication, we'd better wake them up. - */ - if (!sync_standbys_defined) - { - int i; - - for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++) - SyncRepWakeQueue(true, i); - } - - /* - * Only allow people to join the queue when there are synchronous - * standbys defined. Without this interlock, there's a race - * condition: we might wake up all the current waiters; then, some - * backend that hasn't yet reloaded its config might go to sleep on - * the queue (and never wake up). This prevents that. - */ - WalSndCtl->sync_standbys_status = SYNC_STANDBY_INIT | - (sync_standbys_defined ? SYNC_STANDBY_DEFINED : 0); - - LWLockRelease(SyncRepLock); - } - else if ((WalSndCtl->sync_standbys_status & SYNC_STANDBY_INIT) == 0) - { - LWLockAcquire(SyncRepLock, LW_EXCLUSIVE); - - /* - * Note that there is no need to wake up the queues here. We would - * reach this path only if SyncStandbysDefined() returns false, or it - * would mean that some backends are waiting with the GUC set. See - * SyncRepWaitForLSN(). - */ - Assert(!SyncStandbysDefined()); - - /* - * Even if there is no sync standby defined, let the readers of this - * information know that the sync standby data has been initialized. - * This can just be done once, hence the previous check on - * SYNC_STANDBY_INIT to avoid useless work. - */ - WalSndCtl->sync_standbys_status |= SYNC_STANDBY_INIT; - - LWLockRelease(SyncRepLock); - } -} - -#ifdef USE_ASSERT_CHECKING -static bool -SyncRepQueueIsOrderedByLSN(int mode) -{ - XLogRecPtr lastLSN; - dlist_iter iter; - - Assert(mode >= 0 && mode < NUM_SYNC_REP_WAIT_MODE); - - lastLSN = 0; - - dlist_foreach(iter, &WalSndCtl->SyncRepQueue[mode]) - { - PGPROC *proc = dlist_container(PGPROC, syncRepLinks, iter.cur); - - /* - * Check the queue is ordered by LSN and that multiple procs don't - * have matching LSNs - */ - if (proc->waitLSN <= lastLSN) - return false; - - lastLSN = proc->waitLSN; - } - - return true; -} -#endif - /* * =========================================================== * Synchronous Replication functions for wal receiver processes @@ -1110,56 +1154,6 @@ SyncRepGetSendingSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr, XLogRecP pfree(sync_standbys); } -/* - * =========================================================== - * Synchronous Replication functions for wal receiver and wal sender processes - * =========================================================== - */ - -/* - * Calculates the Write, Flush, and Apply positions for synchronous - * standbys using the synchronous replication method. Returns true if the calculation is successful, - * otherwise false. - */ -static bool -SyncRepGetSyncRecPtrBySyncRepMethod(XLogRecPtr *writePtr, - XLogRecPtr *flushPtr, - XLogRecPtr *applyPtr, - SyncRepStandbyData *sync_standbys, - int num_standbys) -{ - /* Quick out if not even configured to be synchronous */ - if (SyncRepConfig == NULL) - return false; - - /* - * In a priority-based sync replication, the synced positions are the - * oldest ones among sync standbys. In a quorum-based, they are the Nth - * latest ones. - * - * SyncRepGetNthLatestSyncRecPtr() also can calculate the oldest - * positions. But we use SyncRepGetOldestSyncRecPtr() for that calculation - * because it's a bit more efficient. - * - * XXX If the numbers of current and requested sync standbys are the same, - * we can use SyncRepGetOldestSyncRecPtr() to calculate the synced - * positions even in a quorum-based sync replication. - */ - if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY) - { - SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys, num_standbys); - } - else - { - SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr, - sync_standbys, num_standbys, - SyncRepConfig->num_sync); - } - - return true; -} - /* * =========================================================== * Synchronous Replication functions executed by any process