From 10d729cd12801859c90a6351e78b7fb8ce7ef19c Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Tue, 21 Nov 2023 14:50:21 +0530 Subject: [PATCH v41 1/3] Allow logical walsenders to wait for the physical standbys A new property 'failover' is added at the slot level. This is persistent information to indicate that this logical slot is enabled to be synced to the physical standbys so that logical replication can be resumed after failover. It is always false for physical slots. Users can set this flag during CREATE SUBSCRIPTION or during pg_create_logical_replication_slot API. Ex1: CREATE SUBSCRIPTION mysub CONNECTION '..' PUBLICATION mypub WITH (failover = true); Ex2: (failover is the last arg) SELECT * FROM pg_create_logical_replication_slot('myslot', 'pgoutput', false, true, true); A new replication command called ALTER_REPLICATION_SLOT and a corresponding walreceiver API function named walrcv_alter_slot have been implemented. They allow subscribers or users to modify the failover property of a replication slot on the publisher. Altering the failover option of the subscription is currently not permitted. However, this restriction may be lifted in future versions. The value of the 'failover' flag is displayed as part of pg_replication_slots view. A new GUC standby_slot_names has been added. It is the list of physical replication slots that logical replication with failover enabled waits for. The intent of this wait is that no logical replication subscriptions (with failover=true) should get ahead of physical replication standbys (corresponding to the physical slots in standby_slot_names). --- contrib/test_decoding/expected/slot.out | 58 +++ contrib/test_decoding/sql/slot.sql | 13 + doc/src/sgml/catalogs.sgml | 12 + doc/src/sgml/config.sgml | 16 + doc/src/sgml/func.sgml | 11 +- doc/src/sgml/protocol.sgml | 51 +++ doc/src/sgml/ref/alter_subscription.sgml | 19 +- doc/src/sgml/ref/create_subscription.sgml | 23 ++ doc/src/sgml/system-views.sgml | 11 + src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_functions.sql | 1 + src/backend/catalog/system_views.sql | 6 +- src/backend/commands/subscriptioncmds.c | 100 +++++- .../libpqwalreceiver/libpqwalreceiver.c | 38 +- .../replication/logical/logicalfuncs.c | 13 + src/backend/replication/logical/tablesync.c | 53 ++- src/backend/replication/logical/worker.c | 71 +++- src/backend/replication/repl_gram.y | 18 +- src/backend/replication/repl_scanner.l | 2 + src/backend/replication/slot.c | 182 +++++++++- src/backend/replication/slotfuncs.c | 19 +- src/backend/replication/walreceiver.c | 2 +- src/backend/replication/walsender.c | 335 ++++++++++++++++-- .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/misc/guc_tables.c | 14 + src/backend/utils/misc/postgresql.conf.sample | 2 + src/bin/pg_upgrade/info.c | 5 +- src/bin/pg_upgrade/pg_upgrade.c | 6 +- src/bin/pg_upgrade/pg_upgrade.h | 2 + src/bin/pg_upgrade/t/003_logical_slots.pl | 6 +- src/bin/psql/describe.c | 8 +- src/bin/psql/tab-complete.c | 2 +- src/include/catalog/pg_proc.dat | 14 +- src/include/catalog/pg_subscription.h | 11 + src/include/nodes/replnodes.h | 12 + src/include/replication/slot.h | 12 +- src/include/replication/walreceiver.h | 18 +- src/include/replication/walsender.h | 4 + src/include/replication/walsender_private.h | 7 + src/include/replication/worker_internal.h | 3 +- src/include/utils/guc_hooks.h | 3 + src/test/recovery/meson.build | 1 + src/test/recovery/t/006_logical_decoding.pl | 3 +- src/test/recovery/t/050_verify_slot_order.pl | 149 ++++++++ src/test/regress/expected/rules.out | 5 +- src/test/regress/expected/subscription.out | 165 +++++---- src/test/regress/sql/subscription.sql | 8 + src/tools/pgindent/typedefs.list | 2 + 48 files changed, 1352 insertions(+), 166 deletions(-) create mode 100644 src/test/recovery/t/050_verify_slot_order.pl diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out index 63a9940f73..261d8886d3 100644 --- a/contrib/test_decoding/expected/slot.out +++ b/contrib/test_decoding/expected/slot.out @@ -406,3 +406,61 @@ SELECT pg_drop_replication_slot('copied_slot2_notemp'); (1 row) +-- Test failover option of slots. +SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'test_decoding', false, false, true); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot'); + ?column? +---------- + init +(1 row) + +SELECT slot_name, slot_type, failover FROM pg_replication_slots; + slot_name | slot_type | failover +-----------------------+-----------+---------- + failover_true_slot | logical | t + failover_false_slot | logical | f + failover_default_slot | logical | f + physical_slot | physical | f +(4 rows) + +SELECT pg_drop_replication_slot('failover_true_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('failover_false_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('failover_default_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('physical_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql index 1aa27c5667..45aeae7fd5 100644 --- a/contrib/test_decoding/sql/slot.sql +++ b/contrib/test_decoding/sql/slot.sql @@ -176,3 +176,16 @@ ORDER BY o.slot_name, c.slot_name; SELECT pg_drop_replication_slot('orig_slot2'); SELECT pg_drop_replication_slot('copied_slot2_no_change'); SELECT pg_drop_replication_slot('copied_slot2_notemp'); + +-- Test failover option of slots. +SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'test_decoding', false, false, true); +SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false); +SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false); +SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot'); + +SELECT slot_name, slot_type, failover FROM pg_replication_slots; + +SELECT pg_drop_replication_slot('failover_true_slot'); +SELECT pg_drop_replication_slot('failover_false_slot'); +SELECT pg_drop_replication_slot('failover_default_slot'); +SELECT pg_drop_replication_slot('physical_slot'); diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 3ec7391ec5..e666730c64 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7990,6 +7990,18 @@ SCRAM-SHA-256$<iteration count>:&l + + + subfailoverstate char + + + State codes for failover mode: + d = disabled, + p = pending enablement, + e = enabled + + + subconninfo text diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 94d1eb2b81..30d9b53e03 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4360,6 +4360,22 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + standby_slot_names (string) + + standby_slot_names configuration parameter + + + + + List of physical replication slots that logical replication slots with + failover enabled waits for. If a logical replication connection is + meant to switch to a physical standby after the standby is promoted, + the physical replication slot for the standby should be listed here. + + + + diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 20da3ed033..90f1f19018 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -27541,7 +27541,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset pg_create_logical_replication_slot - pg_create_logical_replication_slot ( slot_name name, plugin name , temporary boolean, twophase boolean ) + pg_create_logical_replication_slot ( slot_name name, plugin name , temporary boolean, twophase boolean, failover boolean ) record ( slot_name name, lsn pg_lsn ) @@ -27556,8 +27556,13 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset released upon any error. The optional fourth parameter, twophase, when set to true, specifies that the decoding of prepared transactions is enabled for this - slot. A call to this function has the same effect as the replication - protocol command CREATE_REPLICATION_SLOT ... LOGICAL. + slot. The optional fifth parameter, + failover, when set to true, + specifies that this slot is enabled to be synced to the + physical standbys so that logical replication can be resumed + after failover. A call to this function has the same effect as + the replication protocol command + CREATE_REPLICATION_SLOT ... LOGICAL. diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index af3f016f74..bb926ab149 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2060,6 +2060,16 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + + FAILOVER { 'true' | 'false' } + + + If true, the slot is enabled to be synced to the physical + standbys so that logical replication can be resumed after failover. + + + @@ -2124,6 +2134,47 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + ALTER_REPLICATION_SLOT slot_name ( option [, ...] ) + ALTER_REPLICATION_SLOT + + + + Change the definition of a replication slot. + See for more about + replication slots. This command is currently only supported for logical + replication slots. + + + + + slot_name + + + The name of the slot to alter. Must be a valid replication slot + name (see ). + + + + + + The following options are supported: + + + + FAILOVER { 'true' | 'false' } + + + If true, the slot is enabled to be synced to the physical + standbys so that logical replication can be resumed after failover. + + + + + + + + READ_REPLICATION_SLOT slot_name READ_REPLICATION_SLOT diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 6d36ff0dc9..239c43ba2f 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -73,11 +73,14 @@ ALTER SUBSCRIPTION name RENAME TO < These commands also cannot be executed when the subscription has two_phase - commit enabled, unless + commit enabled or + failover + enabled, unless copy_data is false. See column subtwophasestate - of pg_subscription - to know the actual two-phase state. + and subfailoverstate of + pg_subscription + to know the actual state. @@ -230,6 +233,16 @@ ALTER SUBSCRIPTION name RENAME TO < origin. Only a superuser can set password_required = false. + + + When altering the + slot_name, + the failover property of the new slot may differ from the + failover + parameter specified in the subscription, you need to adjust the + failover property when creating the slot so that it + matches the value specified in subscription. + diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index f1c20b3a46..fa6cd1c43f 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -399,6 +399,29 @@ CREATE SUBSCRIPTION subscription_name + + + failover (boolean) + + + Specifies whether the replication slot associated with the subscription + is enabled to be synced to the physical standbys so that logical + replication can be resumed from the new primary after failover. + The default is false. + + + + The implementation of failover requires that replication + has successfully finished the initial table synchronization + phase. So even when failover is enabled for a + subscription, the internal failover state remains + temporarily pending until the initialization phase + completes. See column subfailoverstate + of pg_subscription + to know the actual failover state. + + + diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 0ef1745631..1dc695fd3a 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2532,6 +2532,17 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx invalidated). Always NULL for physical slots. + + + + failover bool + + + True if this logical slot is enabled to be synced to the physical + standbys so that logical replication can be resumed from the new primary + after failover. Always false for physical slots. + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index d6a978f136..18512955ad 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -73,6 +73,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->disableonerr = subform->subdisableonerr; sub->passwordrequired = subform->subpasswordrequired; sub->runasowner = subform->subrunasowner; + sub->failoverstate = subform->subfailoverstate; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 4206752881..4db796aa0b 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -479,6 +479,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( IN slot_name name, IN plugin name, IN temporary boolean DEFAULT false, IN twophase boolean DEFAULT false, + IN failover boolean DEFAULT false, OUT slot_name name, OUT lsn pg_lsn) RETURNS RECORD LANGUAGE INTERNAL diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 11d18ed9dd..63038f87f7 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1023,7 +1023,8 @@ CREATE VIEW pg_replication_slots AS L.wal_status, L.safe_wal_size, L.two_phase, - L.conflicting + L.conflicting, + L.failover FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); @@ -1354,7 +1355,8 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, subpasswordrequired, subrunasowner, - subslotname, subsynccommit, subpublications, suborigin) + subslotname, subsynccommit, subpublications, suborigin, + subfailoverstate) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index edc82c11be..3f35f42621 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -71,6 +71,7 @@ #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_LSN 0x00002000 #define SUBOPT_ORIGIN 0x00004000 +#define SUBOPT_FAILOVER 0x00008000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -96,6 +97,7 @@ typedef struct SubOpts bool passwordrequired; bool runasowner; char *origin; + bool failover; XLogRecPtr lsn; } SubOpts; @@ -157,6 +159,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); + if (IsSet(supported_opts, SUBOPT_FAILOVER)) + opts->failover = false; /* Parse options */ foreach(lc, stmt_options) @@ -326,6 +330,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("unrecognized origin value: \"%s\"", opts->origin)); } + else if (IsSet(supported_opts, SUBOPT_FAILOVER) && + strcmp(defel->defname, "failover") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_FAILOVER)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_FAILOVER; + opts->failover = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_LSN) && strcmp(defel->defname, "lsn") == 0) { @@ -591,7 +604,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN | + SUBOPT_FAILOVER); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -710,6 +724,10 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, publicationListToArray(publications); values[Anum_pg_subscription_suborigin - 1] = CStringGetTextDatum(opts.origin); + values[Anum_pg_subscription_subfailoverstate - 1] = + CharGetDatum(opts.failover ? + LOGICALREP_FAILOVER_STATE_PENDING : + LOGICALREP_FAILOVER_STATE_DISABLED); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -746,6 +764,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PG_TRY(); { + bool failover_enabled = false; + check_publications(wrconn, publications); check_publications_origin(wrconn, publications, opts.copy_data, opts.origin, NULL, 0, stmt->subname); @@ -776,6 +796,19 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, InvalidXLogRecPtr); } + /* + * Even if failover is set, don't create the slot with failover + * enabled. Will enable it once all the tables are synced and + * ready. The intention is that if failover happens at the time of + * table-sync, user should re-launch the subscription instead of + * relying on main slot (if synced) with no table-sync data + * present. When the subscription has no tables, leave failover as + * false to allow ALTER SUBSCRIPTION ... REFRESH PUBLICATION to + * work. + */ + if (opts.failover && !opts.copy_data && tables != NIL) + failover_enabled = true; + /* * If requested, create permanent slot for the subscription. We * won't use the initial snapshot for anything, so no need to @@ -807,15 +840,34 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, - CRS_NOEXPORT_SNAPSHOT, NULL); - - if (twophase_enabled) - UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED); + failover_enabled, CRS_NOEXPORT_SNAPSHOT, NULL); + /* Update twophase and/or failover state */ + EnableTwoPhaseFailoverTriState(subid, twophase_enabled, + failover_enabled); ereport(NOTICE, (errmsg("created replication slot \"%s\" on publisher", opts.slot_name))); } + + /* + * If the slot_name is specified without the create_slot option, it + * is possible that the user intends to use an existing slot on the + * publisher, so here we alter the failover property of the slot to + * match the failover value in subscription. + * + * We do not need to change the failover to false if the server + * does not support failover (e.g. pre-PG17) + */ + else if (opts.slot_name && + (failover_enabled || walrcv_server_version(wrconn) >= 170000)) + { + walrcv_alter_slot(wrconn, opts.slot_name, failover_enabled); + ereport(NOTICE, + (errmsg("changed the failover state of replication slot \"%s\" on publisher to %s", + opts.slot_name, + failover_enabled ? "true" : "false"))); + } } PG_FINALLY(); { @@ -1279,13 +1331,21 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION ... WITH (refresh = false)."))); /* - * See ALTER_SUBSCRIPTION_REFRESH for details why this is - * not allowed. + * See ALTER_SUBSCRIPTION_REFRESH for details why copy_data + * is not allowed when twophase or failover is enabled. */ if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when two_phase is enabled"), + /* translator: %s is a subscription option */ + errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when %s is enabled", "two_phase"), + errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION."))); + + if (sub->failoverstate == LOGICALREP_FAILOVER_STATE_ENABLED && opts.copy_data) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + /* translator: %s is a subscription option */ + errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when %s is enabled", "failover"), errhint("Use ALTER SUBSCRIPTION ... SET PUBLICATION with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION."))); PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh"); @@ -1334,8 +1394,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, "ALTER SUBSCRIPTION ... DROP PUBLICATION ... WITH (refresh = false)"))); /* - * See ALTER_SUBSCRIPTION_REFRESH for details why this is - * not allowed. + * See ALTER_SUBSCRIPTION_REFRESH for details why copy_data + * is not allowed when twophase or failover is enabled. */ if (sub->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED && opts.copy_data) ereport(ERROR, @@ -1347,6 +1407,16 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, "ALTER SUBSCRIPTION ... ADD PUBLICATION" : "ALTER SUBSCRIPTION ... DROP PUBLICATION"))); + if (sub->failoverstate == LOGICALREP_FAILOVER_STATE_ENABLED && opts.copy_data) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("ALTER SUBSCRIPTION with refresh and copy_data is not allowed when failover is enabled"), + /* translator: %s is an SQL ALTER command */ + errhint("Use %s with refresh = false, or with copy_data = false, or use DROP/CREATE SUBSCRIPTION.", + isadd ? + "ALTER SUBSCRIPTION ... ADD PUBLICATION" : + "ALTER SUBSCRIPTION ... DROP PUBLICATION"))); + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION with refresh"); /* Refresh the new list of publications. */ @@ -1392,6 +1462,16 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when two_phase is enabled"), errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION."))); + /* + * See comments above for twophasestate, same holds true for + * 'failover' + */ + if (sub->failoverstate == LOGICALREP_FAILOVER_STATE_ENABLED && opts.copy_data) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ALTER SUBSCRIPTION ... REFRESH with copy_data is not allowed when failover is enabled"), + errhint("Use ALTER SUBSCRIPTION ... REFRESH with copy_data = false, or use DROP/CREATE SUBSCRIPTION."))); + PreventInTransactionBlock(isTopLevel, "ALTER SUBSCRIPTION ... REFRESH"); AlterSubscription_refresh(sub, opts.copy_data, NULL); diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 60d5c1fc40..7fa0bb8a3a 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -74,8 +74,11 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, + bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); +static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, + bool failover); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn, const char *query, @@ -96,6 +99,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_receive = libpqrcv_receive, .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, + .walrcv_alter_slot = libpqrcv_alter_slot, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -883,8 +887,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) */ static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, - bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, - XLogRecPtr *lsn) + bool temporary, bool two_phase, bool failover, + CRSSnapshotAction snapshot_action, XLogRecPtr *lsn) { PGresult *res; StringInfoData cmd; @@ -913,7 +917,8 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, else appendStringInfoChar(&cmd, ' '); } - + if (failover) + appendStringInfoString(&cmd, "FAILOVER, "); if (use_new_options_syntax) { switch (snapshot_action) @@ -982,6 +987,33 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, return snapshot; } +/* + * Change the definition of the replication slot. + */ +static void +libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, + bool failover) +{ + StringInfoData cmd; + PGresult *res; + + initStringInfo(&cmd); + appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )", + quote_identifier(slotname), + failover ? "true" : "false"); + + res = libpqrcv_PQexec(conn->streamConn, cmd.data); + pfree(cmd.data); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("could not alter replication slot \"%s\" on publisher: %s", + slotname, pchomp(PQerrorMessage(conn->streamConn))))); + + PQclear(res); +} + /* * Return PID of remote backend process. */ diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 1067aca08f..a36366e117 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -30,6 +30,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/message.h" +#include "replication/walsender.h" #include "storage/fd.h" #include "utils/array.h" #include "utils/builtins.h" @@ -109,6 +110,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin MemoryContext per_query_ctx; MemoryContext oldcontext; XLogRecPtr end_of_wal; + XLogRecPtr wait_for_wal_lsn; LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; ArrayType *arr; @@ -228,6 +230,17 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin NameStr(MyReplicationSlot->data.plugin), format_procedure(fcinfo->flinfo->fn_oid)))); + if (XLogRecPtrIsInvalid(upto_lsn)) + wait_for_wal_lsn = end_of_wal; + else + wait_for_wal_lsn = Min(upto_lsn, end_of_wal); + + /* + * Wait for specified streaming replication standby servers (if any) + * to confirm receipt of WAL up to wait_for_wal_lsn. + */ + WalSndWaitForStandbyConfirmation(wait_for_wal_lsn); + ctx->output_writer_private = p; /* diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index df3c42eb5d..d2d8bf1a7a 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -614,15 +614,28 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) * Note: If the subscription has no tables then leave the state as * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to * work. + * + * Same goes for 'failover'. Enable it only if subscription has tables + * and all the tablesyncs have reached READY state. */ - if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING) + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING || + MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_PENDING) { CommandCounterIncrement(); /* make updates visible */ if (AllTablesyncsReady()) { - ereport(LOG, - (errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled", - MySubscription->name))); + if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING) + ereport(LOG, + /* translator: %s is a subscription option */ + (errmsg("logical replication apply worker for subscription \"%s\" will restart so that %s can be enabled", + MySubscription->name, "two_phase"))); + + if (MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_PENDING) + ereport(LOG, + /* translator: %s is a subscription option */ + (errmsg("logical replication apply worker for subscription \"%s\" will restart so that %s can be enabled", + MySubscription->name, "failover"))); + should_exit = true; } } @@ -1420,7 +1433,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) */ walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ , false /* two_phase */ , - CRS_USE_SNAPSHOT, origin_startpos); + false /* failover */ , CRS_USE_SNAPSHOT, + origin_startpos); /* * Setup replication origin tracking. The purpose of doing this before the @@ -1722,10 +1736,12 @@ AllTablesyncsReady(void) } /* - * Update the two_phase state of the specified subscription in pg_subscription. + * Update the twophase and/or failover state of the specified subscription + * in pg_subscription. */ void -UpdateTwoPhaseState(Oid suboid, char new_state) +EnableTwoPhaseFailoverTriState(Oid suboid, bool enable_twophase, + bool enable_failover) { Relation rel; HeapTuple tup; @@ -1733,9 +1749,8 @@ UpdateTwoPhaseState(Oid suboid, char new_state) bool replaces[Natts_pg_subscription]; Datum values[Natts_pg_subscription]; - Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED || - new_state == LOGICALREP_TWOPHASE_STATE_PENDING || - new_state == LOGICALREP_TWOPHASE_STATE_ENABLED); + if (!enable_twophase && !enable_failover) + return; rel = table_open(SubscriptionRelationId, RowExclusiveLock); tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(suboid)); @@ -1749,9 +1764,21 @@ UpdateTwoPhaseState(Oid suboid, char new_state) memset(nulls, false, sizeof(nulls)); memset(replaces, false, sizeof(replaces)); - /* And update/set two_phase state */ - values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(new_state); - replaces[Anum_pg_subscription_subtwophasestate - 1] = true; + /* Update/set two_phase state if asked by the caller */ + if (enable_twophase) + { + values[Anum_pg_subscription_subtwophasestate - 1] = + CharGetDatum(LOGICALREP_TWOPHASE_STATE_ENABLED); + replaces[Anum_pg_subscription_subtwophasestate - 1] = true; + } + + /* Update/set failover state if asked by the caller */ + if (enable_failover) + { + values[Anum_pg_subscription_subfailoverstate - 1] = + CharGetDatum(LOGICALREP_FAILOVER_STATE_ENABLED); + replaces[Anum_pg_subscription_subfailoverstate - 1] = true; + } tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, replaces); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 21abf34ef7..cf5d9caa8b 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -132,6 +132,37 @@ * avoid such deadlocks, we generate a unique GID (consisting of the * subscription oid and the xid of the prepared transaction) for each prepare * transaction on the subscriber. + * + * FAILOVER + * ---------------------- + * The logical slot on the primary can be synced to the standby by specifying + * failover = true when creating the subscription. Enabling failover allows us + * to smoothly transition to the promoted standby, ensuring that we can + * subscribe to the new primary without losing any data. + * + * However, we do not enable failover for slots created by the table sync + * worker. This is because the table sync slot might not be fully synced on the + * standby. During syncing, the local restart_lsn and/or local catalog_xmin of + * the newly created slot on the standby are typically ahead of those on the + * primary. Therefore, the standby needs to wait for the primary server's + * restart_lsn and catalog_xmin to catch up, which takes time. + * + * Additionally, failover is not enabled for the main slot if the table sync is + * in progress. This is because if a failover occurs while the table sync + * worker has reached a certain state (SUBREL_STATE_FINISHEDCOPY or + * SUBREL_STATE_DATASYNC), replication will not be able to continue from the + * new primary node. + * + * As a result, we enable the failover option for the main slot only after the + * initial sync is complete. The failover option is implemented as a tri-state + * with values DISABLED, PENDING, and ENABLED. The state transition process + * between these values is the same as the two_phase option (see TWO_PHASE + * TRANSACTIONS for details). + * + * During the startup of the apply worker, it checks if all table syncs are in + * the READY state for a failover tri-state of PENDING. If so, it alters the + * main slot's failover property to true and updates the tri-state value from + * PENDING to ENABLED. *------------------------------------------------------------------------- */ @@ -3947,6 +3978,7 @@ maybe_reread_subscription(void) newsub->passwordrequired != MySubscription->passwordrequired || strcmp(newsub->origin, MySubscription->origin) != 0 || newsub->owner != MySubscription->owner || + newsub->failoverstate != MySubscription->failoverstate || !equal(newsub->publications, MySubscription->publications)) { if (am_parallel_apply_worker()) @@ -4482,6 +4514,8 @@ run_apply_worker() TimeLineID startpointTLI; char *err; bool must_use_password; + bool twophase_pending; + bool failover_pending; slotname = MySubscription->slotname; @@ -4538,17 +4572,38 @@ run_apply_worker() * Note: If the subscription has no tables then leave the state as * PENDING, which allows ALTER SUBSCRIPTION ... REFRESH PUBLICATION to * work. + * + * Same goes for 'failover'. It is enabled only if subscription has tables + * and all the tablesyncs have reached READY state, until then it remains + * as PENDING. */ - if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING && - AllTablesyncsReady()) + twophase_pending = + (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING); + failover_pending = + (MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_PENDING); + + if ((twophase_pending || failover_pending) && AllTablesyncsReady()) { /* Start streaming with two_phase enabled */ - options.proto.logical.twophase = true; + if (twophase_pending) + options.proto.logical.twophase = true; + + if (failover_pending) + walrcv_alter_slot(LogRepWorkerWalRcvConn, slotname, true); + walrcv_startstreaming(LogRepWorkerWalRcvConn, &options); StartTransactionCommand(); - UpdateTwoPhaseState(MySubscription->oid, LOGICALREP_TWOPHASE_STATE_ENABLED); - MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; + + /* Update twophase and/or failover */ + EnableTwoPhaseFailoverTriState(MySubscription->oid, twophase_pending, + failover_pending); + if (twophase_pending) + MySubscription->twophasestate = LOGICALREP_TWOPHASE_STATE_ENABLED; + + if (failover_pending) + MySubscription->failoverstate = LOGICALREP_FAILOVER_STATE_ENABLED; + CommitTransactionCommand(); } else @@ -4557,11 +4612,15 @@ run_apply_worker() } ereport(DEBUG1, - (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s", + (errmsg_internal("logical replication apply worker for subscription \"%s\" two_phase is %s and failover is %s", MySubscription->name, MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_DISABLED ? "DISABLED" : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING ? "PENDING" : MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_ENABLED ? "ENABLED" : + "?", + MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_DISABLED ? "DISABLED" : + MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_PENDING ? "PENDING" : + MySubscription->failoverstate == LOGICALREP_FAILOVER_STATE_ENABLED ? "ENABLED" : "?"))); /* Run the main loop. */ diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y index 0c874e33cf..b706046811 100644 --- a/src/backend/replication/repl_gram.y +++ b/src/backend/replication/repl_gram.y @@ -64,6 +64,7 @@ Node *replication_parse_result; %token K_START_REPLICATION %token K_CREATE_REPLICATION_SLOT %token K_DROP_REPLICATION_SLOT +%token K_ALTER_REPLICATION_SLOT %token K_TIMELINE_HISTORY %token K_WAIT %token K_TIMELINE @@ -79,7 +80,8 @@ Node *replication_parse_result; %type command %type base_backup start_replication start_logical_replication - create_replication_slot drop_replication_slot identify_system + create_replication_slot drop_replication_slot + alter_replication_slot identify_system read_replication_slot timeline_history show %type generic_option_list %type generic_option @@ -111,6 +113,7 @@ command: | start_logical_replication | create_replication_slot | drop_replication_slot + | alter_replication_slot | read_replication_slot | timeline_history | show @@ -257,6 +260,18 @@ drop_replication_slot: } ; +/* ALTER_REPLICATION_SLOT slot */ +alter_replication_slot: + K_ALTER_REPLICATION_SLOT IDENT '(' generic_option_list ')' + { + AlterReplicationSlotCmd *cmd; + cmd = makeNode(AlterReplicationSlotCmd); + cmd->slotname = $2; + cmd->options = $4; + $$ = (Node *) cmd; + } + ; + /* * START_REPLICATION [SLOT slot] [PHYSICAL] %X/%X [TIMELINE %d] */ @@ -399,6 +414,7 @@ ident_or_keyword: | K_START_REPLICATION { $$ = "start_replication"; } | K_CREATE_REPLICATION_SLOT { $$ = "create_replication_slot"; } | K_DROP_REPLICATION_SLOT { $$ = "drop_replication_slot"; } + | K_ALTER_REPLICATION_SLOT { $$ = "alter_replication_slot"; } | K_TIMELINE_HISTORY { $$ = "timeline_history"; } | K_WAIT { $$ = "wait"; } | K_TIMELINE { $$ = "timeline"; } diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l index 1cc7fb858c..0b5ae23195 100644 --- a/src/backend/replication/repl_scanner.l +++ b/src/backend/replication/repl_scanner.l @@ -125,6 +125,7 @@ TIMELINE { return K_TIMELINE; } START_REPLICATION { return K_START_REPLICATION; } CREATE_REPLICATION_SLOT { return K_CREATE_REPLICATION_SLOT; } DROP_REPLICATION_SLOT { return K_DROP_REPLICATION_SLOT; } +ALTER_REPLICATION_SLOT { return K_ALTER_REPLICATION_SLOT; } TIMELINE_HISTORY { return K_TIMELINE_HISTORY; } PHYSICAL { return K_PHYSICAL; } RESERVE_WAL { return K_RESERVE_WAL; } @@ -301,6 +302,7 @@ replication_scanner_is_replication_command(void) case K_START_REPLICATION: case K_CREATE_REPLICATION_SLOT: case K_DROP_REPLICATION_SLOT: + case K_ALTER_REPLICATION_SLOT: case K_READ_REPLICATION_SLOT: case K_TIMELINE_HISTORY: case K_SHOW: diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 18bc28195b..d35e6dec55 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -52,6 +52,9 @@ #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" +#include "utils/guc_hooks.h" +#include "utils/memutils.h" +#include "utils/varlena.h" /* * Replication slot on-disk data structure. @@ -90,7 +93,7 @@ typedef struct ReplicationSlotOnDisk sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize #define SLOT_MAGIC 0x1051CA1 /* format identifier */ -#define SLOT_VERSION 3 /* version for new files */ +#define SLOT_VERSION 4 /* version for new files */ /* Control array for replication slot management */ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; @@ -98,10 +101,19 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; /* My backend's replication slot in the shared memory array */ ReplicationSlot *MyReplicationSlot = NULL; -/* GUC variable */ +/* GUC variables */ int max_replication_slots = 10; /* the maximum number of replication * slots */ +/* + * This GUC lists streaming replication standby server slot names that + * logical WAL sender processes will wait for. + */ +char *standby_slot_names; + +/* This is parsed and cached list for raw standby_slot_names. */ +static List *standby_slot_names_list = NIL; + static void ReplicationSlotShmemExit(int code, Datum arg); static void ReplicationSlotDropAcquired(void); static void ReplicationSlotDropPtr(ReplicationSlot *slot); @@ -251,7 +263,8 @@ ReplicationSlotValidateName(const char *name, int elevel) */ void ReplicationSlotCreate(const char *name, bool db_specific, - ReplicationSlotPersistency persistency, bool two_phase) + ReplicationSlotPersistency persistency, + bool two_phase, bool failover) { ReplicationSlot *slot = NULL; int i; @@ -311,6 +324,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.persistency = persistency; slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; + slot->data.failover = failover; /* and then data only present in shared memory */ slot->just_dirtied = false; @@ -679,6 +693,31 @@ ReplicationSlotDrop(const char *name, bool nowait) ReplicationSlotDropAcquired(); } +/* + * Change the definition of the slot identified by the specified name. + */ +void +ReplicationSlotAlter(const char *name, bool failover) +{ + Assert(MyReplicationSlot == NULL); + + ReplicationSlotAcquire(name, true); + + if (SlotIsPhysical(MyReplicationSlot)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use %s with a physical replication slot", + "ALTER_REPLICATION_SLOT")); + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.failover = failover; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + ReplicationSlotRelease(); +} + /* * Permanently drop the currently acquired replication slot. */ @@ -2159,3 +2198,140 @@ RestoreSlotFromDisk(const char *name) (errmsg("too many replication slots active before shutdown"), errhint("Increase max_replication_slots and try again."))); } + +/* + * A helper function to validate slots specified in GUC standby_slot_names. + */ +static bool +validate_standby_slots(char **newval) +{ + char *rawname; + List *elemlist; + ListCell *lc; + + /* Need a modifiable copy of string */ + rawname = pstrdup(*newval); + + /* Verify syntax and parse string into list of identifiers */ + if (!SplitIdentifierString(rawname, ',', &elemlist)) + { + /* syntax error in name list */ + GUC_check_errdetail("List syntax is invalid."); + goto ret_standby_slot_names_ng; + } + + /* + * Verify 'type' of slot now. + * + * Skip check if replication slots' data is not initialized yet i.e. we + * are in startup process. + */ + if (!ReplicationSlotCtl) + goto ret_standby_slot_names_ok; + + foreach(lc, elemlist) + { + char *name = lfirst(lc); + ReplicationSlot *slot; + + slot = SearchNamedReplicationSlot(name, true); + + if (!slot) + goto ret_standby_slot_names_ng; + + if (!SlotIsPhysical(slot)) + { + GUC_check_errdetail("\"%s\" is not a physical replication slot", + name); + goto ret_standby_slot_names_ng; + } + } + +ret_standby_slot_names_ok: + + pfree(rawname); + list_free(elemlist); + return true; + +ret_standby_slot_names_ng: + + pfree(rawname); + list_free(elemlist); + return false; +} + +/* + * GUC check_hook for standby_slot_names + */ +bool +check_standby_slot_names(char **newval, void **extra, GucSource source) +{ + if (strcmp(*newval, "") == 0) + return true; + + /* + * "*" is not accepted as in that case primary will not be able to know + * for which all standbys to wait for. Even if we have physical-slots + * info, there is no way to confirm whether there is any standby + * configured for the known physical slots. + */ + if (strcmp(*newval, "*") == 0) + { + GUC_check_errdetail("\"%s\" is not accepted for standby_slot_names", + *newval); + return false; + } + + /* Now verify if the specified slots really exist and have correct type */ + if (!validate_standby_slots(newval)) + return false; + + *extra = guc_strdup(ERROR, *newval); + + return true; +} + +/* + * GUC assign_hook for standby_slot_names + */ +void +assign_standby_slot_names(const char *newval, void *extra) +{ + List *standby_slots; + MemoryContext oldcxt; + char *standby_slot_names_cpy = extra; + + list_free(standby_slot_names_list); + standby_slot_names_list = NIL; + + /* No value is specified for standby_slot_names. */ + if (standby_slot_names_cpy == NULL) + return; + + if (!SplitIdentifierString(standby_slot_names_cpy, ',', &standby_slots)) + { + /* This should not happen if GUC checked check_standby_slot_names. */ + elog(ERROR, "invalid list syntax"); + } + + /* + * Switch to the same memory context under which GUC variables are + * allocated (GUCMemoryContext). + */ + oldcxt = MemoryContextSwitchTo(GetMemoryChunkContext(standby_slot_names_cpy)); + standby_slot_names_list = list_copy(standby_slots); + MemoryContextSwitchTo(oldcxt); +} + +/* + * Return a copy of standby_slot_names_list if the copy flag is set to true, + * otherwise return the original list. + */ +List * +GetStandbySlotList(bool copy) +{ + if (copy) + return list_copy(standby_slot_names_list); + else + return standby_slot_names_list; +} diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 4b694a03d0..fb6e37d2c3 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -21,6 +21,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/slot.h" +#include "replication/walsender.h" #include "utils/builtins.h" #include "utils/inval.h" #include "utils/pg_lsn.h" @@ -42,7 +43,8 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, - temporary ? RS_TEMPORARY : RS_PERSISTENT, false); + temporary ? RS_TEMPORARY : RS_PERSISTENT, false, + false); if (immediately_reserve) { @@ -117,6 +119,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, + bool failover, XLogRecPtr restart_lsn, bool find_startpoint) { @@ -133,7 +136,8 @@ create_logical_replication_slot(char *name, char *plugin, * error as well. */ ReplicationSlotCreate(name, true, - temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase); + temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, + failover); /* * Create logical decoding context to find start point or, if we don't @@ -171,6 +175,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) Name plugin = PG_GETARG_NAME(1); bool temporary = PG_GETARG_BOOL(2); bool two_phase = PG_GETARG_BOOL(3); + bool failover = PG_GETARG_BOOL(4); Datum result; TupleDesc tupdesc; HeapTuple tuple; @@ -188,6 +193,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) NameStr(*plugin), temporary, two_phase, + failover, InvalidXLogRecPtr, true); @@ -232,7 +238,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 15 +#define PG_GET_REPLICATION_SLOTS_COLS 16 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -412,6 +418,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(false); } + values[i++] = BoolGetDatum(slot_contents.data.failover); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, @@ -451,6 +459,8 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto) * crash, but this makes the data consistent after a clean shutdown. */ ReplicationSlotMarkDirty(); + + PhysicalWakeupLogicalWalSnd(); } return retlsn; @@ -679,6 +689,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) XLogRecPtr src_restart_lsn; bool src_islogical; bool temporary; + bool failover; char *plugin; Datum values[2]; bool nulls[2]; @@ -734,6 +745,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) src_islogical = SlotIsLogical(&first_slot_contents); src_restart_lsn = first_slot_contents.data.restart_lsn; temporary = (first_slot_contents.data.persistency == RS_TEMPORARY); + failover = first_slot_contents.data.failover; plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL; /* Check type of replication slot */ @@ -773,6 +785,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) plugin, temporary, false, + failover, src_restart_lsn, false); } diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 2398167f49..e27d231174 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -386,7 +386,7 @@ WalReceiverMain(void) "pg_walreceiver_%lld", (long long int) walrcv_get_backend_pid(wrconn)); - walrcv_create_slot(wrconn, slotname, true, false, 0, NULL); + walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL); SpinLockAcquire(&walrcv->mutex); strlcpy(walrcv->slotname, slotname, NAMEDATALEN); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3bc9c82389..f9000b3ef8 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -974,12 +974,13 @@ static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, - bool *two_phase) + bool *two_phase, bool *failover) { ListCell *lc; bool snapshot_action_given = false; bool reserve_wal_given = false; bool two_phase_given = false; + bool failover_given = false; /* Parse options */ foreach(lc, cmd->options) @@ -1029,6 +1030,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, two_phase_given = true; *two_phase = defGetBoolean(defel); } + else if (strcmp(defel->defname, "failover") == 0) + { + if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + failover_given = true; + *failover = defGetBoolean(defel); + } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -1045,6 +1055,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) char *slot_name; bool reserve_wal = false; bool two_phase = false; + bool failover = false; CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT; DestReceiver *dest; TupOutputState *tstate; @@ -1054,13 +1065,13 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Assert(!MyReplicationSlot); - parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase); - + parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase, + &failover); if (cmd->kind == REPLICATION_KIND_PHYSICAL) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false); + false, false); if (reserve_wal) { @@ -1091,7 +1102,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - two_phase); + two_phase, failover); /* * Do options check early so that we can bail before calling the @@ -1246,6 +1257,46 @@ DropReplicationSlot(DropReplicationSlotCmd *cmd) ReplicationSlotDrop(cmd->slotname, !cmd->wait); } +/* + * Process extra options given to ALTER_REPLICATION_SLOT. + */ +static void +parseAlterReplSlotOptions(AlterReplicationSlotCmd *cmd, bool *failover) +{ + ListCell *lc; + bool failover_given = false; + + /* Parse options */ + foreach(lc, cmd->options) + { + DefElem *defel = (DefElem *) lfirst(lc); + + if (strcmp(defel->defname, "failover") == 0) + { + if (failover_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + failover_given = true; + *failover = defGetBoolean(defel); + } + else + elog(ERROR, "unrecognized option: %s", defel->defname); + } +} + +/* + * Change the definition of a replication slot. + */ +static void +AlterReplicationSlot(AlterReplicationSlotCmd *cmd) +{ + bool failover = false; + + parseAlterReplSlotOptions(cmd, &failover); + ReplicationSlotAlter(cmd->slotname, failover); +} + /* * Load previously initiated logical slot and prepare for sending data (via * WalSndLoop). @@ -1527,27 +1578,237 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId ProcessPendingWrites(); } +/* + * Wake up logical walsenders with failover-enabled slots if the physical slot + * of the current walsender is specified in standby_slot_names GUC. + */ +void +PhysicalWakeupLogicalWalSnd(void) +{ + ListCell *lc; + List *standby_slots; + + Assert(MyReplicationSlot && SlotIsPhysical(MyReplicationSlot)); + + standby_slots = GetStandbySlotList(false); + + foreach(lc, standby_slots) + { + char *name = lfirst(lc); + + if (strcmp(name, NameStr(MyReplicationSlot->data.name)) == 0) + { + ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv); + return; + } + } +} + +/* + * Reload the config file and reinitialize the standby slot list if the GUC + * standby_slot_names has changed. + */ +static void +WalSndRereadConfigAndReInitSlotList(List **standby_slots) +{ + char *pre_standby_slot_names = pstrdup(standby_slot_names); + + ProcessConfigFile(PGC_SIGHUP); + + if (strcmp(pre_standby_slot_names, standby_slot_names) != 0) + { + list_free(*standby_slots); + *standby_slots = GetStandbySlotList(true); + } + + pfree(pre_standby_slot_names); +} + +/* + * Filter the standby slots based on the specified log sequence number + * (wait_for_lsn). + * + * This function updates the passed standby_slots list, removing any slots that + * have already caught up to or surpassed the given wait_for_lsn. Additionally, + * it removes slots that have been invalidated, dropped, or converted to + * logical slots. + */ +static void +WalSndFilterStandbySlots(XLogRecPtr wait_for_lsn, List **standby_slots) +{ + ListCell *lc; + List *standby_slots_cpy = *standby_slots; + + foreach(lc, standby_slots_cpy) + { + char *name = lfirst(lc); + XLogRecPtr restart_lsn = InvalidXLogRecPtr; + bool invalidated = false; + char *warningfmt = NULL; + ReplicationSlot *slot; + + slot = SearchNamedReplicationSlot(name, true); + + if (slot && SlotIsPhysical(slot)) + { + SpinLockAcquire(&slot->mutex); + restart_lsn = slot->data.restart_lsn; + invalidated = slot->data.invalidated != RS_INVAL_NONE; + SpinLockRelease(&slot->mutex); + } + + /* Continue if the current slot hasn't caught up. */ + if (!invalidated && !XLogRecPtrIsInvalid(restart_lsn) && + restart_lsn < wait_for_lsn) + { + /* Log warning if no active_pid for this physical slot */ + if (slot->active_pid == 0) + ereport(WARNING, + errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid", + name, "standby_slot_names"), + errdetail("Logical replication is waiting on the " + "standby associated with \"%s\"", name), + errhint("Consider starting standby associated with " + "\"%s\" or amend standby_slot_names", name)); + + continue; + } + + /* + * It may happen that the slot specified in standby_slot_names GUC + * value is dropped, so let's skip over it. + */ + else if (!slot) + warningfmt = _("replication slot \"%s\" specified in parameter \"%s\" does not exist, ignoring"); + + /* + * If a logical slot name is provided in standby_slot_names, issue a + * WARNING and skip it. Although logical slots are disallowed in the + * GUC check_hook(validate_standby_slots), it is still possible for a + * user to drop an existing physical slot and recreate a logical slot + * with the same name. Since it is harmless, a WARNING should be + * enough, no need to error-out. + */ + else if (SlotIsLogical(slot)) + warningfmt = _("cannot have logical replication slot \"%s\" in parameter \"%s\", ignoring"); + + /* + * Specified physical slot may have been invalidated, so no point in + * waiting for it. + */ + else if (XLogRecPtrIsInvalid(restart_lsn) || invalidated) + warningfmt = _("physical slot \"%s\" specified in parameter \"%s\" has been invalidated, ignoring"); + else + Assert(restart_lsn >= wait_for_lsn); + + /* + * Reaching here indicates that either the slot has passed the + * wait_for_lsn or there is an issue with the slot that requires a + * warning to be reported. + */ + if (warningfmt) + ereport(WARNING, errmsg(warningfmt, name, "standby_slot_names")); + + standby_slots_cpy = foreach_delete_current(standby_slots_cpy, lc); + } + + *standby_slots = standby_slots_cpy; +} + +/* + * Wait for physical standby to confirm receiving the given lsn. + * + * Used by logical decoding SQL functions that acquired slot with failover + * enabled. It waits for physical standbys corresponding to the physical slots + * specified in the standby_slot_names GUC. + */ +void +WalSndWaitForStandbyConfirmation(XLogRecPtr wait_for_lsn) +{ + List *standby_slots; + + Assert(!am_walsender); + + if (!MyReplicationSlot->data.failover) + return; + + standby_slots = GetStandbySlotList(true); + + ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv); + + for (;;) + { + long sleeptime = -1; + + CHECK_FOR_INTERRUPTS(); + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + WalSndRereadConfigAndReInitSlotList(&standby_slots); + } + + WalSndFilterStandbySlots(wait_for_lsn, &standby_slots); + + /* Exit if done waiting for every slot. */ + if (standby_slots == NIL) + break; + + sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); + + ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, sleeptime, + WAIT_EVENT_WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION); + } + + ConditionVariableCancelSleep(); + list_free(standby_slots); +} + /* * Wait till WAL < loc is flushed to disk so it can be safely sent to client. * - * Returns end LSN of flushed WAL. Normally this will be >= loc, but - * if we detect a shutdown request (either from postmaster or client) - * we will return early, so caller must always check. + * If the walsender holds a logical slot that has enabled failover, the + * function also waits for all the specified streaming replication standby + * servers to confirm receipt of WAL up to RecentFlushPtr. + * + * Returns end LSN of flushed WAL. Normally this will be >= loc, but if we + * detect a shutdown request (either from postmaster or client) we will return + * early, so caller must always check. */ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc) { int wakeEvents; + bool wait_for_standby = false; + uint32 wait_event; + List *standby_slots = NIL; static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr; + if (MyReplicationSlot->data.failover) + standby_slots = GetStandbySlotList(true); + /* - * Fast path to avoid acquiring the spinlock in case we already know we - * have enough WAL available. This is particularly interesting if we're - * far behind. + * Check if all the standby servers have confirmed receipt of WAL up to + * RecentFlushPtr if we already know we have enough WAL available. + * + * Note that we cannot directly return without checking the status of + * standby servers because the standby_slot_names may have changed, which + * means there could be new standby slots in the list that have not yet + * caught up to the RecentFlushPtr. */ - if (RecentFlushPtr != InvalidXLogRecPtr && - loc <= RecentFlushPtr) - return RecentFlushPtr; + if (!XLogRecPtrIsInvalid(RecentFlushPtr) && loc <= RecentFlushPtr) + { + WalSndFilterStandbySlots(RecentFlushPtr, &standby_slots); + + /* + * Fast path to avoid acquiring the spinlock in case we already know we + * have enough WAL available and all the standby servers have confirmed + * receipt of WAL up to RecentFlushPtr. This is particularly interesting + * if we're far behind. + */ + if (standby_slots == NIL) + return RecentFlushPtr; + } /* Get a more recent flush pointer. */ if (!RecoveryInProgress()) @@ -1568,7 +1829,7 @@ WalSndWaitForWal(XLogRecPtr loc) if (ConfigReloadPending) { ConfigReloadPending = false; - ProcessConfigFile(PGC_SIGHUP); + WalSndRereadConfigAndReInitSlotList(&standby_slots); SyncRepInitConfig(); } @@ -1583,8 +1844,18 @@ WalSndWaitForWal(XLogRecPtr loc) if (got_STOPPING) XLogBackgroundFlush(); + /* + * Update the standby slots that have not yet caught up to the flushed + * position. It is good to wait up to RecentFlushPtr and then let it + * send the changes to logical subscribers one by one which are + * already covered in RecentFlushPtr without needing to wait on every + * change for standby confirmation. + */ + if (wait_for_standby) + WalSndFilterStandbySlots(RecentFlushPtr, &standby_slots); + /* Update our idea of the currently flushed position. */ - if (!RecoveryInProgress()) + else if (!RecoveryInProgress()) RecentFlushPtr = GetFlushRecPtr(NULL); else RecentFlushPtr = GetXLogReplayRecPtr(NULL); @@ -1612,8 +1883,14 @@ WalSndWaitForWal(XLogRecPtr loc) !waiting_for_ping_response) WalSndKeepalive(false, InvalidXLogRecPtr); - /* check whether we're done */ - if (loc <= RecentFlushPtr) + if (loc > RecentFlushPtr) + wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL; + else if (standby_slots) + { + wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION; + wait_for_standby = true; + } + else break; /* Waiting for new WAL. Since we need to wait, we're now caught up. */ @@ -1654,9 +1931,11 @@ WalSndWaitForWal(XLogRecPtr loc) if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; - WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL); + WalSndWait(wakeEvents, sleeptime, wait_event); } + list_free(standby_slots); + /* reactivate latch so WalSndLoop knows to continue */ SetLatch(MyLatch); return RecentFlushPtr; @@ -1819,6 +2098,13 @@ exec_replication_command(const char *cmd_string) EndReplicationCommand(cmdtag); break; + case T_AlterReplicationSlotCmd: + cmdtag = "ALTER_REPLICATION_SLOT"; + set_ps_display(cmdtag); + AlterReplicationSlot((AlterReplicationSlotCmd *) cmd_node); + EndReplicationCommand(cmdtag); + break; + case T_StartReplicationCmd: { StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node; @@ -2049,6 +2335,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) { ReplicationSlotMarkDirty(); ReplicationSlotsComputeRequiredLSN(); + PhysicalWakeupLogicalWalSnd(); } /* @@ -3311,6 +3598,8 @@ WalSndShmemInit(void) ConditionVariableInit(&WalSndCtl->wal_flush_cv); ConditionVariableInit(&WalSndCtl->wal_replay_cv); + + ConditionVariableInit(&WalSndCtl->wal_confirm_rcv_cv); } } @@ -3380,8 +3669,14 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event) * * And, we use separate shared memory CVs for physical and logical * walsenders for selective wake ups, see WalSndWakeup() for more details. + * + * When the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV + * that is woken up by physical walsenders when the walreceiver has + * confirmed the receipt of LSN. */ - if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL) + if (wait_event == WAIT_EVENT_WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION) + ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv); + else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL) ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv); else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL) ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index d7995931bd..ede94a1ede 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -76,6 +76,7 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server." SSL_OPEN_SERVER "Waiting for SSL while attempting connection." WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process." +WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION "Waiting for the WAL to be received by physical standby in WAL sender process." WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process." diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 5c6f5af873..f12e2e384e 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -4572,6 +4572,20 @@ struct config_string ConfigureNamesString[] = check_debug_io_direct, assign_debug_io_direct, NULL }, + { + {"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY, + gettext_noop("Lists streaming replication standby server slot " + "names that logical WAL sender processes will wait for."), + gettext_noop("Decoded changes are sent out to plugins by logical " + "WAL sender processes only after specified " + "replication slots confirm receiving WAL."), + GUC_LIST_INPUT | GUC_LIST_QUOTE + }, + &standby_slot_names, + "", + check_standby_slot_names, assign_standby_slot_names, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index cf9f283cfe..5d940b72cd 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -329,6 +329,8 @@ # method to choose sync standbys, number of sync standbys, # and comma-separated list of application_name # from standby(s); '*' = all +#standby_slot_names = '' # streaming replication standby server slot names that + # logical walsender processes will wait for # - Standby Servers - diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index 4878aa22bf..e16286f18c 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -661,7 +661,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) * started and stopped several times causing any temporary slots to be * removed. */ - res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, " + res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, " "%s as caught_up, conflicting as invalid " "FROM pg_catalog.pg_replication_slots " "WHERE slot_type = 'logical' AND " @@ -679,6 +679,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) int i_slotname; int i_plugin; int i_twophase; + int i_failover; int i_caught_up; int i_invalid; @@ -687,6 +688,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) i_slotname = PQfnumber(res, "slot_name"); i_plugin = PQfnumber(res, "plugin"); i_twophase = PQfnumber(res, "two_phase"); + i_failover = PQfnumber(res, "failover"); i_caught_up = PQfnumber(res, "caught_up"); i_invalid = PQfnumber(res, "invalid"); @@ -697,6 +699,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) curr->slotname = pg_strdup(PQgetvalue(res, slotnum, i_slotname)); curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin)); curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0); + curr->failover = (strcmp(PQgetvalue(res, slotnum, i_failover), "t") == 0); curr->caught_up = (strcmp(PQgetvalue(res, slotnum, i_caught_up), "t") == 0); curr->invalid = (strcmp(PQgetvalue(res, slotnum, i_invalid), "t") == 0); } diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c index 3960af4036..09f7437716 100644 --- a/src/bin/pg_upgrade/pg_upgrade.c +++ b/src/bin/pg_upgrade/pg_upgrade.c @@ -916,8 +916,10 @@ create_logical_replication_slots(void) appendStringLiteralConn(query, slot_info->slotname, conn); appendPQExpBuffer(query, ", "); appendStringLiteralConn(query, slot_info->plugin, conn); - appendPQExpBuffer(query, ", false, %s);", - slot_info->two_phase ? "true" : "false"); + + appendPQExpBuffer(query, ", false, %s, %s);", + slot_info->two_phase ? "true" : "false", + slot_info->failover ? "true" : "false"); PQclear(executeQueryOrDie(conn, "%s", query->data)); diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index a710f325de..f6ac78418e 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -160,6 +160,8 @@ typedef struct bool two_phase; /* can the slot decode 2PC? */ bool caught_up; /* has the slot caught up to latest changes? */ bool invalid; /* if true, the slot is unusable */ + bool failover; /* is the slot designated to be synced + * to the physical standby? */ } LogicalSlotInfo; typedef struct diff --git a/src/bin/pg_upgrade/t/003_logical_slots.pl b/src/bin/pg_upgrade/t/003_logical_slots.pl index 5b01cf8c40..0a1c467ed0 100644 --- a/src/bin/pg_upgrade/t/003_logical_slots.pl +++ b/src/bin/pg_upgrade/t/003_logical_slots.pl @@ -158,7 +158,7 @@ $sub->start; $sub->safe_psql( 'postgres', qq[ CREATE TABLE tbl (a int); - CREATE SUBSCRIPTION regress_sub CONNECTION '$old_connstr' PUBLICATION regress_pub WITH (two_phase = 'true') + CREATE SUBSCRIPTION regress_sub CONNECTION '$old_connstr' PUBLICATION regress_pub WITH (two_phase = 'true', failover = 'true') ]); $sub->wait_for_subscription_sync($oldpub, 'regress_sub'); @@ -172,8 +172,8 @@ command_ok([@pg_upgrade_cmd], 'run of pg_upgrade of old cluster'); # Check that the slot 'regress_sub' has migrated to the new cluster $newpub->start; my $result = $newpub->safe_psql('postgres', - "SELECT slot_name, two_phase FROM pg_replication_slots"); -is($result, qq(regress_sub|t), 'check the slot exists on new cluster'); + "SELECT slot_name, two_phase, failover FROM pg_replication_slots"); +is($result, qq(regress_sub|t|t), 'check the slot exists on new cluster'); # Update the connection my $new_connstr = $newpub->connstr . ' dbname=postgres'; diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 5077e7b358..36795b1085 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6563,7 +6563,8 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false, false, false, false, false, false, false}; + false, false, false, false, false, false, false, false, false, false, + false}; if (pset.sversion < 100000) { @@ -6627,6 +6628,11 @@ describeSubscriptions(const char *pattern, bool verbose) gettext_noop("Password required"), gettext_noop("Run as owner?")); + if (pset.sversion >= 170000) + appendPQExpBuffer(&buf, + ", subfailoverstate AS \"%s\"\n", + gettext_noop("Failover")); + appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" ", subconninfo AS \"%s\"\n", diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 049801186c..905964a2e8 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -3327,7 +3327,7 @@ psql_completion(const char *text, int start, int end) /* Complete "CREATE SUBSCRIPTION ... WITH ( " */ else if (HeadMatches("CREATE", "SUBSCRIPTION") && TailMatches("WITH", "(")) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", - "disable_on_error", "enabled", "origin", + "disable_on_error", "enabled", "failover", "origin", "password_required", "run_as_owner", "slot_name", "streaming", "synchronous_commit", "two_phase"); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index fb58dee3bc..d906734750 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11100,17 +11100,17 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'record', - proargtypes => 'name name bool bool', - proallargtypes => '{name,name,bool,bool,name,pg_lsn}', - proargmodes => '{i,i,i,i,o,o}', - proargnames => '{slot_name,plugin,temporary,twophase,slot_name,lsn}', + proargtypes => 'name name bool bool bool', + proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}', + proargmodes => '{i,i,i,i,i,o,o}', + proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, { oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin', diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index e0b91eacd2..3190a3889b 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -31,6 +31,14 @@ #define LOGICALREP_TWOPHASE_STATE_PENDING 'p' #define LOGICALREP_TWOPHASE_STATE_ENABLED 'e' +/* + * failover tri-state values. See comments atop worker.c to know more about + * these states. + */ +#define LOGICALREP_FAILOVER_STATE_DISABLED 'd' +#define LOGICALREP_FAILOVER_STATE_PENDING 'p' +#define LOGICALREP_FAILOVER_STATE_ENABLED 'e' + /* * The subscription will request the publisher to only send changes that do not * have any origin. @@ -93,6 +101,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subrunasowner; /* True if replication should execute as the * subscription owner */ + char subfailoverstate; /* Failover state */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -145,6 +155,7 @@ typedef struct Subscription List *publications; /* List of publication names to subscribe to */ char *origin; /* Only publish data originating from the * specified origin */ + char failoverstate; /* Allow slot to be synchronized for failover */ } Subscription; /* Disallow streaming in-progress transactions. */ diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index 5142a08729..bef8a7162e 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -72,6 +72,18 @@ typedef struct DropReplicationSlotCmd } DropReplicationSlotCmd; +/* ---------------------- + * ALTER_REPLICATION_SLOT command + * ---------------------- + */ +typedef struct AlterReplicationSlotCmd +{ + NodeTag type; + char *slotname; + List *options; +} AlterReplicationSlotCmd; + + /* ---------------------- * START_REPLICATION command * ---------------------- diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index d3535eed58..ca06e5b1ad 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -111,6 +111,12 @@ typedef struct ReplicationSlotPersistentData /* plugin name */ NameData plugin; + + /* + * Is this a failover slot (sync candidate for physical standbys)? + * Only relevant for logical slots on the primary server. + */ + bool failover; } ReplicationSlotPersistentData; /* @@ -210,6 +216,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot; /* GUCs */ extern PGDLLIMPORT int max_replication_slots; +extern PGDLLIMPORT char *standby_slot_names; /* shmem initialization functions */ extern Size ReplicationSlotsShmemSize(void); @@ -218,9 +225,10 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase); + bool two_phase, bool failover); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); +extern void ReplicationSlotAlter(const char *name, bool failover); extern void ReplicationSlotAcquire(const char *name, bool nowait); extern void ReplicationSlotRelease(void); @@ -253,4 +261,6 @@ extern void CheckPointReplicationSlots(bool is_shutdown); extern void CheckSlotRequirements(void); extern void CheckSlotPermissions(void); +extern List *GetStandbySlotList(bool copy); + #endif /* SLOT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 04b439dc50..61bc8de72c 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -356,9 +356,20 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, + bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); +/* + * walrcv_alter_slot_fn + * + * Change the definition of a replication slot. Currently, it only supports + * changing the failover property of the slot. + */ +typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn, + const char *slotname, + bool failover); + /* * walrcv_get_backend_pid_fn * @@ -400,6 +411,7 @@ typedef struct WalReceiverFunctionsType walrcv_receive_fn walrcv_receive; walrcv_send_fn walrcv_send; walrcv_create_slot_fn walrcv_create_slot; + walrcv_alter_slot_fn walrcv_alter_slot; walrcv_get_backend_pid_fn walrcv_get_backend_pid; walrcv_exec_fn walrcv_exec; walrcv_disconnect_fn walrcv_disconnect; @@ -429,8 +441,10 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) #define walrcv_send(conn, buffer, nbytes) \ WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) -#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \ - WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) +#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \ + WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) +#define walrcv_alter_slot(conn, slotname, failover) \ + WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover) #define walrcv_get_backend_pid(conn) \ WalReceiverFunctions->walrcv_get_backend_pid(conn) #define walrcv_exec(conn, exec, nRetTypes, retTypes) \ diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 268f8e8d0f..1fcc22a127 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -14,6 +14,8 @@ #include +#include "access/xlogdefs.h" + /* * What to do with a snapshot in create replication slot command. */ @@ -47,6 +49,8 @@ extern void WalSndInitStopping(void); extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); extern void WalSndRqstFileReload(void); +extern void PhysicalWakeupLogicalWalSnd(void); +extern void WalSndWaitForStandbyConfirmation(XLogRecPtr wait_for_lsn); /* * Remember that we want to wakeup walsenders later diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 13fd5877a6..48c6a7a146 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -113,6 +113,13 @@ typedef struct ConditionVariable wal_flush_cv; ConditionVariable wal_replay_cv; + /* + * Used by physical walsenders holding slots specified in + * standby_slot_names to wake up logical walsenders holding + * failover-enabled slots when a walreceiver confirms the receipt of LSN. + */ + ConditionVariable wal_confirm_rcv_cv; + WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]; } WalSndCtlData; diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 47854b5cd4..4378690ab0 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -258,7 +258,8 @@ extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname); extern bool AllTablesyncsReady(void); -extern void UpdateTwoPhaseState(Oid suboid, char new_state); +extern void EnableTwoPhaseFailoverTriState(Oid suboid, bool enable_twophase, + bool enable_failover); extern void process_syncing_tables(XLogRecPtr current_lsn); extern void invalidate_syncing_table_states(Datum arg, int cacheid, diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index 3d74483f44..2f3028cc07 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -162,5 +162,8 @@ extern bool check_wal_consistency_checking(char **newval, void **extra, extern void assign_wal_consistency_checking(const char *newval, void *extra); extern bool check_wal_segment_size(int *newval, void **extra, GucSource source); extern void assign_wal_sync_method(int new_wal_sync_method, void *extra); +extern bool check_standby_slot_names(char **newval, void **extra, + GucSource source); +extern void assign_standby_slot_names(const char *newval, void *extra); #endif /* GUC_HOOKS_H */ diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index 9d8039684a..3be3ee52fc 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -45,6 +45,7 @@ tests += { 't/037_invalid_database.pl', 't/038_save_logical_slots_shutdown.pl', 't/039_end_of_wal.pl', + 't/050_verify_slot_order.pl', ], }, } diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index 5025d65b1b..a3c3ee3a14 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -172,9 +172,10 @@ is($node_primary->slot('otherdb_slot')->{'slot_name'}, undef, 'logical slot was actually dropped with DB'); # Test logical slot advancing and its durability. +# Pass failover=true (last-arg), it should not have any impact on advancing. my $logical_slot = 'logical_slot'; $node_primary->safe_psql('postgres', - "SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false);" + "SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false, false, true);" ); $node_primary->psql( 'postgres', " diff --git a/src/test/recovery/t/050_verify_slot_order.pl b/src/test/recovery/t/050_verify_slot_order.pl new file mode 100644 index 0000000000..bff0f52a46 --- /dev/null +++ b/src/test/recovery/t/050_verify_slot_order.pl @@ -0,0 +1,149 @@ + +# Copyright (c) 2023, PostgreSQL Global Development Group + +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Test primary disallowing specified logical replication slots getting ahead of +# specified physical replication slots. It uses the following set up: +# +# | ----> standby1 (primary_slot_name = sb1_slot) +# | ----> standby2 (primary_slot_name = sb2_slot) +# primary ----- | +# | ----> subscriber1 (failover = true) +# | ----> subscriber2 (failover = false) +# +# standby_slot_names = 'sb1_slot' +# +# Set up is configured in such a way that the logical slot of subscriber1 is +# enabled failover, thus it will wait for the physical slot of +# standby1(sb1_slot) to catch up before sending decoded changes to subscriber1. + +# Create primary +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(allows_streaming => 'logical'); + +# Configure primary to disallow any logical slots that enabled failover from +# getting ahead of specified physical replication slot (sb1_slot). +$primary->append_conf( + 'postgresql.conf', qq( +standby_slot_names = 'sb1_slot' +)); +$primary->start; + +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb1_slot');}); +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb2_slot');}); + +$primary->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); + +my $backup_name = 'backup'; +$primary->backup($backup_name); + +# Create a standby +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby1->append_conf( + 'postgresql.conf', qq( +primary_slot_name = 'sb1_slot' +)); +$standby1->start; +$primary->wait_for_replay_catchup($standby1); + +# Create another standby +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby2->append_conf( + 'postgresql.conf', qq( +primary_slot_name = 'sb2_slot' +)); +$standby2->start; +$primary->wait_for_replay_catchup($standby2); + +# Create publication on primary +my $publisher = $primary; +$publisher->safe_psql('postgres', "CREATE PUBLICATION regress_mypub FOR TABLE tab_int;"); +my $publisher_connstr = $publisher->connstr . ' dbname=postgres'; + +# Create a subscriber node, wait for sync to complete +my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1'); +$subscriber1->init(allows_streaming => 'logical'); +$subscriber1->start; +$subscriber1->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); + +# Create a subscription with failover = true +$subscriber1->safe_psql('postgres', + "CREATE SUBSCRIPTION regress_mysub1 CONNECTION '$publisher_connstr' " + . "PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, failover = true);"); +$subscriber1->wait_for_subscription_sync; + +# Create another subscriber node without enabling failover, wait for sync to +# complete +my $subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2'); +$subscriber2->init(allows_streaming => 'logical'); +$subscriber2->start; +$subscriber2->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); +$subscriber2->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub2 CONNECTION '$publisher_connstr' " + . "PUBLICATION regress_mypub WITH (slot_name = lsub2_slot);"); +$subscriber2->wait_for_subscription_sync; + +# Stop the standby associated with specified physical replication slot so that +# the logical replication slot won't receive changes until the standby comes +# up. +$standby1->stop; + +# Create some data on primary +my $primary_row_count = 10; +my $primary_insert_time = time(); +$primary->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);"); + +# Wait for the standby that's up and running gets the data from primary +$primary->wait_for_replay_catchup($standby2); +my $result = $standby2->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "standby2 gets data from primary"); + +# Wait for the subscription that's up and running and is not enabled for failover. +# It gets the data from primary without waiting for any standbys. +$publisher->wait_for_catchup('mysub2'); +$result = $subscriber2->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "subscriber2 gets data from primary"); + +# The subscription that's up and running and is enabled for failover +# doesn't get the data from primary and keeps waiting for the +# standby specified in standby_slot_names. +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = 0 FROM tab_int;"); +is($result, 't', "subscriber1 doesn't get data from primary until standby1 acknowledges changes"); + +# Start the standby specified in standby_slot_names and wait for it to catch +# up with the primary. +$standby1->start; +$primary->wait_for_replay_catchup($standby1); +$result = $standby1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "standby1 gets data from primary"); + +# Now that the standby specified in standby_slot_names is up and running, +# primary must send the decoded changes to subscription enabled for failover +# While the standby was down, this subscriber didn't receive any data from +# primary i.e. the primary didn't allow it to go ahead of standby. +$publisher->wait_for_catchup('regress_mysub1'); +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "subscriber1 gets data from primary after standby1 acknowledges changes"); + +done_testing(); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 05070393b9..cb3b04aa0c 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1473,8 +1473,9 @@ pg_replication_slots| SELECT l.slot_name, l.wal_status, l.safe_wal_size, l.two_phase, - l.conflicting - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting) + l.conflicting, + l.failover + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index b15eddbff3..96c614332c 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | d | off | dbname=regress_doesnotexist2 | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | d | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,10 +371,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) --fail - alter of two_phase option not supported. @@ -383,10 +383,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -396,10 +396,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -412,18 +412,31 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | d | off | dbname=regress_doesnotexist | 0/0 +(1 row) + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; +-- test failover option +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, failover = true); +WARNING: subscription was created, but is not connected +HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. +\dRs+ + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | p | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); diff --git a/src/test/regress/sql/subscription.sql b/src/test/regress/sql/subscription.sql index 444e563ff3..e4601158b3 100644 --- a/src/test/regress/sql/subscription.sql +++ b/src/test/regress/sql/subscription.sql @@ -290,6 +290,14 @@ ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); DROP SUBSCRIPTION regress_testsub; +-- test failover option +CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, failover = true); + +\dRs+ + +ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); +DROP SUBSCRIPTION regress_testsub; + -- let's do some tests with pg_create_subscription rather than superuser SET SESSION AUTHORIZATION regress_subscription_user3; diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 86a9886d4f..d80d30e99c 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -85,6 +85,7 @@ AlterOwnerStmt AlterPolicyStmt AlterPublicationAction AlterPublicationStmt +AlterReplicationSlotCmd AlterRoleSetStmt AlterRoleStmt AlterSeqStmt @@ -3862,6 +3863,7 @@ varattrib_1b_e varattrib_4b vbits verifier_context +walrcv_alter_slot_fn walrcv_check_conninfo_fn walrcv_connect_fn walrcv_create_slot_fn -- 2.30.0.windows.2