From 86d7a437f737e3976ad28dcf8056b08ec64c87b4 Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Wed, 6 Aug 2025 19:02:56 +0800 Subject: [PATCH v58 2/2] Resume retaining the information for conflict detection This patch allows the launcher to re-initialize the invalidated slot if at least one apply worker has confirmed that the retention duration is now within max_conflict_retention_duration. --- doc/src/sgml/config.sgml | 7 +- src/backend/replication/logical/launcher.c | 83 +++++++++++++---- src/backend/replication/logical/worker.c | 100 ++++++++++++++++----- src/include/replication/worker_internal.h | 7 ++ 4 files changed, 158 insertions(+), 39 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 5a9aee66e3f..6cd0b48c8dd 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -5421,8 +5421,11 @@ ANY num_sync ( data.xmin)); LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); @@ -1488,6 +1517,26 @@ CreateConflictDetectionSlot(void) ReplicationSlotSave(); } +/* + * Create and acquire the replication slot used to retain information for + * conflict detection, if not yet done. + */ +void +CreateConflictDetectionSlot(void) +{ + /* Exit early, if the replication slot is already created and acquired */ + if (MyReplicationSlot) + return; + + ereport(LOG, + errmsg("creating replication conflict detection slot")); + + ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false, + false, false); + + init_conflict_slot_xmin(); +} + /* * Is current process the logical replication launcher? 
*/ diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index e327ffe8e00..72deb83c228 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -418,6 +418,10 @@ typedef struct RetainDeadTuplesData long table_sync_wait_time; /* time spent waiting for table sync * to finish */ + bool wait_for_initial_xid; /* wait for the launcher to initialize + * the apply worker's + * oldest_nonremovable_xid */ + /* * The following fields are used to determine the timing for the next * round of transaction ID advancement. @@ -4375,10 +4379,6 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data) if (!MySubscription->retaindeadtuples) return false; - /* No need to advance if we have already stopped retaining */ - if (!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) - return false; - return true; } @@ -4416,6 +4416,33 @@ get_candidate_xid(RetainDeadTuplesData *rdt_data) TransactionId oldest_running_xid; TimestampTz now; + /* + * No need to advance if the apply worker has resumed retention but the + * launcher has not yet initialized slot.xmin and assigned it to + * oldest_nonremovable_xid. + * + * It might seem feasible to directly check the conflict detection + * slot.xmin instead of relying on the launcher to assign the worker's + * oldest_nonremovable_xid; however, that could lead to a race condition + * where slot.xmin is set to InvalidTransactionId immediately after the + * check. In such cases, oldest_nonremovable_xid would no longer be + * protected by a replication slot and could become unreliable if a + * wraparound occurs. 
+ */ + if (rdt_data->wait_for_initial_xid) + { + TransactionId nonremovable_xid; + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + nonremovable_xid = MyLogicalRepWorker->oldest_nonremovable_xid; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + if (!TransactionIdIsValid(nonremovable_xid)) + return; + + rdt_data->wait_for_initial_xid = false; + } + /* * Use last_recv_time when applying changes in the loop to avoid * unnecessary system time retrieval. If last_recv_time is not available, @@ -4660,13 +4687,38 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) if (last_flushpos < rdt_data->remote_lsn) return; + /* + * If conflict info retention was previously stopped due to a timeout, and + * the time required to advance the non-removable transaction ID has now + * decreased to within acceptable limits, log a message. The next step is + * to wait for the launcher to initialize the oldest_nonremovable_xid. + */ + if (MyLogicalRepWorker->stop_conflict_info_retention) + { + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" resumes retaining the information for detecting conflicts", + MySubscription->name), + errdetail("The time spent applying changes up to LSN %X/%X is now within the maximum limit of %u ms.", + LSN_FORMAT_ARGS(rdt_data->remote_lsn), + max_conflict_retention_duration)); + + rdt_data->wait_for_initial_xid = true; + } + /* * Reaching here means the remote WAL position has been received, and all * transactions up to that position on the publisher have been applied and * flushed locally. So, we can advance the non-removable transaction ID. + * + * However, if oldest_nonremovable_xid is invalid, indicating that + * retention was stopped and is now being resumed, refrain from updating + * oldest_nonremovable_xid until the launcher provides an initial value + * (see get_candidate_xid() for details). 
*/ SpinLockAcquire(&MyLogicalRepWorker->relmutex); - MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid; + if (TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) + MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid; + MyLogicalRepWorker->stop_conflict_info_retention = false; SpinLockRelease(&MyLogicalRepWorker->relmutex); elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u", @@ -4709,9 +4761,8 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data) * to InvalidTransactionId, notify the launcher to set the slot.xmin to * InvalidTransactionId as well, and return true. Return false otherwise. * - * Currently, the retention will not resume automatically unless user manually - * disables retain_dead_tuples and re-enables it after confirming that the - * replication slot has been dropped. + * The retention will resume automatically if the worker has confirmed that the + * retention duration is now within the max_conflict_retention_duration. */ static bool should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) @@ -4741,18 +4792,26 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data) rdt_data->table_sync_wait_time)) return false; - ereport(LOG, - errmsg("logical replication worker for subscription \"%s\" will stop retaining conflict information", - MySubscription->name), - errdetail("The time spent advancing the non-removable transaction ID has exceeded the maximum limit of %u ms.", - max_conflict_retention_duration)); - - SpinLockAcquire(&MyLogicalRepWorker->relmutex); - MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; - SpinLockRelease(&MyLogicalRepWorker->relmutex); - - /* Notify launcher to update the conflict slot */ - ApplyLauncherWakeup(); + /* + * Log a message and reset relevant data when the worker is about to stop + * retaining conflict information. 
+ */ + if (!MyLogicalRepWorker->stop_conflict_info_retention) + { + ereport(LOG, + errmsg("logical replication worker for subscription \"%s\" will stop retaining the information for detecting conflicts", + MySubscription->name), + errdetail("The time spent advancing the non-removable transaction ID has exceeded the maximum limit of %u ms.", + max_conflict_retention_duration)); + + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId; + MyLogicalRepWorker->stop_conflict_info_retention = true; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + /* Notify launcher to update the conflict slot */ + ApplyLauncherWakeup(); + } reset_retention_data_fields(rdt_data); @@ -5607,6 +5666,7 @@ InitializeLogRepWorker(void) */ if (am_leader_apply_worker() && MySubscription->retaindeadtuples && + !MyLogicalRepWorker->stop_conflict_info_retention && !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid)) { ereport(LOG, diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index b86c759394f..54a55e7c1bd 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -100,6 +100,13 @@ typedef struct LogicalRepWorker */ TransactionId oldest_nonremovable_xid; + /* + * Indicates whether the apply worker has stopped retaining information + * useful for conflict detection. This is used only when + * retain_dead_tuples is enabled. + */ + bool stop_conflict_info_retention; + /* Stats. */ XLogRecPtr last_lsn; TimestampTz last_send_time; -- 2.50.1.windows.1