From 79176f05def67a3cd2236c1ad9465959860d26cc Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Tue, 22 Jul 2025 07:45:26 -0400 Subject: [PATCH v3] Fix a possible deadlock during ALTER SUBSCRIPTION ... DROP PUBLICATION In most situations the tablesync worker drops the corresponding origin before it finishes executing. However, if an error causes the tablesync worker to fail just before dropping the origin, or if the worker is delayed or does not get CPU time, the apply worker can find the origin and attempt to drop it. If the user simultaneously issues an ALTER SUBSCRIPTION ... DROP PUBLICATION during this time, a deadlock is possible between the apply worker and the user process because of the order in which the locks are taken. The fix is to acquire locks on SubscriptionRelationId, SubscriptionRelRelationId and ReplicationOriginRelationId, in that order, when dropping tracking origins. --- src/backend/catalog/pg_subscription.c | 5 +++-- src/backend/commands/subscriptioncmds.c | 10 ++++++++- src/backend/replication/logical/tablesync.c | 35 +++++++++++++++++++++++++---- src/include/catalog/pg_subscription_rel.h | 2 +- 4 files changed, 44 insertions(+), 8 deletions(-) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 1395032..7894314 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -319,7 +319,7 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state, */ void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn) + XLogRecPtr sublsn, bool lock_needed) { Relation rel; HeapTuple tup; @@ -327,7 +327,8 @@ UpdateSubscriptionRelState(Oid subid, Oid relid, char state, Datum values[Natts_pg_subscription_rel]; bool replaces[Natts_pg_subscription_rel]; - LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); + if (lock_needed) + LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock); rel = 
table_open(SubscriptionRelRelationId, RowExclusiveLock); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index e23b0de..cbcede1 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1625,7 +1625,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, void DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) { - Relation rel; + Relation rel, sub_rel; ObjectAddress myself; HeapTuple tup; Oid subid; @@ -1772,7 +1772,12 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * Note that the state can't change because we have already stopped both * the apply and tablesync workers and they can't restart because of * exclusive lock on the subscription. + * + * Lock pg_subscription_rel with AccessExclusiveLock to prevent any + * deadlock with apply workers of other subscriptions trying to drop + * tracking origin. */ + sub_rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock); rstates = GetSubscriptionRelations(subid, true); foreach(lc, rstates) { @@ -1795,6 +1800,9 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) replorigin_drop_by_name(originname, true, false); } + /* Once the origin tracking has been dropped, we can release lock */ + table_close(sub_rel, AccessExclusiveLock); + /* Clean up dependencies */ deleteSharedDependencyRecordsFor(SubscriptionRelationId, subid, 0); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e4fd634..10299d6 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -316,7 +316,8 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn) UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + true); /* * End streaming so that LogRepWorkerWalRcvConn 
can be used to drop @@ -425,6 +426,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) ListCell *lc; bool started_tx = false; bool should_exit = false; + Relation rel = NULL; Assert(!IsTransactionState()); @@ -492,7 +494,18 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * worker to remove the origin tracking as if there is any * error while dropping we won't restart it to drop the * origin. So passing missing_ok = true. + * + * Also lock SubscriptionRelationId with AccessShareLock and + * take AccessExclusiveLock on SubscriptionRelRelationId to + * prevent any deadlocks with the user concurrently performing + * refresh on the subscription. */ + LockSharedObject(SubscriptionRelationId, MyLogicalRepWorker->subid, + 0, AccessShareLock); + + if (!rel) + rel = table_open(SubscriptionRelRelationId, AccessExclusiveLock); + ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid, rstate->relid, originname, @@ -504,7 +517,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) */ UpdateSubscriptionRelState(MyLogicalRepWorker->subid, rstate->relid, rstate->state, - rstate->lsn); + rstate->lsn, false); } } else @@ -555,7 +568,14 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * This is required to avoid any undetected deadlocks * due to any existing lock as deadlock detector won't * be able to detect the waits on the latch. + * + * Also close any tables prior to the commit. 
*/ + if (rel) + { + table_close(rel, AccessExclusiveLock); + rel = NULL; + } CommitTransactionCommand(); pgstat_report_stat(false); } @@ -622,6 +642,11 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) } } + /* close and unlock table if opened */ + if (rel) + table_close(rel, AccessExclusiveLock); + + if (started_tx) { /* @@ -1413,7 +1438,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + true); CommitTransactionCommand(); pgstat_report_stat(true); @@ -1546,7 +1572,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, SUBREL_STATE_FINISHEDCOPY, - MyLogicalRepWorker->relstate_lsn); + MyLogicalRepWorker->relstate_lsn, + true); CommitTransactionCommand(); diff --git a/src/include/catalog/pg_subscription_rel.h b/src/include/catalog/pg_subscription_rel.h index c91797c..d29a608 100644 --- a/src/include/catalog/pg_subscription_rel.h +++ b/src/include/catalog/pg_subscription_rel.h @@ -85,7 +85,7 @@ typedef struct SubscriptionRelState extern void AddSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool retain_lock); extern void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, - XLogRecPtr sublsn); + XLogRecPtr sublsn, bool lock_needed); extern char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn); extern void RemoveSubscriptionRel(Oid subid, Oid relid); -- 1.8.3.1