From 3bb372f1f02554f36738abc75e650cde79649c69 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Fri, 5 Apr 2024 06:47:18 -0400 Subject: [PATCH v19 1/3] Allow altering of two_phase option of a SUBSCRIPTION This patch allows the user to alter the 'two_phase' option of a subscriber provided no uncommitted prepared transactions are pending on that subscription. Author: Cherian Ajin, Hayato Kuroda --- doc/src/sgml/protocol.sgml | 46 ++++-- doc/src/sgml/ref/alter_subscription.sgml | 12 +- src/backend/access/transam/twophase.c | 72 ++++++++++ src/backend/commands/subscriptioncmds.c | 136 +++++++++++++----- .../libpqwalreceiver/libpqwalreceiver.c | 9 +- src/backend/replication/logical/launcher.c | 10 +- src/backend/replication/logical/worker.c | 25 +--- src/backend/replication/slot.c | 44 ++++-- src/backend/replication/walsender.c | 32 +++-- src/bin/psql/tab-complete.c | 2 +- src/include/access/twophase.h | 5 + src/include/replication/slot.h | 3 +- src/include/replication/walreceiver.h | 11 +- src/include/replication/worker_internal.h | 3 +- src/test/regress/expected/subscription.out | 5 +- src/test/regress/sql/subscription.sql | 5 +- src/test/subscription/t/021_twophase.pl | 86 ++++++++++- 17 files changed, 376 insertions(+), 130 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 1b27d0a547..cba6661cf0 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2049,21 +2049,6 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" The following options are supported: - - TWO_PHASE [ boolean ] - - - If true, this logical replication slot supports decoding of two-phase - commit. With this option, commands related to two-phase commit such as - PREPARE TRANSACTION, COMMIT PREPARED - and ROLLBACK PREPARED are decoded and transmitted. - The transaction will be decoded and transmitted at - PREPARE TRANSACTION time. - The default is false. - - - - RESERVE_WAL [ boolean ] @@ -2104,6 +2089,21 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + + TWO_PHASE [ boolean ] + + + If true, this logical replication slot supports decoding of two-phase + commit. With this option, commands related to two-phase commit such as + PREPARE TRANSACTION, COMMIT PREPARED + and ROLLBACK PREPARED are decoded and transmitted. + The transaction will be decoded and transmitted at + PREPARE TRANSACTION time. + The default is false. + + + @@ -2206,6 +2206,22 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + + TWO_PHASE [ boolean ] + + + If true, this logical replication slot supports decoding of two-phase + commit. With this option, commands related to two-phase commit such as + PREPARE TRANSACTION, COMMIT PREPARED + and ROLLBACK PREPARED are decoded and transmitted. + The transaction will be decoded and transmitted at + PREPARE TRANSACTION time. + + + + + diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 476f195622..0b23df1b77 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -68,8 +68,9 @@ ALTER SUBSCRIPTION name RENAME TO < Commands ALTER SUBSCRIPTION ... REFRESH PUBLICATION, ALTER SUBSCRIPTION ... {SET|ADD|DROP} PUBLICATION ... - with refresh option as true and - ALTER SUBSCRIPTION ... SET (failover = true|false) + with refresh option as true, + ALTER SUBSCRIPTION ... SET (failover = true|false) and + ALTER SUBSCRIPTION ... SET (two_phase = true|false) cannot be executed inside a transaction block. These commands also cannot be executed when the subscription has @@ -228,9 +229,12 @@ ALTER SUBSCRIPTION name RENAME TO < disable_on_error, password_required, run_as_owner, - origin, and - failover. + origin, + failover, and + two_phase. Only a superuser can set password_required = false. + The two_phase parameter can only be altered when the + subscription is disabled. diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 9a8257fcaf..f7100306f7 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -2681,3 +2681,75 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, LWLockRelease(TwoPhaseStateLock); return found; } + +/* + * TwoPhaseTransactionGid + * Form the prepared transaction GID for two_phase transactions. + * + * Return the GID in the supplied buffer. + */ +void +TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid) +{ + Assert(subid != InvalidRepOriginId); + + if (!TransactionIdIsValid(xid)) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("invalid two-phase transaction ID"))); + + snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid); +} + +/* + * IsTwoPhaseTransactionGidForSubid + * Check whether the given GID (as formed by TwoPhaseTransactionGid) is + * for the specified 'subid'. + */ +static bool +IsTwoPhaseTransactionGidForSubid(Oid subid, char *gid) +{ + int ret; + Oid subid_written; + TransactionId xid; + char gid_generated[GIDSIZE]; + + ret = sscanf(gid, "pg_gid_%u_%u", &subid_written, &xid); + + /* Return false if the given GID has different format */ + if (ret != 2 || subid != subid_written) + return false; + + /* Construct the format GID based on the got xid */ + TwoPhaseTransactionGid(subid, xid, gid_generated, sizeof(gid)); + + /* ...And check whether the given GID is exact same as the format GID */ + return strcmp(gid, gid_generated) == 0; +} + +/* + * LookupGXactBySubid + * Check if the prepared transaction done by apply worker exists. + */ +bool +LookupGXactBySubid(Oid subid) +{ + bool found = false; + + LWLockAcquire(TwoPhaseStateLock, LW_SHARED); + for (int i = 0; i < TwoPhaseState->numPrepXacts; i++) + { + GlobalTransaction gxact = TwoPhaseState->prepXacts[i]; + + /* Ignore not-yet-valid GIDs. */ + if (gxact->valid && + IsTwoPhaseTransactionGidForSubid(subid, gxact->gid)) + { + found = true; + break; + } + } + LWLockRelease(TwoPhaseStateLock); + + return found; +} diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 16d83b3253..a8e4faacbe 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -16,6 +16,7 @@ #include "access/htup_details.h" #include "access/table.h" +#include "access/twophase.h" #include "access/xact.h" #include "catalog/catalog.h" #include "catalog/dependency.h" @@ -109,6 +110,8 @@ static void check_publications_origin(WalReceiverConn *wrconn, static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); +static void CheckAlterSubOption(Subscription *sub, const char *option, + bool isTopLevel); /* @@ -259,21 +262,9 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_STREAMING; opts->streaming = defGetStreamingMode(defel); } - else if (strcmp(defel->defname, "two_phase") == 0) + else if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) && + strcmp(defel->defname, "two_phase") == 0) { - /* - * Do not allow toggling of two_phase option. Doing so could cause - * missing of transactions and lead to an inconsistent replica. - * See comments atop worker.c - * - * Note: Unsupported twophase indicates that this call originated - * from AlterSubscription. - */ - if (!IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT)) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("unrecognized subscription parameter: \"%s\"", defel->defname))); - if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT)) errorConflictingDefElem(defel, pstate); @@ -1079,6 +1070,47 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, table_close(rel, NoLock); } +/* + * Common checks for altering failover and two_phase option + */ +static void +CheckAlterSubOption(Subscription *sub, const char *option, bool isTopLevel) +{ + StringInfoData cmd; + + Assert(strcmp(option, "failover") == 0 || + strcmp(option, "two_phase") == 0); + + if (!sub->slotname) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set %s for a subscription that does not have a slot name", + option))); + + /* + * Do not allow changing the option if the subscription is enabled. This + * is because both failover and two_phase options of the slot on the + * publisher cannot be modified if the slot is currently acquired by the + * apply worker. + */ + if (sub->enabled) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot set %s for enabled subscription", + option))); + + initStringInfo(&cmd); + appendStringInfo(&cmd, "ALTER SUBSCRIPTION ... SET (%s)", option); + + /* + * The changed option of the slot can't be rolled back, so disallow if we + * are in a transaction block. + */ + PreventInTransactionBlock(isTopLevel, cmd.data); + + pfree(cmd.data); +} + /* * Alter the existing subscription. */ @@ -1145,7 +1177,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | + SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | + SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN); @@ -1229,33 +1262,61 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (IsSet(opts.specified_opts, SUBOPT_FAILOVER)) { - if (!sub->slotname) + CheckAlterSubOption(sub, "failover", isTopLevel); + + values[Anum_pg_subscription_subfailover - 1] = + BoolGetDatum(opts.failover); + replaces[Anum_pg_subscription_subfailover - 1] = true; + } + + if (IsSet(opts.specified_opts, SUBOPT_TWOPHASE_COMMIT)) + { + CheckAlterSubOption(sub, "two_phase", isTopLevel); + + /* + * slot_name and two_phase cannot be altered + * simultaneously. The latter part refers to the pre-set + * slot name and tries to modify the slot option, so + * changing both does not make sense. + */ + if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot set %s for a subscription that does not have a slot name", - "failover"))); + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("slot_name and two_phase cannot be altered at the same time"))); /* - * Do not allow changing the failover state if the - * subscription is enabled. This is because the failover - * state of the slot on the publisher cannot be modified - * if the slot is currently acquired by the apply worker. + * Workers may still survive even if the subscription has + * been disabled. They may read the pg_subscription + * catalog and detect that the two_phase parameter is + * updated, which causes an assertion failure that + * two_phase should not be updated while the worker + * exists. Ensure workers have already been exited to + * avoid it. */ - if (sub->enabled) + if (logicalrep_workers_find(subid, true, true)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("cannot set %s for enabled subscription", - "failover"))); + errmsg("cannot alter two_phase when logical replication worker is still running"), + errhint("Wait certain time and try again."))); /* - * The changed failover option of the slot can't be rolled - * back. + * two_phase cannot be disabled if there are any + * uncommitted prepared transactions present. */ - PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... SET (failover)"); - - values[Anum_pg_subscription_subfailover - 1] = - BoolGetDatum(opts.failover); - replaces[Anum_pg_subscription_subfailover - 1] = true; + if (!opts.twophase && + sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && + LookupGXactBySubid(subid)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot disable two_phase when prepared transactions are present"), + errhint("Resolve these transactions and try again."))); + + /* Change system catalog accordingly */ + values[Anum_pg_subscription_subtwophasestate - 1] = + CharGetDatum(opts.twophase ? + LOGICALREP_TWOPHASE_STATE_PENDING : + LOGICALREP_TWOPHASE_STATE_DISABLED); + replaces[Anum_pg_subscription_subtwophasestate - 1] = true; } if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) @@ -1507,7 +1568,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, * doing the database operations we won't be able to rollback altered * slot. */ - if (replaces[Anum_pg_subscription_subfailover - 1]) + if (replaces[Anum_pg_subscription_subfailover - 1] || + replaces[Anum_pg_subscription_subtwophasestate - 1]) { bool must_use_password; char *err; @@ -1528,7 +1590,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, PG_TRY(); { - walrcv_alter_slot(wrconn, sub->slotname, opts.failover); + walrcv_alter_slot(wrconn, sub->slotname, opts.failover, opts.twophase); } PG_FINALLY(); { @@ -1675,9 +1737,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * New workers won't be started because we hold an exclusive lock on the * subscription till the end of the transaction. */ - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - subworkers = logicalrep_workers_find(subid, false); - LWLockRelease(LogicalRepWorkerLock); + subworkers = logicalrep_workers_find(subid, false, true); foreach(lc, subworkers) { LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 6c42c209d2..1cb601a148 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -80,7 +80,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool failover); + bool failover, bool two_phase); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const char *query, @@ -1121,15 +1121,16 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, */ static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool failover) + bool failover, bool two_phase) { StringInfoData cmd; PGresult *res; initStringInfo(&cmd); - appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )", + appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s, TWO_PHASE %s )", quote_identifier(slotname), - failover ? "true" : "false"); + failover ? "true" : "false", + two_phase ? "true" : "false"); res = libpqrcv_PQexec(conn->streamConn, cmd.data); pfree(cmd.data); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 27c3a91fb7..c566d50a07 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -272,11 +272,14 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) * the subscription, instead of just one. */ List * -logicalrep_workers_find(Oid subid, bool only_running) +logicalrep_workers_find(Oid subid, bool only_running, bool acquire_lock) { int i; List *res = NIL; + if (acquire_lock) + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + Assert(LWLockHeldByMe(LogicalRepWorkerLock)); /* Search for attached worker for a given subscription id. */ @@ -288,6 +291,9 @@ logicalrep_workers_find(Oid subid, bool only_running) res = lappend(res, w); } + if (acquire_lock) + LWLockRelease(LogicalRepWorkerLock); + return res; } @@ -759,7 +765,7 @@ logicalrep_worker_detach(void) LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true); + workers = logicalrep_workers_find(MyLogicalRepWorker->subid, true, false); foreach(lc, workers) { LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c0bda6269b..6c798cd5b4 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -401,9 +401,6 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, LogicalRepTupleData *newtup, CmdType operation); -/* Compute GID for two_phase transactions */ -static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); - /* Functions for skipping changes */ static void maybe_start_skipping_changes(XLogRecPtr finish_lsn); static void stop_skipping_changes(void); @@ -3911,7 +3908,7 @@ maybe_reread_subscription(void) /* !slotname should never happen when enabled is true. */ Assert(newsub->slotname); - /* two-phase should not be altered */ + /* two-phase cannot be altered while the worker exists */ Assert(newsub->twophasestate == MySubscription->twophasestate); /* @@ -4396,24 +4393,6 @@ cleanup_subxact_info() subxact_data.nsubxacts_max = 0; } -/* - * Form the prepared transaction GID for two_phase transactions. - * - * Return the GID in the supplied buffer. - */ -static void -TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid) -{ - Assert(subid != InvalidRepOriginId); - - if (!TransactionIdIsValid(xid)) - ereport(ERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg_internal("invalid two-phase transaction ID"))); - - snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid); -} - /* * Common function to run the apply loop with error handling. Disable the * subscription, if necessary. @@ -5014,7 +4993,7 @@ AtEOXact_LogicalRepWorkers(bool isCommit) List *workers; ListCell *lc2; - workers = logicalrep_workers_find(subid, true); + workers = logicalrep_workers_find(subid, true, false); foreach(lc2, workers) { LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index baf9b89dc4..90494cb858 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -804,9 +804,12 @@ ReplicationSlotDrop(const char *name, bool nowait) * Change the definition of the slot identified by the specified name. */ void -ReplicationSlotAlter(const char *name, bool failover) +ReplicationSlotAlter(const char *name, bool *failover, bool *two_phase) { + bool update_slot = false; + Assert(MyReplicationSlot == NULL); + Assert(failover || two_phase); ReplicationSlotAcquire(name, false); @@ -832,28 +835,45 @@ ReplicationSlotAlter(const char *name, bool failover) * Do not allow users to enable failover on the standby as we do not * support sync to the cascading standby. */ - if (failover) + if (failover && *failover) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot enable failover for a replication slot" " on the standby")); } - /* - * Do not allow users to enable failover for temporary slots as we do not - * support syncing temporary slots to the standby. - */ - if (failover && MyReplicationSlot->data.persistency == RS_TEMPORARY) - ereport(ERROR, - errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot enable failover for a temporary replication slot")); + if (failover) + { + /* + * Do not allow users to enable failover for temporary slots as we do + * not support syncing temporary slots to the standby. + */ + if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot enable failover for a temporary replication slot")); + + if (MyReplicationSlot->data.failover != *failover) + { + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.failover = *failover; + SpinLockRelease(&MyReplicationSlot->mutex); + + update_slot = true; + } + } - if (MyReplicationSlot->data.failover != failover) + if (two_phase && MyReplicationSlot->data.two_phase != *two_phase) { SpinLockAcquire(&MyReplicationSlot->mutex); - MyReplicationSlot->data.failover = failover; + MyReplicationSlot->data.two_phase = *two_phase; SpinLockRelease(&MyReplicationSlot->mutex); + update_slot = true; + } + + if (update_slot) + { ReplicationSlotMarkDirty(); ReplicationSlotSave(); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 2d1a9ec900..9b40b1cf7d 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1407,12 +1407,15 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd) } /* - * Process extra options given to ALTER_REPLICATION_SLOT. + * Change the definition of a replication slot. */ static void -ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover) +AlterReplicationSlot(AlterReplicationSlotCmd *cmd) { bool failover_given = false; + bool two_phase_given = false; + bool failover; + bool two_phase; /* Parse options */ foreach_ptr(DefElem, defel, cmd->options) @@ -1424,23 +1427,24 @@ ParseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover) (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options"))); failover_given = true; - *failover = defGetBoolean(defel); + failover = defGetBoolean(defel); + } + else if (strcmp(defel->defname, "two_phase") == 0) + { + if (two_phase_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + two_phase_given = true; + two_phase = defGetBoolean(defel); } else elog(ERROR, "unrecognized option: %s", defel->defname); } -} - -/* - * Change the definition of a replication slot. - */ -static void -AlterReplicationSlot(AlterReplicationSlotCmd *cmd) -{ - bool failover = false; - ParseAlterReplSlotOptions(cmd, &failover); - ReplicationSlotAlter(cmd->slotname, failover); + ReplicationSlotAlter(cmd->slotname, + failover_given ? &failover : NULL, + two_phase_given ? &two_phase : NULL); } /* diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index d453e224d9..891face1b6 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1948,7 +1948,7 @@ psql_completion(const char *text, int start, int end) else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) COMPLETE_WITH("binary", "disable_on_error", "failover", "origin", "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit"); + "streaming", "synchronous_commit", "two_phase"); /* ALTER SUBSCRIPTION SKIP ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) COMPLETE_WITH("lsn"); diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h index 56248c0006..d37e06fdee 100644 --- a/src/include/access/twophase.h +++ b/src/include/access/twophase.h @@ -62,4 +62,9 @@ extern void PrepareRedoRemove(TransactionId xid, bool giveWarning); extern void restoreTwoPhaseData(void); extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn, TimestampTz origin_prepare_timestamp); + +extern void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, + int szgid); +extern bool LookupGXactBySubid(Oid subid); + #endif /* TWOPHASE_H */ diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index c9675ee87c..cde164472a 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -243,7 +243,8 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific, extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); extern void ReplicationSlotDropAcquired(void); -extern void ReplicationSlotAlter(const char *name, bool failover); +extern void ReplicationSlotAlter(const char *name, bool *failover, + bool *two_phase); extern void ReplicationSlotAcquire(const char *name, bool nowait); extern void ReplicationSlotRelease(void); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 12f71fa99b..31fa1257ec 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -372,12 +372,13 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, /* * walrcv_alter_slot_fn * - * Change the definition of a replication slot. Currently, it only supports - * changing the failover property of the slot. + * Change the definition of a replication slot. Currently, it supports + * changing the failover and the two_phase property of the slot. */ typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn, const char *slotname, - bool failover); + bool failover, + bool two_phase); /* * walrcv_get_backend_pid_fn @@ -455,8 +456,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) #define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \ WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) -#define walrcv_alter_slot(conn, slotname, failover) \ - WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover) +#define walrcv_alter_slot(conn, slotname, failover, two_phase) \ + WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover, two_phase) #define walrcv_get_backend_pid(conn) \ WalReceiverFunctions->walrcv_get_backend_pid(conn) #define walrcv_exec(conn, exec, nRetTypes, retTypes) \ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 515aefd519..9646261d7e 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -240,7 +240,8 @@ extern PGDLLIMPORT bool InitializingApplyWorker; extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); -extern List *logicalrep_workers_find(Oid subid, bool only_running); +extern List *logicalrep_workers_find(Oid subid, bool only_running, + bool acquire_lock); extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype, Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 5c2f1ee517..17d48b1685 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -377,10 +377,7 @@ HINT: To initiate replication, you must manually create the replication slot, e regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ---fail - alter of two_phase option not supported. -ALTER SUBSCRIPTION regress_testsub SET (two_phase = false); -ERROR: unrecognized subscription parameter: "two_phase" --- but can alter streaming when two_phase enabled +-- we can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ List of subscriptions diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 3e5ba4cb8c..007c9e7037 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -256,10 +256,7 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); \dRs+ ---fail - alter of two_phase option not supported. -ALTER SUBSCRIPTION regress_testsub SET (two_phase = false); - --- but can alter streaming when two_phase enabled +-- we can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl index 9437cd4c3b..66265c729f 100644 --- a/src/test/subscription/t/021_twophase.pl +++ b/src/test/subscription/t/021_twophase.pl @@ -367,6 +367,90 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;"); is($result, qq(2), 'replicated data in subscriber table'); +# Clean up +$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); + +############################### +# Alter the subscription to two_phase = false. +# Verify that the altered subscription reflects the two_phase option. +############################### + +# Alter subscription two_phase to false +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub_copy DISABLE;"); +$node_subscriber->poll_query_until('postgres', + "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication worker'" +); +$node_subscriber->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION tap_sub_copy SET (two_phase = false); + ALTER SUBSCRIPTION tap_sub_copy ENABLE;"); + +# Wait for subscription startup +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy); + +# Make sure that the two-phase is disabled on the subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT subtwophasestate FROM pg_subscription WHERE subname = 'tap_sub_copy';" +); +is($result, qq(d), 'two-phase should be disabled'); + +############################### +# Now do a prepare on the publisher. +# Verify that it is not replicated. +############################### +$node_publisher->safe_psql( + 'postgres', qq{ + BEGIN; + INSERT INTO tab_copy VALUES (100); + PREPARE TRANSACTION 'newgid'; + }); + +# Wait for the subscriber to catchup +$node_publisher->wait_for_catchup($appname_copy); + +# Make sure there are no prepared transactions on the subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(0), 'should be no prepared transactions on subscriber'); + +############################### +# Now commit the insert. +# Verify that it is replicated. +############################### +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'newgid';"); + +# Wait for the subscriber to catchup +$node_publisher->wait_for_catchup($appname_copy); + +# Make sure that the committed transaction is replicated. +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_copy;"); +is($result, qq(3), 'replicated data in subscriber table'); + +############################### +# Alter the subscription to two_phase = true. +# Verify that the altered subscription reflects the two_phase option. +############################### +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub_copy DISABLE;"); +$node_subscriber->poll_query_until('postgres', + "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication worker'" +); +$node_subscriber->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION tap_sub_copy SET (two_phase = true); + ALTER SUBSCRIPTION tap_sub_copy ENABLE;"); + +# Wait for subscription startup +$node_subscriber->wait_for_subscription_sync($node_publisher, $appname_copy); + +# Make sure that the two-phase is enabled on the subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT subtwophasestate FROM pg_subscription WHERE subname = 'tap_sub_copy';" +); +is($result, qq(e), 'two-phase should be enabled'); + $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_copy;"); $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;"); @@ -374,8 +458,6 @@ $node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_copy;"); # check all the cleanup ############################### -$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub"); - $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); is($result, qq(0), 'check subscription was dropped on subscriber'); -- 2.43.0