From 02ea832d505ee05b701c2e255a9f084a0b765624 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Fri, 7 Jan 2022 03:16:51 +0000 Subject: [PATCH v2] deduplicate min restart_lsn calculation code --- src/backend/access/heap/rewriteheap.c | 2 +- src/backend/replication/logical/logical.c | 5 +- src/backend/replication/logical/snapbuild.c | 2 +- src/backend/replication/slot.c | 89 ++++++--------------- src/backend/replication/slotfuncs.c | 8 +- src/backend/replication/walsender.c | 5 +- src/include/replication/slot.h | 3 +- 7 files changed, 43 insertions(+), 71 deletions(-) diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index 986a776bbd..7265ac0652 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -1204,7 +1204,7 @@ CheckPointLogicalRewriteHeap(void) redo = GetRedoRecPtr(); /* now check for the restart ptrs from existing slots */ - cutoff = ReplicationSlotsComputeLogicalRestartLSN(); + cutoff = ReplicationSlotsComputeRequiredLSN(true); /* don't start earlier than the restart lsn */ if (cutoff != InvalidXLogRecPtr && redo < cutoff) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 10cbdea124..f61378fa2f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1771,12 +1771,15 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn) */ if (updated_xmin) { + XLogRecPtr min_required; + SpinLockAcquire(&MyReplicationSlot->mutex); MyReplicationSlot->effective_catalog_xmin = MyReplicationSlot->data.catalog_xmin; SpinLockRelease(&MyReplicationSlot->mutex); ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); + min_required = ReplicationSlotsComputeRequiredLSN(false); + XLogSetReplicationSlotMinimumLSN(min_required); } } else diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index dbdc172a2b..a0cb09836e 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -1935,7 +1935,7 @@ CheckPointSnapBuild(void) redo = GetRedoRecPtr(); /* now check for the restart ptrs from existing slots */ - cutoff = ReplicationSlotsComputeLogicalRestartLSN(); + cutoff = ReplicationSlotsComputeRequiredLSN(true); /* don't start earlier than the restart lsn */ if (redo < cutoff) diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 90ba9b417d..d299bbf347 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -594,6 +594,7 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) { char path[MAXPGPATH]; char tmppath[MAXPGPATH]; + XLogRecPtr min_required; /* * If some other backend ran this code concurrently with us, we might try @@ -665,7 +666,8 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) * limits. */ ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); + min_required = ReplicationSlotsComputeRequiredLSN(false); + XLogSetReplicationSlotMinimumLSN(min_required); /* * If removing the directory fails, the worst thing that will happen is @@ -807,82 +809,37 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) } /* - * Compute the oldest restart LSN across all slots and inform xlog module. + * Compute the oldest restart LSN of replication slots + * + * When only_logical is true, compute for logical decoding slots only. * * Note: while max_slot_wal_keep_size is theoretically relevant for this * purpose, we don't try to account for that, because this module doesn't * know what to compare against. */ -void -ReplicationSlotsComputeRequiredLSN(void) -{ - int i; - XLogRecPtr min_required = InvalidXLogRecPtr; - - Assert(ReplicationSlotCtl != NULL); - - LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - for (i = 0; i < max_replication_slots; i++) - { - ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; - XLogRecPtr restart_lsn; - - if (!s->in_use) - continue; - - SpinLockAcquire(&s->mutex); - restart_lsn = s->data.restart_lsn; - SpinLockRelease(&s->mutex); - - if (restart_lsn != InvalidXLogRecPtr && - (min_required == InvalidXLogRecPtr || - restart_lsn < min_required)) - min_required = restart_lsn; - } - LWLockRelease(ReplicationSlotControlLock); - - XLogSetReplicationSlotMinimumLSN(min_required); -} - -/* - * Compute the oldest WAL LSN required by *logical* decoding slots.. - * - * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical - * slots exist. - * - * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it - * ignores physical replication slots. - * - * The results aren't required frequently, so we don't maintain a precomputed - * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin(). - */ XLogRecPtr -ReplicationSlotsComputeLogicalRestartLSN(void) +ReplicationSlotsComputeRequiredLSN(bool only_logical) { - XLogRecPtr result = InvalidXLogRecPtr; int i; + XLogRecPtr min_required = InvalidXLogRecPtr; if (max_replication_slots <= 0) return InvalidXLogRecPtr; - LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + Assert(ReplicationSlotCtl != NULL); + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { - ReplicationSlot *s; + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; XLogRecPtr restart_lsn; - s = &ReplicationSlotCtl->replication_slots[i]; - - /* cannot change while ReplicationSlotCtlLock is held */ if (!s->in_use) continue; - /* we're only interested in logical slots */ - if (!SlotIsLogical(s)) + if (only_logical && !SlotIsLogical(s)) continue; - /* read once, it's ok if it increases while we're checking */ SpinLockAcquire(&s->mutex); restart_lsn = s->data.restart_lsn; SpinLockRelease(&s->mutex); @@ -890,14 +847,13 @@ ReplicationSlotsComputeLogicalRestartLSN(void) if (restart_lsn == InvalidXLogRecPtr) continue; - if (result == InvalidXLogRecPtr || - restart_lsn < result) - result = restart_lsn; + if (min_required == InvalidXLogRecPtr || + restart_lsn < min_required) + min_required = restart_lsn; } - LWLockRelease(ReplicationSlotControlLock); - return result; + return min_required; } /* @@ -1096,6 +1052,7 @@ ReplicationSlotReserveWal(void) { XLogSegNo segno; XLogRecPtr restart_lsn; + XLogRecPtr min_required; /* * For logical slots log a standby snapshot and start logical decoding @@ -1133,7 +1090,8 @@ ReplicationSlotReserveWal(void) } /* prevent WAL removal as fast as possible */ - ReplicationSlotsComputeRequiredLSN(); + min_required = ReplicationSlotsComputeRequiredLSN(false); + XLogSetReplicationSlotMinimumLSN(min_required); /* * If all required WAL is still there, great, otherwise retry. The @@ -1343,8 +1301,11 @@ restart: */ if (invalidated) { + XLogRecPtr min_required; + ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); + min_required = ReplicationSlotsComputeRequiredLSN(false); + XLogSetReplicationSlotMinimumLSN(min_required); } return invalidated; @@ -1396,6 +1357,7 @@ StartupReplicationSlots(void) { DIR *replication_dir; struct dirent *replication_de; + XLogRecPtr min_required; elog(DEBUG1, "starting up replication slots"); @@ -1441,7 +1403,8 @@ StartupReplicationSlots(void) /* Now that we have recovered all the data, compute replication xmin */ ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); + min_required = ReplicationSlotsComputeRequiredLSN(false); + XLogSetReplicationSlotMinimumLSN(min_required); } /* ---- diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index d11daeb1fc..5def594640 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -608,6 +608,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) bool nulls[2]; HeapTuple tuple; Datum result; + XLogRecPtr min_required; Assert(!MyReplicationSlot); @@ -672,7 +673,8 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) * advancing potentially done. */ ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); + min_required = ReplicationSlotsComputeRequiredLSN(false); + XLogSetReplicationSlotMinimumLSN(min_required); ReplicationSlotRelease(); @@ -816,6 +818,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) XLogRecPtr copy_confirmed_flush; bool copy_islogical; char *copy_name; + XLogRecPtr min_required; /* Copy data of source slot again */ SpinLockAcquire(&src->mutex); @@ -873,7 +876,8 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) ReplicationSlotMarkDirty(); ReplicationSlotsComputeRequiredXmin(false); - ReplicationSlotsComputeRequiredLSN(); + min_required = ReplicationSlotsComputeRequiredLSN(false); + XLogSetReplicationSlotMinimumLSN(min_required); ReplicationSlotSave(); #ifdef USE_ASSERT_CHECKING diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 84915ed95b..cf0a65d975 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1977,8 +1977,11 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) if (changed) { + XLogRecPtr min_required; + ReplicationSlotMarkDirty(); - ReplicationSlotsComputeRequiredLSN(); + min_required = ReplicationSlotsComputeRequiredLSN(false); + XLogSetReplicationSlotMinimumLSN(min_required); } /* diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 53d773ccff..53ae2b3607 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -209,8 +209,7 @@ extern void ReplicationSlotMarkDirty(void); extern bool ReplicationSlotValidateName(const char *name, int elevel); extern void ReplicationSlotReserveWal(void); extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); -extern void ReplicationSlotsComputeRequiredLSN(void); -extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void); +extern XLogRecPtr ReplicationSlotsComputeRequiredLSN(bool only_logical); extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive); extern void ReplicationSlotsDropDBSlots(Oid dboid); extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno); -- 2.25.1