From 684b35a9a4c62f60a405ff32f1b14fb90b4f025e Mon Sep 17 00:00:00 2001 From: "Chao Li (Evan)" Date: Fri, 5 Sep 2025 10:42:24 +0800 Subject: [PATCH v1] Allow logical replication in the same cluster Thare is a known issue discussed by [1] in 2017 where creating a logical replication in the same cluster will fail. This patch provides a solution by adding a new option "local=true" to "CREATE SUBSCRIPTION". [1] https://www.postgresql.org/message-id/20170426165954.GK14000%40momjian.us Author: Chao Li --- src/backend/commands/subscriptioncmds.c | 51 ++++++++++++++++++- src/backend/replication/logical/snapbuild.c | 54 ++++++++++++++++++++- src/include/replication/logical.h | 5 ++ 3 files changed, 106 insertions(+), 4 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 82cf65fae73..28fa1b068a3 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -37,6 +37,7 @@ #include "miscadmin.h" #include "nodes/makefuncs.h" #include "pgstat.h" +#include "replication/logical.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" #include "replication/origin.h" @@ -75,6 +76,7 @@ #define SUBOPT_MAX_RETENTION_DURATION 0x00008000 #define SUBOPT_LSN 0x00010000 #define SUBOPT_ORIGIN 0x00020000 +#define SUBOPT_LOCAL 0x00040000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -101,6 +103,7 @@ typedef struct SubOpts bool runasowner; bool failover; bool retaindeadtuples; + bool local; int32 maxretention; char *origin; XLogRecPtr lsn; @@ -118,7 +121,7 @@ static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); static void CheckAlterSubOption(Subscription *sub, const char *option, bool slot_needs_update, bool isTopLevel); - +static void mark_local_subscription_creation(bool set); /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -170,6 +173,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->failover = false; if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES)) opts->retaindeadtuples = false; + if (IsSet(supported_opts, SUBOPT_LOCAL)) + opts->local = false; if (IsSet(supported_opts, SUBOPT_MAX_RETENTION_DURATION)) opts->maxretention = 0; if (IsSet(supported_opts, SUBOPT_ORIGIN)) @@ -385,6 +390,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_LSN; opts->lsn = lsn; } + else if (IsSet(supported_opts, SUBOPT_LOCAL) && + strcmp(defel->defname, "local") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_LOCAL)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_LOCAL; + opts->local = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -593,7 +607,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_RETAIN_DEAD_TUPLES | - SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN); + SUBOPT_MAX_RETENTION_DURATION | SUBOPT_ORIGIN | + SUBOPT_LOCAL); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -828,9 +843,15 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, if (opts.twophase && !opts.copy_data && tables != NIL) twophase_enabled = true; + if (opts.local) + mark_local_subscription_creation(true); + walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL); + if (opts.local) + mark_local_subscription_creation(false); + if (twophase_enabled) UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED); @@ -2893,3 +2914,29 @@ defGetStreamingMode(DefElem *def) def->defname))); return LOGICALREP_STREAM_OFF; /* keep compiler quiet */ } + +static void +mark_local_subscription_creation(bool set) +{ + bool found; + LogicalLocalCreateSubscriptioinXactInfo *plx; + plx = ShmemInitStruct("pg_command_subscription_local", + sizeof(LogicalLocalCreateSubscriptioinXactInfo), + &found); + + /* when unset, the memory must be found */ + Assert(!set && !found); + + if (set) + { + plx->backend_proc = MyProcNumber; + plx->xid = GetCurrentTransactionId(); + ereport(LOG, + (errmsg("marked local subscription creation xid %d", plx->xid))); + } + else + { + plx->backend_proc = INVALID_PROC_NUMBER; + plx->xid = InvalidTransactionId; + } +} diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 98ddee20929..e2afca2beb3 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -175,6 +175,9 @@ static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn); static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn); static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path); +static bool is_xact_local_subscription_creation(TransactionId xid); +static bool is_xact_local_subscription_creation_only_running(TransactionId oldestRunningXid, TransactionId nextXid); + /* * Allocate a new snapshot builder. * @@ -1291,7 +1294,8 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn * NB: We might have already started to incrementally assemble a snapshot, * so we need to be careful to deal with that. */ - if (running->oldestRunningXid == running->nextXid) + if (running->oldestRunningXid == running->nextXid || + is_xact_local_subscription_creation_only_running(running->oldestRunningXid, running->nextXid)) { if (builder->start_decoding_at == InvalidXLogRecPtr || builder->start_decoding_at <= lsn) @@ -1299,7 +1303,7 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn builder->start_decoding_at = lsn + 1; /* As no transactions were running xmin/xmax can be trivially set. */ - builder->xmin = running->nextXid; /* < are finished */ + builder->xmin = running->oldestRunningXid; /* < are finished */ builder->xmax = running->nextXid; /* >= are running */ /* so we can safely use the faster comparisons */ @@ -1448,6 +1452,9 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff) if (TransactionIdIsCurrentTransactionId(xid)) elog(ERROR, "waiting for ourselves"); + if (is_xact_local_subscription_creation(xid)) + continue; + if (TransactionIdFollows(xid, cutoff)) continue; @@ -2074,3 +2081,46 @@ SnapBuildSnapshotExists(XLogRecPtr lsn) return ret == 0; } + +/* + * Check if the transaction with id 'xid' is a local subscription creation + * transaction. + */ +static bool +is_xact_local_subscription_creation(TransactionId xid) +{ + bool found; + PgBackendStatus *backendState; + LogicalLocalCreateSubscriptioinXactInfo *plx; + + plx = ShmemInitStruct("pg_command_subscription_local", + sizeof(LogicalLocalCreateSubscriptioinXactInfo), + &found); + if (!found) { + return false; + } + + if (plx->xid != xid) { + return true; + } + + backendState = pgstat_get_beentry_by_proc_number(plx->backend_proc); + if (backendState != NULL && backendState->st_state == STATE_RUNNING) { + return true; + } + + return false; +} + +/* + * Check if the only running transaction is a local subscription creation + * transaction. + */ +static bool +is_xact_local_subscription_creation_only_running(TransactionId oldestRunningXid, TransactionId nextXid) +{ + if (oldestRunningXid + 1 != nextXid) { + return false; + } + return is_xact_local_subscription_creation(oldestRunningXid); +} diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 2e562bee5a9..980c236bc27 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -114,6 +114,11 @@ typedef struct LogicalDecodingContext bool processing_required; } LogicalDecodingContext; +typedef struct LogicalLocalCreateSubscriptioinXactInfo +{ + TransactionId xid; + ProcNumber backend_proc; +} LogicalLocalCreateSubscriptioinXactInfo; extern void CheckLogicalDecodingRequirements(void); -- 2.39.5 (Apple Git-154)