From e521c9ddd8816e45c4eb19b1f3977c0dd607e203 Mon Sep 17 00:00:00 2001 From: Vigneshwaran C Date: Thu, 26 May 2022 19:33:13 +0530 Subject: [PATCH v17 2/2] Skip replication of non local data. This patch adds a new SUBSCRIPTION parameter "origin". It Specifies whether the subscription will request the publisher to only send changes that originated locally, or to send any changes regardless of origin. Setting it to "local" means that the subscription will request the publisher to only send changes that originated locally. Setting it to "any" means that that the publisher sends any changes regardless of their origin. The default is "any". Usage: CREATE SUBSCRIPTION sub1 CONNECTION 'dbname=postgres port=9999' PUBLICATION pub1 with (origin = local); --- doc/src/sgml/catalogs.sgml | 14 ++ doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 16 ++ src/backend/catalog/pg_subscription.c | 10 ++ src/backend/catalog/system_views.sql | 4 +- src/backend/commands/subscriptioncmds.c | 36 ++++- .../libpqwalreceiver/libpqwalreceiver.c | 6 + src/backend/replication/logical/worker.c | 10 ++ src/backend/replication/pgoutput/pgoutput.c | 22 ++- src/bin/pg_dump/pg_dump.c | 20 ++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 8 +- src/bin/psql/tab-complete.c | 4 +- src/include/catalog/pg_subscription.h | 6 + src/include/replication/pgoutput.h | 1 + src/include/replication/walreceiver.h | 2 + src/test/regress/expected/subscription.out | 142 ++++++++++------- src/test/regress/sql/subscription.sql | 10 ++ src/test/subscription/t/032_origin.pl | 147 ++++++++++++++++++ 19 files changed, 392 insertions(+), 72 deletions(-) create mode 100644 src/test/subscription/t/032_origin.pl diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index c00c93dd7b..96d917878a 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7903,6 +7903,20 @@ SCRAM-SHA-256$<iteration count>:&l + + + suborigin text + + + Possible origin values are local, + any, or NULL if none is specified. + If local, the subscription will request the + publisher to only send changes that originated locally. If + any, the publisher sends any changes regardless of + their origin. + + + subconninfo text diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 353ea5def2..64efc21f53 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -207,8 +207,9 @@ ALTER SUBSCRIPTION name RENAME TO < information. The parameters that can be altered are slot_name, synchronous_commit, - binary, streaming, and - disable_on_error. + binary, streaming, + disable_on_error, and + origin. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 203bb41844..406fc273d6 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -302,6 +302,22 @@ CREATE SUBSCRIPTION subscription_name + + + origin (string) + + + Specifies whether the subscription will request the publisher to only + send changes that originated locally, or to send any changes + regardless of origin. Setting origin to + local means that the subscription will request the + publisher to only send changes that originated locally. Setting + origin to any means that that + the publisher sends any changes regardless of their origin. The + default is any. + + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index add51caadf..59376f40de 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -72,6 +72,16 @@ GetSubscription(Oid subid, bool missing_ok) sub->twophasestate = subform->subtwophasestate; sub->disableonerr = subform->subdisableonerr; + datum = SysCacheGetAttr(SUBSCRIPTIONOID, + tup, + Anum_pg_subscription_suborigin, + &isnull); + + if (!isnull) + sub->origin = TextDatumGetCString(datum); + else + sub->origin = NULL; + /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index fedaed533b..46305e2931 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1298,8 +1298,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are publicly readable. REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, - subbinary, substream, subtwophasestate, subdisableonerr, subslotname, - subsynccommit, subpublications) + subbinary, substream, subtwophasestate, subdisableonerr, + suborigin, subslotname, subsynccommit, subpublications) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 690cdaa426..6449669f29 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -64,6 +64,7 @@ #define SUBOPT_TWOPHASE_COMMIT 0x00000200 #define SUBOPT_DISABLE_ON_ERR 0x00000400 #define SUBOPT_LSN 0x00000800 +#define SUBOPT_ORIGIN 0x00001000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -86,6 +87,7 @@ typedef struct SubOpts bool streaming; bool twophase; bool disableonerr; + char *origin; XLogRecPtr lsn; } SubOpts; @@ -137,6 +139,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->twophase = false; if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR)) opts->disableonerr = false; + if (IsSet(supported_opts, SUBOPT_ORIGIN)) + opts->origin = NULL; /* Parse options */ foreach(lc, stmt_options) @@ -265,6 +269,21 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_DISABLE_ON_ERR; opts->disableonerr = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_ORIGIN) && + strcmp(defel->defname, "origin") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_ORIGIN)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_ORIGIN; + opts->origin = defGetString(defel); + + if ((strcmp(opts->origin, "local") != 0) && + (strcmp(opts->origin, "any") != 0)) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("unrecognized origin value: \"%s\"", opts->origin))); + } else if (IsSet(supported_opts, SUBOPT_LSN) && strcmp(defel->defname, "lsn") == 0) { @@ -531,7 +550,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | - SUBOPT_DISABLE_ON_ERR); + SUBOPT_DISABLE_ON_ERR | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -607,6 +626,11 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); + if (opts.origin) + values[Anum_pg_subscription_suborigin - 1] = + CStringGetTextDatum(opts.origin); + else + nulls[Anum_pg_subscription_suborigin - 1] = true; values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -1015,7 +1039,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, { supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | - SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR); + SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | + SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1072,6 +1097,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, = true; } + if (IsSet(opts.specified_opts, SUBOPT_ORIGIN)) + { + values[Anum_pg_subscription_suborigin - 1] = + CStringGetTextDatum(opts.origin); + replaces[Anum_pg_subscription_suborigin - 1] = true; + } + update_tuple = true; break; } diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 0d89db4e6a..467a11e3a9 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -453,6 +453,12 @@ libpqrcv_startstreaming(WalReceiverConn *conn, PQserverVersion(conn->streamConn) >= 150000) appendStringInfoString(&cmd, ", two_phase 'on'"); + /* FIXME: 150000 should be changed to 160000 later for PG16. */ + if (options->proto.logical.origin && + PQserverVersion(conn->streamConn) >= 150000) + appendStringInfo(&cmd, ", origin '%s'", + options->proto.logical.origin); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index fc210a9e7b..552f507a56 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -276,6 +276,10 @@ static TransactionId stream_xid = InvalidTransactionId; static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; #define is_skipping_changes() (unlikely(!XLogRecPtrIsInvalid(skip_xact_finish_lsn))) +/* Macro for comparing string fields that might be NULL */ +#define equalstr(a, b) \ + (((a) != NULL && (b) != NULL) ? (strcmp(a, b) == 0) : (a) == (b)) + /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; @@ -3059,6 +3063,7 @@ maybe_reread_subscription(void) strcmp(newsub->slotname, MySubscription->slotname) != 0 || newsub->binary != MySubscription->binary || newsub->stream != MySubscription->stream || + equalstr(newsub->origin, MySubscription->origin) || newsub->owner != MySubscription->owner || !equal(newsub->publications, MySubscription->publications)) { @@ -3741,6 +3746,11 @@ ApplyWorkerMain(Datum main_arg) options.proto.logical.streaming = MySubscription->stream; options.proto.logical.twophase = false; + if (MySubscription->origin) + options.proto.logical.origin = pstrdup(MySubscription->origin); + else + options.proto.logical.origin = NULL; + if (!am_tablesync_worker()) { /* diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 42c06af239..9b47614472 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -287,11 +287,13 @@ parse_output_parameters(List *options, PGOutputData *data) bool messages_option_given = false; bool streaming_given = false; bool two_phase_option_given = false; + bool origin_option_given = false; data->binary = false; data->streaming = false; data->messages = false; data->two_phase = false; + data->origin = NULL; foreach(lc, options) { @@ -380,6 +382,16 @@ parse_output_parameters(List *options, PGOutputData *data) data->two_phase = defGetBoolean(defel); } + else if (strcmp(defel->defname, "origin") == 0) + { + if (origin_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + origin_option_given = true; + + data->origin = defGetString(defel); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -1698,12 +1710,20 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* - * Currently we always forward. + * Return true if the data source (origin) is remote and user has requested + * only local data, false otherwise. */ static bool pgoutput_origin_filter(LogicalDecodingContext *ctx, RepOriginId origin_id) { + PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + + if (data->origin && + (strcmp(data->origin, "local") == 0) && + origin_id != InvalidRepOriginId) + return true; + return false; } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 7cc9c72e49..62a9d9b26a 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4406,6 +4406,7 @@ getSubscriptions(Archive *fout) int i_substream; int i_subtwophasestate; int i_subdisableonerr; + int i_suborigin; int i_subconninfo; int i_subslotname; int i_subsynccommit; @@ -4455,13 +4456,19 @@ getSubscriptions(Archive *fout) if (fout->remoteVersion >= 150000) appendPQExpBufferStr(query, " s.subtwophasestate,\n" - " s.subdisableonerr\n"); + " s.subdisableonerr,\n"); else appendPQExpBuffer(query, " '%c' AS subtwophasestate,\n" - " false AS subdisableonerr\n", + " false AS subdisableonerr,\n", LOGICALREP_TWOPHASE_STATE_DISABLED); + /* FIXME: 150000 should be changed to 160000 later for PG16. */ + if (fout->remoteVersion >= 150000) + appendPQExpBufferStr(query, " s.suborigin\n"); + else + appendPQExpBufferStr(query, " NULL AS suborigin\n"); + appendPQExpBufferStr(query, "FROM pg_subscription s\n" "WHERE s.subdbid = (SELECT oid FROM pg_database\n" @@ -4487,6 +4494,7 @@ getSubscriptions(Archive *fout) i_substream = PQfnumber(res, "substream"); i_subtwophasestate = PQfnumber(res, "subtwophasestate"); i_subdisableonerr = PQfnumber(res, "subdisableonerr"); + i_suborigin = PQfnumber(res, "suborigin"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4517,6 +4525,11 @@ getSubscriptions(Archive *fout) subinfo[i].subdisableonerr = pg_strdup(PQgetvalue(res, i, i_subdisableonerr)); + if (PQgetisnull(res, i, i_suborigin)) + subinfo[i].suborigin = NULL; + else + subinfo[i].suborigin = pg_strdup(PQgetvalue(res, i, i_suborigin)); + /* Decide whether we want to dump it */ selectDumpableObject(&(subinfo[i].dobj), fout); } @@ -4589,6 +4602,9 @@ dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo) if (strcmp(subinfo->subdisableonerr, "t") == 0) appendPQExpBufferStr(query, ", disable_on_error = true"); + if (subinfo->suborigin) + appendPQExpBuffer(query, ", origin = %s", subinfo->suborigin); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 1d21c2906f..69ee939d44 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -659,6 +659,7 @@ typedef struct _SubscriptionInfo char *substream; char *subtwophasestate; char *subdisableonerr; + char *suborigin; char *subsynccommit; char *subpublications; } SubscriptionInfo; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 1a5d924a23..a9487fd43a 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6354,7 +6354,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false, false, false, false}; + false, false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6396,6 +6396,12 @@ describeSubscriptions(const char *pattern, bool verbose) gettext_noop("Two phase commit"), gettext_noop("Disable on error")); + /* FIXME: 150000 should be changed to 160000 later for PG16 */ + if (pset.sversion >= 150000) + appendPQExpBuffer(&buf, + ", suborigin AS \"%s\"\n", + gettext_noop("Origin")); + appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" ", subconninfo AS \"%s\"\n", diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index e1cc753489..43d25f45ba 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1875,7 +1875,7 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("(", "PUBLICATION"); /* ALTER SUBSCRIPTION SET ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SET", "(")) - COMPLETE_WITH("binary", "slot_name", "streaming", "synchronous_commit", "disable_on_error"); + COMPLETE_WITH("binary", "origin", "slot_name", "streaming", "synchronous_commit", "disable_on_error"); /* ALTER SUBSCRIPTION SKIP ( */ else if (HeadMatches("ALTER", "SUBSCRIPTION", MatchAny) && TailMatches("SKIP", "(")) COMPLETE_WITH("lsn"); @@ -3157,7 +3157,7 @@ psql_completion(const char *text, int start, int end) /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "enabled", "slot_name", "streaming", + "enabled", "origin", "slot_name", "streaming", "synchronous_commit", "two_phase", "disable_on_error"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index d1260f590c..c2fcb45028 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -87,6 +87,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW /* List of publications subscribed to */ text subpublications[1] BKI_FORCE_NOT_NULL; + + /* Publish the data originated from the specified origin */ + text suborigin; #endif } FormData_pg_subscription; @@ -118,6 +121,9 @@ typedef struct Subscription char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ List *publications; /* List of publication names to subscribe to */ + char *origin; /* Publish the data originated from the + * specified origin */ + } Subscription; extern Subscription *GetSubscription(Oid subid, bool missing_ok); diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index eafedd610a..02027550e2 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -29,6 +29,7 @@ typedef struct PGOutputData bool streaming; bool messages; bool two_phase; + char *origin; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 81184aa92f..1168bd42f2 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -183,6 +183,8 @@ typedef struct bool streaming; /* Streaming of large transactions */ bool twophase; /* Streaming of two-phase transactions at * prepare time */ + char *origin; /* Publish the data originated from the + * specified origin */ } logical; } proto; } WalRcvStreamOptions; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 7fcfad1591..c5d2acb514 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -70,16 +70,38 @@ ALTER SUBSCRIPTION regress_testsub3 ENABLE; ERROR: cannot enable subscription that does not have a slot name ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION; ERROR: ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions +-- fail - origin must be either local or any +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = foo); +ERROR: unrecognized origin value: "foo" +-- now it works +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = local); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +\dRs+ regress_testsub4 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | f | d | f | local | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); +\dRs+ regress_testsub4 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | f | d | f | any | off | dbname=regress_doesnotexist | 0/0 +(1 row) + DROP SUBSCRIPTION regress_testsub3; +DROP SUBSCRIPTION regress_testsub4; -- fail - invalid connection string ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -96,10 +118,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -108,10 +130,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -143,10 +165,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | d | f | | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -179,19 +201,19 @@ ERROR: binary requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | f | d | f | | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -202,19 +224,19 @@ ERROR: streaming requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | d | f | | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -229,10 +251,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | f | d | f | | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more then once @@ -247,10 +269,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -284,10 +306,10 @@ ERROR: two_phase requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | p | f | | off | dbname=regress_doesnotexist | 0/0 (1 row) --fail - alter of two_phase option not supported. @@ -296,10 +318,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -308,10 +330,10 @@ DROP SUBSCRIPTION regress_testsub; CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true, two_phase = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | p | f | | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -323,18 +345,18 @@ ERROR: disable_on_error requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, disable_on_error = false); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | f | | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Disable on error | Origin | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | d | t | | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 74c38ead5d..fff63ce538 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -54,7 +54,17 @@ CREATE SUBSCRIPTION regress_testsub3 CONNECTION 'dbname=regress_doesnotexist' PU ALTER SUBSCRIPTION regress_testsub3 ENABLE; ALTER SUBSCRIPTION regress_testsub3 REFRESH PUBLICATION; +-- fail - origin must be either local or any +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = foo); + +-- now it works +CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (slot_name = NONE, connect = false, origin = local); +\dRs+ regress_testsub4 +ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); +\dRs+ regress_testsub4 + DROP SUBSCRIPTION regress_testsub3; +DROP SUBSCRIPTION regress_testsub4; -- fail - invalid connection string ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; diff --git a/src/test/subscription/t/032_origin.pl b/src/test/subscription/t/032_origin.pl new file mode 100644 index 0000000000..9ab44d4294 --- /dev/null +++ b/src/test/subscription/t/032_origin.pl @@ -0,0 +1,147 @@ + +# Copyright (c) 2021-2022, PostgreSQL Global Development Group + +# Test logical replication using origin option. +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +############################################################################### +# Setup a bidirectional logical replication between node_A & node_B +############################################################################### + +# Initialize nodes +# node_A +my $node_A = PostgreSQL::Test::Cluster->new('node_A'); +$node_A->init(allows_streaming => 'logical'); +$node_A->start; +# node_B +my $node_B = PostgreSQL::Test::Cluster->new('node_B'); +$node_B->init(allows_streaming => 'logical'); +$node_B->start; + +# Create tables on node_A +$node_A->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Create the same tables on node_B +$node_B->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Setup logical replication +# node_A (pub) -> node_B (sub) +my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; +$node_A->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_A FOR TABLE tab_full"); +my $appname_B1 = 'tap_sub_B1'; +$node_B->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub_B1 + CONNECTION '$node_A_connstr application_name=$appname_B1' + PUBLICATION tap_pub_A + WITH (origin = local)"); + +# node_B (pub) -> node_A (sub) +my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; +$node_B->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_B FOR TABLE tab_full"); +my $appname_A = 'tap_sub_A'; +$node_A->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub_A + CONNECTION '$node_B_connstr application_name=$appname_A' + PUBLICATION tap_pub_B + WITH (origin = local, copy_data = off)"); + +# Wait for subscribers to finish initialization +$node_A->wait_for_catchup($appname_B1); +$node_B->wait_for_catchup($appname_A); + +# Also wait for initial table sync to finish +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_A->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +$node_B->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +is(1, 1, 'Bidirectional replication setup is complete'); + +my $result; + +############################################################################### +# Check that bidirectional logical replication setup does not cause infinite +# recursive insertion. +############################################################################### + +# insert a record +$node_A->safe_psql('postgres', "INSERT INTO tab_full VALUES (11);"); +$node_B->safe_psql('postgres', "INSERT INTO tab_full VALUES (12);"); + +$node_A->wait_for_catchup($appname_B1); +$node_B->wait_for_catchup($appname_A); + +# check that transaction was committed on subscriber(s) +$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); +is($result, qq(11 +12), 'Inserted successfully without leading to infinite recursion in bidirectional replication setup' +); +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); +is($result, qq(11 +12), 'Inserted successfully without leading to infinite recursion in bidirectional replication setup' +); + +############################################################################### +# Check that remote data of node_B (that originated from node_C) is not +# published to node_A. +############################################################################### +# Initialize node node_C +my $node_C = PostgreSQL::Test::Cluster->new('node_C'); +$node_C->init(allows_streaming => 'logical'); +$node_C->start; + +$node_C->safe_psql('postgres', "CREATE TABLE tab_full (a int PRIMARY KEY)"); + +# Setup logical replication +# node_C (pub) -> node_B (sub) +my $node_C_connstr = $node_C->connstr . ' dbname=postgres'; +$node_C->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_C FOR TABLE tab_full"); + +my $appname_B2 = 'tap_sub_B2'; +$node_B->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub_B2 + CONNECTION '$node_C_connstr application_name=$appname_B2' + PUBLICATION tap_pub_C + WITH (origin = local)"); + +$node_C->wait_for_catchup($appname_B2); + +$node_C->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# insert a record +$node_C->safe_psql('postgres', "INSERT INTO tab_full VALUES (13);"); + +$node_C->wait_for_catchup($appname_B2); +$node_B->wait_for_catchup($appname_A); + +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); +is($result, qq(11 +12 +13), 'The node_C data replicated to node_B' +); + +# check that the data published from node_C to node_B is not sent to node_A +$result = $node_A->safe_psql('postgres', "SELECT * FROM tab_full ORDER BY 1;"); +is($result, qq(11 +12), 'Remote data originating from another node (not the publisher) is not replicated when origin option is local' +); + +# shutdown +$node_B->stop('fast'); +$node_A->stop('fast'); +$node_C->stop('fast'); + +done_testing(); -- 2.32.0