From 405ef344f33761eed6e8351937d7a416f385299e Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Mon, 22 Sep 2025 11:22:55 +0800 Subject: [PATCH v4 1/2] Fix unintended drop of active replication origins Currently, if two backends configure the same replication origin and one backend resets it first, the acquired_by flag is cleared without recognizing the active usage by the first backend. This can result in the unintended dropping of the origin, potentially leading to issues if the shared memory of the dropped origin is reused for a newly created origin. Such reuse could cause unpredictable advancement of a different slot by the remaining backend holding the memory of the dropped origin. This commit addresses the issue by introducing a reference count for replication origins. The count is incremented when a backend sets up the origin and decremented upon a reset. Also, the backend process which firstly acquired the origin does not release till other acquiring process releases it. This ensures that acquired_by flag cannot be zero while processes are actively using it. --- .../expected/parallel_session_origin.out | 46 ++++++++++- .../specs/parallel_session_origin.spec | 6 +- src/backend/replication/logical/origin.c | 81 +++++++++++++------ 3 files changed, 106 insertions(+), 27 deletions(-) diff --git a/contrib/test_decoding/expected/parallel_session_origin.out b/contrib/test_decoding/expected/parallel_session_origin.out index e515b39f7ce..8e41831fcbc 100644 --- a/contrib/test_decoding/expected/parallel_session_origin.out +++ b/contrib/test_decoding/expected/parallel_session_origin.out @@ -1,6 +1,6 @@ Parsed test spec with 2 sessions -starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s0_reset s1_reset +starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_add_message s0_store_lsn s1_add_message s1_store_lsn s0_compare s1_reset s0_reset step s0_setup: SELECT pg_replication_origin_session_setup('origin'); pg_replication_origin_session_setup ----------------------------------- @@ -65,15 +65,59 @@ step s0_compare: t (1 row) +step s1_reset: SELECT pg_replication_origin_session_reset(); +pg_replication_origin_session_reset +----------------------------------- + +(1 row) + step s0_reset: SELECT pg_replication_origin_session_reset(); pg_replication_origin_session_reset ----------------------------------- (1 row) + +starting permutation: s0_setup s0_is_setup s1_setup s1_is_setup s0_reset s1_reset s0_reset +step s0_setup: SELECT pg_replication_origin_session_setup('origin'); +pg_replication_origin_session_setup +----------------------------------- + +(1 row) + +step s0_is_setup: SELECT pg_replication_origin_session_is_setup(); +pg_replication_origin_session_is_setup +-------------------------------------- +t +(1 row) + +step s1_setup: + SELECT pg_replication_origin_session_setup('origin', pid) + FROM pg_stat_activity + WHERE application_name = 'isolation/parallel_session_origin/s0'; + +pg_replication_origin_session_setup +----------------------------------- + +(1 row) + +step s1_is_setup: SELECT pg_replication_origin_session_is_setup(); +pg_replication_origin_session_is_setup +-------------------------------------- +t +(1 row) + +step s0_reset: SELECT pg_replication_origin_session_reset(); +ERROR: cannot reset replication origin with ID 1 because it is still in use by other processes step s1_reset: SELECT pg_replication_origin_session_reset(); pg_replication_origin_session_reset ----------------------------------- (1 row) +step s0_reset: SELECT pg_replication_origin_session_reset(); +pg_replication_origin_session_reset +----------------------------------- + +(1 row) + diff --git a/contrib/test_decoding/specs/parallel_session_origin.spec b/contrib/test_decoding/specs/parallel_session_origin.spec index c0e5fda0723..2253a7a14eb 100644 --- a/contrib/test_decoding/specs/parallel_session_origin.spec +++ b/contrib/test_decoding/specs/parallel_session_origin.spec @@ -53,4 +53,8 @@ step "s1_reset" { SELECT pg_replication_origin_session_reset(); } # Firstly s0 attaches to a origin and s1 attaches to the same. Both sessions # commits a transaction and store the local_lsn of the replication origin. # Compare LSNs and expect latter transaction (done by s1) has larger local_lsn. -permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s0_reset" "s1_reset" +permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_add_message" "s0_store_lsn" "s1_add_message" "s1_store_lsn" "s0_compare" "s1_reset" "s0_reset" + +# Test that the origin cannot be released if another session is actively using +# it. +permutation "s0_setup" "s0_is_setup" "s1_setup" "s1_is_setup" "s0_reset" "s1_reset" "s0_reset" diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 04bc704a332..389d2b38d20 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -130,6 +130,9 @@ typedef struct ReplicationState */ int acquired_by; + /* Count of backends that are currently using this origin. */ + int refcount; + /* * Condition variable that's signaled when acquired_by changes. */ @@ -1069,32 +1072,47 @@ replorigin_get_progress(RepOriginId node, bool flush) return remote_lsn; } -/* - * Tear down a (possibly) configured session replication origin during process - * exit. - */ +/* Helpful function to reset the session replication origin */ static void -ReplicationOriginExitCleanup(int code, Datum arg) +replorigin_session_reset_internal(void) { - ConditionVariable *cv = NULL; + ConditionVariable *cv; - if (session_replication_state == NULL) - return; + Assert(session_replication_state != NULL); LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); - if (session_replication_state->acquired_by == MyProcPid) - { - cv = &session_replication_state->origin_cv; + Assert(session_replication_state->refcount > 0); + /* + * Reset the PID only if the current backend is the first to set up this + * origin. This prevents resetting the PID when other backends are still + * using this origin. + */ + if (session_replication_state->acquired_by == MyProcPid) session_replication_state->acquired_by = 0; - session_replication_state = NULL; - } + + session_replication_state->refcount--; + + cv = &session_replication_state->origin_cv; + session_replication_state = NULL; LWLockRelease(ReplicationOriginLock); - if (cv) - ConditionVariableBroadcast(cv); + ConditionVariableBroadcast(cv); +} + +/* + * Tear down a (possibly) configured session replication origin during process + * exit. + */ +static void +ReplicationOriginExitCleanup(int code, Datum arg) +{ + if (session_replication_state == NULL) + return; + + replorigin_session_reset_internal(); } /* @@ -1205,9 +1223,17 @@ replorigin_session_setup(RepOriginId node, int acquired_by) Assert(session_replication_state->roident != InvalidRepOriginId); if (acquired_by == 0) + { session_replication_state->acquired_by = MyProcPid; + Assert(session_replication_state->refcount == 0); + } else + { Assert(session_replication_state->acquired_by == acquired_by); + Assert(session_replication_state->refcount > 0); + } + + session_replication_state->refcount++; LWLockRelease(ReplicationOriginLock); @@ -1224,8 +1250,6 @@ replorigin_session_setup(RepOriginId node, int acquired_by) void replorigin_session_reset(void) { - ConditionVariable *cv; - Assert(max_active_replication_origins != 0); if (session_replication_state == NULL) @@ -1233,15 +1257,22 @@ replorigin_session_reset(void) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("no replication origin is configured"))); - LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); - - session_replication_state->acquired_by = 0; - cv = &session_replication_state->origin_cv; - session_replication_state = NULL; - - LWLockRelease(ReplicationOriginLock); + /* + * The replication origin cannot be reset if the replication origin is + * firstly acquired by this backend and other processes are actively using + * now. This can cause acquired_by to be zero and active replication + * origin might be dropped. + */ + if (session_replication_state->acquired_by == MyProcPid && + session_replication_state->refcount > 1) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot reset replication origin with ID %d because it is still in use by other processes", + session_replication_state->roident), + errdetail("This session is the first process for this replication origin, and other processes are currently sharing it."), + errhint("Reset the replication origin in all other processes before retrying."))); - ConditionVariableBroadcast(cv); + replorigin_session_reset_internal(); } /* -- 2.47.3