From 7e79be7c4abca20fad1466e58477601bee058af4 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Sat, 6 Jan 2024 15:08:57 +0800 Subject: [PATCH v59 2/2] Add logical slot sync capability to the physical standby This patch implements synchronization of logical replication slots from the primary server to the physical standby so that logical replication can be resumed after failover. GUC 'enable_syncslot' enables a physical standby to synchronize failover logical replication slots from the primary server. The logical replication slots on the primary can be synchronized to the hot standby by enabling the failover option during slot creation and setting 'enable_syncslot' on the standby. For the synchronization to work, it is mandatory to have a physical replication slot between the primary and the standby, and hot_standby_feedback must be enabled on the standby. All the failover logical replication slots on the primary (assuming configurations are appropriate) are automatically created on the physical standbys and are synced periodically. The slot-sync worker on the standby server pings the primary server at regular intervals to get the necessary failover logical slots information and create/update the slots locally. The nap time of the worker is tuned according to the activity on the primary. The worker waits for a period of time before the next synchronization, with the duration varying based on whether any slots were updated during the last cycle. The logical slots created by the slot-sync worker on physical standbys are not allowed to be dropped or consumed. Any attempt to perform logical decoding on such slots will result in an error. If a logical slot is invalidated on the primary, the slot on the standby is also invalidated. If a logical slot on the primary is valid but is invalidated on the standby, then that slot is dropped and recreated on the standby in the next sync-cycle provided the slot still exists on the primary server. 
It is okay to recreate such slots as long as these are not consumable on the standby (which is the case currently). This situation may occur due to the following reasons: - The max_slot_wal_keep_size on the standby is insufficient to retain WAL records from the restart_lsn of the slot. - primary_slot_name is temporarily reset to null and the physical slot is removed. - The primary changes wal_level to a level lower than logical. The slots synchronization status on the standby can be monitored using 'synced' column of pg_replication_slots view. --- doc/src/sgml/bgworker.sgml | 65 +- doc/src/sgml/config.sgml | 27 +- doc/src/sgml/logicaldecoding.sgml | 34 + doc/src/sgml/system-views.sgml | 16 + src/backend/access/transam/xlog.c | 5 +- src/backend/access/transam/xlogrecovery.c | 15 + src/backend/catalog/system_views.sql | 3 +- src/backend/postmaster/bgworker.c | 4 + src/backend/postmaster/postmaster.c | 10 + .../libpqwalreceiver/libpqwalreceiver.c | 41 + src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/logical.c | 12 + src/backend/replication/logical/meson.build | 1 + src/backend/replication/logical/slotsync.c | 1168 +++++++++++++++++ src/backend/replication/slot.c | 32 +- src/backend/replication/slotfuncs.c | 14 +- src/backend/replication/walreceiverfuncs.c | 16 + src/backend/replication/walsender.c | 4 +- src/backend/storage/ipc/ipci.c | 2 + src/backend/tcop/postgres.c | 11 + .../utils/activity/wait_event_names.txt | 2 + src/backend/utils/misc/guc_tables.c | 10 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/catalog/pg_proc.dat | 6 +- src/include/postmaster/bgworker.h | 1 + src/include/replication/logicalworker.h | 1 + src/include/replication/slot.h | 17 +- src/include/replication/walreceiver.h | 19 + src/include/replication/worker_internal.h | 10 + .../t/050_standby_failover_slots_sync.pl | 165 ++- src/test/regress/expected/rules.out | 5 +- src/test/regress/expected/sysviews.out | 3 +- src/tools/pgindent/typedefs.list 
| 2 + 33 files changed, 1686 insertions(+), 37 deletions(-) create mode 100644 src/backend/replication/logical/slotsync.c diff --git a/doc/src/sgml/bgworker.sgml b/doc/src/sgml/bgworker.sgml index 2c393385a9..a7cfe6c58c 100644 --- a/doc/src/sgml/bgworker.sgml +++ b/doc/src/sgml/bgworker.sgml @@ -114,18 +114,59 @@ typedef struct BackgroundWorker bgw_start_time is the server state during which - postgres should start the process; it can be one of - BgWorkerStart_PostmasterStart (start as soon as - postgres itself has finished its own initialization; processes - requesting this are not eligible for database connections), - BgWorkerStart_ConsistentState (start as soon as a consistent state - has been reached in a hot standby, allowing processes to connect to - databases and run read-only queries), and - BgWorkerStart_RecoveryFinished (start as soon as the system has - entered normal read-write state). Note the last two values are equivalent - in a server that's not a hot standby. Note that this setting only indicates - when the processes are to be started; they do not stop when a different state - is reached. + postgres should start the process. Note that this setting + only indicates when the processes are to be started; they do not stop when + a different state is reached. Possible values are: + + + + BgWorkerStart_PostmasterStart + + + BgWorkerStart_PostmasterStart + Start as soon as postgres itself has finished its own initialization; + processes requesting this are not eligible for database connections. + + + + + + BgWorkerStart_ConsistentState + + + BgWorkerStart_ConsistentState + Start as soon as a consistent state has been reached in a hot-standby, + allowing processes to connect to databases and run read-only queries. + + + + + + BgWorkerStart_ConsistentState_HotStandby + + + BgWorkerStart_ConsistentState_HotStandby + Same meaning as BgWorkerStart_ConsistentState but + it is more strict in terms of the server i.e. start the worker only + if it is hot-standby. 
+ + + + + + BgWorkerStart_RecoveryFinished + + + BgWorkerStart_RecoveryFinished + Start as soon as the system has entered normal read-write state. Note + that the BgWorkerStart_ConsistentState and + BgWorkerStart_RecoveryFinished are equivalent + in a server that's not a hot standby. + + + + + diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 61038472c5..bd2d2f871e 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4612,8 +4612,13 @@ ANY num_sync ( ) then it is also + necessary to specify dbname in the + primary_conninfo string. This will only be used for + slot synchronization. It is ignored for streaming. This parameter can only be set in the postgresql.conf @@ -4938,6 +4943,24 @@ ANY num_sync ( + enable_syncslot (boolean) + + enable_syncslot configuration parameter + + + + + It enables a physical standby to synchronize logical failover slots + from the primary server so that logical subscribers are not blocked + after failover. + + + It is disabled by default. This parameter can only be set in the + postgresql.conf file or on the server command line. + + + diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index cd152d4ced..ac244370f7 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -346,6 +346,40 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU pg_log_standby_snapshot function on the primary. + + A logical replication slot on the primary can be synchronized to the hot + standby by enabling the failover option during slot creation and setting + on the standby. For the synchronization + to work, it is mandatory to have a physical replication slot between the + primary and the standby, and hot_standby_feedback must + be enabled on the standby. 
It's also highly recommended that the said + physical replication slot is named in standby_slot_names + list on the primary, to prevent the subscriber from consuming changes + faster than the hot standby. + + + + The ability to resume logical replication after failover depends upon the + pg_replication_slots.synced + value for the synchronized slots on the standby at the time of failover. + Only persistent slots that have attained synced state as true on the standby + before failover can be used for logical replication after failover. + Temporary slots will be dropped, therefore logical replication for those + slots cannot be resumed. For example, if the synchronized slot could not + become persistent on the standby due to a disabled subscription, then the + subscription cannot be resumed after failover even when it is enabled. + + + + In order to resume logical replication after failover from the synced + logical slots, it is required that 'conninfo' in subscriptions are altered + to point to the new primary server using + ALTER SUBSCRIPTION ... CONNECTION. + It is recommended that subscriptions are first disabled before promoting + the standby and are enabled back once these are altered as above after + failover. + + Replication slots persist across crashes and know nothing about the state diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 1868b95836..64e5112810 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2566,6 +2566,22 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx after failover. Always false for physical slots. + + + + synced bool + + + True if this logical slot was synced from a primary server. + + + On a hot standby, the slots with the synced column marked as true can + neither be used for logical decoding nor dropped by the user. 
The value + of this column has no meaning on the primary server; the column value on + the primary is default false for all slots but may (if leftover from a + promoted standby) also be true. + + diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 478377c4a2..2d66d0d84b 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -3596,6 +3596,9 @@ XLogGetLastRemovedSegno(void) /* * Return the oldest WAL segment on the given TLI that still exists in * XLOGDIR, or 0 if none. + * + * If the given TLI is 0, return the oldest WAL segment among all the currently + * existing WAL segments. */ XLogSegNo XLogGetOldestSegno(TimeLineID tli) @@ -3619,7 +3622,7 @@ XLogGetOldestSegno(TimeLineID tli) wal_segment_size); /* Ignore anything that's not from the TLI of interest. */ - if (tli != file_tli) + if (tli != 0 && tli != file_tli) continue; /* If it's the oldest so far, update oldest_segno. */ diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 1b48d7171a..d4688b9a02 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -50,6 +50,7 @@ #include "postmaster/startup.h" #include "replication/slot.h" #include "replication/walreceiver.h" +#include "replication/worker_internal.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -1441,6 +1442,20 @@ FinishWalRecovery(void) */ XLogShutdownWalRcv(); + /* + * Shutdown the slot sync workers to prevent potential conflicts between + * user processes and slotsync workers after a promotion. + * + * We do not update the 'synced' column from true to false here, as any + * failed update could leave some slot's 'synced' column as false. This + * could cause issues during slot sync after restarting the server as a + * standby. While updating after switching to the new timeline is an + * option, it does not simplify the handling for 'synced' column. 
+ * Therefore, we retain the 'synced' column as true after promotion as they + * can provide useful information about their origin. + */ + ShutDownSlotSync(); + /* * We are now done reading the xlog from stream. Turn off streaming * recovery to force fetching the files (which would be required at end of diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index e43a93739d..ad57bade07 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1024,7 +1024,8 @@ CREATE VIEW pg_replication_slots AS L.safe_wal_size, L.two_phase, L.conflict_reason, - L.failover + L.failover, + L.synced FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 67f92c24db..46828b8a89 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -21,6 +21,7 @@ #include "postmaster/postmaster.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" +#include "replication/worker_internal.h" #include "storage/dsm.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -129,6 +130,9 @@ static const struct { "ApplyWorkerMain", ApplyWorkerMain }, + { + "ReplSlotSyncWorkerMain", ReplSlotSyncWorkerMain + }, { "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index feb471dd1d..d90d5d1576 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -116,6 +116,7 @@ #include "postmaster/walsummarizer.h" #include "replication/logicallauncher.h" #include "replication/walsender.h" +#include "replication/worker_internal.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/pg_shmem.h" @@ -1010,6 +1011,12 @@ PostmasterMain(int argc, char *argv[]) */ ApplyLauncherRegister(); + /* + * Register the slot sync worker here to kick 
start slot-sync operation + * sooner on the physical standby. + */ + SlotSyncWorkerRegister(); + /* * process any libraries that should be preloaded at postmaster start */ @@ -5799,6 +5806,9 @@ bgworker_should_start_now(BgWorkerStartTime start_time) case PM_HOT_STANDBY: if (start_time == BgWorkerStart_ConsistentState) return true; + if (start_time == BgWorkerStart_ConsistentState_HotStandby && + pmState != PM_RUN) + return true; /* fall through */ case PM_RECOVERY: diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index f18a04d8a4..f910a3b103 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -34,6 +34,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/tuplestore.h" +#include "utils/varlena.h" PG_MODULE_MAGIC; @@ -58,6 +59,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); static char *libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli); +static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo); static int libpqrcv_server_version(WalReceiverConn *conn); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, @@ -100,6 +102,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, .walrcv_alter_slot = libpqrcv_alter_slot, + .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -418,6 +421,44 @@ libpqrcv_server_version(WalReceiverConn *conn) return PQserverVersion(conn->streamConn); } +/* + * Get database name from the primary server's conninfo. + * + * If dbname is not found in connInfo, return NULL value. 
+ */ +static char * +libpqrcv_get_dbname_from_conninfo(const char *connInfo) +{ + PQconninfoOption *opts; + char *dbname = NULL; + char *err = NULL; + + opts = PQconninfoParse(connInfo, &err); + if (opts == NULL) + { + /* The error string is malloc'd, so we must free it explicitly */ + char *errcopy = err ? pstrdup(err) : "out of memory"; + + PQfreemem(err); + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid connection string syntax: %s", errcopy))); + } + + for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt) + { + /* + * If multiple dbnames are specified, then the last one will be + * returned + */ + if (strcmp(opt->keyword, "dbname") == 0 && opt->val && + opt->val[0] != '\0') + dbname = pstrdup(opt->val); + } + + return dbname; +} + /* * Start streaming WAL data from given streaming options. * diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 2dc25e37bb..ba03eeff1c 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -25,6 +25,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + slotsync.o \ snapbuild.o \ tablesync.o \ worker.o diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index ca09c683f1..5aefb10ecb 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -524,6 +524,18 @@ CreateDecodingContext(XLogRecPtr start_lsn, errmsg("replication slot \"%s\" was not created in this database", NameStr(slot->data.name)))); + /* + * Do not allow consumption of a "synchronized" slot until the standby + * gets promoted. 
+ */ + if (RecoveryInProgress() && slot->data.synced) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot use replication slot \"%s\" for logical" + " decoding", NameStr(slot->data.name)), + errdetail("This slot is being synced from the primary server."), + errhint("Specify another replication slot.")); + /* * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid * "cannot get changes" wording in this errmsg because that'd be diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index 1050eb2c09..3dec36a6de 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -11,6 +11,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'slotsync.c', 'snapbuild.c', 'tablesync.c', 'worker.c', diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c new file mode 100644 index 0000000000..2625f4d1c6 --- /dev/null +++ b/src/backend/replication/logical/slotsync.c @@ -0,0 +1,1168 @@ +/*------------------------------------------------------------------------- + * slotsync.c + * PostgreSQL worker for synchronizing slots to a standby server from the + * primary server. + * + * Copyright (c) 2024, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/slotsync.c + * + * This file contains the code for slot sync worker on a physical standby + * to fetch logical failover slots information from the primary server, + * create the slots on the standby and synchronize them periodically. + * + * While creating the slot on physical standby, if the local restart_lsn and/or + * local catalog_xmin is ahead of those on the remote then the worker cannot + * create the local slot in sync with the primary server because that would + * mean moving the local slot backwards and the standby might not have WALs + * retained for old LSN. 
In this case, the worker will mark the slot as + * RS_TEMPORARY. Once the primary server catches up, it will move the slot to + * RS_PERSISTENT and will perform the sync periodically. + * + * The worker also takes care of dropping the slots which were created by it + * and are currently not needed to be synchronized. + * + * It waits for a period of time before the next synchronization, with the + * duration varying based on whether any slots were updated during the last + * cycle. Refer to the comments above sleep_quanta and wait_for_slot_activity() + * for more details. + *--------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/genam.h" +#include "access/table.h" +#include "access/xlog_internal.h" +#include "access/xlogrecovery.h" +#include "catalog/pg_database.h" +#include "commands/dbcommands.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" +#include "replication/logical.h" +#include "replication/logicallauncher.h" +#include "replication/logicalworker.h" +#include "replication/walreceiver.h" +#include "replication/worker_internal.h" +#include "storage/ipc.h" +#include "storage/procarray.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/guc_hooks.h" +#include "utils/pg_lsn.h" +#include "utils/varlena.h" + +/* + * Structure to hold information fetched from the primary server about a logical + * replication slot. + */ +typedef struct RemoteSlot +{ + char *name; + char *plugin; + char *database; + bool two_phase; + bool failover; + XLogRecPtr restart_lsn; + XLogRecPtr confirmed_lsn; + TransactionId catalog_xmin; + + /* RS_INVAL_NONE if valid, or the reason of invalidation */ + ReplicationSlotInvalidationCause invalidated; +} RemoteSlot; + +/* + * Struct for sharing information between startup process and slot + * sync worker. 
+ * + * Slot sync worker's pid is needed by startup process in order to + * shut it down during promotion. + */ +typedef struct SlotSyncWorkerCtxStruct +{ + pid_t pid; + slock_t mutex; +} SlotSyncWorkerCtxStruct; + +SlotSyncWorkerCtxStruct *SlotSyncWorker = NULL; + +/* GUC variable */ +bool enable_syncslot = false; + +/* + * When no slots got updated in the last cycle, we sleep for a number of + * milliseconds that is a integer multiple of MS_PER_SLEEP_QUANTUM. This is the + * multiplier. It should vary between 1 and MAX_SLEEP_QUANTA, depending on + * system activity. See wait_for_slot_activity() for how we adjust this. + */ +static long sleep_quanta = 1; + +/* + * The sleep time will always be a multiple of 200ms and will not exceed + * thirty seconds (150 * 200 = 30 * 1000). + */ +#define MAX_SLEEP_QUANTA 150 +#define MS_PER_SLEEP_QUANTUM 200 + +static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn); + +/* + * Update local slot metadata as per remote_slot's positions + */ +static void +local_slot_update(RemoteSlot *remote_slot) +{ + Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE); + + LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn); + LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn, + remote_slot->catalog_xmin); + LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn, + remote_slot->restart_lsn); +} + +/* + * Helper function for drop_obsolete_slots() + * + * Drops synced slot identified by the passed in name. + */ +static void +drop_synced_slots_internal(const char *name) +{ + Assert(MyReplicationSlot == NULL); + + ReplicationSlotAcquire(name, true); + + Assert(MyReplicationSlot->data.synced); + + ReplicationSlotDropAcquired(); +} + +/* + * Get list of local logical slots which are synchronized from + * the primary server. 
+ */ +static List * +get_local_synced_slots(void) +{ + List *local_slots = NIL; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + /* Check if it is logical synchronized slot */ + if (s->in_use && SlotIsLogical(s) && s->data.synced) + { + local_slots = lappend(local_slots, s); + } + } + + LWLockRelease(ReplicationSlotControlLock); + + return local_slots; +} + +/* + * Helper function to check if local_slot is present in remote_slots list. + * + * It also checks if the slot on the standby server was invalidated while the + * corresponding remote slot in the list remained valid. If found so, it sets + * the locally_invalidated flag to true. + */ +static bool +check_sync_slot_on_remote(ReplicationSlot *local_slot, List *remote_slots, + bool *locally_invalidated) +{ + ListCell *lc; + + foreach(lc, remote_slots) + { + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc); + + if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0) + { + /* + * If remote slot is not invalidated but local slot is marked as + * invalidated, then set the bool. + */ + SpinLockAcquire(&local_slot->mutex); + *locally_invalidated = + (remote_slot->invalidated == RS_INVAL_NONE) && + (local_slot->data.invalidated != RS_INVAL_NONE); + SpinLockRelease(&local_slot->mutex); + + return true; + } + } + + return false; +} + +/* + * Drop obsolete slots + * + * Drop the slots that no longer need to be synced i.e. these either do not + * exist on the primary or are no longer enabled for failover. + * + * Additionally, it drops slots that are valid on the primary but got + * invalidated on the standby. This situation may occur due to the following + * reasons: + * - The max_slot_wal_keep_size on the standby is insufficient to retain WAL + * records from the restart_lsn of the slot. 
+ * - primary_slot_name is temporarily reset to null and the physical slot is + * removed. + * - The primary changes wal_level to a level lower than logical. + * + * The assumption is that these dropped slots will get recreated in next + * sync-cycle and it is okay to drop and recreate such slots as long as these + * are not consumable on the standby (which is the case currently). + */ +static void +drop_obsolete_slots(List *remote_slot_list) +{ + List *local_slots = NIL; + ListCell *lc; + + local_slots = get_local_synced_slots(); + + foreach(lc, local_slots) + { + ReplicationSlot *local_slot = (ReplicationSlot *) lfirst(lc); + bool remote_exists = false; + bool locally_invalidated = false; + + remote_exists = check_sync_slot_on_remote(local_slot, remote_slot_list, + &locally_invalidated); + + /* + * Drop the local slot either if it is not in the remote slots list or + * is invalidated while remote slot is still valid. + */ + if (!remote_exists || locally_invalidated) + { + drop_synced_slots_internal(NameStr(local_slot->data.name)); + + ereport(LOG, + errmsg("dropped replication slot \"%s\" of dbid %d", + NameStr(local_slot->data.name), + local_slot->data.database)); + } + } +} + +/* + * Reserve WAL for the currently active slot using the specified WAL location + * (restart_lsn). + * + * If the given WAL location has been removed, reserve WAL using the oldest + * existing WAL segment. + */ +static void +reserve_wal_for_slot(XLogRecPtr restart_lsn) +{ + XLogSegNo oldest_segno; + XLogSegNo segno; + ReplicationSlot *slot = MyReplicationSlot; + + Assert(slot != NULL); + Assert(slot->data.restart_lsn == InvalidXLogRecPtr); + + while (true) + { + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = restart_lsn; + SpinLockRelease(&slot->mutex); + + /* Prevent WAL removal as fast as possible */ + ReplicationSlotsComputeRequiredLSN(); + + XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size); + + /* + * Find the oldest existing WAL segment file. 
+ * + * Normally, we can determine it by using the last removed segment + * number. However, if no WAL segment files have been removed by a + * checkpoint since startup, we need to search for the oldest segment + * file currently existing in XLOGDIR. + */ + oldest_segno = XLogGetLastRemovedSegno() + 1; + + if (oldest_segno == 1) + oldest_segno = XLogGetOldestSegno(0); + + /* + * If all required WAL is still there, great, otherwise retry. The + * slot should prevent further removal of WAL, unless there's a + * concurrent ReplicationSlotsComputeRequiredLSN() after we've written + * the new restart_lsn above, so normally we should never need to loop + * more than twice. + */ + if (segno >= oldest_segno) + break; + + /* Retry using the location of the oldest wal segment */ + XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn); + } +} + +/* + * Update the LSNs and persist the slot for further syncs if the remote + * restart_lsn and catalog_xmin have caught up with the local ones. Otherwise, + * persist the slot and return. + * + * Return true if the slot is marked READY, otherwise false. + */ +static bool +update_and_persist_slot(RemoteSlot *remote_slot) +{ + ReplicationSlot *slot = MyReplicationSlot; + + /* + * Check if the primary server has caught up. Refer to the comment atop the + * file for details on this check. + * + * We also need to check if remote_slot's confirmed_lsn becomes valid. It + * is possible to get null values for confirmed_lsn and catalog_xmin if on + * the primary server the slot is just created with a valid restart_lsn and + * slot-sync worker has fetched the slot before the primary server could + * set valid confirmed_lsn and catalog_xmin. + */ + if (remote_slot->restart_lsn < slot->data.restart_lsn || + XLogRecPtrIsInvalid(remote_slot->confirmed_lsn) || + TransactionIdPrecedes(remote_slot->catalog_xmin, + slot->data.catalog_xmin)) + { + /* + * The remote slot didn't catch up to locally reserved position. 
+ * + * We do not drop the slot because the restart_lsn can be ahead of the + * current location when recreating the slot in the next cycle. It may + * take more time to create such a slot. Therefore, we keep this slot + * and attempt the wait and synchronization in the next cycle. + */ + return false; + } + + local_slot_update(remote_slot); + + if (slot->data.persistency == RS_TEMPORARY) + ReplicationSlotPersist(); + else + { + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + } + + ereport(LOG, + errmsg("newly locally created slot \"%s\" is sync-ready now", + remote_slot->name)); + + return true; +} + +/* + * Synchronize single slot to given position. + * + * This creates a new slot if there is no existing one and updates the + * metadata of the slot as per the data received from the primary server. + * + * The slot is created as a temporary slot and stays in same state until the + * initialization is complete. The initialization is considered to be completed + * once the remote_slot catches up with locally reserved position and local + * slot is updated. The slot is then persisted. + * + * Returns TRUE if the local slot is updated. + */ +static bool +synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot) +{ + ReplicationSlot *slot; + bool slot_updated = false; + XLogRecPtr latestWalEnd; + + /* + * Sanity check: Make sure that concerned WAL is received before syncing + * slot to target lsn received from the primary server. + * + * This check should never pass as on the primary server, we have waited + * for the standby's confirmation before updating the logical slot. 
+ */ + latestWalEnd = GetWalRcvLatestWalEnd(); + if (remote_slot->confirmed_lsn > latestWalEnd) + { + elog(ERROR, "exiting from slot synchronization as the received slot sync" + " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X", + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), + remote_slot->name, + LSN_FORMAT_ARGS(latestWalEnd)); + } + + /* Search for the named slot */ + if ((slot = SearchNamedReplicationSlot(remote_slot->name, true))) + { + bool synced; + + SpinLockAcquire(&slot->mutex); + synced = slot->data.synced; + SpinLockRelease(&slot->mutex); + + /* User created slot with the same name exists, raise ERROR. */ + if (!synced) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("exiting from slot synchronization on receiving" + " the failover slot \"%s\" from the primary server", + remote_slot->name), + errdetail("A user-created slot with the same name already" + " exists on the standby.")); + + /* + * Slot created by the slot sync worker exists, sync it. + * + * It is important to acquire the slot here before checking + * invalidation. If we don't acquire the slot first, there could be a + * race condition that the local slot could be invalidated just after + * checking the 'invalidated' flag here and we could end up + * overwriting 'invalidated' flag to remote_slot's value. See + * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly + * if the slot is not acquired by other processes. + */ + ReplicationSlotAcquire(remote_slot->name, true); + + Assert(slot == MyReplicationSlot); + + /* + * Copy the invalidation cause from remote only if local slot is not + * invalidated locally, we don't want to overwrite existing one. 
+ */ + if (slot->data.invalidated == RS_INVAL_NONE) + { + SpinLockAcquire(&slot->mutex); + slot->data.invalidated = remote_slot->invalidated; + SpinLockRelease(&slot->mutex); + + /* Make sure the invalidated state persists across server restart */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + slot_updated = true; + } + + /* Skip the sync of an invalidated slot */ + if (slot->data.invalidated != RS_INVAL_NONE) + { + ReplicationSlotRelease(); + return slot_updated; + } + + /* Slot not ready yet, let's attempt to make it sync-ready now. */ + if (slot->data.persistency == RS_TEMPORARY) + { + slot_updated = update_and_persist_slot(remote_slot); + } + /* Slot ready for sync, so sync it. */ + else + { + /* + * Sanity check: With hot_standby_feedback enabled and + * invalidations handled appropriately as above, this should never + * happen. + */ + if (remote_slot->restart_lsn < slot->data.restart_lsn) + elog(ERROR, + "cannot synchronize local slot \"%s\" LSN(%X/%X)" + " to remote slot's LSN(%X/%X) as synchronization" + " would move it backwards", remote_slot->name, + LSN_FORMAT_ARGS(slot->data.restart_lsn), + LSN_FORMAT_ARGS(remote_slot->restart_lsn)); + + if (remote_slot->confirmed_lsn != slot->data.confirmed_flush || + remote_slot->restart_lsn != slot->data.restart_lsn || + remote_slot->catalog_xmin != slot->data.catalog_xmin) + { + /* Update LSN of slot to remote slot's current position */ + local_slot_update(remote_slot); + + /* Make sure the slot changes persist across server restart */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + slot_updated = true; + } + } + } + /* Otherwise create the slot first. 
*/ + else + { + TransactionId xmin_horizon = InvalidTransactionId; + + /* Skip creating the local slot if remote_slot is invalidated already */ + if (remote_slot->invalidated != RS_INVAL_NONE) + return false; + + /* Ensure that we have transaction env needed by get_database_oid() */ + Assert(IsTransactionState()); + + ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY, + remote_slot->two_phase, + remote_slot->failover, + true); + + /* For shorter lines. */ + slot = MyReplicationSlot; + + SpinLockAcquire(&slot->mutex); + slot->data.database = get_database_oid(remote_slot->database, false); + namestrcpy(&slot->data.plugin, remote_slot->plugin); + SpinLockRelease(&slot->mutex); + + reserve_wal_for_slot(remote_slot->restart_lsn); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + xmin_horizon = GetOldestSafeDecodingTransactionId(true); + SpinLockAcquire(&slot->mutex); + slot->effective_catalog_xmin = xmin_horizon; + slot->data.catalog_xmin = xmin_horizon; + SpinLockRelease(&slot->mutex); + ReplicationSlotsComputeRequiredXmin(true); + LWLockRelease(ProcArrayLock); + + (void) update_and_persist_slot(remote_slot); + slot_updated = true; + } + + ReplicationSlotRelease(); + + return slot_updated; +} + +/* + * Maps the pg_replication_slots.conflict_reason text value to + * ReplicationSlotInvalidationCause enum value + */ +static ReplicationSlotInvalidationCause +get_slot_invalidation_cause(char *conflict_reason) +{ + Assert(conflict_reason); + + if (strcmp(conflict_reason, SLOT_INVAL_WAL_REMOVED_TEXT) == 0) + return RS_INVAL_WAL_REMOVED; + else if (strcmp(conflict_reason, SLOT_INVAL_HORIZON_TEXT) == 0) + return RS_INVAL_HORIZON; + else if (strcmp(conflict_reason, SLOT_INVAL_WAL_LEVEL_TEXT) == 0) + return RS_INVAL_WAL_LEVEL; + else + Assert(0); + + /* Keep compiler quiet */ + return RS_INVAL_NONE; +} + +/* + * Synchronize slots. + * + * Gets the failover logical slots info from the primary server and updates + * the slots locally. 
Creates the slots if not present on the standby. + * + * Returns TRUE if any of the slots gets updated in this sync-cycle. + */ +static bool +synchronize_slots(WalReceiverConn *wrconn) +{ +#define SLOTSYNC_COLUMN_COUNT 9 + Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, + LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID}; + + WalRcvExecResult *res; + TupleTableSlot *tupslot; + StringInfoData s; + List *remote_slot_list = NIL; + ListCell *lc; + bool some_slot_updated = false; + XLogRecPtr latestWalEnd; + + /* + * The primary_slot_name is not set yet or WALs not received yet. + * Synchronization is not possible if the walreceiver is not started. + */ + latestWalEnd = GetWalRcvLatestWalEnd(); + SpinLockAcquire(&WalRcv->mutex); + if ((WalRcv->slotname[0] == '\0') || + XLogRecPtrIsInvalid(latestWalEnd)) + { + SpinLockRelease(&WalRcv->mutex); + return false; + } + SpinLockRelease(&WalRcv->mutex); + + /* The syscache access in walrcv_exec() needs a transaction env. */ + StartTransactionCommand(); + + initStringInfo(&s); + + /* Construct query to fetch slots with failover enabled. 
*/ + appendStringInfo(&s, + "SELECT slot_name, plugin, confirmed_flush_lsn," + " restart_lsn, catalog_xmin, two_phase, failover," + " database, conflict_reason" + " FROM pg_catalog.pg_replication_slots" + " WHERE failover and NOT temporary"); + + /* Execute the query */ + res = walrcv_exec(wrconn, s.data, SLOTSYNC_COLUMN_COUNT, slotRow); + pfree(s.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errmsg("could not fetch failover logical slots info" + " from the primary server: %s", res->err)); + + /* Construct the remote_slot tuple and synchronize each slot locally */ + tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) + { + bool isnull; + RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot)); + + remote_slot->name = TextDatumGetCString(slot_getattr(tupslot, 1, &isnull)); + Assert(!isnull); + + remote_slot->plugin = TextDatumGetCString(slot_getattr(tupslot, 2, &isnull)); + Assert(!isnull); + + /* + * It is possible to get null values for LSN and Xmin if slot is + * invalidated on the primary server, so handle accordingly. + */ + remote_slot->confirmed_lsn = !slot_attisnull(tupslot, 3) ? + DatumGetLSN(slot_getattr(tupslot, 3, &isnull)) : + InvalidXLogRecPtr; + + remote_slot->restart_lsn = !slot_attisnull(tupslot, 4) ? + DatumGetLSN(slot_getattr(tupslot, 4, &isnull)) : + InvalidXLogRecPtr; + + remote_slot->catalog_xmin = !slot_attisnull(tupslot, 5) ? + DatumGetTransactionId(slot_getattr(tupslot, 5, &isnull)) : + InvalidTransactionId; + + remote_slot->two_phase = DatumGetBool(slot_getattr(tupslot, 6, &isnull)); + Assert(!isnull); + + remote_slot->failover = DatumGetBool(slot_getattr(tupslot, 7, &isnull)); + Assert(!isnull); + + remote_slot->database = TextDatumGetCString(slot_getattr(tupslot, + 8, &isnull)); + Assert(!isnull); + + remote_slot->invalidated = !slot_attisnull(tupslot, 9) ? 
+ get_slot_invalidation_cause(TextDatumGetCString(slot_getattr(tupslot, 9, &isnull))) : + RS_INVAL_NONE; + + /* Create list of remote slots */ + remote_slot_list = lappend(remote_slot_list, remote_slot); + + ExecClearTuple(tupslot); + } + + /* Drop local slots that no longer need to be synced. */ + drop_obsolete_slots(remote_slot_list); + + /* Now sync the slots locally */ + foreach(lc, remote_slot_list) + { + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(lc); + + some_slot_updated |= synchronize_one_slot(wrconn, remote_slot); + } + + /* We are done, free remote_slot_list elements */ + list_free_deep(remote_slot_list); + + walrcv_clear_result(res); + + CommitTransactionCommand(); + + return some_slot_updated; +} + +/* + * Checks the primary server info. + * + * Using the specified primary server connection, check whether we are a + * cascading standby. It also validates primary_slot_name for non-cascading + * standbys. + */ +static void +check_primary_info(WalReceiverConn *wrconn, bool *am_cascading_standby) +{ +#define PRIMARY_INFO_OUTPUT_COL_COUNT 2 + WalRcvExecResult *res; + Oid slotRow[PRIMARY_INFO_OUTPUT_COL_COUNT] = {BOOLOID, BOOLOID}; + StringInfoData cmd; + bool isnull; + TupleTableSlot *tupslot; + bool valid; + bool remote_in_recovery; + + /* The syscache access in walrcv_exec() needs a transaction env. 
*/ + StartTransactionCommand(); + + Assert(am_cascading_standby != NULL); + + *am_cascading_standby = false; /* overwritten later if cascading */ + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT pg_is_in_recovery(), count(*) = 1" + " FROM pg_catalog.pg_replication_slots" + " WHERE slot_type='physical' AND slot_name=%s", + quote_literal_cstr(PrimarySlotName)); + + res = walrcv_exec(wrconn, cmd.data, PRIMARY_INFO_OUTPUT_COL_COUNT, slotRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + errmsg("could not fetch primary_slot_name \"%s\" info from the" + " primary server: %s", PrimarySlotName, res->err)); + + tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + (void) tuplestore_gettupleslot(res->tuplestore, true, false, tupslot); + + /* It must return one tuple */ + Assert(tuplestore_tuple_count(res->tuplestore) == 1); + + remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull)); + Assert(!isnull); + + if (remote_in_recovery) + { + /* No need to check further, just set am_cascading_standby to true */ + *am_cascading_standby = true; + } + else + { + /* We are a normal standby */ + valid = DatumGetBool(slot_getattr(tupslot, 2, &isnull)); + Assert(!isnull); + + if (!valid) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("exiting from slot synchronization due to bad configuration"), + /* translator: second %s is a GUC variable name */ + errdetail("The primary server slot \"%s\" specified by %s is not valid.", + PrimarySlotName, "primary_slot_name")); + } + + ExecClearTuple(tupslot); + walrcv_clear_result(res); + CommitTransactionCommand(); +} + +/* + * Check that all necessary GUCs for slot synchronization are set + * appropriately. If not, raise an ERROR. + * + * If all checks pass, extracts the dbname from the primary_conninfo GUC and + * returns it. + */ +static char * +validate_parameters_and_get_dbname(void) +{ + char *dbname; + + /* Sanity check. 
*/ + Assert(enable_syncslot); + + /* + * A physical replication slot(primary_slot_name) is required on the + * primary to ensure that the rows needed by the standby are not removed + * after restarting, so that the synchronized slot on the standby will not + * be invalidated. + */ + if (PrimarySlotName == NULL || strcmp(PrimarySlotName, "") == 0) + ereport(ERROR, + /* translator: %s is a GUC variable name */ + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("%s must be defined.", "primary_slot_name")); + + /* + * hot_standby_feedback must be enabled to cooperate with the physical + * replication slot, which allows informing the primary about the xmin and + * catalog_xmin values on the standby. + */ + if (!hot_standby_feedback) + ereport(ERROR, + /* translator: %s is a GUC variable name */ + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("%s must be enabled.", "hot_standby_feedback")); + + /* + * Logical decoding requires wal_level >= logical and we currently only + * synchronize logical slots. + */ + if (wal_level < WAL_LEVEL_LOGICAL) + ereport(ERROR, + /* translator: %s is a GUC variable name */ + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("%s must be >= logical.", "wal_level")); + + /* + * The primary_conninfo is required to make connection to primary for + * getting slots information. + */ + if (PrimaryConnInfo == NULL || strcmp(PrimaryConnInfo, "") == 0) + ereport(ERROR, + /* translator: %s is a GUC variable name */ + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("%s must be defined.", "primary_conninfo")); + + /* + * The slot sync worker needs a database connection for walrcv_exec to + * work. 
+ */ + dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); + if (dbname == NULL) + ereport(ERROR, + + /* + * translator: 'dbname' is a specific option; %s is a GUC variable + * name + */ + errmsg("exiting from slot synchronization due to bad configuration"), + errhint("'dbname' must be specified in %s.", "primary_conninfo")); + + return dbname; +} + +/* + * Re-read the config file. + * + * If any of the slot sync GUCs have changed, exit the worker and + * let it get restarted by the postmaster. + */ +static void +slotsync_reread_config(WalReceiverConn *wrconn) +{ + char *old_primary_conninfo = pstrdup(PrimaryConnInfo); + char *old_primary_slotname = pstrdup(PrimarySlotName); + bool old_hot_standby_feedback = hot_standby_feedback; + bool conninfo_changed; + bool primary_slotname_changed; + + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + + conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0; + primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0; + + if (conninfo_changed || + primary_slotname_changed || + (old_hot_standby_feedback != hot_standby_feedback)) + { + ereport(LOG, + errmsg("slot sync worker will restart because of" + " a parameter change")); + /* The exit code 1 will make postmaster restart this worker */ + proc_exit(1); + } + + pfree(old_primary_conninfo); + pfree(old_primary_slotname); +} + +/* + * Interrupt handler for main loop of slot sync worker. + */ +static void +ProcessSlotSyncInterrupts(WalReceiverConn *wrconn) +{ + CHECK_FOR_INTERRUPTS(); + + if (ShutdownRequestPending) + { + walrcv_disconnect(wrconn); + ereport(LOG, + errmsg("replication slot sync worker is shutting down" + " on receiving SIGINT")); + proc_exit(0); + } + + if (ConfigReloadPending) + slotsync_reread_config(wrconn); +} + +/* + * Cleanup function for the slot sync worker. + * + * Called on slot sync worker exit. 
+ */ +static void +slotsync_worker_onexit(int code, Datum arg) +{ + SpinLockAcquire(&SlotSyncWorker->mutex); + SlotSyncWorker->pid = InvalidPid; + SpinLockRelease(&SlotSyncWorker->mutex); +} + +/* + * Sleep for long enough that we believe it's likely that the slots on primary + * get updated. + */ +static void +wait_for_slot_activity(bool some_slot_updated, bool am_cascading_standby) +{ + int rc; + + if (am_cascading_standby) + { + /* + * Slot synchronization is currently not supported on cascading + * standby. So if we are on the cascading standby, we will skip the + * sync and take a longer nap before we check again whether we are + * still cascading standby or not. + */ + sleep_quanta = MAX_SLEEP_QUANTA; + } + else if (!some_slot_updated) + { + /* + * No slots were updated, so double the sleep time, but not beyond the + * maximum allowable value. + */ + sleep_quanta = Min(sleep_quanta * 2, MAX_SLEEP_QUANTA); + } + else + { + /* + * Some slots were updated since the last sleep, so reset the sleep + * time. + */ + sleep_quanta = 1; + } + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + sleep_quanta * MS_PER_SLEEP_QUANTUM, + WAIT_EVENT_REPL_SLOTSYNC_MAIN); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); +} + +/* + * The main loop of our worker process. + * + * It connects to the primary server, fetches logical failover slots + * information periodically in order to create and sync the slots. 
+ */ +void +ReplSlotSyncWorkerMain(Datum main_arg) +{ + WalReceiverConn *wrconn = NULL; + char *dbname; + bool am_cascading_standby; + char *err; + + ereport(LOG, errmsg("replication slot sync worker started")); + + on_shmem_exit(slotsync_worker_onexit, (Datum) 0); + + SpinLockAcquire(&SlotSyncWorker->mutex); + + Assert(SlotSyncWorker->pid == InvalidPid); + + /* Advertise our PID so that the startup process can kill us on promotion */ + SlotSyncWorker->pid = MyProcPid; + + SpinLockRelease(&SlotSyncWorker->mutex); + + /* Setup signal handling */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGINT, SignalHandlerForShutdownRequest); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + + dbname = validate_parameters_and_get_dbname(); + + /* + * Connect to the database specified by user in primary_conninfo. We need + * a database connection for walrcv_exec to work. Please see comments atop + * libpqrcv_exec. + */ + BackgroundWorkerInitializeConnection(dbname, NULL, 0); + + /* + * Establish the connection to the primary server for slots + * synchronization. + */ + wrconn = walrcv_connect(PrimaryConnInfo, true, false, + cluster_name[0] ? cluster_name : "slotsyncworker", + &err); + if (wrconn == NULL) + ereport(ERROR, + errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the primary server: %s", err)); + + /* + * Using the specified primary server connection, check whether we are + * cascading standby and validates primary_slot_name for + * non-cascading-standbys. 
+ */ + check_primary_info(wrconn, &am_cascading_standby); + + /* Main wait loop */ + for (;;) + { + bool some_slot_updated = false; + + ProcessSlotSyncInterrupts(wrconn); + + if (!am_cascading_standby) + some_slot_updated = synchronize_slots(wrconn); + + wait_for_slot_activity(some_slot_updated, am_cascading_standby); + + /* + * If the standby was promoted then what was previously a cascading + * standby might no longer be one, so recheck each time. + */ + if (am_cascading_standby) + check_primary_info(wrconn, &am_cascading_standby); + } + + /* + * The slot sync worker can not get here because it will only stop when it + * receives a SIGINT from the logical replication launcher, or when there + * is an error. + */ + Assert(false); +} + +/* + * Is current process the slot sync worker? + */ +bool +IsLogicalSlotSyncWorker(void) +{ + return SlotSyncWorker->pid == MyProcPid; +} + +/* + * Shut down the slot sync worker. + */ +void +ShutDownSlotSync(void) +{ + SpinLockAcquire(&SlotSyncWorker->mutex); + if (SlotSyncWorker->pid == InvalidPid) + { + SpinLockRelease(&SlotSyncWorker->mutex); + return; + } + + kill(SlotSyncWorker->pid, SIGINT); + + SpinLockRelease(&SlotSyncWorker->mutex); + + /* Wait for it to die */ + for (;;) + { + int rc; + + /* Wait a bit, we don't expect to have to wait long */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, WAIT_EVENT_BGWORKER_SHUTDOWN); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + SpinLockAcquire(&SlotSyncWorker->mutex); + + /* Is it gone? 
*/ + if (SlotSyncWorker->pid == InvalidPid) + break; + + SpinLockRelease(&SlotSyncWorker->mutex); + } + + SpinLockRelease(&SlotSyncWorker->mutex); +} + +/* + * Allocate and initialize slot sync worker shared memory + */ +void +SlotSyncWorkerShmemInit(void) +{ + Size size; + bool found; + + size = sizeof(SlotSyncWorkerCtxStruct); + size = MAXALIGN(size); + + SlotSyncWorker = (SlotSyncWorkerCtxStruct *) + ShmemInitStruct("Slot Sync Worker Data", size, &found); + + if (!found) + { + memset(SlotSyncWorker, 0, size); + SlotSyncWorker->pid = InvalidPid; + SpinLockInit(&SlotSyncWorker->mutex); + } +} + +/* + * Register the background worker for slots synchronization provided + * enable_syncslot is ON. + */ +void +SlotSyncWorkerRegister(void) +{ + BackgroundWorker bgw; + + if (!enable_syncslot) + { + ereport(LOG, + errmsg("skipping slot synchronization"), + errdetail("enable_syncslot is disabled.")); + return; + } + + memset(&bgw, 0, sizeof(bgw)); + + /* We need database connection which needs shared-memory access as well */ + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + + /* Start as soon as a consistent state has been reached in a hot standby */ + bgw.bgw_start_time = BgWorkerStart_ConsistentState_HotStandby; + + snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "replication slot sync worker"); + snprintf(bgw.bgw_type, BGW_MAXLEN, + "slot sync worker"); + + bgw.bgw_restart_time = BGW_DEFAULT_RESTART_INTERVAL; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); +} diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 696376400e..33f957b02f 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -47,6 +47,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "replication/slot.h" +#include "replication/walsender.h" 
#include "storage/fd.h" #include "storage/ipc.h" #include "storage/proc.h" @@ -103,7 +104,6 @@ int max_replication_slots = 10; /* the maximum number of replication * slots */ static void ReplicationSlotShmemExit(int code, Datum arg); -static void ReplicationSlotDropAcquired(void); static void ReplicationSlotDropPtr(ReplicationSlot *slot); /* internal persistency functions */ @@ -250,11 +250,12 @@ ReplicationSlotValidateName(const char *name, int elevel) * user will only get commit prepared. * failover: If enabled, allows the slot to be synced to physical standbys so * that logical replication can be resumed after failover. + * synced: True if the slot is created by a slotsync worker. */ void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover) + bool two_phase, bool failover, bool synced) { ReplicationSlot *slot = NULL; int i; @@ -315,6 +316,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; slot->data.failover = failover; + slot->data.synced = synced; /* and then data only present in shared memory */ slot->just_dirtied = false; @@ -680,6 +682,16 @@ ReplicationSlotDrop(const char *name, bool nowait) ReplicationSlotAcquire(name, nowait); + /* + * Do not allow users to drop the slots which are currently being synced + * from the primary to the standby. 
+ */ + if (RecoveryInProgress() && MyReplicationSlot->data.synced) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot drop replication slot \"%s\"", name), + errdetail("This slot is being synced from the primary server.")); + ReplicationSlotDropAcquired(); } @@ -699,6 +711,16 @@ ReplicationSlotAlter(const char *name, bool failover) errmsg("cannot use %s with a physical replication slot", "ALTER_REPLICATION_SLOT")); + /* + * Do not allow users to alter the slots which are currently being synced + * from the primary to the standby. + */ + if (RecoveryInProgress() && MyReplicationSlot->data.synced) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot alter replication slot \"%s\"", name), + errdetail("This slot is being synced from the primary server.")); + SpinLockAcquire(&MyReplicationSlot->mutex); MyReplicationSlot->data.failover = failover; SpinLockRelease(&MyReplicationSlot->mutex); @@ -711,7 +733,7 @@ ReplicationSlotAlter(const char *name, bool failover) /* * Permanently drop the currently acquired replication slot. */ -static void +void ReplicationSlotDropAcquired(void) { ReplicationSlot *slot = MyReplicationSlot; @@ -867,8 +889,8 @@ ReplicationSlotMarkDirty(void) } /* - * Convert a slot that's marked as RS_EPHEMERAL to a RS_PERSISTENT slot, - * guaranteeing it will be there after an eventual crash. + * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a + * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash. */ void ReplicationSlotPersist(void) diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index eb685089b3..338d092e84 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -43,7 +43,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, temporary ? 
RS_TEMPORARY : RS_PERSISTENT, false, - false); + false, false); if (immediately_reserve) { @@ -136,7 +136,7 @@ create_logical_replication_slot(char *name, char *plugin, */ ReplicationSlotCreate(name, true, temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, - failover); + failover, false); /* * Create logical decoding context to find start point or, if we don't @@ -237,7 +237,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 16 +#define PG_GET_REPLICATION_SLOTS_COLS 17 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -418,21 +418,23 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) break; case RS_INVAL_WAL_REMOVED: - values[i++] = CStringGetTextDatum("wal_removed"); + values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_REMOVED_TEXT); break; case RS_INVAL_HORIZON: - values[i++] = CStringGetTextDatum("rows_removed"); + values[i++] = CStringGetTextDatum(SLOT_INVAL_HORIZON_TEXT); break; case RS_INVAL_WAL_LEVEL: - values[i++] = CStringGetTextDatum("wal_level_insufficient"); + values[i++] = CStringGetTextDatum(SLOT_INVAL_WAL_LEVEL_TEXT); break; } } values[i++] = BoolGetDatum(slot_contents.data.failover); + values[i++] = BoolGetDatum(slot_contents.data.synced); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 73a7d8f96c..d420a833cd 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -345,6 +345,22 @@ GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) return recptr; } +/* + * Returns the latest reported end of WAL on the sender + */ +XLogRecPtr +GetWalRcvLatestWalEnd() +{ + WalRcvData *walrcv = WalRcv; + XLogRecPtr recptr; + + SpinLockAcquire(&walrcv->mutex); + recptr = walrcv->latestWalEnd; + 
SpinLockRelease(&walrcv->mutex); + + return recptr; +} + /* * Returns the last+1 byte position that walreceiver has written. * This returns a recently written value without taking a lock. diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 77c8baa32a..c692ac3569 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1224,7 +1224,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false, false); + false, false, false); if (reserve_wal) { @@ -1255,7 +1255,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - two_phase, failover); + two_phase, failover, false); /* * Do options check early so that we can bail before calling the diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index e5119ed55d..04fed1007e 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -38,6 +38,7 @@ #include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/walsender.h" +#include "replication/worker_internal.h" #include "storage/bufmgr.h" #include "storage/dsm.h" #include "storage/ipc.h" @@ -342,6 +343,7 @@ CreateOrAttachShmemStructs(void) WalSummarizerShmemInit(); PgArchShmemInit(); ApplyLauncherShmemInit(); + SlotSyncWorkerShmemInit(); /* * Set up other modules that need some shared memory space diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 1eaaf3c6c5..19b08c1b5f 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -3286,6 +3286,17 @@ ProcessInterrupts(void) */ proc_exit(1); } + else if (IsLogicalSlotSyncWorker()) + { + elog(DEBUG1, + "replication slot sync worker is shutting down due to administrator command"); + + /* + * Slot sync worker can be stopped at any time. 
Use exit status 1 + * so the background worker is restarted. + */ + proc_exit(1); + } else if (IsBackgroundWorker) ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index f625473ad4..0879bab57e 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -53,6 +53,8 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." +REPL_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." +REPL_SLOTSYNC_PRIMARY_CATCHUP "Waiting for the primary to catch-up, in slot sync worker." SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." WAL_SENDER_MAIN "Waiting in main loop of WAL sender process." 
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index e53ebc6dc2..0f5ec63de1 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -68,6 +68,7 @@ #include "replication/logicallauncher.h" #include "replication/slot.h" #include "replication/syncrep.h" +#include "replication/worker_internal.h" #include "storage/bufmgr.h" #include "storage/large_object.h" #include "storage/pg_shmem.h" @@ -2044,6 +2045,15 @@ struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"enable_syncslot", PGC_POSTMASTER, REPLICATION_STANDBY, + gettext_noop("Enables a physical standby to synchronize logical failover slots from the primary server."), + }, + &enable_syncslot, + false, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index b2809c711a..136be912e6 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -361,6 +361,7 @@ #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#enable_syncslot = off # enables slot synchronization on the physical standby from the primary # - Subscribers - diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index f40726c4f7..c43d7c06f0 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11115,9 +11115,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => 
'{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover,synced}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h index 22fc49ec27..7092fc72c6 100644 --- a/src/include/postmaster/bgworker.h +++ b/src/include/postmaster/bgworker.h @@ -79,6 +79,7 @@ typedef enum BgWorkerStart_PostmasterStart, BgWorkerStart_ConsistentState, BgWorkerStart_RecoveryFinished, + BgWorkerStart_ConsistentState_HotStandby, } BgWorkerStartTime; #define BGW_DEFAULT_RESTART_INTERVAL 60 diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index a18d79d1b2..bbe04226db 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -22,6 +22,7 @@ extern void TablesyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); +extern bool IsLogicalSlotSyncWorker(void); extern void HandleParallelApplyMessageInterrupt(void); extern void HandleParallelApplyMessages(void); diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 585ccbb504..f81bef9e42 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -52,6 +52,14 @@ typedef enum ReplicationSlotInvalidationCause RS_INVAL_WAL_LEVEL, } ReplicationSlotInvalidationCause; +/* + * The possible 
values for 'conflict_reason' returned in + * pg_get_replication_slots. + */ +#define SLOT_INVAL_WAL_REMOVED_TEXT "wal_removed" +#define SLOT_INVAL_HORIZON_TEXT "rows_removed" +#define SLOT_INVAL_WAL_LEVEL_TEXT "wal_level_insufficient" + /* * On-Disk data of a replication slot, preserved across restarts. */ @@ -112,6 +120,11 @@ typedef struct ReplicationSlotPersistentData /* plugin name */ NameData plugin; + /* + * Was this slot synchronized from the primary server? + */ + char synced; + /* * Is this a failover slot (sync candidate for physical standbys)? Only * relevant for logical slots on the primary server. @@ -224,9 +237,11 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover); + bool two_phase, bool failover, + bool synced); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); +extern void ReplicationSlotDropAcquired(void); extern void ReplicationSlotAlter(const char *name, bool failover); extern void ReplicationSlotAcquire(const char *name, bool nowait); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index f566a99ba1..5e942cb4fc 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -279,6 +279,21 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn, typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn, TimeLineID *primary_tli); +/* + * walrcv_get_dbinfo_for_failover_slots_fn + * + * Run LIST_DBID_FOR_FAILOVER_SLOTS on primary server to get the + * list of unique DBIDs for failover logical slots + */ +typedef List *(*walrcv_get_dbinfo_for_failover_slots_fn) (WalReceiverConn *conn); + +/* + * walrcv_get_dbname_from_conninfo_fn + * + * Returns the dbname from the primary_conninfo + */ +typedef char 
*(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo); + /* * walrcv_server_version_fn * @@ -403,6 +418,7 @@ typedef struct WalReceiverFunctionsType walrcv_get_conninfo_fn walrcv_get_conninfo; walrcv_get_senderinfo_fn walrcv_get_senderinfo; walrcv_identify_system_fn walrcv_identify_system; + walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo; walrcv_server_version_fn walrcv_server_version; walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile; walrcv_startstreaming_fn walrcv_startstreaming; @@ -428,6 +444,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port) #define walrcv_identify_system(conn, primary_tli) \ WalReceiverFunctions->walrcv_identify_system(conn, primary_tli) +#define walrcv_get_dbname_from_conninfo(conninfo) \ + WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo) #define walrcv_server_version(conn) \ WalReceiverFunctions->walrcv_server_version(conn) #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \ @@ -485,6 +503,7 @@ extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, bool create_temp_slot); extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); extern XLogRecPtr GetWalRcvWriteRecPtr(void); +extern XLogRecPtr GetWalRcvLatestWalEnd(void); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 515aefd519..2167720971 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -237,6 +237,11 @@ extern PGDLLIMPORT bool in_remote_transaction; extern PGDLLIMPORT bool InitializingApplyWorker; +/* Slot sync worker objects */ +extern PGDLLIMPORT char *PrimaryConnInfo; +extern PGDLLIMPORT char *PrimarySlotName; +extern PGDLLIMPORT bool enable_syncslot; + 
extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); @@ -325,6 +330,11 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +extern void ReplSlotSyncWorkerMain(Datum main_arg); +extern void SlotSyncWorkerRegister(void); +extern void ShutDownSlotSync(void); +extern void SlotSyncWorkerShmemInit(void); + #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) #define isTablesyncWorker(worker) ((worker)->in_use && \ diff --git a/src/test/recovery/t/050_standby_failover_slots_sync.pl b/src/test/recovery/t/050_standby_failover_slots_sync.pl index 646293c39e..aeeb19a052 100644 --- a/src/test/recovery/t/050_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/050_standby_failover_slots_sync.pl @@ -84,6 +84,169 @@ is( $publisher->safe_psql( "t", 'logical slot has failover true on the publisher'); -$subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION regress_mysub1"); +$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 ENABLE"); + +################################################## +# Test logical failover slots on the standby +# Configure standby1 to replicate and synchronize logical slots configured +# for failover on the primary +# +# failover slot lsub1_slot->| ----> subscriber1 (connected via logical replication) +# primary ---> | +# physical slot sb1_slot--->| ----> standby1 (connected via streaming replication) +# | lsub1_slot(synced_slot) +################################################## + +my $primary = $publisher; +my $backup_name = 'backup'; +$primary->backup($backup_name); + +# Create a standby +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); + +my $connstr_1 = $primary->connstr; +$standby1->append_conf( + 
'postgresql.conf', qq( +enable_syncslot = true +hot_standby_feedback = on +primary_slot_name = 'sb1_slot' +primary_conninfo = '$connstr_1 dbname=postgres' +)); + +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb1_slot');}); + +my $standby1_conninfo = $standby1->connstr . ' dbname=postgres'; + +# Start the standby so that slot synchronization can begin +$standby1->start; + +# Generate a log to trigger the walsender to send messages to the walreceiver +# which will update WalRcv->latestWalEnd to a valid number. +$primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();"); + +# Wait for the standby to finish sync +my $offset = -s $standby1->logfile; +$standby1->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? newly locally created slot \"lsub1_slot\" is sync-ready now/, + $offset); + +# Confirm that logical failover slot is created on the standby and is sync +# ready. +is($standby1->safe_psql('postgres', + q{SELECT failover, synced FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}), + "t|t", + 'logical slot has failover as true and synced as true on standby'); + +################################################## +# Test to confirm that restart_lsn and confirmed_flush_lsn of the logical slot +# on the primary is synced to the standby +################################################## + +# Insert data on the primary +$primary->safe_psql( + 'postgres', qq[ + CREATE TABLE tab_int (a int PRIMARY KEY); + INSERT INTO tab_int SELECT generate_series(1, 10); +]); + +$subscriber1->safe_psql( + 'postgres', qq[ + CREATE TABLE tab_int (a int PRIMARY KEY); + ALTER SUBSCRIPTION regress_mysub1 REFRESH PUBLICATION; +]); + +$subscriber1->wait_for_subscription_sync; + +# Do not allow any further advancement of the restart_lsn and +# confirmed_flush_lsn for the lsub1_slot. 
+$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 DISABLE"); + +# Wait for the replication slot to become inactive on the publisher +$primary->poll_query_until( + 'postgres', + "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'", + 1); + +# Get the restart_lsn for the logical slot lsub1_slot on the primary +my $primary_restart_lsn = $primary->safe_psql('postgres', + "SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"); + +# Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary +my $primary_flush_lsn = $primary->safe_psql('postgres', + "SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"); + +# Confirm that restart_lsn and confirmed_flush_lsn of the lsub1_slot slot are +# synced to the standby +ok( $standby1->poll_query_until( + 'postgres', + "SELECT '$primary_restart_lsn' = restart_lsn AND '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"), + 'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby'); + +################################################## +# Test that a synchronized slot cannot be decoded, altered or dropped by the user +################################################## + +# Disable hot_standby_feedback temporarily to stop slot sync worker otherwise +# the test scenarios here may be interrupted by a different error: +# 'ERROR: replication slot is active for PID ..' 
+ +$standby1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = off;'); +$standby1->restart; + +# Attempting to perform logical decoding on a synced slot should result in an error +my ($result, $stdout, $stderr) = $standby1->psql('postgres', + "select * from pg_logical_slot_get_changes('lsub1_slot',NULL,NULL);"); +ok($stderr =~ /ERROR: cannot use replication slot "lsub1_slot" for logical decoding/, + "logical decoding is not allowed on synced slot"); + +# Attempting to alter a synced slot should result in an error +($result, $stdout, $stderr) = $standby1->psql( + 'postgres', + qq[ALTER_REPLICATION_SLOT lsub1_slot (failover);], + replication => 'database'); +ok($stderr =~ /ERROR: cannot alter replication slot "lsub1_slot"/, + "synced slot on standby cannot be altered"); + +# Attempting to drop a synced slot should result in an error +($result, $stdout, $stderr) = $standby1->psql('postgres', + "SELECT pg_drop_replication_slot('lsub1_slot');"); +ok($stderr =~ /ERROR: cannot drop replication slot "lsub1_slot"/, + "synced slot on standby cannot be dropped"); + +# Enable hot_standby_feedback and restart standby +$standby1->safe_psql('postgres', 'ALTER SYSTEM SET hot_standby_feedback = on;'); +$standby1->restart; + +################################################## +# Promote the standby1 to primary. 
Confirm that: +# a) the slot 'lsub1_slot' is retained on the new primary +# b) logical replication for regress_mysub1 is resumed successfully after failover +################################################## +$standby1->promote; + +# Update subscription with the new primary's connection info +$subscriber1->safe_psql('postgres', + "ALTER SUBSCRIPTION regress_mysub1 CONNECTION '$standby1_conninfo'; + ALTER SUBSCRIPTION regress_mysub1 ENABLE; "); + +is($standby1->safe_psql('postgres', + q{SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';}), + 'lsub1_slot', + 'synced slot retained on the new primary'); + +# Insert data on the new primary +$standby1->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(11, 20);"); +$standby1->wait_for_catchup('regress_mysub1'); + +# Confirm that data in tab_int replicated on subscriber +is( $subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}), + "20", + 'data replicated from the new primary'); done_testing(); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index acc2339b49..ae687531d2 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1474,8 +1474,9 @@ pg_replication_slots| SELECT l.slot_name, l.safe_wal_size, l.two_phase, l.conflict_reason, - l.failover - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover) + l.failover, + l.synced + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover, synced) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, diff --git a/src/test/regress/expected/sysviews.out 
b/src/test/regress/expected/sysviews.out index 271313ebf8..aac83755de 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -132,8 +132,9 @@ select name, setting from pg_settings where name like 'enable%'; enable_self_join_removal | on enable_seqscan | on enable_sort | on + enable_syncslot | off enable_tidscan | on -(22 rows) +(23 rows) -- There are always wait event descriptions for various types. select type, count(*) > 0 as ok FROM pg_wait_events diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 9b67986914..766b1bf6c8 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2321,6 +2321,7 @@ RelocationBufferInfo RelptrFreePageBtree RelptrFreePageManager RelptrFreePageSpanLeader +RemoteSlot RenameStmt ReopenPtrType ReorderBuffer @@ -2581,6 +2582,7 @@ SlabBlock SlabContext SlabSlot SlotNumber +SlotSyncWorkerCtxStruct SlruCtl SlruCtlData SlruErrorCause -- 2.30.0.windows.2