From bb19db9bd4e73d8892f202fb7b8771c25a033681 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Thu, 4 Apr 2024 01:17:29 -0400 Subject: [PATCH v1] Allow altering of two_phase option in subscribers This patch allows user to alter two_phase option on a subscriber provided no uncommitted prepared transactions are pending on that subscription. --- src/backend/access/transam/twophase.c | 42 ++++++++++++++++++++++++++++++ src/backend/commands/subscriptioncmds.c | 35 ++++++++++++++++++++++--- src/backend/replication/logical/launcher.c | 22 +++++++++++++--- src/backend/replication/logical/worker.c | 3 --- src/include/access/twophase.h | 3 +++ src/include/replication/logicallauncher.h | 2 +- 6 files changed, 96 insertions(+), 11 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 8090ac9..b0aae25 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -2682,3 +2682,45 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, LWLockRelease(TwoPhaseStateLock); return found; } + +/* + * checkGid + */ +static bool +checkGid(char *gid, Oid subid) +{ + int ret; + Oid subid_written, xid; + + ret = sscanf(gid, "pg_gid_%u_%u", &subid_written, &xid); + + if (ret != 2 || subid != subid_written) + return false; + + return true; +} + +/* + * LookupGXactBySubid + * Check if the prepared transaction done by apply worker exists. + */ +bool +LookupGXactBySubid(Oid subid) +{ + bool found = false; + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (int i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + /* Ignore not-yet-valid GIDs. */ + if (gxact->valid && checkGid(gxact->gid, subid)) + { + found = true; + break; + } + } + LWLockRelease(TwoPhaseStateLock); + return found; +} diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5a47fa9..8306929 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -16,6 +16,7 @@ #include "access/htup_details.h" #include "access/table.h" +#include "access/twophase.h" #include "access/xact.h" #include "catalog/catalog.h" #include "catalog/dependency.h" @@ -868,7 +869,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, pgstat_create_subscription(subid); if (opts.enabled) - ApplyLauncherWakeupAtCommit(); + ApplyLauncherWakeupAtEOXact(true); ObjectAddressSet(myself, SubscriptionRelationId, subid); @@ -1165,7 +1166,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | + SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | + SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); @@ -1173,6 +1175,31 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, parse_subscription_options(pstate, stmt->options, supported_opts, &opts); + /* XXX */ + if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT)) + { + /* Stop corresponding worker */ + logicalrep_worker_stop(subid, InvalidOid); + + /* Request to start worker at the end of transaction */ + ApplyLauncherWakeupAtEOXact(false); + + /* Check whether the number of prepared transactions */ + if (!opts.twophase && + form->subtwophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && + LookupGXactBySubid(subid)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot disable two_phase when uncommitted prepared transactions present"))); + + /* Change system catalog acoordingly */ + values[Anum_pg_subscription_subtwophasestate - 1] = + CharGetDatum(opts.twophase ? + LOGICALREP_TWOPHASE_STATE_PENDING : + LOGICALREP_TWOPHASE_STATE_DISABLED); + replaces[Anum_pg_subscription_subtwophasestate - 1] = true; + } + if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) { /* @@ -1299,7 +1326,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_subenabled - 1] = true; if (opts.enabled) - ApplyLauncherWakeupAtCommit(); + ApplyLauncherWakeupAtEOXact(true); update_tuple = true; break; @@ -1962,7 +1989,7 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) form->oid, 0); /* Wake up related background processes to handle this change quickly. */ - ApplyLauncherWakeupAtCommit(); + ApplyLauncherWakeupAtEOXact(true); LogicalRepWorkersWakeupAtCommit(form->oid); } diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 66070e9..899ec22 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -89,6 +89,7 @@ static dsa_area *last_start_times_dsa = NULL; static dshash_table *last_start_times = NULL; static bool on_commit_launcher_wakeup = false; +static bool launcher_wakeup = false; static void ApplyLauncherWakeup(void); @@ -1085,13 +1086,22 @@ ApplyLauncherForgetWorkerStartTime(Oid subid) void AtEOXact_ApplyLauncher(bool isCommit) { + bool kicked = false; + if (isCommit) { if (on_commit_launcher_wakeup) + { ApplyLauncherWakeup(); + kicked = true; + } } + if (!kicked && launcher_wakeup) + ApplyLauncherWakeup(); + on_commit_launcher_wakeup = false; + launcher_wakeup = false; } /* @@ -1102,10 +1112,16 @@ AtEOXact_ApplyLauncher(bool isCommit) * tuple was added to the pg_subscription catalog. */ void -ApplyLauncherWakeupAtCommit(void) +ApplyLauncherWakeupAtEOXact(bool on_commit) { - if (!on_commit_launcher_wakeup) - on_commit_launcher_wakeup = true; + if (on_commit) + { + if (!on_commit_launcher_wakeup) + on_commit_launcher_wakeup = true; + } + else + if (!launcher_wakeup) + launcher_wakeup = true; } static void diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b5a80fe..ca3d260 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3911,9 +3911,6 @@ maybe_reread_subscription(void) /* !slotname should never happen when enabled is true. */ Assert(newsub->slotname); - /* two-phase should not be altered */ - Assert(newsub->twophasestate == MySubscription->twophasestate); - /* * Exit if any parameter that affects the remote connection was changed. * The launcher will start a new worker but note that the parallel apply diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 56248c0..d493ed2 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -62,4 +62,7 @@ extern void PrepareRedoRemove(TransactionId xid, bool giveWarning); extern void restoreTwoPhaseData(void); extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp); + +extern bool LookupGXactBySubid(Oid subid); + #endif /* TWOPHASE_H */ diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index ff0438b..075842c 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -24,7 +24,7 @@ extern void ApplyLauncherShmemInit(void); extern void ApplyLauncherForgetWorkerStartTime(Oid subid); -extern void ApplyLauncherWakeupAtCommit(void); +extern void ApplyLauncherWakeupAtEOXact(bool on_commit); extern void AtEOXact_ApplyLauncher(bool isCommit); extern bool IsLogicalLauncher(void); -- 1.8.3.1