From 6a9954df6811f8e8096410746b77091eff186c41 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Tue, 29 Jul 2025 03:25:50 -0400 Subject: [PATCH v6] Fix a deadlock during ALTER SUBSCRIPTION ... DROP PUBLICATION When a user drops a publication from a subscription, this results in a publication refresh on the subscriber, which will try to drop any pending origins. Meanwhile, the apply worker could also be trying to clean up origins. A deadlock can occur if the order in which SubscriptionRelationId, SubscriptionRelRelationId and ReplicationOriginRelationId are locked is not the same in process_syncing_tables_for_apply() and AlterSubscription_refresh(). The fix is to acquire locks on SubscriptionRelationId, SubscriptionRelRelationId and ReplicationOriginRelationId, in that order, when dropping tracking origins. --- src/backend/catalog/pg_subscription.c | 32 +++++++++++++++++++++++++---- src/backend/replication/logical/tablesync.c | 27 +++++++++++++++++++++--- src/include/catalog/pg_subscription_rel.h | 2 ++ 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index d07f88c..63ae485 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -273,18 +273,23 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state, * Update the state of a subscription table. 
*/ void -UpdateSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn) +UpdateSubscriptionRelStateEx(Oid subid, Oid relid, char state, + XLogRecPtr sublsn) { Relation rel; HeapTuple tup; bool nulls[Natts_pg_subscription_rel]; Datum values[Natts_pg_subscription_rel]; bool replaces[Natts_pg_subscription_rel]; + LOCKTAG tag PG_USED_FOR_ASSERTS_ONLY; - LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + Assert(CheckRelationOidLockedByMe(SubscriptionRelRelationId, + RowExclusiveLock, true)); - rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); + SET_LOCKTAG_OBJECT(tag, InvalidOid, SubscriptionRelationId, subid, 0); + Assert(LockHeldByMe(&tag, AccessShareLock)); + + rel = table_open(SubscriptionRelRelationId, NoLock); /* Try finding existing mapping. */ tup = SearchSysCacheCopy2(SUBSCRIPTIONRELMAP, @@ -319,6 +324,25 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, } /* + * Update the state of a subscription table. + */ +void +UpdateSubscriptionRelState(Oid subid, Oid relid, char state, + XLogRecPtr sublsn) +{ + Relation rel; + + LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + + rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); + + UpdateSubscriptionRelStateEx(subid, relid, state, sublsn); + + /* close table */ + table_close(rel, NoLock); +} + +/* * Get state of subscription table. * * Returns SUBREL_STATE_UNKNOWN when the table is not in the subscription. 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index ca88133..67b5ea9 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -425,6 +425,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) ListCell *lc; bool started_tx = false; bool should_exit = false; + Relation rel = NULL; Assert(!IsTransactionState()); @@ -492,7 +493,16 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * worker to remove the origin tracking as if there is any * error while dropping we won't restart it to drop the * origin. So passing missing_ok = true. + * + * Lock the subscription and origin in the same order as we + * are doing during DDL commands to avoid deadlocks. See + * AlterSubscription_refresh. */ + LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, + 0, AccessShareLock); + if (!rel) + rel = table_open(SubscriptionRelRelationId, RowExclusiveLock); + ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, rstate->relid, originname, @@ -502,9 +512,9 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) /* * Update the state to READY only after the origin cleanup. */ - UpdateSubscriptionRelState(MyLogicalRepWorker->subid, - rstate->relid, rstate->state, - rstate->lsn); + UpdateSubscriptionRelStateEx(MyLogicalRepWorker->subid, + rstate->relid, rstate->state, + rstate->lsn); } } else @@ -555,7 +565,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * This is required to avoid any undetected deadlocks * due to any existing lock as deadlock detector won't * be able to detect the waits on the latch. + * + * Also close any tables prior to the commit. 
*/ + if (rel) + { + table_close(rel, NoLock); + rel = NULL; + } CommitTransactionCommand(); pgstat_report_stat(false); } @@ -621,6 +638,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } } + /* Close table if opened */ + if (rel) + table_close(rel, NoLock); + if (started_tx) { /* diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index 60a2bcc..df09c75 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -84,6 +84,8 @@ extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn); extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn); +extern void UpdateSubscriptionRelStateEx(Oid subid, Oid relid, char state, + XLogRecPtr sublsn); extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid); -- 1.8.3.1