From cfb72d3e2e1f7a26008a52f407a3369479b87216 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Fri, 24 Oct 2025 13:34:17 -0700 Subject: [PATCH v22 2/2] Enable and disable logical decoding also when slot invalidation. --- doc/src/sgml/logicaldecoding.sgml | 14 +++--- src/backend/replication/logical/decode.c | 2 +- src/backend/replication/logical/logicalctl.c | 26 +++++----- src/backend/replication/slot.c | 49 +++++++++++++------ .../recovery/t/049_effective_wal_level.pl | 49 +++++++++++++++++-- 5 files changed, 99 insertions(+), 41 deletions(-) diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index ff4d0e365e3..b0377523415 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -270,7 +270,7 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU When is set to replica - and at least one logical replication slot exists on the system. + and at least one valid logical replication slot exists on the system. @@ -285,17 +285,17 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU logical replication slot. This activation process involves several steps and requires synchronization among processes, ensuring system-wide consistency. Conversely, when the last logical replication slot is dropped - from a system with wal_level set to replica, - logical decoding is automatically disabled. Note that the deactivation of - logical decoding might take some time as it is performed asynchronously - by the checkpointer process. + or invalidated on a system with wal_level set to + replica, logical decoding is automatically disabled. + Note that the deactivation of logical decoding might take some time as it + is performed asynchronously by the checkpointer process. When wal_level is set to replica, - dropping the last logical slot disables logical decoding on the primary, - resulting in slots on standbys being invalidated. 
+ dropping or invalidating the last logical slot disables logical decoding + on the primary, resulting in slots on standbys being invalidated. diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 85c502555a5..ab46a27a2b5 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -153,7 +153,7 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * Even if wal_level on the primary got decreased to 'replica', as - * long as there is at least one logical slot, the logical + * long as there is at least one valid logical slot, the logical * decoding remains enabled. So we don't check the logical * decoding availability here but do so in * XLOG_LOGICAL_DECODING_STATUS_CHANGE case. It covers the case diff --git a/src/backend/replication/logical/logicalctl.c b/src/backend/replication/logical/logicalctl.c index 2d0c58a09e9..e7fe2ed46a0 100644 --- a/src/backend/replication/logical/logicalctl.c +++ b/src/backend/replication/logical/logicalctl.c @@ -4,7 +4,7 @@ * * This module enables dynamic control of logical decoding availability. * Logical decoding becomes active under two conditions: when the wal_level - * parameter is set to 'logical', or when at least one logical replication + * parameter is set to 'logical', or when at least one valid logical replication * slot exists with wal_level set to 'replica'. The system disables logical * decoding when neither condition is met. Therefore, the dynamic control * of logical decoding availability is required only when wal_level is set @@ -30,13 +30,13 @@ * exit without releasing temporary slots explicitly. This lazy approach has * a drawback: it may take longer to change the effective_wal_level and disable * logical decoding, especially when the checkpointer is busy with other tasks. 
- * However, since dropping the last slot should not happen frequently, we chose - * this approach in all deactivation cases for simpler code implementation, - * even though the lazy approach is required only in error cases or at process - * exit time in principle. In the future, we could address this limitation - * either by using a dedicated worker instead of the checkpointer, or by - * implementing synchronous waiting during slot drops if workloads are significantly - * affected by the lazy deactivation of logical decoding. + * However, since dropping or invalidating the last slot should not happen + * frequently, we chose this approach in all deactivation cases for simpler code + * implementation, even though the lazy approach is required only in error cases + * or at process exit time in principle. In the future, we could address this + * limitation either by using a dedicated worker instead of the checkpointer, or + * by implementing synchronous waiting during slot drops if workloads are + * significantly affected by the lazy deactivation of logical decoding. * * Standby servers inherit the logical decoding and logical WAL writing status * from the primary server. Unlike normal activation and deactivation, these @@ -358,9 +358,9 @@ retry: /* * When attempting to disable logical decoding, if there is at least one - * logical slot, we cannot disable it. We need to check it here since - * slots could be created or dropped while waiting for the status change - * below. + * valid logical slot, we cannot disable it. We need to check it here + * since slots could be created or dropped while waiting for the status + * change below. */ if (!new_status && CheckLogicalSlotExists()) { @@ -490,8 +490,8 @@ EnsureLogicalDecodingEnabled(void) /* * Initiate a request for disabling logical decoding. * - * This function expects to be called after dropping the possibly-last - * logical replication slot as it doesn't check the logical slot presence. 
+ * This function expects to be called after dropping or invalidating the + * possibly-last logical replication slot as it doesn't check the logical slot presence. */ void RequestDisableLogicalDecoding(void) diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 961618f812d..1d593e69738 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -856,12 +856,12 @@ ReplicationSlotCleanup(bool synced_only) { int i; bool dropped_logical = false; - int nlogicalslots; + int n_valid_logicalslots; Assert(MyReplicationSlot == NULL); restart: - nlogicalslots = 0; + n_valid_logicalslots = 0; LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { @@ -870,8 +870,8 @@ restart: if (!s->in_use) continue; - if (SlotIsLogical(s)) - nlogicalslots++; + if (SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE) + n_valid_logicalslots++; SpinLockAcquire(&s->mutex); if ((s->active_pid == MyProcPid && @@ -895,7 +895,7 @@ restart: LWLockRelease(ReplicationSlotControlLock); - if (dropped_logical && nlogicalslots == 0) + if (dropped_logical && n_valid_logicalslots == 0) RequestDisableLogicalDecoding(); } @@ -1462,14 +1462,14 @@ void ReplicationSlotsDropDBSlots(Oid dboid) { int i; - int nlogicalslots; + int n_valid_logicalslots; bool dropped = false; if (max_replication_slots <= 0) return; restart: - nlogicalslots = 0; + n_valid_logicalslots = 0; LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { @@ -1487,13 +1487,14 @@ restart: if (!SlotIsLogical(s)) continue; - nlogicalslots++; + if (s->data.invalidated == RS_INVAL_NONE) + n_valid_logicalslots++; /* not our database, skip */ if (s->data.database != dboid) continue; - /* NB: intentionally including invalidated slots */ + /* NB: intentionally including invalidated slots to drop */ /* acquire slot, so ReplicationSlotDropAcquired can be reused */ SpinLockAcquire(&s->mutex); @@ -1550,12 +1551,12 @@ 
restart: } LWLockRelease(ReplicationSlotControlLock); - if (dropped && nlogicalslots == 0) + if (dropped && n_valid_logicalslots == 0) RequestDisableLogicalDecoding(); } /* - * Returns true if there is at least in-use logical replication slot. + * Returns true if there is at least one in-use valid logical replication slot. */ bool CheckLogicalSlotExists(void) @@ -1576,7 +1577,8 @@ CheckLogicalSlotExists(void) if (!s->in_use) continue; - /* NB: counting invalidated slots */ + if (s->data.invalidated != RS_INVAL_NONE) + continue; if (SlotIsLogical(s)) { @@ -2156,6 +2158,7 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes, { XLogRecPtr oldestLSN; bool invalidated = false; + int n_valid_logicalslots; Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon)); Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0); @@ -2167,6 +2170,7 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN); restart: + n_valid_logicalslots = 0; LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (int i = 0; i < max_replication_slots; i++) { @@ -2175,9 +2179,15 @@ restart: if (!s->in_use) continue; - /* Prevent invalidation of logical slots during binary upgrade */ - if (SlotIsLogical(s) && IsBinaryUpgrade) - continue; + if (SlotIsLogical(s)) + { + if (s->data.invalidated == RS_INVAL_NONE) + n_valid_logicalslots++; + + /* Prevent invalidation of logical slots during binary upgrade */ + if (IsBinaryUpgrade) + continue; + } if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid, snapshotConflictHorizon, @@ -2196,6 +2206,15 @@ restart: { ReplicationSlotsComputeRequiredXmin(false); ReplicationSlotsComputeRequiredLSN(); + + /* + * Request the checkpointer process to disable logical decoding if no + * valid logical slots exist. 
Although the checkpointer itself can call this + * function during a checkpoint, the actual deactivation is not performed + * here so that the checkpoint can complete first. + */ + if (n_valid_logicalslots == 0) + RequestDisableLogicalDecoding(); } return invalidated; diff --git a/src/test/recovery/t/049_effective_wal_level.pl b/src/test/recovery/t/049_effective_wal_level.pl index 7d1db792faf..b5b1ea0165e 100644 --- a/src/test/recovery/t/049_effective_wal_level.pl +++ b/src/test/recovery/t/049_effective_wal_level.pl @@ -108,8 +108,47 @@ like( # Revert the modified settings. $primary->adjust_conf('postgresql.conf', 'wal_level', 'replica'); $primary->adjust_conf('postgresql.conf', 'max_wal_senders', '10'); + +# Add other settings to test if we disable logical decoding when invalidating the last +# logical slot. +$primary->append_conf( + 'postgresql.conf', + qq[ +min_wal_size = 32MB +max_wal_size = 32MB +max_slot_wal_keep_size = 16MB +]); $primary->start; +# Advance WAL and check if the slot gets invalidated. +$primary->advance_wal(2); +$primary->safe_psql('postgres', qq[CHECKPOINT]); +is( $primary->safe_psql( + 'postgres', + qq[ +select invalidation_reason = 'wal_removed' from pg_replication_slots where slot_name = 'test_slot'; + ]), + 't', + 'test_slot gets invalidated due to wal_removed'); + +# Check if logical decoding is disabled after invalidating the last logical slot. +wait_for_logical_decoding_disabled($primary); +test_wal_level($primary, "replica|replica", + "effective_wal_level got decreased to 'replica' after invalidating the last logical slot" +); + +# Revert the modified settings, and restart the server. +$primary->adjust_conf('postgresql.conf', 'max_slot_wal_keep_size', undef); +$primary->adjust_conf('postgresql.conf', 'min_wal_size', undef); +$primary->adjust_conf('postgresql.conf', 'max_wal_size', undef); +$primary->restart; + +# Recreate the logical slot to enable logical decoding again. 
+$primary->safe_psql('postgres', + qq[select pg_drop_replication_slot('test_slot')]); +$primary->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('test_slot', 'pgoutput')]); + # Take backup during the effective_wal_level being 'logical'. But note that # replication slots are not included in the backup. $primary->backup('my_backup'); @@ -275,11 +314,11 @@ test_wal_level($standby4, "replica|logical", "effective_wal_level remains 'logical' on standby even after setting wal_level to 'replica' on primary" ); -# Promote the standby4 and check if effective_wal_level remains 'logical' even -# after the promotion since it has an invalidated logical slot. +# Promote the standby4 and check if effective_wal_level is now 'replica' after +# the promotion since there is no valid logical slot. $standby4->promote; -test_wal_level($standby4, "replica|logical", - "effective_wal_level remains 'logical' even after promotion as it has one invalidated slot" +test_wal_level($standby4, "replica|replica", + "effective_wal_level got decreased to 'replica' as there is no valid logical slot" ); # Drop the invalidated slot, decreasing effective_wal_level to 'replica'. @@ -287,7 +326,7 @@ $standby4->safe_psql('postgres', qq[select pg_drop_replication_slot('standby4_slot')]); wait_for_logical_decoding_disabled($standby4); test_wal_level($standby4, "replica|replica", - "effective_wal_level got decreased to 'replica' after dropping the last invalidated slot" + "effective_wal_level doesn't change after dropping the last invalidated slot" ); $standby4->stop; -- 2.47.3