From 0518683a77837f337fbf6c2495f368439a1a4e7e Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Thu, 6 Jan 2022 16:16:53 +0000 Subject: [PATCH v1] deduplicate min restart_lsn calculation code --- src/backend/access/heap/rewriteheap.c | 2 +- src/backend/replication/logical/logical.c | 2 +- src/backend/replication/logical/snapbuild.c | 2 +- src/backend/replication/slot.c | 86 ++++++--------------- src/backend/replication/slotfuncs.c | 4 +- src/backend/replication/walsender.c | 2 +- src/include/replication/slot.h | 3 +- 7 files changed, 30 insertions(+), 71 deletions(-) diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index 986a776bbd..7e22e818a6 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -1204,7 +1204,7 @@ CheckPointLogicalRewriteHeap(void) redo = GetRedoRecPtr(); /* now check for the restart ptrs from existing slots */ - cutoff = ReplicationSlotsComputeLogicalRestartLSN(); + cutoff = ReplicationSlotsComputeRequiredLSN(true, false); /* don't start earlier than the restart lsn */ if (cutoff != InvalidXLogRecPtr && redo < cutoff) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 10cbdea124..dab4182d49 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1776,7 +1776,7 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) SpinLockRelease(&MyReplicationSlot->mutex); ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); + (void) ReplicationSlotsComputeRequiredLSN(false, true); } } else diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index dbdc172a2b..11eda2b7be 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -1935,7 +1935,7 @@ CheckPointSnapBuild(void) redo = GetRedoRecPtr(); /* now check for the restart ptrs from existing slots */ - cutoff = ReplicationSlotsComputeLogicalRestartLSN(); + cutoff = ReplicationSlotsComputeRequiredLSN(true, false); /* don't start earlier than the restart lsn */ if (redo < cutoff) diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 90ba9b417d..950559afc5 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -665,7 +665,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) * limits. */ ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); + (void) ReplicationSlotsComputeRequiredLSN(false, true); /* * If removing the directory fails, the worst thing that will happen is @@ -807,82 +807,40 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) } /* - * Compute the oldest restart LSN across all slots and inform xlog module. + * Compute the oldest restart LSN of replication slots + * + * When logical is true, compute for logical decoding slots and don't inform + * xlog module. + * + * When logical is false, compute for all slots and inform xlog module. * * Note: while max_slot_wal_keep_size is theoretically relevant for this * purpose, we don't try to account for that, because this module doesn't * know what to compare against. */ -void -ReplicationSlotsComputeRequiredLSN(void) -{ - int i; - XLogRecPtr min_required = InvalidXLogRecPtr; - - Assert(ReplicationSlotCtl != NULL); - - LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - for (i = 0; i < max_replication_slots; i++) - { - ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; - XLogRecPtr restart_lsn; - - if (!s->in_use) - continue; - - SpinLockAcquire(&s->mutex); - restart_lsn = s->data.restart_lsn; - SpinLockRelease(&s->mutex); - - if (restart_lsn != InvalidXLogRecPtr && - (min_required == InvalidXLogRecPtr || - restart_lsn < min_required)) - min_required = restart_lsn; - } - LWLockRelease(ReplicationSlotControlLock); - - XLogSetReplicationSlotMinimumLSN(min_required); -} - -/* - * Compute the oldest WAL LSN required by *logical* decoding slots.. - * - * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical - * slots exist. - * - * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it - * ignores physical replication slots. - * - * The results aren't required frequently, so we don't maintain a precomputed - * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin(). - */ XLogRecPtr -ReplicationSlotsComputeLogicalRestartLSN(void) +ReplicationSlotsComputeRequiredLSN(bool only_logical, bool write_to_xlog_shmem) { - XLogRecPtr result = InvalidXLogRecPtr; int i; + XLogRecPtr min_required = InvalidXLogRecPtr; if (max_replication_slots <= 0) return InvalidXLogRecPtr; - LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + Assert(ReplicationSlotCtl != NULL); + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { - ReplicationSlot *s; + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; XLogRecPtr restart_lsn; - s = &ReplicationSlotCtl->replication_slots[i]; - - /* cannot change while ReplicationSlotCtlLock is held */ if (!s->in_use) continue; - /* we're only interested in logical slots */ - if (!SlotIsLogical(s)) + if (only_logical && !SlotIsLogical(s)) continue; - /* read once, it's ok if it increases while we're checking */ SpinLockAcquire(&s->mutex); restart_lsn = s->data.restart_lsn; SpinLockRelease(&s->mutex); @@ -890,14 +848,16 @@ ReplicationSlotsComputeLogicalRestartLSN(void) if (restart_lsn == InvalidXLogRecPtr) continue; - if (result == InvalidXLogRecPtr || - restart_lsn < result) - result = restart_lsn; + if (min_required == InvalidXLogRecPtr || + restart_lsn < min_required) + min_required = restart_lsn; } - LWLockRelease(ReplicationSlotControlLock); - return result; + if (write_to_xlog_shmem) + XLogSetReplicationSlotMinimumLSN(min_required); + + return min_required; } /* @@ -1133,7 +1093,7 @@ ReplicationSlotReserveWal(void) } /* prevent WAL removal as fast as possible */ - ReplicationSlotsComputeRequiredLSN(); + (void) ReplicationSlotsComputeRequiredLSN(false, true); /* * If all required WAL is still there, great, otherwise retry. The @@ -1344,7 +1304,7 @@ restart: if (invalidated) { ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); + (void) ReplicationSlotsComputeRequiredLSN(false, true); } return invalidated; @@ -1441,7 +1401,7 @@ StartupReplicationSlots(void) /* Now that we have recovered all the data, compute replication xmin */ ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); + (void) ReplicationSlotsComputeRequiredLSN(false, true); } /* ---- diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index d11daeb1fc..84d24097d9 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -672,7 +672,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) * advancing potentially done. */ ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); + (void) ReplicationSlotsComputeRequiredLSN(false, true); ReplicationSlotRelease(); @@ -873,7 +873,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) ReplicationSlotMarkDirty(); ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); + (void) ReplicationSlotsComputeRequiredLSN(false, true); ReplicationSlotSave(); #ifdef USE_ASSERT_CHECKING diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 84915ed95b..ebf5d7c338 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1978,7 +1978,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) if (changed) { ReplicationSlotMarkDirty(); - ReplicationSlotsComputeRequiredLSN(); + (void) ReplicationSlotsComputeRequiredLSN(false, true); } /* diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 53d773ccff..bd977197aa 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -209,8 +209,7 @@ extern void ReplicationSlotMarkDirty(void); extern bool ReplicationSlotValidateName(const char *name, int elevel); extern void ReplicationSlotReserveWal(void); extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); -extern void ReplicationSlotsComputeRequiredLSN(void); -extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); +extern XLogRecPtr ReplicationSlotsComputeRequiredLSN(bool only_logical, bool write_to_xlog_shmem); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); -- 2.25.1