From 2e226b6bfd8bf795cf472be03936f3c990dbe81c Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Tue, 3 Jun 2025 11:40:30 +0800 Subject: [PATCH v33 2/7] Maintain the replication slot in logical launcher to retain dead tuples This patch enables the logical replication launcher to create and maintain a replication slot named pg_conflict_detection. The launcher periodically collects the oldest_nonremovable_xid from all apply workers. It then computes the minimum transaction ID and advances the xmin value of the replication slot if it precedes the computed value. --- doc/src/sgml/config.sgml | 2 + doc/src/sgml/func.sgml | 16 +- doc/src/sgml/protocol.sgml | 2 + doc/src/sgml/ref/create_subscription.sgml | 4 +- src/backend/access/transam/xlogrecovery.c | 2 +- src/backend/commands/subscriptioncmds.c | 2 +- src/backend/replication/logical/launcher.c | 170 +++++++++++++++++- .../replication/logical/reorderbuffer.c | 2 +- src/backend/replication/logical/worker.c | 3 + src/backend/replication/slot.c | 34 +++- src/include/replication/logicallauncher.h | 1 + src/include/replication/slot.h | 11 +- 12 files changed, 236 insertions(+), 13 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 021153b2a5f..20b74d92006 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4961,6 +4961,8 @@ ANY num_sync ( The name of the slot to create. Must be a valid replication slot name (see ). + The name cannot be pg_conflict_detection as it + is reserved for the conflict detection. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 57dec28a5df..37fd40252a3 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -169,7 +169,9 @@ CREATE SUBSCRIPTION subscription_name Name of the publisher's replication slot to use. The default is - to use the name of the subscription for the slot name. + to use the name of the subscription for the slot name. The name cannot + be pg_conflict_detection as it is reserved for the + conflict detection. diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 6ce979f2d8b..2dcda37bc77 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -4760,7 +4760,7 @@ bool check_primary_slot_name(char **newval, void **extra, GucSource source) { if (*newval && strcmp(*newval, "") != 0 && - !ReplicationSlotValidateName(*newval, WARNING)) + !ReplicationSlotValidateName(*newval, false, WARNING)) return false; return true; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 4aec73bcc6b..46d4e65da97 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -210,7 +210,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, if (strcmp(opts->slot_name, "none") == 0) opts->slot_name = NULL; else - ReplicationSlotValidateName(opts->slot_name, ERROR); + ReplicationSlotValidateName(opts->slot_name, false, ERROR); } else if (IsSet(supported_opts, SUBOPT_COPY_DATA) && strcmp(defel->defname, "copy_data") == 0) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 9b155705dbb..4dcc6031659 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -32,6 +32,7 @@ #include "postmaster/interrupt.h" #include "replication/logicallauncher.h" #include "replication/origin.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "storage/ipc.h" @@ -90,8 +91,12 @@ static dshash_table *last_start_times = NULL; static bool on_commit_launcher_wakeup = false; +/* + * Whether the slot used to retain dead tuples for conflict detection has been + * dropped. + */ +static bool conflict_slot_dropped = false; -static void ApplyLauncherWakeup(void); static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); @@ -100,6 +105,9 @@ static int logicalrep_pa_worker_count(Oid subid); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); +static void create_conflict_slot_if_not_exists(void); +static void advance_conflict_slot_xmin(TransactionId new_xmin); +static void drop_conflict_slot_if_exists(void); /* @@ -1106,7 +1114,10 @@ ApplyLauncherWakeupAtCommit(void) on_commit_launcher_wakeup = true; } -static void +/* + * Wakeup the launcher immediately. + */ +void ApplyLauncherWakeup(void) { if (LogicalRepCtx->launcher_pid != 0) @@ -1147,6 +1158,8 @@ ApplyLauncherMain(Datum main_arg) MemoryContext subctx; MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; + bool can_advance_xmin = true; + TransactionId xmin = InvalidTransactionId; CHECK_FOR_INTERRUPTS(); @@ -1166,15 +1179,56 @@ ApplyLauncherMain(Datum main_arg) TimestampTz now; long elapsed; + /* + * Create the conflict slot before starting the worker to prevent + * it from unnecessarily maintaining its oldest_nonremovable_xid. + */ + create_conflict_slot_if_not_exists(); + if (!sub->enabled) + { + can_advance_xmin = false; continue; + } LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); w = logicalrep_worker_find(sub->oid, InvalidOid, false); LWLockRelease(LogicalRepWorkerLock); if (w != NULL) + { + /* + * Collect non-removable transaction IDs from all apply + * workers to determine the xmin for advancing the replication + * slot used in conflict detection. + */ + if (can_advance_xmin) + { + TransactionId nonremovable_xid; + + SpinLockAcquire(&w->relmutex); + nonremovable_xid = w->oldest_nonremovable_xid; + SpinLockRelease(&w->relmutex); + + /* + * Stop advancing xmin if an invalid non-removable + * transaction ID is found, otherwise update xmin. + */ + if (!TransactionIdIsValid(nonremovable_xid)) + can_advance_xmin = false; + else if (!TransactionIdIsValid(xmin) || + TransactionIdPrecedes(nonremovable_xid, xmin)) + xmin = nonremovable_xid; + } + continue; /* worker is running already */ + } + + /* + * The worker has not yet started, so there is no valid + * non-removable transaction ID available for advancement. + */ + can_advance_xmin = false; /* * If the worker is eligible to start now, launch it. Otherwise, @@ -1207,6 +1261,16 @@ ApplyLauncherMain(Datum main_arg) } } + /* + * Maintain the xmin value of the replication slot for conflict + * detection if needed. Otherwise, drop the slot if we're no longer + * retaining information useful for conflict detection. + */ + if (sublist && can_advance_xmin) + advance_conflict_slot_xmin(xmin); + else if (!sublist) + drop_conflict_slot_if_exists(); + /* Switch back to original memory context. */ MemoryContextSwitchTo(oldctx); /* Clean the temporary memory. */ @@ -1234,6 +1298,108 @@ ApplyLauncherMain(Datum main_arg) /* Not reachable */ } +/* + * Create and acquire the replication slot used to retain dead tuples for + * conflict detection, if not yet. + */ +static void +create_conflict_slot_if_not_exists(void) +{ + TransactionId xmin_horizon; + + /* Exit early if the replication slot is already created and acquired */ + if (MyReplicationSlot) + return; + + /* If the replication slot exists, acquire it and exit */ + if (SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true)) + { + ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false); + return; + } + + ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, + RS_PERSISTENT, false, false, false); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + + xmin_horizon = GetOldestSafeDecodingTransactionId(false); + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->effective_xmin = xmin_horizon; + MyReplicationSlot->data.xmin = xmin_horizon; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotsComputeRequiredXmin(true); + + LWLockRelease(ProcArrayLock); + + /* Write this slot to disk */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + + conflict_slot_dropped = false; +} + +/* + * Attempt to advance the xmin value of the replication slot used to retain + * dead tuples for conflict detection. + */ +static void +advance_conflict_slot_xmin(TransactionId new_xmin) +{ + Assert(MyReplicationSlot); + Assert(TransactionIdIsValid(new_xmin)); + Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, + new_xmin)); + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.xmin = new_xmin; + SpinLockRelease(&MyReplicationSlot->mutex); + + /* first write new xmin to disk, so we know what's up after a crash */ + + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin); + + /* + * Now the new xmin is safely on disk, we can let the global value + * advance. We do not take ProcArrayLock or similar since we only advance + * xmin here and there's not much harm done by a concurrent computation + * missing that. + */ + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->effective_xmin = MyReplicationSlot->data.xmin; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotsComputeRequiredXmin(false); + + return; +} + +/* + * Drop the replication slot used to retain dead tuples for conflict detection, + * if it exists. + */ +static void +drop_conflict_slot_if_exists(void) +{ + /* + * Avoid the overhead of scanning shared memory for a replication slot that + * is known to have been dropped. + */ + if (conflict_slot_dropped) + return; + + if (MyReplicationSlot) + ReplicationSlotDropAcquired(); + else if (SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true)) + ReplicationSlotDrop(CONFLICT_DETECTION_SLOT, true); + + conflict_slot_dropped = true; +} + /* * Is current process the logical replication launcher? */ diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 67655111875..85239f6c316 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -4787,7 +4787,7 @@ StartupReorderBuffer(void) continue; /* if it cannot be a slot, skip the directory */ - if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2)) + if (!ReplicationSlotValidateName(logical_de->d_name, true, DEBUG2)) continue; /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index a4f4772fdd5..e56b4e06767 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4386,6 +4386,9 @@ wait_for_local_flush(RetainConflictInfoData *rci_data) LSN_FORMAT_ARGS(rci_data->remote_lsn), rci_data->candidate_xid); + /* Notify launcher to update the xmin of the conflict slot */ + ApplyLauncherWakeup(); + /* * Reset all data fields except those used to determine the timing for the * next round of transaction ID advancement. We can even use diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 600b87fa9cb..668279cb4e8 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -47,6 +47,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/interrupt.h" +#include "replication/logicallauncher.h" #include "replication/slotsync.h" #include "replication/slot.h" #include "replication/walsender_private.h" @@ -172,6 +173,7 @@ static SyncStandbySlotsConfigData *synchronized_standby_slots_config; static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr; static void ReplicationSlotShmemExit(int code, Datum arg); +static bool IsReservedSlotName(const char *name); static void ReplicationSlotDropPtr(ReplicationSlot *slot); /* internal persistency functions */ @@ -258,13 +260,17 @@ ReplicationSlotShmemExit(int code, Datum arg) /* * Check whether the passed slot name is valid and report errors at elevel. * + * An error will be reported for a reserved replication slot name if + * allow_reserved_name is set to false. + * * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow * the name to be used as a directory name on every supported OS. * * Returns whether the directory name is valid or not if elevel < ERROR. */ bool -ReplicationSlotValidateName(const char *name, int elevel) +ReplicationSlotValidateName(const char *name, bool allow_reserved_name, + int elevel) { const char *cp; @@ -300,9 +306,29 @@ ReplicationSlotValidateName(const char *name, int elevel) return false; } } + + if (!allow_reserved_name && IsReservedSlotName(name)) + { + ereport(elevel, + errcode(ERRCODE_RESERVED_NAME), + errmsg("replication slot name \"%s\" is reserved", + name)); + + return false; + } + return true; } +/* + * Return true if the replication slot name is "pg_conflict_detection". + */ +static bool +IsReservedSlotName(const char *name) +{ + return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0); +} + /* * Create a new replication slot and mark it as used by this backend. * @@ -330,7 +356,11 @@ ReplicationSlotCreate(const char *name, bool db_specific, Assert(MyReplicationSlot == NULL); - ReplicationSlotValidateName(name, ERROR); + /* + * The logical launcher might be creating an internal slot, so using a + * reserved name is allowed in this case. + */ + ReplicationSlotValidateName(name, IsLogicalLauncher(), ERROR); if (failover) { diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index 82b202f3305..7b29f1814db 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -25,6 +25,7 @@ extern void ApplyLauncherShmemInit(void); extern void ApplyLauncherForgetWorkerStartTime(Oid subid); extern void ApplyLauncherWakeupAtCommit(void); +extern void ApplyLauncherWakeup(void); extern void AtEOXact_ApplyLauncher(bool isCommit); extern bool IsLogicalLauncher(void); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index eb0b93b1114..e03e123a2ff 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -20,6 +20,13 @@ /* directory to store replication slot data in */ #define PG_REPLSLOT_DIR "pg_replslot" +/* + * The reserved name for a replication slot used to retain dead tuples for + * conflict detection in logical replication. See + * maybe_advance_nonremovable_xid() for detail. + */ +#define CONFLICT_DETECTION_SLOT "pg_conflict_detection" + /* * Behaviour of replication slots, upon release or crash. * @@ -284,7 +291,9 @@ extern void ReplicationSlotMarkDirty(void); /* misc stuff */ extern void ReplicationSlotInitialize(void); -extern bool ReplicationSlotValidateName(const char *name, int elevel); +extern bool ReplicationSlotValidateName(const char *name, + bool allow_reserved_name, + int elevel); extern void ReplicationSlotReserveWal(void); extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); extern void ReplicationSlotsComputeRequiredLSN(void); -- 2.30.0.windows.2