From a0c5734905f01c52ca288fcc146cb7d425c9d52f Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Wed, 24 Jan 2024 16:19:04 +0530 Subject: [PATCH v68 5/8] Slot-sync worker as a special process This patch attempts to start slot-sync worker as a special process which is neither a bgworker nor an Auxiliary process. The benefit we get here is we can control the start-conditions of the worker which further allows us to define 'enable_syncslot' as PGC_SIGHUP which was otherwise a PGC_POSTMASTER GUC when slotsync worker was registered as bgworker. --- doc/src/sgml/bgworker.sgml | 65 +--- src/backend/postmaster/bgworker.c | 3 - src/backend/postmaster/postmaster.c | 86 +++-- src/backend/replication/logical/slotsync.c | 360 ++++++++++++++---- src/backend/storage/lmgr/proc.c | 13 +- src/backend/tcop/postgres.c | 11 - src/backend/utils/activity/pgstat_io.c | 1 + .../utils/activity/wait_event_names.txt | 1 + src/backend/utils/init/miscinit.c | 9 +- src/backend/utils/init/postinit.c | 8 +- src/backend/utils/misc/guc_tables.c | 2 +- src/include/miscadmin.h | 1 + src/include/postmaster/bgworker.h | 1 - src/include/replication/worker_internal.h | 8 +- 14 files changed, 387 insertions(+), 182 deletions(-) diff --git a/doc/src/sgml/bgworker.sgml b/doc/src/sgml/bgworker.sgml index a7cfe6c58c..2c393385a9 100644 --- a/doc/src/sgml/bgworker.sgml +++ b/doc/src/sgml/bgworker.sgml @@ -114,59 +114,18 @@ typedef struct BackgroundWorker bgw_start_time is the server state during which - postgres should start the process. Note that this setting - only indicates when the processes are to be started; they do not stop when - a different state is reached. Possible values are: - - - - BgWorkerStart_PostmasterStart - - - BgWorkerStart_PostmasterStart - Start as soon as postgres itself has finished its own initialization; - processes requesting this are not eligible for database connections. 
- - - - - - BgWorkerStart_ConsistentState - - - BgWorkerStart_ConsistentState - Start as soon as a consistent state has been reached in a hot-standby, - allowing processes to connect to databases and run read-only queries. - - - - - - BgWorkerStart_ConsistentState_HotStandby - - - BgWorkerStart_ConsistentState_HotStandby - Same meaning as BgWorkerStart_ConsistentState but - it is more strict in terms of the server i.e. start the worker only - if it is hot-standby. - - - - - - BgWorkerStart_RecoveryFinished - - - BgWorkerStart_RecoveryFinished - Start as soon as the system has entered normal read-write state. Note - that the BgWorkerStart_ConsistentState and - BgWorkerStart_RecoveryFinished are equivalent - in a server that's not a hot standby. - - - - - + postgres should start the process; it can be one of + BgWorkerStart_PostmasterStart (start as soon as + postgres itself has finished its own initialization; processes + requesting this are not eligible for database connections), + BgWorkerStart_ConsistentState (start as soon as a consistent state + has been reached in a hot standby, allowing processes to connect to + databases and run read-only queries), and + BgWorkerStart_RecoveryFinished (start as soon as the system has + entered normal read-write state). Note the last two values are equivalent + in a server that's not a hot standby. Note that this setting only indicates + when the processes are to be started; they do not stop when a different state + is reached. 
diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 46828b8a89..1add449f7c 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -130,9 +130,6 @@ static const struct { "ApplyWorkerMain", ApplyWorkerMain }, - { - "ReplSlotSyncWorkerMain", ReplSlotSyncWorkerMain - }, { "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index d90d5d1576..adfaf75cf9 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -168,11 +168,11 @@ * they will never become live backends. dead_end children are not assigned a * PMChildSlot. dead_end children have bkend_type NORMAL. * - * "Special" children such as the startup, bgwriter and autovacuum launcher - * tasks are not in this list. They are tracked via StartupPID and other - * pid_t variables below. (Thus, there can't be more than one of any given - * "special" child process type. We use BackendList entries for any child - * process there can be more than one of.) + * "Special" children such as the startup, bgwriter, autovacuum launcher and + * slot sync worker tasks are not in this list. They are tracked via StartupPID + * and other pid_t variables below. (Thus, there can't be more than one of any + * given "special" child process type. We use BackendList entries for any + * child process there can be more than one of.) 
*/ typedef struct bkend { @@ -255,7 +255,8 @@ static pid_t StartupPID = 0, WalSummarizerPID = 0, AutoVacPID = 0, PgArchPID = 0, - SysLoggerPID = 0; + SysLoggerPID = 0, + SlotSyncWorkerPID = 0; /* Startup process's status */ typedef enum @@ -459,6 +460,10 @@ static void InitPostmasterDeathWatchHandle(void); (pmState == PM_RECOVERY || pmState == PM_HOT_STANDBY))) && \ PgArchCanRestart()) +#define SlotSyncWorkerAllowed() \ + (enable_syncslot && pmState == PM_HOT_STANDBY && \ + SlotSyncWorkerCanRestart()) + #ifdef EXEC_BACKEND #ifdef WIN32 @@ -1011,12 +1016,6 @@ PostmasterMain(int argc, char *argv[]) */ ApplyLauncherRegister(); - /* - * Register the slot sync worker here to kick start slot-sync operation - * sooner on the physical standby. - */ - SlotSyncWorkerRegister(); - /* * process any libraries that should be preloaded at postmaster start */ @@ -1837,6 +1836,10 @@ ServerLoop(void) if (PgArchPID == 0 && PgArchStartupAllowed()) PgArchPID = StartArchiver(); + /* If we need to start a slot sync worker, try to do that now */ + if (SlotSyncWorkerPID == 0 && SlotSyncWorkerAllowed()) + SlotSyncWorkerPID = StartSlotSyncWorker(); + /* If we need to signal the autovacuum launcher, do so now */ if (avlauncher_needs_signal) { @@ -2684,6 +2687,8 @@ process_pm_reload_request(void) signal_child(PgArchPID, SIGHUP); if (SysLoggerPID != 0) signal_child(SysLoggerPID, SIGHUP); + if (SlotSyncWorkerPID != 0) + signal_child(SlotSyncWorkerPID, SIGHUP); /* Reload authentication config files too */ if (!load_hba()) @@ -3041,6 +3046,8 @@ process_pm_child_exit(void) AutoVacPID = StartAutoVacLauncher(); if (PgArchStartupAllowed() && PgArchPID == 0) PgArchPID = StartArchiver(); + if (SlotSyncWorkerAllowed() && SlotSyncWorkerPID == 0) + SlotSyncWorkerPID = StartSlotSyncWorker(); /* workers may be scheduled to start now */ maybe_start_bgworkers(); @@ -3211,6 +3218,22 @@ process_pm_child_exit(void) continue; } + /* + * Was it the slot sync worker? 
Normal exit or FATAL exit can be + * ignored (FATAL can be caused by libpqwalreceiver on receiving + * shutdown request by the startup process during promotion); we'll + * start a new one at the next iteration of the postmaster's main + * loop, if necessary. Any other exit condition is treated as a crash. + */ + if (pid == SlotSyncWorkerPID) + { + SlotSyncWorkerPID = 0; + if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus)) + HandleChildCrash(pid, exitstatus, + _("slot sync worker process")); + continue; + } + /* Was it one of our background workers? */ if (CleanupBackgroundWorker(pid, exitstatus)) { @@ -3577,6 +3600,12 @@ HandleChildCrash(int pid, int exitstatus, const char *procname) else if (PgArchPID != 0 && take_action) sigquit_child(PgArchPID); + /* Take care of the slot sync worker too */ + if (pid == SlotSyncWorkerPID) + SlotSyncWorkerPID = 0; + else if (SlotSyncWorkerPID != 0 && take_action) + sigquit_child(SlotSyncWorkerPID); + /* We do NOT restart the syslogger */ if (Shutdown != ImmediateShutdown) @@ -3717,6 +3746,8 @@ PostmasterStateMachine(void) signal_child(WalReceiverPID, SIGTERM); if (WalSummarizerPID != 0) signal_child(WalSummarizerPID, SIGTERM); + if (SlotSyncWorkerPID != 0) + signal_child(SlotSyncWorkerPID, SIGTERM); /* checkpointer, archiver, stats, and syslogger may continue for now */ /* Now transition to PM_WAIT_BACKENDS state to wait for them to die */ @@ -3732,13 +3763,13 @@ PostmasterStateMachine(void) /* * PM_WAIT_BACKENDS state ends when we have no regular backends * (including autovac workers), no bgworkers (including unconnected - * ones), and no walwriter, autovac launcher or bgwriter. If we are - * doing crash recovery or an immediate shutdown then we expect the - * checkpointer to exit as well, otherwise not. The stats and - * syslogger processes are disregarded since they are not connected to - * shared memory; we also disregard dead_end children here. 
Walsenders - * and archiver are also disregarded, they will be terminated later - * after writing the checkpoint record. + * ones), and no walwriter, autovac launcher, bgwriter or slot sync + * worker. If we are doing crash recovery or an immediate shutdown + * then we expect the checkpointer to exit as well, otherwise not. The + * stats and syslogger processes are disregarded since they are not + * connected to shared memory; we also disregard dead_end children + * here. Walsenders and archiver are also disregarded, they will be + * terminated later after writing the checkpoint record. */ if (CountChildren(BACKEND_TYPE_ALL - BACKEND_TYPE_WALSND) == 0 && StartupPID == 0 && @@ -3748,7 +3779,8 @@ PostmasterStateMachine(void) (CheckpointerPID == 0 || (!FatalError && Shutdown < ImmediateShutdown)) && WalWriterPID == 0 && - AutoVacPID == 0) + AutoVacPID == 0 && + SlotSyncWorkerPID == 0) { if (Shutdown >= ImmediateShutdown || FatalError) { @@ -3846,6 +3878,7 @@ PostmasterStateMachine(void) Assert(CheckpointerPID == 0); Assert(WalWriterPID == 0); Assert(AutoVacPID == 0); + Assert(SlotSyncWorkerPID == 0); /* syslogger is not considered here */ pmState = PM_NO_CHILDREN; } @@ -4069,6 +4102,8 @@ TerminateChildren(int signal) signal_child(AutoVacPID, signal); if (PgArchPID != 0) signal_child(PgArchPID, signal); + if (SlotSyncWorkerPID != 0) + signal_child(SlotSyncWorkerPID, signal); } /* @@ -4881,6 +4916,7 @@ SubPostmasterMain(int argc, char *argv[]) */ if (strcmp(argv[1], "--forkbackend") == 0 || strcmp(argv[1], "--forkavlauncher") == 0 || + strcmp(argv[1], "--forkssworker") == 0 || strcmp(argv[1], "--forkavworker") == 0 || strcmp(argv[1], "--forkaux") == 0 || strcmp(argv[1], "--forkbgworker") == 0) @@ -4984,6 +5020,13 @@ SubPostmasterMain(int argc, char *argv[]) AutoVacWorkerMain(argc - 2, argv + 2); /* does not return */ } + if (strcmp(argv[1], "--forkssworker") == 0) + { + /* Restore basic shared memory pointers */ + InitShmemAccess(UsedShmemSegAddr); + + 
ReplSlotSyncWorkerMain(argc - 2, argv + 2); /* does not return */ + } if (strcmp(argv[1], "--forkbgworker") == 0) { /* do this as early as possible; in particular, before InitProcess() */ @@ -5806,9 +5849,6 @@ bgworker_should_start_now(BgWorkerStartTime start_time) case PM_HOT_STANDBY: if (start_time == BgWorkerStart_ConsistentState) return true; - if (start_time == BgWorkerStart_ConsistentState_HotStandby && - pmState != PM_RUN) - return true; /* fall through */ case PM_RECOVERY: diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 84b66104f7..2a033647b3 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -40,9 +40,12 @@ #include "access/xlogrecovery.h" #include "catalog/pg_database.h" #include "commands/dbcommands.h" +#include "libpq/pqsignal.h" #include "pgstat.h" #include "postmaster/bgworker.h" +#include "postmaster/fork_process.h" #include "postmaster/interrupt.h" +#include "postmaster/postmaster.h" #include "replication/logical.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" @@ -55,6 +58,8 @@ #include "utils/fmgroids.h" #include "utils/guc_hooks.h" #include "utils/pg_lsn.h" +#include "utils/ps_status.h" +#include "utils/timeout.h" #include "utils/varlena.h" /* @@ -98,7 +103,8 @@ SlotSyncWorkerCtxStruct *SlotSyncWorker = NULL; /* GUC variable */ bool enable_syncslot = false; -/* The sleep time (ms) between slot-sync cycles varies dynamically +/* + * The sleep time (ms) between slot-sync cycles varies dynamically * (within a MIN/MAX range) according to slot activity. See * wait_for_slot_activity() for details. 
*/ @@ -107,8 +113,16 @@ bool enable_syncslot = false; static long sleep_ms = MIN_WORKER_NAPTIME_MS; +/* Flag to tell if we are in a slot sync worker process */ +static bool am_slotsync_worker = false; + static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn); +#ifdef EXEC_BACKEND +static pid_t slotsyncworker_forkexec(void); +#endif +NON_EXEC_STATIC void ReplSlotSyncWorkerMain(int argc, char *argv[]) pg_attribute_noreturn(); + /* * If necessary, update local slot metadata based on the data from the remote * slot. @@ -712,7 +726,8 @@ synchronize_slots(WalReceiverConn *wrconn) * standbys. */ static void -check_primary_info(WalReceiverConn *wrconn, bool *am_cascading_standby) +check_primary_info(WalReceiverConn *wrconn, bool *am_cascading_standby, + bool *primary_slot_invalid) { #define PRIMARY_INFO_OUTPUT_COL_COUNT 2 WalRcvExecResult *res; @@ -727,8 +742,10 @@ check_primary_info(WalReceiverConn *wrconn, bool *am_cascading_standby) StartTransactionCommand(); Assert(am_cascading_standby != NULL); + Assert(primary_slot_invalid != NULL); *am_cascading_standby = false; /* overwritten later if cascading */ + *primary_slot_invalid = false; /* overwritten later if invalid */ initStringInfo(&cmd); appendStringInfo(&cmd, @@ -766,13 +783,16 @@ check_primary_info(WalReceiverConn *wrconn, bool *am_cascading_standby) Assert(!isnull); if (!valid) - ereport(ERROR, + { + *primary_slot_invalid = true; + ereport(LOG, errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("exiting from slot synchronization due to bad configuration"), + errmsg("bad configuration for slot synchronization"), /* translator: second %s is a GUC variable name */ errdetail("The primary server slot \"%s\" specified by" " \"%s\" is not valid.", PrimarySlotName, "primary_slot_name")); + } } ExecClearTuple(tupslot); @@ -781,20 +801,15 @@ check_primary_info(WalReceiverConn *wrconn, bool *am_cascading_standby) } /* - * Check that all necessary GUCs for slot synchronization are set - * appropriately. 
If not, raise an ERROR. + * Returns true if all necessary GUCs for slot synchronization are set + * appropriately, otherwise returns false. * * If all checks pass, extracts the dbname from the primary_conninfo GUC and - * returns it. + * returns it in the output dbname arg. */ -static char * -validate_parameters_and_get_dbname(void) +static bool +validate_parameters_and_get_dbname(char **dbname) { - char *dbname; - - /* Sanity check. */ - Assert(enable_syncslot); - /* * A physical replication slot(primary_slot_name) is required on the * primary to ensure that the rows needed by the standby are not removed @@ -802,10 +817,13 @@ validate_parameters_and_get_dbname(void) * be invalidated. */ if (PrimarySlotName == NULL || strcmp(PrimarySlotName, "") == 0) - ereport(ERROR, + { + ereport(LOG, /* translator: %s is a GUC variable name */ - errmsg("exiting from slot synchronization due to bad configuration"), + errmsg("bad configuration for slot synchronization"), errhint("\"%s\" must be defined.", "primary_slot_name")); + return false; + } /* * hot_standby_feedback must be enabled to cooperate with the physical @@ -813,45 +831,94 @@ validate_parameters_and_get_dbname(void) * catalog_xmin values on the standby. */ if (!hot_standby_feedback) - ereport(ERROR, + { + ereport(LOG, /* translator: %s is a GUC variable name */ - errmsg("exiting from slot synchronization due to bad configuration"), + errmsg("bad configuration for slot synchronization"), errhint("\"%s\" must be enabled.", "hot_standby_feedback")); + return false; + } /* * Logical decoding requires wal_level >= logical and we currently only * synchronize logical slots. 
*/ if (wal_level < WAL_LEVEL_LOGICAL) - ereport(ERROR, - /* translator: %s is a GUC variable name */ - errmsg("exiting from slot synchronization due to bad configuration"), + { + ereport(LOG, + errmsg("bad configuration for slot synchronization"), errhint("\"wal_level\" must be >= logical.")); + return false; + } /* * The primary_conninfo is required to make connection to primary for * getting slots information. */ if (PrimaryConnInfo == NULL || strcmp(PrimaryConnInfo, "") == 0) - ereport(ERROR, + { + ereport(LOG, /* translator: %s is a GUC variable name */ - errmsg("exiting from slot synchronization due to bad configuration"), + errmsg("bad configuration for slot synchronization"), errhint("\"%s\" must be defined.", "primary_conninfo")); + return false; + } /* * The slot sync worker needs a database connection for walrcv_exec to * work. */ - dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); - if (dbname == NULL) - ereport(ERROR, + *dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); + if (*dbname == NULL) + { + ereport(LOG, /* * translator: 'dbname' is a specific option; %s is a GUC variable * name */ - errmsg("exiting from slot synchronization due to bad configuration"), + errmsg("bad configuration for slot synchronization"), errhint("'dbname' must be specified in \"%s\".", "primary_conninfo")); + return false; + } + + return true; +} + +/* + * Check that all necessary GUCs for slot synchronization are set + * appropriately. If not, sleep for MAX_WORKER_NAPTIME_MS and check again. + * The idea is to become no-op until we get valid GUCs values. + * + * If all checks pass, extracts the dbname from the primary_conninfo GUC and + * returns it. + */ +static char * +wait_for_valid_params_and_get_dbname(void) +{ + char *dbname; + int rc; + + /* Sanity check. 
*/ + Assert(enable_syncslot); + + for (;;) + { + if (validate_parameters_and_get_dbname(&dbname)) + break; + + ereport(LOG, errmsg("skipping slot synchronization")); + + ProcessSlotSyncInterrupts(NULL); + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + MAX_WORKER_NAPTIME_MS, + WAIT_EVENT_REPL_SLOTSYNC_MAIN); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + } return dbname; } @@ -867,16 +934,28 @@ slotsync_reread_config(void) { char *old_primary_conninfo = pstrdup(PrimaryConnInfo); char *old_primary_slotname = pstrdup(PrimarySlotName); + bool old_enable_syncslot = enable_syncslot; bool old_hot_standby_feedback = hot_standby_feedback; bool conninfo_changed; bool primary_slotname_changed; + Assert(enable_syncslot); + ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); conninfo_changed = strcmp(old_primary_conninfo, PrimaryConnInfo) != 0; primary_slotname_changed = strcmp(old_primary_slotname, PrimarySlotName) != 0; + if (old_enable_syncslot != enable_syncslot) + { + ereport(LOG, + /* translator: %s is a GUC variable name */ + errmsg("slot sync worker will shutdown because" + " %s is disabled", "enable_syncslot")); + proc_exit(0); + } + if (conninfo_changed || primary_slotname_changed || (old_hot_standby_feedback != hot_standby_feedback)) @@ -884,8 +963,7 @@ slotsync_reread_config(void) ereport(LOG, errmsg("slot sync worker will restart because of a parameter change")); - /* The exit code 1 will make postmaster restart this worker */ - proc_exit(1); + proc_exit(0); } pfree(old_primary_conninfo); @@ -902,7 +980,8 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn) if (ShutdownRequestPending) { - walrcv_disconnect(wrconn); + if (wrconn) + walrcv_disconnect(wrconn); ereport(LOG, errmsg("replication slot sync worker is shutting down on receiving SIGINT")); proc_exit(0); @@ -934,17 +1013,16 @@ slotsync_worker_onexit(int code, Datum arg) * sync-cycles is reset to the minimum (200ms). 
*/ static void -wait_for_slot_activity(bool some_slot_updated, bool am_cascading_standby) +wait_for_slot_activity(bool some_slot_updated, bool recheck_primary_info) { int rc; - if (am_cascading_standby) + if (recheck_primary_info) { /* - * Slot synchronization is currently not supported on cascading - * standby. So if we are on the cascading standby, we will skip the - * sync and take a longer nap before we check again whether we are - * still cascading standby or not. + * If we are on the cascading standby or primary_slot_name configured + * is not valid, then we will skip the sync and take a longer nap + * before we can do check_primary_info() again. */ sleep_ms = MAX_WORKER_NAPTIME_MS; } @@ -980,20 +1058,38 @@ wait_for_slot_activity(bool some_slot_updated, bool am_cascading_standby) * It connects to the primary server, fetches logical failover slots * information periodically in order to create and sync the slots. */ -void -ReplSlotSyncWorkerMain(Datum main_arg) +NON_EXEC_STATIC void +ReplSlotSyncWorkerMain(int argc, char *argv[]) { WalReceiverConn *wrconn = NULL; char *dbname; bool am_cascading_standby; + bool primary_slot_invalid; char *err; + sigjmp_buf local_sigjmp_buf; - ereport(LOG, errmsg("replication slot sync worker started")); + am_slotsync_worker = true; - on_shmem_exit(slotsync_worker_onexit, (Datum) 0); + MyBackendType = B_SLOTSYNC_WORKER; - SpinLockAcquire(&SlotSyncWorker->mutex); + init_ps_display(NULL); + + SetProcessingMode(InitProcessing); + /* + * Create a per-backend PGPROC struct in shared memory. We must do this + * before we access any shared memory. + */ + InitProcess(); + + /* + * Early initialization. 
+ */ + BaseInit(); + + Assert(SlotSyncWorker != NULL); + + SpinLockAcquire(&SlotSyncWorker->mutex); Assert(SlotSyncWorker->pid == InvalidPid); /* @@ -1008,26 +1104,77 @@ ReplSlotSyncWorkerMain(Datum main_arg) /* Advertise our PID so that the startup process can kill us on promotion */ SlotSyncWorker->pid = MyProcPid; - SpinLockRelease(&SlotSyncWorker->mutex); + ereport(LOG, errmsg("replication slot sync worker started")); + + on_shmem_exit(slotsync_worker_onexit, (Datum) 0); + /* Setup signal handling */ pqsignal(SIGHUP, SignalHandlerForConfigReload); pqsignal(SIGINT, SignalHandlerForShutdownRequest); pqsignal(SIGTERM, die); - BackgroundWorkerUnblockSignals(); + pqsignal(SIGFPE, FloatExceptionHandler); + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + pqsignal(SIGUSR2, SIG_IGN); + pqsignal(SIGPIPE, SIG_IGN); + pqsignal(SIGCHLD, SIG_DFL); + + /* + * Establish SIGALRM handler and initialize timeout module. It is needed + * by InitPostgres to register different timeouts. + */ + InitializeTimeouts(); /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); - dbname = validate_parameters_and_get_dbname(); + /* + * If an exception is encountered, processing resumes here. + * + * We just need to clean up, report the error, and go away. + * + * If we do not have this handling here, then since this worker process + * operates at the bottom of the exception stack, ERRORs turn into FATALs. + * Therefore, we create our own exception handler to catch ERRORs. + */ + if (sigsetjmp(local_sigjmp_buf, 1) != 0) + { + /* since not using PG_TRY, must reset error stack by hand */ + error_context_stack = NULL; + + /* Prevents interrupts while cleaning up */ + HOLD_INTERRUPTS(); + + /* Report the error to the server log */ + EmitErrorReport(); + + /* + * We can now go away. Note that because we called InitProcess, a + * callback was registered to do ProcKill, which will clean up + * necessary state. 
+ */ + proc_exit(0); + } + + /* We can now handle ereport(ERROR) */ + PG_exception_stack = &local_sigjmp_buf; + + /* + * Unblock signals (they were blocked when the postmaster forked us) + */ + sigprocmask(SIG_SETMASK, &UnBlockSig, NULL); + + dbname = wait_for_valid_params_and_get_dbname(); /* * Connect to the database specified by user in primary_conninfo. We need * a database connection for walrcv_exec to work. Please see comments atop * libpqrcv_exec. */ - BackgroundWorkerInitializeConnection(dbname, NULL, 0); + InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL); + + SetProcessingMode(NormalProcessing); /* * Establish the connection to the primary server for slots @@ -1046,26 +1193,29 @@ ReplSlotSyncWorkerMain(Datum main_arg) * cascading standby and validates primary_slot_name for * non-cascading-standbys. */ - check_primary_info(wrconn, &am_cascading_standby); + check_primary_info(wrconn, &am_cascading_standby, &primary_slot_invalid); /* Main wait loop */ for (;;) { bool some_slot_updated = false; + bool recheck_primary_info = am_cascading_standby || primary_slot_invalid; ProcessSlotSyncInterrupts(wrconn); - if (!am_cascading_standby) + if (!recheck_primary_info) some_slot_updated = synchronize_slots(wrconn); + else if (primary_slot_invalid) + ereport(LOG, errmsg("skipping slot synchronization")); - wait_for_slot_activity(some_slot_updated, am_cascading_standby); + wait_for_slot_activity(some_slot_updated, recheck_primary_info); /* * If the standby was promoted then what was previously a cascading * standby might no longer be one, so recheck each time. 
*/ - if (am_cascading_standby) - check_primary_info(wrconn, &am_cascading_standby); + if (recheck_primary_info) + check_primary_info(wrconn, &am_cascading_standby, &primary_slot_invalid); } /* @@ -1081,7 +1231,7 @@ ReplSlotSyncWorkerMain(Datum main_arg) bool IsLogicalSlotSyncWorker(void) { - return SlotSyncWorker->pid == MyProcPid; + return am_slotsync_worker; } /* @@ -1111,7 +1261,7 @@ ShutDownSlotSync(void) /* Wait a bit, we don't expect to have to wait long */ rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, - 10L, WAIT_EVENT_BGWORKER_SHUTDOWN); + 10L, WAIT_EVENT_REPL_SLOTSYNC_SHUTDOWN); if (rc & WL_LATCH_SET) { @@ -1154,42 +1304,92 @@ SlotSyncWorkerShmemInit(void) } } +#ifdef EXEC_BACKEND /* - * Register the background worker for slots synchronization provided - * enable_syncslot is ON. + * The forkexec routine for the slot sync worker process. + * + * Format up the arglist, then fork and exec. */ -void -SlotSyncWorkerRegister(void) +static pid_t +slotsyncworker_forkexec(void) { - BackgroundWorker bgw; + char *av[10]; + int ac = 0; - if (!enable_syncslot) - { - ereport(LOG, - errmsg("skipping slot synchronization"), - errdetail("\"enable_syncslot\" is disabled.")); - return; - } + av[ac++] = "postgres"; + av[ac++] = "--forkssworker"; + av[ac++] = NULL; /* filled in by postmaster_forkexec */ + av[ac] = NULL; - memset(&bgw, 0, sizeof(bgw)); + Assert(ac < lengthof(av)); - /* We need database connection which needs shared-memory access as well */ - bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | - BGWORKER_BACKEND_DATABASE_CONNECTION; + return postmaster_forkexec(ac, av); +} +#endif - /* Start as soon as a consistent state has been reached in a hot standby */ - bgw.bgw_start_time = BgWorkerStart_ConsistentState_HotStandby; +/* + * SlotSyncWorkerCanRestart + * + * Returns true if the worker is allowed to restart if enough time has + * passed (SLOTSYNC_RESTART_INTERVAL_SEC) since it was launched last. + * Otherwise returns false. 
+ * + * This is a safety valve to protect against continuous respawn attempts if the + * worker is dying immediately at launch. Note that since we will retry to + * launch the worker from the postmaster main loop, we will get another + * chance later. + */ +bool +SlotSyncWorkerCanRestart(void) +{ +#define SLOTSYNC_RESTART_INTERVAL_SEC 10 - snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncWorkerMain"); - snprintf(bgw.bgw_name, BGW_MAXLEN, - "replication slot sync worker"); - snprintf(bgw.bgw_type, BGW_MAXLEN, - "slot sync worker"); + static time_t last_slotsync_start_time = 0; + time_t curtime = time(NULL); - bgw.bgw_restart_time = BGW_DEFAULT_RESTART_INTERVAL; - bgw.bgw_notify_pid = 0; - bgw.bgw_main_arg = (Datum) 0; + /* Return false if too soon since last start. */ + if ((unsigned int) (curtime - last_slotsync_start_time) < + (unsigned int) SLOTSYNC_RESTART_INTERVAL_SEC) + return false; + + last_slotsync_start_time = curtime; + return true; +} + +/* + * Main entry point for slot sync worker process, to be called from the + * postmaster. + */ +int +StartSlotSyncWorker(void) +{ + pid_t pid; + +#ifdef EXEC_BACKEND + switch ((pid = slotsyncworker_forkexec())) + { +#else + switch ((pid = fork_process())) + { + case 0: + /* in postmaster child ... 
*/ + InitPostmasterChild(); + + /* Close the postmaster's sockets */ + ClosePostmasterPorts(false); + + ReplSlotSyncWorkerMain(0, NULL); + break; +#endif + case -1: + ereport(LOG, + (errmsg("could not fork slot sync worker process: %m"))); + return 0; + + default: + return (int) pid; + } - RegisterBackgroundWorker(&bgw); + /* shouldn't get here */ + return 0; } diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 4ad96beb87..aa53a57077 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -42,6 +42,7 @@ #include "replication/slot.h" #include "replication/syncrep.h" #include "replication/walsender.h" +#include "replication/logicalworker.h" #include "storage/condition_variable.h" #include "storage/ipc.h" #include "storage/lmgr.h" @@ -364,8 +365,12 @@ InitProcess(void) * child; this is so that the postmaster can detect it if we exit without * cleaning up. (XXX autovac launcher currently doesn't participate in * this; it probably should.) + * + * Slot sync worker also does not participate in it, see comments atop + * 'struct bkend' in postmaster.c. */ - if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess()) + if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess() && + !IsLogicalSlotSyncWorker()) MarkPostmasterChildActive(); /* @@ -934,8 +939,12 @@ ProcKill(int code, Datum arg) * This process is no longer present in shared memory in any meaningful * way, so tell the postmaster we've cleaned up acceptably well. (XXX * autovac launcher should be included here someday) + * + * Slot sync worker is also not a postmaster child, so skip this shared + * memory related processing here. 
*/ - if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess()) + if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess() && + !IsLogicalSlotSyncWorker()) MarkPostmasterChildInactive(); /* wake autovac launcher if needed -- see comments in FreeWorkerInfo */ diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index e8c530acd9..1a34bd3715 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -3286,17 +3286,6 @@ ProcessInterrupts(void) */ proc_exit(1); } - else if (IsLogicalSlotSyncWorker()) - { - elog(DEBUG1, - "replication slot sync worker is shutting down due to administrator command"); - - /* - * Slot sync worker can be stopped at any time. Use exit status 1 - * so the background worker is restarted. - */ - proc_exit(1); - } else if (IsBackgroundWorker) ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), diff --git a/src/backend/utils/activity/pgstat_io.c b/src/backend/utils/activity/pgstat_io.c index 43c393d6fe..9d6e067382 100644 --- a/src/backend/utils/activity/pgstat_io.c +++ b/src/backend/utils/activity/pgstat_io.c @@ -338,6 +338,7 @@ pgstat_tracks_io_bktype(BackendType bktype) case B_BG_WORKER: case B_BG_WRITER: case B_CHECKPOINTER: + case B_SLOTSYNC_WORKER: case B_STANDALONE_BACKEND: case B_STARTUP: case B_WAL_SENDER: diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index 3e6203322a..4c0ee2dd29 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -54,6 +54,7 @@ LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher proc LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." REPL_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." +REPL_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down." 
SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." WAL_SENDER_MAIN "Waiting in main loop of WAL sender process." diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index 23f77a59e5..309aa33a62 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -40,6 +40,7 @@ #include "postmaster/interrupt.h" #include "postmaster/pgarch.h" #include "postmaster/postmaster.h" +#include "replication/logicalworker.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -293,6 +294,9 @@ GetBackendTypeDesc(BackendType backendType) case B_LOGGER: backendDesc = "logger"; break; + case B_SLOTSYNC_WORKER: + backendDesc = "slotsyncworker"; + break; case B_STANDALONE_BACKEND: backendDesc = "standalone backend"; break; @@ -835,9 +839,10 @@ InitializeSessionUserIdStandalone(void) { /* * This function should only be called in single-user mode, in autovacuum - * workers, and in background workers. + * workers, in slot sync worker and in background workers. 
*/ - Assert(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || IsBackgroundWorker); + Assert(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || + IsLogicalSlotSyncWorker() || IsBackgroundWorker); /* call only once */ Assert(!OidIsValid(AuthenticatedUserId)); diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 1ad3367159..a5af0f410a 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -43,6 +43,7 @@ #include "postmaster/autovacuum.h" #include "postmaster/postmaster.h" #include "replication/slot.h" +#include "replication/logicalworker.h" #include "replication/walsender.h" #include "storage/bufmgr.h" #include "storage/fd.h" @@ -874,10 +875,11 @@ InitPostgres(const char *in_dbname, Oid dboid, * Perform client authentication if necessary, then figure out our * postgres user ID, and see if we are a superuser. * - * In standalone mode and in autovacuum worker processes, we use a fixed - * ID, otherwise we figure it out from the authenticated user name. + * In standalone mode, in autovacuum worker processes, and in the slot + * sync worker process, we use a fixed ID, otherwise we figure it out from + * the authenticated user name. 
*/ - if (bootstrap || IsAutoVacuumWorkerProcess()) + if (bootstrap || IsAutoVacuumWorkerProcess() || IsLogicalSlotSyncWorker()) { InitializeSessionUserIdStandalone(); am_superuser = true; diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index fe044c16de..3af11b2b80 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -2056,7 +2056,7 @@ struct config_bool ConfigureNamesBool[] = }, { - {"enable_syncslot", PGC_POSTMASTER, REPLICATION_STANDBY, + {"enable_syncslot", PGC_SIGHUP, REPLICATION_STANDBY, gettext_noop("Enables a physical standby to synchronize logical failover slots from the primary server."), }, &enable_syncslot, diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 0b01c1f093..65819cb7a7 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -332,6 +332,7 @@ typedef enum BackendType B_BG_WRITER, B_CHECKPOINTER, B_LOGGER, + B_SLOTSYNC_WORKER, B_STANDALONE_BACKEND, B_STARTUP, B_WAL_RECEIVER, diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h index 7092fc72c6..22fc49ec27 100644 --- a/src/include/postmaster/bgworker.h +++ b/src/include/postmaster/bgworker.h @@ -79,7 +79,6 @@ typedef enum BgWorkerStart_PostmasterStart, BgWorkerStart_ConsistentState, BgWorkerStart_RecoveryFinished, - BgWorkerStart_ConsistentState_HotStandby, } BgWorkerStartTime; #define BGW_DEFAULT_RESTART_INTERVAL 60 diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 2167720971..aa01f63648 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -329,9 +329,11 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); - -extern void ReplSlotSyncWorkerMain(Datum main_arg); -extern void SlotSyncWorkerRegister(void); +#ifdef EXEC_BACKEND +extern void ReplSlotSyncWorkerMain(int argc, 
char *argv[]) pg_attribute_noreturn(); +#endif +extern int StartSlotSyncWorker(void); +extern bool SlotSyncWorkerCanRestart(void); extern void ShutDownSlotSync(void); extern void SlotSyncWorkerShmemInit(void); -- 2.30.0.windows.2