From c5a2238dedd38a37b52ce106fccca65f1851bd02 Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Fri, 20 Oct 2023 08:32:38 +0530 Subject: [PATCH v25 1/2] Allow logical walsenders to wait for the physical standbys A new property 'failover' is added at the slot level which is persistent information which specifies that this logical slot is enabled to be synced to the physical standbys so that logical replication can be resumed after failover. It is always false for physical slots. Users can set it during the create subscription or during pg_create_logical_replication_slot and alter it using alter subscription. Examples: create subscription mysub connection '..' publication mypub WITH (failover = true); alter subscription mysub set (failover = true); --last arg SELECT * FROM pg_create_logical_replication_slot('myslot', 'pgoutput', false, true, true); This 'failover' is displayed as part of pg_replication_slots view. A new GUC standby_slot_names has been added. It is the list of physical replication slots that logical replication with failover enabled waits for. The intent of this wait is that no logical replication subscribers (with failover=true) should go ahead of physical replication standbys (corresponding to the physical slots in standby_slot_names). --- contrib/test_decoding/expected/slot.out | 19 ++ contrib/test_decoding/sql/slot.sql | 6 + doc/src/sgml/config.sgml | 22 ++ doc/src/sgml/func.sgml | 11 +- doc/src/sgml/ref/alter_subscription.sgml | 5 +- doc/src/sgml/ref/create_subscription.sgml | 11 + doc/src/sgml/system-views.sgml | 11 + src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_functions.sql | 1 + src/backend/catalog/system_views.sql | 3 +- src/backend/commands/subscriptioncmds.c | 30 +- .../libpqwalreceiver/libpqwalreceiver.c | 18 +- src/backend/replication/logical/logical.c | 3 + .../replication/logical/logicalfuncs.c | 9 + src/backend/replication/logical/tablesync.c | 3 +- src/backend/replication/logical/worker.c | 2 + src/backend/replication/pgoutput/pgoutput.c | 14 + src/backend/replication/slot.c | 128 +++++++- src/backend/replication/slotfuncs.c | 28 +- src/backend/replication/walreceiver.c | 2 +- src/backend/replication/walsender.c | 278 ++++++++++++++++-- .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/misc/guc_tables.c | 14 + src/backend/utils/misc/postgresql.conf.sample | 2 + src/bin/psql/describe.c | 9 +- src/bin/psql/tab-complete.c | 3 +- src/include/catalog/pg_proc.dat | 14 +- src/include/catalog/pg_subscription.h | 4 + src/include/replication/logical.h | 9 + src/include/replication/pgoutput.h | 1 + src/include/replication/slot.h | 15 +- src/include/replication/walreceiver.h | 7 +- src/include/replication/walsender.h | 4 + src/include/replication/walsender_private.h | 2 + src/include/utils/guc_hooks.h | 2 + src/test/recovery/meson.build | 1 + src/test/recovery/t/006_logical_decoding.pl | 3 +- src/test/recovery/t/050_verify_slot_order.pl | 158 ++++++++++ src/test/regress/expected/rules.out | 5 +- src/test/regress/expected/subscription.out | 152 +++++----- 40 files changed, 864 insertions(+), 147 deletions(-) create mode 100644 src/test/recovery/t/050_verify_slot_order.pl diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out index 63a9940f73..1c055f329c 100644 --- a/contrib/test_decoding/expected/slot.out +++ b/contrib/test_decoding/expected/slot.out @@ -406,3 +406,22 @@ SELECT pg_drop_replication_slot('copied_slot2_notemp'); (1 row) +-- Test logical slots creation with 'failover'=true (last arg) +SELECT 'init' FROM pg_create_logical_replication_slot('failover_slot', 'test_decoding', false, false, true); + ?column? +---------- + init +(1 row) + +SELECT slot_name, slot_type, failover FROM pg_replication_slots; + slot_name | slot_type | failover +---------------+-----------+---------- + failover_slot | logical | t +(1 row) + +SELECT pg_drop_replication_slot('failover_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql index 1aa27c5667..1133e45abb 100644 --- a/contrib/test_decoding/sql/slot.sql +++ b/contrib/test_decoding/sql/slot.sql @@ -176,3 +176,9 @@ ORDER BY o.slot_name, c.slot_name; SELECT pg_drop_replication_slot('orig_slot2'); SELECT pg_drop_replication_slot('copied_slot2_no_change'); SELECT pg_drop_replication_slot('copied_slot2_notemp'); + +-- Test logical slots creation with 'failover'=true (last arg) +SELECT 'init' FROM pg_create_logical_replication_slot('failover_slot', 'test_decoding', false, false, true); +SELECT slot_name, slot_type, failover FROM pg_replication_slots; + +SELECT pg_drop_replication_slot('failover_slot'); diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 3839c72c86..18fcc1f84d 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4344,6 +4344,28 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + standby_slot_names (string) + + standby_slot_names configuration parameter + + + + + List of physical replication slots that logical replication slots with + failover enabled waits for. If a logical replication connection is + meant to switch to a physical standby after the standby is promoted, + the physical replication slot for the standby should be listed here. + + + The standbys corresponding to the physical replication slots in + standby_slot_names must enable + enable_syncslot for the standbys to receive + failover logical slots changes from the primary. + + + + diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 7c3e940afe..2a8778b2c0 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -27405,7 +27405,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset pg_create_logical_replication_slot - pg_create_logical_replication_slot ( slot_name name, plugin name , temporary boolean, twophase boolean ) + pg_create_logical_replication_slot ( slot_name name, plugin name , temporary boolean, twophase boolean, failover boolean ) record ( slot_name name, lsn pg_lsn ) @@ -27420,8 +27420,13 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset released upon any error. The optional fourth parameter, twophase, when set to true, specifies that the decoding of prepared transactions is enabled for this - slot. A call to this function has the same effect as the replication - protocol command CREATE_REPLICATION_SLOT ... LOGICAL. + slot. The optional fifth parameter, + failover, when set to true, + specifies that this slot is enabled to be synced to the + physical standbys so that logical replication can be resumed + after failover. A call to this function has the same effect as + the replication protocol command + CREATE_REPLICATION_SLOT ... LOGICAL. diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index ba70855530..81eae5d270 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -226,8 +226,9 @@ ALTER SUBSCRIPTION name RENAME TO < streaming, disable_on_error, password_required, - run_as_owner, and - origin. + run_as_owner, + origin, and + failover. Only a superuser can set password_required = false. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index c1bafbfa06..22215d4ccb 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -399,6 +399,17 @@ CREATE SUBSCRIPTION subscription_name + + + failover (boolean) + + + Specifies whether the replication slot assocaited with the subscription + is enabled to be synced to the physical standbys so that logical + replication can be resumed from the new primary after failover. + + + diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 2b35c2f91b..2d10111f76 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2532,6 +2532,17 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx invalidated). Always NULL for physical slots. + + + + failover bool + + + True if this logical slot is enabled to be synced to the physical + standbys so that logical replication can be resumed from the new primary + after failover. Always false for physical slots. + + diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index d6a978f136..c520ec55a2 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -73,6 +73,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->disableonerr = subform->subdisableonerr; sub->passwordrequired = subform->subpasswordrequired; sub->runasowner = subform->subrunasowner; + sub->failover = subform->subfailover; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 35d738d576..24ad69c333 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -479,6 +479,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( IN slot_name name, IN plugin name, IN temporary boolean DEFAULT false, IN twophase boolean DEFAULT false, + IN failover boolean DEFAULT false, OUT slot_name name, OUT lsn pg_lsn) RETURNS RECORD LANGUAGE INTERNAL diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index fcb14976c0..211e3d7b39 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1002,7 +1002,8 @@ CREATE VIEW pg_replication_slots AS L.wal_status, L.safe_wal_size, L.two_phase, - L.conflicting + L.conflicting, + L.failover FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index edc82c11be..91b659d1aa 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -71,6 +71,7 @@ #define SUBOPT_RUN_AS_OWNER 0x00001000 #define SUBOPT_LSN 0x00002000 #define SUBOPT_ORIGIN 0x00004000 +#define SUBOPT_FAILOVER 0x00008000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -96,6 +97,7 @@ typedef struct SubOpts bool passwordrequired; bool runasowner; char *origin; + bool failover; XLogRecPtr lsn; } SubOpts; @@ -157,6 +159,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); + if (IsSet(supported_opts, SUBOPT_FAILOVER)) + opts->failover = false; /* Parse options */ foreach(lc, stmt_options) @@ -326,6 +330,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("unrecognized origin value: \"%s\"", opts->origin)); } + else if (IsSet(supported_opts, SUBOPT_FAILOVER) && + strcmp(defel->defname, "failover") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_FAILOVER)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_FAILOVER; + opts->failover = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_LSN) && strcmp(defel->defname, "lsn") == 0) { @@ -591,7 +604,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN | + SUBOPT_FAILOVER); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -710,6 +724,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, publicationListToArray(publications); values[Anum_pg_subscription_suborigin - 1] = CStringGetTextDatum(opts.origin); + values[Anum_pg_subscription_subfailover - 1] = + BoolGetDatum(opts.failover); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -807,7 +823,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, twophase_enabled = true; walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled, - CRS_NOEXPORT_SNAPSHOT, NULL); + opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL); if (twophase_enabled) UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED); @@ -1132,7 +1148,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN | + SUBOPT_FAILOVER); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); @@ -1218,6 +1235,13 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, replaces[Anum_pg_subscription_suborigin - 1] = true; } + if (IsSet(opts.specified_opts, SUBOPT_FAILOVER)) + { + values[Anum_pg_subscription_subfailover - 1] = + BoolGetDatum(opts.failover); + replaces[Anum_pg_subscription_subfailover - 1] = true; + } + update_tuple = true; break; } diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 60d5c1fc40..3ffcd74698 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -74,6 +74,7 @@ static char *libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, + bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn); @@ -470,6 +471,10 @@ libpqrcv_startstreaming(WalReceiverConn *conn, appendStringInfo(&cmd, ", origin '%s'", options->proto.logical.origin); + if (options->proto.logical.failover && + PQserverVersion(conn->streamConn) >= 160000) + appendStringInfo(&cmd, ", failover 'on'"); + pubnames = options->proto.logical.publication_names; pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); if (!pubnames_str) @@ -883,8 +888,8 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) */ static char * libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, - bool temporary, bool two_phase, CRSSnapshotAction snapshot_action, - XLogRecPtr *lsn) + bool temporary, bool two_phase, bool failover, + CRSSnapshotAction snapshot_action, XLogRecPtr *lsn) { PGresult *res; StringInfoData cmd; @@ -913,7 +918,14 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, else appendStringInfoChar(&cmd, ' '); } - + if (failover) + { + appendStringInfoString(&cmd, "FAILOVER"); + if (use_new_options_syntax) + appendStringInfoString(&cmd, ", "); + else + appendStringInfoChar(&cmd, ' '); + } if (use_new_options_syntax) { switch (snapshot_action) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 41243d0187..6c34f4f4d9 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -602,6 +602,9 @@ CreateDecodingContext(XLogRecPtr start_lsn, SnapBuildSetTwoPhaseAt(ctx->snapshot_builder, start_lsn); } + /* set failover in the slot, as requested */ + slot->data.failover = ctx->failover; + ctx->reorder->output_rewrites = ctx->options.receive_rewrites; ereport(LOG, diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 1067aca08f..b9826baa0b 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -30,6 +30,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/message.h" +#include "replication/walsender.h" #include "storage/fd.h" #include "utils/array.h" #include "utils/builtins.h" @@ -109,6 +110,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin MemoryContext per_query_ctx; MemoryContext oldcontext; XLogRecPtr end_of_wal; + XLogRecPtr wal_to_wait; LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; ArrayType *arr; @@ -228,6 +230,13 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin NameStr(MyReplicationSlot->data.plugin), format_procedure(fcinfo->flinfo->fn_oid)))); + if (XLogRecPtrIsInvalid(upto_lsn)) + wal_to_wait = end_of_wal; + else + wal_to_wait = Min(upto_lsn, end_of_wal); + + WalSndWaitForStandbyConfirmation(wal_to_wait); + ctx->output_writer_private = p; /* diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 37a0abe2f4..196bc8ecb7 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1412,7 +1412,8 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) */ walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, false /* permanent */ , false /* two_phase */ , - CRS_USE_SNAPSHOT, origin_startpos); + MySubscription->failover , CRS_USE_SNAPSHOT, + origin_startpos); /* * Setup replication origin tracking. The purpose of doing this before the diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 54c14495be..c3eac6dd6d 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3952,6 +3952,7 @@ maybe_reread_subscription(void) newsub->passwordrequired != MySubscription->passwordrequired || strcmp(newsub->origin, MySubscription->origin) != 0 || newsub->owner != MySubscription->owner || + newsub->failover != MySubscription->failover || !equal(newsub->publications, MySubscription->publications)) { if (am_parallel_apply_worker()) @@ -4402,6 +4403,7 @@ set_stream_options(WalRcvStreamOptions *options, options->proto.logical.twophase = false; options->proto.logical.origin = pstrdup(MySubscription->origin); + options->proto.logical.failover = MySubscription->failover; } /* diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index c1c66848f3..abf1d7fdfe 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -284,6 +284,7 @@ parse_output_parameters(List *options, PGOutputData *data) bool streaming_given = false; bool two_phase_option_given = false; bool origin_option_given = false; + bool failover_option_given = false; data->binary = false; data->streaming = LOGICALREP_STREAM_OFF; @@ -397,6 +398,16 @@ parse_output_parameters(List *options, PGOutputData *data) errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("unrecognized origin value: \"%s\"", origin)); } + else if (strcmp(defel->defname, "failover") == 0) + { + if (failover_option_given) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + failover_option_given = true; + + data->failover = defGetBoolean(defel); + } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); } @@ -500,6 +511,9 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, else ctx->twophase_opt_given = true; + /* assign the failover flag */ + ctx->failover = data->failover; + /* Init publication state. */ data->publications = NIL; publications_valid = false; diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 7e5ec500d8..60d80fd0bd 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -52,6 +52,8 @@ #include "storage/proc.h" #include "storage/procarray.h" #include "utils/builtins.h" +#include "utils/guc_hooks.h" +#include "utils/varlena.h" /* * Replication slot on-disk data structure. @@ -90,7 +92,7 @@ typedef struct ReplicationSlotOnDisk sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize #define SLOT_MAGIC 0x1051CA1 /* format identifier */ -#define SLOT_VERSION 3 /* version for new files */ +#define SLOT_VERSION 4 /* version for new files */ /* Control array for replication slot management */ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; @@ -98,9 +100,11 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; /* My backend's replication slot in the shared memory array */ ReplicationSlot *MyReplicationSlot = NULL; -/* GUC variable */ +/* GUC variables */ int max_replication_slots = 10; /* the maximum number of replication * slots */ +char *standby_slot_names; +List *standby_slot_names_list = NIL; static void ReplicationSlotShmemExit(int code, Datum arg); static void ReplicationSlotDropAcquired(void); @@ -251,7 +255,8 @@ ReplicationSlotValidateName(const char *name, int elevel) */ void ReplicationSlotCreate(const char *name, bool db_specific, - ReplicationSlotPersistency persistency, bool two_phase) + ReplicationSlotPersistency persistency, + bool two_phase, bool failover) { ReplicationSlot *slot = NULL; int i; @@ -311,6 +316,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.persistency = persistency; slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; + slot->data.failover = failover; /* and then data only present in shared memory */ slot->just_dirtied = false; @@ -596,6 +602,13 @@ ReplicationSlotRelease(void) MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING; ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags; LWLockRelease(ProcArrayLock); + + /* + * To prevent the backend from accessing a freed name list in the next + * call, reset the name list here. This is a convenient place to reset the + * standby names as it will always be called after finishing replication. + */ + standby_slot_names_list = NIL; } /* @@ -2121,3 +2134,112 @@ RestoreSlotFromDisk(const char *name) (errmsg("too many replication slots active before shutdown"), errhint("Increase max_replication_slots and try again."))); } + +/* + * A helper function to validate slots specified in standby_slot_names GUCs. + */ +static bool +validate_standby_slots(char **newval) +{ + char *rawname; + List *elemlist; + ListCell *lc; + + /* Need a modifiable copy of string */ + rawname = pstrdup(*newval); + + /* Verify syntax and parse string into list of identifiers */ + if (!SplitIdentifierString(rawname, ',', &elemlist)) + { + /* syntax error in name list */ + GUC_check_errdetail("List syntax is invalid."); + pfree(rawname); + list_free(elemlist); + return false; + } + + /* + * Verify 'type' of slot now. + * + * Skip check if replication slots' data is not initialized yet i.e. we + * are in startup process. + */ + if (!ReplicationSlotCtl) + return true; + + foreach(lc, elemlist) + { + char *name = lfirst(lc); + ReplicationSlot *slot; + + slot = SearchNamedReplicationSlot(name, true); + + if (!slot) + { + GUC_check_errdetail("replication slot \"%s\" does not exist", name); + list_free(elemlist); + return false; + } + + if (SlotIsLogical(slot)) + { + GUC_check_errdetail("cannot have logical replication slot \"%s\" " + "in this parameter", name); + list_free(elemlist); + return false; + } + } + + list_free(elemlist); + return true; +} + +/* + * GUC check_hook for standby_slot_names + */ +bool +check_standby_slot_names(char **newval, void **extra, GucSource source) +{ + if (strcmp(*newval, "") == 0) + return true; + + /* + * "*" is not accepted as in that case primary will not be able to + * know for which all standbys to wait for. Even if we have physical-slots + * info, there is no way to confirm whether there is any standby configured + * for the known physical slots. + */ + if (strcmp(*newval, "*") == 0) + { + GUC_check_errdetail("\"%s\" is not accepted for standby_slot_names", + *newval); + return false; + } + + /* Now verify if the specified slots really exist and have correct type */ + if (!validate_standby_slots(newval)) + return false; + + return true; +} + +/* + * Initialize the list from raw standby_slot_names and cache it, + * in order to avoid parsing these repeatedly. Done at WALSender + * startup and after each SIGHUP. + */ +void +SlotSyncInitConfig(void) +{ + char *rawname; + + /* Free the old one */ + list_free(standby_slot_names_list); + standby_slot_names_list = NIL; + + if (strcmp(standby_slot_names, "") != 0) + { + rawname = pstrdup(standby_slot_names); + SplitIdentifierString(rawname, ',', &standby_slot_names_list); + } +} diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 6035cf4816..8263a3b54d 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -21,6 +21,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/slot.h" +#include "replication/walsender.h" #include "utils/builtins.h" #include "utils/inval.h" #include "utils/pg_lsn.h" @@ -42,7 +43,8 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, - temporary ? RS_TEMPORARY : RS_PERSISTENT, false); + temporary ? RS_TEMPORARY : RS_PERSISTENT, false, + false); if (immediately_reserve) { @@ -117,6 +119,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, + bool failover, XLogRecPtr restart_lsn, bool find_startpoint) { @@ -133,7 +136,8 @@ create_logical_replication_slot(char *name, char *plugin, * error as well. */ ReplicationSlotCreate(name, true, - temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase); + temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, + failover); /* * Create logical decoding context to find start point or, if we don't @@ -171,6 +175,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) Name plugin = PG_GETARG_NAME(1); bool temporary = PG_GETARG_BOOL(2); bool two_phase = PG_GETARG_BOOL(3); + bool failover = PG_GETARG_BOOL(4); Datum result; TupleDesc tupdesc; HeapTuple tuple; @@ -188,6 +193,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) NameStr(*plugin), temporary, two_phase, + failover, InvalidXLogRecPtr, true); @@ -232,7 +238,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 15 +#define PG_GET_REPLICATION_SLOTS_COLS 16 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -412,6 +418,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(false); } + values[i++] = BoolGetDatum(slot_contents.data.failover); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, @@ -440,17 +448,8 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto) if (startlsn < moveto) { - SpinLockAcquire(&MyReplicationSlot->mutex); - MyReplicationSlot->data.restart_lsn = moveto; - SpinLockRelease(&MyReplicationSlot->mutex); + PhysicalConfirmReceivedLocation(moveto); retlsn = moveto; - - /* - * Dirty the slot so as it is written out at the next checkpoint. Note - * that the LSN position advanced may still be lost in the event of a - * crash, but this makes the data consistent after a clean shutdown. - */ - ReplicationSlotMarkDirty(); } return retlsn; @@ -683,6 +682,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) XLogRecPtr src_restart_lsn; bool src_islogical; bool temporary; + bool failover; char *plugin; Datum values[2]; bool nulls[2]; @@ -738,6 +738,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) src_islogical = SlotIsLogical(&first_slot_contents); src_restart_lsn = first_slot_contents.data.restart_lsn; temporary = (first_slot_contents.data.persistency == RS_TEMPORARY); + failover = first_slot_contents.data.failover; plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL; /* Check type of replication slot */ @@ -777,6 +778,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) plugin, temporary, false, + failover, src_restart_lsn, false); } diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index feff709435..68073ce3ca 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -387,7 +387,7 @@ WalReceiverMain(void) "pg_walreceiver_%lld", (long long int) walrcv_get_backend_pid(wrconn)); - walrcv_create_slot(wrconn, slotname, true, false, 0, NULL); + walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL); SpinLockAcquire(&walrcv->mutex); strlcpy(walrcv->slotname, slotname, NAMEDATALEN); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e250b0567e..7a05f977a7 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -148,6 +148,12 @@ static TimeLineID sendTimeLineNextTLI = 0; static bool sendTimeLineIsHistoric = false; static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr; +/* + * The variable to store the current value of standby_slot_names before each + * ConfigReload. + */ +static char *StandbySlotNamesPreReload = NULL; + /* * How far have we sent WAL already? This is also advertised in * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.) @@ -259,7 +265,7 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, TimeLineID *tli_p); - +static bool WalSndSlotInList(char *slot_names, List *slot_names_list); /* Initialize walsender process before entering the main command loop */ void @@ -828,6 +834,7 @@ StartReplication(StartReplicationCmd *cmd) SpinLockRelease(&MyWalSnd->mutex); SyncRepInitConfig(); + SlotSyncInitConfig(); /* Main loop of walsender */ replication_active = true; @@ -974,12 +981,13 @@ static void parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, bool *reserve_wal, CRSSnapshotAction *snapshot_action, - bool *two_phase) + bool *two_phase, bool *failover) { ListCell *lc; bool snapshot_action_given = false; bool reserve_wal_given = false; bool two_phase_given = false; + bool failover_given = false; /* Parse options */ foreach(lc, cmd->options) @@ -1029,6 +1037,15 @@ parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd, two_phase_given = true; *two_phase = defGetBoolean(defel); } + else if (strcmp(defel->defname, "failover") == 0) + { + if (failover_given || cmd->kind != REPLICATION_KIND_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"))); + failover_given = true; + *failover = defGetBoolean(defel); + } else elog(ERROR, "unrecognized option: %s", defel->defname); } @@ -1045,6 +1062,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) char *slot_name; bool reserve_wal = false; bool two_phase = false; + bool failover = false; CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT; DestReceiver *dest; TupOutputState *tstate; @@ -1054,13 +1072,14 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) Assert(!MyReplicationSlot); - parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase); + parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action, &two_phase, + &failover); if (cmd->kind == REPLICATION_KIND_PHYSICAL) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false); + false, false); } else { @@ -1075,7 +1094,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - two_phase); + two_phase, failover); } if (cmd->kind == REPLICATION_KIND_LOGICAL) @@ -1318,6 +1337,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) replication_active = true; SyncRepInitConfig(); + SlotSyncInitConfig(); /* Main loop of walsender */ WalSndLoop(XLogSendLogical); @@ -1408,6 +1428,33 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, ProcessPendingWrites(); } +/* + * Process input from the client and the timeout. + */ +static void +ProcessRepliesAndTimeOut(void) +{ + CHECK_FOR_INTERRUPTS(); + + /* Process any requests or signals received recently */ + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + SyncRepInitConfig(); + SlotSyncInitConfig(); + } + + /* Check for input from the client */ + ProcessRepliesIfAny(); + + /* die if timeout was reached */ + WalSndCheckTimeOut(); + + /* Send keepalive if the time has come */ + WalSndKeepaliveIfNecessary(); +} + /* * Wait until there is no pending write. Also process replies from the other * side and check timeouts during that. @@ -1419,14 +1466,7 @@ ProcessPendingWrites(void) { long sleeptime; - /* Check for input from the client */ - ProcessRepliesIfAny(); - - /* die if timeout was reached */ - WalSndCheckTimeOut(); - - /* Send keepalive if the time has come */ - WalSndKeepaliveIfNecessary(); + ProcessRepliesAndTimeOut(); if (!pq_is_send_pending()) break; @@ -1440,16 +1480,6 @@ ProcessPendingWrites(void) /* Clear any already-pending wakeups */ ResetLatch(MyLatch); - CHECK_FOR_INTERRUPTS(); - - /* Process any requests or signals received recently */ - if (ConfigReloadPending) - { - ConfigReloadPending = false; - ProcessConfigFile(PGC_SIGHUP); - SyncRepInitConfig(); - } - /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) WalSndShutdown(); @@ -1527,6 +1557,190 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId ProcessPendingWrites(); } +/* + * Does this Wal Sender need to wake up logical walsender. + * + * Check if the physical slot of this walsender is specified in + * standby_slot_names GUC. + */ +static bool +WalSndWakeupNeeded() +{ + Assert(MyReplicationSlot != NULL); + Assert(SlotIsPhysical(MyReplicationSlot)); + + /* + * Initialize the slot list if not yet. This is needed when it is called + * outside of the walsender. + */ + if (strcmp(standby_slot_names, "") != 0 && standby_slot_names_list == NIL) + SlotSyncInitConfig(); + + return WalSndSlotInList(standby_slot_names, standby_slot_names_list); +} + +/* + * Helper function for WalSndWakeupNeeded. + */ +static bool +WalSndSlotInList(char *slot_names, List *slot_names_list) +{ + ListCell *l; + bool inlist = false; + + if (strcmp(standby_slot_names, "") == 0) + return false; + + /* Special handling for "*" which means all. */ + if (strcmp(slot_names, "*") == 0) + return true; + + foreach(l, slot_names_list) + { + char *name = lfirst(l); + + if (strcmp(name, NameStr(MyReplicationSlot->data.name)) == 0) + { + inlist = true; + break; + } + } + + return inlist; +} + +/* + * Wait for physical standby to confirm receiving given lsn. + * + * Here logical walsender associated with failover logical slot waits + * for physical standbys corresponding to physical slots specified in + * standby_slot_names GUC. + */ +void +WalSndWaitForStandbyConfirmation(XLogRecPtr wait_for_lsn) +{ + List *standby_slot_cpy; + + if (!MyReplicationSlot->data.failover) + return; + + standby_slot_cpy = list_copy(standby_slot_names_list); + + ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv); + + for (;;) + { + ListCell *l; + long sleeptime = -1; + + foreach(l, standby_slot_cpy) + { + char *name = lfirst(l); + XLogRecPtr restart_lsn = InvalidXLogRecPtr; + bool invalidated = false; + char *warningfmt = NULL; + ReplicationSlot *slot; + + slot = SearchNamedReplicationSlot(name, true); + + if (slot && SlotIsPhysical(slot)) + { + SpinLockAcquire(&slot->mutex); + restart_lsn = slot->data.restart_lsn; + invalidated = slot->data.invalidated != RS_INVAL_NONE; + SpinLockRelease(&slot->mutex); + } + + /* Continue if the current slot hasn't caught up. */ + if (!invalidated && !XLogRecPtrIsInvalid(restart_lsn) && + restart_lsn < wait_for_lsn) + continue; + + /* + * It may happen that the slot specified in standby_slot_names GUC + * value is dropped, so let's skip over it. + */ + else if (!slot) + warningfmt = _("replication slot \"%s\" specified in parameter \"%s\" does not exist, ignoring"); + + /* + * If logical slot name is given in standby_slot_names, give + * WARNING and skip it. Since it is harmless, so WARNING should be + * enough, no need to error-out. + */ + else if (SlotIsLogical(slot)) + warningfmt = _("cannot have logical replication slot \"%s\" in parameter \"%s\", ignoring"); + + /* + * Specified physical slot may have been invalidated, so no point + * in waiting for it. + */ + else if (XLogRecPtrIsInvalid(restart_lsn) || invalidated) + warningfmt = _("physical slot \"%s\" specified in parameter \"%s\" has been invalidated, ignoring"); + else + Assert(restart_lsn >= wait_for_lsn); + + /* + * Reaching here indicates that either the slot has passed the + * wait_for_lsn or there is an issue with the slot that requires a + * warning to be reported. + */ + if (warningfmt) + ereport(WARNING, errmsg(warningfmt, name, "standby_slot_names")); + + standby_slot_cpy = foreach_delete_current(standby_slot_cpy, l); + } + + /* Exit if done waiting for every slot. */ + if (standby_slot_cpy == NIL) + break; + + if (am_walsender) + { + bool recheck = false; + + /* + * Save the current value of standby_slot_names before ConfigReload + * in ProcessRepliesAndTimeOut. + */ + if (ConfigReloadPending) + { + /* Free the previous allocation if any */ + if (StandbySlotNamesPreReload) + pfree(StandbySlotNamesPreReload); + + StandbySlotNamesPreReload = pstrdup(standby_slot_names); + recheck = true; + } + + ProcessRepliesAndTimeOut(); + + /* + * Re-initiate standby_slot_cpy if standby_slot_names changed + * after ConfigReload. + */ + if (recheck && + strcmp(StandbySlotNamesPreReload, standby_slot_names) != 0) + standby_slot_cpy = list_copy(standby_slot_names_list); + + /* If postmaster asked us to stop, don't wait anymore. */ + if (got_STOPPING) + break; + } + + /* + * Sleep until other physical walsenders awaken us or until a timeout + * occurs. + */ + sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); + + ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, sleeptime, + WAIT_EVENT_WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION); + } + + ConditionVariableCancelSleep(); +} + /* * Wait till WAL < loc is flushed to disk so it can be safely sent to client. * @@ -1570,6 +1784,7 @@ WalSndWaitForWal(XLogRecPtr loc) ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); SyncRepInitConfig(); + SlotSyncInitConfig(); } /* Check for input from the client */ @@ -1657,6 +1872,15 @@ WalSndWaitForWal(XLogRecPtr loc) WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL); } + /* + * Wait for specified streaming replication standby servers (if any) to + * confirm receipt of WAL upto RecentFlushPtr. It is good to wait here + * upto RecentFlushPtr and then let it send the changes to logical + * subscribers one by one which are already covered in RecentFlushPtr + * without needing to wait on every change for standby confirmation. + */ + WalSndWaitForStandbyConfirmation(RecentFlushPtr); + /* reactivate latch so WalSndLoop knows to continue */ SetLatch(MyLatch); return RecentFlushPtr; @@ -2030,7 +2254,7 @@ ProcessStandbyMessage(void) /* * Remember that a walreceiver just confirmed receipt of lsn `lsn`. */ -static void +void PhysicalConfirmReceivedLocation(XLogRecPtr lsn) { bool changed = false; @@ -2049,6 +2273,9 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) { ReplicationSlotMarkDirty(); ReplicationSlotsComputeRequiredLSN(); + + if (WalSndWakeupNeeded()) + ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv); } /* @@ -2469,6 +2696,7 @@ WalSndLoop(WalSndSendDataCallback send_data) ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); SyncRepInitConfig(); + SlotSyncInitConfig(); } /* Check for input from the client */ @@ -3311,6 +3539,8 @@ WalSndShmemInit(void) ConditionVariableInit(&WalSndCtl->wal_flush_cv); ConditionVariableInit(&WalSndCtl->wal_replay_cv); + + ConditionVariableInit(&WalSndCtl->wal_confirm_rcv_cv); } } diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index d7995931bd..f6e2ec82c1 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -76,6 +76,7 @@ LIBPQWALRECEIVER_CONNECT "Waiting in WAL receiver to establish connection to rem LIBPQWALRECEIVER_RECEIVE "Waiting in WAL receiver to receive data from remote server." SSL_OPEN_SERVER "Waiting for SSL while attempting connection." WAL_SENDER_WAIT_FOR_WAL "Waiting for WAL to be flushed in WAL sender process." +WAL_SENDER_WAIT_FOR_STANDBY_CONFIRMATION "Waiting for physical standby confirmation in WAL sender process." WAL_SENDER_WRITE_DATA "Waiting for any activity when processing replies from WAL receiver in WAL sender process." diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 4c58574166..76bed072a7 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -4552,6 +4552,20 @@ struct config_string ConfigureNamesString[] = check_debug_io_direct, assign_debug_io_direct, NULL }, + { + {"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY, + gettext_noop("List of streaming replication standby server slot " + "names that logical walsenders waits for."), + gettext_noop("Decoded changes are sent out to plugins by logical " + "walsenders only after specified replication slots " + "confirm receiving WAL."), + GUC_LIST_INPUT | GUC_LIST_QUOTE + }, + &standby_slot_names, + "", + check_standby_slot_names, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index d08d55c3fe..014491e06f 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -326,6 +326,8 @@ # method to choose sync standbys, number of sync standbys, # and comma-separated list of application_name # from standby(s); '*' = all +#standby_slot_names = '' # streaming replication standby server slot names that + # logical walsenders waits for # - Standby Servers - diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index bac94a338c..72050672c7 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -6595,7 +6595,8 @@ describeSubscriptions(const char *pattern, bool verbose) PGresult *res; printQueryOpt myopt = pset.popt; static const bool translate_columns[] = {false, false, false, false, - false, false, false, false, false, false, false, false, false, false}; + false, false, false, false, false, false, false, false, false, false, + false}; if (pset.sversion < 100000) { @@ -6654,10 +6655,12 @@ describeSubscriptions(const char *pattern, bool verbose) appendPQExpBuffer(&buf, ", suborigin AS \"%s\"\n" ", subpasswordrequired AS \"%s\"\n" - ", subrunasowner AS \"%s\"\n", + ", subrunasowner AS \"%s\"\n" + ", subfailover AS \"%s\"\n", gettext_noop("Origin"), gettext_noop("Password required"), - gettext_noop("Run as owner?")); + gettext_noop("Run as owner?"), + gettext_noop("Enable failover?")); appendPQExpBuffer(&buf, ", subsynccommit AS \"%s\"\n" diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 93742fc6ac..5f065e5c55 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -3302,7 +3302,8 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("binary", "connect", "copy_data", "create_slot", "disable_on_error", "enabled", "origin", "password_required", "run_as_owner", "slot_name", - "streaming", "synchronous_commit", "two_phase"); + "streaming", "synchronous_commit", "two_phase", + "failover"); /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index c92d0631a0..1da2e97486 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11088,17 +11088,17 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'record', - proargtypes => 'name name bool bool', - proallargtypes => '{name,name,bool,bool,name,pg_lsn}', - proargmodes => '{i,i,i,i,o,o}', - proargnames => '{slot_name,plugin,temporary,twophase,slot_name,lsn}', + proargtypes => 'name name bool bool bool', + proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}', + proargmodes => '{i,i,i,i,i,o,o}', + proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, { oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin', diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index e0b91eacd2..33f1fe98d5 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -93,6 +93,9 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subrunasowner; /* True if replication should execute as the * subscription owner */ + bool subfailover; /* True if the replication slot should be + * enabled for failover */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -145,6 +148,7 @@ typedef struct Subscription List *publications; /* List of publication names to subscribe to */ char *origin; /* Only publish data originating from the * specified origin */ + bool failover; /* Allow slot to be synchronized for failover */ } Subscription; /* Disallow streaming in-progress transactions. */ diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 5f49554ea0..c9334c0d59 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -100,6 +100,15 @@ typedef struct LogicalDecodingContext */ bool twophase_opt_given; + /* + * Does the plugin require that the slot be enabled for failover? + * + * This flag indicates that the plugin passed in an failover + * option so that the logical slot is marked to be synchronized + * to the potential physical standbys. + */ + bool failover; + /* * State for writing output. */ diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index cee209e4cc..f953f9d4b2 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -33,6 +33,7 @@ typedef struct PGOutputData bool messages; bool two_phase; bool publish_no_origin; + bool failover; } PGOutputData; #endif /* PGOUTPUT_H */ diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 758ca79a81..cc6d3978e1 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -111,6 +111,12 @@ typedef struct ReplicationSlotPersistentData /* plugin name */ NameData plugin; + + /* + * Is this a failover slot (sync candidate for physical standbys)? + * Relevant for logical slots on the primary server. + */ + bool failover; } ReplicationSlotPersistentData; /* @@ -210,6 +216,10 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot; /* GUCs */ extern PGDLLIMPORT int max_replication_slots; +extern PGDLLIMPORT char *standby_slot_names; + +/* Globals */ +extern PGDLLIMPORT List *standby_slot_names_list; /* shmem initialization functions */ extern Size ReplicationSlotsShmemSize(void); @@ -218,7 +228,7 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase); + bool two_phase, bool failover); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); @@ -253,4 +263,7 @@ extern void CheckPointReplicationSlots(bool is_shutdown); extern void CheckSlotRequirements(void); extern void CheckSlotPermissions(void); +extern void WaitForStandbyLSN(XLogRecPtr wait_for_lsn); +extern void SlotSyncInitConfig(void); + #endif /* SLOT_H */ diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 281626fa6f..c5808db655 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -187,6 +187,8 @@ typedef struct * prepare time */ char *origin; /* Only publish data originating from the * specified origin */ + bool failover; /* make the replication slot a failover slot + * (sync candidate for physical standbys) */ } logical; } proto; } WalRcvStreamOptions; @@ -356,6 +358,7 @@ typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn, const char *slotname, bool temporary, bool two_phase, + bool failover, CRSSnapshotAction snapshot_action, XLogRecPtr *lsn); @@ -429,8 +432,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd) #define walrcv_send(conn, buffer, nbytes) \ WalReceiverFunctions->walrcv_send(conn, buffer, nbytes) -#define walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) \ - WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, snapshot_action, lsn) +#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \ + WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) #define walrcv_get_backend_pid(conn) \ WalReceiverFunctions->walrcv_get_backend_pid(conn) #define walrcv_exec(conn, exec, nRetTypes, retTypes) \ diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 9df7e50f94..1f7483c421 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -14,6 +14,8 @@ #include +#include "access/xlogdefs.h" + /* * What to do with a snapshot in create replication slot command. */ @@ -47,6 +49,8 @@ extern void WalSndInitStopping(void); extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); extern void WalSndRqstFileReload(void); +extern void PhysicalConfirmReceivedLocation(XLogRecPtr lsn); +extern void WalSndWaitForStandbyConfirmation(XLogRecPtr wait_for_lsn); /* * Remember that we want to wakeup walsenders later diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 7d919583bd..1b73695bfa 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -113,6 +113,8 @@ typedef struct ConditionVariable wal_flush_cv; ConditionVariable wal_replay_cv; + ConditionVariable wal_confirm_rcv_cv; + WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER]; } WalSndCtlData; diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h index 2a191830a8..6d87f64ab0 100644 --- a/src/include/utils/guc_hooks.h +++ b/src/include/utils/guc_hooks.h @@ -160,5 +160,7 @@ extern bool check_wal_consistency_checking(char **newval, void **extra, extern void assign_wal_consistency_checking(const char *newval, void *extra); extern bool check_wal_segment_size(int *newval, void **extra, GucSource source); extern void assign_wal_sync_method(int new_wal_sync_method, void *extra); +extern bool check_standby_slot_names(char **newval, void **extra, + GucSource source); #endif /* GUC_HOOKS_H */ diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index 9d8039684a..3be3ee52fc 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -45,6 +45,7 @@ tests += { 't/037_invalid_database.pl', 't/038_save_logical_slots_shutdown.pl', 't/039_end_of_wal.pl', + 't/050_verify_slot_order.pl', ], }, } diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index 5025d65b1b..a3c3ee3a14 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -172,9 +172,10 @@ is($node_primary->slot('otherdb_slot')->{'slot_name'}, undef, 'logical slot was actually dropped with DB'); # Test logical slot advancing and its durability. +# Pass failover=true (last-arg), it should not have any impact on advancing. my $logical_slot = 'logical_slot'; $node_primary->safe_psql('postgres', - "SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false);" + "SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false, false, true);" ); $node_primary->psql( 'postgres', " diff --git a/src/test/recovery/t/050_verify_slot_order.pl b/src/test/recovery/t/050_verify_slot_order.pl new file mode 100644 index 0000000000..119a1b7905 --- /dev/null +++ b/src/test/recovery/t/050_verify_slot_order.pl @@ -0,0 +1,158 @@ + +# Copyright (c) 2023, PostgreSQL Global Development Group + +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Test primary disallowing specified logical replication slots getting ahead of +# specified physical replication slots. It uses the following set up: +# +# | ----> standby1 (connected via streaming replication) +# | ----> standby2 (connected via streaming replication) +# primary ----- | +# | ----> subscriber1 (connected via logical replication) +# | ----> subscriber2 (connected via logical replication) +# +# Set up is configured in such a way that primary never lets subscriber1 ahead +# of standby1. + +# Create primary +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(allows_streaming => 'logical'); + +# Configure primary to disallow specified logical replication slot (lsub1_slot) +# getting ahead of specified physical replication slot (sb1_slot). +$primary->append_conf( + 'postgresql.conf', qq( +standby_slot_names = 'sb1_slot' +)); +$primary->start; + +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb1_slot');}); +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb2_slot');}); + +$primary->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); + +my $backup_name = 'backup'; +$primary->backup($backup_name); + +# Create a standby +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby1->append_conf( + 'postgresql.conf', qq( +primary_slot_name = 'sb1_slot' +)); +$standby1->start; +$primary->wait_for_replay_catchup($standby1); + +# Create another standby +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); +$standby2->append_conf( + 'postgresql.conf', qq( +primary_slot_name = 'sb2_slot' +)); +$standby2->start; +$primary->wait_for_replay_catchup($standby2); + +# Create publication on primary +my $publisher = $primary; +$publisher->safe_psql('postgres', "CREATE PUBLICATION mypub FOR TABLE tab_int;"); +my $publisher_connstr = $publisher->connstr . ' dbname=postgres'; + +# Create a subscriber node, wait for sync to complete +my $subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1'); +$subscriber1->init(allows_streaming => 'logical'); +$subscriber1->start; +$subscriber1->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); + +# Create a subscription with failover = true +$subscriber1->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub1 CONNECTION '$publisher_connstr' " + . "PUBLICATION mypub WITH (slot_name = lsub1_slot, failover = true);"); +$subscriber1->wait_for_subscription_sync; + +# Create another subscriber node, wait for sync to complete +my $subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2'); +$subscriber2->init(allows_streaming => 'logical'); +$subscriber2->start; +$subscriber2->safe_psql('postgres', "CREATE TABLE tab_int (a int PRIMARY KEY);"); +$subscriber2->safe_psql('postgres', + "CREATE SUBSCRIPTION mysub2 CONNECTION '$publisher_connstr' " + . "PUBLICATION mypub WITH (slot_name = lsub2_slot);"); +$subscriber2->wait_for_subscription_sync; + +# Stop the standby associated with specified physical replication slot so that +# the logical replication slot won't receive changes until the standby comes +# up. +$standby1->stop; + +# Create some data on primary +my $primary_row_count = 10; +my $primary_insert_time = time(); +$primary->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);"); + +# Wait for the standby that's up and running gets the data from primary +$primary->wait_for_replay_catchup($standby2); +my $result = $standby2->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "standby2 gets data from primary"); + +# Wait for the subscription that's up and running and is not enabled for failover. +# It gets the data from primary without waiting for any standbys. +$publisher->wait_for_catchup('mysub2'); +$result = $subscriber2->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "subscriber2 gets data from primary"); + +# The subscription that's up and running and is enabled for failover +# doesn't get the data from primary and keeps waiting for the +# standby specified in standby_slot_names. +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = 0 FROM tab_int;"); +is($result, 't', "subscriber1 doesn't get data from primary until standby1 acknowledges changes"); + +# Start the standby specified in standby_slot_names and wait for it to catch +# up with the primary. +$standby1->start; +$primary->wait_for_replay_catchup($standby1); +$result = $standby1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "standby1 gets data from primary"); + +# Now that the standby specified in standby_slot_names is up and running, +# primary must send the decoded changes to subscription enabled for failover +# While the standby was down, this subscriber didn't receive any data from +# primary i.e. the primary didn't allow it to go ahead of standby. +$publisher->wait_for_catchup('mysub1'); +$result = $subscriber1->safe_psql('postgres', + "SELECT count(*) = $primary_row_count FROM tab_int;"); +is($result, 't', "subscriber1 gets data from primary after standby1 acknowledges changes"); + +# Alter the subscription to toggle 'failover' and see that +# the change is reflected in the corresponding slot on the primary. +#XXX: Is this the right place for this test? +$result = $publisher->safe_psql('postgres', + "SELECT failover FROM pg_replication_slots WHERE slot_name = 'lsub2_slot';"); +is($result, 'f', "lsub2_slot has failover=false"); +$subscriber2->safe_psql('postgres', + "ALTER SUBSCRIPTION mysub2 SET (failover = true);"); +$publisher->wait_for_catchup('mysub2'); +$result = $publisher->safe_psql('postgres', + "SELECT failover FROM pg_replication_slots WHERE slot_name = 'lsub2_slot';"); +is($result, 't', "altered lsub2_slot's failover option"); + +done_testing(); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 2c60400ade..1de71a00bc 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1473,8 +1473,9 @@ pg_replication_slots| SELECT l.slot_name, l.wal_status, l.safe_wal_size, l.two_phase, - l.conflicting - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting) + l.conflicting, + l.failover + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, diff --git a/src/test/regress/expected/subscription.out b/src/test/regress/expected/subscription.out index b15eddbff3..bd337d2572 100644 --- a/src/test/regress/expected/subscription.out +++ b/src/test/regress/expected/subscription.out @@ -116,18 +116,18 @@ CREATE SUBSCRIPTION regress_testsub4 CONNECTION 'dbname=regress_doesnotexist' PU WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | none | t | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub4 SET (origin = any); \dRs+ regress_testsub4 - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN -------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub4 | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub3; @@ -145,10 +145,10 @@ ALTER SUBSCRIPTION regress_testsub CONNECTION 'foobar'; ERROR: invalid connection string syntax: missing "=" after "foobar" in connection info string \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false); @@ -157,10 +157,10 @@ ALTER SUBSCRIPTION regress_testsub SET (slot_name = 'newname'); ALTER SUBSCRIPTION regress_testsub SET (password_required = false); ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | f | t | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (password_required = true); @@ -176,10 +176,10 @@ ERROR: unrecognized subscription parameter: "create_slot" -- ok ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345'); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist2 | 0/12345 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/12345 (1 row) -- ok - with lsn = NONE @@ -188,10 +188,10 @@ ALTER SUBSCRIPTION regress_testsub SKIP (lsn = NONE); ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0'); ERROR: invalid WAL location (LSN): 0/0 \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+------------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+------------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist2 | 0/0 (1 row) BEGIN; @@ -223,10 +223,10 @@ ALTER SUBSCRIPTION regress_testsub_foo SET (synchronous_commit = foobar); ERROR: invalid value for parameter "synchronous_commit": "foobar" HINT: Available values: local, remote_write, remote_apply, on, off. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+------------------------------+---------- - regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | local | dbname=regress_doesnotexist2 | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+------------------------------+---------- + regress_testsub_foo | regress_subscription_user | f | {testpub2,testpub3} | f | off | d | f | any | t | f | f | local | dbname=regress_doesnotexist2 | 0/0 (1 row) -- rename back to keep the rest simple @@ -255,19 +255,19 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | t | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (binary = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -279,27 +279,27 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | parallel | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (streaming = false); ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication already exists @@ -314,10 +314,10 @@ ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refr ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false); ERROR: publication "testpub1" is already in subscription "regress_testsub" \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub,testpub1,testpub2} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) -- fail - publication used more than once @@ -332,10 +332,10 @@ ERROR: publication "testpub3" is not in subscription "regress_testsub" -- ok - delete publications ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) DROP SUBSCRIPTION regress_testsub; @@ -371,10 +371,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) --fail - alter of two_phase option not supported. @@ -383,10 +383,10 @@ ERROR: unrecognized subscription parameter: "two_phase" -- but can alter streaming when two_phase enabled ALTER SUBSCRIPTION regress_testsub SET (streaming = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -396,10 +396,10 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | on | p | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); @@ -412,18 +412,18 @@ CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUB WARNING: subscription was created, but is not connected HINT: To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription. \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | f | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true); \dRs+ - List of subscriptions - Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Synchronous commit | Conninfo | Skip LSN ------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+--------------------+-----------------------------+---------- - regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | off | dbname=regress_doesnotexist | 0/0 + List of subscriptions + Name | Owner | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Enable failover? | Synchronous commit | Conninfo | Skip LSN +-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+------------------+--------------------+-----------------------------+---------- + regress_testsub | regress_subscription_user | f | {testpub} | f | off | d | t | any | t | f | f | off | dbname=regress_doesnotexist | 0/0 (1 row) ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE); -- 2.34.1