Re: Minimal logical decoding on standbys - Mailing list pgsql-hackers
From | Andres Freund |
---|---|
Subject | Re: Minimal logical decoding on standbys |
Date | |
Msg-id | 20230405182835.ehufe4fj2zx3pjix@awork3.anarazel.de |
In response to | Re: Minimal logical decoding on standbys ("Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>) |
Responses | Re: Minimal logical decoding on standbys |
List | pgsql-hackers |
Hi,

On 2023-04-05 17:56:14 +0200, Drouvot, Bertrand wrote:
> @@ -7963,6 +7963,23 @@ xlog_redo(XLogReaderState *record)
>          /* Update our copy of the parameters in pg_control */
>          memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
>
> +        /*
> +         * Invalidate logical slots if we are in hot standby and the primary
> +         * does not have a WAL level sufficient for logical decoding. No need
> +         * to search for potentially conflicting logically slots if standby is
> +         * running with wal_level lower than logical, because in that case, we
> +         * would have either disallowed creation of logical slots or
> +         * invalidated existing ones.
> +         */
> +        if (InRecovery && InHotStandby &&
> +            xlrec.wal_level < WAL_LEVEL_LOGICAL &&
> +            wal_level >= WAL_LEVEL_LOGICAL)
> +        {
> +            TransactionId ConflictHorizon = InvalidTransactionId;
> +
> +            InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, InvalidOid, &ConflictHorizon);
> +        }

I mentioned this before, but I still don't understand why
InvalidateObsoleteReplicationSlots() accepts ConflictHorizon as a pointer.
It's not even modified, as far as I can see?

>  /*
>   * Report shared-memory space needed by ReplicationSlotsShmemInit.
>   */
> @@ -855,8 +862,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
>          SpinLockAcquire(&s->mutex);
>          effective_xmin = s->effective_xmin;
>          effective_catalog_xmin = s->effective_catalog_xmin;
> -        invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
> -                       XLogRecPtrIsInvalid(s->data.restart_lsn));
> +        invalidated = ObsoleteSlotIsInvalid(s, true) || LogicalReplicationSlotIsInvalid(s);
>          SpinLockRelease(&s->mutex);

I don't understand why we need to have two different functions for this.

>          /* invalidated slots need not apply */
> @@ -1225,28 +1231,92 @@ ReplicationSlotReserveWal(void)
>      }
>  }
>
> +
> +/*
> + * Report terminating or conflicting message.
> + *
> + * For both, logical conflict on standby and obsolete slot are handled.
> + */
> +static void
> +ReportTerminationInvalidation(bool terminating, bool check_on_xid, int pid,
> +                              NameData slotname, TransactionId *xid,
> +                              XLogRecPtr restart_lsn, XLogRecPtr oldestLSN)
> +{
> +    StringInfoData err_msg;
> +    StringInfoData err_detail;
> +    bool hint = false;
> +
> +    initStringInfo(&err_detail);
> +
> +    if (check_on_xid)
> +    {
> +        if (!terminating)
> +        {
> +            initStringInfo(&err_msg);
> +            appendStringInfo(&err_msg, _("invalidating replication slot \"%s\" because it conflicts with recovery"),
> +                             NameStr(slotname));

I still don't think the main error message should differ between invalidating
a slot due to recovery and due to max_slot_wal_keep_size.

> +
>  /*
> - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
> - * and mark it invalid, if necessary and possible.
> + * Helper for InvalidateObsoleteReplicationSlots
> + *
> + * Acquires the given slot and mark it invalid, if necessary and possible.
>   *
>   * Returns whether ReplicationSlotControlLock was released in the interim (and
>   * in that case we're not holding the lock at return, otherwise we are).
>   *
> - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
> + * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.)

What's the point of making this specific to "obsolete slots"?

>   * This is inherently racy, because we release the LWLock
>   * for syscalls, so caller must restart if we return true.
>   */
>  static bool
>  InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
> -                               bool *invalidated)
> +                               bool *invalidated, TransactionId *xid)
>  {
>      int last_signaled_pid = 0;
>      bool released_lock = false;
> +    bool check_on_xid;
> +
> +    check_on_xid = xid ? true : false;
>
>      for (;;)
>      {
>          XLogRecPtr restart_lsn;
> +
>          NameData slotname;
>          int active_pid = 0;
>
> @@ -1263,19 +1333,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>           * Check if the slot needs to be invalidated. If it needs to be
>           * invalidated, and is not currently acquired, acquire it and mark it
>           * as having been invalidated. We do this with the spinlock held to
> -         * avoid race conditions -- for example the restart_lsn could move
> -         * forward, or the slot could be dropped.
> +         * avoid race conditions -- for example the restart_lsn (or the
> +         * xmin(s) could) move forward or the slot could be dropped.
>           */
>          SpinLockAcquire(&s->mutex);
>
>          restart_lsn = s->data.restart_lsn;
>
>          /*
> -         * If the slot is already invalid or is fresh enough, we don't need to
> -         * do anything.
> +         * If the slot is already invalid or is a non conflicting slot, we
> +         * don't need to do anything.
>           */
> -        if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
> +        if (DoNotInvalidateSlot(s, xid, &oldestLSN))

DoNotInvalidateSlot() seems odd to me, and makes the code harder to
understand. I'd make it something like:

if (!SlotIsInvalid(s) && (
      LogicalSlotConflictsWith(s, xid) ||
      SlotConflictsWithLSN(s, lsn)))

>  /*
> - * Mark any slot that points to an LSN older than the given segment
> - * as invalid; it requires WAL that's about to be removed.
> + * Invalidate Obsolete slots or resolve recovery conflicts with logical slots.

I don't like that this spreads "obsolete slots" around further - it's very
unspecific. A logical slot that needs to be removed due to an xid conflict is
just as obsolete as one that needs to be removed due to
max_slot_wal_keep_size.

I'd rephrase this to be about required resources getting removed or such: one
case of that is WAL, another case is xids.
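
As an illustration of the condition proposed above, here is a minimal, self-contained sketch. It is not PostgreSQL code: `MockSlot` and the simplified `TransactionId`/`XLogRecPtr` typedefs are hypothetical stand-ins, the xid comparison ignores wraparound, and the sketch does not model the wal_level case in which every logical slot is invalidated. It only shows how three small positive predicates, with xid and LSN passed by value, compose into one readable check.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for PostgreSQL's types, for illustration only. */
typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;

#define InvalidTransactionId ((TransactionId) 0)
#define InvalidXLogRecPtr    ((XLogRecPtr) 0)

typedef struct MockSlot
{
    bool          invalidated;   /* slot was already marked invalid */
    bool          is_logical;    /* logical (as opposed to physical) slot */
    TransactionId catalog_xmin;  /* oldest catalog xid the slot needs */
    XLogRecPtr    restart_lsn;   /* oldest WAL position the slot needs */
} MockSlot;

static bool
SlotIsInvalid(const MockSlot *s)
{
    assert(s != NULL);
    return s->invalidated;
}

/* Simplified: a plain <= instead of a wraparound-aware xid comparison. */
static bool
LogicalSlotConflictsWith(const MockSlot *s, TransactionId xid)
{
    assert(s != NULL);
    return s->is_logical &&
        xid != InvalidTransactionId &&
        s->catalog_xmin != InvalidTransactionId &&
        s->catalog_xmin <= xid;
}

/* The slot still needs WAL older than the oldest WAL being kept. */
static bool
SlotConflictsWithLSN(const MockSlot *s, XLogRecPtr oldestLSN)
{
    assert(s != NULL);
    return oldestLSN != InvalidXLogRecPtr &&
        s->restart_lsn != InvalidXLogRecPtr &&
        s->restart_lsn < oldestLSN;
}

/* The combined check, in the shape suggested in the review. */
static bool
SlotNeedsInvalidation(const MockSlot *s, TransactionId xid, XLogRecPtr oldestLSN)
{
    return !SlotIsInvalid(s) &&
        (LogicalSlotConflictsWith(s, xid) ||
         SlotConflictsWithLSN(s, oldestLSN));
}
```

A caller passes `InvalidTransactionId` or `InvalidXLogRecPtr` for whichever resource is not being checked, so no NULL-pointer convention is needed to select the mode.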
>  restart:
>      LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
> @@ -1414,21 +1505,35 @@ restart:
>          if (!s->in_use)
>              continue;
>
> -        if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
> +        if (xid)
>          {
> -            /* if the lock was released, start from scratch */
> -            goto restart;
> +            /* we are only dealing with *logical* slot conflicts */
> +            if (!SlotIsLogical(s))
> +                continue;
> +
> +            /*
> +             * not the database of interest and we don't want all the
> +             * database, skip
> +             */
> +            if (s->data.database != dboid && TransactionIdIsValid(*xid))
> +                continue;

ISTM that this should be in InvalidatePossiblyObsoleteSlot().

>      /*
> -     * If any slots have been invalidated, recalculate the resource limits.
> +     * If any slots have been invalidated, recalculate the required xmin and
> +     * the required lsn (if appropriate).
>       */
>      if (invalidated)
>      {
>          ReplicationSlotsComputeRequiredXmin(false);
> -        ReplicationSlotsComputeRequiredLSN();
> +        if (!xid)
> +            ReplicationSlotsComputeRequiredLSN();
>      }

Why make this conditional? If we invalidated a logical slot, we also don't
require as much WAL anymore, no?

> @@ -491,6 +493,9 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
>                                             PROCSIG_RECOVERY_CONFLICT_SNAPSHOT,
>                                             WAIT_EVENT_RECOVERY_CONFLICT_SNAPSHOT,
>                                             true);
> +
> +    if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
> +        InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, locator.dbOid, &snapshotConflictHorizon);
>  }

Hm. Is there a reason for doing this before resolving conflicts with existing
sessions?

Another issue: ResolveRecoveryConflictWithVirtualXIDs() takes
WaitExceedsMaxStandbyDelay() into account, but
InvalidateObsoleteReplicationSlots() does not. I think that's ok, because the
setup should prevent this case from being reached in normal paths, but at
least there should be a comment documenting this.
> +static inline bool
> +LogicalReplicationSlotXidsConflict(ReplicationSlot *s, TransactionId xid)
> +{
> +    TransactionId slot_effective_xmin;
> +    TransactionId slot_catalog_xmin;
> +
> +    slot_effective_xmin = s->effective_xmin;
> +    slot_catalog_xmin = s->data.catalog_xmin;
> +
> +    return (((TransactionIdIsValid(slot_effective_xmin) && TransactionIdPrecedesOrEquals(slot_effective_xmin, xid)) ||
> +             (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin, xid))));
> +}

return -ETOOMANYPARENS

> +static inline bool
> +SlotIsFreshEnough(ReplicationSlot *s, XLogRecPtr oldestLSN)
> +{
> +    return (s->data.restart_lsn >= oldestLSN);
> +}
> +
> +static inline bool
> +LogicalSlotIsNotConflicting(ReplicationSlot *s, TransactionId *xid)
> +{
> +    return (TransactionIdIsValid(*xid) && !LogicalReplicationSlotXidsConflict(s, *xid));
> +}
> +
> +static inline bool
> +DoNotInvalidateSlot(ReplicationSlot *s, TransactionId *xid, XLogRecPtr *oldestLSN)
> +{
> +    if (xid)
> +        return (LogicalReplicationSlotIsInvalid(s) || LogicalSlotIsNotConflicting(s, xid));
> +    else
> +        return (ObsoleteSlotIsInvalid(s, false) || SlotIsFreshEnough(s, *oldestLSN));
> +
> +}

See above for some more comments. But please don't accept stuff via pointer if
you don't have a reason for it. There's no reason for it for xid and oldestLSN
afaict.
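
To make the two remarks above concrete (fewer parentheses, arguments by value), here is a hedged, standalone sketch of an xid conflict check. `MockSlot`, `XidIsValid()` and `XidPrecedesOrEquals()` are hypothetical names introduced for this example. The comparison helper is only modeled on the modulo-2^32 ordering that `TransactionIdPrecedesOrEquals()` provides and is not the server's implementation.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-ins for illustration; not PostgreSQL's definitions. */
typedef uint32_t TransactionId;

#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)

typedef struct MockSlot
{
    TransactionId effective_xmin;
    TransactionId catalog_xmin;
} MockSlot;

static bool
XidIsValid(TransactionId xid)
{
    return xid != InvalidTransactionId;
}

/*
 * Wraparound-aware "a <= b" for normal xids, using signed 32-bit distance.
 * Special (non-normal) xids are compared as plain integers.
 */
static bool
XidPrecedesOrEquals(TransactionId a, TransactionId b)
{
    if (a < FirstNormalTransactionId || b < FirstNormalTransactionId)
        return a <= b;
    return (int32_t) (a - b) <= 0;
}

/*
 * The slot conflicts if either of its horizons is valid and is at or
 * before the given xid. The xid is taken by value: it is never modified.
 */
static bool
SlotXidsConflict(const MockSlot *s, TransactionId xid)
{
    assert(s != NULL);

    if (XidIsValid(s->effective_xmin) &&
        XidPrecedesOrEquals(s->effective_xmin, xid))
        return true;

    if (XidIsValid(s->catalog_xmin) &&
        XidPrecedesOrEquals(s->catalog_xmin, xid))
        return true;

    return false;
}
```

Splitting the two horizon checks into separate `if` statements avoids the nested parentheses entirely, at the cost of a few more lines.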
> diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
> index dbe9394762..186e4ef600 100644
> --- a/src/backend/access/transam/xlogrecovery.c
> +++ b/src/backend/access/transam/xlogrecovery.c
> @@ -1935,6 +1935,30 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
>      XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
>      SpinLockRelease(&XLogRecoveryCtl->info_lck);
>
> +    /*
> +     * Wakeup walsenders:
> +     *
> +     * On the standby, the WAL is flushed first (which will only wake up
> +     * physical walsenders) and then applied, which will only wake up logical
> +     * walsenders.
> +     * Indeed, logical walsenders on standby can't decode and send data until
> +     * it's been applied.
> +     *
> +     * Physical walsenders don't need to be waked up during replay unless

s/waked/woken/

> +     * cascading replication is allowed and time line change occured (so that
> +     * they can notice that they are on a new time line).
> +     *
> +     * That's why the wake up conditions are for:
> +     *
> +     *  - physical walsenders in case of new time line and cascade
> +     *  replication is allowed.
> +     *  - logical walsenders in case of new time line or recovery is in progress
> +     *  (logical decoding on standby).
> +     */
> +    WalSndWakeup(switchedTLI && AllowCascadeReplication(),
> +                 switchedTLI || RecoveryInProgress());

I don't think it's possible to get here without RecoveryInProgress() being
true. So we don't need that condition.

> @@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
>          /* Signal the startup process and walsender that new WAL has arrived */
>          WakeupRecovery();
>          if (AllowCascadeReplication())
> -            WalSndWakeup();
> +            WalSndWakeup(true, !RecoveryInProgress());

Same comment as earlier.
>          /* Report XLOG streaming progress in PS display */
>          if (update_process_title)
> diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
> index 2d908d1de2..5c68ebb79e 100644
> --- a/src/backend/replication/walsender.c
> +++ b/src/backend/replication/walsender.c
> @@ -2628,6 +2628,23 @@ InitWalSenderSlot(void)
>              walsnd->sync_standby_priority = 0;
>              walsnd->latch = &MyProc->procLatch;
>              walsnd->replyTime = 0;
> +
> +            /*
> +             * The kind assignment is done here and not in StartReplication()
> +             * and StartLogicalReplication(). Indeed, the logical walsender
> +             * needs to read WAL records (like snapshot of running
> +             * transactions) during the slot creation. So it needs to be woken
> +             * up based on its kind.
> +             *
> +             * The kind assignment could also be done in StartReplication(),
> +             * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
> +             * seems better to set it on one place.
> +             */

Doesn't that mean we'll wake up logical walsenders even if they're doing
normal query processing?

> +            if (MyDatabaseId == InvalidOid)
> +                walsnd->kind = REPLICATION_KIND_PHYSICAL;
> +            else
> +                walsnd->kind = REPLICATION_KIND_LOGICAL;
> +
>              SpinLockRelease(&walsnd->mutex);
>              /* don't need the lock anymore */
>              MyWalSnd = (WalSnd *) walsnd;
> @@ -3310,30 +3327,39 @@ WalSndShmemInit(void)
>  }
>
>  /*
> - * Wake up all walsenders
> + * Wake up physical, logical or both walsenders kind
> + *
> + * The distinction between physical and logical walsenders is done, because:
> + * - physical walsenders can't send data until it's been flushed
> + * - logical walsenders on standby can't decode and send data until it's been
> + * applied
> + *
> + * For cascading replication we need to wake up physical
> + * walsenders separately from logical walsenders (see the comment before calling
> + * WalSndWakeup() in ApplyWalRecord() for more details).
>   *
>   * This will be called inside critical sections, so throwing an error is not
>   * advisable.
>   */
>  void
> -WalSndWakeup(void)
> +WalSndWakeup(bool physical, bool logical)
>  {
>      int i;
>
>      for (i = 0; i < max_wal_senders; i++)
>      {
>          Latch *latch;
> +        ReplicationKind kind;
>          WalSnd *walsnd = &WalSndCtl->walsnds[i];
>
> -        /*
> -         * Get latch pointer with spinlock held, for the unlikely case that
> -         * pointer reads aren't atomic (as they're 8 bytes).
> -         */
> +        /* get latch pointer and kind with spinlock helds */
>          SpinLockAcquire(&walsnd->mutex);
>          latch = walsnd->latch;
> +        kind = walsnd->kind;
>          SpinLockRelease(&walsnd->mutex);
>
> -        if (latch != NULL)
> +        if (latch != NULL && ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
> +                              (logical && kind == REPLICATION_KIND_LOGICAL)))
>              SetLatch(latch);
>      }
>  }

I'd consider rewriting this to something like:

if (latch == NULL)
    continue;

if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
    (logical && kind == REPLICATION_KIND_LOGICAL))
    SetLatch(latch);

Greetings,

Andres Freund
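
For readers who want to try the suggested loop shape outside the server, the following is a small standalone sketch. Everything in it is a mock: `MockLatch`, `MockWalSnd`, `MockSetLatch()` and `WakeWalSenders()` are hypothetical names, the enum is redefined locally, and the spinlock around the reads is omitted because the sketch is single-threaded.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Local mock definitions for illustration; not the server's declarations. */
typedef enum
{
    REPLICATION_KIND_PHYSICAL,
    REPLICATION_KIND_LOGICAL
} ReplicationKind;

typedef struct MockLatch
{
    bool is_set;
} MockLatch;

typedef struct MockWalSnd
{
    MockLatch      *latch;   /* NULL when the entry is unused */
    ReplicationKind kind;
} MockWalSnd;

static void
MockSetLatch(MockLatch *latch)
{
    latch->is_set = true;
}

/* Wake only the requested kinds of walsender, skipping unused entries early. */
static void
WakeWalSenders(MockWalSnd *walsnds, int n, bool physical, bool logical)
{
    assert(walsnds != NULL || n == 0);

    for (int i = 0; i < n; i++)
    {
        MockLatch      *latch = walsnds[i].latch;
        ReplicationKind kind = walsnds[i].kind;

        if (latch == NULL)
            continue;

        if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
            (logical && kind == REPLICATION_KIND_LOGICAL))
            MockSetLatch(latch);
    }
}
```

Handling the NULL latch with an early `continue` leaves the second condition to express only the kind filter, which is the readability gain the rewrite is after.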