Re: Minimal logical decoding on standbys - Mailing list pgsql-hackers
From | Drouvot, Bertrand |
---|---|
Subject | Re: Minimal logical decoding on standbys |
Date | |
Msg-id | da7184cf-c7b9-c333-801e-0e7507a23ddf@gmail.com |
In response to | Re: Minimal logical decoding on standbys (Andres Freund <andres@anarazel.de>) |
Responses | Re: Minimal logical decoding on standbys |
List | pgsql-hackers |
Hi,

On 3/30/23 9:04 AM, Andres Freund wrote:
> Hi,
>
> On 2023-03-04 12:19:57 +0100, Drouvot, Bertrand wrote:
>> Subject: [PATCH v52 1/6] Add info in WAL records in preparation for logical
>>  slot conflict handling.
>
> This is a very nice commit message.

Thanks! Melanie and Robert provided great feedback/input that helped make it what it is now.

> I think this commit is ready to go. Unless somebody thinks differently, I
> think I might push it tomorrow.

Great! Once done, I'll submit a new patch so that GlobalVisTestFor() can make use of the heap relation in vacuumRedirectAndPlaceholder() (which will be possible once 0001 is committed).

>
>> Subject: [PATCH v52 2/6] Handle logical slot conflicts on standby.
>
>
>> @@ -6807,7 +6808,8 @@ CreateCheckPoint(int flags)
>>       */
>>      XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
>>      KeepLogSeg(recptr, &_logSegNo);
>> -    if (InvalidateObsoleteReplicationSlots(_logSegNo))
>> +    InvalidateObsoleteReplicationSlots(_logSegNo, &invalidated, InvalidOid, NULL);
>> +    if (invalidated)
>>      {
>>          /*
>>           * Some slots have been invalidated; recalculate the old-segment
>
> I don't really understand why you changed InvalidateObsoleteReplicationSlots
> to return void instead of bool, and then added an output boolean argument via
> a pointer?
>

I gave it a second thought and it looks like I overcomplicated that part. I removed the pointer parameter in V53 attached (and it now returns bool as before).

>
>> @@ -7964,6 +7968,22 @@ xlog_redo(XLogReaderState *record)
>>          /* Update our copy of the parameters in pg_control */
>>          memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
>>
>> +        /*
>> +         * Invalidate logical slots if we are in hot standby and the primary does not
>> +         * have a WAL level sufficient for logical decoding. No need to search
>> +         * for potentially conflicting logically slots if standby is running
>> +         * with wal_level lower than logical, because in that case, we would
>> +         * have either disallowed creation of logical slots or invalidated existing
>> +         * ones.
>> +         */
>> +        if (InRecovery && InHotStandby &&
>> +            xlrec.wal_level < WAL_LEVEL_LOGICAL &&
>> +            wal_level >= WAL_LEVEL_LOGICAL)
>> +        {
>> +            TransactionId ConflictHorizon = InvalidTransactionId;
>> +            InvalidateObsoleteReplicationSlots(InvalidXLogRecPtr, NULL, InvalidOid, &ConflictHorizon);
>> +        }
>> +
>
> Are there races around changing wal_level?
>

Hmm, not that I can think of right now. Do you have one/some in mind?

>
>> @@ -855,8 +855,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
>>          SpinLockAcquire(&s->mutex);
>>          effective_xmin = s->effective_xmin;
>>          effective_catalog_xmin = s->effective_catalog_xmin;
>> -        invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
>> -                       XLogRecPtrIsInvalid(s->data.restart_lsn));
>> +        invalidated = ((!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
>> +                        XLogRecPtrIsInvalid(s->data.restart_lsn))
>> +                       || (!TransactionIdIsValid(s->data.xmin) &&
>> +                           !TransactionIdIsValid(s->data.catalog_xmin)));
>>          SpinLockRelease(&s->mutex);
>>
>>          /* invalidated slots need not apply */
>
> I still would like a wrapper function to determine whether a slot has been
> invalidated. This is too complicated to be repeated in other places.
>

Agreed, so I added ObsoleteSlotIsInvalid() and SlotIsInvalid() in V53 attached.

ObsoleteSlotIsInvalid() could also be done in a dedicated patch outside this patch series, though.

>> @@ -1224,20 +1226,21 @@ ReplicationSlotReserveWal(void)
>>  }
>>
>>  /*
>> - * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
>> - * and mark it invalid, if necessary and possible.
>> + * Helper for InvalidateObsoleteReplicationSlots
>> + *
>> + * Acquires the given slot and mark it invalid, if necessary and possible.
>>   *
>>   * Returns whether ReplicationSlotControlLock was released in the interim (and
>>   * in that case we're not holding the lock at return, otherwise we are).
>>   *
>> - * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
>> + * Sets *invalidated true if an obsolete slot was invalidated. (Untouched otherwise.)
>>   *
>>   * This is inherently racy, because we release the LWLock
>>   * for syscalls, so caller must restart if we return true.
>>   */
>>  static bool
>> -InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>> -                               bool *invalidated)
>> +InvalidatePossiblyObsoleteOrConflictingLogicalSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>> +                                                   bool *invalidated, TransactionId *xid)
>
> This is too long a name. I'd probably just leave it at the old name.
>

Done in V53 attached.

>
>> @@ -1261,18 +1267,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>>           * Check if the slot needs to be invalidated. If it needs to be
>>           * invalidated, and is not currently acquired, acquire it and mark it
>>           * as having been invalidated. We do this with the spinlock held to
>> -         * avoid race conditions -- for example the restart_lsn could move
>> -         * forward, or the slot could be dropped.
>> +         * avoid race conditions -- for example the restart_lsn (or the
>> +         * xmin(s) could) move forward or the slot could be dropped.
>>           */
>>          SpinLockAcquire(&s->mutex);
>>
>>          restart_lsn = s->data.restart_lsn;
>> +        slot_xmin = s->data.xmin;
>> +        slot_catalog_xmin = s->data.catalog_xmin;
>> +
>> +        /* slot has been invalidated (logical decoding conflict case) */
>> +        if ((xid &&
>> +             ((LogicalReplicationSlotIsInvalid(s))
>> +              ||
>>
>
> Uh, huh?
>
> That's very odd formatting.
>
>>          /*
>> -         * If the slot is already invalid or is fresh enough, we don't need to
>> -         * do anything.
>> +         * We are not forcing for invalidation because the xid is valid and
>> +         * this is a non conflicting slot.
>>           */
>> -        if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
>> +              (TransactionIdIsValid(*xid) && !(
>> +               (TransactionIdIsValid(slot_xmin) && TransactionIdPrecedesOrEquals(slot_xmin,*xid))
>> +               ||
>> +               (TransactionIdIsValid(slot_catalog_xmin) && TransactionIdPrecedesOrEquals(slot_catalog_xmin,*xid))
>> +               ))
>> +             ))
>> +            ||
>> +            /* slot has been invalidated (obsolete LSN case) */
>> +            (!xid && (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)))
>>          {
>>              SpinLockRelease(&s->mutex);
>>              if (released_lock)
>
>
> This needs some cleanup.

Added a new macro LogicalReplicationSlotXidsConflict() and reformatted a bit. Also ran pgindent on it, hope it's cleaner now.

>
>
>> @@ -1292,9 +1313,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>>          {
>>              MyReplicationSlot = s;
>>              s->active_pid = MyProcPid;
>> -            s->data.invalidated_at = restart_lsn;
>> -            s->data.restart_lsn = InvalidXLogRecPtr;
>> -
>> +            if (xid)
>> +            {
>> +                s->data.xmin = InvalidTransactionId;
>> +                s->data.catalog_xmin = InvalidTransactionId;
>> +            }
>> +            else
>> +            {
>> +                s->data.invalidated_at = restart_lsn;
>> +                s->data.restart_lsn = InvalidXLogRecPtr;
>> +            }
>>              /* Let caller know */
>>              *invalidated = true;
>>          }
>> @@ -1327,15 +1355,39 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>>               */
>>              if (last_signaled_pid != active_pid)
>>              {
>> -                ereport(LOG,
>> -                        errmsg("terminating process %d to release replication slot \"%s\"",
>> -                               active_pid, NameStr(slotname)),
>> -                        errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
>> -                                  LSN_FORMAT_ARGS(restart_lsn),
>> -                                  (unsigned long long) (oldestLSN - restart_lsn)),
>> -                        errhint("You might need to increase max_slot_wal_keep_size."));
>> +                if (xid)
>> +                {
>> +                    if (TransactionIdIsValid(*xid))
>> +                    {
>> +                        ereport(LOG,
>> +                                errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery",
>> +                                       active_pid, NameStr(slotname)),
>> +                                errdetail("The slot conflicted with xid horizon %u.",
>> +                                          *xid));
>> +                    }
>> +                    else
>> +                    {
>> +                        ereport(LOG,
>> +                                errmsg("terminating process %d because replication slot \"%s\" conflicts with recovery",
>> +                                       active_pid, NameStr(slotname)),
>> +                                errdetail("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
>> +                    }
>> +
>> +                    (void) SendProcSignal(active_pid, PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT, InvalidBackendId);
>> +                }
>> +                else
>> +                {
>> +                    ereport(LOG,
>> +                            errmsg("terminating process %d to release replication slot \"%s\"",
>> +                                   active_pid, NameStr(slotname)),
>> +                            errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
>> +                                      LSN_FORMAT_ARGS(restart_lsn),
>> +                                      (unsigned long long) (oldestLSN - restart_lsn)),
>> +                            errhint("You might need to increase max_slot_wal_keep_size."));
>> +
>> +                    (void) kill(active_pid, SIGTERM);
>
> I think it ought be possible to deduplicate this a fair bit. For one, two of
> the errmsg()s above are identical. But I think this could be consolidated
> further, e.g. by using the same message style for the three cases, and passing
> in a separately translated reason for the termination?
>

Deduplication done in V53, so that there is a single ereport() call. I'm not sure the translation is handled correctly the way I did it; please advise if that's not right.
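To illustrate the deduplication idea discussed above (one message style, with the reason for the termination passed in separately), here is a minimal standalone sketch. It is not the V53 code: the enum, the function names and the combined message layout are hypothetical, and plain snprintf() stands in for ereport()/errdetail() so that the example compiles outside the PostgreSQL tree. Translation handling is deliberately left out.

```c
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical cause codes; the names are illustrative, not from the patch. */
typedef enum
{
    CAUSE_WAL_REMOVED,          /* restart_lsn fell behind the oldest kept LSN */
    CAUSE_XID_HORIZON,          /* slot conflicts with an xid horizon */
    CAUSE_WAL_LEVEL             /* primary's wal_level is below logical */
} SlotInvalidationCause;

/* Build the cause-specific detail text in one place. */
static void
format_cause_detail(char *buf, size_t len, SlotInvalidationCause cause,
                    uint64_t lag_bytes, uint32_t xid_horizon)
{
    assert(buf != NULL && len > 0);

    switch (cause)
    {
        case CAUSE_WAL_REMOVED:
            snprintf(buf, len,
                     "The slot's restart_lsn exceeds the limit by %llu bytes.",
                     (unsigned long long) lag_bytes);
            break;
        case CAUSE_XID_HORIZON:
            snprintf(buf, len, "The slot conflicted with xid horizon %u.",
                     (unsigned int) xid_horizon);
            break;
        case CAUSE_WAL_LEVEL:
            snprintf(buf, len,
                     "Logical decoding on standby requires wal_level to be "
                     "at least logical on the primary server.");
            break;
    }
}

/*
 * Single reporting path shared by the "terminating" and "invalidating"
 * cases: the main message has one style, the detail carries the cause.
 */
static void
report_slot_event(char *out, size_t len, int terminating, int pid,
                  const char *slotname, SlotInvalidationCause cause,
                  uint64_t lag_bytes, uint32_t xid_horizon)
{
    char        detail[256];

    format_cause_detail(detail, sizeof(detail), cause, lag_bytes, xid_horizon);

    if (terminating)
        snprintf(out, len,
                 "terminating process %d to release replication slot \"%s\": %s",
                 pid, slotname, detail);
    else
        snprintf(out, len, "invalidating replication slot \"%s\": %s",
                 slotname, detail);
}
```

The point of the layout is that adding a new invalidation cause touches only the enum and one switch arm, while both call sites keep a single report call.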
>
>> +                }
>>
>> -                (void) kill(active_pid, SIGTERM);
>>                  last_signaled_pid = active_pid;
>>              }
>>
>> @@ -1369,13 +1421,33 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
>>              ReplicationSlotSave();
>>              ReplicationSlotRelease();
>>
>> -            ereport(LOG,
>> -                    errmsg("invalidating obsolete replication slot \"%s\"",
>> -                           NameStr(slotname)),
>> -                    errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
>> -                              LSN_FORMAT_ARGS(restart_lsn),
>> -                              (unsigned long long) (oldestLSN - restart_lsn)),
>> -                    errhint("You might need to increase max_slot_wal_keep_size."));
>> +            if (xid)
>> +            {
>> +                pgstat_drop_replslot(s);
>
> Why is this done here now?
>

Oops, moved above the if() in V53.

>> +                if (TransactionIdIsValid(*xid))
>> +                {
>> +                    ereport(LOG,
>> +                            errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)),
>> +                            errdetail("The slot conflicted with xid horizon %u.", *xid));
>> +                }
>> +                else
>> +                {
>> +                    ereport(LOG,
>> +                            errmsg("invalidating slot \"%s\" because it conflicts with recovery", NameStr(slotname)),
>> +                            errdetail("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
>> +                }
>> +            }
>> +            else
>> +            {
>> +                ereport(LOG,
>> +                        errmsg("invalidating obsolete replication slot \"%s\"",
>> +                               NameStr(slotname)),
>> +                        errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
>> +                                  LSN_FORMAT_ARGS(restart_lsn),
>> +                                  (unsigned long long) (oldestLSN - restart_lsn)),
>> +                        errhint("You might need to increase max_slot_wal_keep_size."));
>> +            }
>>
>
> I don't like all these repeated elogs...

Deduplication done in V53, so that there is a single ereport() call. I'm not sure the translation is handled correctly the way I did it; please advise if that's not right.
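Coming back to the wrapper-function request and the xid conflict condition quoted earlier in this review, the checks can be written as small predicates. The following is a standalone sketch under stated assumptions, not the V53 definitions: SlotData is a simplified stand-in for the slot's persistent data, all function names are illustrative, and the wraparound-aware comparison is re-implemented locally so the example compiles outside the PostgreSQL tree.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;
typedef uint64_t XLogRecPtr;

#define InvalidTransactionId     ((TransactionId) 0)
#define FirstNormalTransactionId ((TransactionId) 3)
#define InvalidXLogRecPtr        ((XLogRecPtr) 0)

/* Simplified stand-in: only the fields used by the checks below. */
typedef struct SlotData
{
    XLogRecPtr    restart_lsn;
    XLogRecPtr    invalidated_at;
    TransactionId xmin;
    TransactionId catalog_xmin;
} SlotData;

static bool
xid_is_valid(TransactionId xid)
{
    return xid != InvalidTransactionId;
}

/* "id1 <= id2" on the circular xid space (modulo-2^32 comparison). */
static bool
xid_precedes_or_equals(TransactionId id1, TransactionId id2)
{
    /* special (non-normal) xids compare as plain unsigned integers */
    if (id1 < FirstNormalTransactionId || id2 < FirstNormalTransactionId)
        return id1 <= id2;
    return (int32_t) (id1 - id2) <= 0;
}

/* Obsolete-LSN case: invalidated_at is set and restart_lsn was cleared. */
static bool
slot_invalid_obsolete(const SlotData *d)
{
    return d->invalidated_at != InvalidXLogRecPtr &&
           d->restart_lsn == InvalidXLogRecPtr;
}

/* Conflict case: both xmin and catalog_xmin were cleared. */
static bool
slot_invalid_conflict(const SlotData *d)
{
    return !xid_is_valid(d->xmin) && !xid_is_valid(d->catalog_xmin);
}

/* One wrapper for callers that only ask "has this slot been invalidated?" */
static bool
slot_is_invalid(const SlotData *d)
{
    return slot_invalid_obsolete(d) || slot_invalid_conflict(d);
}

/* True if either valid xmin is at or before the conflict horizon. */
static bool
slot_xids_conflict(const SlotData *d, TransactionId horizon)
{
    return (xid_is_valid(d->xmin) &&
            xid_precedes_or_equals(d->xmin, horizon)) ||
           (xid_is_valid(d->catalog_xmin) &&
            xid_precedes_or_equals(d->catalog_xmin, horizon));
}
```

With the conditions named this way, the large if in the quoted hunk reduces to a combination of slot_is_invalid() and slot_xids_conflict(), which is easier to read and to reuse in other places.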
>
>
>
>> @@ -3057,6 +3060,27 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
>>          case PROCSIG_RECOVERY_CONFLICT_LOCK:
>>          case PROCSIG_RECOVERY_CONFLICT_TABLESPACE:
>>          case PROCSIG_RECOVERY_CONFLICT_SNAPSHOT:
>> +        case PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT:
>> +
>> +            /*
>> +             * For conflicts that require a logical slot to be
>> +             * invalidated, the requirement is for the signal receiver to
>> +             * release the slot, so that it could be invalidated by the
>> +             * signal sender. So for normal backends, the transaction
>> +             * should be aborted, just like for other recovery conflicts.
>> +             * But if it's walsender on standby, we don't want to go
>> +             * through the following IsTransactionOrTransactionBlock()
>> +             * check, so break here.
>> +             */
>> +            if (am_cascading_walsender &&
>> +                reason == PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT &&
>> +                MyReplicationSlot && SlotIsLogical(MyReplicationSlot))
>> +            {
>> +                RecoveryConflictPending = true;
>> +                QueryCancelPending = true;
>> +                InterruptPending = true;
>> +                break;
>> +            }
>>
>>              /*
>>               * If we aren't in a transaction any longer then ignore.
>
> I can't see any reason for this to be mixed into the same case "body" as LOCK
> etc?
>

Oh right, nice catch. I don't know how it ended up done that way. Fixed in V53.

>
>> diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
>> index 38c6f18886..290d4b45f4 100644
>> --- a/src/backend/replication/slot.c
>> +++ b/src/backend/replication/slot.c
>> @@ -51,6 +51,7 @@
>>  #include "storage/proc.h"
>>  #include "storage/procarray.h"
>>  #include "utils/builtins.h"
>> +#include "access/xlogrecovery.h"
>
> Add new includes in the "alphabetically" right place...

Fixed in 0003 in V53 and in the other places (i.e., other sub-patches) where it was needed.

Regards,

-- 
Bertrand Drouvot
PostgreSQL Contributors Team
RDS Open Source Databases
Amazon Web Services: https://aws.amazon.com
Attachment
- v53-0006-Doc-changes-describing-details-about-logical-dec.patch
- v53-0005-New-TAP-test-for-logical-decoding-on-standby.patch
- v53-0004-Fixing-Walsender-corner-case-with-logical-decodi.patch
- v53-0003-Allow-logical-decoding-on-standby.patch
- v53-0002-Handle-logical-slot-conflicts-on-standby.patch
- v53-0001-Add-info-in-WAL-records-in-preparation-for-logic.patch