From 601cbb02265a5373d298e6803715d30c2370a111 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Wed, 17 Sep 2025 18:15:33 +0900 Subject: [PATCH v8 3/3] Avoid setting ReplicationState in case of ERROR --- contrib/test_decoding/expected/replorigin.out | 3 ++ contrib/test_decoding/sql/replorigin.sql | 3 ++ src/backend/replication/logical/origin.c | 31 +++++++++++++------ 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out index c85e1a01b23..4f64ea8942f 100644 --- a/contrib/test_decoding/expected/replorigin.out +++ b/contrib/test_decoding/expected/replorigin.out @@ -41,6 +41,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot'); SELECT pg_replication_origin_create('regress_test_decoding: regression_slot'); ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index" DETAIL: Key (roname)=(regress_test_decoding: regression_slot) already exists. +-- ensure session setup with invalid pid fail +SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1); +ERROR: could not find replication state slot for replication origin with OID 1 which was acquired by -1 --ensure deletions work (once) SELECT pg_replication_origin_create('regress_test_decoding: temp'); pg_replication_origin_create diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql index e71ee02d050..d899d5cdc18 100644 --- a/contrib/test_decoding/sql/replorigin.sql +++ b/contrib/test_decoding/sql/replorigin.sql @@ -26,6 +26,9 @@ SELECT pg_replication_origin_create('regress_test_decoding: regression_slot'); -- ensure duplicate creations fail SELECT pg_replication_origin_create('regress_test_decoding: regression_slot'); +-- ensure session setup with invalid pid fail +SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot', -1); + --ensure deletions work (once) SELECT pg_replication_origin_create('regress_test_decoding: temp'); SELECT pg_replication_origin_drop('regress_test_decoding: temp'); diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 98d47e1beb8..0bbc96bcee5 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -1122,6 +1122,8 @@ replorigin_session_setup(RepOriginId node, int acquired_by) static bool registered_cleanup; int i; int free_slot = -1; + ReplicationState *candidate_state = NULL; + bool initialized = false; if (!registered_cleanup) { @@ -1168,34 +1170,43 @@ replorigin_session_setup(RepOriginId node, int acquired_by) } /* ok, found slot */ - session_replication_state = curstate; + candidate_state = curstate; break; } - if (session_replication_state == NULL && free_slot == -1) + if (candidate_state == NULL && free_slot == -1) ereport(ERROR, (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), errmsg("could not find free replication state slot for replication origin with ID %d", node), errhint("Increase \"max_active_replication_origins\" and try again."))); - else if (session_replication_state == NULL) + else if (candidate_state == NULL) { /* initialize new slot */ - session_replication_state = &replication_states[free_slot]; - Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr); - Assert(session_replication_state->local_lsn == InvalidXLogRecPtr); - session_replication_state->roident = node; + candidate_state = &replication_states[free_slot]; + Assert(candidate_state->remote_lsn == InvalidXLogRecPtr); + Assert(candidate_state->local_lsn == InvalidXLogRecPtr); + candidate_state->roident = node; + initialized = true; } - Assert(session_replication_state->roident != InvalidRepOriginId); + Assert(candidate_state->roident != InvalidRepOriginId); if (acquired_by == 0) - session_replication_state->acquired_by = MyProcPid; - else if (session_replication_state->acquired_by != acquired_by) + candidate_state->acquired_by = MyProcPid; + else if (candidate_state->acquired_by != acquired_by) + { + if (initialized) + candidate_state->roident = InvalidRepOriginId; + elog(ERROR, "could not find replication state slot for replication origin with OID %u which was acquired by %d", node, acquired_by); + } + + /* Candidate slot looks ok, use it */ + session_replication_state = candidate_state; LWLockRelease(ReplicationOriginLock); -- 2.47.3