From ccc2e4c76779d765a104b0de69f0154ddb71e36a Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Tue, 11 Nov 2025 18:12:53 +0800 Subject: [PATCH v3] Fix a race condition of updating procArray->replication_slot_xmin. Previously, ReplicationSlotsComputeRequiredXmin() computed the oldest xmin across all slots while not holding ProcArrayLock if already_locked is false, and acquires the ProcArrayLock just before updating the replication slot xmin. Therefore, if a process calls ReplicationSlotsComputeRequiredXmin() with already_locked being false and another process updates the replication slot xmin before the process acquiring the lock, the slot xmin was overwritten with an old value. In the reported failure, a walsender for an apply worker computes InvalidTransaction as the oldest xmin and overwrote a valid replication slot xmin value computed by a walsender for a tablesync worker with this value. Then the walsender for a tablesync worker ended up computing the transaction id by GetOldestSafeDecodingTransactionId() without considering replication slot xmin. That led to an error ""cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u", which was an assertion failure prior to 240e0dbacd3. This commit changes ReplicationSlotsComputeRequiredXmin() so that it computes the oldest xmin while holding ProcArrayLock in exclusive mode. We keep already_locked parameter in ProcArraySetReplicationSlotXmin() on backbranches to not break ABI compatibility. --- src/backend/replication/logical/launcher.c | 2 ++ src/backend/replication/logical/logical.c | 12 ++++---- src/backend/replication/logical/slotsync.c | 2 ++ src/backend/replication/slot.c | 32 ++++++++++++++++++---- 4 files changed, 38 insertions(+), 10 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 6214028eda9..86aced9bdf5 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -1540,6 +1540,7 @@ init_conflict_slot_xmin(void) Assert(MyReplicationSlot && !TransactionIdIsValid(MyReplicationSlot->data.xmin)); + LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); xmin_horizon = GetOldestSafeDecodingTransactionId(false); @@ -1552,6 +1553,7 @@ init_conflict_slot_xmin(void) ReplicationSlotsComputeRequiredXmin(true); LWLockRelease(ProcArrayLock); + LWLockRelease(ReplicationSlotControlLock); /* Write this slot to disk */ ReplicationSlotMarkDirty(); diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 866f92cf799..e9a07e67a73 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -405,11 +405,11 @@ CreateInitDecodingContext(const char *plugin, * without further interlock its return value might immediately be out of * date. * - * So we have to acquire the ProcArrayLock to prevent computation of new - * xmin horizons by other backends, get the safe decoding xid, and inform - * the slot machinery about the new limit. Once that's done the - * ProcArrayLock can be released as the slot machinery now is - * protecting against vacuum. + * So we have to acquire both the ReplicationSlotControlLock and the + * ProcArrayLock to prevent concurrent computation and update of new xmin + * horizons by other backends, get the safe decoding xid, and inform the + * slot machinery about the new limit. Once that's done the both locks + * can be released as the slot machinery now is protecting against vacuum. * * Note that, temporarily, the data, not just the catalog, xmin has to be * reserved if a data snapshot is to be exported. Otherwise the initial @@ -422,6 +422,7 @@ CreateInitDecodingContext(const char *plugin, * * ---- */ + LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot); @@ -436,6 +437,7 @@ CreateInitDecodingContext(const char *plugin, ReplicationSlotsComputeRequiredXmin(true); LWLockRelease(ProcArrayLock); + LWLockRelease(ReplicationSlotControlLock); ReplicationSlotMarkDirty(); ReplicationSlotSave(); diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 8b4afd87dc9..84f1d62b572 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -775,6 +775,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) reserve_wal_for_local_slot(remote_slot->restart_lsn); + LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); xmin_horizon = GetOldestSafeDecodingTransactionId(true); SpinLockAcquire(&slot->mutex); @@ -783,6 +784,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) SpinLockRelease(&slot->mutex); ReplicationSlotsComputeRequiredXmin(true); LWLockRelease(ProcArrayLock); + LWLockRelease(ReplicationSlotControlLock); update_and_persist_local_synced_slot(remote_slot, remote_dbid); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 1ec1e997b27..6712c6ee7c7 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1170,8 +1170,11 @@ ReplicationSlotPersist(void) /* * Compute the oldest xmin across all slots and store it in the ProcArray. * - * If already_locked is true, ProcArrayLock has already been acquired - * exclusively. + * If already_locked is true, both the ReplicationSlotControlLock and + * the ProcArrayLock have already been acquired exclusively. + * + * Note that the ReplicationSlotControlLock must be locked first to avoid + * deadlocks. */ void ReplicationSlotsComputeRequiredXmin(bool already_locked) @@ -1181,8 +1184,26 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) TransactionId agg_catalog_xmin = InvalidTransactionId; Assert(ReplicationSlotCtl != NULL); + Assert(!already_locked || + (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) && + LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE))); - LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + /* + * Hold the ReplicationSlotControlLock exclusive until after updating the + * slot xmin values, so no backend can compute and update the new value + * concurrently. + * + * One might think that we can hold the ProcArrayLock exclusively, compute + * the xmin values while holding the ReplicationSlotControlLock in shared + * mode, and update the slot xmin values, but it could increase lock + * contention on the ProcArrayLock, which is not great since this function + * can be called at non-negligible frequency. + * + * We instead increase lock contention on the ReplicationSlotControlLock + * but it would be less harmful. + */ + if (!already_locked) + LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); for (i = 0; i < max_replication_slots; i++) { @@ -1217,9 +1238,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) agg_catalog_xmin = effective_catalog_xmin; } - LWLockRelease(ReplicationSlotControlLock); - ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked); + + if (!already_locked) + LWLockRelease(ReplicationSlotControlLock); } /* -- 2.31.1