From b3853307efcdbee2c9053688b3860f0b2347462b Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Wed, 17 Apr 2024 06:18:23 +0000 Subject: [PATCH v8 2/4] Alter slot option two_phase only when altering "on" to "off" Since the two_phase option is controlled by both the publisher (as a slot option) and the subscriber (as a subscription option), the slot option must also be modified. Regarding the off->on case, the logical replication already has a mechanism for it, so there is no need to do anything special for the on->off case; however, we must connect to the publisher and expressly change the parameter. The operation cannot be rolled back, and altering the parameter from "on" to "off" within a transaction is prohibited. --- doc/src/sgml/ref/alter_subscription.sgml | 2 +- src/backend/commands/subscriptioncmds.c | 30 ++++-- .../libpqwalreceiver/libpqwalreceiver.c | 23 +++-- src/include/replication/walreceiver.h | 5 +- src/test/subscription/meson.build | 1 + src/test/subscription/t/099_twophase_added.pl | 95 +++++++++++++++++++ 6 files changed, 140 insertions(+), 16 deletions(-) create mode 100644 src/test/subscription/t/099_twophase_added.pl diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 88e9a72147..0c2894a94e 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -70,7 +70,7 @@ ALTER SUBSCRIPTION name RENAME TO < ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ... with refresh option as true, ALTER SUBSCRIPTION ... SET (failover = on|off) and - ALTER SUBSCRIPTION ... SET (two_phase = on|off) + ALTER SUBSCRIPTION ... SET (two_phase = off) cannot be executed inside a transaction block. These commands also cannot be executed when the subscription has diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 90d967eb7c..6b2cb71dac 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1097,6 +1097,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, Form_pg_subscription form; bits32 supported_opts; SubOpts opts = {0}; + bool update_failover; + bool update_two_phase; rel = table_open(SubscriptionRelationId, RowExclusiveLock); @@ -1186,10 +1188,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errhint("Resolve these transactions and try again"))); /* - * The changed two_phase option of the slot can't be rolled - * back. + * Since the altering two_phase option of subscriptions + * also leads to the change of slot option, this command + * cannot be rolled back. So prevent we are in the + * transaction block. */ - PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... SET (two_phase)"); + if (!opts.twophase) + PreventInTransactionBlock(isTopLevel, + "ALTER SUBSCRIPTION ... SET (two_phase = off)"); /* Change system catalog acoordingly */ values[Anum_pg_subscription_subtwophasestate - 1] = @@ -1547,14 +1553,22 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, } /* - * Try to acquire the connection necessary for altering slot. + * Check the need to alter the replication slot. Failover and two_phase + * options are controlled by both the publisher (as a slot option) and the + * subscriber (as a subscription option). + */ + update_failover = replaces[Anum_pg_subscription_subfailover - 1]; + update_two_phase = (replaces[Anum_pg_subscription_subtwophasestate - 1] && + !opts.twophase); + + /* + * Try to acquire the connection necessary for altering slot, if needed. * * This has to be at the end because otherwise if there is an error while * doing the database operations we won't be able to rollback altered * slot. */ - if (replaces[Anum_pg_subscription_subfailover - 1] || - replaces[Anum_pg_subscription_subtwophasestate - 1]) + if (update_failover || update_two_phase) { bool must_use_password; char *err; @@ -1574,7 +1588,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PG_TRY(); { - walrcv_alter_slot(wrconn, sub->slotname, opts.failover, opts.twophase); + walrcv_alter_slot(wrconn, sub->slotname, + update_failover ? &opts.failover : NULL, + update_two_phase ? &opts.twophase : NULL); } PG_FINALLY(); { diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 998bbd517a..2800597f4c 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -80,7 +80,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool failover, bool two_phase); + const bool *failover, const bool *two_phase); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const char *query, @@ -1121,16 +1121,27 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, */ static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool failover, bool two_phase) + const bool *failover, const bool *two_phase) { StringInfoData cmd; PGresult *res; initStringInfo(&cmd); - appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s, TWO_PHASE %s )", - quote_identifier(slotname), - failover ? "true" : "false", - two_phase ? "true" : "false"); + appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ", + quote_identifier(slotname)); + + if (failover) + appendStringInfo(&cmd, "FAILOVER %s", + *failover ? "true" : "false"); + + if (failover && two_phase) + appendStringInfo(&cmd, ", "); + + if (two_phase) + appendStringInfo(&cmd, "TWO_PHASE %s", + *two_phase ? "true" : "false"); + + appendStringInfoString(&cmd, ");"); res = libpqrcv_PQexec(conn->streamConn, cmd.data); pfree(cmd.data); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 31fa1257ec..7ffa5a58b3 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -377,8 +377,9 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, */ typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn, const char *slotname, - bool failover, - bool two_phase); + const bool *failover, + const bool *two_phase); + /* * walrcv_get_backend_pid_fn diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index c591cd7d61..b4bd522c3d 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -40,6 +40,7 @@ tests += { 't/031_column_list.pl', 't/032_subscribe_use_index.pl', 't/033_run_as_table_owner.pl', + 't/099_twophase_added.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/099_twophase_added.pl b/src/test/subscription/t/099_twophase_added.pl new file mode 100644 index 0000000000..62f4acad27 --- /dev/null +++ b/src/test/subscription/t/099_twophase_added.pl @@ -0,0 +1,95 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +# Additional tests for altering two_phase option +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Initialize publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10)); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->append_conf('postgresql.conf', + qq(max_prepared_transactions = 10 + log_min_messages = debug1)); +$node_subscriber->start; + +# Define tables on both nodes +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY);"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Setup logical replication, with two_phase = "off" +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub FOR ALL TABLES"); + +my $log_offset = -s $node_subscriber->logfile; + +$node_subscriber->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION regress_sub + CONNECTION '$publisher_connstr' PUBLICATION pub + WITH (two_phase = off, copy_data = off, failover = off)"); + +# Verify the started worker recognized two_phase was disabled +$node_subscriber->wait_for_log( + 'logical replication apply worker for subscription "regress_sub" two_phase is DISABLED', $log_offset); + +# Check the case that prepared transactions exist on the publisher node. +# +# Since the two_phase is "off", then normally, this PREPARE will do nothing +# until the COMMIT PREPARED, but in this test, we toggle the two_phase to "on" +# again before the COMMIT PREPARED happens. + +# Prepare a transaction to insert some tuples into the table +$node_publisher->safe_psql( + 'postgres', " + BEGIN; + INSERT INTO tab_full VALUES (generate_series(1, 5)); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->wait_for_catchup('regress_sub'); + +# Verify the prepared transaction is not yet replicated to the subscriber +# because two_phase is set to "off". +my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, q(0), "transaction is not prepared on subscriber"); + +$log_offset = -s $node_subscriber->logfile; + +# Toggle the two_phase to "on" *before* the COMMIT PREPARED. Since we are the +# special path for the case where both two_phase and failover are altered, it +# is also set to "on". +$node_subscriber->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION regress_sub DISABLE; + ALTER SUBSCRIPTION regress_sub SET (two_phase = on, failover = on); + ALTER SUBSCRIPTION regress_sub ENABLE;"); + +# Verify the started worker recognized two_phase was enabled +$node_subscriber->wait_for_log( + 'logical replication apply worker for subscription "regress_sub" two_phase is ENABLED', $log_offset); + +# And do COMMIT PREPARED the prepared transaction +$node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab_full';"); +$node_publisher->wait_for_catchup('regress_sub'); + +# Verify inserted tuples are replicated +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM tab_full;"); +is($result, q(5), + "prepared transactions done before altering can be replicated"); + +done_testing(); -- 2.43.0