From 3ea787ae1d7abea14ac3a98a4df27b3a3bdc14e2 Mon Sep 17 00:00:00 2001 From: Vigneshwaran C Date: Fri, 8 Apr 2022 11:10:05 +0530 Subject: [PATCH v14 1/2] Skip replication of non local data. This patch adds a new SUBSCRIPTION boolean option "only_local". The default is false. When a SUBSCRIPTION is created with this option enabled, the publisher will only publish data that originated at the publisher node. Usage: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres port=9999' PUBLICATION pub1 with (only_local = true); --- contrib/test_decoding/expected/replorigin.out | 55 ++++++ contrib/test_decoding/sql/replorigin.sql | 15 ++ doc/src/sgml/catalogs.sgml | 11 ++ doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 12 ++ src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 4 +- src/backend/commands/subscriptioncmds.c | 26 ++- .../libpqwalreceiver/libpqwalreceiver.c | 5 + src/backend/replication/logical/worker.c | 2 + src/backend/replication/pgoutput/pgoutput.c | 20 ++- src/bin/pg_dump/pg_dump.c | 17 +- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 8 +- src/bin/psql/tab-complete.c | 4 +- src/include/catalog/pg_subscription.h | 3 + src/include/replication/logicalproto.h | 7 + src/include/replication/pgoutput.h | 1 + src/include/replication/walreceiver.h | 1 + src/test/regress/expected/subscription.out | 142 ++++++++------- src/test/regress/sql/subscription.sql | 10 ++ src/test/subscription/t/032_onlylocal.pl | 162 ++++++++++++++++++ 22 files changed, 440 insertions(+), 72 deletions(-) create mode 100644 src/test/subscription/t/032_onlylocal.pl diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out index 2e9ef7c823..94ef390120 100644 --- a/contrib/test_decoding/expected/replorigin.out +++ b/contrib/test_decoding/expected/replorigin.out @@ -257,3 +257,58 @@ SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn (1 row) +-- Verify that remote origin data is not returned with only-local option +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_only_local', 'test_decoding'); + ?column? +---------- + init +(1 row) + +SELECT pg_replication_origin_create('regress_test_decoding: regression_slot_only_local'); + pg_replication_origin_create +------------------------------ + 1 +(1 row) + +SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot_only_local'); + pg_replication_origin_session_setup +------------------------------------- + +(1 row) + +INSERT INTO origin_tbl(data) VALUES ('only_local, commit1'); +-- remote origin data returned when only-local option is not set +SELECT data FROM pg_logical_slot_get_changes('regression_slot_only_local', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0', 'only-local', '0'); + data +--------------------------------------------------------------------------------- + BEGIN + table public.origin_tbl: INSERT: id[integer]:8 data[text]:'only_local, commit1' + COMMIT +(3 rows) + +INSERT INTO origin_tbl(data) VALUES ('only_local, commit2'); +-- remote origin data not returned when only-local option is set +SELECT data FROM pg_logical_slot_get_changes('regression_slot_only_local', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0', 'only-local', '1'); + data +------ +(0 rows) + +-- Clean up +SELECT pg_replication_origin_session_reset(); + pg_replication_origin_session_reset +------------------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('regression_slot_only_local'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_only_local'); + pg_replication_origin_drop +---------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql index 2e28a48777..5d1045e105 100644 --- a/contrib/test_decoding/sql/replorigin.sql +++ b/contrib/test_decoding/sql/replorigin.sql @@ -119,3 +119,18 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot_no_lsn', NULL, NUL SELECT pg_replication_origin_session_reset(); SELECT pg_drop_replication_slot('regression_slot_no_lsn'); SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_no_lsn'); + +-- Verify that remote origin data is not returned with only-local option +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot_only_local', 'test_decoding'); +SELECT pg_replication_origin_create('regress_test_decoding: regression_slot_only_local'); +SELECT pg_replication_origin_session_setup('regress_test_decoding: regression_slot_only_local'); +INSERT INTO origin_tbl(data) VALUES ('only_local, commit1'); +-- remote origin data returned when only-local option is not set +SELECT data FROM pg_logical_slot_get_changes('regression_slot_only_local', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0', 'only-local', '0'); +INSERT INTO origin_tbl(data) VALUES ('only_local, commit2'); +-- remote origin data not returned when only-local option is set +SELECT data FROM pg_logical_slot_get_changes('regression_slot_only_local', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0', 'only-local', '1'); +-- Clean up +SELECT pg_replication_origin_session_reset(); +SELECT pg_drop_replication_slot('regression_slot_only_local'); +SELECT pg_replication_origin_drop('regress_test_decoding: regression_slot_only_local'); \ No newline at end of file diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index d96c72e531..fcb6ff0331 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7861,6 +7861,17 @@ SCRAM-SHA-256$<iteration count>:&l + + + subonlylocal bool + + + If true, subscription will request the publisher to send locally + originated changes at the publisher node, or send any publisher node + changes regardless of their origin + + + substream bool diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 353ea5def2..45beca9b86 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -207,8 +207,9 @@ ALTER SUBSCRIPTION name RENAME TO < information. The parameters that can be altered are slot_name, synchronous_commit, - binary, streaming, and - disable_on_error. + binary, streaming, + disable_on_error, and + only_local. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 203bb41844..00580cc7ba 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -216,6 +216,18 @@ CREATE SUBSCRIPTION subscription_name + + only_local (boolean) + + + Specifies whether the subscription will request the publisher to send + locally originated changes at the publisher node, or send any + publisher node changes regardless of their origin. The default is + false. + + + + streaming (boolean) diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index add51caadf..f0c83aaf59 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->stream = subform->substream; sub->twophasestate = subform->subtwophasestate; sub->disableonerr = subform->subdisableonerr; + sub->only_local = subform->subonlylocal; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index fedaed533b..88bde866ed 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1298,8 +1298,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are publicly readable. REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, - subbinary, substream, subtwophasestate, subdisableonerr, subslotname, - subsynccommit, subpublications) + subbinary, substream, subtwophasestate, subdisableonerr, + subonlylocal, subslotname, subsynccommit, subpublications) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 690cdaa426..479d6ca372 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -64,6 +64,7 @@ #define SUBOPT_TWOPHASE_COMMIT 0x00000200 #define SUBOPT_DISABLE_ON_ERR 0x00000400 #define SUBOPT_LSN 0x00000800 +#define SUBOPT_ONLY_LOCAL 0x00001000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -86,6 +87,7 @@ typedef struct SubOpts bool streaming; bool twophase; bool disableonerr; + bool only_local; XLogRecPtr lsn; } SubOpts; @@ -137,6 +139,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->twophase = false; if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR)) opts->disableonerr = false; + if (IsSet(supported_opts, SUBOPT_ONLY_LOCAL)) + opts->only_local = false; /* Parse options */ foreach(lc, stmt_options) @@ -235,6 +239,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_STREAMING; opts->streaming = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_ONLY_LOCAL) && + strcmp(defel->defname, "only_local") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_ONLY_LOCAL)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_ONLY_LOCAL; + opts->only_local = defGetBoolean(defel); + } else if (strcmp(defel->defname, "two_phase") == 0) { /* @@ -531,7 +544,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | - SUBOPT_DISABLE_ON_ERR); + SUBOPT_DISABLE_ON_ERR | SUBOPT_ONLY_LOCAL); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -602,6 +615,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary); values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming); + values[Anum_pg_subscription_subonlylocal - 1] = BoolGetDatum(opts.only_local); values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(opts.twophase ? LOGICALREP_TWOPHASE_STATE_PENDING : @@ -1015,7 +1029,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR); + SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | + SUBOPT_ONLY_LOCAL); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1072,6 +1087,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, = true; } + if (IsSet(opts.specified_opts, SUBOPT_ONLY_LOCAL)) + { + values[Anum_pg_subscription_subonlylocal - 1] = + BoolGetDatum(opts.only_local); + replaces[Anum_pg_subscription_subonlylocal - 1] = true; + } + update_tuple = true; break; } diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 0d89db4e6a..56a07f0dce 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -453,6 +453,11 @@ libpqrcv_startstreaming(WalReceiverConn *conn, PQserverVersion(conn->streamConn) >= 150000) appendStringInfoString(&cmd, ", two_phase 'on'"); + /* FIXME: 150000 should be changed to 160000 later for PG16. */ + if (options->proto.logical.only_local && + PQserverVersion(conn->streamConn) >= 150000) + appendStringInfoString(&cmd, ", only_local 'on'"); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index fc210a9e7b..d41ba854b8 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3060,6 +3060,7 @@ maybe_reread_subscription(void) newsub->binary != MySubscription->binary || newsub->stream != MySubscription->stream || newsub->owner != MySubscription->owner || + newsub->only_local != MySubscription->only_local || !equal(newsub->publications, MySubscription->publications)) { ereport(LOG, @@ -3740,6 +3741,7 @@ ApplyWorkerMain(Datum main_arg) options.proto.logical.binary = MySubscription->binary; options.proto.logical.streaming = MySubscription->stream; options.proto.logical.twophase = false; + options.proto.logical.only_local = MySubscription->only_local; if (!am_tablesync_worker()) { diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 42c06af239..82b2b8245e 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -287,11 +287,13 @@ parse_output_parameters(List *options, PGOutputData *data) bool messages_option_given = false; bool streaming_given = false; bool two_phase_option_given = false; + bool only_local_option_given = false; data->binary = false; data->streaming = false; data->messages = false; data->two_phase = false; + data->only_local = false; foreach(lc, options) { @@ -380,6 +382,16 @@ parse_output_parameters(List *options, PGOutputData *data) data->two_phase = defGetBoolean(defel); } + else if (strcmp(defel->defname, "only_local") == 0) + { + if (only_local_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + only_local_option_given = true; + + data->only_local = defGetBoolean(defel); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -1698,12 +1710,18 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* - * Currently we always forward. + * Return true if data has originated remotely when only_local option is + * enabled, false otherwise. */ static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id) { + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + + if (data->only_local && origin_id != InvalidRepOriginId) + return true; + return false; } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 7cc9c72e49..05ed85533b 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4411,6 +4411,7 @@ getSubscriptions(Archive *fout) int i_subsynccommit; int i_subpublications; int i_subbinary; + int i_subonlylocal; int i, ntups; @@ -4455,13 +4456,19 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 150000) appendPQExpBufferStr(query, " s.subtwophasestate,\n" - " s.subdisableonerr\n"); + " s.subdisableonerr,\n"); else appendPQExpBuffer(query, " '%c' AS subtwophasestate,\n" - " false AS subdisableonerr\n", + " false AS subdisableonerr,\n", LOGICALREP_TWOPHASE_STATE_DISABLED); + /* FIXME: 150000 should be changed to 160000 later for PG16. */ + if (fout->remoteVersion >= 150000) + appendPQExpBufferStr(query, " s.subonlylocal\n"); + else + appendPQExpBufferStr(query, " false AS subonlylocal\n"); + appendPQExpBufferStr(query, "FROM pg_subscription s\n" "WHERE s.subdbid = (SELECT oid FROM pg_database\n" @@ -4487,6 +4494,7 @@ getSubscriptions(Archive *fout) i_substream = PQfnumber(res, "substream"); i_subtwophasestate = PQfnumber(res, "subtwophasestate"); i_subdisableonerr = PQfnumber(res, "subdisableonerr"); + i_subonlylocal = PQfnumber(res, "subonlylocal"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4516,6 +4524,8 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subtwophasestate)); subinfo[i].subdisableonerr = pg_strdup(PQgetvalue(res, i, i_subdisableonerr)); + subinfo[i].subonlylocal = + pg_strdup(PQgetvalue(res, i, i_subonlylocal)); /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); @@ -4589,6 +4599,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->subdisableonerr, "t") == 0) appendPQExpBufferStr(query, ", disable_on_error = true"); + if (strcmp(subinfo->subonlylocal, "t") == 0) + appendPQExpBufferStr(query, ", only_local = true"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 1d21c2906f..ddb855fd16 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -661,6 +661,7 @@ typedef struct _SubscriptionInfo char *subdisableonerr; char *subsynccommit; char *subpublications; + char *subonlylocal; } SubscriptionInfo; /* diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 1a5d924a23..0013e480d6 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6354,7 +6354,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false, false, false, false}; + false, false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6396,6 +6396,12 @@ describeSubscriptions(const char *pattern, bool verbose) gettext_noop("Two phase commit"), gettext_noop("Disable on error")); + /* FIXME: 150000 should be changed to 160000 later for PG16 */ + if (pset.sversion >= 150000) + appendPQExpBuffer(&buf, + ", subonlylocal AS \"%s\"\n", + gettext_noop("Only local")); + appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" ", subconninfo AS \"%s\"\n", diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 55af9eb04e..989d4f3bcb 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1875,7 +1875,7 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) - COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit", "disable_on_error"); + COMPLETE_WITH("binary", "only_local", "slot_name", "streaming", "synchronous_commit", "disable_on_error"); /* ALTER SUBSCRIPTION SKIP ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) COMPLETE_WITH("lsn"); @@ -3157,7 +3157,7 @@ psql_completion(const char *text, int start, int end) /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "enabled", "slot_name", "streaming", + "enabled", "only_local", "slot_name", "streaming", "synchronous_commit", "two_phase", "disable_on_error"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index d1260f590c..d47d4f3a5f 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -70,6 +70,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool substream; /* Stream in-progress transactions. */ + bool subonlylocal; /* Skip replication of remote origin data */ + char subtwophasestate; /* Stream two-phase transactions */ bool subdisableonerr; /* True if a worker error should cause the @@ -110,6 +112,7 @@ typedef struct Subscription bool binary; /* Indicates if the subscription wants data in * binary format */ bool stream; /* Allow streaming in-progress transactions. */ + bool only_local; /* Skip replication of remote origin data */ char twophasestate; /* Allow streaming two-phase transactions */ bool disableonerr; /* Indicates if the subscription should be * automatically disabled if a worker error diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index a771ab8ff3..7bb6fee9c9 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -32,6 +32,13 @@ * * LOGICALREP_PROTO_TWOPHASE_VERSION_NUM is the minimum protocol version with * support for two-phase commit decoding (at prepare time). Introduced in PG15. + * + * LOGICALREP_PROTO_LOCALONLY_VERSION_NUM is the minimum protocol version with + * support for sending only locally originated data from the publisher. + * Introduced in PG16. + * + * FIXME: LOGICALREP_PROTO_LOCALONLY_VERSION_NUM needs to be bumped to 4 in + * PG16. */ #define LOGICALREP_PROTO_MIN_VERSION_NUM 1 #define LOGICALREP_PROTO_VERSION_NUM 1 diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index eafedd610a..0461f4e634 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -29,6 +29,7 @@ typedef struct PGOutputData bool streaming; bool messages; bool two_phase; + bool only_local; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 81184aa92f..796c04db4e 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -183,6 +183,7 @@ typedef struct bool streaming; /* Streaming of large transactions */ bool twophase; /* Streaming of two-phase transactions at * prepare time */ + bool only_local; /* publish only locally originated data */ } logical; } proto; } WalRcvStreamOptions; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 7fcfad1591..a9351b426b 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -70,16 +70,38 @@ ALTER SUBSCRIPTION regress_testsub3 ENABLE; ERROR: cannot enable subscription that does not have a slot name ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION; ERROR: ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions +-- fail - only_local must be boolean +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, only_local = foo); +ERROR: only_local requires a Boolean value +-- now it works +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, only_local = true); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +\dRs+ regress_testsub4 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | f | d | f | t | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub4 SET (only_local = false); +\dRs+ regress_testsub4 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | f | d | f | f | off | dbname=regress_doesnotexist | 0/0 +(1 row) + DROP SUBSCRIPTION regress_testsub3; +DROP SUBSCRIPTION regress_testsub4; -- fail - invalid connection string ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -96,10 +118,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | f | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -108,10 +130,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -143,10 +165,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | f | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -179,19 +201,19 @@ ERROR: binary requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -202,19 +224,19 @@ ERROR: streaming requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -229,10 +251,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more then once @@ -247,10 +269,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -284,10 +306,10 @@ ERROR: two_phase requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) --fail - alter of two_phase option not supported. @@ -296,10 +318,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -308,10 +330,10 @@ DROP SUBSCRIPTION regress_testsub; CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -323,18 +345,18 @@ ERROR: disable_on_error requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Only local | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 74c38ead5d..28eb91fc47 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -54,7 +54,17 @@ CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PU ALTER SUBSCRIPTION regress_testsub3 ENABLE; ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION; +-- fail - only_local must be boolean +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, only_local = foo); + +-- now it works +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, only_local = true); +\dRs+ regress_testsub4 +ALTER SUBSCRIPTION regress_testsub4 SET (only_local = false); +\dRs+ regress_testsub4 + DROP SUBSCRIPTION regress_testsub3; +DROP SUBSCRIPTION regress_testsub4; -- fail - invalid connection string ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; diff --git a/src/test/subscription/t/032_onlylocal.pl b/src/test/subscription/t/032_onlylocal.pl new file mode 100644 index 0000000000..5ff5a0d9dc --- /dev/null +++ b/src/test/subscription/t/032_onlylocal.pl @@ -0,0 +1,162 @@ + +# Copyright (c) 2021-2022, PostgreSQL Global Development Group + +# Test logical replication using only_local option. +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +############################################################################### +# Setup a bidirectional logical replication between Node_A & Node_B +############################################################################### + +# Initialize nodes +# node_A +my $node_A = PostgreSQL::Test::Cluster->new('node_A'); +$node_A->init(allows_streaming => 'logical'); +$node_A->append_conf( + 'postgresql.conf', qq( +max_prepared_transactions = 10 +logical_decoding_work_mem = 64kB +)); +$node_A->start; +# node_B +my $node_B = PostgreSQL::Test::Cluster->new('node_B'); +$node_B->init(allows_streaming => 'logical'); +$node_B->append_conf( + 'postgresql.conf', qq( +max_prepared_transactions = 10 +logical_decoding_work_mem = 64kB +)); +$node_B->start; + +# Create tables on node_A +$node_A->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Create the same tables on node_B +$node_B->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Setup logical replication +# node_A (pub) -> node_B (sub) +my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; +$node_A->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_A FOR TABLE tab_full"); +my $appname_B1 = 'tap_sub_B1'; +$node_B->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub_B1 + CONNECTION '$node_A_connstr application_name=$appname_B1' + PUBLICATION tap_pub_A + WITH (only_local = on)"); + +# node_B (pub) -> node_A (sub) +my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; +$node_B->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_B FOR TABLE tab_full"); +my $appname_A = 'tap_sub_A'; +$node_A->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub_A + CONNECTION '$node_B_connstr application_name=$appname_A' + PUBLICATION tap_pub_B + WITH (only_local = on, copy_data = off)"); + +# Wait for subscribers to finish initialization +$node_A->wait_for_catchup($appname_B1); +$node_B->wait_for_catchup($appname_A); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_A->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +$node_B->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +is(1, 1, "Circular replication setup is complete"); + +my $result; + +############################################################################### +# check that bidirectional logical replication setup does not cause infinite +# recursive insertion. +############################################################################### + +# insert a record +$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (11);"); +$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);"); + +$node_A->wait_for_catchup($appname_B1); +$node_B->wait_for_catchup($appname_A); + +# check that transaction was committed on subscriber(s) +$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); +is($result, qq(11 +12), 'Inserted successfully without leading to infinite recursion in bidirectional replication setup' +); +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); +is($result, qq(11 +12), 'Inserted successfully without leading to infinite recursion in bidirectional replication setup' +); + +############################################################################### +# check that remote data that is originated from node_C to node_B is not +# published to node_A +############################################################################### +# Initialize node node_C +my $node_C = PostgreSQL::Test::Cluster->new('node_C'); +$node_C->init(allows_streaming => 'logical'); +$node_C->append_conf( + 'postgresql.conf', qq( +max_prepared_transactions = 10 +logical_decoding_work_mem = 64kB +)); +$node_C->start; + +$node_C->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Setup logical replication +# node_C (pub) -> node_B (sub) +my $node_C_connstr = $node_C->connstr . ' dbname=postgres'; +$node_C->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_C FOR TABLE tab_full"); + +my $appname_B2 = 'tap_sub_B2'; +$node_B->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub_B2 + CONNECTION '$node_C_connstr application_name=$appname_B2' + PUBLICATION tap_pub_C + WITH (only_local = on)"); + +$node_C->wait_for_catchup($appname_B2); + +$node_C->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# insert a record +$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (13);"); + +$node_C->wait_for_catchup($appname_B2); +$node_B->wait_for_catchup($appname_A); + +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); +is($result, qq(11 +12 +13), 'Node_C data replicated to Node_B' +); + +# check that the data published from node_C to node_B is not sent to node_A +$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); +is($result, qq(11 +12), 'Remote data originated from other node is not replicated when only_local option is ON' +); + +# shutdown +$node_B->stop('fast'); +$node_A->stop('fast'); +$node_C->stop('fast'); + +done_testing(); -- 2.32.0