From eb8bb9eda4c1ea5fe2abe87df66ef2b86cdb2441 Mon Sep 17 00:00:00 2001 From: Bertrand Drouvot Date: Thu, 11 Jan 2024 13:57:53 +0000 Subject: [PATCH v3] Fix race condition in InvalidatePossiblyObsoleteSlot() In case of an active slot we proceed in 2 steps: - terminate the backend holding the slot - report the slot as obsolete This is racy because between the two we release the mutex on the slot, which means the effective_xmin and effective_catalog_xmin could advance during that time. Holding the mutex longer is not an option (given what we'd to do while holding it) so record the previous LSNs instead that were used during the backend termination. --- src/backend/replication/slot.c | 39 ++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) 100.0% src/backend/replication/ diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 2180a38063..33f76c4acc 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1454,6 +1454,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, { int last_signaled_pid = 0; bool released_lock = false; + bool terminated = false; + XLogRecPtr initial_effective_xmin = InvalidXLogRecPtr; + XLogRecPtr initial_catalog_effective_xmin = InvalidXLogRecPtr; + XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr; + ReplicationSlotInvalidationCause conflict_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE; for (;;) { @@ -1488,11 +1493,24 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, */ if (s->data.invalidated == RS_INVAL_NONE) { + /* + * We'll release the slot's mutex soon, so it's possible that + * those values change since the process holding the slot has been + * terminated (if any), so record them here to ensure we would + * report the slot as obsolete correctly. + */ + if (!terminated) + { + initial_restart_lsn = s->data.restart_lsn; + initial_effective_xmin = s->effective_xmin; + initial_catalog_effective_xmin = s->effective_catalog_xmin; + } + switch (cause) { case RS_INVAL_WAL_REMOVED: - if (s->data.restart_lsn != InvalidXLogRecPtr && - s->data.restart_lsn < oldestLSN) + if (initial_restart_lsn != InvalidXLogRecPtr && + initial_restart_lsn < oldestLSN) conflict = cause; break; case RS_INVAL_HORIZON: @@ -1501,12 +1519,12 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, /* invalid DB oid signals a shared relation */ if (dboid != InvalidOid && dboid != s->data.database) break; - if (TransactionIdIsValid(s->effective_xmin) && - TransactionIdPrecedesOrEquals(s->effective_xmin, + if (TransactionIdIsValid(initial_effective_xmin) && + TransactionIdPrecedesOrEquals(initial_effective_xmin, snapshotConflictHorizon)) conflict = cause; - else if (TransactionIdIsValid(s->effective_catalog_xmin) && - TransactionIdPrecedesOrEquals(s->effective_catalog_xmin, + else if (TransactionIdIsValid(initial_catalog_effective_xmin) && + TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin, snapshotConflictHorizon)) conflict = cause; break; @@ -1519,6 +1537,13 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, } } + /* + * Assert that the conflict cause recorded previously before we + * terminate the process did not change now for any reason. + */ + Assert(!(conflict_prev != RS_INVAL_NONE && terminated && + conflict_prev != conflict)); + /* if there's no conflict, we're done */ if (conflict == RS_INVAL_NONE) { @@ -1601,6 +1626,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, (void) kill(active_pid, SIGTERM); last_signaled_pid = active_pid; + terminated = true; + conflict_prev = conflict; } /* Wait until the slot is released. */ -- 2.34.1