From 78931fdd4952aa10ef9dbdca8570ea50739ed62b Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Thu, 26 Sep 2024 12:11:34 +0800 Subject: [PATCH v29 2/7] Maintain the replication slot in logical launcher to retain dead tuples This patch enables the logical replication launcher to create and maintain a replication slot named pg_conflict_detection. The launcher periodically collects the oldest_nonremovable_xid from all apply workers. It then computes the minimum transaction ID and advances the xmin value of the replication slot if it precedes the computed value. The interval for updating the slot (nap time) is dynamically adjusted based on the activity of the apply workers. The launcher waits for a certain period before performing the next update, with the duration varying depending on whether the xmin value of the replication slot was updated during the last cycle. --- doc/src/sgml/config.sgml | 2 + doc/src/sgml/func.sgml | 14 +- doc/src/sgml/protocol.sgml | 2 + doc/src/sgml/ref/create_subscription.sgml | 4 +- src/backend/access/transam/xlogrecovery.c | 2 +- src/backend/commands/subscriptioncmds.c | 2 +- src/backend/replication/logical/launcher.c | 228 +++++++++++++++++- .../replication/logical/reorderbuffer.c | 2 +- src/backend/replication/logical/worker.c | 3 + src/backend/replication/slot.c | 34 ++- src/include/replication/logicallauncher.h | 1 + src/include/replication/slot.h | 11 +- 12 files changed, 293 insertions(+), 12 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 69fc93dffc4..f5c0c6b2493 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4899,6 +4899,8 @@ ANY num_sync ( The name of the slot to create. Must be a valid replication slot name (see ). + The name cannot be pg_conflict_detection, as it + is reserved for logical replication conflict detection. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 57dec28a5df..eec85cde880 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -169,7 +169,9 @@ CREATE SUBSCRIPTION subscription_name Name of the publisher's replication slot to use. The default is - to use the name of the subscription for the slot name. + to use the name of the subscription for the slot name. The name cannot + be pg_conflict_detection, as it is reserved for + logical replication conflict detection. diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 2c19013c98b..0ff8c3ae046 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -4754,7 +4754,7 @@ bool check_primary_slot_name(char **newval, void **extra, GucSource source) { if (*newval && strcmp(*newval, "") != 0 && - !ReplicationSlotValidateName(*newval, WARNING)) + !ReplicationSlotValidateName(*newval, false, WARNING)) return false; return true; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 4aec73bcc6b..46d4e65da97 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -210,7 +210,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, if (strcmp(opts->slot_name, "none") == 0) opts->slot_name = NULL; else - ReplicationSlotValidateName(opts->slot_name, ERROR); + ReplicationSlotValidateName(opts->slot_name, false, ERROR); } else if (IsSet(supported_opts, SUBOPT_COPY_DATA) && strcmp(defel->defname, "copy_data") == 0) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 8e42787a426..86c1a191dc2 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -32,6 +32,7 @@ #include "postmaster/interrupt.h" #include "replication/logicallauncher.h" #include "replication/origin.h" +#include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/worker_internal.h" #include "storage/ipc.h" @@ -46,6 +47,18 @@ /* max sleep time between cycles (3min) */ #define DEFAULT_NAPTIME_PER_CYCLE 180000L +/* + * Min sleep time (200ms) between cycles to update the xmin value of the + * replication slot. + */ +#define MIN_NAPTIME_PER_SLOT_UPDATE 200 + +/* + * Max sleep time between xmin update cycles (30 seconds) if any + * subscription has retain_conflict_info set to true. + */ +#define MAX_NAPTIME_FOR_SLOT_UPDATE 30000L + /* GUC variables */ int max_logical_replication_workers = 4; int max_sync_workers_per_subscription = 2; @@ -91,7 +104,6 @@ static dshash_table *last_start_times = NULL; static bool on_commit_launcher_wakeup = false; -static void ApplyLauncherWakeup(void); static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); @@ -100,6 +112,10 @@ static int logicalrep_pa_worker_count(Oid subid); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); +static void create_conflict_slot_if_not_exists(void); +static bool advance_conflict_slot_xmin(FullTransactionId new_xmin); +static void compute_slot_update_naptime(bool slot_updated, long *sleep_time); +static void drop_conflict_slot_if_exists(void); /* @@ -1106,7 +1122,10 @@ ApplyLauncherWakeupAtCommit(void) on_commit_launcher_wakeup = true; } -static void +/* + * Wakeup the launcher immediately. + */ +void ApplyLauncherWakeup(void) { if (LogicalRepCtx->launcher_pid != 0) @@ -1119,6 +1138,9 @@ ApplyLauncherWakeup(void) void ApplyLauncherMain(Datum main_arg) { + bool slot_maybe_exist = true; + long slot_update_wait_time = MIN_NAPTIME_PER_SLOT_UPDATE; + ereport(DEBUG1, (errmsg_internal("logical replication launcher started"))); @@ -1147,6 +1169,8 @@ ApplyLauncherMain(Datum main_arg) MemoryContext subctx; MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; + bool can_advance_xmin = true; + FullTransactionId xmin = InvalidFullTransactionId; CHECK_FOR_INTERRUPTS(); @@ -1166,15 +1190,56 @@ ApplyLauncherMain(Datum main_arg) TimestampTz now; long elapsed; + /* + * Create the conflict slot before starting the worker to prevent + * it from unnecessarily maintaining its oldest_nonremovable_xid. + */ + create_conflict_slot_if_not_exists(); + if (!sub->enabled) + { + can_advance_xmin = false; continue; + } LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); w = logicalrep_worker_find(sub->oid, InvalidOid, false); LWLockRelease(LogicalRepWorkerLock); if (w != NULL) + { + /* + * Collect non-removable transaction IDs from all apply + * workers to determine the xmin for advancing the replication + * slot used in conflict detection. + */ + if (can_advance_xmin) + { + FullTransactionId nonremovable_xid; + + SpinLockAcquire(&w->relmutex); + nonremovable_xid = w->oldest_nonremovable_xid; + SpinLockRelease(&w->relmutex); + + /* + * Stop advancing xmin if an invalid non-removable + * transaction ID is found, otherwise update xmin. + */ + if (!FullTransactionIdIsValid(nonremovable_xid)) + can_advance_xmin = false; + else if (!FullTransactionIdIsValid(xmin) || + FullTransactionIdPrecedes(nonremovable_xid, xmin)) + xmin = nonremovable_xid; + } + continue; /* worker is running already */ + } + + /* + * The worker has not yet started, so there is no valid + * non-removable transaction ID available for advancement. + */ + can_advance_xmin = false; /* * If the worker is eligible to start now, launch it. Otherwise, @@ -1207,6 +1272,33 @@ ApplyLauncherMain(Datum main_arg) } } + /* + * Maintain the xmin value of the replication slot for conflict + * detection if needed, and update the sleep time before the next + * attempt. + */ + if (sublist) + { + bool updated = false; + + if (can_advance_xmin) + updated = advance_conflict_slot_xmin(xmin); + + compute_slot_update_naptime(updated, &slot_update_wait_time); + wait_time = Min(wait_time, slot_update_wait_time); + + slot_maybe_exist = true; + } + + /* + * Drop the slot if we're no longer retaining dead tuples. + */ + else if (slot_maybe_exist) + { + drop_conflict_slot_if_exists(); + slot_maybe_exist = false; + } + /* Switch back to original memory context. */ MemoryContextSwitchTo(oldctx); /* Clean the temporary memory. */ @@ -1234,6 +1326,138 @@ ApplyLauncherMain(Datum main_arg) /* Not reachable */ } +/* + * Create and acquire the replication slot used to retain dead tuples for + * conflict detection, if not yet. + */ +static void +create_conflict_slot_if_not_exists(void) +{ + TransactionId xmin_horizon; + + /* Exit early if the replication slot is already created and acquired */ + if (MyReplicationSlot) + return; + + /* If the replication slot exists, acquire it and exit */ + if (SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true)) + { + ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false); + return; + } + + ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, + RS_PERSISTENT, false, false, false); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + + xmin_horizon = GetOldestSafeDecodingTransactionId(false); + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->effective_xmin = xmin_horizon; + MyReplicationSlot->data.xmin = xmin_horizon; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotsComputeRequiredXmin(true); + + LWLockRelease(ProcArrayLock); + + /* Write this slot to disk */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); +} + +/* + * Attempt to advance the xmin value of the replication slot used to retain + * dead tuples for conflict detection. + */ +static bool +advance_conflict_slot_xmin(FullTransactionId new_xmin) +{ + FullTransactionId full_xmin; + FullTransactionId next_full_xid; + + Assert(MyReplicationSlot); + Assert(FullTransactionIdIsValid(new_xmin)); + + next_full_xid = ReadNextFullTransactionId(); + + /* + * Compute FullTransactionId for the current xmin. This handles the case + * where transaction ID wraparound has occurred. + */ + full_xmin = FullTransactionIdFromAllowableAt(next_full_xid, + MyReplicationSlot->data.xmin); + + if (FullTransactionIdPrecedesOrEquals(new_xmin, full_xmin)) + return false; + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.xmin = XidFromFullTransactionId(new_xmin); + SpinLockRelease(&MyReplicationSlot->mutex); + + /* first write new xmin to disk, so we know what's up after a crash */ + + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin); + + /* + * Now the new xmin is safely on disk, we can let the global value + * advance. We do not take ProcArrayLock or similar since we only advance + * xmin here and there's not much harm done by a concurrent computation + * missing that. + */ + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->effective_xmin = MyReplicationSlot->data.xmin; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotsComputeRequiredXmin(false); + + return true; +} + +/* + * Update the sleep time before the next slot update. + * + * If there is no slot activity, the wait time between sync cycles will double + * (up to a maximum of 30 seconds). If there is some slot activity, the wait + * time between sync cycles is reset to the minimum (200ms). + */ +static void +compute_slot_update_naptime(bool slot_updated, long *sleep_time) +{ + + if (!slot_updated) + { + /* + * The slot was not updated, so double the sleep time, but not beyond + * the maximum allowable value. + */ + *sleep_time = Min(*sleep_time * 2, MAX_NAPTIME_FOR_SLOT_UPDATE); + } + else + { + /* + * The slot was updated since the last sleep, so reset the sleep time. + */ + *sleep_time = MIN_NAPTIME_PER_SLOT_UPDATE; + } +} + +/* + * Drop the replication slot used to retain dead tuples for conflict detection, + * if it exists. + */ +static void +drop_conflict_slot_if_exists(void) +{ + if (MyReplicationSlot) + ReplicationSlotDropAcquired(); + else if (SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true)) + ReplicationSlotDrop(CONFLICT_DETECTION_SLOT, true); +} + /* * Is current process the logical replication launcher? */ diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 977fbcd2474..ddf5dea602a 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -4787,7 +4787,7 @@ StartupReorderBuffer(void) continue; /* if it cannot be a slot, skip the directory */ - if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2)) + if (!ReplicationSlotValidateName(logical_de->d_name, true, DEBUG2)) continue; /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 0c592b10e09..095a48f6075 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4355,6 +4355,9 @@ wait_for_local_flush(RetainConflictInfoData *data) LSN_FORMAT_ARGS(data->remote_lsn), XidFromFullTransactionId(data->candidate_xid)); + /* Notify launcher to update the xmin of the conflict slot */ + ApplyLauncherWakeup(); + /* * Reset all data fields except those used to determine the timing for the * next round of transaction ID advancement. diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 719e531eb90..13b9396dffb 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -47,6 +47,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/interrupt.h" +#include "replication/logicallauncher.h" #include "replication/slotsync.h" #include "replication/slot.h" #include "replication/walsender_private.h" @@ -172,6 +173,7 @@ static SyncStandbySlotsConfigData *synchronized_standby_slots_config; static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr; static void ReplicationSlotShmemExit(int code, Datum arg); +static bool IsReservedSlotName(const char *name); static void ReplicationSlotDropPtr(ReplicationSlot *slot); /* internal persistency functions */ @@ -258,13 +260,17 @@ ReplicationSlotShmemExit(int code, Datum arg) /* * Check whether the passed slot name is valid and report errors at elevel. * + * An error will be reported for a reserved replication slot name if + * allow_reserved_name is set to false. + * * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow * the name to be used as a directory name on every supported OS. * * Returns whether the directory name is valid or not if elevel < ERROR. */ bool -ReplicationSlotValidateName(const char *name, int elevel) +ReplicationSlotValidateName(const char *name, bool allow_reserved_name, + int elevel) { const char *cp; @@ -300,9 +306,29 @@ ReplicationSlotValidateName(const char *name, int elevel) return false; } } + + if (!allow_reserved_name && IsReservedSlotName(name)) + { + ereport(elevel, + errcode(ERRCODE_RESERVED_NAME), + errmsg("replication slot name \"%s\" is reserved", + name)); + + return false; + } + return true; } +/* + * Return true if the replication slot name is "pg_conflict_detection". + */ +static bool +IsReservedSlotName(const char *name) +{ + return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0); +} + /* * Create a new replication slot and mark it as used by this backend. * @@ -330,7 +356,11 @@ ReplicationSlotCreate(const char *name, bool db_specific, Assert(MyReplicationSlot == NULL); - ReplicationSlotValidateName(name, ERROR); + /* + * The logical launcher might be creating an internal slot, so using a + * reserved name is allowed in this case. + */ + ReplicationSlotValidateName(name, IsLogicalLauncher(), ERROR); if (failover) { diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index 82b202f3305..7b29f1814db 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -25,6 +25,7 @@ extern void ApplyLauncherShmemInit(void); extern void ApplyLauncherForgetWorkerStartTime(Oid subid); extern void ApplyLauncherWakeupAtCommit(void); +extern void ApplyLauncherWakeup(void); extern void AtEOXact_ApplyLauncher(bool isCommit); extern bool IsLogicalLauncher(void); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index f5a24ccfbf2..e4f1e69cb6b 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -20,6 +20,13 @@ /* directory to store replication slot data in */ #define PG_REPLSLOT_DIR "pg_replslot" +/* + * The reserved name for a replication slot used to retain dead tuples for + * conflict detection in logical replication. See + * maybe_advance_nonremovable_xid() for detail. + */ +#define CONFLICT_DETECTION_SLOT "pg_conflict_detection" + /* * Behaviour of replication slots, upon release or crash. * @@ -284,7 +291,9 @@ extern void ReplicationSlotMarkDirty(void); /* misc stuff */ extern void ReplicationSlotInitialize(void); -extern bool ReplicationSlotValidateName(const char *name, int elevel); +extern bool ReplicationSlotValidateName(const char *name, + bool allow_reserved_name, + int elevel); extern void ReplicationSlotReserveWal(void); extern void ReplicationSlotsComputeRequiredXmin(bool already_locked); extern void ReplicationSlotsComputeRequiredLSN(void); -- 2.30.0.windows.2