From a7bd2863fd2c15c3acc5221252673a7715025124 Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Wed, 27 Aug 2025 18:11:38 +0800 Subject: [PATCH v1] Avoid retaining conflict-related data when no tables are subscribed This commit fixes an issue where conflict-related data was unnecessarily retained when the subscription does not have a table. --- .../replication/logical/applyparallelworker.c | 2 +- src/backend/replication/logical/tablesync.c | 18 +++++++++++++----- src/backend/replication/logical/worker.c | 14 ++++++++++++-- src/include/replication/worker_internal.h | 2 +- 4 files changed, 27 insertions(+), 9 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 31a92d1a24a..5b4fb6a08c4 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -309,7 +309,7 @@ pa_can_start(void) * should_apply_changes_for_rel) as we won't know remote_final_lsn by that * time. So, we don't start the new parallel apply worker in this case. */ - if (!AllTablesyncsReady()) + if (!AllTablesyncsReady(false)) return false; return true; diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index d3356bc84ee..d0dca0ebf5e 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -664,7 +664,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING) { CommandCounterIncrement(); /* make updates visible */ - if (AllTablesyncsReady()) + if (AllTablesyncsReady(false)) { ereport(LOG, (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled", @@ -1759,15 +1759,19 @@ TablesyncWorkerMain(Datum main_arg) } /* - * If the subscription has no tables then return false. + * Check if all tablesyncs are READY for the current subscription. * - * Otherwise, are all tablesyncs READY? + * If the subscription has no tables, return the value determined by + * 'ready_if_no_tables'. + * + * Otherwise, return whether all the tables for the subscription are in the + * READY state. * * Note: This function is not suitable to be called from outside of apply or * tablesync workers because MySubscription needs to be already initialized. */ bool -AllTablesyncsReady(void) +AllTablesyncsReady(bool ready_if_no_tables) { bool started_tx = false; bool has_subrels = false; @@ -1781,11 +1785,15 @@ AllTablesyncsReady(void) pgstat_report_stat(true); } + /* If there are no tables, decide readiness based on the parameter */ + if (!has_subrels) + return ready_if_no_tables; + /* * Return false when there are no tables in subscription or not all tables * are in ready state; true otherwise. */ - return has_subrels && (table_states_not_ready == NIL); + return table_states_not_ready == NIL; } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 22ad9051db3..136584d4569 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4547,8 +4547,18 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data) * It is safe to add new tables with initial states to the subscription * after this check because any changes applied to these tables should * have a WAL position greater than the rdt_data->remote_lsn. + * + * Advancing the transaction ID is also necessary when no tables are + * subscribed, as it prevents unnecessary retention of dead tuples. Although + * it seem feasible to skip all phases and directly assign candidate_xid to + * oldest_nonremovable_xid in the RDT_GET_CANDIDATE_XID phase when no tables + * are currently subscribed, this approach is unsafe. This is because new + * tables may be added to the subscription after the initial table check, + * requiring tuples deleted before candidate_xid for conflict detection in + * upcoming transactions. Therefore, it remains necessary to wait for all + * concurrent transactions to be fully applied. */ - if (!AllTablesyncsReady()) + if (!AllTablesyncsReady(true)) return; /* @@ -5345,7 +5355,7 @@ run_apply_worker() * work. */ if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && - AllTablesyncsReady()) + AllTablesyncsReady(false)) { /* Start streaming with two_phase enabled */ options.proto.logical.twophase = true; diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 7c0204dd6f4..08a8dcaba20 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -268,7 +268,7 @@ extern int logicalrep_sync_worker_count(Oid subid); extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname); -extern bool AllTablesyncsReady(void); +extern bool AllTablesyncsReady(bool ready_if_no_tables); extern void UpdateTwoPhaseState(Oid suboid, char new_state); extern void process_syncing_tables(XLogRecPtr current_lsn); -- 2.51.0.windows.1