From 4ec216d520dd1f8fb1b66fb7124b9e0323ba5f6b Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Mon, 7 Jul 2025 14:28:45 +0800 Subject: [PATCH v49 2/2] refactor launcher slot creation --- doc/src/sgml/ref/create_subscription.sgml | 37 +++++++ src/backend/replication/logical/launcher.c | 121 ++++++++++----------- 2 files changed, 95 insertions(+), 63 deletions(-) diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 7782246727e..c1f1445dbe5 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -469,6 +469,43 @@ CREATE SUBSCRIPTION subscription_nameretain_conflict_info if the subscription will be inactive for an extended period. + + + Additionally when enabling retain_conflict_info for + conflict detection in logical replication, it is important to design the + replication topology to balance data retention requirements with + overall system performance. This option provides minimal performance + overhead when applied appropriately. The following scenarios illustrate + effective usage patterns when enabling this option. + + + + a. Large Tables with Bidirectional Writes: + For large tables subject to concurrent writes on both publisher and + subscriber nodes, publishers can define row filters when creating + publications to segment data. This allows multiple subscriptions + to replicate exclusive subsets of the table, minimizing conflict + potential and optimizing throughput. + + + + b. Write-Enabled Subscribers: + If a subscriber node is expected to perform write operations, replication + can be structured using multiple publications and subscriptions. By + distributing tables across these publications, the workload is spread among + several apply workers, improving concurrency and reducing contention. + + + + c. Read-Only Subscribers: + In configurations involving single or multiple publisher nodes + performing concurrent write operations, read-only subscriber nodes may + replicate changes without seeing a performance impact if it does index + scan. However, if the subscriber is impacted due to replication lag or + scan performance (say due to sequential scans), it needs to follow one + of the two previous strategies to distribute the workload on the + subscriber. + diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index f18fee0333e..e0f101784c7 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -102,8 +102,7 @@ static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); static void compute_min_nonremovable_xid(LogicalRepWorker *worker, bool retain_conflict_info, - TransactionId *xmin, - bool *can_advance_xmin); + TransactionId *xmin); static bool acquire_conflict_slot_if_exists(void); static void advance_conflict_slot_xmin(TransactionId new_xmin); @@ -1204,22 +1203,41 @@ ApplyLauncherMain(Datum main_arg) TimestampTz now; long elapsed; - retain_conflict_info |= sub->retainconflictinfo; - - if (!sub->enabled) + if (sub->retainconflictinfo) { + retain_conflict_info = true; + /* - * This is required to ensure that we don't advance the xmin - * of CONFLICT_DETECTION_SLOT if one of the subscriptions is - * not enabled. Otherwise, we won't be able to detect - * conflicts reliably for such a subscription even though it - * has set the retain_conflict_info option. + * Can't advance xmin of the slot unless all the subscriptions + * with retain_conflict_info are enabled. This is required to + * ensure that we don't advance the xmin of + * CONFLICT_DETECTION_SLOT if one of the subscriptions is not + * enabled. Otherwise, we won't be able to detect conflicts + * reliably for such a subscription even though it has set the + * retain_conflict_info option. */ - compute_min_nonremovable_xid(NULL, sub->retainconflictinfo, - &xmin, &can_advance_xmin); - continue; + can_advance_xmin &= sub->enabled; + + /* + * Create a replication slot to retain information necessary + * for conflict detection such as dead tuples, commit + * timestamps, and origins. + * + * The slot is created before starting the apply worker to + * prevent it from unnecessarily maintaining its + * oldest_nonremovable_xid. + * + * The slot is created even for a disabled subscription to + * ensure that conflict-related information is available when + * applying remote changes that occurred before the + * subscription was enabled. + */ + CreateConflictDetectionSlot(); } + if (!sub->enabled) + continue; + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); w = logicalrep_worker_find(sub->oid, InvalidOid, false); LWLockRelease(LogicalRepWorkerLock); @@ -1228,12 +1246,20 @@ ApplyLauncherMain(Datum main_arg) * Compute the minimum xmin required to protect deleted tuples * required for conflict detection. */ - compute_min_nonremovable_xid(w, sub->retainconflictinfo, &xmin, - &can_advance_xmin); + if (can_advance_xmin) + compute_min_nonremovable_xid(w, sub->retainconflictinfo, &xmin); if (w != NULL) continue; /* worker is running already */ + /* + * Can't advance xmin of the slot unless all the workers + * corresponding to subscriptions with retain_conflict_info are + * running. + */ + if (sub->retainconflictinfo) + can_advance_xmin = false; + /* * If the worker is eligible to start now, launch it. Otherwise, * adjust wait_time so that we'll wake up as soon as it can be @@ -1322,62 +1348,31 @@ ApplyLauncherMain(Datum main_arg) * Determine the minimum non-removable transaction ID across all apply workers * for subscriptions that have retain_conflict_info enabled. Store the result * in *xmin. - * - * If the replication slot cannot be advanced during this cycle, due to either - * a disabled subscription or an inactive worker, set *can_advance_xmin to - * false. */ static void -compute_min_nonremovable_xid(LogicalRepWorker *worker, - bool retain_conflict_info, TransactionId *xmin, - bool *can_advance_xmin) +compute_min_nonremovable_xid(LogicalRepWorker *worker, bool retain_conflict_info, + TransactionId *xmin) { - if (!retain_conflict_info || !*can_advance_xmin) - return; + TransactionId nonremovable_xid; - if (worker) - { - TransactionId nonremovable_xid; + if (!retain_conflict_info || !worker) + return; - /* - * The replication slot for conflict detection must be created before - * the worker starts. - */ - Assert(MyReplicationSlot); + /* + * The replication slot for conflict detection must be created before the + * worker starts. + */ + Assert(MyReplicationSlot); - SpinLockAcquire(&worker->relmutex); - nonremovable_xid = worker->oldest_nonremovable_xid; - SpinLockRelease(&worker->relmutex); + SpinLockAcquire(&worker->relmutex); + nonremovable_xid = worker->oldest_nonremovable_xid; + SpinLockRelease(&worker->relmutex); - Assert(TransactionIdIsValid(nonremovable_xid)); + Assert(TransactionIdIsValid(nonremovable_xid)); - if (!TransactionIdIsValid(*xmin) || - TransactionIdPrecedes(nonremovable_xid, *xmin)) - *xmin = nonremovable_xid; - } - else - { - /* - * Create a replication slot to retain information necessary for - * conflict detection such as dead tuples, commit timestamps, and - * origins. - * - * The slot is created before starting the apply worker to prevent it - * from unnecessarily maintaining its oldest_nonremovable_xid. - * - * The slot is created even for a disabled subscription to ensure that - * conflict-related information is available when applying remote - * changes that occurred before the subscription was enabled. - */ - CreateConflictDetectionSlot(); - - /* - * Can't advance xmin of the slot unless all the subscriptions with - * retain_conflict_info are enabled and the corresponding workers are - * running. - */ - *can_advance_xmin = false; - } + if (!TransactionIdIsValid(*xmin) || + TransactionIdPrecedes(nonremovable_xid, *xmin)) + *xmin = nonremovable_xid; } /* -- 2.50.1.windows.1