From 5fa7276cccae7db7c244149398cb56fe0ce705cf Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Mon, 11 Dec 2023 10:01:17 +0800 Subject: [PATCH v44 2/3] Add logical slot sync capability to the physical standby This patch implements synchronization of logical replication slots from the primary server to the physical standby so that logical replication can be resumed after failover. All the failover logical replication slots on the primary (assuming configurations are appropriate) are automatically created on the physical standbys and are synced periodically. The slot-sync worker on the standby server pings the primary server at regular intervals to get the necessary failover logical slots information and create/update the slots locally. GUC 'enable_syncslot' enables a physical standby to synchronize failover logical replication slots from the primary server. The nap time of worker is tuned according to the activity on the primary. The worker starts with nap time of 10ms and if no activity is observed on the primary for some time, then nap time is increased to 10sec. And if activity is observed again, nap time is reduced back to 10ms. The logical slots created by slot-sync worker on physical standbys are not allowed to be dropped or consumed. Any attempt to perform logical decoding on such slots will result in an error. If a logical slot is invalidated on the primary, slot on the standby is also invalidated. If a logical slot on the primary is valid but is invalidated on the standby, then that slot is dropped and recreated on the standby in next sync-cycle. It is okay to recreate such slots as long as these are not consumable on the standby (which is the case currently). This situation may occur due to the following reasons: - The max_slot_wal_keep_size on the standby is insufficient to retain WAL records from the restart_lsn of the slot. - primary_slot_name is temporarily reset to null and the physical slot is removed. 
- The primary changes wal_level to a level lower than logical. Slots synced on the standby can be identified using 'sync_state' column of pg_replication_slots view. The values are: 'n': none for user slots, 'i': sync initiated for the slot but waiting for the remote slot on the primary server to catch up. 'r': ready for periodic syncs. --- doc/src/sgml/bgworker.sgml | 16 +- doc/src/sgml/config.sgml | 35 +- doc/src/sgml/logicaldecoding.sgml | 34 + doc/src/sgml/system-views.sgml | 25 + src/backend/access/transam/xlogrecovery.c | 10 + src/backend/catalog/system_views.sql | 3 +- src/backend/postmaster/bgworker.c | 4 + src/backend/postmaster/postmaster.c | 6 + .../libpqwalreceiver/libpqwalreceiver.c | 42 + src/backend/replication/logical/Makefile | 1 + src/backend/replication/logical/logical.c | 20 + src/backend/replication/logical/meson.build | 1 + src/backend/replication/logical/slotsync.c | 1329 +++++++++++++++++ src/backend/replication/logical/tablesync.c | 1 + src/backend/replication/logical/worker.c | 15 +- src/backend/replication/slot.c | 20 +- src/backend/replication/slotfuncs.c | 37 +- src/backend/replication/walsender.c | 6 +- src/backend/storage/ipc/ipci.c | 2 + src/backend/tcop/postgres.c | 12 + .../utils/activity/wait_event_names.txt | 2 + src/backend/utils/misc/guc_tables.c | 10 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/catalog/pg_proc.dat | 10 +- src/include/commands/subscriptioncmds.h | 4 + src/include/postmaster/bgworker.h | 1 + src/include/replication/slot.h | 22 +- src/include/replication/walreceiver.h | 19 + src/include/replication/worker_internal.h | 12 + .../t/050_standby_failover_slots_sync.pl | 127 ++ src/test/regress/expected/rules.out | 5 +- src/test/regress/expected/sysviews.out | 3 +- src/tools/pgindent/typedefs.list | 2 + 33 files changed, 1808 insertions(+), 29 deletions(-) create mode 100644 src/backend/replication/logical/slotsync.c diff --git a/doc/src/sgml/bgworker.sgml b/doc/src/sgml/bgworker.sgml index 
2c393385a9..377f301b63 100644 --- a/doc/src/sgml/bgworker.sgml +++ b/doc/src/sgml/bgworker.sgml @@ -121,11 +121,17 @@ typedef struct BackgroundWorker BgWorkerStart_ConsistentState (start as soon as a consistent state has been reached in a hot standby, allowing processes to connect to databases and run read-only queries), and - BgWorkerStart_RecoveryFinished (start as soon as the system has - entered normal read-write state). Note the last two values are equivalent - in a server that's not a hot standby. Note that this setting only indicates - when the processes are to be started; they do not stop when a different state - is reached. + BgWorkerStart_RecoveryFinished (start as soon as the system + has entered normal read-write state. Note that the + BgWorkerStart_ConsistentState and + BgWorkerStart_RecoveryFinished are equivalent + in a server that's not a hot standby), and + BgWorkerStart_ConsistentState_HotStandby (same meaning as + BgWorkerStart_ConsistentState but it is more strict in + terms of the server i.e. start the worker only if it is hot-standby; if it is + consistent state in non-standby, worker will not be started). Note that this + setting only indicates when the processes are to be started; they do not stop + when a different state is reached. diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 30d9b53e03..1977962caf 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4373,6 +4373,12 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows meant to switch to a physical standby after the standby is promoted, the physical replication slot for the standby should be listed here. + + The standbys corresponding to the physical replication slots in + standby_slot_names must enable + enable_syncslot for the standbys to receive + failover logical slots changes from the primary. 
+ @@ -4566,10 +4572,15 @@ ANY num_sync ( num_sync ( + enable_syncslot (boolean) + + enable_syncslot configuration parameter + + + + + It enables a physical standby to synchronize logical failover slots + from the primary server so that logical subscribers are not blocked + after failover. + + + It is disabled by default. This parameter can only be set in the + postgresql.conf file or on the server command line. + + + diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index cd152d4ced..0defa0ec2b 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -346,6 +346,40 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU pg_log_standby_snapshot function on the primary. + + A logical replication slot on the primary can be synchronized to the hot + standby by enabling the failover option during slot creation and set + enable_syncslot on the standby. For the synchronization + to work, it is mandatory to have physical replication slot between the + primary and the standby. This physical replication slot for the standby + should be listed in standby_slot_names on the primary + to prevent the subscriber from consuming changes faster than the hot + standby. Additionally, similar to creating a logical replication slot + on the hot standby, hot_standby_feedback should be + set on the standby and a physical slot between the primary and the standby + should be used. + + + + By enabling synchronization of slots, logical replication can be resumed + after failover depending upon the + pg_replication_slots.sync_state + for the synchronized slots on the standby at the time of failover. + The slots which were in ready sync_state ('r') on the standby before + failover can be used for logical replication after failover. 
However, + the slots which were in initiated sync_state ('i') and were not + sync-ready ('r') at the time of failover will be dropped and logical + replication for such slots can not be resumed after failover. This applies + to the case where a logical subscription is disabled before failover and is + enabled after failover. If the synchronized slot due to disabled + subscription could not be made sync-ready ('r') on standby, then the + subscription can not be resumed after failover even when enabled. + If the primary is idle, making the synchronized slot on the standby + as sync-ready ('r') for enabled subscription may take noticeable time. + This can be sped up by calling the + pg_log_standby_snapshot function on the primary. + + Replication slots persist across crashes and know nothing about the state diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 1dc695fd3a..3f6e2f82c8 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2543,6 +2543,31 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx after failover. Always false for physical slots. + + + + sync_state char + + + Defines slot synchronization state. This is meaningful on the physical + standby which has enabled slots synchronization. + + + State code: + n = none for user created slots, + i = sync initiated for the slot but slot is not ready + yet for periodic syncs, + r = ready for periodic syncs. + + + The hot standby can have any of these sync_state for the slots but on a + hot standby, the slots with state 'r' and 'i' can neither be used for logical + decoding nor dropped by the user. The primary server will have sync_state + as 'n' for all the slots. But if the standby is promoted to become the + new primary server, sync_state can be seen 'r' as well. On this new + primary server, slots with sync_state as 'r' and 'n' will behave the same. 
+ + diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index c61566666a..9fb09ba0fc 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -50,6 +50,7 @@ #include "postmaster/startup.h" #include "replication/slot.h" #include "replication/walreceiver.h" +#include "replication/worker_internal.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -1435,6 +1436,15 @@ FinishWalRecovery(void) */ XLogShutdownWalRcv(); + /* + * Shutdown the slot sync workers to prevent potential conflicts between + * user processes and slotsync workers after a promotion. Additionally, + * drop any slots that have initiated but not yet completed the sync + * process. + */ + ShutDownSlotSync(); + slotsync_drop_initiated_slots(); + /* * We are now done reading the xlog from stream. Turn off streaming * recovery to force fetching the files (which would be required at end of diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 63038f87f7..c4b3e8a807 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1024,7 +1024,8 @@ CREATE VIEW pg_replication_slots AS L.safe_wal_size, L.two_phase, L.conflicting, - L.failover + L.failover, + L.sync_state FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index c345639086..854c967910 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -22,6 +22,7 @@ #include "postmaster/postmaster.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" +#include "replication/worker_internal.h" #include "storage/dsm.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -130,6 +131,9 @@ static const struct { "ApplyWorkerMain", ApplyWorkerMain }, + { + "ReplSlotSyncWorkerMain", 
ReplSlotSyncWorkerMain + }, { "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index ae31d66930..d75bb378b7 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -117,6 +117,7 @@ #include "postmaster/syslogger.h" #include "replication/logicallauncher.h" #include "replication/walsender.h" +#include "replication/worker_internal.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/pg_shmem.h" @@ -1006,6 +1007,8 @@ PostmasterMain(int argc, char *argv[]) */ ApplyLauncherRegister(); + SlotSyncWorkerRegister(); + /* * process any libraries that should be preloaded at postmaster start */ @@ -5743,6 +5746,9 @@ bgworker_should_start_now(BgWorkerStartTime start_time) case PM_HOT_STANDBY: if (start_time == BgWorkerStart_ConsistentState) return true; + else if (start_time == BgWorkerStart_ConsistentState_HotStandby && + pmState != PM_RUN) + return true; /* fall through */ case PM_RECOVERY: diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index ff65fdb4d9..7e1492f046 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -34,6 +34,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/tuplestore.h" +#include "utils/varlena.h" PG_MODULE_MAGIC; @@ -58,6 +59,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); static char *libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli); +static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo); static int libpqrcv_server_version(WalReceiverConn *conn); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, @@ -100,6 +102,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = 
{ .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, .walrcv_alter_slot = libpqrcv_alter_slot, + .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -418,6 +421,45 @@ libpqrcv_server_version(WalReceiverConn *conn) return PQserverVersion(conn->streamConn); } +/* + * Get database name from the primary server's conninfo. + * + * If dbname is not found in connInfo, return NULL value. + */ +static char * +libpqrcv_get_dbname_from_conninfo(const char *connInfo) +{ + PQconninfoOption *opts; + PQconninfoOption *opt; + char *dbname = NULL; + char *err = NULL; + + opts = PQconninfoParse(connInfo, &err); + if (opts == NULL) + { + /* The error string is malloc'd, so we must free it explicitly */ + char *errcopy = err ? pstrdup(err) : "out of memory"; + + PQfreemem(err); + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid connection string syntax: %s", errcopy))); + } + + for (opt = opts; opt->keyword != NULL; ++opt) + { + /* + * If multiple dbnames are specified, then the last one will be + * returned + */ + if (strcmp(opt->keyword, "dbname") == 0 && opt->val && + opt->val[0] != '\0') + dbname = pstrdup(opt->val); + } + + return dbname; +} + /* * Start streaming WAL data from given streaming options. 
* diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 2dc25e37bb..ba03eeff1c 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -25,6 +25,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + slotsync.o \ snapbuild.o \ tablesync.o \ worker.o diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 8288da5277..7de68a61e2 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -524,6 +524,26 @@ CreateDecodingContext(XLogRecPtr start_lsn, errmsg("replication slot \"%s\" was not created in this database", NameStr(slot->data.name)))); + /* + * Slots in state SYNCSLOT_STATE_INITIATED should have been dropped on + * promotion. + */ + if (!RecoveryInProgress() && slot->data.sync_state == SYNCSLOT_STATE_INITIATED) + elog(ERROR, "replication slot \"%s\" was not synced completely from the primary server", + NameStr(slot->data.name)); + + /* + * Do not allow consumption of a "synchronized" slot until the standby + * gets promoted. + */ + if (RecoveryInProgress() && slot->data.sync_state != SYNCSLOT_STATE_NONE) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot use replication slot \"%s\" for logical decoding", + NameStr(slot->data.name)), + errdetail("This slot is being synced from the primary server."), + errhint("Specify another replication slot."))); + /* * Check if slot has been invalidated due to max_slot_wal_keep_size. 
Avoid * "cannot get changes" wording in this errmsg because that'd be diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index d48cd4c590..9e52ec421f 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -11,6 +11,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'slotsync.c', 'snapbuild.c', 'tablesync.c', 'worker.c', diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c new file mode 100644 index 0000000000..e01c3cce42 --- /dev/null +++ b/src/backend/replication/logical/slotsync.c @@ -0,0 +1,1329 @@ +/*------------------------------------------------------------------------- + * slotsync.c + * PostgreSQL worker for synchronizing slots to a standby server from the + * primary server. + * + * Copyright (c) 2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/slotsync.c + * + * This file contains the code for slot sync worker on a physical standby + * to fetch logical failover slots information from the primary server, + * create the slots on the standby and synchronize them periodically. + * + * It also takes care of dropping the slots which were created by it and are + * currently not needed to be synchronized. + * + * It takes a nap of WORKER_DEFAULT_NAPTIME_MS before every next + * synchronization. If there is no activity observed on the primary server for + * some time, the nap time is increased to WORKER_INACTIVITY_NAPTIME_MS, but if + * any activity is observed, the nap time reverts to the default value. 
+ *--------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/genam.h" +#include "access/table.h" +#include "access/xlogrecovery.h" +#include "catalog/pg_database.h" +#include "commands/dbcommands.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" +#include "replication/logical.h" +#include "replication/logicallauncher.h" +#include "replication/logicalworker.h" +#include "replication/walreceiver.h" +#include "replication/worker_internal.h" +#include "storage/ipc.h" +#include "storage/procarray.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/guc_hooks.h" +#include "utils/pg_lsn.h" +#include "utils/varlena.h" + +/* + * Structure to hold information fetched from the primary server about a logical + * replication slot. + */ +typedef struct RemoteSlot +{ + char *name; + char *plugin; + char *database; + bool two_phase; + bool failover; + XLogRecPtr restart_lsn; + XLogRecPtr confirmed_lsn; + TransactionId catalog_xmin; + + /* RS_INVAL_NONE if valid, or the reason of invalidation */ + ReplicationSlotInvalidationCause invalidated; +} RemoteSlot; + +/* + * Struct for sharing information between startup process and slot + * sync worker. + * + * Slot sync worker's pid is needed by startup process in order to + * shut it down during promotion. + */ +typedef struct SlotSyncWorkerCtx +{ + pid_t pid; + slock_t mutex; +} SlotSyncWorkerCtx; + +SlotSyncWorkerCtx *SlotSyncWorker = NULL; + +/* GUC variable */ +bool enable_syncslot = false; + +/* The last sync-cycle time when the worker updated any of the slots. 
*/ +static TimestampTz last_update_time; + +/* Worker's nap time in case of regular activity on the primary server */ +#define WORKER_DEFAULT_NAPTIME_MS 10L /* 10 ms */ + +/* Worker's nap time in case of no-activity on the primary server */ +#define WORKER_INACTIVITY_NAPTIME_MS 10000L /* 10 sec */ + +/* + * Inactivity Threshold in ms before increasing nap time of worker. + * + * If the lsn of slot being monitored did not change for this threshold time, + * then increase nap time of current worker from WORKER_DEFAULT_NAPTIME_MS to + * WORKER_INACTIVITY_NAPTIME_MS. + */ +#define WORKER_INACTIVITY_THRESHOLD_MS 10000L /* 10 sec */ + +/* + * Number of attempts for wait_for_primary_slot_catchup() after + * which it aborts the wait and the slot sync worker then moves + * to the next slot creation/sync. + */ +#define WORKER_PRIMARY_CATCHUP_WAIT_ATTEMPTS 5 + +static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn); + +/* + * Wait for remote slot to pass locally reserved position. + * + * Ping and wait for the primary server for + * WORKER_PRIMARY_CATCHUP_WAIT_ATTEMPTS during a slot creation, if it still + * does not catch up, abort the wait. The ones for which wait is aborted will + * attempt the wait and sync in the next sync-cycle. + * + * *persist will be set to false if the slot has disappeared or was invalidated + * on the primary; otherwise, it will be set to true. 
+ */ +static bool +wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot, + bool *persist) +{ +#define WAIT_OUTPUT_COLUMN_COUNT 4 + StringInfoData cmd; + int wait_count = 0; + + if (persist) + *persist = true; + + ereport(LOG, + errmsg("waiting for remote slot \"%s\" LSN (%X/%X) and catalog xmin" + " (%u) to pass local slot LSN (%X/%X) and catalog xmin (%u)", + remote_slot->name, + LSN_FORMAT_ARGS(remote_slot->restart_lsn), + remote_slot->catalog_xmin, + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn), + MyReplicationSlot->data.catalog_xmin)); + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT conflicting, restart_lsn, confirmed_flush_lsn," + " catalog_xmin FROM pg_catalog.pg_replication_slots" + " WHERE slot_name = %s", + quote_literal_cstr(remote_slot->name)); + + for (;;) + { + XLogRecPtr new_invalidated; + XLogRecPtr new_restart_lsn; + XLogRecPtr new_confirmed_lsn; + TransactionId new_catalog_xmin; + WalRcvExecResult *res; + TupleTableSlot *slot; + int rc; + bool isnull; + Oid slotRow[WAIT_OUTPUT_COLUMN_COUNT] = {BOOLOID, LSNOID, LSNOID, + XIDOID}; + + CHECK_FOR_INTERRUPTS(); + + /* Handle any termination request if any */ + ProcessSlotSyncInterrupts(wrconn); + + res = walrcv_exec(wrconn, cmd.data, WAIT_OUTPUT_COLUMN_COUNT, slotRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch slot info for slot \"%s\" from the" + " primary server: %s", + remote_slot->name, res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + ereport(WARNING, + (errmsg("slot \"%s\" disappeared from the primary server," + " slot creation aborted", remote_slot->name))); + pfree(cmd.data); + walrcv_clear_result(res); + + /* + * The slot being created will be dropped when it is released (see + * ReplicationSlotRelease). 
+ */ + if (persist) + *persist = false; + + return false; + } + + /* + * It is possible to get null values for LSN and Xmin if slot is + * invalidated on the primary server, so handle accordingly. + */ + new_invalidated = DatumGetBool(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + new_restart_lsn = DatumGetLSN(slot_getattr(slot, 2, &isnull)); + if (new_invalidated || isnull) + { + ereport(WARNING, + (errmsg("slot \"%s\" invalidated on the primary server," + " slot creation aborted", remote_slot->name))); + pfree(cmd.data); + ExecClearTuple(slot); + walrcv_clear_result(res); + + /* + * The slot being created will be dropped when it is released (see + * ReplicationSlotRelease). + */ + if (persist) + *persist = false; + + return false; + } + + /* + * Once we got valid restart_lsn, then confirmed_lsn and catalog_xmin + * are expected to be valid/non-null. + */ + new_confirmed_lsn = DatumGetLSN(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + + new_catalog_xmin = DatumGetTransactionId(slot_getattr(slot, + 4, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + walrcv_clear_result(res); + + if (new_restart_lsn >= MyReplicationSlot->data.restart_lsn && + TransactionIdFollowsOrEquals(new_catalog_xmin, + MyReplicationSlot->data.catalog_xmin)) + { + /* Update new values in remote_slot */ + remote_slot->restart_lsn = new_restart_lsn; + remote_slot->confirmed_lsn = new_confirmed_lsn; + remote_slot->catalog_xmin = new_catalog_xmin; + + ereport(LOG, + errmsg("wait over for remote slot \"%s\" as its LSN (%X/%X)" + " and catalog xmin (%u) has now passed local slot LSN" + " (%X/%X) and catalog xmin (%u)", + remote_slot->name, + LSN_FORMAT_ARGS(new_restart_lsn), + new_catalog_xmin, + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn), + MyReplicationSlot->data.catalog_xmin)); + pfree(cmd.data); + + return true; + } + + if (++wait_count >= WORKER_PRIMARY_CATCHUP_WAIT_ATTEMPTS) + { + ereport(LOG, + errmsg("aborting the wait for remote slot \"%s\"", + 
remote_slot->name)); + pfree(cmd.data); + + return false; + } + + /* + * XXX: Is waiting for 2 seconds before retrying enough or more or + * less? + */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + 2000L, + WAIT_EVENT_REPL_SLOTSYNC_PRIMARY_CATCHUP); + + ResetLatch(MyLatch); + + /* Emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + } +} + +/* + * Update local slot metadata as per remote_slot's positions + */ +static void +local_slot_update(RemoteSlot *remote_slot) +{ + Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE); + + LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn); + LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn, + remote_slot->catalog_xmin); + LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn, + remote_slot->restart_lsn); + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.invalidated = remote_slot->invalidated; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotMarkDirty(); +} + +/* + * Drop the slots for which sync is initiated but not yet completed + * i.e. they are still waiting for the primary server to catch up. 
+ */ +void +slotsync_drop_initiated_slots(void) +{ + List *slots = NIL; + ListCell *lc; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use && s->data.sync_state == SYNCSLOT_STATE_INITIATED) + slots = lappend(slots, s); + } + + LWLockRelease(ReplicationSlotControlLock); + + foreach(lc, slots) + { + ReplicationSlot *s = (ReplicationSlot *) lfirst(lc); + + ReplicationSlotDrop(NameStr(s->data.name), true, false); + ereport(LOG, + (errmsg("dropped replication slot \"%s\" of dbid %d as it " + "was not sync-ready", NameStr(s->data.name), + s->data.database))); + } + + list_free(slots); +} + +/* + * Get list of local logical slot names which are synchronized from + * the primary server. + */ +static List * +get_local_synced_slot_names(void) +{ + List *localSyncedSlots = NIL; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + /* Check if it is logical synchronized slot */ + if (s->in_use && SlotIsLogical(s) && + (s->data.sync_state != SYNCSLOT_STATE_NONE)) + { + localSyncedSlots = lappend(localSyncedSlots, s); + } + } + + LWLockRelease(ReplicationSlotControlLock); + + return localSyncedSlots; +} + +/* + * Helper function to check if local_slot is present in remote_slots list. + * + * It also checks if logical slot is locally invalidated i.e. invalidated on + * the standby but valid on the primary server. If found so, it sets + * locally_invalidated to true. 
+ */ +static bool +check_sync_slot_validity(ReplicationSlot *local_slot, List *remote_slots, + bool *locally_invalidated) +{ + ListCell *cell; + + foreach(cell, remote_slots) + { + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(cell); + + if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0) + { + /* + * If remote slot is not invalidated but local slot is marked as + * invalidated, then set the bool. + */ + SpinLockAcquire(&local_slot->mutex); + *locally_invalidated = + (remote_slot->invalidated == RS_INVAL_NONE) && + (local_slot->data.invalidated != RS_INVAL_NONE); + SpinLockRelease(&local_slot->mutex); + + return true; + } + } + + return false; +} + +/* + * Drop obsolete slots + * + * Drop the slots that no longer need to be synced i.e. these either do not + * exist on the primary or are no longer enabled for failover. + * + * Additionally, it drops slots that are valid on the primary but got + * invalidated on the standby. This situation may occur due to the following + * reasons: + * - The max_slot_wal_keep_size on the standby is insufficient to retain WAL + * records from the restart_lsn of the slot. + * - primary_slot_name is temporarily reset to null and the physical slot is + * removed. + * - The primary changes wal_level to a level lower than logical. + * + * The assumption is that these dropped local invalidated slots will get + * recreated in next sync-cycle and it is okay to drop and recreate such slots + * as long as these are not consumable on the standby (which is the case + * currently). + */ +static void +drop_obsolete_slots(List *remote_slot_list) +{ + List *local_slot_list = NIL; + ListCell *lc_slot; + + /* + * Get the list of local 'synced' slot so that those not on remote could + * be dropped. 
+ */ + local_slot_list = get_local_synced_slot_names(); + + foreach(lc_slot, local_slot_list) + { + ReplicationSlot *local_slot = (ReplicationSlot *) lfirst(lc_slot); + bool local_exists = false; + bool locally_invalidated = false; + + local_exists = check_sync_slot_validity(local_slot, remote_slot_list, + &locally_invalidated); + + /* + * Drop the local slot either if it is not in the remote slots list or + * is invalidated while remote slot is still valid. + */ + if (!local_exists || locally_invalidated) + { + ReplicationSlotDrop(NameStr(local_slot->data.name), true, false); + + ereport(LOG, + (errmsg("dropped replication slot \"%s\" of dbid %d", + NameStr(local_slot->data.name), + local_slot->data.database))); + } + } +} + +/* + * Constructs the query in order to get failover logical slots + * information from the primary server. + */ +static void +construct_slot_query(StringInfo s) +{ + /* + * Fetch slots with failover enabled. + * + * If we are on cascading standby, we should fetch only those slots from + * the first standby which have sync_state as either 'n' or 'r'. Slots + * with sync_state as 'i' are not sync ready yet. And when we are on the + * first standby, the primary server is supposed to have slots with + * sync_state as 'n' only (or it may have all 'n', 'r' and 'i' if standby + * is promoted as primary). Thus in all the cases, filter sync_state !='i' + * is appropriate one. + */ + appendStringInfo(s, + "SELECT slot_name, plugin, confirmed_flush_lsn," + " restart_lsn, catalog_xmin, two_phase, failover," + " database, pg_get_slot_invalidation_cause(slot_name)" + " FROM pg_catalog.pg_replication_slots" + " WHERE failover and sync_state != 'i'"); +} + +/* + * Synchronize single slot to given position. + * + * This creates a new slot if there is no existing one and updates the + * metadata of the slot as per the data received from the primary server. + * + * The 'sync_state' in slot.data is set to SYNCSLOT_STATE_INITIATED + * immediately after creation. 
It stays in same state until the + * initialization is complete. The initialization is considered to + * be completed once the remote_slot catches up with locally reserved + * position and local slot is updated. The sync_state is then changed + * to SYNCSLOT_STATE_READY. + */ +static void +synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot, + bool *slot_updated) +{ + ReplicationSlot *s; + char sync_state = 0; + + /* + * Make sure that concerned WAL is received before syncing slot to target + * lsn received from the primary server. + * + * This check should never pass as on the primary server, we have waited + * for the standby's confirmation before updating the logical slot. + */ + SpinLockAcquire(&WalRcv->mutex); + if (remote_slot->confirmed_lsn > WalRcv->latestWalEnd) + { + SpinLockRelease(&WalRcv->mutex); + elog(ERROR, "skipping sync of slot \"%s\" as the received slot sync " + "LSN %X/%X is ahead of the standby position %X/%X", + remote_slot->name, + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), + LSN_FORMAT_ARGS(WalRcv->latestWalEnd)); + } + SpinLockRelease(&WalRcv->mutex); + + /* Search for the named slot */ + if ((s = SearchNamedReplicationSlot(remote_slot->name, true))) + { + SpinLockAcquire(&s->mutex); + sync_state = s->data.sync_state; + SpinLockRelease(&s->mutex); + } + + StartTransactionCommand(); + + /* + * Already existing slot (created by slot sync worker) and ready for sync, + * acquire and sync it. + */ + if (sync_state == SYNCSLOT_STATE_READY) + { + ReplicationSlotAcquire(remote_slot->name, true); + + /* + * Copy the invalidation cause from remote only if local slot is not + * invalidated locally, we don't want to overwrite existing one. + */ + if (MyReplicationSlot->data.invalidated == RS_INVAL_NONE) + { + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.invalidated = remote_slot->invalidated; + SpinLockRelease(&MyReplicationSlot->mutex); + } + + /* Skip the sync if slot has been invalidated locally. 
*/ + if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE) + goto cleanup; + + /* + * With hot_standby_feedback enabled and invalidations handled + * apropriately as above, this should never happen. + */ + if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn) + { + ereport(ERROR, + errmsg("not synchronizing local slot \"%s\" LSN(%X/%X)" + " to remote slot's LSN(%X/%X) as synchronization " + " would move it backwards", remote_slot->name, + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn), + LSN_FORMAT_ARGS(remote_slot->restart_lsn))); + + goto cleanup; + } + + if (remote_slot->confirmed_lsn != MyReplicationSlot->data.confirmed_flush || + remote_slot->restart_lsn != MyReplicationSlot->data.restart_lsn || + remote_slot->catalog_xmin != MyReplicationSlot->data.catalog_xmin) + { + /* Update LSN of slot to remote slot's current position */ + local_slot_update(remote_slot); + ReplicationSlotSave(); + *slot_updated = true; + } + } + + /* + * Already existing slot but not ready (i.e. waiting for the primary + * server to catch-up), lets attempt to make it sync-ready now. + */ + else if (sync_state == SYNCSLOT_STATE_INITIATED) + { + ReplicationSlotAcquire(remote_slot->name, true); + + /* + * Copy the invalidation cause from remote only if local slot is not + * invalidated locally, we don't want to overwrite existing one. + */ + if (MyReplicationSlot->data.invalidated == RS_INVAL_NONE) + { + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.invalidated = remote_slot->invalidated; + SpinLockRelease(&MyReplicationSlot->mutex); + } + + /* Skip the sync if slot has been invalidated locally. */ + if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE) + goto cleanup; + + /* + * Refer the slot creation part (last 'else' block) for more details + * on this wait. 
+ */ + if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn || + TransactionIdPrecedes(remote_slot->catalog_xmin, + MyReplicationSlot->data.catalog_xmin)) + { + if (!wait_for_primary_slot_catchup(wrconn, remote_slot, NULL)) + { + goto cleanup; + } + } + + /* + * Wait for primary is over, update the lsns and mark the slot as + * READY for further syncs. + */ + local_slot_update(remote_slot); + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.sync_state = SYNCSLOT_STATE_READY; + SpinLockRelease(&MyReplicationSlot->mutex); + + /* Save the changes */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + + *slot_updated = true; + + ereport(LOG, errmsg("newly locally created slot \"%s\" is sync-ready " + "now", remote_slot->name)); + } + /* User created slot with the same name exists, raise ERROR. */ + else if (sync_state == SYNCSLOT_STATE_NONE) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("skipping sync of slot \"%s\" as it is a user created" + " slot", remote_slot->name), + errdetail("This slot has failover enabled on the primary and" + " thus is sync candidate but user created slot with" + " the same name already exists on the standby"))); + } + /* Otherwise create the slot first. 
*/ + else + { + TransactionId xmin_horizon = InvalidTransactionId; + ReplicationSlot *slot; + + ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL, + remote_slot->two_phase, + remote_slot->failover, + SYNCSLOT_STATE_INITIATED); + + slot = MyReplicationSlot; + + SpinLockAcquire(&slot->mutex); + slot->data.database = get_database_oid(remote_slot->database, false); + + namestrcpy(&slot->data.plugin, remote_slot->plugin); + SpinLockRelease(&slot->mutex); + + ReplicationSlotReserveWal(); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + xmin_horizon = GetOldestSafeDecodingTransactionId(true); + SpinLockAcquire(&slot->mutex); + slot->effective_catalog_xmin = xmin_horizon; + slot->data.catalog_xmin = xmin_horizon; + SpinLockRelease(&slot->mutex); + ReplicationSlotsComputeRequiredXmin(true); + LWLockRelease(ProcArrayLock); + + /* + * If the local restart_lsn and/or local catalog_xmin is ahead of + * those on the remote then we cannot create the local slot in sync + * with the primary server because that would mean moving the local + * slot backwards and we might not have WALs retained for old LSN. In + * this case we will wait for the primary server's restart_lsn and + * catalog_xmin to catch up with the local one before attempting the + * sync. + */ + if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn || + TransactionIdPrecedes(remote_slot->catalog_xmin, + MyReplicationSlot->data.catalog_xmin)) + { + bool persist; + + if (!wait_for_primary_slot_catchup(wrconn, remote_slot, &persist)) + { + /* + * The remote slot didn't catch up to locally reserved + * position. + * + * We do not drop the slot because the restart_lsn can be + * ahead of the current location when recreating the slot in + * the next cycle. It may take more time to create such a + * slot. Therefore, we persist it (provided remote-slot is + * still valid) and attempt the wait and synchronization in + * the next cycle. 
+ */ + if (persist) + { + ReplicationSlotPersist(); + *slot_updated = true; + } + + goto cleanup; + } + } + + + /* + * Wait for primary is either not needed or is over. Update the lsns + * and mark the slot as READY for further syncs. + */ + local_slot_update(remote_slot); + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.sync_state = SYNCSLOT_STATE_READY; + SpinLockRelease(&MyReplicationSlot->mutex); + + /* Mark the slot as PERSISTENT and save the changes to disk */ + ReplicationSlotPersist(); + *slot_updated = true; + + ereport(LOG, errmsg("newly locally created slot \"%s\" is sync-ready " + "now", remote_slot->name)); + } + +cleanup: + + ReplicationSlotRelease(); + CommitTransactionCommand(); + + return; +} + +/* + * Synchronize slots. + * + * Gets the failover logical slots info from the primary server and update + * the slots locally. Creates the slots if not present on the standby. + * + * Returns nap time for the next sync-cycle. + */ +static long +synchronize_slots(WalReceiverConn *wrconn) +{ +#define SLOTSYNC_COLUMN_COUNT 9 + Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, + LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, INT2OID}; + + WalRcvExecResult *res; + TupleTableSlot *slot; + StringInfoData s; + List *remote_slot_list = NIL; + MemoryContext oldctx = CurrentMemoryContext; + long naptime = WORKER_DEFAULT_NAPTIME_MS; + ListCell *cell; + bool slot_updated = false; + TimestampTz now; + + /* The primary_slot_name is not set yet or WALs not received yet */ + SpinLockAcquire(&WalRcv->mutex); + if (!WalRcv || + (WalRcv->slotname[0] == '\0') || + XLogRecPtrIsInvalid(WalRcv->latestWalEnd)) + { + SpinLockRelease(&WalRcv->mutex); + return naptime; + } + SpinLockRelease(&WalRcv->mutex); + + /* The syscache access needs a transaction env. */ + StartTransactionCommand(); + + /* + * Make result tuples live outside TopTransactionContext to make them + * accessible even after transaction is committed. 
+ */ + MemoryContextSwitchTo(oldctx); + + /* Construct query to get slots info from the primary server */ + initStringInfo(&s); + construct_slot_query(&s); + + elog(DEBUG2, "slot sync worker's query:%s \n", s.data); + + /* Execute the query */ + res = walrcv_exec(wrconn, s.data, SLOTSYNC_COLUMN_COUNT, slotRow); + pfree(s.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch failover logical slots info " + "from the primary server: %s", res->err))); + + CommitTransactionCommand(); + + /* Switch to oldctx we saved */ + MemoryContextSwitchTo(oldctx); + + /* Construct the remote_slot tuple and synchronize each slot locally */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + bool isnull; + RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot)); + + remote_slot->name = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + remote_slot->plugin = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + /* + * It is possible to get null values for LSN and Xmin if slot is + * invalidated on the primary server, so handle accordingly. 
+ */ + remote_slot->confirmed_lsn = DatumGetLSN(slot_getattr(slot, 3, &isnull)); + if (isnull) + remote_slot->confirmed_lsn = InvalidXLogRecPtr; + + remote_slot->restart_lsn = DatumGetLSN(slot_getattr(slot, 4, &isnull)); + if (isnull) + remote_slot->restart_lsn = InvalidXLogRecPtr; + + remote_slot->catalog_xmin = DatumGetTransactionId(slot_getattr(slot, 5, + &isnull)); + if (isnull) + remote_slot->catalog_xmin = InvalidTransactionId; + + remote_slot->two_phase = DatumGetBool(slot_getattr(slot, 6, &isnull)); + Assert(!isnull); + + remote_slot->failover = DatumGetBool(slot_getattr(slot, 7, &isnull)); + Assert(!isnull); + + remote_slot->database = TextDatumGetCString(slot_getattr(slot, + 8, &isnull)); + Assert(!isnull); + + remote_slot->invalidated = DatumGetInt16(slot_getattr(slot, 9, &isnull)); + Assert(!isnull); + + /* Create list of remote slots */ + remote_slot_list = lappend(remote_slot_list, remote_slot); + + ExecClearTuple(slot); + } + + /* + * Drop local slots that no longer need to be synced. Do it before + * synchronize_one_slot to allow dropping of slots before actual sync + * which are invalidated locally while still valid on the primary server. + */ + drop_obsolete_slots(remote_slot_list); + + /* Now sync the slots locally */ + foreach(cell, remote_slot_list) + { + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(cell); + + synchronize_one_slot(wrconn, remote_slot, &slot_updated); + } + + now = GetCurrentTimestamp(); + + /* + * If any of the slots get updated in this sync-cycle, retain default + * naptime and update 'last_update_time' in slot sync worker. But if no + * activity is observed in this sync-cycle, then increase naptime provided + * inactivity time reaches threshold. 
+ */ + if (slot_updated) + last_update_time = now; + + else if (TimestampDifferenceExceeds(last_update_time, + now, WORKER_INACTIVITY_THRESHOLD_MS)) + naptime = WORKER_INACTIVITY_NAPTIME_MS; + + /* We are done, free remote_slot_list elements */ + list_free_deep(remote_slot_list); + + walrcv_clear_result(res); + + return naptime; +} + +/* + * Connect to the remote (primary) server. + * + * This uses GUC primary_conninfo in order to connect to the primary. + * For slot sync to work, primary_conninfo is required to specify dbname + * as well. + */ +static WalReceiverConn * +remote_connect(void) +{ + WalReceiverConn *wrconn = NULL; + char *err; + + wrconn = walrcv_connect(PrimaryConnInfo, true, false, + cluster_name[0] ? cluster_name : "slotsyncworker", + &err); + if (wrconn == NULL) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not connect to the primary server: %s", err))); + return wrconn; +} + +/* + * Connects primary to validate the slot specified in primary_slot_name. + * + * Exits the worker if physical slot with the specified name does not exist. + */ +static void +validate_primary_slot(WalReceiverConn *wrconn) +{ + WalRcvExecResult *res; + Oid slotRow[1] = {BOOLOID}; + StringInfoData cmd; + bool isnull; + TupleTableSlot *slot; + bool valid; + bool tuple_ok; + + /* Syscache access needs a transaction env. 
*/ + StartTransactionCommand(); + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "select count(*) = 1 from pg_replication_slots where " + "slot_type='physical' and slot_name=%s", + quote_literal_cstr(PrimarySlotName)); + + res = walrcv_exec(wrconn, cmd.data, 1, slotRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch primary_slot_name info from the " + "primary: %s", res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + tuple_ok = tuplestore_gettupleslot(res->tuplestore, true, false, slot); + Assert(tuple_ok); /* It must return one tuple */ + + valid = DatumGetBool(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + if (!valid) + ereport(ERROR, + errmsg("exiting slots synchronization as slot specified in " + "primary_slot_name is not valid")); + + ExecClearTuple(slot); + walrcv_clear_result(res); + CommitTransactionCommand(); +} + +/* + * Checks if GUCs are set appropriately before starting slot sync worker + */ +static void +validate_slotsync_parameters(char **dbname) +{ + /* + * Since 'enable_syncslot' is ON, check that other GUC settings + * (primary_slot_name, hot_standby_feedback, wal_level, primary_conninfo) + * are compatible with slot synchronization. If not, raise ERROR. + */ + + /* + * A physical replication slot(primary_slot_name) is required on the + * primary to ensure that the rows needed by the standby are not removed + * after restarting, so that the synchronized slot on the standby will not + * be invalidated. + */ + if (PrimarySlotName == NULL || strcmp(PrimarySlotName, "") == 0) + ereport(ERROR, + errmsg("exiting slots synchronization as primary_slot_name is " + "not set")); + + /* + * Hot_standby_feedback must be enabled to cooperate with the physical + * replication slot, which allows informing the primary about the xmin and + * catalog_xmin values on the standby. 
+ */ + if (!hot_standby_feedback) + ereport(ERROR, + errmsg("exiting slots synchronization as hot_standby_feedback " + "is off")); + + /* + * Logical decoding requires wal_level >= logical and we currently only + * synchronize logical slots. + */ + if (wal_level < WAL_LEVEL_LOGICAL) + ereport(ERROR, + errmsg("exiting slots synchronisation as it requires " + "wal_level >= logical")); + + /* + * The primary_conninfo is required to make connection to primary for + * getting slots information. + */ + if (PrimaryConnInfo == NULL || strcmp(PrimaryConnInfo, "") == 0) + ereport(ERROR, + errmsg("exiting slots synchronization as primary_conninfo " + "is not set")); + + /* + * The slot sync worker needs a database connection for walrcv_exec to + * work. + */ + *dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); + if (*dbname == NULL) + ereport(ERROR, + errmsg("exiting slots synchronization as dbname is not " + "specified in primary_conninfo")); + +} + +/* + * Re-read the config file. + * + * If any of the slot sync GUCs changed, validate the values again + * through validate_slotsync_parameters() which will exit the worker + * if validaity fails. 
+ */ +static void +slotsync_reread_config(WalReceiverConn *wrconn) +{ + char *conninfo = pstrdup(PrimaryConnInfo); + char *slotname = pstrdup(PrimarySlotName); + bool syncslot = enable_syncslot; + bool standbyfeedback = hot_standby_feedback; + bool revalidate = false; + char *dbname; + bool conninfoChanged; + bool slotnameChanged; + + dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); + Assert(dbname); + + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + + conninfoChanged = strcmp(conninfo, PrimaryConnInfo) != 0; + slotnameChanged = strcmp(slotname, PrimarySlotName) != 0; + + if (conninfoChanged || slotnameChanged || + (syncslot != enable_syncslot) || + (standbyfeedback != hot_standby_feedback)) + { + revalidate = true; + } + + pfree(conninfo); + pfree(slotname); + + if (revalidate) + { + char *new_dbname; + + validate_slotsync_parameters(&new_dbname); + + /* + * Since we have initialized this worker with old dbname, thus exit if + * dbname changed. Let it get restarted and connect to new dbname + * specified. + */ + if (conninfoChanged && strcmp(dbname, new_dbname) != 0) + { + ereport(ERROR, + errmsg("exiting slot sync woker as dbname in " + "primary_conninfo changed")); + } + + if (slotnameChanged) + validate_primary_slot(wrconn); + } +} + + +/* + * Interrupt handler for main loop of slot sync worker. + */ +static void +ProcessSlotSyncInterrupts(WalReceiverConn *wrconn) +{ + CHECK_FOR_INTERRUPTS(); + + if (ShutdownRequestPending) + { + ereport(LOG, + errmsg("replication slot sync worker is shutting" + " down on receiving SIGINT")); + + walrcv_disconnect(wrconn); + proc_exit(0); + } + + + if (ConfigReloadPending) + slotsync_reread_config(wrconn); +} + +/* + * Cleanup function for logical replication launcher. + * + * Called on logical replication launcher exit. 
+ */ +static void +slotsync_worker_onexit(int code, Datum arg) +{ + SpinLockAcquire(&SlotSyncWorker->mutex); + SlotSyncWorker->pid = 0; + SpinLockRelease(&SlotSyncWorker->mutex); +} + +/* + * The main loop of our worker process. + * + * It connects to the primary server, fetches logical failover slots + * information periodically in order to create and sync the slots. + */ +void +ReplSlotSyncWorkerMain(Datum main_arg) +{ + WalReceiverConn *wrconn = NULL; + char *dbname; + + ereport(LOG, errmsg("replication slot sync worker started")); + + before_shmem_exit(slotsync_worker_onexit, (Datum) 0); + + SpinLockAcquire(&SlotSyncWorker->mutex); + + Assert(SlotSyncWorker->pid == 0); + + /* Advertise our PID so that the startup process can kill us on promotion */ + SlotSyncWorker->pid = MyProcPid; + + SpinLockRelease(&SlotSyncWorker->mutex); + + /* Setup signal handling */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGINT, SignalHandlerForShutdownRequest); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + + validate_slotsync_parameters(&dbname); + + /* + * Connect to the database specified by user in primary_conninfo. We need + * a database connection for walrcv_exec to work. Please see comments atop + * libpqrcv_exec. + */ + BackgroundWorkerInitializeConnection(dbname, NULL, 0); + + /* Connect to the primary server */ + wrconn = remote_connect(); + + /* + * Connect to primary and validate the slot specified in + * primary_slot_name. + */ + validate_primary_slot(wrconn); + + /* Main wait loop. 
*/ + for (;;) + { + int rc; + long naptime; + + ProcessSlotSyncInterrupts(wrconn); + + naptime = synchronize_slots(wrconn); + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + naptime, + WAIT_EVENT_REPL_SLOTSYNC_MAIN); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } + + /* + * The slot sync worker can not get here because it will only stop when it + * receives a SIGINT from the logical replication launcher, or when there + * is an error. + */ + Assert(false); +} + +/* + * Is current process the slot sync worker? + */ +bool +IsSlotSyncWorker(void) +{ + return SlotSyncWorker->pid == MyProcPid; +} + +/* + * Shut down the slot sync worker. + */ +void +ShutDownSlotSync(void) +{ + SpinLockAcquire(&SlotSyncWorker->mutex); + if (!SlotSyncWorker->pid) + { + SpinLockRelease(&SlotSyncWorker->mutex); + return; + } + + kill(SlotSyncWorker->pid, SIGINT); + + SpinLockRelease(&SlotSyncWorker->mutex); + + /* Wait for it to die. */ + for (;;) + { + int rc; + + /* Wait a bit, we don't expect to have to wait long. */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + 10L, WAIT_EVENT_BGWORKER_SHUTDOWN); + + if (rc & WL_LATCH_SET) + { + ResetLatch(MyLatch); + CHECK_FOR_INTERRUPTS(); + } + + SpinLockAcquire(&SlotSyncWorker->mutex); + + /* Is it gone? */ + if (!SlotSyncWorker->pid) + break; + + SpinLockRelease(&SlotSyncWorker->mutex); + } + + SpinLockRelease(&SlotSyncWorker->mutex); +} + +/* + * Allocate and initialize slow sync worker shared memory + */ +void +SlotSyncWorkerShmemInit(void) +{ + Size size; + bool found; + + size = sizeof(SlotSyncWorkerCtx); + size = MAXALIGN(size); + + SlotSyncWorker = (SlotSyncWorkerCtx *) + ShmemInitStruct("Slot Sync Worker Data", size, &found); + + if (!found) + { + memset(SlotSyncWorker, 0, size); + SpinLockInit(&SlotSyncWorker->mutex); + } +} + +/* + * Register the background worker for slots synchronization provided + * enable_syncslot is ON. 
+ */ +void +SlotSyncWorkerRegister(void) +{ + BackgroundWorker bgw; + + if (!enable_syncslot) + { + ereport(LOG, + errmsg("skipping slots synchronization as enable_syncslot is " + "disabled.")); + return; + } + + memset(&bgw, 0, sizeof(bgw)); + + /* We need database connection which needs shared-memory access as well. */ + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + + /* Start as soon as a consistent state has been reached in a hot standby */ + bgw.bgw_start_time = BgWorkerStart_ConsistentState_HotStandby; + + snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncWorkerMain"); + snprintf(bgw.bgw_name, BGW_MAXLEN, + "replication slot sync worker"); + snprintf(bgw.bgw_type, BGW_MAXLEN, + "slot sync worker"); + + bgw.bgw_restart_time = BGW_DEFAULT_RESTART_INTERVAL; + bgw.bgw_notify_pid = 0; + bgw.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&bgw); +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index d2d8bf1a7a..f1675dbd85 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -100,6 +100,7 @@ #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" +#include "commands/subscriptioncmds.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "parser/parse_relation.h" diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index e46a1955e8..7b3784c212 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -141,7 +141,20 @@ * subscribe to the new primary without losing any data. * * However, we do not enable failover for slots created by the table sync - * worker. + * worker. 
This is because the table sync slot might not be fully synced on the + * standby due to the following reasons: + * + * - The standby needs to wait for the primary server to catch up because the + * local restart_lsn of the newly created slot on the standby is set using + * the latest redo position (GetXLogReplayRecPtr()), which is typically ahead + * of the primary's restart_lsn. + * - The table sync slot's restart_lsn won't be advanced until the state + * becomes SUBREL_STATE_CATCHUP. + * + * Therefore, if a failover happens before the restart_lsn advances, the table + * sync slot will not be synced to the standby. Consequently, we will not be + * able to subscribe to the promoted standby due to the absence of the + * necessary table sync slot. * * Additionally, failover is not enabled for the main slot if the table sync is * in progress. This is because if a failover occurs while the table sync diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index ca8b07a095..fe2503f37f 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -47,6 +47,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "replication/slot.h" +#include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/proc.h" @@ -266,7 +267,7 @@ ReplicationSlotValidateName(const char *name, int elevel) void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover) + bool two_phase, bool failover, char sync_state) { ReplicationSlot *slot = NULL; int i; @@ -327,6 +328,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; slot->data.failover = failover; + slot->data.sync_state = sync_state; /* and then data only present in shared memory */ slot->just_dirtied = false; @@ -686,12 +688,26 @@ restart: * Permanently drop replication slot identified by the passed 
in name. */ void -ReplicationSlotDrop(const char *name, bool nowait) +ReplicationSlotDrop(const char *name, bool nowait, bool user_cmd) { Assert(MyReplicationSlot == NULL); ReplicationSlotAcquire(name, nowait); + /* + * Do not allow users to drop the slots which are currently being synced + * from the primary to the standby. + */ + if (user_cmd && RecoveryInProgress() && + MyReplicationSlot->data.sync_state != SYNCSLOT_STATE_NONE) + { + ReplicationSlotRelease(); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot drop replication slot \"%s\"", name), + errdetail("This slot is being synced from the primary."))); + } + ReplicationSlotDropAcquired(); } diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index c87f61666d..e5ba5956aa 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -44,7 +44,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, temporary ? RS_TEMPORARY : RS_PERSISTENT, false, - false); + false, SYNCSLOT_STATE_NONE); if (immediately_reserve) { @@ -137,7 +137,7 @@ create_logical_replication_slot(char *name, char *plugin, */ ReplicationSlotCreate(name, true, temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, - failover); + failover, SYNCSLOT_STATE_NONE); /* * Create logical decoding context to find start point or, if we don't @@ -226,11 +226,38 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) CheckSlotRequirements(); - ReplicationSlotDrop(NameStr(*name), true); + ReplicationSlotDrop(NameStr(*name), true, true); PG_RETURN_VOID(); } +/* + * SQL function for getting invalidation cause of a slot. + * + * Returns ReplicationSlotInvalidationCause enum value for valid slot_name; + * returns NULL if slot with given name is not found. + * + * Returns RS_INVAL_NONE if the given slot is not invalidated. 
+ */ +Datum +pg_get_slot_invalidation_cause(PG_FUNCTION_ARGS) +{ + Name name = PG_GETARG_NAME(0); + ReplicationSlot *s; + ReplicationSlotInvalidationCause cause; + + s = SearchNamedReplicationSlot(NameStr(*name), true); + + if (s == NULL) + PG_RETURN_NULL(); + + SpinLockAcquire(&s->mutex); + cause = s->data.invalidated; + SpinLockRelease(&s->mutex); + + PG_RETURN_INT16(cause); +} + /* * pg_get_replication_slots - SQL SRF showing all replication slots * that currently exist on the database cluster. @@ -238,7 +265,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 16 +#define PG_GET_REPLICATION_SLOTS_COLS 17 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -420,6 +447,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(slot_contents.data.failover); + values[i++] = CharGetDatum(slot_contents.data.sync_state); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index b5493fcd69..6b65c3c7dc 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1071,7 +1071,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false, false); + false, false, SYNCSLOT_STATE_NONE); if (reserve_wal) { @@ -1102,7 +1102,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? 
RS_TEMPORARY : RS_EPHEMERAL, - two_phase, failover); + two_phase, failover, SYNCSLOT_STATE_NONE); /* * Do options check early so that we can bail before calling the @@ -1254,7 +1254,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) static void DropReplicationSlot(DropReplicationSlotCmd *cmd) { - ReplicationSlotDrop(cmd->slotname, !cmd->wait); + ReplicationSlotDrop(cmd->slotname, !cmd->wait, true); } /* diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 2225a4a6e6..76a0f9be58 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -36,6 +36,7 @@ #include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/walsender.h" +#include "replication/worker_internal.h" #include "storage/bufmgr.h" #include "storage/dsm.h" #include "storage/ipc.h" @@ -336,6 +337,7 @@ CreateOrAttachShmemStructs(void) WalRcvShmemInit(); PgArchShmemInit(); ApplyLauncherShmemInit(); + SlotSyncWorkerShmemInit(); /* * Set up other modules that need some shared memory space diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 7298a187d1..92ea349488 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -60,6 +60,7 @@ #include "replication/logicalworker.h" #include "replication/slot.h" #include "replication/walsender.h" +#include "replication/worker_internal.h" #include "rewrite/rewriteHandler.h" #include "storage/bufmgr.h" #include "storage/ipc.h" @@ -3286,6 +3287,17 @@ ProcessInterrupts(void) */ proc_exit(1); } + else if (IsSlotSyncWorker()) + { + ereport(DEBUG1, + (errmsg_internal("replication slot sync worker is shutting down due to administrator command"))); + + /* + * Slot sync worker can be stopped at any time. + * Use exit status 1 so the background worker is restarted. 
+ */ + proc_exit(1); + } else if (IsBackgroundWorker) ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index ede94a1ede..7eb735824c 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -53,6 +53,8 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." +REPL_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." +REPL_SLOTSYNC_PRIMARY_CATCHUP "Waiting for the primary to catch-up, in slot sync worker." SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." WAL_SENDER_MAIN "Waiting in main loop of WAL sender process." 
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 4b776266a4..2b40d5db6e 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -67,6 +67,7 @@ #include "replication/logicallauncher.h" #include "replication/slot.h" #include "replication/syncrep.h" +#include "replication/worker_internal.h" #include "storage/bufmgr.h" #include "storage/large_object.h" #include "storage/pg_shmem.h" @@ -2021,6 +2022,15 @@ struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"enable_syncslot", PGC_POSTMASTER, REPLICATION_STANDBY, + gettext_noop("Enables a physical standby to synchronize logical failover slots from the primary server."), + }, + &enable_syncslot, + false, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 5d940b72cd..39224137e1 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -358,6 +358,7 @@ #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#enable_syncslot = off # enables slot synchronization on the physical standby from the primary # - Subscribers - diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index d906734750..6a8192ad83 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11095,14 +11095,18 @@ proname => 'pg_drop_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'void', proargtypes => 'name', prosrc => 'pg_drop_replication_slot' }, +{ oid => '8484', descr => 'what caused the replication slot to become invalid', + proname => 'pg_get_slot_invalidation_cause', provolatile => 's', proisstrict => 't', + prorettype => 
'int2', proargtypes => 'name', + prosrc => 'pg_get_slot_invalidation_cause' }, { oid => '3781', descr => 'information about replication slots currently in use', proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool,char}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover,sync_state}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index 214dc6c29e..75b4b2040d 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -17,6 +17,7 @@ #include "catalog/objectaddress.h" #include "parser/parse_node.h" +#include "replication/walreceiver.h" extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel); @@ -28,4 +29,7 @@ extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId); extern char defGetStreamingMode(DefElem *def); +extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, + char *slotname, bool missing_ok); + #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/postmaster/bgworker.h 
b/src/include/postmaster/bgworker.h index e90ff376a6..8559900b70 100644 --- a/src/include/postmaster/bgworker.h +++ b/src/include/postmaster/bgworker.h @@ -79,6 +79,7 @@ typedef enum BgWorkerStart_PostmasterStart, BgWorkerStart_ConsistentState, BgWorkerStart_RecoveryFinished, + BgWorkerStart_ConsistentState_HotStandby, } BgWorkerStartTime; #define BGW_DEFAULT_RESTART_INTERVAL 60 diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 5ddad69348..5fa7a18111 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -15,7 +15,6 @@ #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/spin.h" -#include "replication/walreceiver.h" /* * Behaviour of replication slots, upon release or crash. @@ -52,6 +51,14 @@ typedef enum ReplicationSlotInvalidationCause RS_INVAL_WAL_LEVEL, } ReplicationSlotInvalidationCause; +/* The possible values for 'sync_state' in ReplicationSlotPersistentData */ +#define SYNCSLOT_STATE_NONE 'n' /* None for user created slots */ +#define SYNCSLOT_STATE_INITIATED 'i' /* Sync initiated for the slot but + * not completed yet, waiting for + * the primary server to catch-up */ +#define SYNCSLOT_STATE_READY 'r' /* Initialization complete, ready + * to be synced further */ + /* * On-Disk data of a replication slot, preserved across restarts. */ @@ -112,6 +119,13 @@ typedef struct ReplicationSlotPersistentData /* plugin name */ NameData plugin; + /* + * Is this a slot created by a sync-slot worker? + * + * Relevant for logical slots on the physical standby. + */ + char sync_state; + /* * Is this a failover slot (sync candidate for physical standbys)? Only * relevant for logical slots on the primary server. 
@@ -225,9 +239,10 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover); + bool two_phase, bool failover, + char sync_state); extern void ReplicationSlotPersist(void); -extern void ReplicationSlotDrop(const char *name, bool nowait); +extern void ReplicationSlotDrop(const char *name, bool nowait, bool user_cmd); extern void ReplicationSlotAlter(const char *name, bool failover); extern void ReplicationSlotAcquire(const char *name, bool nowait); @@ -253,7 +268,6 @@ extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_l extern int ReplicationSlotIndex(ReplicationSlot *slot); extern bool ReplicationSlotName(int index, Name name); extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot); -extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(bool is_shutdown); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index f1135762fb..c96a814b26 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -19,6 +19,7 @@ #include "pgtime.h" #include "port/atomics.h" #include "replication/logicalproto.h" +#include "replication/slot.h" #include "replication/walsender.h" #include "storage/condition_variable.h" #include "storage/latch.h" @@ -279,6 +280,21 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn, typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn, TimeLineID *primary_tli); +/* + * walrcv_get_dbinfo_for_failover_slots_fn + * + * Run LIST_DBID_FOR_FAILOVER_SLOTS on primary server to get the + * list of unique DBIDs for failover logical slots + */ +typedef List *(*walrcv_get_dbinfo_for_failover_slots_fn) 
(WalReceiverConn *conn); + +/* + * walrcv_get_dbname_from_conninfo_fn + * + * Returns the dbname from the primary_conninfo + */ +typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo); + /* * walrcv_server_version_fn * @@ -403,6 +419,7 @@ typedef struct WalReceiverFunctionsType walrcv_get_conninfo_fn walrcv_get_conninfo; walrcv_get_senderinfo_fn walrcv_get_senderinfo; walrcv_identify_system_fn walrcv_identify_system; + walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo; walrcv_server_version_fn walrcv_server_version; walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile; walrcv_startstreaming_fn walrcv_startstreaming; @@ -428,6 +445,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port) #define walrcv_identify_system(conn, primary_tli) \ WalReceiverFunctions->walrcv_identify_system(conn, primary_tli) +#define walrcv_get_dbname_from_conninfo(conninfo) \ + WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo) #define walrcv_server_version(conn) \ WalReceiverFunctions->walrcv_server_version(conn) #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 84bb79ac0f..16b1129fcd 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -237,6 +237,11 @@ extern PGDLLIMPORT bool in_remote_transaction; extern PGDLLIMPORT bool InitializingApplyWorker; +/* Slot sync worker objects */ +extern PGDLLIMPORT char *PrimaryConnInfo; +extern PGDLLIMPORT char *PrimarySlotName; +extern PGDLLIMPORT bool enable_syncslot; + extern void logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); @@ -326,6 +331,13 @@ extern void pa_decr_and_wait_stream_block(void); extern void
pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); +extern void ReplSlotSyncWorkerMain(Datum main_arg); +extern void SlotSyncWorkerRegister(void); +extern void ShutDownSlotSync(void); +extern void slotsync_drop_initiated_slots(void); +extern bool IsSlotSyncWorker(void); +extern void SlotSyncWorkerShmemInit(void); + #define isParallelApplyWorker(worker) ((worker)->in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) #define isTablesyncWorker(worker) ((worker)->in_use && \ diff --git a/src/test/recovery/t/050_standby_failover_slots_sync.pl b/src/test/recovery/t/050_standby_failover_slots_sync.pl index 09b5d006a5..eb7667b1b3 100644 --- a/src/test/recovery/t/050_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/050_standby_failover_slots_sync.pl @@ -295,4 +295,131 @@ is( $primary->safe_psql( $subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION regress_mysub3"); +# Test logical failover slots on the standby +# Configure standby3 to replicate and synchronize logical slots configured +# for failover on the primary +# +# failover slot lsub1_slot->| ----> subscriber1 (connected via logical replication) +# primary ---> | +# physical slot sb3_slot--->| ----> standby3 (connected via streaming replication) +# | lsub1_slot(synced_slot) + +# Cleanup old standby_slot_names +$primary->stop; +$primary->append_conf( + 'postgresql.conf', qq( +standby_slot_names = '' +)); +$primary->start; + +$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb3_slot');}); + +$backup_name = 'backup2'; +$primary->backup($backup_name); + +# Create standby3 +my $standby3 = PostgreSQL::Test::Cluster->new('standby3'); +$standby3->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); + +my $connstr_1 = $primary->connstr; +$standby3->stop; +$standby3->append_conf( + 'postgresql.conf', q{ +enable_syncslot = true +hot_standby_feedback = on +primary_slot_name = 'sb3_slot' +}); +$standby3->append_conf( + 
'postgresql.conf', qq( +primary_conninfo = '$connstr_1 dbname=postgres' +)); +$standby3->start; + +# Add this standby into the primary's configuration +$primary->stop; +$primary->append_conf( + 'postgresql.conf', qq( +standby_slot_names = 'sb3_slot' +)); +$primary->start; + +# Restart the standby +$standby3->restart; + +# Wait for the standby to start sync +$offset = -s $standby3->logfile; +$standby3->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? waiting for remote slot \"lsub1_slot\"/, + $offset); + +# Advance lsn on the primary +$primary->safe_psql('postgres', + "SELECT pg_log_standby_snapshot();"); +$primary->safe_psql('postgres', + "SELECT pg_log_standby_snapshot();"); +$primary->safe_psql('postgres', + "SELECT pg_log_standby_snapshot();"); + +# Wait for the standby to finish sync +$offset = -s $standby3->logfile; +$standby3->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? wait over for remote slot \"lsub1_slot\"/, + $offset); + +# Confirm that logical failover slot is created on the standby +is( $standby3->safe_psql('postgres', + q{SELECT slot_name FROM pg_replication_slots;} + ), + 'lsub1_slot', + 'failover slot was created'); + +# Verify slot properties on the standby +is( $standby3->safe_psql('postgres', + q{SELECT failover, sync_state FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';} + ), + "t|r", + 'logical slot has sync_state as ready and failover as true on standby'); + +# Verify slot properties on the primary +is( $primary->safe_psql('postgres', + q{SELECT failover, sync_state FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';} + ), + "t|n", + 'logical slot has sync_state as none and failover as true on primary'); + +# Test to confirm that restart_lsn of the logical slot on the primary is synced to the standby + +# Truncate table on primary +$primary->safe_psql('postgres', + "TRUNCATE TABLE tab_int;"); + +# Insert data on the primary +$primary->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);"); + +# let the 
slots get synced on the standby +sleep 2; + +# Get the restart_lsn for the logical slot lsub1_slot on the primary +my $primary_lsn = $primary->safe_psql('postgres', + "SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"); + +# Confirm that restart_lsn of lsub1_slot slot is synced to the standby +$result = $standby3->safe_psql('postgres', + qq[SELECT '$primary_lsn' <= restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';]); +is($result, 't', 'restart_lsn of slot lsub1_slot synced to standby'); + +# Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary +$primary_lsn = $primary->safe_psql('postgres', + "SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"); + +# Confirm that confirmed_flush_lsn of lsub1_slot slot is synced to the standby +$result = $standby3->safe_psql('postgres', + qq[SELECT '$primary_lsn' <= confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';]); +is($result, 't', 'confirmed_flush_lsn of slot lsub1_slot synced to the standby'); + done_testing(); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index cb3b04aa0c..f2e5a3849c 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1474,8 +1474,9 @@ pg_replication_slots| SELECT l.slot_name, l.safe_wal_size, l.two_phase, l.conflicting, - l.failover - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover) + l.failover, + l.sync_state + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover, sync_state) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, 
pg_authid.rolsuper, diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index 271313ebf8..aac83755de 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -132,8 +132,9 @@ select name, setting from pg_settings where name like 'enable%'; enable_self_join_removal | on enable_seqscan | on enable_sort | on + enable_syncslot | off enable_tidscan | on -(22 rows) +(23 rows) -- There are always wait event descriptions for various types. select type, count(*) > 0 as ok FROM pg_wait_events diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 175bcecfd1..9a79feaddf 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2317,6 +2317,7 @@ RelocationBufferInfo RelptrFreePageBtree RelptrFreePageManager RelptrFreePageSpanLeader +RemoteSlot RenameStmt ReopenPtrType ReorderBuffer @@ -2575,6 +2576,7 @@ SlabBlock SlabContext SlabSlot SlotNumber +SlotSyncWorkerCtx SlruCtl SlruCtlData SlruErrorCause -- 2.30.0.windows.2