From 241adc15d4175d66ee96414e360c9878a5f24023 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Fri, 12 May 2017 14:28:05 +0900 Subject: [PATCH 1/2] Fix a deadlock bug between DROP SUBSCRIPTION and apply worker. --- src/backend/commands/subscriptioncmds.c | 48 +++++++++++++++--------------- src/backend/replication/logical/launcher.c | 8 +++++ src/backend/storage/lmgr/lwlock.c | 2 +- src/backend/storage/lmgr/lwlocknames.txt | 1 + 4 files changed, 34 insertions(+), 25 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index b76cdc5..1cb8353 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -811,18 +811,32 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) StringInfoData cmd; /* - * Lock pg_subscription with AccessExclusiveLock to ensure - * that the launcher doesn't restart new worker during dropping - * the subscription + * To prevent from launcher restarts the new worker, we need to + * keep holding LogicalRepLauncherLock until commit. On the + * other hand, keeping to hold LWLock until commit can be cause + * of deadlock. So we disallow DROP SUBSCRIPTION being called + * in a transaction block. + * + * XXX The command name should really be something like "DROP SUBSCRIPTION + * of a subscription that is associated with a replication slot", but we + * don't have the proper facilities for that. + */ + PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION"); + + /* + * Protect against launcher restarting the worker. This lock will + * be released at commit. */ - rel = heap_open(SubscriptionRelationId, AccessExclusiveLock); + LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE); + + rel = heap_open(SubscriptionRelationId, RowExclusiveLock); tup = SearchSysCache2(SUBSCRIPTIONNAME, MyDatabaseId, CStringGetDatum(stmt->subname)); if (!HeapTupleIsValid(tup)) { - heap_close(rel, NoLock); + heap_close(rel, RowExclusiveLock); if (!stmt->missing_ok) ereport(ERROR, @@ -844,6 +858,9 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) aclcheck_error(ACLCHECK_NOT_OWNER, ACL_KIND_SUBSCRIPTION, stmt->subname); + /* Kill the apply worker so that the slot becomes accessible first. */ + logicalrep_worker_stop(subid, InvalidOid); + /* DROP hook for the subscription being removed */ InvokeObjectDropHook(SubscriptionRelationId, subid, 0); @@ -873,20 +890,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) else slotname = NULL; - /* - * Since dropping a replication slot is not transactional, the replication - * slot stays dropped even if the transaction rolls back. So we cannot - * run DROP SUBSCRIPTION inside a transaction block if dropping the - * replication slot. - * - * XXX The command name should really be something like "DROP SUBSCRIPTION - * of a subscription that is associated with a replication slot", but we - * don't have the proper facilities for that. - */ - if (slotname) - PreventTransactionChain(isTopLevel, "DROP SUBSCRIPTION"); - - ObjectAddressSet(myself, SubscriptionRelationId, subid); EventTriggerSQLDropAddObject(&myself, true, true); @@ -901,9 +904,6 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) /* Remove any associated relation synchronization states. */ RemoveSubscriptionRel(subid, InvalidOid); - /* Kill the apply worker so that the slot becomes accessible. */ - logicalrep_worker_stop(subid, InvalidOid); - /* Remove the origin tracking if exists. */ snprintf(originname, sizeof(originname), "pg_%u", subid); originid = replorigin_by_name(originname, true); @@ -913,7 +913,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) /* If there is no slot associated with the subscription, we can finish here. */ if (!slotname) { - heap_close(rel, NoLock); + heap_close(rel, RowExclusiveLock); return; } @@ -964,7 +964,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) pfree(cmd.data); - heap_close(rel, NoLock); + heap_close(rel, RowExclusiveLock); } /* diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 09c87d7..dfce49d 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -405,6 +405,9 @@ retry: /* * Stop the logical replication worker and wait until it detaches from the * slot. + * + * The caller must hold LogicalRepLauncherLock to ensure that new workers are + * not being started during this function call. */ void logicalrep_worker_stop(Oid subid, Oid relid) @@ -832,6 +835,9 @@ ApplyLauncherMain(Datum main_arg) ALLOCSET_DEFAULT_MAXSIZE); oldctx = MemoryContextSwitchTo(subctx); + /* Block any concurrent DROP SUBSCRIPTION */ + LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE); + /* search for subscriptions to start or stop. */ sublist = get_subscription_list(); @@ -855,6 +861,8 @@ ApplyLauncherMain(Datum main_arg) } } + LWLockRelease(LogicalRepLauncherLock); + /* Switch back to original memory context. */ MemoryContextSwitchTo(oldctx); /* Clean the temporary memory. */ diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 3e13394..949b333 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -494,7 +494,7 @@ RegisterLWLockTranches(void) if (LWLockTrancheArray == NULL) { - LWLockTranchesAllocated = 64; + LWLockTranchesAllocated = 128; LWLockTrancheArray = (char **) MemoryContextAllocZero(TopMemoryContext, LWLockTranchesAllocated * sizeof(char *)); diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index e6025ec..eda66fe 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -50,3 +50,4 @@ OldSnapshotTimeMapLock 42 BackendRandomLock 43 LogicalRepWorkerLock 44 CLogTruncationLock 45 +LogicalRepLauncherLock 46 -- 2.8.1