From 412699853c450f10627c4b991e11eee4f2e42a86 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Tue, 3 Jun 2025 12:52:22 -0700 Subject: [PATCH v1 2/2] Enable logical decoding dynamically based on logical slot presence. Author: Reviewed-by: Discussion: https://postgr.es/m/ --- src/backend/access/transam/xlog.c | 121 +++++- src/backend/commands/publicationcmds.c | 7 +- src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/decode.c | 48 ++- src/backend/replication/logical/logical.c | 34 +- src/backend/replication/logical/logicalctl.c | 348 ++++++++++++++++++ src/backend/replication/logical/meson.build | 1 + src/backend/replication/logical/slotsync.c | 12 +- src/backend/replication/slot.c | 24 +- src/backend/replication/slotfuncs.c | 3 + src/backend/replication/walsender.c | 2 + src/backend/storage/ipc/ipci.c | 3 + src/backend/storage/ipc/procsignal.c | 4 + src/backend/storage/ipc/standby.c | 7 +- .../utils/activity/wait_event_names.txt | 2 + src/backend/utils/init/postinit.c | 4 + src/backend/utils/misc/guc_tables.c | 11 + src/include/access/xlog.h | 5 +- src/include/catalog/pg_control.h | 3 + src/include/replication/logicalctl.h | 52 +++ src/include/replication/slot.h | 3 + src/include/storage/lwlocklist.h | 1 + src/include/storage/procsignal.h | 2 + src/include/utils/guc_hooks.h | 1 + .../t/035_standby_logical_decoding.pl | 2 +- .../recovery/t/045_effective_wal_level.pl | 98 +++++ src/test/subscription/t/001_rep_changes.pl | 2 +- 27 files changed, 727 insertions(+), 74 deletions(-) create mode 100644 src/backend/replication/logical/logicalctl.c create mode 100644 src/include/replication/logicalctl.h create mode 100644 src/test/recovery/t/045_effective_wal_level.pl diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 1914859b2ee..19873dbb799 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -78,6 +78,7 @@ #include "postmaster/walsummarizer.h" #include "postmaster/walwriter.h" #include "replication/origin.h" +#include "replication/logicalctl.h" #include "replication/slot.h" #include "replication/snapbuild.h" #include "replication/walreceiver.h" @@ -141,6 +142,7 @@ bool XLOG_DEBUG = false; #endif int wal_segment_size = DEFAULT_XLOG_SEG_SIZE; +int effective_wal_level = WAL_LEVEL_REPLICA; /* * Number of WAL insertion locks to use. A higher value allows more insertions @@ -5011,6 +5013,50 @@ show_in_hot_standby(void) return RecoveryInProgress() ? "on" : "off"; } +/* + * GUC show_hook for effective_wal_level + */ +const char * +show_effective_wal_level(void) +{ + char *str; + + switch (wal_level) + { + case WAL_LEVEL_MINIMAL: + str = "minimal"; + break; + case WAL_LEVEL_REPLICA: + { + bool in_transition; + + /* check if the logical decoding status is being changed */ + LWLockAcquire(LogicalDecodingControlLock, LW_SHARED); + in_transition = LogicalDecodingCtl->transition_in_progress; + LWLockRelease(LogicalDecodingControlLock); + + /* + * With wal_level='replica', XLogLogicalInfo indicates the + * actual WAL level unless we're in the status change. + */ + if (XLogLogicalInfo && !in_transition) + str = "logical"; + else + str = "replica"; + + break; + } + case WAL_LEVEL_LOGICAL: + str = "logical"; + break; + default: + str = "?"; + break; + } + + return str; +} + /* * Read the control file, set respective GUCs. * @@ -5779,6 +5825,12 @@ StartupXLOG(void) */ RelationCacheInitFileRemove(); + /* + * Startup the logical decoding status, needs to be setup before + * initializing replication slots as it requires logical decoding status. + */ + StartupLogicalDecodingStatus(ControlFile->logicalDecodingEnabled); + /* * Initialize replication slots, before there's a chance to remove * required resources. @@ -6314,6 +6366,8 @@ StartupXLOG(void) Insert->fullPageWrites = lastFullPageWrites; UpdateFullPageWrites(); + UpdateLogicalDecodingStatusEndOfRecovery(); + /* * Emit checkpoint or end-of-recovery record in XLOG, if required. */ @@ -7453,6 +7507,8 @@ CreateCheckPoint(int flags) */ ControlFile->unloggedLSN = pg_atomic_read_membarrier_u64(&XLogCtl->unloggedLSN); + ControlFile->logicalDecodingEnabled = IsLogicalDecodingEnabled(); + UpdateControlFile(); LWLockRelease(ControlFileLock); @@ -8690,19 +8746,26 @@ xlog_redo(XLogReaderState *record) memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change)); /* - * Invalidate logical slots if we are in hot standby and the primary - * does not have a WAL level sufficient for logical decoding. No need - * to search for potentially conflicting logically slots if standby is - * running with wal_level lower than logical, because in that case, we - * would have either disallowed creation of logical slots or - * invalidated existing ones. + * Change the logical decoding status upon wal_level change on the + * primary server. */ - if (InRecovery && InHotStandby && - xlrec.wal_level < WAL_LEVEL_LOGICAL && - wal_level >= WAL_LEVEL_LOGICAL) - InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL, - 0, InvalidOid, - InvalidTransactionId); + if (xlrec.wal_level == WAL_LEVEL_LOGICAL) + { + /* + * If the primary increase WAL level to 'logical', we can + * unconditionally enable the logical decoding. + */ + pg_atomic_test_set_flag(&(LogicalDecodingCtl->logical_decoding_enabled)); + } + else if (xlrec.wal_level == WAL_LEVEL_REPLICA && + pg_atomic_read_u32(&ReplicationSlotCtl->n_inuse_logical_slots) == 0) + { + /* + * Disable the logical decoding if there is no in-use logical slot + * on the standby. + */ + pg_atomic_clear_flag(&(LogicalDecodingCtl->logical_decoding_enabled)); + } LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); ControlFile->MaxConnections = xlrec.MaxConnections; @@ -8712,6 +8775,7 @@ xlog_redo(XLogReaderState *record) ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact; ControlFile->wal_level = xlrec.wal_level; ControlFile->wal_log_hints = xlrec.wal_log_hints; + ControlFile->logicalDecodingEnabled = IsLogicalDecodingEnabled(); /* * Update minRecoveryPoint to ensure that if recovery is aborted, we @@ -8771,6 +8835,39 @@ xlog_redo(XLogReaderState *record) { /* nothing to do here, just for informational purposes */ } + else if (info == XLOG_LOGICAL_DECODING_STATUS_CHANGE) + { + bool logical_decoding; + + memcpy(&logical_decoding, XLogRecGetData(record), sizeof(bool)); + + if (logical_decoding) + { + /* Enable the logical decoding */ + pg_atomic_test_set_flag(&(LogicalDecodingCtl->logical_decoding_enabled)); + } + else + { + /* Disable the logical decoding */ + pg_atomic_clear_flag(&(LogicalDecodingCtl->logical_decoding_enabled)); + + /* + * Invalidate logical slots if we are in hot standby and the + * primary disabled the logical decoding. + */ + if (InRecovery && InHotStandby) + { + InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL, + 0, InvalidOid, + InvalidTransactionId); + } + } + + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); + ControlFile->logicalDecodingEnabled = logical_decoding; + UpdateControlFile(); + LWLockRelease(ControlFileLock); + } } /* diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 0b23d94c38e..75b2e2fdc87 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -38,6 +38,7 @@ #include "parser/parse_clause.h" #include "parser/parse_collate.h" #include "parser/parse_relation.h" +#include "replication/logicalctl.h" #include "rewrite/rewriteHandler.h" #include "storage/lmgr.h" #include "utils/acl.h" @@ -960,11 +961,11 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0); - if (wal_level != WAL_LEVEL_LOGICAL) + if (!IsLogicalDecodingEnabled()) ereport(WARNING, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("\"wal_level\" is insufficient to publish logical changes"), - errhint("Set \"wal_level\" to \"logical\" before creating subscriptions."))); + errmsg("logical decoding needs to be enabled to publish logical changes"), + errhint("Set \"wal_level\" to \"logical\" or create a logical replication slot with \"replica\" \"wal_level\" before creating subscriptions."))); return myself; } diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 1e08bbbd4eb..50ec127e9ef 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -20,6 +20,7 @@ OBJS = \ decode.o \ launcher.o \ logical.o \ + logicalctl.o \ logicalfuncs.o \ message.o \ origin.o \ diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index cc03f0706e9..269be167be5 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -150,21 +150,30 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ break; case XLOG_PARAMETER_CHANGE: + + /* + * XXX: even if wal_level on the primary got decreased to + * 'replica' it doesn't necessarily mean to disable the logical + * decoding as long as we have at least one logical slot. So we + * can ignore wal_level change here. + */ + break; + case XLOG_NOOP: + case XLOG_NEXTOID: + case XLOG_SWITCH: + case XLOG_BACKUP_END: + case XLOG_RESTORE_POINT: + case XLOG_FPW_CHANGE: + case XLOG_FPI_FOR_HINT: + case XLOG_FPI: + case XLOG_OVERWRITE_CONTRECORD: + case XLOG_CHECKPOINT_REDO: + break; + case XLOG_LOGICAL_DECODING_STATUS_CHANGE: { - xl_parameter_change *xlrec = - (xl_parameter_change *) XLogRecGetData(buf->record); + bool *logical_decoding = (bool *) XLogRecGetData(buf->record); - /* - * If wal_level on the primary is reduced to less than - * logical, we want to prevent existing logical slots from - * being used. Existing logical slots on the standby get - * invalidated when this WAL record is replayed; and further, - * slot creation fails when wal_level is not sufficient; but - * all these operations are not synchronized, so a logical - * slot may creep in while the wal_level is being reduced. - * Hence this extra check. - */ - if (xlrec->wal_level < WAL_LEVEL_LOGICAL) + if (!(*logical_decoding)) { /* * This can occur only on a standby, as a primary would @@ -174,20 +183,9 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Assert(RecoveryInProgress()); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary"))); + errmsg("logical decoding must be enabled on the primary"))); } - break; } - case XLOG_NOOP: - case XLOG_NEXTOID: - case XLOG_SWITCH: - case XLOG_BACKUP_END: - case XLOG_RESTORE_POINT: - case XLOG_FPW_CHANGE: - case XLOG_FPI_FOR_HINT: - case XLOG_FPI: - case XLOG_OVERWRITE_CONTRECORD: - case XLOG_CHECKPOINT_REDO: break; default: elog(ERROR, "unexpected RM_XLOG_ID record type: %u", info); diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 2a34d2bf846..45b159e5fa0 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -35,6 +35,7 @@ #include "pgstat.h" #include "replication/decode.h" #include "replication/logical.h" +#include "replication/logicalctl.h" #include "replication/reorderbuffer.h" #include "replication/slotsync.h" #include "replication/snapbuild.h" @@ -115,31 +116,24 @@ CheckLogicalDecodingRequirements(void) * needs the same check. */ - if (wal_level < WAL_LEVEL_LOGICAL) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical decoding requires \"wal_level\" >= \"logical\""))); + if (!IsLogicalDecodingEnabled()) + { + if (RecoveryInProgress()) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding needs to be enabled on the primary"), + errhint("Set \"wal_level\" >= \"logical\" or create at least one logical slot on the primary ."))); + else + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding is not enabled"), + errhint("Set \"wal_level\" >= \"logical\" or create at least one logical slot."))); + } if (MyDatabaseId == InvalidOid) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical decoding requires a database connection"))); - - if (RecoveryInProgress()) - { - /* - * This check may have race conditions, but whenever - * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we - * verify that there are no existing logical replication slots. And to - * avoid races around creating a new slot, - * CheckLogicalDecodingRequirements() is called once before creating - * the slot, and once when logical decoding is initially starting up. - */ - if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary"))); - } } /* diff --git a/src/backend/replication/logical/logicalctl.c b/src/backend/replication/logical/logicalctl.c new file mode 100644 index 00000000000..8d1c42ee6f0 --- /dev/null +++ b/src/backend/replication/logical/logicalctl.c @@ -0,0 +1,348 @@ +/*------------------------------------------------------------------------- + * logicalctl.c + * Functionality to control logical decoding status. + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/replication/logical/logicalctl.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/xlog_internal.h" +#include "access/xlogutils.h" +#include "access/xloginsert.h" +#include "catalog/pg_control.h" +#include "port/atomics.h" +#include "miscadmin.h" +#include "storage/lwlock.h" +#include "storage/procarray.h" +#include "storage/procsignal.h" +#include "storage/ipc.h" +#include "storage/lmgr.h" +#include "storage/shmem.h" +#include "storage/standby.h" +#include "replication/logicalctl.h" +#include "replication/slot.h" +#include "utils/guc.h" +#include "utils/wait_event_types.h" + +LogicalDecodingCtlData *LogicalDecodingCtl = NULL; + +/* + * Process local cache of LogicalDecodingCtl->xlog_logical_info. This is + * initialized at process startup time, and could be updated when absorbing + * process barrier in ProcessBarrierUpdateXLogLogicalInfo(). + */ +bool XLogLogicalInfo = false; + +Size +LogicalDecodingCtlShmemSize(void) +{ + return sizeof(LogicalDecodingCtlData); +} + +void +LogicalDecodingCtlShmemInit(void) +{ + bool found; + + LogicalDecodingCtl = ShmemInitStruct("Logical information control", + LogicalDecodingCtlShmemSize(), + &found); + + if (!found) + { + LogicalDecodingCtl->transition_in_progress = false; + ConditionVariableInit(&LogicalDecodingCtl->transition_cv); + pg_atomic_init_flag(&LogicalDecodingCtl->xlog_logical_info); + pg_atomic_init_flag(&LogicalDecodingCtl->logical_decoding_enabled); + } +} + +/* + * Initialize the logical decoding status on shmem at server startup. This + * must be called ONCE during postmaster or standalone-backend startup, + * before initializing replication slots. + */ +void +StartupLogicalDecodingStatus(bool status_in_control_file) +{ + if (wal_level == WAL_LEVEL_MINIMAL) + return; + + /* + * We enable both logical info WAL logging and logical decoding if we're + * using 'logical' WAL level or the control file tells so. + */ + if (wal_level >= WAL_LEVEL_LOGICAL || status_in_control_file) + { + pg_atomic_test_set_flag(&(LogicalDecodingCtl->logical_decoding_enabled)); + pg_atomic_test_set_flag(&(LogicalDecodingCtl->xlog_logical_info)); + } + + /* + * ... but in standby mode, if wal_level is 'logical' but the primary + * disables logical decoding, we need to disable it also on the standby. + * + * We don't need to disable logical info WAL logging because we can use + * the 'logical' WAL level even after the promotion. + */ + if (StandbyMode && wal_level == WAL_LEVEL_LOGICAL && !status_in_control_file) + pg_atomic_clear_flag(&(LogicalDecodingCtl->logical_decoding_enabled)); +} + +/* + * Update the XLogLogicalInfo cache. + */ +static void +update_xlog_logical_info(void) +{ + XLogLogicalInfo = !pg_atomic_unlocked_test_flag(&(LogicalDecodingCtl->xlog_logical_info)); +} + +/* + * Initialize XLogLogicalInfo backend-private cache. + */ +void +InitializeProcessXLogLogicalInfo(void) +{ + update_xlog_logical_info(); +} + +bool +ProcessBarrierUpdateXLogLogicalInfo(void) +{ + update_xlog_logical_info(); + return true; +} + +/* + * Is the logical decoding enabled on the system? + */ +bool +IsLogicalDecodingEnabled(void) +{ + return !pg_atomic_unlocked_test_flag(&(LogicalDecodingCtl->logical_decoding_enabled)); +} + +/* + * A PG_ENSURE_ERROR_CLEANUP callback for making the logical decoding enabled. + */ +static void +abort_enabling_logical_decoding(int code, Datum arg) +{ + Assert(LogicalDecodingCtl->transition_in_progress); + + pg_atomic_clear_flag(&(LogicalDecodingCtl->logical_decoding_enabled)); + pg_atomic_clear_flag(&(LogicalDecodingCtl->xlog_logical_info)); + + /* XXX really no need to wait here? */ + EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO); + + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->transition_in_progress = false; + LWLockRelease(LogicalDecodingControlLock); + + /* Let waiters know the WAL level change completed */ + ConditionVariableBroadcast(&LogicalDecodingCtl->transition_cv); +} + +/* + * Enable the logical decoding if disabled. + */ +void +EnsureLogicalDecodingEnabled(void) +{ + if (IsLogicalDecodingEnabled()) + return; + + /* Standby cannot enable the logical decoding */ + if (RecoveryInProgress()) + return; + +retry: + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + + if (LogicalDecodingCtl->transition_in_progress) + { + LWLockRelease(LogicalDecodingControlLock); + + /* Wait for someone to complete the transition */ + ConditionVariableSleep(&LogicalDecodingCtl->transition_cv, + WAIT_EVENT_LOGICAL_DECODING_STATUS_CHANGE); + + goto retry; + } + + if (IsLogicalDecodingEnabled()) + { + LWLockRelease(LogicalDecodingControlLock); + return; + } + + LogicalDecodingCtl->transition_in_progress = true; + LWLockRelease(LogicalDecodingControlLock); + + PG_ENSURE_ERROR_CLEANUP(abort_enabling_logical_decoding, (Datum) 0); + { + RunningTransactions running; + + /* + * Set logical info WAL logging on the shmem. All process starts after + * this point will include the information required by the logical + * decoding to WAL records. + */ + pg_atomic_test_set_flag(&(LogicalDecodingCtl->xlog_logical_info)); + + running = GetRunningTransactionData(); + LWLockRelease(ProcArrayLock); + LWLockRelease(XidGenLock); + + /* + * Order all running processes to reflect the xlog_logical_info + * update, and wait. + */ + WaitForProcSignalBarrier( + EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO)); + + /* + * Wait for all running transactions to finish as some transaction + * might have started with the old state. + */ + for (int i = 0; i < running->xcnt; i++) + { + TransactionId xid = running->xids[i]; + + if (TransactionIdIsCurrentTransactionId(xid)) + continue; + + XactLockTableWait(xid, NULL, NULL, XLTW_None); + } + + /* + * Here, we can ensure that all running transactions are using the new + * xlog_logical_info value, writing logical information to WAL + * records. So now enable the logical decoding globally. + */ + pg_atomic_test_set_flag(&(LogicalDecodingCtl->logical_decoding_enabled)); + + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->transition_in_progress = false; + LWLockRelease(LogicalDecodingControlLock); + } + PG_END_ENSURE_ERROR_CLEANUP(abort_enabling_logical_decoding, (Datum) 0); + + /* Let waiters know the work finished */ + ConditionVariableBroadcast(&LogicalDecodingCtl->transition_cv); + + if (XLogStandbyInfoActive() && !RecoveryInProgress()) + { + bool logical_decoding = true; + + XLogBeginInsert(); + XLogRegisterData(&logical_decoding, sizeof(bool)); + XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE); + } +} + +/* + * Disable the logical decoding if enabled. + * + * XXX: This function could write a WAL record in order to tell the standbys + * know the logical decoding got disabled. However, we need to note that this + * function could be called during process exits (e.g., by ReplicationSlotCleanup() + * via before_shmem_exit callbacks), which looks something that we want to + * avoid. + */ +void +DisableLogicalDecodingIfNecessary(void) +{ + if (wal_level >= WAL_LEVEL_LOGICAL || !IsLogicalDecodingEnabled()) + return; + + if (pg_atomic_read_u32(&ReplicationSlotCtl->n_inuse_logical_slots) > 0) + return; + + if (XLogStandbyInfoActive() && !RecoveryInProgress()) + { + bool logical_decoding = false; + + XLogBeginInsert(); + XLogRegisterData(&logical_decoding, sizeof(bool)); + XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE); + } + + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->transition_in_progress = true; + LWLockRelease(LogicalDecodingControlLock); + + pg_atomic_clear_flag(&(LogicalDecodingCtl->logical_decoding_enabled)); + pg_atomic_clear_flag(&(LogicalDecodingCtl->xlog_logical_info)); + + /* + * XXX is it okay not to wait for the signal to be absorbed? + */ + EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO); + + /* XXX need to wait for transaction finishes? */ + + LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE); + LogicalDecodingCtl->transition_in_progress = false; + LWLockRelease(LogicalDecodingControlLock); + + /* Let waiters know the work finished */ + ConditionVariableBroadcast(&LogicalDecodingCtl->transition_cv); +} + +/* + * Update the logical decoding status at end of the recovery. This function + * must be called ONCE before accepting writes. + */ +void +UpdateLogicalDecodingStatusEndOfRecovery(void) +{ + if (wal_level == WAL_LEVEL_MINIMAL) + return; + + if (wal_level == WAL_LEVEL_LOGICAL) + { + /* + * Ensure to enable the logical decoding as it might have been + * disabled based on the primary's status. + * + * xlog_logical_info must have been enabled since we've using the + * 'logical' WAL level from the server starts. + */ + Assert(!pg_atomic_unlocked_test_flag(&(LogicalDecodingCtl->xlog_logical_info))); + pg_atomic_test_set_flag(&(LogicalDecodingCtl->logical_decoding_enabled)); + } + else if (wal_level == WAL_LEVEL_REPLICA && IsLogicalDecodingEnabled()) + { + if (pg_atomic_read_u32(&ReplicationSlotCtl->n_inuse_logical_slots) >= 0) + { + /* + * if there are in-use logical slots already, we have enabled the + * logical decoding during recovery. So we need to enable only + * xlog_logical_info and let all processes to reflect it before + * the server accepting writes. + */ + Assert(IsLogicalDecodingEnabled()); + pg_atomic_test_set_flag(&(LogicalDecodingCtl->xlog_logical_info)); + + WaitForProcSignalBarrier( + EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO)); + } + else + { + /* + * This is a situation like where the primary enabled logical + * decoding but no slot has been created on the standby. So + * disable the logical decoding. + */ + pg_atomic_clear_flag(&(LogicalDecodingCtl->xlog_logical_info)); + } + } +} diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 6f19614c79d..19c7130b961 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -6,6 +6,7 @@ backend_sources += files( 'decode.c', 'launcher.c', 'logical.c', + 'logicalctl.c', 'logicalfuncs.c', 'message.c', 'origin.c', diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 656e66e0ae0..98a7cf06476 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -57,6 +57,7 @@ #include "pgstat.h" #include "postmaster/interrupt.h" #include "replication/logical.h" +#include "replication/logicalctl.h" #include "replication/slotsync.h" #include "replication/snapbuild.h" #include "storage/ipc.h" @@ -1058,15 +1059,16 @@ bool ValidateSlotSyncParams(int elevel) { /* - * Logical slot sync/creation requires wal_level >= logical. + * Logical slot sync/creation requires to the logical decoding to be + * enabled. * * Since altering the wal_level requires a server restart, so error out in * this case regardless of elevel provided by caller. */ - if (wal_level < WAL_LEVEL_LOGICAL) - ereport(ERROR, - errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("replication slot synchronization requires \"wal_level\" >= \"logical\"")); + if (!IsLogicalDecodingEnabled()) + ereport(elevel, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding is not enabled")); /* * A physical replication slot(primary_slot_name) is required on the diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 600b87fa9cb..7019cdbd01e 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -47,6 +47,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/interrupt.h" +#include "replication/logicalctl.h" #include "replication/slotsync.h" #include "replication/slot.h" #include "replication/walsender_private.h" @@ -219,6 +220,8 @@ ReplicationSlotsShmemInit(void) /* First time through, so initialize */ MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize()); + pg_atomic_init_u32(&ReplicationSlotCtl->n_inuse_logical_slots, 0); + for (i = 0; i < max_replication_slots; i++) { ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i]; @@ -458,7 +461,10 @@ ReplicationSlotCreate(const char *name, bool db_specific, * ReplicationSlotAllocationLock. */ if (SlotIsLogical(slot)) + { pgstat_create_replslot(slot); + pg_atomic_add_fetch_u32(&ReplicationSlotCtl->n_inuse_logical_slots, 1); + } /* * Now that the slot has been marked as in_use and active, it's safe to @@ -805,6 +811,8 @@ restart: } LWLockRelease(ReplicationSlotControlLock); + + DisableLogicalDecodingIfNecessary(); } /* @@ -919,6 +927,7 @@ void ReplicationSlotDropAcquired(void) { ReplicationSlot *slot = MyReplicationSlot; + bool was_logical_slot = SlotIsLogical(slot); Assert(MyReplicationSlot != NULL); @@ -926,6 +935,9 @@ ReplicationSlotDropAcquired(void) MyReplicationSlot = NULL; ReplicationSlotDropPtr(slot); + + if (was_logical_slot) + DisableLogicalDecodingIfNecessary(); } /* @@ -1026,7 +1038,10 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) * another session. */ if (SlotIsLogical(slot)) + { pgstat_drop_replslot(slot); + pg_atomic_sub_fetch_u32(&ReplicationSlotCtl->n_inuse_logical_slots, 1); + } /* * We release this at the very end, so that nobody starts trying to create @@ -2523,12 +2538,12 @@ RestoreSlotFromDisk(const char *name) */ if (cp.slotdata.database != InvalidOid) { - if (wal_level < WAL_LEVEL_LOGICAL) + if (!IsLogicalDecodingEnabled()) ereport(FATAL, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"", + errmsg("logical replication slot \"%s\" exists, but logical decoding is not enabled", NameStr(cp.slotdata.name)), - errhint("Change \"wal_level\" to be \"logical\" or higher."))); + errhint("Change \"wal_level\" to be \"replica\" or higher."))); /* * In standby mode, the hot standby must be enabled. This check is @@ -2590,6 +2605,9 @@ RestoreSlotFromDisk(const char *name) ReplicationSlotSetInactiveSince(slot, now, false); restored = true; + + if (SlotIsLogical(slot)) + pg_atomic_add_fetch_u32(&ReplicationSlotCtl->n_inuse_logical_slots, 1); break; } diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index ccbc3732e95..d4f45be793d 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -18,6 +18,7 @@ #include "access/xlogutils.h" #include "funcapi.h" #include "replication/logical.h" +#include "replication/logicalctl.h" #include "replication/slot.h" #include "replication/slotsync.h" #include "utils/builtins.h" @@ -211,6 +212,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) CheckSlotPermissions(); + EnsureLogicalDecodingEnabled(); CheckLogicalDecodingRequirements(); create_logical_replication_slot(NameStr(*name), @@ -934,6 +936,7 @@ pg_sync_replication_slots(PG_FUNCTION_ARGS) errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("replication slots can only be synchronized to a standby server")); + EnsureLogicalDecodingEnabled(); ValidateSlotSyncParams(ERROR); /* Load the libpq-specific functions */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 85668ef807c..edb4297c5e4 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -71,6 +71,7 @@ #include "postmaster/interrupt.h" #include "replication/decode.h" #include "replication/logical.h" +#include "replication/logicalctl.h" #include "replication/slotsync.h" #include "replication/slot.h" #include "replication/snapbuild.h" @@ -1218,6 +1219,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Assert(cmd->kind == REPLICATION_KIND_LOGICAL); + EnsureLogicalDecodingEnabled(); CheckLogicalDecodingRequirements(); /* diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 2fa045e6b0f..f1ef837755c 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -31,6 +31,7 @@ #include "postmaster/bgworker_internals.h" #include "postmaster/bgwriter.h" #include "postmaster/walsummarizer.h" +#include "replication/logicalctl.h" #include "replication/logicallauncher.h" #include "replication/origin.h" #include "replication/slot.h" @@ -150,6 +151,7 @@ CalculateShmemSize(int *num_semaphores) size = add_size(size, InjectionPointShmemSize()); size = add_size(size, SlotSyncShmemSize()); size = add_size(size, AioShmemSize()); + size = add_size(size, LogicalDecodingCtlShmemSize()); /* include additional requested shmem from preload libraries */ size = add_size(size, total_addin_request); @@ -343,6 +345,7 @@ CreateOrAttachShmemStructs(void) WaitEventCustomShmemInit(); InjectionPointShmemInit(); AioShmemInit(); + LogicalDecodingCtlShmemInit(); } /* diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index a9bb540b55a..0c3788cb836 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -22,6 +22,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "port/pg_bitutils.h" +#include "replication/logicalctl.h" #include "replication/logicalworker.h" #include "replication/walsender.h" #include "storage/condition_variable.h" @@ -576,6 +577,9 @@ ProcessProcSignalBarrier(void) case PROCSIGNAL_BARRIER_SMGRRELEASE: processed = ProcessBarrierSmgrRelease(); break; + case PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO: + processed = ProcessBarrierUpdateXLogLogicalInfo(); + break; } /* diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c index 7fa8d9247e0..c34403c6e8c 100644 --- a/src/backend/storage/ipc/standby.c +++ b/src/backend/storage/ipc/standby.c @@ -24,6 +24,7 @@ #include "access/xlogutils.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/logicalctl.h" #include "replication/slot.h" #include "storage/bufmgr.h" #include "storage/proc.h" @@ -499,7 +500,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon, * seems OK, given that this kind of conflict should not normally be * reached, e.g. due to using a physical replication slot. */ - if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel) + if (IsLogicalDecodingEnabled() && isCatalogRel) InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid, snapshotConflictHorizon); } @@ -1325,13 +1326,13 @@ LogStandbySnapshot(void) * record. Fortunately this routine isn't executed frequently, and it's * only a shared lock. */ - if (wal_level < WAL_LEVEL_LOGICAL) + if (!IsLogicalDecodingEnabled()) LWLockRelease(ProcArrayLock); recptr = LogCurrentRunningXacts(running); /* Release lock if we kept it longer ... */ - if (wal_level >= WAL_LEVEL_LOGICAL) + if (IsLogicalDecodingEnabled()) LWLockRelease(ProcArrayLock); /* GetRunningTransactionData() acquired XidGenLock, we must release it */ diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 5d9e04d6823..43c05dfd24a 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -133,6 +133,7 @@ HASH_GROW_BUCKETS_ELECT "Waiting to elect a Parallel Hash participant to allocat HASH_GROW_BUCKETS_REALLOCATE "Waiting for an elected Parallel Hash participant to finish allocating more buckets." HASH_GROW_BUCKETS_REINSERT "Waiting for other Parallel Hash participants to finish inserting tuples into new buckets." LOGICAL_APPLY_SEND_DATA "Waiting for a logical replication leader apply process to send data to a parallel apply process." +LOGICAL_DECODING_STATUS_CHANGE "Waiting for the logical decoding status change." LOGICAL_PARALLEL_APPLY_STATE_CHANGE "Waiting for a logical replication parallel apply process to change state." LOGICAL_SYNC_DATA "Waiting for a logical replication remote server to send data for initial table synchronization." LOGICAL_SYNC_STATE_CHANGE "Waiting for a logical replication remote server to change state." @@ -352,6 +353,7 @@ DSMRegistry "Waiting to read or update the dynamic shared memory registry." InjectionPoint "Waiting to read or update information related to injection points." SerialControl "Waiting to read or update shared pg_serial state." AioWorkerSubmissionQueue "Waiting to access AIO worker submission queue." +LogicalDecodingControl "Waiting to access logical decoding status information." # # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE) diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index c86ceefda94..87bceae3406 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -40,6 +40,7 @@ #include "pgstat.h" #include "postmaster/autovacuum.h" #include "postmaster/postmaster.h" +#include "replication/logicalctl.h" #include "replication/slot.h" #include "replication/slotsync.h" #include "replication/walsender.h" @@ -658,6 +659,9 @@ BaseInit(void) /* Initialize lock manager's local structs */ InitLockManagerAccess(); + /* Initialize logical info WAL logging state */ + InitializeProcessXLogLogicalInfo(); + /* * Initialize replication slots after pgstat. The exit hook might need to * drop ephemeral slots, which in turn triggers stats reporting. diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 2f8cbd86759..364f6124f23 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -5234,6 +5234,17 @@ struct config_enum ConfigureNamesEnum[] = NULL, NULL, NULL }, + { + {"effective_wal_level", PGC_INTERNAL, PRESET_OPTIONS, + gettext_noop("Show the effective WAL level."), + NULL, + GUC_NOT_IN_SAMPLE | GUC_DISALLOW_IN_FILE + }, + &effective_wal_level, + WAL_LEVEL_REPLICA, wal_level_options, + NULL, NULL, show_effective_wal_level + }, + { {"dynamic_shared_memory_type", PGC_POSTMASTER, RESOURCES_MEM, gettext_noop("Selects the dynamic shared memory implementation used."), diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index d313099c027..08ec587ef77 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -94,6 +94,9 @@ typedef enum RecoveryState } RecoveryState; extern PGDLLIMPORT int wal_level; +extern PGDLLEXPORT int effective_wal_level; + +extern bool XLogLogicalInfo; /* Is WAL archiving enabled (always or only while server is running normally)? */ #define XLogArchivingActive() \ @@ -123,7 +126,7 @@ extern PGDLLIMPORT int wal_level; #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_REPLICA) /* Do we need to WAL-log information required only for logical replication? */ -#define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL) +#define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL || XLogLogicalInfo) #ifdef WAL_DEBUG extern PGDLLIMPORT bool XLOG_DEBUG; diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h index 63e834a6ce4..5e9b44c82f0 100644 --- a/src/include/catalog/pg_control.h +++ b/src/include/catalog/pg_control.h @@ -80,6 +80,7 @@ typedef struct CheckPoint /* 0xC0 is used in Postgres 9.5-11 */ #define XLOG_OVERWRITE_CONTRECORD 0xD0 #define XLOG_CHECKPOINT_REDO 0xE0 +#define XLOG_LOGICAL_DECODING_STATUS_CHANGE 0xF0 /* @@ -136,6 +137,8 @@ typedef struct ControlFileData XLogRecPtr unloggedLSN; /* current fake LSN value, for unlogged rels */ + bool logicalDecodingEnabled; + /* * These two values determine the minimum point we must recover up to * before starting up: diff --git a/src/include/replication/logicalctl.h b/src/include/replication/logicalctl.h new file mode 100644 index 00000000000..d7d2fc0fde6 --- /dev/null +++ b/src/include/replication/logicalctl.h @@ -0,0 +1,52 @@ +/*------------------------------------------------------------------------- + * + * logicalctl.h + * Definitions for logical decoding status control facility. + * + * Portions Copyright (c) 2013-2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/include/replication/logicalctl.h + * + *------------------------------------------------------------------------- + */ +#ifndef XLOGLEVEL_H +#define XLOGLEVEL_H + +#include "access/xlog.h" +#include "port/atomics.h" +#include "storage/condition_variable.h" + +typedef struct LogicalDecodingCtlData +{ + /* True while the logical decoding status is being changed */ + bool transition_in_progress; + + /* Condition variable signaled when a transition completes */ + ConditionVariable transition_cv; + + /* + * xlog_logical_info is the authoritative value used by the all process to + * determine whether to write additional information required by logical + * decoding to WAL. Each process caches this values in XLogLogicalInfo. + */ + pg_atomic_flag xlog_logical_info; + + /* + * True if logical decoding is available on the system. + */ + pg_atomic_flag logical_decoding_enabled; +} LogicalDecodingCtlData; +extern LogicalDecodingCtlData * LogicalDecodingCtl; + +extern Size LogicalDecodingCtlShmemSize(void); +extern void LogicalDecodingCtlShmemInit(void); +extern void StartupLogicalDecodingStatus(bool status_in_control_file); +extern void InitializeProcessXLogLogicalInfo(void); +extern bool ProcessBarrierUpdateXLogLogicalInfo(void); +extern bool IsLogicalDecodingEnabled(void); +extern void EnsureLogicalDecodingEnabled(void); +extern void DisableLogicalDecodingIfNecessary(void); +extern void UpdateLogicalDecodingStatusEndOfRecovery(void); + +#endif diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index eb0b93b1114..809eba4ea5f 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -15,6 +15,7 @@ #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/spin.h" +#include "port/atomics.h" #include "replication/walreceiver.h" /* directory to store replication slot data in */ @@ -225,6 +226,8 @@ typedef struct ReplicationSlot */ typedef struct ReplicationSlotCtlData { + pg_atomic_uint32 n_inuse_logical_slots; + /* * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some * reason you can't do that in an otherwise-empty struct. diff --git a/src/include/storage/lwlocklist.h b/src/include/storage/lwlocklist.h index a9681738146..4d78367878b 100644 --- a/src/include/storage/lwlocklist.h +++ b/src/include/storage/lwlocklist.h @@ -84,3 +84,4 @@ PG_LWLOCK(50, DSMRegistry) PG_LWLOCK(51, InjectionPoint) PG_LWLOCK(52, SerialControl) PG_LWLOCK(53, AioWorkerSubmissionQueue) +PG_LWLOCK(54, LogicalDecodingControl) diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index afeeb1ca019..8e428f298c6 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -54,6 +54,8 @@ typedef enum typedef enum { PROCSIGNAL_BARRIER_SMGRRELEASE, /* ask smgr to close files */ + PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO, /* ask to update + * XLogLogicalInfo */ } ProcSignalBarrierType; /* diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index 799fa7ace68..d6f3f4cdf09 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -61,6 +61,7 @@ extern bool check_default_text_search_config(char **newval, void **extra, GucSou extern void assign_default_text_search_config(const char *newval, void *extra); extern bool check_default_with_oids(bool *newval, void **extra, GucSource source); +extern const char *show_effective_wal_level(void); extern bool check_huge_page_size(int *newval, void **extra, GucSource source); extern void assign_io_method(int newval, void *extra); extern bool check_io_max_concurrency(int *newval, void **extra, GucSource source); diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl index 921813483e3..344e6b2f5c9 100644 --- a/src/test/recovery/t/035_standby_logical_decoding.pl +++ b/src/test/recovery/t/035_standby_logical_decoding.pl @@ -876,7 +876,7 @@ $handle = make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr); # We are not able to read from the slot as it requires wal_level >= logical on the primary server check_pg_recvlogical_stderr($handle, - "logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary" + "logical decoding needs to be enabled on the primary" ); # Restore primary wal_level diff --git a/src/test/recovery/t/045_effective_wal_level.pl b/src/test/recovery/t/045_effective_wal_level.pl new file mode 100644 index 00000000000..6002e38696b --- /dev/null +++ b/src/test/recovery/t/045_effective_wal_level.pl @@ -0,0 +1,98 @@ + +# Copyright (c) 2024-2025, PostgreSQL Global Development Group + +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init( + allows_streaming => 1 + ); +$primary->start(); + +# Check both wal_level and effective_wal_level values. +is( $primary->safe_psql('postgres', qq[ +select current_setting('wal_level'), current_setting('effective_wal_level'); + ]), + "replica|replica", + "wal_level and effective_wal_level starts with the same value 'replica'"); + +$primary->safe_psql('postgres', + qq[select pg_create_physical_replication_slot('test_phy_slot', false, false)]); +is( $primary->safe_psql('postgres', qq[show effective_wal_level]), + "replica", + "effective_wal_level doesn't change with a new physical slot"); + +# Create a new logical slot +$primary->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('test_slot', 'pgoutput')]); + +# effective_wal_level must be bumped to 'logical' +is( $primary->safe_psql('postgres', qq[ +select current_setting('wal_level'), current_setting('effective_wal_level'); + ]), + "replica|logical", + "effective_wal_level bumped to logical upon logical slot creation"); + +# restart the server and check again. +$primary->restart(); +is( $primary->safe_psql('postgres', qq[ +select current_setting('wal_level'), current_setting('effective_wal_level'); + ]), + "replica|logical", + "effective_wal_level becomes logical during startup"); + +# Take backup during the effective_wal_level being 'logical'. +$primary->backup('my_backup'); + +# Initialize standby1 node from the backup 'my_backup'. Note that the +# backup was taken during the logical decoding begin enabled on the +# primary because of one logical slot, but replication slots are not +# included in the basebackup. +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup($primary, 'my_backup', + has_streaming => 1); +$standby1->set_standby_mode(); +$standby1->start; +is( $standby1->safe_psql('postgres', qq[ +select current_setting('wal_level'), current_setting('effective_wal_level'); + ]), + "replica|logical", + "effective_wal_level='logical' on standby"); + +# Promote the standby1 node that doesn't have any logical slot. So +# the logical decoding must be disabled at promotion. +$standby1->promote; +is( $standby1->safe_psql('postgres', qq[ +select current_setting('wal_level'), current_setting('effective_wal_level'); + ]), + "replica|replica", + "effective_wal_level got decrased to 'replica' during promotion"); +$standby1->stop; + +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup($primary, 'my_backup', + has_streaming => 1); +$standby2->set_standby_mode(); +$standby2->start; + +# Create a logical slot on the standby. +$standby2->create_logical_slot_on_standby($primary, 'standby2_slot', 'postgres'); + +# Promote the standby2 node that has one logical slot. So the logical decoding +# keeps enabled even after the promotion. +$standby2->promote; +is( $standby2->safe_psql('postgres', qq[ +select current_setting('wal_level'), current_setting('effective_wal_level'); + ]), + "replica|logical", + "effective_wal_level keeps 'logical' even after the promotion"); +$standby2->safe_psql('postgres', + qq[select pg_create_logical_replication_slot('standby2_slot2', 'pgoutput')]); +$standby2->stop; + +$primary->stop; +done_testing(); diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index 916fdb48b3b..51f102f0c9f 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -589,7 +589,7 @@ CREATE PUBLICATION tap_pub2 FOR TABLE skip_wal; ROLLBACK; }); ok( $reterr =~ - m/WARNING: "wal_level" is insufficient to publish logical changes/, + m/WARNING: logical decoding needs to be enabled to publish logical changes/, 'CREATE PUBLICATION while "wal_level=minimal"'); done_testing(); -- 2.43.5