From d78bdae54627fed5b82a45b6861c4f783deb7fcc Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Tue, 7 Nov 2023 15:37:20 +0530 Subject: [PATCH v31 3/3] Allow slot-sync workers to wait for the cascading standbys. The GUC standby_slot_names needs to be set on the first standby in order to allow it to wait for confirmation from cascading standbys before updating logical 'synced' slots in slot-sync workers. The intent is that the logical slots (synced ones) should not go ahead of cascading standbys. For user-created slots on the first standby, we already have this wait logic in place in the logical walsender and in pg_logical_slot_get_changes_guts(), but for synced slots (which cannot be consumed yet), we need to make sure that they are not going ahead of cascading standbys, and that is achieved by introducing the wait in the slot-sync worker before we actually update the slots. --- src/backend/replication/logical/slotsync.c | 87 ++++++++++++++++++++-- src/backend/replication/walsender.c | 13 ++-- src/include/replication/walsender.h | 5 ++ 3 files changed, 91 insertions(+), 14 deletions(-) diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 2bf36d3841..87955b63c2 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -84,6 +84,9 @@ typedef struct RemoteSlot */ static int PrimaryCatchupWaitAttempt = 0; +static void ProcessSlotSyncInterrupts(WalReceiverConn **wrconn, + List **standby_slots); + /* * Wait for remote slot to pass locally reserved position. */ @@ -491,6 +494,53 @@ drop_obsolete_slots(Oid *dbids, List *remote_slot_list) } } +/* + * Wait for cascading physical standbys corresponding to physical slots + * specified in standby_slot_names GUC to confirm receiving given lsn. 
+ */ +static void +slotsync_wait_for_standby_confirmation(XLogRecPtr wait_for_lsn, + WalReceiverConn *wrconn) +{ + List *standby_slots; + + /* Nothing to be done */ + if (strcmp(standby_slot_names, "") == 0) + return; + + /* TODO: optimize it, cache it somewhere? */ + SlotSyncInitConfig(&standby_slots); + + for (;;) + { + int rc; + + WalSndFilterStandbySlots(wait_for_lsn, &standby_slots); + + /* Exit if done waiting for every slot. */ + if (standby_slots == NIL) + break; + + /* + * This reloads the configuration and also refreshes standby_slots + * if the standby_slot_names GUC was changed by the user. + */ + ProcessSlotSyncInterrupts(&wrconn, &standby_slots); + + /* + * XXX: Is waiting 5 seconds before retrying the right interval, or + * should it be longer or shorter? + */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 5000L, + WAIT_EVENT_WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } +} + /* * Construct Slot Query * @@ -697,6 +747,7 @@ synchronize_slots(dsa_area *dsa, WalReceiverConn *wrconn) long naptime = WORKER_DEFAULT_NAPTIME_MS; Oid *dbids; int count = 0; + XLogRecPtr max_confirmed_lsn = 0; ListCell *cell; /* The primary_slot_name is not set yet or WALs not received yet */ @@ -808,6 +859,9 @@ synchronize_slots(dsa_area *dsa, WalReceiverConn *wrconn) /* Create list of remote slots */ remote_slot_list = lappend(remote_slot_list, remote_slot); + if (remote_slot->confirmed_lsn > max_confirmed_lsn) + max_confirmed_lsn = remote_slot->confirmed_lsn; + /* * Update naptime as required depending on slot activity. Check only * for the first slot, if one slot has activity then all slots will. @@ -818,6 +872,17 @@ synchronize_slots(dsa_area *dsa, WalReceiverConn *wrconn) ExecClearTuple(slot); } + /* + * If there are cascading standbys, wait for their confirmation before we + * update synced logical slots locally. 
+ * + * Instead of waiting on confirmation for lsn of each slot, let us wait + * once for confirmation on max_confirmed_lsn. If that is confirmed by + * each cascading standby, we are good to update all the slots. + */ + if (list_length(remote_slot_list)) + slotsync_wait_for_standby_confirmation(max_confirmed_lsn, wrconn); + /* Now sync the slots locally */ foreach(cell, remote_slot_list) { @@ -891,12 +956,18 @@ remote_connect() * If primary_conninfo has changed, reconnect to primary. */ static void -slotsync_reread_config(WalReceiverConn **wrconn) +slotsync_reread_config(WalReceiverConn **wrconn, List **standby_slots) { char *conninfo = pstrdup(PrimaryConnInfo); - ConfigReloadPending = false; - ProcessConfigFile(PGC_SIGHUP); + /* + * Reload configs and recreate the standby_slot_names_list if GUC + * standby_slot_names changed. + */ + if (standby_slots) + WalSndRereadConfigAndSlots(standby_slots); + else + ProcessConfigFile(PGC_SIGHUP); /* Reconnect if GUC primary_conninfo got changed */ if (strcmp(conninfo, PrimaryConnInfo) != 0) @@ -914,7 +985,8 @@ slotsync_reread_config(WalReceiverConn **wrconn) * Interrupt handler for main loop of slot-sync worker. 
*/ static void -ProcessSlotSyncInterrupts(WalReceiverConn **wrconn) +ProcessSlotSyncInterrupts(WalReceiverConn **wrconn, + List **standby_slots) { CHECK_FOR_INTERRUPTS(); @@ -930,7 +1002,10 @@ ProcessSlotSyncInterrupts(WalReceiverConn **wrconn) if (ConfigReloadPending) - slotsync_reread_config(wrconn); + { + ConfigReloadPending = false; + slotsync_reread_config(wrconn, standby_slots); + } } /* @@ -1000,7 +1075,7 @@ ReplSlotSyncWorkerMain(Datum main_arg) int rc; long naptime; - ProcessSlotSyncInterrupts(&wrconn); + ProcessSlotSyncInterrupts(&wrconn, NULL /* standby_slots */); if (!RecoveryInProgress()) proc_exit(0); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 1188930c44..09536ffc53 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1694,7 +1694,7 @@ WalSndWakeupNeeded() * Reload the config file and reinitialize the standby slot list if the GUC * standby_slot_names has changed. */ -static void +void WalSndRereadConfigAndSlots(List **standby_slots) { char *pre_standby_slot_names = pstrdup(standby_slot_names); @@ -1725,7 +1725,7 @@ WalSndRereadConfigAndSlots(List **standby_slots) * If the current replication slot does not have failover enabled, NIL is * returned. */ -static List * +List * WalSndGetStandbySlots(void) { List *standby_slots = NIL; @@ -1747,13 +1747,12 @@ WalSndGetStandbySlots(void) * This function updates the passed standby_slots list, removing any slots that * have already caught up to or surpassed the given wait_for_lsn. 
*/ -static void +void WalSndFilterStandbySlots(XLogRecPtr wait_for_lsn, List **standby_slots) { ListCell *lc; - List *standby_slots_cpy = *standby_slots; - foreach(lc, standby_slots_cpy) + foreach(lc, *standby_slots) { char *name = lfirst(lc); XLogRecPtr restart_lsn = InvalidXLogRecPtr; @@ -1820,10 +1819,8 @@ WalSndFilterStandbySlots(XLogRecPtr wait_for_lsn, List **standby_slots) if (warningfmt) ereport(WARNING, errmsg(warningfmt, name, "standby_slot_names")); - standby_slots_cpy = foreach_delete_current(standby_slots_cpy, lc); + *standby_slots = foreach_delete_current(*standby_slots, lc); } - - *standby_slots = standby_slots_cpy; } /* diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index ecbd3526c5..25c522993f 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -15,6 +15,7 @@ #include #include "access/xlogdefs.h" +#include "nodes/pg_list.h" /* * What to do with a snapshot in create replication slot command. @@ -50,7 +51,11 @@ extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); extern void WalSndRqstFileReload(void); extern void PhysicalConfirmReceivedLocation(XLogRecPtr lsn); +extern void WalSndFilterStandbySlots(XLogRecPtr wait_for_lsn, + List **standby_slots); extern void WalSndWaitForStandbyConfirmation(XLogRecPtr wait_for_lsn); +extern List *WalSndGetStandbySlots(void); +extern void WalSndRereadConfigAndSlots(List **standby_slots); /* * Remember that we want to wakeup walsenders later -- 2.34.1