From d609ac77dee09f3dc80092b39f61b5583749097d Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Mon, 27 Nov 2023 11:36:09 +0800 Subject: [PATCH v38 2/3] Add logical slot sync capability to the physical standby This patch implements synchronization of logical replication slots from the primary server to the physical standby so that logical replication can be resumed after failover. All the failover logical replication slots on the primary (assuming configurations are appropriate) are automatically created on the physical standbys and are synced periodically. The slot-sync worker on the standby server pings the primary server at regular intervals to get the necessary failover logical slot information and creates/updates the slots locally. The GUC 'enable_syncslot' enables a physical standby to synchronize failover logical replication slots from the primary server. The replication launcher on the physical standby starts the slot-sync worker, which is then responsible for keeping the logical failover slots synced with the primary server. The worker's nap time is tuned according to the activity on the primary: the worker starts with a nap time of 10ms, and if no activity is observed on the primary for some time, the nap time is increased to 10sec. If activity is observed again, the nap time is reduced back to 10ms. The logical slots created by the slot-sync worker on physical standbys are not allowed to be dropped or consumed; any attempt to perform logical decoding on such slots will result in an error. If a logical slot is invalidated on the primary, the slot on the standby is also invalidated. If a logical slot on the primary is valid but is invalidated on the standby due to a conflict (say, required rows removed on the primary), then that slot is dropped and recreated on the standby in the next sync-cycle. It is okay to recreate such slots as long as they are not consumable on the standby (which is the case currently). Slots synced on the standby can be identified using the 'sync_state' column of the pg_replication_slots view. The values are: 'n': none, for user-created slots; 'i': sync initiated for the slot, but waiting for the remote slot on the primary server to catch up; 'r': ready for periodic syncs.
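For example, with enable_syncslot = on (plus hot_standby_feedback, primary_slot_name, and a dbname in primary_conninfo) configured on the standby, a synced slot can be inspected there. A minimal illustration; the slot name 'sub1_slot' and the output shown are hypothetical:

    standby=# SELECT slot_name, failover, sync_state
              FROM pg_replication_slots WHERE slot_type = 'logical';
     slot_name | failover | sync_state
    -----------+----------+------------
     sub1_slot | t        | r
    (1 row)

Here 'r' means the initial sync has completed and the slot is now being synced periodically.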
--- doc/src/sgml/config.sgml | 35 +- doc/src/sgml/logicaldecoding.sgml | 13 + doc/src/sgml/system-views.sgml | 18 + src/backend/access/transam/xlogrecovery.c | 11 + src/backend/catalog/system_views.sql | 3 +- src/backend/postmaster/bgworker.c | 5 +- .../libpqwalreceiver/libpqwalreceiver.c | 44 +- src/backend/replication/logical/Makefile | 1 + .../replication/logical/applyparallelworker.c | 3 +- src/backend/replication/logical/launcher.c | 667 +++++++++-- src/backend/replication/logical/logical.c | 22 + src/backend/replication/logical/meson.build | 1 + src/backend/replication/logical/slotsync.c | 1032 +++++++++++++++++ src/backend/replication/logical/tablesync.c | 5 +- src/backend/replication/slot.c | 18 +- src/backend/replication/slotfuncs.c | 33 +- src/backend/replication/walsender.c | 2 +- src/backend/storage/lmgr/lwlock.c | 2 + src/backend/storage/lmgr/lwlocknames.txt | 1 + .../utils/activity/wait_event_names.txt | 2 + src/backend/utils/misc/guc_tables.c | 12 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/catalog/pg_proc.dat | 10 +- src/include/commands/subscriptioncmds.h | 4 + src/include/nodes/replnodes.h | 9 + src/include/replication/logicallauncher.h | 9 +- src/include/replication/logicalworker.h | 1 + src/include/replication/slot.h | 19 +- src/include/replication/walreceiver.h | 27 + src/include/replication/worker_internal.h | 55 +- src/include/storage/lwlock.h | 1 + src/test/recovery/t/050_verify_slot_order.pl | 127 ++ src/test/regress/expected/rules.out | 5 +- src/test/regress/expected/sysviews.out | 3 +- src/tools/pgindent/typedefs.list | 6 + 35 files changed, 2056 insertions(+), 151 deletions(-) create mode 100644 src/backend/replication/logical/slotsync.c diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 6440378d5c..b2da901ad6 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4357,6 +4357,12 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows meant to switch to a physical standby after the standby is promoted, the physical replication slot for the standby should be listed here. + + The standbys corresponding to the physical replication slots in + standby_slot_names must enable + enable_syncslot to receive + failover logical slot changes from the primary. + + @@ -4550,10 +4556,15 @@ ANY num_sync ( num_sync ( + enable_syncslot (boolean) + + enable_syncslot configuration parameter + + + + + Enables a physical standby to synchronize logical failover slots + from the primary server so that logical subscribers are not blocked + after failover. + + + It is disabled by default. This parameter can only be set in the + postgresql.conf file or on the server command line. + + + diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index cd152d4ced..90bec06f36 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -346,6 +346,19 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU pg_log_standby_snapshot function on the primary. + + A logical replication slot on the primary can be synchronized to the hot + standby by enabling the failover option during slot creation and setting + enable_syncslot on the standby. The physical replication + slot for the standby should be listed in + standby_slot_names on the primary to prevent the + subscriber from consuming changes faster than the hot standby.
+ Additionally, similar to creating a logical replication slot on the hot + standby, hot_standby_feedback should be set on the + standby and a physical slot between the primary and the standby should be + used. + + Replication slots persist across crashes and know nothing about the state diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 1dc695fd3a..3631681adc 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2543,6 +2543,24 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx after failover. Always false for physical slots. + + + + sync_state char + + + Defines the slot synchronization state. This is meaningful on a physical + standby that has enabled slot synchronization. For the primary server, + its value is always 'none'. + + + State code: + n = none, for user-created slots, + i = sync initiated for the slot but not yet completed, + not ready for periodic syncs, + r = ready for periodic syncs + + diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index c61566666a..8ea6dc799a 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -49,7 +49,9 @@ #include "postmaster/bgwriter.h" #include "postmaster/startup.h" #include "replication/slot.h" +#include "replication/logicallauncher.h" #include "replication/walreceiver.h" +#include "replication/worker_internal.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -1435,6 +1437,15 @@ FinishWalRecovery(void) */ XLogShutdownWalRcv(); + /* + * Shut down the slot sync worker to prevent potential conflicts between + * user processes and the slotsync worker after a promotion. Additionally, + * drop any slots whose sync was initiated but not yet completed. + */ + ShutDownSlotSync(); + slotsync_drop_initiated_slots(); + /* * We are now done reading the xlog from stream.
Turn off streaming * recovery to force fetching the files (which would be required at end of diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 63038f87f7..c4b3e8a807 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1024,7 +1024,8 @@ CREATE VIEW pg_replication_slots AS L.safe_wal_size, L.two_phase, L.conflicting, - L.failover + L.failover, + L.sync_state FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 48a9924527..0e039c786f 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -125,11 +125,14 @@ static const struct "ParallelWorkerMain", ParallelWorkerMain }, { - "ApplyLauncherMain", ApplyLauncherMain + "LauncherMain", LauncherMain }, { "ApplyWorkerMain", ApplyWorkerMain }, + { + "ReplSlotSyncWorkerMain", ReplSlotSyncWorkerMain + }, { "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 336c2bec99..5f82d01e6d 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -34,6 +34,7 @@ #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/tuplestore.h" +#include "utils/varlena.h" PG_MODULE_MAGIC; @@ -58,6 +59,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host, int *sender_port); static char *libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli); +static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo); static int libpqrcv_server_version(WalReceiverConn *conn); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, @@ -100,6 +102,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { .walrcv_send = libpqrcv_send, .walrcv_create_slot = libpqrcv_create_slot, .walrcv_alter_slot = libpqrcv_alter_slot, + .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo, .walrcv_get_backend_pid = libpqrcv_get_backend_pid, .walrcv_exec = libpqrcv_exec, .walrcv_disconnect = libpqrcv_disconnect @@ -413,6 +416,45 @@ libpqrcv_server_version(WalReceiverConn *conn) return PQserverVersion(conn->streamConn); } +/* + * Get database name from the primary server's conninfo. + * + * If dbname is not found in connInfo, return NULL value. + */ +static char * +libpqrcv_get_dbname_from_conninfo(const char *connInfo) +{ + PQconninfoOption *opts; + PQconninfoOption *opt; + char *dbname = NULL; + char *err = NULL; + + opts = PQconninfoParse(connInfo, &err); + if (opts == NULL) + { + /* The error string is malloc'd, so we must free it explicitly */ + char *errcopy = err ? pstrdup(err) : "out of memory"; + + PQfreemem(err); + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid connection string syntax: %s", errcopy))); + } + + for (opt = opts; opt->keyword != NULL; ++opt) + { + /* + * If multiple dbnames are specified, then the last one will be + * returned + */ + if (strcmp(opt->keyword, "dbname") == 0 && opt->val && + opt->val[0] != '\0') + dbname = pstrdup(opt->val); + } + + return dbname; +} + /* * Start streaming WAL data from given streaming options. 
* @@ -998,7 +1040,7 @@ libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, */ static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname, - bool failover) + bool failover) { StringInfoData cmd; PGresult *res; diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile index 2dc25e37bb..ba03eeff1c 100644 --- a/src/backend/replication/logical/Makefile +++ b/src/backend/replication/logical/Makefile @@ -25,6 +25,7 @@ OBJS = \ proto.o \ relation.o \ reorderbuffer.o \ + slotsync.o \ snapbuild.o \ tablesync.o \ worker.o diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 9b37736f8e..192c9e1860 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -922,7 +922,8 @@ ParallelApplyWorkerMain(Datum main_arg) before_shmem_exit(pa_shutdown, PointerGetDatum(seg)); SpinLockAcquire(&MyParallelShared->mutex); - MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation; + MyParallelShared->logicalrep_worker_generation = + MyLogicalRepWorker->hdr.generation; MyParallelShared->logicalrep_worker_slot_no = worker_slot; SpinLockRelease(&MyParallelShared->mutex); diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 501910b445..c0d6cf7e85 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -8,20 +8,27 @@ * src/backend/replication/logical/launcher.c * * NOTES - * This module contains the logical replication worker launcher which - * uses the background worker infrastructure to start the logical - * replication workers for every enabled subscription. + * This module contains the replication worker launcher which + * uses the background worker infrastructure to: + * a) start the logical replication workers for every enabled subscription + * when not in standby_mode. + * b) start the slot sync worker for logical failover slots synchronization + * from the primary server when in standby_mode. * *------------------------------------------------------------------------- */ #include "postgres.h" +#include "access/genam.h" #include "access/heapam.h" #include "access/htup.h" #include "access/htup_details.h" #include "access/tableam.h" #include "access/xact.h" +#include "access/xlogrecovery.h" +#include "catalog/pg_authid.h" +#include "catalog/pg_database.h" #include "catalog/pg_subscription.h" #include "catalog/pg_subscription_rel.h" #include "funcapi.h" @@ -44,6 +51,7 @@ #include "storage/procsignal.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/fmgroids.h" #include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/ps_status.h" @@ -57,6 +65,9 @@ int max_logical_replication_workers = 4; int max_sync_workers_per_subscription = 2; int max_parallel_apply_workers_per_subscription = 2; +bool enable_syncslot = false; + +SlotSyncWorkerInfo *SlotSyncWorker = NULL; LogicalRepWorker *MyLogicalRepWorker = NULL; @@ -70,6 +81,7 @@ typedef struct LogicalRepCtxStruct dshash_table_handle last_start_dsh; /* Background workers. 
*/ + SlotSyncWorkerInfo ss_worker; /* slot sync worker */ LogicalRepWorker workers[FLEXIBLE_ARRAY_MEMBER]; } LogicalRepCtxStruct; @@ -102,6 +114,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); static void logicalrep_worker_cleanup(LogicalRepWorker *worker); +static void slotsync_worker_cleanup(SlotSyncWorkerInfo *worker); static int logicalrep_pa_worker_count(Oid subid); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); @@ -178,6 +191,8 @@ get_subscription_list(void) } /* + * This is common code for logical workers and slot sync worker. + * * Wait for a background worker to start up and attach to the shmem context. * * This is only needed for cleaning up the shared memory in case the worker @@ -186,12 +201,14 @@ get_subscription_list(void) * Returns whether the attach was successful. */ static bool -WaitForReplicationWorkerAttach(LogicalRepWorker *worker, +WaitForReplicationWorkerAttach(LogicalWorkerHeader *worker, uint16 generation, - BackgroundWorkerHandle *handle) + BackgroundWorkerHandle *handle, + LWLock *lock) { BgwHandleStatus status; int rc; + bool is_slotsync_worker = (lock == SlotSyncWorkerLock) ? true : false; for (;;) { @@ -199,27 +216,32 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker, CHECK_FOR_INTERRUPTS(); - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + LWLockAcquire(lock, LW_SHARED); /* Worker either died or has started. Return false if died. */ if (!worker->in_use || worker->proc) { - LWLockRelease(LogicalRepWorkerLock); + LWLockRelease(lock); return worker->in_use; } - LWLockRelease(LogicalRepWorkerLock); + LWLockRelease(lock); /* Check if worker has died before attaching, and clean up after it. */ status = GetBackgroundWorkerPid(handle, &pid); if (status == BGWH_STOPPED) { - LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); + LWLockAcquire(lock, LW_EXCLUSIVE); /* Ensure that this was indeed the worker we waited for. */ if (generation == worker->generation) - logicalrep_worker_cleanup(worker); - LWLockRelease(LogicalRepWorkerLock); + { + if (is_slotsync_worker) + slotsync_worker_cleanup((SlotSyncWorkerInfo *) worker); + else + logicalrep_worker_cleanup((LogicalRepWorker *) worker); + } + LWLockRelease(lock); return false; } @@ -262,8 +284,8 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running) if (isParallelApplyWorker(w)) continue; - if (w->in_use && w->subid == subid && w->relid == relid && - (!only_running || w->proc)) + if (w->hdr.in_use && w->subid == subid && w->relid == relid && + (!only_running || w->hdr.proc)) { res = w; break; @@ -290,7 +312,8 @@ logicalrep_workers_find(Oid subid, bool only_running) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (w->in_use && w->subid == subid && (!only_running || w->proc)) + if (w->hdr.in_use && w->subid == subid && + (!only_running || w->hdr.proc)) res = lappend(res, w); } @@ -351,7 +374,7 @@ retry: { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (!w->in_use) + if (!w->hdr.in_use) { worker = w; slot = i; @@ -380,7 +403,7 @@ retry: * If the worker was marked in use but didn't manage to attach in * time, clean it up. */ - if (w->in_use && !w->proc && + if (w->hdr.in_use && !w->hdr.proc && TimestampDifferenceExceeds(w->launch_time, now, wal_receiver_timeout)) { @@ -438,9 +461,9 @@ retry: /* Prepare the worker slot. 
*/ worker->type = wtype; worker->launch_time = now; - worker->in_use = true; - worker->generation++; - worker->proc = NULL; + worker->hdr.in_use = true; + worker->hdr.generation++; + worker->hdr.proc = NULL; worker->dbid = dbid; worker->userid = userid; worker->subid = subid; @@ -457,7 +480,7 @@ retry: TIMESTAMP_NOBEGIN(worker->reply_time); /* Before releasing lock, remember generation for future identification. */ - generation = worker->generation; + generation = worker->hdr.generation; LWLockRelease(LogicalRepWorkerLock); @@ -510,7 +533,7 @@ retry: { /* Failed to start worker, so clean up the worker slot. */ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); - Assert(generation == worker->generation); + Assert(generation == worker->hdr.generation); logicalrep_worker_cleanup(worker); LWLockRelease(LogicalRepWorkerLock); @@ -522,19 +545,23 @@ retry: } /* Now wait until it attaches. */ - return WaitForReplicationWorkerAttach(worker, generation, bgw_handle); + return WaitForReplicationWorkerAttach((LogicalWorkerHeader *) worker, + generation, + bgw_handle, + LogicalRepWorkerLock); } /* * Internal function to stop the worker and wait until it detaches from the - * slot. + * slot. It is used for both logical workers and slot sync worker. */ static void -logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) +logicalrep_worker_stop_internal(LogicalWorkerHeader *worker, int signo, + LWLock *lock) { uint16 generation; - Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_SHARED)); + Assert(LWLockHeldByMeInMode(lock, LW_SHARED)); /* * Remember which generation was our worker so we can check if what we see @@ -550,7 +577,7 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) { int rc; - LWLockRelease(LogicalRepWorkerLock); + LWLockRelease(lock); /* Wait a bit --- we don't expect to have to wait long. */ rc = WaitLatch(MyLatch, @@ -564,7 +591,7 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) } /* Recheck worker status. */ - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + LWLockAcquire(lock, LW_SHARED); /* * Check whether the worker slot is no longer used, which would mean @@ -591,7 +618,7 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) if (!worker->proc || worker->generation != generation) break; - LWLockRelease(LogicalRepWorkerLock); + LWLockRelease(lock); /* Wait a bit --- we don't expect to have to wait long. */ rc = WaitLatch(MyLatch, @@ -604,7 +631,7 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo) CHECK_FOR_INTERRUPTS(); } - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + LWLockAcquire(lock, LW_SHARED); } } @@ -623,7 +650,9 @@ logicalrep_worker_stop(Oid subid, Oid relid) if (worker) { Assert(!isParallelApplyWorker(worker)); - logicalrep_worker_stop_internal(worker, SIGTERM); + logicalrep_worker_stop_internal((LogicalWorkerHeader *) worker, + SIGTERM, + LogicalRepWorkerLock); } LWLockRelease(LogicalRepWorkerLock); @@ -669,8 +698,10 @@ logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo) /* * Only stop the worker if the generation matches and the worker is alive. 
*/ - if (worker->generation == generation && worker->proc) - logicalrep_worker_stop_internal(worker, SIGINT); + if (worker->hdr.generation == generation && worker->hdr.proc) + logicalrep_worker_stop_internal((LogicalWorkerHeader *) worker, + SIGINT, + LogicalRepWorkerLock); LWLockRelease(LogicalRepWorkerLock); } @@ -696,14 +727,14 @@ logicalrep_worker_wakeup(Oid subid, Oid relid) /* * Wake up (using latch) the specified logical replication worker. * - * Caller must hold lock, else worker->proc could change under us. + * Caller must hold lock, else worker->hdr.proc could change under us. */ void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker) { Assert(LWLockHeldByMe(LogicalRepWorkerLock)); - SetLatch(&worker->proc->procLatch); + SetLatch(&worker->hdr.proc->procLatch); } /* @@ -718,7 +749,7 @@ logicalrep_worker_attach(int slot) Assert(slot >= 0 && slot < max_logical_replication_workers); MyLogicalRepWorker = &LogicalRepCtx->workers[slot]; - if (!MyLogicalRepWorker->in_use) + if (!MyLogicalRepWorker->hdr.in_use) { LWLockRelease(LogicalRepWorkerLock); ereport(ERROR, @@ -727,7 +758,7 @@ logicalrep_worker_attach(int slot) slot))); } - if (MyLogicalRepWorker->proc) + if (MyLogicalRepWorker->hdr.proc) { LWLockRelease(LogicalRepWorkerLock); ereport(ERROR, @@ -736,7 +767,7 @@ logicalrep_worker_attach(int slot) "another worker, cannot attach", slot))); } - MyLogicalRepWorker->proc = MyProc; + MyLogicalRepWorker->hdr.proc = MyProc; before_shmem_exit(logicalrep_worker_onexit, (Datum) 0); LWLockRelease(LogicalRepWorkerLock); @@ -771,7 +802,9 @@ logicalrep_worker_detach(void) LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc); if (isParallelApplyWorker(w)) - logicalrep_worker_stop_internal(w, SIGTERM); + logicalrep_worker_stop_internal((LogicalWorkerHeader *) w, + SIGTERM, + LogicalRepWorkerLock); } LWLockRelease(LogicalRepWorkerLock); @@ -794,10 +827,10 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker) Assert(LWLockHeldByMeInMode(LogicalRepWorkerLock, LW_EXCLUSIVE)); worker->type = WORKERTYPE_UNKNOWN; - worker->in_use = false; - worker->proc = NULL; - worker->dbid = InvalidOid; + worker->hdr.in_use = false; + worker->hdr.proc = NULL; worker->userid = InvalidOid; + worker->dbid = InvalidOid; worker->subid = InvalidOid; worker->relid = InvalidOid; worker->leader_pid = InvalidPid; @@ -931,9 +964,18 @@ ApplyLauncherRegister(void) memset(&bgw, 0, sizeof(bgw)); bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; + + /* + * The launcher now takes care of launching both logical apply workers and + * logical slot sync worker. Thus to cater to the requirements of both, + * start it as soon as a consistent state is reached. This will help + * slot sync worker to start timely on a physical standby while on a + * non-standby server, it holds same meaning as that of + * BgWorkerStart_RecoveryFinished. + */ + bgw.bgw_start_time = BgWorkerStart_ConsistentState; snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ApplyLauncherMain"); + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "LauncherMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, "logical replication launcher"); snprintf(bgw.bgw_type, BGW_MAXLEN, @@ -1115,13 +1157,455 @@ ApplyLauncherWakeup(void) } /* - * Main loop for the apply launcher process. + * Clean up slot sync worker info. 
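+ * The caller must hold SlotSyncWorkerLock in exclusive mode (asserted in the function body).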
+ */ +static void +slotsync_worker_cleanup(SlotSyncWorkerInfo *worker) +{ + Assert(LWLockHeldByMeInMode(SlotSyncWorkerLock, LW_EXCLUSIVE)); + + worker->hdr.in_use = false; + worker->hdr.proc = NULL; + worker->last_update_time = 0; +} + +/* + * Attach slot sync worker to SlotSyncWorkerInfo assigned by the launcher. */ void -ApplyLauncherMain(Datum main_arg) +slotsync_worker_attach() { - ereport(DEBUG1, - (errmsg_internal("logical replication launcher started"))); + /* Block concurrent access. */ + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + + SlotSyncWorker = &LogicalRepCtx->ss_worker; + + if (!SlotSyncWorker->hdr.in_use) + { + LWLockRelease(SlotSyncWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot sync worker not initialized, cannot attach"))); + } + + if (SlotSyncWorker->hdr.proc) + { + LWLockRelease(SlotSyncWorkerLock); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("replication slot sync worker is already running, cannot attach"))); + } + + SlotSyncWorker->hdr.proc = MyProc; + + before_shmem_exit(slotsync_worker_detach, (Datum) 0); + + LWLockRelease(SlotSyncWorkerLock); +} + +/* + * Detach the slot sync worker (cleans up the worker info). + */ +void +slotsync_worker_detach(int code, Datum arg) +{ + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + slotsync_worker_cleanup(SlotSyncWorker); + LWLockRelease(SlotSyncWorkerLock); +} + +/* + * Start slot sync background worker. + * + * Returns true on success, false on failure. + */ +static bool +slotsync_worker_launch() +{ + BackgroundWorker bgw; + BackgroundWorkerHandle *bgw_handle; + SlotSyncWorkerInfo *worker; + bool attach; + uint16 generation; + + /* The shared memory must only be modified under lock. */ + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + + SlotSyncWorker = &LogicalRepCtx->ss_worker; + + worker = SlotSyncWorker; + + /* Prepare the new worker. */ + worker->hdr.in_use = true; + + /* TODO: do we really need 'generation', analyse more here */ + worker->hdr.generation++; + + /* + * 'proc' will be assigned in ReplSlotSyncWorkerMain when the worker + * attaches to SlotSyncWorkerInfo. + */ + worker->hdr.proc = NULL; + + /* Before releasing lock, remember generation for future identification. */ + generation = worker->hdr.generation; + + LWLockRelease(SlotSyncWorkerLock); + + /* Register the new dynamic worker. */ + memset(&bgw, 0, sizeof(bgw)); + bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + bgw.bgw_start_time = BgWorkerStart_ConsistentState; + snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); + + snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncWorkerMain"); + + snprintf(bgw.bgw_name, BGW_MAXLEN, "replication slot sync worker"); + + snprintf(bgw.bgw_type, BGW_MAXLEN, "slot sync worker"); + + bgw.bgw_restart_time = BGW_NEVER_RESTART; + bgw.bgw_notify_pid = MyProcPid; + + if (!RegisterDynamicBackgroundWorker(&bgw, &bgw_handle)) + { + /* Failed to start worker, so clean up the worker slot. */ + LWLockAcquire(SlotSyncWorkerLock, LW_EXCLUSIVE); + Assert(generation == worker->hdr.generation); + slotsync_worker_cleanup(worker); + LWLockRelease(SlotSyncWorkerLock); + + ereport(WARNING, + (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), + errmsg("out of background worker slots"), + errhint("You might need to increase %s.", + "max_worker_processes"))); + return false; + } + + /* Now wait until it attaches. 
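+ * (On failure we only emit a WARNING below; the launcher adjusts its nap time and retries the launch in a later sync-cycle.)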
*/ + attach = WaitForReplicationWorkerAttach((LogicalWorkerHeader *) worker, + generation, + bgw_handle, + SlotSyncWorkerLock); + + if (!attach) + ereport(WARNING, + (errmsg("replication slot sync worker failed to attach"))); + + return attach; +} + +/* + * Internal function to stop the slot sync worker and clean up afterwards. + */ +static void +slotsync_worker_stop_internal(SlotSyncWorkerInfo *worker) +{ + LWLockAcquire(SlotSyncWorkerLock, LW_SHARED); + ereport(LOG, + (errmsg("stopping replication slot sync worker with pid: %d", + worker->hdr.proc->pid))); + logicalrep_worker_stop_internal((LogicalWorkerHeader *) worker, + SIGINT, + SlotSyncWorkerLock); + LWLockRelease(SlotSyncWorkerLock); +} + +/* + * Validate that a database with the given dbname exists. + * + * Can't use existing functions like 'get_database_oid' from dbcommands.c for + * this validation, as they need a database connection. + */ +static bool +validate_dbname(const char *dbname) +{ + HeapTuple tuple; + Relation relation; + SysScanDesc scan; + ScanKeyData key[1]; + bool valid; + + /* Start a transaction so we can access pg_database */ + StartTransactionCommand(); + + /* Form a scan key */ + ScanKeyInit(&key[0], Anum_pg_database_datname, BTEqualStrategyNumber, + F_NAMEEQ, CStringGetDatum(dbname)); + + /* No db connection, force heap scan */ + relation = table_open(DatabaseRelationId, AccessShareLock); + scan = systable_beginscan(relation, DatabaseNameIndexId, false, + NULL, 1, key); + + tuple = systable_getnext(scan); + + if (HeapTupleIsValid(tuple)) + valid = true; + else + valid = false; + + /* all done */ + systable_endscan(scan); + table_close(relation, AccessShareLock); + + CommitTransactionCommand(); + return valid; +} + +/* + * Check if GUCs are set appropriately before starting the slot sync worker. + */ +static bool +slotsync_checks(long *wait_time, bool *retry) +{ + char *dbname; + + *retry = false; + + if (!enable_syncslot) + return false; + + /* + * Since the above GUC is set, check that other GUC settings + * (primary_slot_name, hot_standby_feedback, primary_conninfo, wal_level) + * are compatible with slot synchronization. If not, issue warnings. + */ + + /* + * A physical replication slot (primary_slot_name) is required on the + * primary to ensure that the rows needed by the standby are not removed + * after restarting, so that the synchronized slot on the standby will not + * be invalidated. + */ + if (!WalRcv || WalRcv->slotname[0] == '\0') + { + ereport(WARNING, + errmsg("skipping slots synchronization as primary_slot_name is not set")); + + /* + * It's possible that the walreceiver has not been started yet; adjust + * the wait_time to retry sooner in the next synchronization cycle. + */ + *wait_time = wal_retrieve_retry_interval; + + /* + * Tell the caller to retry the connection for the case where + * primary_slot_name is set but the walreceiver is not yet started. + */ + if (PrimarySlotName && strcmp(PrimarySlotName, "") != 0) + *retry = true; + + return false; + } + + /* + * hot_standby_feedback must be enabled to cooperate with the physical + * replication slot, which allows informing the primary about the xmin and + * catalog_xmin values on the standby. + */ + if (!hot_standby_feedback) + { + ereport(WARNING, + errmsg("skipping slots synchronization as hot_standby_feedback is off")); + return false; + } + + /* + * Logical decoding requires wal_level >= logical and we currently only + * synchronize logical slots.
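+ * Note that this is the standby's own wal_level; without it, the synced + * slots could not be used for logical decoding once the standby is promoted.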
+ */ + if (wal_level < WAL_LEVEL_LOGICAL) + { + ereport(WARNING, + errmsg("skipping slots synchronization as it requires wal_level >= logical")); + return false; + } + + /* + * The slot sync worker needs a database connection for walrcv_exec to + * work. + */ + dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); + if (dbname == NULL) + { + ereport(WARNING, + errmsg("skipping slots synchronization as dbname is not specified in primary_conninfo")); + return false; + } + + if (!validate_dbname(dbname)) + { + ereport(WARNING, + errmsg("skipping slots synchronization as dbname specified in primary_conninfo is not valid")); + return false; + } + + return true; +} + +/* + * Shut down the slot sync worker. + */ +void +ShutDownSlotSync(void) +{ + if (LogicalRepCtx->ss_worker.hdr.in_use) + slotsync_worker_stop_internal(&LogicalRepCtx->ss_worker); +} + +/* + * Re-read the config file. + * + * If one of the slot sync options has changed, stop the slot sync worker + * and set the ss_recheck flag so that the caller can recheck slot sync GUCs + * before restarting the worker. + */ +static void +LauncherRereadConfig(bool *ss_recheck) +{ + char *conninfo = pstrdup(PrimaryConnInfo); + char *slotname = pstrdup(PrimarySlotName); + bool syncslot = enable_syncslot; + bool standbyfeedback = hot_standby_feedback; + + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + + /* + * If any of the related GUCs changed, stop the slot sync worker. The + * worker will be relaunched in the next sync-cycle using the new GUCs. + */ + if ((strcmp(conninfo, PrimaryConnInfo) != 0) || + (strcmp(slotname, PrimarySlotName) != 0) || + (syncslot != enable_syncslot) || + (standbyfeedback != hot_standby_feedback)) + { + ShutDownSlotSync(); + + /* Retry slot sync with new GUCs */ + *ss_recheck = true; + } + + pfree(conninfo); + pfree(slotname); +} + +/* + * Launch slot sync background worker. + */ +static void +LaunchSlotSyncWorker(long *wait_time) +{ + LWLockAcquire(SlotSyncWorkerLock, LW_SHARED); + + /* The worker is running already */ + if (SlotSyncWorker && SlotSyncWorker->hdr.in_use && + SlotSyncWorker->hdr.proc) + { + LWLockRelease(SlotSyncWorkerLock); + return; + } + + LWLockRelease(SlotSyncWorkerLock); + + /* + * If the launch fails, adjust the wait_time so that we retry sooner in + * the next sync-cycle. + */ + if (!slotsync_worker_launch()) + { + *wait_time = Min(*wait_time, wal_retrieve_retry_interval); + } +} + +/* + * Launch logical replication apply workers for enabled subscriptions. + */ +static void +LaunchSubscriptionApplyWorker(long *wait_time) +{ + List *sublist; + ListCell *lc; + MemoryContext subctx; + MemoryContext oldctx; + + /* Use temporary context to avoid leaking memory across cycles. */ + subctx = AllocSetContextCreate(TopMemoryContext, + "Logical Replication Launcher sublist", + ALLOCSET_DEFAULT_SIZES); + oldctx = MemoryContextSwitchTo(subctx); + + /* Start any missing workers for enabled subscriptions. */ + sublist = get_subscription_list(); + foreach(lc, sublist) + { + Subscription *sub = (Subscription *) lfirst(lc); + LogicalRepWorker *w; + TimestampTz last_start; + TimestampTz now; + long elapsed; + + if (!sub->enabled) + continue; + + LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); + w = logicalrep_worker_find(sub->oid, InvalidOid, false); + LWLockRelease(LogicalRepWorkerLock); + + if (w != NULL) + continue; /* worker is running already */ + + /* + * If the worker is eligible to start now, launch it. Otherwise, + * adjust wait_time so that we'll wake up as soon as it can be + * started.
+ * + * Each subscription's apply worker can only be restarted once per + * wal_retrieve_retry_interval, so that errors do not cause us to + * repeatedly restart the worker as fast as possible. In cases where + * a restart is expected (e.g., subscription parameter changes), + * another process should remove the last-start entry for the + * subscription so that the worker can be restarted without waiting + * for wal_retrieve_retry_interval to elapse. + */ + last_start = ApplyLauncherGetWorkerStartTime(sub->oid); + now = GetCurrentTimestamp(); + if (last_start == 0 || + (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= + wal_retrieve_retry_interval) + { + ApplyLauncherSetWorkerStartTime(sub->oid, now); + logicalrep_worker_launch(WORKERTYPE_APPLY, + sub->dbid, sub->oid, sub->name, + sub->owner, InvalidOid, + DSM_HANDLE_INVALID); + } + else + { + *wait_time = Min(*wait_time, + wal_retrieve_retry_interval - elapsed); + } + } + + /* Switch back to original memory context. */ + MemoryContextSwitchTo(oldctx); + /* Clean the temporary memory. */ + MemoryContextDelete(subctx); +} + +/* + * Main loop for the launcher process. + */ +void +LauncherMain(Datum main_arg) +{ + bool start_slotsync = false; + bool recheck_slotsync = true; + + elog(DEBUG1, "logical replication launcher started"); before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0); @@ -1139,79 +1623,32 @@ ApplyLauncherMain(Datum main_arg) */ BackgroundWorkerInitializeConnection(NULL, NULL, 0); + load_file("libpqwalreceiver", false); + /* Enter main loop */ for (;;) { int rc; - List *sublist; - ListCell *lc; - MemoryContext subctx; - MemoryContext oldctx; long wait_time = DEFAULT_NAPTIME_PER_CYCLE; CHECK_FOR_INTERRUPTS(); - /* Use temporary context to avoid leaking memory across cycles. */ - subctx = AllocSetContextCreate(TopMemoryContext, - "Logical Replication Launcher sublist", - ALLOCSET_DEFAULT_SIZES); - oldctx = MemoryContextSwitchTo(subctx); - - /* Start any missing workers for enabled subscriptions. */ - sublist = get_subscription_list(); - foreach(lc, sublist) + /* + * If it is Hot standby, then try to launch slot sync worker else + * launch apply workers. + */ + if (RecoveryInProgress() && !PromoteIsTriggered()) { - Subscription *sub = (Subscription *) lfirst(lc); - LogicalRepWorker *w; - TimestampTz last_start; - TimestampTz now; - long elapsed; - - if (!sub->enabled) - continue; - - LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); - w = logicalrep_worker_find(sub->oid, InvalidOid, false); - LWLockRelease(LogicalRepWorkerLock); + /* Make validation checks first */ + if (recheck_slotsync) + start_slotsync = slotsync_checks(&wait_time, &recheck_slotsync); - if (w != NULL) - continue; /* worker is running already */ - - /* - * If the worker is eligible to start now, launch it. Otherwise, - * adjust wait_time so that we'll wake up as soon as it can be - * started. - * - * Each subscription's apply worker can only be restarted once per - * wal_retrieve_retry_interval, so that errors do not cause us to - * repeatedly restart the worker as fast as possible. In cases - * where a restart is expected (e.g., subscription parameter - * changes), another process should remove the last-start entry - * for the subscription so that the worker can be restarted - * without waiting for wal_retrieve_retry_interval to elapse. 
- */ - last_start = ApplyLauncherGetWorkerStartTime(sub->oid); - now = GetCurrentTimestamp(); - if (last_start == 0 || - (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) - { - ApplyLauncherSetWorkerStartTime(sub->oid, now); - logicalrep_worker_launch(WORKERTYPE_APPLY, - sub->dbid, sub->oid, sub->name, - sub->owner, InvalidOid, - DSM_HANDLE_INVALID); - } - else - { - wait_time = Min(wait_time, - wal_retrieve_retry_interval - elapsed); - } + /* Start slot sync workers if checks passed */ + if (start_slotsync) + LaunchSlotSyncWorker(&wait_time); } - - /* Switch back to original memory context. */ - MemoryContextSwitchTo(oldctx); - /* Clean the temporary memory. */ - MemoryContextDelete(subctx); + else + LaunchSubscriptionApplyWorker(&wait_time); /* Wait for more work. */ rc = WaitLatch(MyLatch, @@ -1226,10 +1663,7 @@ ApplyLauncherMain(Datum main_arg) } if (ConfigReloadPending) - { - ConfigReloadPending = false; - ProcessConfigFile(PGC_SIGHUP); - } + LauncherRereadConfig(&recheck_slotsync); } /* Not reachable */ @@ -1260,7 +1694,8 @@ GetLeaderApplyWorkerPid(pid_t pid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (isParallelApplyWorker(w) && w->proc && pid == w->proc->pid) + if (isParallelApplyWorker(w) && w->hdr.proc && + pid == w->hdr.proc->pid) { leader_pid = w->leader_pid; break; @@ -1298,13 +1733,13 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) memcpy(&worker, &LogicalRepCtx->workers[i], sizeof(LogicalRepWorker)); - if (!worker.proc || !IsBackendPid(worker.proc->pid)) + if (!worker.hdr.proc || !IsBackendPid(worker.hdr.proc->pid)) continue; if (OidIsValid(subid) && worker.subid != subid) continue; - worker_pid = worker.proc->pid; + worker_pid = worker.hdr.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); if (isTablesyncWorker(&worker)) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 8288da5277..5bef0138b4 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -524,6 +524,28 @@ CreateDecodingContext(XLogRecPtr start_lsn, errmsg("replication slot \"%s\" was not created in this database", NameStr(slot->data.name)))); + /* + * Slots in state SYNCSLOT_STATE_INITIATED should have been dropped on + * promotion. + */ + if (!RecoveryInProgress() && slot->data.sync_state == SYNCSLOT_STATE_INITIATED) + elog(ERROR, "replication slot \"%s\" was not synced completely from the primary server", + NameStr(slot->data.name)); + + /* + * Do not allow consumption of a "synchronized" slot until the standby + * gets promoted. Also do not allow consumption of slots with sync_state + * as SYNCSLOT_STATE_INITIATED as they are not synced completely to be + * used. + */ + if (RecoveryInProgress() && slot->data.sync_state != SYNCSLOT_STATE_NONE) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot use replication slot \"%s\" for logical decoding", + NameStr(slot->data.name)), + errdetail("This slot is being synced from the primary server."), + errhint("Specify another replication slot."))); + /* * Check if slot has been invalidated due to max_slot_wal_keep_size. 
Avoid * "cannot get changes" wording in this errmsg because that'd be diff --git a/src/backend/replication/logical/meson.build b/src/backend/replication/logical/meson.build index d48cd4c590..9e52ec421f 100644 --- a/src/backend/replication/logical/meson.build +++ b/src/backend/replication/logical/meson.build @@ -11,6 +11,7 @@ backend_sources += files( 'proto.c', 'relation.c', 'reorderbuffer.c', + 'slotsync.c', 'snapbuild.c', 'tablesync.c', 'worker.c', diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c new file mode 100644 index 0000000000..61bbf42933 --- /dev/null +++ b/src/backend/replication/logical/slotsync.c @@ -0,0 +1,1032 @@ +/*------------------------------------------------------------------------- + * slotsync.c + * PostgreSQL worker for synchronizing slots to a standby server from the + * primary server. + * + * Copyright (c) 2023, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/replication/logical/slotsync.c + * + * This file contains the code for the slot sync worker on a physical standby, + * which fetches logical failover slot information from the primary server, + * creates the slots on the standby, and synchronizes them periodically. + * + * It also takes care of dropping the slots that it created and that no + * longer need to be synchronized. + * + * It takes a nap of WORKER_DEFAULT_NAPTIME_MS between successive + * synchronizations. If there is no activity observed on the primary server for + * some time, the nap time is increased to WORKER_INACTIVITY_NAPTIME_MS, but if + * any activity is observed, the nap time reverts to the default value. +*--------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xlogrecovery.h" +#include "commands/dbcommands.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "postmaster/interrupt.h" +#include "replication/logical.h" +#include "replication/logicallauncher.h" +#include "replication/logicalworker.h" +#include "replication/walreceiver.h" +#include "replication/worker_internal.h" +#include "storage/ipc.h" +#include "storage/procarray.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/guc_hooks.h" +#include "utils/pg_lsn.h" +#include "utils/varlena.h" + +/* + * Structure to hold information fetched from the primary server about a logical + * replication slot. + */ +typedef struct RemoteSlot +{ + char *name; + char *plugin; + char *database; + bool two_phase; + bool conflicting; + XLogRecPtr restart_lsn; + XLogRecPtr confirmed_lsn; + TransactionId catalog_xmin; + + /* RS_INVAL_NONE if valid, or the reason of invalidation */ + ReplicationSlotInvalidationCause invalidated; +} RemoteSlot; + +/* Worker's nap time in case of regular activity on the primary server */ +#define WORKER_DEFAULT_NAPTIME_MS 10L /* 10 ms */ + +/* Worker's nap time in case of no activity on the primary server */ +#define WORKER_INACTIVITY_NAPTIME_MS 10000L /* 10 sec */ + +/* + * Inactivity threshold in ms before increasing the nap time of the worker. + * + * If the LSN of the slot being monitored did not change for this threshold + * time, then increase the nap time of the current worker from + * WORKER_DEFAULT_NAPTIME_MS to WORKER_INACTIVITY_NAPTIME_MS. + */ +#define WORKER_INACTIVITY_THRESHOLD_MS 10000L /* 10 sec */ + +/* + * Number of attempts for wait_for_primary_slot_catchup(), after + * which it aborts the wait and the slot sync worker moves on + * to the next slot creation/sync.
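+ * With the 2-second wait between attempts used in that function, this bounds + * the wait for any single slot to roughly 10 seconds per sync-cycle.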
+ */ +#define WORKER_PRIMARY_CATCHUP_WAIT_ATTEMPTS 5 + +/* + * Wait for remote slot to pass locally reserved position. + * + * During slot creation, ping and wait for the primary server for up to + * WORKER_PRIMARY_CATCHUP_WAIT_ATTEMPTS attempts; if it still does not catch + * up, abort the wait. Slots for which the wait was aborted will attempt the + * wait and sync again in the next sync-cycle. + * + * *persist will be set to false if the slot has disappeared or was invalidated + * on the primary; otherwise, it will be set to true. + */ +static bool +wait_for_primary_slot_catchup(WalReceiverConn *wrconn, RemoteSlot *remote_slot, + bool *persist) +{ +#define WAIT_OUTPUT_COLUMN_COUNT 4 + StringInfoData cmd; + int wait_count = 0; + + *persist = true; + + ereport(LOG, + errmsg("waiting for remote slot \"%s\" LSN (%X/%X) and catalog xmin" + " (%u) to pass local slot LSN (%X/%X) and catalog xmin (%u)", + remote_slot->name, + LSN_FORMAT_ARGS(remote_slot->restart_lsn), + remote_slot->catalog_xmin, + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn), + MyReplicationSlot->data.catalog_xmin)); + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT conflicting, restart_lsn, confirmed_flush_lsn," + " catalog_xmin FROM pg_catalog.pg_replication_slots" + " WHERE slot_name = %s", + quote_literal_cstr(remote_slot->name)); + + for (;;) + { + bool new_invalidated; + XLogRecPtr new_restart_lsn; + XLogRecPtr new_confirmed_lsn; + TransactionId new_catalog_xmin; + WalRcvExecResult *res; + TupleTableSlot *slot; + int rc; + bool isnull; + Oid slotRow[WAIT_OUTPUT_COLUMN_COUNT] = {BOOLOID, LSNOID, LSNOID, + XIDOID}; + + CHECK_FOR_INTERRUPTS(); + + Assert(RecoveryInProgress()); + + res = walrcv_exec(wrconn, cmd.data, WAIT_OUTPUT_COLUMN_COUNT, slotRow); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch slot info for slot \"%s\" from the" + " primary server: %s", + remote_slot->name, res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + ereport(WARNING, + (errmsg("slot \"%s\" disappeared from the primary server," + " slot creation aborted", remote_slot->name))); + pfree(cmd.data); + walrcv_clear_result(res); + + /* + * The slot being created will be dropped when it is released (see + * ReplicationSlotRelease). + */ + *persist = false; + + return false; + } + + /* + * It is possible to get null values for LSN and Xmin if the slot is + * invalidated on the primary server, so handle accordingly. + */ + new_invalidated = DatumGetBool(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + new_restart_lsn = DatumGetLSN(slot_getattr(slot, 2, &isnull)); + if (new_invalidated || isnull) + { + ereport(WARNING, + (errmsg("slot \"%s\" invalidated on the primary server," + " slot creation aborted", remote_slot->name))); + pfree(cmd.data); + ExecClearTuple(slot); + walrcv_clear_result(res); + + /* + * The slot being created will be dropped when it is released (see + * ReplicationSlotRelease). + */ + *persist = false; + + return false; + } + + /* + * Once we have a valid restart_lsn, the confirmed_lsn and catalog_xmin + * are expected to be valid/non-null.
+ */ + new_confirmed_lsn = DatumGetLSN(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); + + new_catalog_xmin = DatumGetTransactionId(slot_getattr(slot, + 4, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + walrcv_clear_result(res); + + if (new_restart_lsn >= MyReplicationSlot->data.restart_lsn && + TransactionIdFollowsOrEquals(new_catalog_xmin, + MyReplicationSlot->data.catalog_xmin)) + { + /* Update new values in remote_slot */ + remote_slot->restart_lsn = new_restart_lsn; + remote_slot->confirmed_lsn = new_confirmed_lsn; + remote_slot->catalog_xmin = new_catalog_xmin; + + ereport(LOG, + errmsg("wait over for remote slot \"%s\" as its LSN (%X/%X)" + " and catalog xmin (%u) has now passed local slot LSN" + " (%X/%X) and catalog xmin (%u)", + remote_slot->name, + LSN_FORMAT_ARGS(new_restart_lsn), + new_catalog_xmin, + LSN_FORMAT_ARGS(MyReplicationSlot->data.restart_lsn), + MyReplicationSlot->data.catalog_xmin)); + pfree(cmd.data); + + return true; + } + + if (++wait_count >= WORKER_PRIMARY_CATCHUP_WAIT_ATTEMPTS) + { + ereport(LOG, + errmsg("aborting the wait for remote slot \"%s\" and moving" + " to the next slot, will attempt creating it again", + remote_slot->name)); + pfree(cmd.data); + + return false; + } + + /* + * XXX: Is waiting for 2 seconds before retrying enough or more or + * less? + */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + 2000L, + WAIT_EVENT_REPL_SLOTSYNC_PRIMARY_CATCHUP); + + ResetLatch(MyLatch); + + /* Emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + } +} + +/* + * Update local slot metadata as per remote_slot's positions + */ +static void +local_slot_update(RemoteSlot *remote_slot) +{ + Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE); + + LogicalConfirmReceivedLocation(remote_slot->confirmed_lsn); + LogicalIncreaseXminForSlot(remote_slot->confirmed_lsn, + remote_slot->catalog_xmin); + LogicalIncreaseRestartDecodingForSlot(remote_slot->confirmed_lsn, + remote_slot->restart_lsn); + + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.invalidated = remote_slot->invalidated; + SpinLockRelease(&MyReplicationSlot->mutex); + + ReplicationSlotMarkDirty(); +} + +/* + * Wait for remote slot to pass locally reserved position and + * sync the slot locally if wait is over. + */ +static void +wait_for_primary_and_sync(WalReceiverConn *wrconn, RemoteSlot *remote_slot, + bool *slot_updated) +{ + /* + * If the local restart_lsn and/or local catalog_xmin is ahead of those on + * the remote then we cannot create the local slot in sync with the + * primary server because that would mean moving the local slot backwards + * and we might not have WALs retained for old LSN. In this case we will + * wait for the primary server's restart_lsn and catalog_xmin to catch up + * with the local one before attempting the sync. + */ + if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn || + TransactionIdPrecedes(remote_slot->catalog_xmin, + MyReplicationSlot->data.catalog_xmin)) + { + bool persist; + + if (!wait_for_primary_slot_catchup(wrconn, remote_slot, &persist)) + { + /* + * The remote slot didn't catch up to locally reserved position. + * + * We do not drop the slot because the restart_lsn can be ahead of + * the current location when recreating the slot in the next cycle. + * It may take more time to create such a slot. Therefore, we + * persist it and attempt the wait and synchronization in the next + * cycle. 
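+ * + * Such a persisted-but-not-ready slot keeps sync_state + * SYNCSLOT_STATE_INITIATED and is dropped at promotion (see + * slotsync_drop_initiated_slots()).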
+ */ + if (persist && MyReplicationSlot->data.persistency != RS_PERSISTENT) + { + ReplicationSlotPersist(); + *slot_updated = true; + } + + return; + } + } + + /* Update LSN of slot to remote slot's current position */ + local_slot_update(remote_slot); + + if (MyReplicationSlot->data.persistency != RS_PERSISTENT) + ReplicationSlotPersist(); + else + ReplicationSlotSave(); + + /* + * The wait for the primary server is over; mark the slot as READY for + * incremental syncs. Cascading standbys can also start syncing it. + */ + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.sync_state = SYNCSLOT_STATE_READY; + SpinLockRelease(&MyReplicationSlot->mutex); + + *slot_updated = true; + + ereport(LOG, errmsg("newly created local slot \"%s\" has been synced", + remote_slot->name)); +} + +/* + * Drop the slots for which sync is initiated but not yet completed, + * i.e., they are still waiting for the primary server to catch up. + */ +void +slotsync_drop_initiated_slots(void) +{ + List *slots = NIL; + ListCell *lc; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->in_use && s->data.sync_state == SYNCSLOT_STATE_INITIATED) + slots = lappend(slots, s); + } + + LWLockRelease(ReplicationSlotControlLock); + + foreach(lc, slots) + { + ReplicationSlot *s = (ReplicationSlot *) lfirst(lc); + + ReplicationSlotDrop(NameStr(s->data.name), true, false); + ereport(LOG, + (errmsg("dropped replication slot \"%s\" of dbid %u as it " + "was not sync-ready", NameStr(s->data.name), + s->data.database))); + } + + list_free(slots); +} + +/* + * Get the list of local logical slots that are synchronized from + * the primary server. + */ +static List * +get_local_synced_slot_names(void) +{ + List *localSyncedSlots = NIL; + + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + /* Check if it is a synchronized logical slot */ + if (s->in_use && SlotIsLogical(s) && + (s->data.sync_state != SYNCSLOT_STATE_NONE)) + { + localSyncedSlots = lappend(localSyncedSlots, s); + } + } + + LWLockRelease(ReplicationSlotControlLock); + + return localSyncedSlots; +} + +/* + * Helper function to check if local_slot is present in the remote_slots list. + * + * It also checks if the logical slot is locally invalidated, i.e., invalidated + * on the standby but valid on the primary server. If so, it sets + * *locally_invalidated to true. + */ +static bool +slot_exists_in_list(ReplicationSlot *local_slot, List *remote_slots, + bool *locally_invalidated) +{ + ListCell *cell; + + foreach(cell, remote_slots) + { + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(cell); + + if (strcmp(remote_slot->name, NameStr(local_slot->data.name)) == 0) + { + /* + * If the remote slot is marked as non-conflicting (i.e., not + * invalidated) but the local slot is marked as invalidated, then + * set the bool. + */ + *locally_invalidated = + !remote_slot->conflicting && + (local_slot->data.invalidated != RS_INVAL_NONE); + + return true; + } + } + + return false; +} + +/* + * Get the invalidation cause of the remote slot.
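+ * Returns RS_INVAL_NONE if the slot is valid on the primary server.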
+ */ +static ReplicationSlotInvalidationCause +get_remote_invalidation_cause(WalReceiverConn *wrconn, char *slot_name) +{ + WalRcvExecResult *res; + Oid slotRow[1] = {INT2OID}; + StringInfoData cmd; + bool isnull; + TupleTableSlot *slot; + ReplicationSlotInvalidationCause cause; + MemoryContext oldctx = CurrentMemoryContext; + + /* Syscache access needs a transaction env. */ + StartTransactionCommand(); + + /* Make things live outside TX context */ + MemoryContextSwitchTo(oldctx); + + initStringInfo(&cmd); + appendStringInfo(&cmd, + "SELECT pg_get_slot_invalidation_cause(%s)", + quote_literal_cstr(slot_name)); + res = walrcv_exec(wrconn, cmd.data, 1, slotRow); + pfree(cmd.data); + + CommitTransactionCommand(); + + /* Switch to oldctx we saved */ + MemoryContextSwitchTo(oldctx); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch invalidation cause for slot \"%s\" from" + " the primary server: %s", slot_name, res->err))); + + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + ereport(ERROR, + (errmsg("slot \"%s\" disappeared from the primary server", + slot_name))); + + cause = DatumGetInt16(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + ExecClearTuple(slot); + walrcv_clear_result(res); + + return cause; +} + +/* + * Drop obsolete slots + * + * Drop the slots that no longer need to be synced, i.e., they either do not + * exist on the primary or are no longer enabled for failover. + * + * Also drop the slots that are valid on the primary but got invalidated + * on the standby due to a conflict (say, required rows removed on the primary). + * The assumption is that these will be recreated in the next sync-cycle, and + * it is okay to drop and recreate such slots as long as they are not + * consumable on the standby (which is the case currently). + */ +static void +drop_obsolete_slots(List *remote_slot_list) +{ + List *local_slot_list = NIL; + ListCell *lc_slot; + + /* + * Get the list of local 'synced' slots so that those not on the remote + * could be dropped. + */ + local_slot_list = get_local_synced_slot_names(); + + foreach(lc_slot, local_slot_list) + { + ReplicationSlot *local_slot = (ReplicationSlot *) lfirst(lc_slot); + bool local_exists = false; + bool locally_invalidated = false; + + local_exists = slot_exists_in_list(local_slot, remote_slot_list, + &locally_invalidated); + + /* + * Drop the local slot either if it is not in the remote slots list or + * if it is invalidated while the remote slot is still valid. + */ + if (!local_exists || locally_invalidated) + { + ReplicationSlotDrop(NameStr(local_slot->data.name), true, false); + + ereport(LOG, + (errmsg("dropped replication slot \"%s\" of dbid %u", + NameStr(local_slot->data.name), + local_slot->data.database))); + } + } +} + +/* + * Construct the query to get failover logical slot + * information from the primary server. + */ +static void +construct_slot_query(StringInfo s) +{ + /* + * Fetch data for logical failover slots with sync_state either as + * SYNCSLOT_STATE_NONE or SYNCSLOT_STATE_READY. + */ + appendStringInfo(s, + "SELECT slot_name, plugin, confirmed_flush_lsn," + " restart_lsn, catalog_xmin, two_phase, conflicting, " + " database FROM pg_catalog.pg_replication_slots" + " WHERE failover and sync_state != 'i'"); +} + +/* + * Synchronize a single slot to the given position.
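+ * (the restart_lsn, confirmed_lsn and catalog_xmin received from the + * primary server).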
+ * + * This creates a new slot if there is no existing one and updates the + * metadata of the slot as per the data received from the primary server. + * + * The 'sync_state' in slot.data is set to SYNCSLOT_STATE_INITIATED + * immediately after creation. It stays in the same state until the + * initialization is complete. The initialization is considered complete + * once the remote slot catches up with the locally reserved position and + * the local slot is updated. The sync_state is then changed + * to SYNCSLOT_STATE_READY. + */ +static void +synchronize_one_slot(WalReceiverConn *wrconn, RemoteSlot *remote_slot, + bool *slot_updated) +{ + MemoryContext oldctx = CurrentMemoryContext; + ReplicationSlot *s; + char sync_state = 0; + + /* + * Make sure that the concerned WAL is received before syncing the slot to + * the target LSN received from the primary server. + * + * This condition should never be true, as the primary server waits for + * the standby's confirmation before updating the logical slot's position. + * But we retain this check to guard against any bug in that flow. + */ + if (remote_slot->confirmed_lsn > WalRcv->latestWalEnd) + { + elog(LOG, "skipping sync of slot \"%s\" as the received slot sync " + "LSN %X/%X is ahead of the standby position %X/%X", + remote_slot->name, + LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), + LSN_FORMAT_ARGS(WalRcv->latestWalEnd)); + + return; + } + + /* Search for the named slot */ + if ((s = SearchNamedReplicationSlot(remote_slot->name, true))) + { + SpinLockAcquire(&s->mutex); + sync_state = s->data.sync_state; + SpinLockRelease(&s->mutex); + } + + StartTransactionCommand(); + + /* Make things live outside TX context */ + MemoryContextSwitchTo(oldctx); + + /* + * The slot already exists (created by the slot sync worker) and is ready + * for sync; acquire and sync it. + */ + if (sync_state == SYNCSLOT_STATE_READY) + { + ReplicationSlotAcquire(remote_slot->name, true); + + /* + * Copy the invalidation cause from the remote slot only if the local + * slot is not invalidated locally; we don't want to overwrite the + * existing one. + */ + if (MyReplicationSlot->data.invalidated == RS_INVAL_NONE) + { + SpinLockAcquire(&MyReplicationSlot->mutex); + MyReplicationSlot->data.invalidated = remote_slot->invalidated; + SpinLockRelease(&MyReplicationSlot->mutex); + } + + /* Skip the sync if the slot has been invalidated locally. */ + if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE) + goto cleanup; + + if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn) + { + ereport(WARNING, + errmsg("not synchronizing slot \"%s\"; synchronization would" + " move it backwards", remote_slot->name)); + + goto cleanup; + } + + if (remote_slot->confirmed_lsn != MyReplicationSlot->data.confirmed_flush || + remote_slot->restart_lsn != MyReplicationSlot->data.restart_lsn || + remote_slot->catalog_xmin != MyReplicationSlot->data.catalog_xmin) + { + /* Update LSN of slot to remote slot's current position */ + local_slot_update(remote_slot); + ReplicationSlotSave(); + *slot_updated = true; + } + } + + /* + * The slot already exists but is not ready yet (i.e. it is waiting for + * the primary server to catch up); let's attempt to finish the first + * sync now. + */ + else if (sync_state == SYNCSLOT_STATE_INITIATED) + { + ReplicationSlotAcquire(remote_slot->name, true); + + /* Skip the sync if the slot has been invalidated locally.
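+ * If the slot is still valid on the primary, drop_obsolete_slots() will + * drop and recreate it in the next sync-cycle.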
*/ + if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE) + goto cleanup; + + wait_for_primary_and_sync(wrconn, remote_slot, slot_updated); + } + /* A user-created slot with the same name exists; raise an ERROR. */ + else if (sync_state == SYNCSLOT_STATE_NONE) + { + ereport(ERROR, + errmsg("not synchronizing slot \"%s\"; it is a user-created slot", + remote_slot->name)); + } + /* Otherwise create the slot first. */ + else + { + TransactionId xmin_horizon = InvalidTransactionId; + ReplicationSlot *slot; + + ReplicationSlotCreate(remote_slot->name, true, RS_EPHEMERAL, + remote_slot->two_phase, false); + slot = MyReplicationSlot; + + SpinLockAcquire(&slot->mutex); + slot->data.database = get_database_oid(remote_slot->database, false); + + /* Mark it as sync initiated by the slot sync worker */ + slot->data.sync_state = SYNCSLOT_STATE_INITIATED; + slot->data.failover = true; + + namestrcpy(&slot->data.plugin, remote_slot->plugin); + SpinLockRelease(&slot->mutex); + + ReplicationSlotReserveWal(); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + xmin_horizon = GetOldestSafeDecodingTransactionId(true); + SpinLockAcquire(&slot->mutex); + slot->effective_catalog_xmin = xmin_horizon; + slot->data.catalog_xmin = xmin_horizon; + SpinLockRelease(&slot->mutex); + ReplicationSlotsComputeRequiredXmin(true); + LWLockRelease(ProcArrayLock); + + wait_for_primary_and_sync(wrconn, remote_slot, slot_updated); + } + +cleanup: + + ReplicationSlotRelease(); + CommitTransactionCommand(); + + /* Switch to oldctx we saved */ + MemoryContextSwitchTo(oldctx); + + return; +} + +/* + * Synchronize slots. + * + * Get the failover logical slots info from the primary server and update + * the slots locally. Create the slots if they are not present on the + * standby. + * + * Returns the nap time for the next sync-cycle. + */ +static long +synchronize_slots(WalReceiverConn *wrconn) +{ +#define SLOTSYNC_COLUMN_COUNT 8 + Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, + LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID}; + + WalRcvExecResult *res; + TupleTableSlot *slot; + StringInfoData s; + List *remote_slot_list = NIL; + MemoryContext oldctx = CurrentMemoryContext; + long naptime = WORKER_DEFAULT_NAPTIME_MS; + ListCell *cell; + bool slot_updated = false; + TimestampTz now; + + /* Skip if primary_slot_name is not set yet or no WAL has been received yet */ + if (!WalRcv || + (WalRcv->slotname[0] == '\0') || + XLogRecPtrIsInvalid(WalRcv->latestWalEnd)) + return naptime; + + /* The syscache access needs a transaction env.
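+ * (walrcv_exec() builds a tuple descriptor for the result set, which + * involves syscache lookups.)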
*/ + StartTransactionCommand(); + + /* Make things live outside TX context */ + MemoryContextSwitchTo(oldctx); + + /* Construct query to get slots info from the primary server */ + initStringInfo(&s); + construct_slot_query(&s); + + elog(DEBUG2, "slot sync worker's query: %s", s.data); + + /* Execute the query */ + res = walrcv_exec(wrconn, s.data, SLOTSYNC_COLUMN_COUNT, slotRow); + pfree(s.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errmsg("could not fetch failover logical slots info " + "from the primary server: %s", res->err))); + + CommitTransactionCommand(); + + /* Switch to oldctx we saved */ + MemoryContextSwitchTo(oldctx); + + /* Construct the remote_slot tuple and synchronize each slot locally */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + bool isnull; + RemoteSlot *remote_slot = palloc0(sizeof(RemoteSlot)); + + remote_slot->name = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + + remote_slot->plugin = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + /* + * It is possible to get null values for LSN and Xmin if the slot is + * invalidated on the primary server, so handle accordingly. + */ + remote_slot->confirmed_lsn = DatumGetLSN(slot_getattr(slot, 3, &isnull)); + if (isnull) + remote_slot->confirmed_lsn = InvalidXLogRecPtr; + + remote_slot->restart_lsn = DatumGetLSN(slot_getattr(slot, 4, &isnull)); + if (isnull) + remote_slot->restart_lsn = InvalidXLogRecPtr; + + remote_slot->catalog_xmin = DatumGetTransactionId(slot_getattr(slot, + 5, &isnull)); + if (isnull) + remote_slot->catalog_xmin = InvalidTransactionId; + + remote_slot->two_phase = DatumGetBool(slot_getattr(slot, 6, &isnull)); + Assert(!isnull); + + remote_slot->conflicting = DatumGetBool(slot_getattr(slot, 7, &isnull)); + Assert(!isnull); + + remote_slot->database = TextDatumGetCString(slot_getattr(slot, + 8, &isnull)); + Assert(!isnull); + + if (remote_slot->conflicting) + remote_slot->invalidated = get_remote_invalidation_cause(wrconn, + remote_slot->name); + else + remote_slot->invalidated = RS_INVAL_NONE; + + /* Add the slot to the list of remote slots */ + remote_slot_list = lappend(remote_slot_list, remote_slot); + + ExecClearTuple(slot); + } + + /* + * Drop local slots that no longer need to be synced. Do this before + * synchronize_one_slot() so that slots which are invalidated locally + * while still valid on the primary server are dropped before the actual + * sync and can then be recreated. + */ + drop_obsolete_slots(remote_slot_list); + + /* Now sync the slots locally */ + foreach(cell, remote_slot_list) + { + RemoteSlot *remote_slot = (RemoteSlot *) lfirst(cell); + + synchronize_one_slot(wrconn, remote_slot, &slot_updated); + } + + now = GetCurrentTimestamp(); + + /* + * If any of the slots got updated in this sync-cycle, retain the default + * naptime and update 'last_update_time' of the slot sync worker. But if + * no activity was observed in this sync-cycle, then increase the naptime, + * provided the inactivity time has reached the threshold. + */ + if (slot_updated) + SlotSyncWorker->last_update_time = now; + else if (TimestampDifferenceExceeds(SlotSyncWorker->last_update_time, + now, WORKER_INACTIVITY_THRESHOLD_MS)) + naptime = WORKER_INACTIVITY_NAPTIME_MS; + + /* We are done; free the remote_slot_list elements */ + list_free_deep(remote_slot_list); + + walrcv_clear_result(res); + + return naptime; +} + +/* + * Connect to the remote (primary) server.
+ * + * This uses the GUC primary_conninfo to connect to the primary. + * For slot sync to work, primary_conninfo must specify dbname + * as well. + */ +static WalReceiverConn * +remote_connect(void) +{ + WalReceiverConn *wrconn = NULL; + char *err; + + wrconn = walrcv_connect(PrimaryConnInfo, true, false, "slot sync", &err); + if (wrconn == NULL) + ereport(ERROR, + (errmsg("could not connect to the primary server: %s", err))); + return wrconn; +} + +/* + * Re-read the config file. + * + * If primary_conninfo has changed, reconnect to the primary. + */ +static void +slotsync_reread_config(WalReceiverConn **wrconn) +{ + char *conninfo = pstrdup(PrimaryConnInfo); + + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + + /* Reconnect if the GUC primary_conninfo has changed */ + if (strcmp(conninfo, PrimaryConnInfo) != 0) + { + if (*wrconn) + walrcv_disconnect(*wrconn); + + *wrconn = remote_connect(); + } + + pfree(conninfo); +} + +/* + * Interrupt handler for the main loop of the slot sync worker. + */ +static void +ProcessSlotSyncInterrupts(WalReceiverConn **wrconn) +{ + CHECK_FOR_INTERRUPTS(); + + if (ShutdownRequestPending) + { + ereport(LOG, + errmsg("replication slot sync worker is shutting" + " down on receiving SIGINT")); + + walrcv_disconnect(*wrconn); + proc_exit(0); + } + + if (ConfigReloadPending) + slotsync_reread_config(wrconn); +} + +/* + * The main loop of our worker process. + */ +void +ReplSlotSyncWorkerMain(Datum main_arg) +{ + WalReceiverConn *wrconn = NULL; + char *dbname; + + /* Set up signal handling */ + pqsignal(SIGHUP, SignalHandlerForConfigReload); + pqsignal(SIGINT, SignalHandlerForShutdownRequest); + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + slotsync_worker_attach(); + + /* + * If the standby has been promoted, skip the slot synchronization process. + * + * Although the startup process stops all the slot sync workers on + * promotion, the launcher may not have realized the promotion and could + * start additional workers after that. Therefore, this check is still + * necessary to prevent these additional workers from running. + */ + if (PromoteIsTriggered()) + exit(0); + + ereport(LOG, errmsg("replication slot sync worker started")); + + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + + /* + * Get the user-provided dbname from the connection string; if dbname is + * not provided, skip sync. + */ + dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); + if (dbname == NULL) + proc_exit(0); + + /* + * Connect to the database specified by the user in PrimaryConnInfo. We + * need a database connection for walrcv_exec to work. Please see comments + * atop libpqrcv_exec. + */ + BackgroundWorkerInitializeConnection(dbname, + NULL, + 0); + + /* Connect to the primary server */ + wrconn = remote_connect(); + + /* Main wait loop. */ + for (;;) + { + int rc; + long naptime; + + ProcessSlotSyncInterrupts(&wrconn); + + /* Check if we got promoted */ + if (!RecoveryInProgress()) + { + /* + * Drop the slots for which sync is initiated but not yet + * completed, i.e. they are still waiting for the primary server + * to catch up.
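+ * Such slots are not sync-ready and were never usable, so they must + * not survive promotion.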
+ */ + slotsync_drop_initiated_slots(); + ereport(LOG, + errmsg("exiting slot sync worker on promotion of standby")); + proc_exit(0); + } + + naptime = synchronize_slots(wrconn); + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + naptime, + WAIT_EVENT_REPL_SLOTSYNC_MAIN); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } + + /* + * The slot sync worker cannot get here because it will only stop when it + * receives a SIGINT from the logical replication launcher, or when there + * is an error. + */ + Assert(false); +} diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 1eaee4197b..73aacef25f 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -100,6 +100,7 @@ #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" +#include "commands/subscriptioncmds.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "parser/parse_relation.h" @@ -246,7 +247,7 @@ wait_for_worker_state_change(char expected_state) LWLockAcquire(LogicalRepWorkerLock, LW_SHARED); worker = logicalrep_worker_find(MyLogicalRepWorker->subid, InvalidOid, false); - if (worker && worker->proc) + if (worker && worker->hdr.proc) logicalrep_worker_wakeup_ptr(worker); LWLockRelease(LogicalRepWorkerLock); if (!worker) @@ -535,7 +536,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) if (rstate->state == SUBREL_STATE_SYNCWAIT) { /* Signal the sync worker, as it may be waiting for us. */ - if (syncworker->proc) + if (syncworker->hdr.proc) logicalrep_worker_wakeup_ptr(syncworker); /* Now safe to release the LWLock */ diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index c96a1700cd..2c059dffca 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -47,6 +47,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "replication/slot.h" +#include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/proc.h" @@ -325,6 +326,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; slot->data.failover = failover; + slot->data.sync_state = SYNCSLOT_STATE_NONE; /* and then data only present in shared memory */ slot->just_dirtied = false; @@ -684,12 +686,26 @@ restart: * Permanently drop replication slot identified by the passed in name. */ void -ReplicationSlotDrop(const char *name, bool nowait) +ReplicationSlotDrop(const char *name, bool nowait, bool user_cmd) { Assert(MyReplicationSlot == NULL); ReplicationSlotAcquire(name, nowait); + /* + * Do not allow users to drop slots that are currently being synced + * from the primary to the standby.
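+ * Internal callers (user_cmd being false), such as the slot sync worker + * itself, can still drop them, and users can drop them once the standby + * is promoted.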
+ */ + if (user_cmd && RecoveryInProgress() && + MyReplicationSlot->data.sync_state != SYNCSLOT_STATE_NONE) + { + ReplicationSlotRelease(); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot drop replication slot \"%s\"", name), + errdetail("This slot is being synced from the primary."))); + } + ReplicationSlotDropAcquired(); } diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index fb6e37d2c3..7b1e0c1552 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -226,11 +226,38 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) CheckSlotRequirements(); - ReplicationSlotDrop(NameStr(*name), true); + ReplicationSlotDrop(NameStr(*name), true, true); PG_RETURN_VOID(); } +/* + * SQL function for getting the invalidation cause of a slot. + * + * Returns the ReplicationSlotInvalidationCause enum value for a valid + * slot_name; returns NULL if a slot with the given name is not found. + * + * Returns RS_INVAL_NONE if the given slot is not invalidated. + */ +Datum +pg_get_slot_invalidation_cause(PG_FUNCTION_ARGS) +{ + Name name = PG_GETARG_NAME(0); + ReplicationSlot *s; + ReplicationSlotInvalidationCause cause; + + s = SearchNamedReplicationSlot(NameStr(*name), true); + + if (s == NULL) + PG_RETURN_NULL(); + + SpinLockAcquire(&s->mutex); + cause = s->data.invalidated; + SpinLockRelease(&s->mutex); + + PG_RETURN_INT16(cause); +} + /* * pg_get_replication_slots - SQL SRF showing all replication slots * that currently exist on the database cluster. @@ -238,7 +265,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 16 +#define PG_GET_REPLICATION_SLOTS_COLS 17 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -420,6 +447,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) values[i++] = BoolGetDatum(slot_contents.data.failover); + values[i++] = CharGetDatum(slot_contents.data.sync_state); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 3e44228bde..edb3656cd2 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1254,7 +1254,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) static void DropReplicationSlot(DropReplicationSlotCmd *cmd) { - ReplicationSlotDrop(cmd->slotname, !cmd->wait); + ReplicationSlotDrop(cmd->slotname, !cmd->wait, false); } /* diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 315a78cda9..fd9e73a49b 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -190,6 +190,8 @@ static const char *const BuiltinTrancheNames[] = { "LogicalRepLauncherDSA", /* LWTRANCHE_LAUNCHER_HASH: */ "LogicalRepLauncherHash", + /* LWTRANCHE_SLOTSYNC_DSA: */ + "SlotSyncWorkerDSA", }; StaticAssertDecl(lengthof(BuiltinTrancheNames) == diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt index f72f2906ce..e62a3f1bc0 100644 --- a/src/backend/storage/lmgr/lwlocknames.txt +++ b/src/backend/storage/lmgr/lwlocknames.txt @@ -54,3 +54,4 @@ XactTruncationLock 44 WrapLimitsVacuumLock 46 NotifyQueueTailLock 47 WaitEventExtensionLock 48 +SlotSyncWorkerLock 49 diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index ede94a1ede..7eb735824c 100644
--- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -53,6 +53,8 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." +REPL_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." +REPL_SLOTSYNC_PRIMARY_CATCHUP "Waiting for the primary to catch up, in slot sync worker." SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." WAL_SENDER_MAIN "Waiting in main loop of WAL sender process." diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index e5e7bb23f9..65a1e544c1 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -65,8 +65,11 @@ #include "postmaster/syslogger.h" #include "postmaster/walwriter.h" #include "replication/logicallauncher.h" +#include "replication/reorderbuffer.h" #include "replication/slot.h" #include "replication/syncrep.h" +#include "replication/walreceiver.h" +#include "replication/walsender.h" #include "storage/bufmgr.h" #include "storage/large_object.h" #include "storage/pg_shmem.h" @@ -2021,6 +2024,15 @@ struct config_bool ConfigureNamesBool[] = NULL, NULL, NULL }, + { + {"enable_syncslot", PGC_SIGHUP, REPLICATION_STANDBY, + gettext_noop("Enables a physical standby to synchronize logical failover slots from the primary server."), + }, + &enable_syncslot, + false, + NULL, NULL, NULL + }, + /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, false, NULL, NULL, NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 998080e0e4..eaafdb3035 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -355,6 +355,7 @@ #wal_retrieve_retry_interval = 5s # time to wait before retrying to # retrieve WAL after a failed attempt #recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery +#enable_syncslot = off # enables slot synchronization on the physical standby from the primary # - Subscribers - diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index d906734750..6a8192ad83 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11095,14 +11095,18 @@ proname => 'pg_drop_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'void', proargtypes => 'name', prosrc => 'pg_drop_replication_slot' }, +{ oid => '8484', descr => 'what caused the replication slot to become invalid', + proname => 'pg_get_slot_invalidation_cause', provolatile => 's', proisstrict => 't', + prorettype => 'int2', proargtypes => 'name', + prosrc => 'pg_get_slot_invalidation_cause' }, { oid => '3781', descr => 'information about replication slots currently in use', proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames =>
'{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool,char}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover,sync_state}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/include/commands/subscriptioncmds.h b/src/include/commands/subscriptioncmds.h index 214dc6c29e..75b4b2040d 100644 --- a/src/include/commands/subscriptioncmds.h +++ b/src/include/commands/subscriptioncmds.h @@ -17,6 +17,7 @@ #include "catalog/objectaddress.h" #include "parser/parse_node.h" +#include "replication/walreceiver.h" extern ObjectAddress CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, bool isTopLevel); @@ -28,4 +29,7 @@ extern void AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId); extern char defGetStreamingMode(DefElem *def); +extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, + char *slotname, bool missing_ok); + #endif /* SUBSCRIPTIONCMDS_H */ diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h index bef8a7162e..8a5b374cea 100644 --- a/src/include/nodes/replnodes.h +++ b/src/include/nodes/replnodes.h @@ -33,6 +33,15 @@ typedef struct IdentifySystemCmd NodeTag type; } IdentifySystemCmd; +/* ------------------------------- + * LIST_DBID_FOR_FAILOVER_SLOTS command + * ------------------------------- + */ +typedef struct ListDBForFailoverSlotsCmd +{ + NodeTag type; + List *slot_names; +} ListDBForFailoverSlotsCmd; /* ---------------------- * BASE_BACKUP command diff --git a/src/include/replication/logicallauncher.h b/src/include/replication/logicallauncher.h index a07c9cb311..02499a7e66 100644 --- a/src/include/replication/logicallauncher.h +++ b/src/include/replication/logicallauncher.h @@ -15,9 +15,11 @@ extern PGDLLIMPORT int max_logical_replication_workers; extern PGDLLIMPORT int max_sync_workers_per_subscription; extern PGDLLIMPORT int max_parallel_apply_workers_per_subscription; +extern PGDLLIMPORT bool enable_syncslot; + extern void ApplyLauncherRegister(void); -extern void ApplyLauncherMain(Datum main_arg); +extern void LauncherMain(Datum main_arg); extern Size ApplyLauncherShmemSize(void); extern void ApplyLauncherShmemInit(void); @@ -31,4 +33,9 @@ extern bool IsLogicalLauncher(void); extern pid_t GetLeaderApplyWorkerPid(pid_t pid); +extern void ShutDownSlotSync(void); + +extern PGDLLIMPORT char *PrimaryConnInfo; +extern PGDLLIMPORT char *PrimarySlotName; + #endif /* LOGICALLAUNCHER_H */ diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h index bbd71d0b42..baad5a8f3a 100644 --- a/src/include/replication/logicalworker.h +++ b/src/include/replication/logicalworker.h @@ -19,6 +19,7 @@ extern PGDLLIMPORT volatile sig_atomic_t ParallelApplyMessagePending; extern void ApplyWorkerMain(Datum main_arg); extern void ParallelApplyWorkerMain(Datum main_arg); extern void TablesyncWorkerMain(Datum main_arg); +extern void ReplSlotSyncWorkerMain(Datum main_arg); extern bool IsLogicalWorker(void); extern bool IsLogicalParallelApplyWorker(void); diff --git 
a/src/include/replication/slot.h b/src/include/replication/slot.h index ca06e5b1ad..6c300da45a 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -15,7 +15,6 @@ #include "storage/lwlock.h" #include "storage/shmem.h" #include "storage/spin.h" -#include "replication/walreceiver.h" /* * Behaviour of replication slots, upon release or crash. @@ -52,6 +51,14 @@ typedef enum ReplicationSlotInvalidationCause RS_INVAL_WAL_LEVEL, } ReplicationSlotInvalidationCause; +/* The possible values for 'sync_state' in ReplicationSlotPersistentData */ +#define SYNCSLOT_STATE_NONE 'n' /* None for user-created slots */ +#define SYNCSLOT_STATE_INITIATED 'i' /* Sync initiated for the slot but + * not completed yet, waiting for + * the primary server to catch up */ +#define SYNCSLOT_STATE_READY 'r' /* Initialization complete, ready + * to be synced further */ + /* * On-Disk data of a replication slot, preserved across restarts. */ @@ -112,6 +119,13 @@ typedef struct ReplicationSlotPersistentData /* plugin name */ NameData plugin; + /* + * The sync state of the slot; SYNCSLOT_STATE_NONE for user-created slots. + * + * Relevant for logical slots on the physical standby. + */ + char sync_state; + /* * Is this a failover slot (sync candidate for physical standbys)? * Only relevant for logical slots on the primary server. @@ -227,7 +241,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover); extern void ReplicationSlotPersist(void); -extern void ReplicationSlotDrop(const char *name, bool nowait); +extern void ReplicationSlotDrop(const char *name, bool nowait, bool user_cmd); extern void ReplicationSlotAlter(const char *name, bool failover); extern void ReplicationSlotAcquire(const char *name, bool nowait); @@ -253,7 +267,6 @@ extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_l extern int ReplicationSlotIndex(ReplicationSlot *slot); extern bool ReplicationSlotName(int index, Name name); extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot); -extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok); extern void StartupReplicationSlots(void); extern void CheckPointReplicationSlots(bool is_shutdown); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 61bc8de72c..87e45c990e 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -20,6 +20,7 @@ #include "pgtime.h" #include "port/atomics.h" #include "replication/logicalproto.h" +#include "replication/slot.h" #include "replication/walsender.h" #include "storage/condition_variable.h" #include "storage/latch.h" @@ -191,6 +192,14 @@ typedef struct } proto; } WalRcvStreamOptions; +/* + * Failover logical slots data received from the remote server.
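+ * + * One entry is expected per unique database containing failover slots on + * the primary, as returned by the LIST_DBID_FOR_FAILOVER_SLOTS command.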
+ */ +typedef struct WalRcvFailoverSlotsData +{ + Oid dboid; +} WalRcvFailoverSlotsData; + struct WalReceiverConn; typedef struct WalReceiverConn WalReceiverConn; @@ -280,6 +289,21 @@ typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn, typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn, TimeLineID *primary_tli); +/* + * walrcv_get_dbinfo_for_failover_slots_fn + * + * Run LIST_DBID_FOR_FAILOVER_SLOTS on the primary server to get the + * list of unique DBIDs for failover logical slots + */ +typedef List *(*walrcv_get_dbinfo_for_failover_slots_fn) (WalReceiverConn *conn); + +/* + * walrcv_get_dbname_from_conninfo_fn + * + * Returns the dbname from the given connection string + */ +typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo); + /* * walrcv_server_version_fn * @@ -404,6 +428,7 @@ typedef struct WalReceiverFunctionsType walrcv_get_conninfo_fn walrcv_get_conninfo; walrcv_get_senderinfo_fn walrcv_get_senderinfo; walrcv_identify_system_fn walrcv_identify_system; + walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo; walrcv_server_version_fn walrcv_server_version; walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile; walrcv_startstreaming_fn walrcv_startstreaming; @@ -429,6 +454,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions; WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port) #define walrcv_identify_system(conn, primary_tli) \ WalReceiverFunctions->walrcv_identify_system(conn, primary_tli) +#define walrcv_get_dbname_from_conninfo(conninfo) \ + WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo) #define walrcv_server_version(conn) \ WalReceiverFunctions->walrcv_server_version(conn) #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \ diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 4378690ab0..4161de1019 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -1,7 +1,8 @@ /*------------------------------------------------------------------------- * * worker_internal.h - * Internal headers shared by logical replication workers. + * Internal headers shared by logical replication workers + * and slot sync workers. * * Portions Copyright (c) 2016-2023, PostgreSQL Global Development Group * @@ -36,14 +37,9 @@ typedef enum LogicalRepWorkerType WORKERTYPE_PARALLEL_APPLY, } LogicalRepWorkerType; -typedef struct LogicalRepWorker +/* Common data shared by slot sync and logical replication workers */ +typedef struct LogicalWorkerHeader { - /* What type of worker is this? */ - LogicalRepWorkerType type; - - /* Time at which this worker was launched. */ - TimestampTz launch_time; - /* Indicates if this slot is used or free. */ bool in_use; @@ -53,6 +49,19 @@ typedef struct LogicalRepWorker /* Pointer to proc array. NULL if not running. */ PGPROC *proc; +} LogicalWorkerHeader; + +/* Shared memory structure for logical replication workers. */ +typedef struct LogicalRepWorker +{ + LogicalWorkerHeader hdr; + + /* Time at which this worker was launched. */ + TimestampTz launch_time; + + /* What type of worker is this? */ + LogicalRepWorkerType type; + /* Database id to connect to. */ Oid dbid; @@ -96,6 +105,24 @@ typedef struct LogicalRepWorker TimestampTz reply_time; } LogicalRepWorker; +/* + * Shared memory structure for the slot sync worker. It is allocated by the + * logical replication launcher and then read by each slot sync worker.
+ * + * It is protected by LWLock (SlotSyncWorkerLock). Each slot sync worker + * reading the structure needs to hold the lock in shared mode, whereas + * the logical replication launcher, which updates it, needs to hold the + * lock in exclusive mode. + */ +typedef struct SlotSyncWorkerInfo +{ + LogicalWorkerHeader hdr; + + /* The last sync-cycle time when the worker updated any of the slots. */ + TimestampTz last_update_time; + +} SlotSyncWorkerInfo; + /* * State of the transaction in parallel apply worker. * @@ -234,12 +261,16 @@ extern PGDLLIMPORT struct WalReceiverConn *LogRepWorkerWalRcvConn; /* Worker and subscription objects. */ extern PGDLLIMPORT Subscription *MySubscription; extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker; +extern PGDLLIMPORT SlotSyncWorkerInfo *SlotSyncWorker; extern PGDLLIMPORT bool in_remote_transaction; extern PGDLLIMPORT bool InitializingApplyWorker; extern void logicalrep_worker_attach(int slot); +extern void slotsync_worker_attach(void); +extern void slotsync_worker_detach(int code, Datum arg); +extern void slotsync_drop_initiated_slots(void); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); extern List *logicalrep_workers_find(Oid subid, bool only_running); @@ -328,9 +359,9 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); -#define isParallelApplyWorker(worker) ((worker)->in_use && \ +#define isParallelApplyWorker(worker) ((worker)->hdr.in_use && \ (worker)->type == WORKERTYPE_PARALLEL_APPLY) -#define isTablesyncWorker(worker) ((worker)->in_use && \ +#define isTablesyncWorker(worker) ((worker)->hdr.in_use && \ (worker)->type == WORKERTYPE_TABLESYNC) static inline bool @@ -342,14 +373,14 @@ am_tablesync_worker(void) static inline bool am_leader_apply_worker(void) { - Assert(MyLogicalRepWorker->in_use); + Assert(MyLogicalRepWorker->hdr.in_use); return (MyLogicalRepWorker->type == WORKERTYPE_APPLY); } static inline bool am_parallel_apply_worker(void) { - Assert(MyLogicalRepWorker->in_use); + Assert(MyLogicalRepWorker->hdr.in_use); return isParallelApplyWorker(MyLogicalRepWorker); } diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index b038e599c0..0621ee70fc 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -207,6 +207,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_PGSTATS_DATA, LWTRANCHE_LAUNCHER_DSA, LWTRANCHE_LAUNCHER_HASH, + LWTRANCHE_SLOTSYNC_DSA, LWTRANCHE_FIRST_USER_DEFINED, } BuiltinTrancheIds; diff --git a/src/test/recovery/t/050_verify_slot_order.pl b/src/test/recovery/t/050_verify_slot_order.pl index bff0f52a46..e7a00bde12 100644 --- a/src/test/recovery/t/050_verify_slot_order.pl +++ b/src/test/recovery/t/050_verify_slot_order.pl @@ -146,4 +146,131 @@ $result = $subscriber1->safe_psql('postgres', "SELECT count(*) = $primary_row_count FROM tab_int;"); is($result, 't', "subscriber1 gets data from primary after standby1 acknowledges changes"); +# Test logical failover slots on the standby +# Configure standby3 to replicate and synchronize logical slots configured +# for failover on the primary +# +# failover slot lsub1_slot->| ----> subscriber1 (connected via logical replication) +# primary ---> | +# physical slot sb3_slot--->| ----> standby3 (connected via streaming replication) +# | lsub1_slot(synced_slot) + +# Clean up old standby_slot_names +$primary->stop; +$primary->append_conf( 'postgresql.conf', qq( standby_slot_names = '' )); +$primary->start; + 
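+# Create the physical slot that standby3 will stream from; it is added to
+# standby_slot_names on the primary further below.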
+$primary->psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb3_slot');}); + +$backup_name = 'backup2'; +$primary->backup($backup_name); + +# Create standby3 +my $standby3 = PostgreSQL::Test::Cluster->new('standby3'); +$standby3->init_from_backup( + $primary, $backup_name, + has_streaming => 1, + has_restoring => 1); + +my $connstr_1 = $primary->connstr; +$standby3->stop; +$standby3->append_conf( + 'postgresql.conf', q{ +enable_syncslot = true +hot_standby_feedback = on +primary_slot_name = 'sb3_slot' +}); +$standby3->append_conf( + 'postgresql.conf', qq( +primary_conninfo = '$connstr_1 dbname=postgres' +)); +$standby3->start; + +# Add this standby into the primary's configuration +$primary->stop; +$primary->append_conf( + 'postgresql.conf', qq( +standby_slot_names = 'sb3_slot' +)); +$primary->start; + +# Restart the standby +$standby3->restart; + +# Wait for the standby to start sync +my $offset = -s $standby3->logfile; +$standby3->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? waiting for remote slot \"lsub1_slot\"/, + $offset); + +# Advance lsn on the primary +$primary->safe_psql('postgres', + "SELECT pg_log_standby_snapshot();"); +$primary->safe_psql('postgres', + "SELECT pg_log_standby_snapshot();"); +$primary->safe_psql('postgres', + "SELECT pg_log_standby_snapshot();"); + +# Wait for the standby to finish sync +$offset = -s $standby3->logfile; +$standby3->wait_for_log( + qr/LOG: ( [A-Z0-9]+:)? wait over for remote slot \"lsub1_slot\"/, + $offset); + +# Confirm that logical failover slot is created on the standby +is( $standby3->safe_psql('postgres', + q{SELECT slot_name FROM pg_replication_slots;} + ), + 'lsub1_slot', + 'failover slot was created'); + +# Verify slot properties on the standby +is( $standby3->safe_psql('postgres', + q{SELECT failover, sync_state FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';} + ), + "t|r", + 'logical slot has sync_state as ready and failover as true on standby'); + +# Verify slot properties on the primary +is( $primary->safe_psql('postgres', + q{SELECT failover, sync_state FROM pg_replication_slots WHERE slot_name = 'lsub1_slot';} + ), + "t|n", + 'logical slot has sync_state as none and failover as true on primary'); + +# Test to confirm that restart_lsn of the logical slot on the primary is synced to the standby + +# Truncate table on primary +$primary->safe_psql('postgres', + "TRUNCATE TABLE tab_int;"); + +# Insert data on the primary +$primary->safe_psql('postgres', + "INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);"); + +# let the slots get synced on the standby +sleep 2; + +# Get the restart_lsn for the logical slot lsub1_slot on the primary +my $primary_lsn = $primary->safe_psql('postgres', + "SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"); + +# Confirm that restart_lsn of lsub1_slot slot is synced to the standby +$result = $standby3->safe_psql('postgres', + qq[SELECT '$primary_lsn' <= restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';]); +is($result, 't', 'restart_lsn of slot lsub1_slot synced to standby'); + +# Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary +$primary_lsn = $primary->safe_psql('postgres', + "SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"); + +# Confirm that confirmed_flush_lsn of lsub1_slot slot is synced to the standby +$result = $standby3->safe_psql('postgres', + qq[SELECT '$primary_lsn' <= confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';]); 
+is($result, 't', 'confirmed_flush_lsn of slot lsub1_slot synced to the standby'); + done_testing(); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index cb3b04aa0c..f2e5a3849c 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1474,8 +1474,9 @@ pg_replication_slots| SELECT l.slot_name, l.safe_wal_size, l.two_phase, l.conflicting, - l.failover - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover) + l.failover, + l.sync_state + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover, sync_state) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index 271313ebf8..aac83755de 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -132,8 +132,9 @@ select name, setting from pg_settings where name like 'enable%'; enable_self_join_removal | on enable_seqscan | on enable_sort | on + enable_syncslot | off enable_tidscan | on -(22 rows) +(23 rows) -- There are always wait event descriptions for various types. select type, count(*) > 0 as ok FROM pg_wait_events diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index d80d30e99c..b559974879 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1432,6 +1432,7 @@ LimitState LimitStateCond List ListCell +ListDBForFailoverSlotsCmd ListDictionary ListParsedLex ListenAction @@ -1511,6 +1512,7 @@ LogicalSlotInfo LogicalSlotInfoArr LogicalTape LogicalTapeSet +LogicalWorkerHeader LsnReadQueue LsnReadQueueNextFun LsnReadQueueNextStatus @@ -2314,6 +2316,7 @@ RelocationBufferInfo RelptrFreePageBtree RelptrFreePageManager RelptrFreePageSpanLeader +RemoteSlot RenameStmt ReopenPtrType ReorderBuffer @@ -2572,6 +2575,8 @@ SlabBlock SlabContext SlabSlot SlotNumber +SlotSyncWorker +SlotSyncWorkerInfo SlruCtl SlruCtlData SlruErrorCause @@ -3019,6 +3024,7 @@ WalLevel WalRcvData WalRcvExecResult WalRcvExecStatus +WalRcvFailoverSlotsData WalRcvState WalRcvStreamOptions WalRcvWakeupReason -- 2.30.0.windows.2