From 51b64d83b1c6cd8798693c5a2593fa7bce29565c Mon Sep 17 00:00:00 2001 From: "Chao Li (Evan)" Date: Wed, 24 Dec 2025 09:17:27 +0800 Subject: [PATCH v4 1/2] Refactor replication origin state reset helpers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Factor out common logic for clearing per-transaction and per-session replication origin state into a dedicated helper function. This removes duplicated assignments of replorigin_session_origin, replorigin_session_origin_lsn, and replorigin_session_origin_timestamp across multiple call sites, and makes the intended scope of each reset (clear per-transaction state vs. clear per-session state) explicit. No functional change intended. Author: Chao Li Reviewed-by: Ashutosh Bapat Reviewed-by: Álvaro Herrera Discussion: https://postgr.es/m/CAEoWx2=pYvfRthXHTzSrOsf5_FfyY4zJyK4zV2v4W=yjUij1cA@mail.gmail.com --- src/backend/replication/logical/origin.c | 20 +++++++++++++++----- src/backend/replication/logical/tablesync.c | 4 +--- src/backend/replication/logical/worker.c | 14 ++++++-------- src/include/replication/origin.h | 1 + 4 files changed, 23 insertions(+), 16 deletions(-) diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 2380f369578..fc9e14ecbc5 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -1287,6 +1287,19 @@ replorigin_session_get_progress(bool flush) return remote_lsn; } +/* + * Clear session replication origin state. + * + * If xact_only is true, only clear the per-transaction state. + */ +void +replorigin_session_clear_state(bool xact_only) +{ + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; + if (!xact_only) + replorigin_session_origin = InvalidRepOriginId; +} /* --------------------------------------------------------------------------- @@ -1412,9 +1425,7 @@ pg_replication_origin_session_reset(PG_FUNCTION_ARGS) replorigin_session_reset(); - replorigin_session_origin = InvalidRepOriginId; - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; + replorigin_session_clear_state(false); PG_RETURN_VOID(); } @@ -1482,8 +1493,7 @@ pg_replication_origin_xact_reset(PG_FUNCTION_ARGS) { replorigin_check_prerequisites(true, false); - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; + replorigin_session_clear_state(true); PG_RETURN_VOID(); } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 2522e372036..47104ed676c 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -323,9 +323,7 @@ ProcessSyncingTablesForSync(XLogRecPtr current_lsn) * This is needed to allow the origin to be dropped. */ replorigin_session_reset(); - replorigin_session_origin = InvalidRepOriginId; - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; + replorigin_session_clear_state(false); /* * Drop the tablesync's origin tracking if exists. diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 718408bb599..4df177664b7 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -627,7 +627,7 @@ static inline void reset_apply_error_context_info(void); static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo); -static void replorigin_reset(int code, Datum arg); +static void on_exit_clear_state(int code, Datum arg); /* * Form the origin name for the subscription. @@ -5594,7 +5594,7 @@ start_apply(XLogRecPtr origin_startpos) * transaction loss as that transaction won't be sent again by the * server. */ - replorigin_reset(0, (Datum) 0); + replorigin_session_clear_state(false); if (MySubscription->disableonerr) DisableSubscriptionAndExit(); @@ -5865,18 +5865,16 @@ InitializeLogRepWorker(void) * replication workers that set up origins and apply remote transactions * are protected. */ - before_shmem_exit(replorigin_reset, (Datum) 0); + before_shmem_exit(on_exit_clear_state, (Datum) 0); } /* - * Reset the origin state. + * Callback on exit to reset the origin state. */ static void -replorigin_reset(int code, Datum arg) +on_exit_clear_state(int code, Datum arg) { - replorigin_session_origin = InvalidRepOriginId; - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; + replorigin_session_clear_state(false); } /* diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h index 2a73f6aa492..ab34ef97c46 100644 --- a/src/include/replication/origin.h +++ b/src/include/replication/origin.h @@ -65,6 +65,7 @@ extern void replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit); extern void replorigin_session_setup(RepOriginId node, int acquired_by); extern void replorigin_session_reset(void); +extern void replorigin_session_clear_state(bool xact_only); extern XLogRecPtr replorigin_session_get_progress(bool flush); /* Checkpoint/Startup integration */ -- 2.39.5 (Apple Git-154)