From c0e734988024bcf6001d52259a197a3e2df385e4 Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Fri, 1 Aug 2025 15:25:53 +0530 Subject: [PATCH v3_PG16] Fix ALTER SUBSCRIPTION ... SET PUBLICATION ... command. The problem is that ALTER SUBSCRIPTION ... SET PUBLICATION ... causes the apply worker to restart, and after the restart, the apply worker will use the existing slot and replication origin corresponding to the subscription. Now, it is possible that before the restart, the origin has not been updated, and the WAL start location points to a position before the publication named in SET PUBLICATION was created, and that can lead to an error like: "ERROR: publication "pub1" does not exist". Once this error occurs, the apply worker will never be able to proceed and will always return the same error. We decided to skip loading the publication if the publication does not exist. Once the publication gets created, it is loaded and the relation entry is updated. --- src/backend/replication/pgoutput/pgoutput.c | 16 ++++++- src/test/subscription/t/024_add_drop_pub.pl | 49 ++++++++++++++++++++- 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 32b74bb4752..d83da138081 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1737,6 +1737,11 @@ pgoutput_shutdown(LogicalDecodingContext *ctx) /* * Load publications from the list of publication names. + * + * Here, we skip the publications that don't exist yet. This allows + * replication to continue, with a warning, when a publication is missing. + * This is required because we allow users to create publications after they + * have specified the required publications at the time of replication start. 
*/ static List * LoadPublications(List *pubnames) @@ -1747,9 +1752,16 @@ LoadPublications(List *pubnames) foreach(lc, pubnames) { char *pubname = (char *) lfirst(lc); - Publication *pub = GetPublicationByName(pubname, false); + Publication *pub = GetPublicationByName(pubname, true); - result = lappend(result, pub); + if (pub) + result = lappend(result, pub); + else + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipped loading publication: %s", pubname), + errdetail("The publication does not exist at this point in the WAL."), + errhint("Create the publication if it does not exist.")); } return result; diff --git a/src/test/subscription/t/024_add_drop_pub.pl b/src/test/subscription/t/024_add_drop_pub.pl index 8614b1b5b34..5266fe30172 100644 --- a/src/test/subscription/t/024_add_drop_pub.pl +++ b/src/test/subscription/t/024_add_drop_pub.pl @@ -1,7 +1,9 @@ # Copyright (c) 2021-2023, PostgreSQL Global Development Group -# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION +# This test checks behaviour of ALTER SUBSCRIPTION ... ADD/DROP PUBLICATION and +# ensures that creating a publication associated with a subscription at a later +# point in time does not break logical replication. use strict; use warnings; use PostgreSQL::Test::Cluster; @@ -80,6 +82,51 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_1"); is($result, qq(20|1|10), 'check initial data is copied to subscriber'); +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +# Ensure that a subscription referring to a missing publication does not +# disrupt existing logical replication. Instead, it should log a warning +# while allowing replication to continue. Additionally, verify that replication +# resumes after the missing publication is created for the table. 
+ +# Create table on publisher and subscriber +$node_publisher->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); +$node_subscriber->safe_psql('postgres', "CREATE TABLE tab_3 (a int)"); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_3 FOR TABLE tab_3"); + +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub_3" +); + +# Wait for initial table sync to finish +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); + +my $offset = -s $node_publisher->logfile; + +$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_3"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(1)"); + +# Verify that a warning is logged. +$node_publisher->wait_for_log( + qr/WARNING: ( [A-Z0-9]+:)? skipped loading publication: tap_pub_3/, + $offset); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_3 FOR TABLE tab_3"); + +$node_publisher->safe_psql('postgres', "INSERT INTO tab_3 values(2)"); + +$node_publisher->wait_for_catchup('tap_sub'); + +# Verify that the insert operation gets replicated to subscriber after +# publication is created. +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab_3"); +is($result, qq(2), + 'check that the incremental data is replicated after the publication is created' +); + # shutdown $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.43.0