From d8c07a0bd72ffa32c8e2a553254d79bdc3fac9db Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Mon, 8 Apr 2024 12:39:12 +0000 Subject: [PATCH v10 3/4] Abort prepared transactions while altering two_phase to off If we alter the two_phase parameter from "on" to "off" and there are prepared transactions on the subscriber, they won't be resolved. To avoid this issue, we allow the backend to abort all prepared transactions while altering the subscription. --- doc/src/sgml/ref/alter_subscription.sgml | 11 ++- src/backend/access/transam/twophase.c | 17 ++--- src/backend/commands/subscriptioncmds.c | 69 +++++++++++-------- src/include/access/twophase.h | 3 +- src/test/subscription/t/099_twophase_added.pl | 45 ++++++++++++ 5 files changed, 104 insertions(+), 41 deletions(-) diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 0c2894a94e..2c6502275a 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -233,8 +233,6 @@ ALTER SUBSCRIPTION name RENAME TO < failover, and two_phase. Only a superuser can set password_required = false. - The two_phase parameter can only be altered when the - subscription is disabled. @@ -256,6 +254,15 @@ ALTER SUBSCRIPTION name RENAME TO < failover option is enabled. + + + The two_phase parameter can only be altered when the + subscription is disabled. When altering the parameter from true + to false, the backend process checks for any incomplete + prepared transactions done by the logical replication worker (from when + two_phase parameter was still true) + and, if any are found, those are aborted. + diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 66fa591eb5..f384bd8c0a 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -2720,13 +2720,13 @@ IsTwoPhaseTransactionGidForSubid(Oid subid, char *gid) } /* - * LookupGXactBySubid - * Check if the prepared transaction done by apply worker exists. + * GetGidListBySubid + * Get a list of GIDs which is PREPARE'd by the given subscription. */ -bool -LookupGXactBySubid(Oid subid) +List * +GetGidListBySubid(Oid subid) { - bool found = false; + List *list = NIL; LWLockAcquire(TwoPhaseStateLock, LW_SHARED); for (int i = 0; i < TwoPhaseState->numPrepXacts; i++) @@ -2736,11 +2736,8 @@ LookupGXactBySubid(Oid subid) /* Ignore not-yet-valid GIDs. */ if (gxact->valid && IsTwoPhaseTransactionGidForSubid(subid, gxact->gid)) - { - found = true; - break; - } + list = lappend(list, pstrdup(gxact->gid)); } LWLockRelease(TwoPhaseStateLock); - return found; + return list; } diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index d9abf687d1..c0e5d1af45 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1176,36 +1176,49 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, logicalrep_workers_stop(subid); /* - * two_phase cannot be disabled if there are any - * uncommitted prepared transactions present. - */ - if (!opts.twophase && - form->subtwophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && - LookupGXactBySubid(subid)) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot disable two_phase when uncommitted prepared transactions present"), - errhint("Resolve these transactions and try again"))); - - /* - * Altering the parameter from "true" to "false" within a - * transaction is prohibited. Since the apply worker does - * not alter the slot option to false, the backend must - * connect to the publisher and expressly change the - * parameter. - * - * There is no need to do something remarkable regarding - * the "false" to "true" case; the backend process alters - * subtwophase to LOGICALREP_TWOPHASE_STATE_PENDING once. - * After the subscription is enabled, a new logical - * replication worker requests to change the two_phase - * option of its slot when the initial data synchronization - * is done. The code path is the same as the case in which - * two_phase is initially set to true. + * If two_phase was enabled, there is a possibility that + * transactions have already been PREPARE'd. They must be + * checked and rolled back. */ if (!opts.twophase) - PreventInTransactionBlock(isTopLevel, - "ALTER SUBSCRIPTION ... SET (two_phase = false)"); + { + List *prepared_xacts; + + /* + * Altering the parameter from "true" to "false" within + * a transaction is prohibited. Since the apply worker + * does not alter the slot option to false, the backend + * must connect to the publisher and expressly change + * the parameter. + * + * There is no need to do something remarkable + * regarding the "false" to "true" case; the backend + * process alters subtwophase to + * LOGICALREP_TWOPHASE_STATE_PENDING once. After the + * subscription is enabled, a new logical replication + * worker requests to change the two_phase option of + * its slot when the initial data synchronization is + * done. The code path is the same as the case in which + * two_phase is initially set to true. + */ + if (!opts.twophase) + PreventInTransactionBlock(isTopLevel, + "ALTER SUBSCRIPTION ... SET (two_phase = false)"); + + /* + * To prevent prepared transactions from being + * isolated, they must manually be aborted. + */ + if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && + (prepared_xacts = GetGidListBySubid(subid)) != NIL) + { + /* Abort all listed transactions */ + foreach_ptr(char, gid, prepared_xacts) + FinishPreparedTransaction(gid, false); + + list_free_deep(prepared_xacts); + } + } /* Change system catalog acoordingly */ values[Anum_pg_subscription_subtwophasestate - 1] = diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index d37e06fdee..f7a5cf0c12 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -18,6 +18,7 @@ #include "access/xlogdefs.h" #include "datatype/timestamp.h" #include "storage/lock.h" +#include "nodes/pg_list.h" /* * GlobalTransactionData is defined in twophase.c; other places have no @@ -65,6 +66,6 @@ extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, extern void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); -extern bool LookupGXactBySubid(Oid subid); +extern List *GetGidListBySubid(Oid subid); #endif /* TWOPHASE_H */ diff --git a/src/test/subscription/t/099_twophase_added.pl b/src/test/subscription/t/099_twophase_added.pl index b7c97f8454..7abe1c37d5 100644 --- a/src/test/subscription/t/099_twophase_added.pl +++ b/src/test/subscription/t/099_twophase_added.pl @@ -93,4 +93,49 @@ $result = $node_subscriber->safe_psql('postgres', is($result, q(5), "prepared transactions done before altering can be replicated"); +##################### +# Check the case that prepared transactions exist on the subscriber node +# +# If the two_phase is altering from "on" to "off" and there are prepared +# transactions on the subscriber, they must be aborted. This test checks it. + +# Prepare a transaction to insert some tuples into the table +$node_publisher->safe_psql( + 'postgres', " + BEGIN; + INSERT INTO tab_full VALUES (generate_series(6, 10)); + PREPARE TRANSACTION 'test_prepared_tab_full';"); + +$node_publisher->wait_for_catchup('regress_sub'); + +# Verify the prepared transaction has been replicated to the subscriber because +# two_phase is set to "on". +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, q(1), "transaction has been prepared on subscriber"); + +# Toggle the two_phase to "off" before the COMMIT PREPARED +$node_subscriber->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION regress_sub DISABLE; + ALTER SUBSCRIPTION regress_sub SET (two_phase = off); + ALTER SUBSCRIPTION regress_sub ENABLE;"); + +# Verify any prepared transactions are aborted because two_phase is changed to +# "off". +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, q(0), "prepared transaction done by worker is aborted"); + +# Do COMMIT PREPARED the prepared transaction +$node_publisher->safe_psql( 'postgres', + "COMMIT PREPARED 'test_prepared_tab_full';"); +$node_publisher->wait_for_catchup('regress_sub'); + +# Verify inserted tuples are replicated +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(10) FROM tab_full;"); +is($result, q(10), + "prepared transactions on publisher can be replicated"); + done_testing(); -- 2.43.0