From f7bc80a31082e1744793ef23cbe990a9d8243d07 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Wed, 17 Apr 2024 06:18:23 +0000 Subject: [PATCH v15 2/4] Alter slot option two_phase only when altering "true" to "false" MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Since the two_phase option is controlled by both the publisher (as a slot option) and the subscriber (as a subscription option), the slot option must also be modified. Regarding the false->true case, the backend process alters the subtwophase to LOGICALREP_TWOPHASE_STATE_PENDING once. After the subscription is enabled, a new logical replication worker requests to change the two_phase option of its slot from pending to true after the initial data synchronization is done. The code path is the same as the case in which two_phase is initially set to true, so there is no need to do something remarkable. However, for the true->false case, the backend must connect to the publisher and expressly change the parameter because the apply worker does not alter the option to false. Because this operation cannot be rolled back, altering the two_phase parameter from "true" to "false" within a transaction is prohibited. --- doc/src/sgml/ref/alter_subscription.sgml | 2 +- src/backend/commands/subscriptioncmds.c | 43 ++++++++++++++++--- .../libpqwalreceiver/libpqwalreceiver.c | 23 +++++++--- src/include/replication/walreceiver.h | 5 ++- src/test/subscription/t/021_twophase.pl | 41 +++++++++++------- 5 files changed, 83 insertions(+), 31 deletions(-) diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 0b23df1b77..475a42a2e3 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -70,7 +70,7 @@ ALTER SUBSCRIPTION name RENAME TO < ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ... with refresh option as true, ALTER SUBSCRIPTION ... SET (failover = true|false) and - ALTER SUBSCRIPTION ... SET (two_phase = true|false) + ALTER SUBSCRIPTION ... SET (two_phase = off) cannot be executed inside a transaction block. These commands also cannot be executed when the subscription has diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 1d57f12942..67b1dc30a5 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1085,6 +1085,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, Form_pg_subscription form; bits32 supported_opts; SubOpts opts = {0}; + bool update_failover; + bool update_two_phase; rel = table_open(SubscriptionRelationId, RowExclusiveLock); @@ -1181,10 +1183,25 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errhint("Resolve these transactions and try again"))); /* - * The changed two_phase option of the slot can't be - * rolled back. + * Altering the parameter from "true" to "false" within a + * transaction is prohibited. Since the apply worker does + * not alter the slot option to false, the backend must + * connect to the publisher and expressly change the + * parameter. + * + * There is no need to do something remarkable regarding + * the "false" to "true" case; the backend process alters + * subtwophase to LOGICALREP_TWOPHASE_STATE_PENDING once. + * After the subscription is enabled, a new logical + * replication worker requests to change the two_phase + * option of its slot from pending to true when the + * initial data synchronization is done. The code path is + * the same as the case in which two_phase is initially + * set to true. */ - PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... SET (two_phase)"); + if (!opts.twophase) + PreventInTransactionBlock(isTopLevel, + "ALTER SUBSCRIPTION ... SET (two_phase = false)"); /* Change system catalog acoordingly */ values[Anum_pg_subscription_subtwophasestate - 1] = @@ -1542,14 +1559,24 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, } /* - * Try to acquire the connection necessary for altering slot. + * Check the need to alter the replication slot. Failover and two_phase + * options are controlled by both the publisher (as a slot option) and the + * subscriber (as a subscription option). The slot option must be altered + * only when changing "true" to "false". The reason has already been + * described in the ALTER_SUBSCRIPTION_OPTIONS section of this function. + */ + update_failover = replaces[Anum_pg_subscription_subfailover - 1]; + update_two_phase = (replaces[Anum_pg_subscription_subtwophasestate - 1] && + !opts.twophase); + + /* + * Try to acquire the connection necessary for altering slot, if needed. * * This has to be at the end because otherwise if there is an error while * doing the database operations we won't be able to rollback altered * slot. */ - if (replaces[Anum_pg_subscription_subfailover - 1] || - replaces[Anum_pg_subscription_subtwophasestate - 1]) + if (update_failover || update_two_phase) { bool must_use_password; char *err; @@ -1569,7 +1596,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PG_TRY(); { - walrcv_alter_slot(wrconn, sub->slotname, opts.failover, opts.twophase); + walrcv_alter_slot(wrconn, sub->slotname, + update_failover ? &opts.failover : NULL, + update_two_phase ? &opts.twophase : NULL); } PG_FINALLY(); { diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 2f035a0c3c..07dfec947d 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -80,7 +80,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool failover, bool two_phase); + const bool *failover, const bool *two_phase); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const char *query, @@ -1121,16 +1121,27 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, */ static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool failover, bool two_phase) + const bool *failover, const bool *two_phase) { StringInfoData cmd; PGresult *res; initStringInfo(&cmd); - appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s, TWO_PHASE %s )", - quote_identifier(slotname), - failover ? "true" : "false", - two_phase ? "true" : "false"); + appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ", + quote_identifier(slotname)); + + if (failover) + appendStringInfo(&cmd, "FAILOVER %s", + *failover ? "true" : "false"); + + if (failover && two_phase) + appendStringInfo(&cmd, ", "); + + if (two_phase) + appendStringInfo(&cmd, "TWO_PHASE %s", + *two_phase ? "true" : "false"); + + appendStringInfoString(&cmd, " );"); res = libpqrcv_PQexec(conn->streamConn, cmd.data); pfree(cmd.data); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 31fa1257ec..7ffa5a58b3 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -377,8 +377,9 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, */ typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn, const char *slotname, - bool failover, - bool two_phase); + const bool *failover, + const bool *two_phase); + /* * walrcv_get_backend_pid_fn diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl index 4e8f627f7b..f56dff4b12 100644 --- a/src/test/subscription/t/021_twophase.pl +++ b/src/test/subscription/t/021_twophase.pl @@ -375,6 +375,12 @@ $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); # then verify that the altered subscription reflects the two_phase option. ############################### +# Confirm two-phase slot option is enabled before altering +$result = $node_publisher->safe_psql('postgres', + "SELECT two_phase FROM pg_replication_slots WHERE slot_name = 'tap_sub_copy';" +); +is($result, qq(t), 'two-phase is enabled'); + # Alter subscription two_phase to false $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub_copy DISABLE;"); @@ -393,7 +399,13 @@ $node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy); $result = $node_subscriber->safe_psql('postgres', "SELECT subtwophasestate FROM pg_subscription WHERE subname = 'tap_sub_copy';" ); -is($result, qq(d), 'two-phase should be disabled'); +is($result, qq(d), 'two-phase subscription option should be disabled'); + +# Make sure that the two-phase slot option is also disabled +$result = $node_publisher->safe_psql('postgres', + "SELECT two_phase FROM pg_replication_slots WHERE slot_name = 'tap_sub_copy';" +); +is($result, qq(f), 'two-phase slot option should be disabled'); # Now do a prepare on the publisher and make sure that it is not replicated. $node_publisher->safe_psql( @@ -411,6 +423,19 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(0), 'should be no prepared transactions on subscriber'); +# Toggle the two_phase to "true" *before* the COMMIT PREPARED. Since we are the +# special path for the case where both two_phase and failover are altered, it +# is also set to "true". +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub_copy DISABLE;"); +$node_subscriber->poll_query_until('postgres', + "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication worker'" +); +$node_subscriber->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION tap_sub_copy SET (two_phase = true, failover = true); + ALTER SUBSCRIPTION tap_sub_copy ENABLE;"); + # Now commit the insert and verify that it is replicated $node_publisher->safe_psql('postgres', "COMMIT PREPARED 'newgid';"); @@ -422,20 +447,6 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;"); is($result, qq(3), 'replicated data in subscriber table'); -# Alter subscription two_phase to true -$node_subscriber->safe_psql('postgres', - "ALTER SUBSCRIPTION tap_sub_copy DISABLE;"); -$node_subscriber->poll_query_until('postgres', - "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication worker'" -); -$node_subscriber->safe_psql( - 'postgres', " - ALTER SUBSCRIPTION tap_sub_copy SET (two_phase = true); - ALTER SUBSCRIPTION tap_sub_copy ENABLE;"); - -# Wait for subscription startup -$node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy); - # Make sure that the two-phase is enabled on the subscriber $result = $node_subscriber->safe_psql('postgres', "SELECT subtwophasestate FROM pg_subscription WHERE subname = 'tap_sub_copy';" -- 2.43.0