From 8df6afc21824e590c056ef9419d72002ba4da114 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Tue, 5 Aug 2025 21:49:03 -0400 Subject: [PATCH v4] Improve initial slot synchronization in pg_sync_replication_slots() During initial slot synchronization on a standby, the operation may fail if required catalog rows or WALs have been removed or are at risk of removal. The slotsync worker handles this by creating a temporary slot for initial sync and retain it even in case of failure. It will keep retrying until the slot on the primary has been advanced to a position where all the required data are also available on the standby. However, pg_sync_replication_slots() had no such protection mechanism. The SQL API would fail immediately if synchronization requirements weren't met. This could lead to permanent failure as the standby might continue removing the still-required data. To address this, we now make pg_sync_replication_slots() wait for the primary slot to advance to a suitable position before completing synchronization and before removing the temporary slot. Once the slot advances to a suitable position, we retry synchronization. Additionally, if a promotion occurs on the standby during this wait, the process exits gracefully and the temporary slot is removed. --- doc/src/sgml/func.sgml | 4 +- doc/src/sgml/logicaldecoding.sgml | 40 ++--- src/backend/replication/logical/slotsync.c | 226 ++++++++++++++++++++++-- src/backend/utils/activity/wait_event_names.txt | 1 + 4 files changed, 229 insertions(+), 42 deletions(-) diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 74a16af..4092677 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -30034,9 +30034,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset standby server. Temporary synced slots, if any, cannot be used for logical decoding and must be dropped after promotion. See for details. - Note that this function is primarily intended for testing and - debugging purposes and should be used with caution. Additionally, - this function cannot be executed if + Note that this function cannot be executed if sync_replication_slots is enabled and the slotsync worker is already running to perform the synchronization of slots. diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 593f784..edad0e9 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -364,18 +364,23 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU Replication Slot Synchronization - The logical replication slots on the primary can be synchronized to - the hot standby by using the failover parameter of + The logical replication slots on the primary can be enabled for + synchronization to the hot standby by using the + failover parameter of pg_create_logical_replication_slot, or by using the failover option of - CREATE SUBSCRIPTION during slot creation. - Additionally, enabling - sync_replication_slots on the standby - is required. By enabling - sync_replication_slots - on the standby, the failover slots can be synchronized periodically in + CREATE SUBSCRIPTION during slot creation. After that, + synchronization can be be performed either manually by calling + + pg_sync_replication_slots + on the standby, or automatically by enabling + + sync_replication_slots on the standby. + When + sync_replication_slots is enabled + on the standby, the failover slots are periodically synchronized by the slotsync worker. For the synchronization to work, it is mandatory to have a physical replication slot between the primary and the standby (i.e., primary_slot_name @@ -398,25 +403,6 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU receiving the WAL up to the latest flushed position on the primary server. - - - While enabling - sync_replication_slots allows for automatic - periodic synchronization of failover slots, they can also be manually - synchronized using the - pg_sync_replication_slots function on the standby. - However, this function is primarily intended for testing and debugging and - should be used with caution. Unlike automatic synchronization, it does not - include cyclic retries, making it more prone to synchronization failures, - particularly during initial sync scenarios where the required WAL files - or catalog rows for the slot may have already been removed or are at risk - of being removed on the standby. In contrast, automatic synchronization - via sync_replication_slots provides continuous slot - updates, enabling seamless failover and supporting high availability. - Therefore, it is the recommended method for synchronizing slots. - - - When slot synchronization is configured as recommended, and the initial synchronization is performed either automatically or diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 2f0c08b..01df8ea 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -146,6 +146,7 @@ typedef struct RemoteSlot ReplicationSlotInvalidationCause invalidated; } RemoteSlot; +static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn); static void slotsync_failure_callback(int code, Datum arg); static void update_synced_slots_inactive_since(void); @@ -211,7 +212,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, * impact the users, so we used DEBUG1 level to log the message. */ ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1, - errmsg("could not synchronize replication slot \"%s\"", + errmsg("initial sync of replication slot \"%s\" failed; will keep retrying", remote_slot->name), errdetail("Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.", LSN_FORMAT_ARGS(remote_slot->restart_lsn), @@ -550,6 +551,185 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn) } /* + * Wait for remote slot to pass locally reserved position. + * + * Return true if remote_slot could catch up with the locally reserved + * position. Return false in all other cases. + */ +static bool +wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot) +{ +#define SLOT_QUERY_COLUMN_COUNT 4 + + StringInfoData cmd; + int wait_iterations = 0; + + Assert(!AmLogicalSlotSyncWorkerProcess()); + + ereport(LOG, + errmsg("waiting for remote slot \"%s\" LSN (%X/%X) and catalog xmin" + " (%u) to pass local slot LSN (%X/%X) and catalog xmin (%u)", + remote_slot->name, + LSN_FORMAT_ARGS(remote_slot->restart_lsn), + remote_slot->catalog_xmin, + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn), + MyReplicationSlot->data.catalog_xmin)); + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT invalidation_reason IS NOT NULL, restart_lsn," + " confirmed_flush_lsn, catalog_xmin" + " FROM pg_catalog.pg_replication_slots" + " WHERE slot_name = %s", + quote_literal_cstr(remote_slot->name)); + + for (;;) + { + bool new_invalidated; + XLogRecPtr new_restart_lsn; + XLogRecPtr new_confirmed_lsn; + TransactionId new_catalog_xmin; + WalRcvExecResult *res; + TupleTableSlot *tupslot; + Datum d; + int rc; + int col = 0; + bool isnull; + Oid slotRow[SLOT_QUERY_COLUMN_COUNT] = {BOOLOID, LSNOID, LSNOID, XIDOID}; + + /* Handle any termination request if any */ + ProcessSlotSyncInterrupts(wrconn); + + res = walrcv_exec(wrconn, cmd.data, SLOT_QUERY_COLUMN_COUNT, slotRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errmsg("could not fetch slot \"%s\" info from the" + " primary server: %s", + remote_slot->name, res->err)); + + tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) + { + ereport(LOG, + errmsg("aborting sync for slot \"%s\"", + remote_slot->name), + errdetail("This slot was not found on the primary server.")); + + pfree(cmd.data); + walrcv_clear_result(res); + + return false; + } + + /* + * It is possible that the slot was invalidated on the primary, if so + * handle accordingly. + */ + new_invalidated = DatumGetBool(slot_getattr(tupslot, ++col, &isnull)); + Assert(!isnull); + + if (new_invalidated) + { + /* + * The slot won't be persisted by the caller; it will be cleaned + * up at the end of synchronization. + */ + ereport(WARNING, + errmsg("aborting initial sync for slot \"%s\"", + remote_slot->name), + errdetail("This slot was invalidated on the primary server.")); + + pfree(cmd.data); + ExecClearTuple(tupslot); + walrcv_clear_result(res); + + return false; + } + + /* Any slot with NULL in these fields should not have made it this far */ + d = slot_getattr(tupslot, ++col, &isnull); + Assert(!isnull); + new_restart_lsn = DatumGetLSN(d); + + d = slot_getattr(tupslot, ++col, &isnull); + Assert(!isnull); + new_confirmed_lsn = DatumGetLSN(d); + + d = slot_getattr(tupslot, ++col, &isnull); + Assert(!isnull); + new_catalog_xmin = DatumGetTransactionId(d); + + ExecClearTuple(tupslot); + walrcv_clear_result(res); + + if (new_restart_lsn >= MyReplicationSlot->data.restart_lsn && + TransactionIdFollowsOrEquals(new_catalog_xmin, + MyReplicationSlot->data.catalog_xmin)) + { + /* Update new values in remote_slot */ + remote_slot->restart_lsn = new_restart_lsn; + remote_slot->confirmed_lsn = new_confirmed_lsn; + remote_slot->catalog_xmin = new_catalog_xmin; + + ereport(LOG, + errmsg("wait over for remote slot \"%s\" as its LSN (%X/%X)" + " and catalog xmin (%u) has now passed local slot LSN" + " (%X/%X) and catalog xmin (%u)", + remote_slot->name, + LSN_FORMAT_ARGS(new_restart_lsn), + new_catalog_xmin, + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn), + MyReplicationSlot->data.catalog_xmin)); + + pfree(cmd.data); + + return true; + } + + /* + * If we've been promoted, then no point continuing. + */ + if (SlotSyncCtx->stopSignaled) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("exiting from slot synchronization as" + " promotion is triggered"))); + pfree(cmd.data); + + return false; + } + + /* + * XXX: Is waiting for 2 seconds before retrying enough or more or + * less? + */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 2000L, + WAIT_EVENT_REPLICATION_SLOTSYNC_PRIMARY_CATCHUP); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + + /* log a message every ten seconds */ + wait_iterations++; + if (wait_iterations % 5 == 0) + { + ereport(LOG, + errmsg("continuing to wait for remote slot \"%s\" LSN (%X/%X) and catalog xmin" + " (%u) to pass local slot LSN (%X/%X) and catalog xmin (%u)", + remote_slot->name, + LSN_FORMAT_ARGS(remote_slot->restart_lsn), + remote_slot->catalog_xmin, + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn), + MyReplicationSlot->data.catalog_xmin)); + } + } +} + +/* * If the remote restart_lsn and catalog_xmin have caught up with the * local ones, then update the LSNs and persist the local synced slot for * future synchronization; otherwise, do nothing. @@ -558,7 +738,8 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn) * false. */ static bool -update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) +update_and_persist_local_synced_slot(WalReceiverConn *wrconn, + RemoteSlot *remote_slot, Oid remote_dbid) { ReplicationSlot *slot = MyReplicationSlot; bool found_consistent_snapshot = false; @@ -577,12 +758,30 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) /* * The remote slot didn't catch up to locally reserved position. * - * We do not drop the slot because the restart_lsn can be ahead of the - * current location when recreating the slot in the next cycle. It may - * take more time to create such a slot. Therefore, we keep this slot - * and attempt the synchronization in the next cycle. + * If we're in the slotsync worker, we do not drop the slot because the + * restart_lsn can be ahead of the current location when recreating + * the slot in the next cycle. It may take more time to create such a + * slot. Therefore, we keep this slot and attempt the synchronization + * in the next cycle. */ - return false; + if (AmLogicalSlotSyncWorkerProcess()) + return false; + + /* + * For SQL API synchronization, we wait for the remote slot to catch up + * here, since we can't assume the SQL API will be called again soon. + * We will retry the sync once the slot catches up. + * + * Note: This will return false if a promotion is triggered on the + * standby while waiting, in which case we stop syncing and drop the + * temporary slot. + */ + if (!wait_for_primary_slot_catchup(wrconn, remote_slot)) + return false; + else + update_local_synced_slot(remote_slot, remote_dbid, + &found_consistent_snapshot, + &remote_slot_precedes); } /* @@ -622,7 +821,8 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) * Returns TRUE if the local slot is updated. */ static bool -synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) +synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot, + Oid remote_dbid) { ReplicationSlot *slot; XLogRecPtr latestFlushPtr; @@ -715,7 +915,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) /* Slot not ready yet, let's attempt to make it sync-ready now. */ if (slot->data.persistency == RS_TEMPORARY) { - slot_updated = update_and_persist_local_synced_slot(remote_slot, + slot_updated = update_and_persist_local_synced_slot(wrconn, + remote_slot, remote_dbid); } @@ -785,7 +986,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) ReplicationSlotsComputeRequiredXmin(true); LWLockRelease(ProcArrayLock); - update_and_persist_local_synced_slot(remote_slot, remote_dbid); + update_and_persist_local_synced_slot(wrconn, remote_slot, remote_dbid); slot_updated = true; } @@ -927,7 +1128,8 @@ synchronize_slots(WalReceiverConn *wrconn) */ LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); - some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid); + some_slot_updated |= synchronize_one_slot(wrconn, remote_slot, + remote_dbid); UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); } @@ -1131,7 +1333,7 @@ slotsync_reread_config(void) bool conninfo_changed; bool primary_slotname_changed; - Assert(sync_replication_slots); + Assert(!AmLogicalSlotSyncWorkerProcess() || sync_replication_slots); ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 0be307d..9fa36ab 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -64,6 +64,7 @@ LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication paralle RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down." +REPLICATION_SLOTSYNC_PRIMARY_CATCHUP "Waiting for the primary to catch-up." SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." WAL_SENDER_MAIN "Waiting in main loop of WAL sender process." -- 1.8.3.1