From 7b73ecd4d2f82acc031504f5ccc7ed0ac64193a7 Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Thu, 24 Apr 2025 19:40:33 +0800 Subject: [PATCH v30 5/7] Re-create the replication slot if the conflict retention duration reduced The patch allows the launcher to drop and re-create the invalidated slot, if at least one apply worker has confirmed that the retention duration is now within the max_conflict_retention_duration. --- doc/src/sgml/config.sgml | 5 +- src/backend/replication/logical/launcher.c | 37 ++++--- src/backend/replication/logical/worker.c | 114 +++++++-------------- 3 files changed, 61 insertions(+), 95 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 49baa739b62..9a9f6fcb218 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -5404,7 +5404,10 @@ ANY num_sync ( name), + errdetail("The time spent applying changes up to LSN %X/%X is now within the maximum limit of %u ms.", + LSN_FORMAT_ARGS(data->remote_lsn), + max_conflict_retention_duration)); + /* * Reaching here means the remote WAL position has been received, and * all transactions up to that position on the publisher have been @@ -4382,6 +4390,7 @@ wait_for_local_flush(RetainConflictInfoData *data) */ SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->oldest_nonremovable_xid = data->candidate_xid; + MyLogicalRepWorker->stop_conflict_info_retention = false; SpinLockRelease(&MyLogicalRepWorker->relmutex); elog(DEBUG2, "confirmed remote flush up to %X/%X: new oldest_nonremovable_xid %u", @@ -4423,9 +4432,8 @@ reset_conflict_info_fields(RetainConflictInfoData *data) * LogicalRepWorker->stop_conflict_info_retention to true, notify the launcher to * invalidate the slot, and return true. Return false otherwise. * - * Currently, the retention will not resume automatically unless user manually - * disables retain_conflict_info and re-enables it after confirming that the - * replication slot has been dropped. + * The retention will resume automatically if the worker has confirmed that the + * retention duration is now within the max_conflict_retention_duration. */ static bool should_stop_conflict_info_retention(RetainConflictInfoData *data) @@ -4450,19 +4458,26 @@ should_stop_conflict_info_retention(RetainConflictInfoData *data) max_conflict_retention_duration)) return false; - ereport(LOG, - errmsg("logical replication worker for subscription \"%s\" will stop retaining conflict information", - MySubscription->name), - errdetail("The time spent advancing the non-removable transaction ID has exceeded the maximum limit of %u ms.", - max_conflict_retention_duration)); - - SpinLockAcquire(&MyLogicalRepWorker->relmutex); - MyLogicalRepWorker->oldest_nonremovable_xid = InvalidFullTransactionId; - MyLogicalRepWorker->stop_conflict_info_retention = true; - SpinLockRelease(&MyLogicalRepWorker->relmutex); - - /* Notify launcher to invalidate the conflict slot */ - ApplyLauncherWakeup(); + /* + * Log a message and reset relevant data when the worker is about to stop + * retaining conflict information. + */ + if (!MyLogicalRepWorker->stop_conflict_info_retention) + { + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" will stop retaining conflict information", + MySubscription->name), + errdetail("The time spent advancing the non-removable transaction ID has exceeded the maximum limit of %u ms.", + max_conflict_retention_duration)); + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = InvalidFullTransactionId; + MyLogicalRepWorker->stop_conflict_info_retention = true; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + /* Notify launcher to invalidate the conflict slot */ + ApplyLauncherWakeup(); + } reset_conflict_info_fields(data); @@ -4510,51 +4525,6 @@ adjust_xid_advance_interval(RetainConflictInfoData *data, bool new_xid_found) } } -/* - * Update the conflict retention status for the current apply worker. It checks - * whether the worker should stop retaining conflict information due to - * invalidation of the replication slot ("pg_conflict_detection"). - * - * Currently, the replication slot is invalidated only if the duration for - * retaining conflict information exceeds the allowed maximum. - */ -static void -update_conflict_retention_status(void) -{ - ReplicationSlotInvalidationCause cause = RS_INVAL_NONE; - ReplicationSlot *slot; - - /* Exit early if retaining conflict information is not required */ - if (!MySubscription->retainconflictinfo) - return; - - /* - * Only the leader apply worker manages conflict retention (see - * maybe_advance_nonremovable_xid() for details). - */ - if (!am_leader_apply_worker()) - return; - - LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - - slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, false); - - if (slot) - { - SpinLockAcquire(&slot->mutex); - cause = slot->data.invalidated; - SpinLockRelease(&slot->mutex); - - Assert(cause == RS_INVAL_NONE || cause == RS_INVAL_CONFLICT_RETENTION_DURATION); - } - - LWLockRelease(ReplicationSlotControlLock); - - SpinLockAcquire(&MyLogicalRepWorker->relmutex); - MyLogicalRepWorker->stop_conflict_info_retention = cause != RS_INVAL_NONE; - SpinLockRelease(&MyLogicalRepWorker->relmutex); -} - /* * Exit routine for apply workers due to subscription parameter changes. */ @@ -4726,16 +4696,6 @@ maybe_reread_subscription(void) CommitTransactionCommand(); MySubscriptionValid = true; - - /* - * Update worker status to avoid unnecessary conflict retention if the - * replication slot ("pg_conflict_detection") was invalidated prior to - * enabling the retain_conflict_info option. This is also necessary to - * restart conflict retention if the user has disabled and subsequently - * re-enabled the retain_conflict_info option, resulting in the - * replication slot being recreated. - */ - update_conflict_retention_status(); } /* @@ -5382,8 +5342,6 @@ InitializeLogRepWorker(void) MySubscription->name))); CommitTransactionCommand(); - - update_conflict_retention_status(); } /* -- 2.30.0.windows.2