From b0cb3cdfd44721fadae82c8dfc0d7ad4cc5b316d Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Mon, 14 Dec 2020 12:07:14 +0530 Subject: [PATCH v32 7/9] Support 2PC txn - Subscription option. This patch implements new SUBSCRIPTION option "two_phase". Usage: CREATE SUBSCRIPTION ... WITH (two_phase = on) Default is off. Note: The tablesync worker slot always has two_phase disabled, regardless of the option. --- doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 15 ++++ src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 2 +- src/backend/commands/subscriptioncmds.c | 44 ++++++++++-- .../libpqwalreceiver/libpqwalreceiver.c | 4 ++ src/backend/replication/logical/worker.c | 2 + src/backend/replication/pgoutput/pgoutput.c | 36 +++++++++- src/bin/pg_dump/pg_dump.c | 16 ++++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 10 +-- src/include/catalog/pg_subscription.h | 3 + src/include/replication/logicalproto.h | 4 ++ src/include/replication/walreceiver.h | 1 + src/test/regress/expected/subscription.out | 79 ++++++++++++++-------- src/test/regress/sql/subscription.sql | 15 ++++ src/test/subscription/t/020_twophase.pl | 3 +- src/test/subscription/t/021_twophase_stream.pl | 2 +- src/test/subscription/t/022_twophase_cascade.pl | 6 +- .../subscription/t/023_twophase_cascade_stream.pl | 4 +- 20 files changed, 202 insertions(+), 51 deletions(-) diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index db5e59f..dbe2a43 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -166,8 +166,9 @@ ALTER SUBSCRIPTION name RENAME TO < information. The parameters that can be altered are slot_name, synchronous_commit, - binary, and - streaming. + binary, + streaming, and + two_phase. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index e812bee..1332a83 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -239,6 +239,21 @@ CREATE SUBSCRIPTION subscription_name + + two_phase (boolean) + + + Specifies whether two-phase commit is enabled for this subscription. + The default is false. + + + + When two-phase commit is enabled then the decoded transactions are sent + to the subscriber on the PREPARE TRANSACTION. By default, the transaction + preapred on publisher is decoded as normal transaction at commit. + + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index ca78d39..886839e 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -67,6 +67,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->enabled = subform->subenabled; sub->binary = subform->subbinary; sub->stream = subform->substream; + sub->twophase = subform->subtwophase; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index b140c21..5f4e191 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1149,7 +1149,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are readable. REVOKE ALL ON pg_subscription FROM public; -GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, substream, subslotname, subpublications) +GRANT SELECT (subdbid, subname, subowner, subenabled, subbinary, substream, subtwophase, subslotname, subpublications) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 1696454..b0745d5 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -64,7 +64,8 @@ parse_subscription_options(List *options, char **synchronous_commit, bool *refresh, bool *binary_given, bool *binary, - bool *streaming_given, bool *streaming) + bool *streaming_given, bool *streaming, + bool *twophase_given, bool *twophase) { ListCell *lc; bool connect_given = false; @@ -105,6 +106,11 @@ parse_subscription_options(List *options, *streaming_given = false; *streaming = false; } + if (twophase) + { + *twophase_given = false; + *twophase = false; + } /* Parse options */ foreach(lc, options) @@ -210,6 +216,15 @@ parse_subscription_options(List *options, *streaming_given = true; *streaming = defGetBoolean(defel); } + else if (strcmp(defel->defname, "two_phase") == 0 && twophase) + { + if (*twophase_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + *twophase_given = true; + *twophase = defGetBoolean(defel); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -355,6 +370,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) bool copy_data; bool streaming; bool streaming_given; + bool twophase; + bool twophase_given; char *synchronous_commit; char *conninfo; char *slotname; @@ -379,7 +396,8 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) &synchronous_commit, NULL, /* no "refresh" */ &binary_given, &binary, - &streaming_given, &streaming); + &streaming_given, &streaming, + &twophase_given, &twophase); /* * Since creating a replication slot is not transactional, rolling back @@ -447,6 +465,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(enabled); values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(binary); values[Anum_pg_subscription_substream - 1] = BoolGetDatum(streaming); + values[Anum_pg_subscription_subtwophase - 1] = BoolGetDatum(twophase); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (slotname) @@ -720,6 +739,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) bool binary; bool streaming_given; bool streaming; + bool twophase_given; + bool twophase; parse_subscription_options(stmt->options, NULL, /* no "connect" */ @@ -730,7 +751,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) &synchronous_commit, NULL, /* no "refresh" */ &binary_given, &binary, - &streaming_given, &streaming); + &streaming_given, &streaming, + &twophase_given, &twophase); if (slotname_given) { @@ -769,6 +791,13 @@ AlterSubscription(AlterSubscriptionStmt *stmt) replaces[Anum_pg_subscription_substream - 1] = true; } + if (twophase_given) + { + values[Anum_pg_subscription_subtwophase - 1] = + BoolGetDatum(twophase); + replaces[Anum_pg_subscription_subtwophase - 1] = true; + } + update_tuple = true; break; } @@ -787,7 +816,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) NULL, /* no "synchronous_commit" */ NULL, /* no "refresh" */ NULL, NULL, /* no "binary" */ - NULL, NULL); /* no streaming */ + NULL, NULL, /* no "streaming" */ + NULL, NULL); /* no "two_phase" */ Assert(enabled_given); if (!sub->slotname && enabled) @@ -832,7 +862,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) NULL, /* no "synchronous_commit" */ &refresh, NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + NULL, NULL, /* no "streaming" */ + NULL, NULL); /* no "two_phase" */ values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); replaces[Anum_pg_subscription_subpublications - 1] = true; @@ -875,7 +906,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt) NULL, /* no "synchronous_commit" */ NULL, /* no "refresh" */ NULL, NULL, /* no "binary" */ - NULL, NULL); /* no "streaming" */ + NULL, NULL, /* no "streaming" */ + NULL, NULL); /* no "two_phase" */ AlterSubscription_refresh(sub, copy_data); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 24f8b3e..1f404cd 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -429,6 +429,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn, PQserverVersion(conn->streamConn) >= 140000) appendStringInfoString(&cmd, ", streaming 'on'"); + if (options->proto.logical.twophase && + PQserverVersion(conn->streamConn) >= 140000) + appendStringInfoString(&cmd, ", two_phase 'on'"); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 2ddb832..2362fba 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2794,6 +2794,7 @@ maybe_reread_subscription(void) strcmp(newsub->slotname, MySubscription->slotname) != 0 || newsub->binary != MySubscription->binary || newsub->stream != MySubscription->stream || + (!am_tablesync_worker() && newsub->twophase != MySubscription->twophase) || !equal(newsub->publications, MySubscription->publications)) { ereport(LOG, @@ -3440,6 +3441,7 @@ ApplyWorkerMain(Datum main_arg) options.proto.logical.publication_names = MySubscription->publications; options.proto.logical.binary = MySubscription->binary; options.proto.logical.streaming = MySubscription->stream; + options.proto.logical.twophase = MySubscription->twophase && !am_tablesync_worker(); /* Start normal logical streaming replication. */ walrcv_startstreaming(wrconn, &options); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index a6bafaa..dd819c9 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -178,13 +178,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) static void parse_output_parameters(List *options, uint32 *protocol_version, List **publication_names, bool *binary, - bool *enable_streaming) + bool *enable_streaming, + bool *enable_twophase) { ListCell *lc; bool protocol_version_given = false; bool publication_names_given = false; bool binary_option_given = false; bool streaming_given = false; + bool twophase_given = false; *binary = false; @@ -252,6 +254,16 @@ parse_output_parameters(List *options, uint32 *protocol_version, *enable_streaming = defGetBoolean(defel); } + else if (strcmp(defel->defname, "two_phase") == 0) + { + if (twophase_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + twophase_given = true; + + *enable_twophase = defGetBoolean(defel); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -265,6 +277,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool is_init) { bool enable_streaming = false; + bool enable_twophase = false; PGOutputData *data = palloc0(sizeof(PGOutputData)); /* Create our memory context for private allocations. */ @@ -289,7 +302,8 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, &data->protocol_version, &data->publication_names, &data->binary, - &enable_streaming); + &enable_streaming, + &enable_twophase); /* Check if we support requested protocol */ if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM) @@ -330,6 +344,24 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, /* Also remember we're currently not streaming any transaction. */ in_streaming = false; + /* + * Decide whether to enable two-phase commit. It is disabled by default, in + * which case we just update the flag in decoding context. Otherwise + * we only allow it with sufficient version of the protocol, and when + * the output plugin supports it. + */ + if (!enable_twophase) + ctx->twophase = false; + else if (data->protocol_version < LOGICALREP_PROTO_2PC_VERSION_NUM) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("requested proto_version=%d does not support two-phase commit, need %d or higher", + data->protocol_version, LOGICALREP_PROTO_2PC_VERSION_NUM))); + else if (!ctx->twophase) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("two-phase commit requested, but not supported by output plugin"))); + /* Init publication state. */ data->publications = NIL; publications_valid = false; diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 673a670..cb707bf 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4221,6 +4221,7 @@ getSubscriptions(Archive *fout) int i_subname; int i_rolname; int i_substream; + int i_subtwophase; int i_subconninfo; int i_subslotname; int i_subsynccommit; @@ -4264,9 +4265,14 @@ getSubscriptions(Archive *fout) appendPQExpBufferStr(query, " false AS subbinary,\n"); if (fout->remoteVersion >= 140000) - appendPQExpBufferStr(query, " s.substream\n"); + appendPQExpBufferStr(query, " s.substream,\n"); else - appendPQExpBufferStr(query, " false AS substream\n"); + appendPQExpBufferStr(query, " false AS substream,\n"); + + if (fout->remoteVersion >= 140000) + appendPQExpBufferStr(query, " s.subtwophase\n"); + else + appendPQExpBufferStr(query, " false AS subtwophase\n"); appendPQExpBufferStr(query, "FROM pg_subscription s\n" @@ -4287,6 +4293,7 @@ getSubscriptions(Archive *fout) i_subpublications = PQfnumber(res, "subpublications"); i_subbinary = PQfnumber(res, "subbinary"); i_substream = PQfnumber(res, "substream"); + i_subtwophase = PQfnumber(res, "subtwophase"); subinfo = pg_malloc(ntups * sizeof(SubscriptionInfo)); @@ -4312,6 +4319,8 @@ getSubscriptions(Archive *fout) pg_strdup(PQgetvalue(res, i, i_subbinary)); subinfo[i].substream = pg_strdup(PQgetvalue(res, i, i_substream)); + subinfo[i].subtwophase = + pg_strdup(PQgetvalue(res, i, i_subtwophase)); if (strlen(subinfo[i].rolname) == 0) pg_log_warning("owner of subscription \"%s\" appears to be invalid", @@ -4380,6 +4389,9 @@ dumpSubscription(Archive *fout, SubscriptionInfo *subinfo) if (strcmp(subinfo->substream, "f") != 0) appendPQExpBufferStr(query, ", streaming = on"); + if (strcmp(subinfo->subtwophase, "f") != 0) + appendPQExpBufferStr(query, ", two_phase = on"); + if (strcmp(subinfo->subsynccommit, "off") != 0) appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit)); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 317bb83..22e4e6c 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -629,6 +629,7 @@ typedef struct _SubscriptionInfo char *subslotname; char *subbinary; char *substream; + char *subtwophase; char *subsynccommit; char *subpublications; } SubscriptionInfo; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 14150d0..47306a2 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -5997,7 +5997,7 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false}; + false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6023,13 +6023,15 @@ describeSubscriptions(const char *pattern, bool verbose) if (verbose) { - /* Binary mode and streaming are only supported in v14 and higher */ + /* Binary mode and streaming and Two phase commit are only supported in v14 and higher */ if (pset.sversion >= 140000) appendPQExpBuffer(&buf, ", subbinary AS \"%s\"\n" - ", substream AS \"%s\"\n", + ", substream AS \"%s\"\n" + ", subtwophase AS \"%s\"\n", gettext_noop("Binary"), - gettext_noop("Streaming")); + gettext_noop("Streaming"), + gettext_noop("Two phase commit")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 3fa02af..e07eed0 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -53,6 +53,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool substream; /* Stream in-progress transactions. */ + bool subtwophase; /* Decode 2PC PREPARE? */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -90,6 +92,7 @@ typedef struct Subscription bool binary; /* Indicates if the subscription wants data in * binary format */ bool stream; /* Allow streaming in-progress transactions. */ + bool twophase; /* Decode 2PC PREPARE? */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 427a8ee..252f43c 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -28,10 +28,14 @@ * * LOGICALREP_PROTO_STREAM_VERSION_NUM is the minimum protocol version with * support for streaming large transactions. + * + * LOGICALREP_PROTO_2PC_VERSION_NUM is the minimum protocol version with + * support for two-phase commit PREPARE decoding. */ #define LOGICALREP_PROTO_MIN_VERSION_NUM 1 #define LOGICALREP_PROTO_VERSION_NUM 1 #define LOGICALREP_PROTO_STREAM_VERSION_NUM 2 +#define LOGICALREP_PROTO_2PC_VERSION_NUM 2 #define LOGICALREP_PROTO_MAX_VERSION_NUM LOGICALREP_PROTO_STREAM_VERSION_NUM /* diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 1b05b39..f96c891 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -179,6 +179,7 @@ typedef struct List *publication_names; /* String list of publications */ bool binary; /* Ask publisher to use binary */ bool streaming; /* Streaming of large transactions */ + bool twophase; /* Enable 2PC decoding of PREPARE */ } logical; } proto; } WalRcvStreamOptions; diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index 2fa9bce..23d876e 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -76,10 +76,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | f | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -91,10 +91,10 @@ ERROR: subscription "regress_doesnotexist" does not exist ALTER SUBSCRIPTION regress_testsub SET (create_slot = false); ERROR: unrecognized subscription parameter: "create_slot" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------ - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | off | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------ + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | f | f | off | dbname=regress_doesnotexist2 (1 row) BEGIN; @@ -126,10 +126,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ----------------------+---------------------------+---------+---------------------+--------+-----------+--------------------+------------------------------ - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | local | dbname=regress_doesnotexist2 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+--------------------+------------------------------ + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | f | f | local | dbname=regress_doesnotexist2 (1 row) -- rename back to keep the rest simple @@ -162,19 +162,19 @@ ERROR: binary requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, binary = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | t | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | t | f | f | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | f | off | dbname=regress_doesnotexist (1 row) DROP SUBSCRIPTION regress_testsub; @@ -185,19 +185,42 @@ ERROR: streaming requires a Boolean value CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, streaming = true); WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | t | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | t | f | off | dbname=regress_doesnotexist (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Synchronous commit | Conninfo ------------------+---------------------------+---------+-------------+--------+-----------+--------------------+----------------------------- - regress_testsub | regress_subscription_user | f | {testpub} | f | f | off | dbname=regress_doesnotexist + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | f | off | dbname=regress_doesnotexist +(1 row) + +DROP SUBSCRIPTION regress_testsub; +-- fail - two_phase must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = foo); +ERROR: two_phase requires a Boolean value +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); +WARNING: tables were not subscribed, you will have to run ALTER SUBSCRIPTION ... REFRESH PUBLICATION to subscribe the tables +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | t | off | dbname=regress_doesnotexist +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (two_phase = false); +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two phase commit | Synchronous commit | Conninfo +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+--------------------+----------------------------- + regress_testsub | regress_subscription_user | f | {testpub} | f | f | f | off | dbname=regress_doesnotexist (1 row) DROP SUBSCRIPTION regress_testsub; diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 14fa0b2..2a0b366 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -147,6 +147,21 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- fail - two_phase must be boolean +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = foo); + +-- now it works +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, two_phase = true); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (two_phase = false); +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); + +\dRs+ + +DROP SUBSCRIPTION regress_testsub; + RESET SESSION AUTHORIZATION; DROP ROLE regress_subscription_user; DROP ROLE regress_subscription_user2; diff --git a/src/test/subscription/t/020_twophase.pl b/src/test/subscription/t/020_twophase.pl index 9c1d681..a680c1a 100644 --- a/src/test/subscription/t/020_twophase.pl +++ b/src/test/subscription/t/020_twophase.pl @@ -47,7 +47,8 @@ my $appname = 'tap_sub'; $node_subscriber->safe_psql('postgres', " CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' - PUBLICATION tap_pub"); + PUBLICATION tap_pub + WITH (two_phase = on)"); # Wait for subscriber to finish initialization my $caughtup_query = diff --git a/src/test/subscription/t/021_twophase_stream.pl b/src/test/subscription/t/021_twophase_stream.pl index 9ec1e31..a2d4824 100644 --- a/src/test/subscription/t/021_twophase_stream.pl +++ b/src/test/subscription/t/021_twophase_stream.pl @@ -41,7 +41,7 @@ $node_subscriber->safe_psql('postgres', " CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub - WITH (streaming = on)"); + WITH (streaming = on, two_phase = on)"); # Wait for subscriber to finish initialization my $caughtup_query = diff --git a/src/test/subscription/t/022_twophase_cascade.pl b/src/test/subscription/t/022_twophase_cascade.pl index 0f95530..9fb461b 100644 --- a/src/test/subscription/t/022_twophase_cascade.pl +++ b/src/test/subscription/t/022_twophase_cascade.pl @@ -54,7 +54,8 @@ my $appname_B = 'tap_sub_B'; $node_B->safe_psql('postgres', " CREATE SUBSCRIPTION tap_sub_B CONNECTION '$node_A_connstr application_name=$appname_B' - PUBLICATION tap_pub_A"); + PUBLICATION tap_pub_A + WITH (two_phase = on)"); # node_B (pub) -> node_C (sub) my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; @@ -66,7 +67,8 @@ my $appname_C = 'tap_sub_C'; $node_C->safe_psql('postgres', " CREATE SUBSCRIPTION tap_sub_C CONNECTION '$node_B_connstr application_name=$appname_C' - PUBLICATION tap_pub_B"); + PUBLICATION tap_pub_B + WITH (two_phase = on)"); # Wait for subscribers to finish initialization my $caughtup_query_B = diff --git a/src/test/subscription/t/023_twophase_cascade_stream.pl b/src/test/subscription/t/023_twophase_cascade_stream.pl index 3c6470d..ffba03f 100644 --- a/src/test/subscription/t/023_twophase_cascade_stream.pl +++ b/src/test/subscription/t/023_twophase_cascade_stream.pl @@ -56,7 +56,7 @@ $node_B->safe_psql('postgres', " CREATE SUBSCRIPTION tap_sub_B CONNECTION '$node_A_connstr application_name=$appname_B' PUBLICATION tap_pub_A - WITH (streaming = on)"); + WITH (streaming = on, two_phase = on)"); # node_B (pub) -> node_C (sub) my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; @@ -69,7 +69,7 @@ $node_C->safe_psql('postgres', " CREATE SUBSCRIPTION tap_sub_C CONNECTION '$node_B_connstr application_name=$appname_C' PUBLICATION tap_pub_B - WITH (streaming = on)"); + WITH (streaming = on, two_phase = on)"); # Wait for subscribers to finish initialization my $caughtup_query_B = -- 1.8.3.1