From 6c2c432cf2b1875adc43c01caa7c7c7f35e4c70f Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Wed, 23 Jul 2025 14:41:37 +0800 Subject: [PATCH v53 3/3] Re-create the replication slot if the conflict retention duration is reduced The patch allows the launcher to drop and re-create the invalidated slot if at least one apply worker has confirmed that the retention duration is now within the max_conflict_retention_duration. --- doc/src/sgml/config.sgml | 5 +- src/backend/replication/logical/launcher.c | 15 ++++- src/backend/replication/logical/worker.c | 64 +++++++++++++++------- src/backend/utils/adt/pg_upgrade_support.c | 2 +- src/include/replication/logicallauncher.h | 2 +- 5 files changed, 63 insertions(+), 25 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 0d6616857e7..e616381e44a 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -5425,7 +5425,10 @@ ANY num_sync ( name), + errdetail("The time spent applying changes up to LSN %X/%X is now within the maximum limit of %u ms.", + LSN_FORMAT_ARGS(rdt_data->remote_lsn), + max_conflict_retention_duration)); + + apply_worker_exit(); + } + /* * Reaching here means the remote WAL position has been received, and all * transactions up to that position on the publisher have been applied and @@ -4577,6 +4594,7 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) */ SpinLockAcquire(&MyLogicalRepWorker->relmutex); MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid; + MyLogicalRepWorker->stop_conflict_info_retention = false; SpinLockRelease(&MyLogicalRepWorker->relmutex); elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u", @@ -4619,9 +4637,8 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data) * LogicalRepWorker->stop_conflict_info_retention to true, notify the launcher to * invalidate the slot, and return true. Return false otherwise. 
* - * Currently, the retention will not resume automatically unless user manually - * disables retain_dead_tuples and re-enables it after confirming that the - * replication slot has been dropped. + * The retention will resume automatically if the worker has confirmed that the + * retention duration is now within the max_conflict_retention_duration. */ static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) @@ -4651,19 +4668,26 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) rdt_data->table_sync_wait_time)) return false; - ereport(LOG, - errmsg("logical replication worker for subscription \"%s\" will stop retaining conflict information", - MySubscription->name), - errdetail("The time spent advancing the non-removable transaction ID has exceeded the maximum limit of %u ms.", - max_conflict_retention_duration)); - - SpinLockAcquire(&MyLogicalRepWorker->relmutex); - MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; - MyLogicalRepWorker->stop_conflict_info_retention = true; - SpinLockRelease(&MyLogicalRepWorker->relmutex); - - /* Notify launcher to invalidate the conflict slot */ - ApplyLauncherWakeup(); + /* + * Log a message and reset relevant data when the worker is about to stop + * retaining conflict information. 
+ */ + if (!MyLogicalRepWorker->stop_conflict_info_retention) + { + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" will stop retaining conflict information", + MySubscription->name), + errdetail("The time spent advancing the non-removable transaction ID has exceeded the maximum limit of %u ms.", + max_conflict_retention_duration)); + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; + MyLogicalRepWorker->stop_conflict_info_retention = true; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + /* Notify launcher to invalidate the conflict slot */ + ApplyLauncherWakeup(); + } reset_retention_data_fields(rdt_data); diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c index a4f8b4faa90..e20fc44adda 100644 --- a/src/backend/utils/adt/pg_upgrade_support.c +++ b/src/backend/utils/adt/pg_upgrade_support.c @@ -423,7 +423,7 @@ binary_upgrade_create_conflict_detection_slot(PG_FUNCTION_ARGS) { CHECK_IS_BINARY_UPGRADE; - CreateConflictDetectionSlot(); + CreateConflictDetectionSlot(false); ReplicationSlotRelease(); diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index 6e3007db5f0..5052c394c8f 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -29,7 +29,7 @@ extern void ApplyLauncherWakeupAtCommit(void); extern void ApplyLauncherWakeup(void); extern void AtEOXact_ApplyLauncher(bool isCommit); -extern void CreateConflictDetectionSlot(void); +extern void CreateConflictDetectionSlot(bool recreate_if_invalid); extern bool IsLogicalLauncher(void); -- 2.50.1.windows.1