From e29ffcf9bac74f5212ebc99b76a0faddad87b3f7 Mon Sep 17 00:00:00 2001 From: Shveta Malik Date: Tue, 16 Jan 2024 15:28:37 +0530 Subject: [PATCH v62 3/6] Slot-sync worker as a special process This patch attempts to start slot-sync worker as a special process which is neither a bgworker nor an Auxiliary process. The benefit we get here is we can control the start-conditions of the worker which further allows us to 'enable_syncslot' as PGC_SIGHUP which was otherwise a PGC_POSTMASTER GUC when slotsync worker was registered as bgworker. --- doc/src/sgml/bgworker.sgml | 65 +--- src/backend/postmaster/bgworker.c | 3 - src/backend/postmaster/postmaster.c | 85 ++++-- src/backend/replication/logical/slotsync.c | 329 ++++++++++++++++----- src/backend/storage/lmgr/proc.c | 9 +- src/backend/tcop/postgres.c | 11 - src/backend/utils/activity/pgstat_io.c | 1 + src/backend/utils/init/miscinit.c | 7 +- src/backend/utils/init/postinit.c | 8 +- src/backend/utils/misc/guc_tables.c | 2 +- src/include/miscadmin.h | 1 + src/include/postmaster/bgworker.h | 1 - src/include/replication/worker_internal.h | 7 +- 13 files changed, 354 insertions(+), 175 deletions(-) diff --git a/doc/src/sgml/bgworker.sgml b/doc/src/sgml/bgworker.sgml index a7cfe6c58c..2c393385a9 100644 --- a/doc/src/sgml/bgworker.sgml +++ b/doc/src/sgml/bgworker.sgml @@ -114,59 +114,18 @@ typedef struct BackgroundWorker bgw_start_time is the server state during which - postgres should start the process. Note that this setting - only indicates when the processes are to be started; they do not stop when - a different state is reached. Possible values are: - - - - BgWorkerStart_PostmasterStart - - - BgWorkerStart_PostmasterStart - Start as soon as postgres itself has finished its own initialization; - processes requesting this are not eligible for database connections. - - - - - - BgWorkerStart_ConsistentState - - - BgWorkerStart_ConsistentState - Start as soon as a consistent state has been reached in a hot-standby, - allowing processes to connect to databases and run read-only queries. - - - - - - BgWorkerStart_ConsistentState_HotStandby - - - BgWorkerStart_ConsistentState_HotStandby - Same meaning as BgWorkerStart_ConsistentState but - it is more strict in terms of the server i.e. start the worker only - if it is hot-standby. - - - - - - BgWorkerStart_RecoveryFinished - - - BgWorkerStart_RecoveryFinished - Start as soon as the system has entered normal read-write state. Note - that the BgWorkerStart_ConsistentState and - BgWorkerStart_RecoveryFinished are equivalent - in a server that's not a hot standby. - - - - - + postgres should start the process; it can be one of + BgWorkerStart_PostmasterStart (start as soon as + postgres itself has finished its own initialization; processes + requesting this are not eligible for database connections), + BgWorkerStart_ConsistentState (start as soon as a consistent state + has been reached in a hot standby, allowing processes to connect to + databases and run read-only queries), and + BgWorkerStart_RecoveryFinished (start as soon as the system has + entered normal read-write state). Note the last two values are equivalent + in a server that's not a hot standby. Note that this setting only indicates + when the processes are to be started; they do not stop when a different state + is reached. diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 46828b8a89..1add449f7c 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -130,9 +130,6 @@ static const struct { "ApplyWorkerMain", ApplyWorkerMain }, - { - "ReplSlotSyncWorkerMain", ReplSlotSyncWorkerMain - }, { "ParallelApplyWorkerMain", ParallelApplyWorkerMain }, diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index d90d5d1576..2fc979b1b8 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -168,11 +168,11 @@ * they will never become live backends. dead_end children are not assigned a * PMChildSlot. dead_end children have bkend_type NORMAL. * - * "Special" children such as the startup, bgwriter and autovacuum launcher - * tasks are not in this list. They are tracked via StartupPID and other - * pid_t variables below. (Thus, there can't be more than one of any given - * "special" child process type. We use BackendList entries for any child - * process there can be more than one of.) + * "Special" children such as the startup, bgwriter, autovacuum launcher and + * slot sync worker tasks are not in this list. They are tracked via StartupPID + * and other pid_t variables below. (Thus, there can't be more than one of any + * given "special" child process type. We use BackendList entries for any + * child process there can be more than one of.) */ typedef struct bkend { @@ -255,7 +255,8 @@ static pid_t StartupPID = 0, WalSummarizerPID = 0, AutoVacPID = 0, PgArchPID = 0, - SysLoggerPID = 0; + SysLoggerPID = 0, + SlotSyncWorkerPID = 0; /* Startup process's status */ typedef enum @@ -459,6 +460,9 @@ static void InitPostmasterDeathWatchHandle(void); (pmState == PM_RECOVERY || pmState == PM_HOT_STANDBY))) && \ PgArchCanRestart()) +#define SlotSyncWorkerAllowed() \ + (enable_syncslot && pmState == PM_HOT_STANDBY) + #ifdef EXEC_BACKEND #ifdef WIN32 @@ -1011,12 +1015,6 @@ PostmasterMain(int argc, char *argv[]) */ ApplyLauncherRegister(); - /* - * Register the slot sync worker here to kick start slot-sync operation - * sooner on the physical standby. - */ - SlotSyncWorkerRegister(); - /* * process any libraries that should be preloaded at postmaster start */ @@ -1837,6 +1835,10 @@ ServerLoop(void) if (PgArchPID == 0 && PgArchStartupAllowed()) PgArchPID = StartArchiver(); + /* If we need to start a slot sync worker, try to do that now */ + if (SlotSyncWorkerPID == 0 && SlotSyncWorkerAllowed()) + SlotSyncWorkerPID = StartSlotSyncWorker(); + /* If we need to signal the autovacuum launcher, do so now */ if (avlauncher_needs_signal) { @@ -2684,6 +2686,8 @@ process_pm_reload_request(void) signal_child(PgArchPID, SIGHUP); if (SysLoggerPID != 0) signal_child(SysLoggerPID, SIGHUP); + if (SlotSyncWorkerPID != 0) + signal_child(SlotSyncWorkerPID, SIGHUP); /* Reload authentication config files too */ if (!load_hba()) @@ -3041,6 +3045,8 @@ process_pm_child_exit(void) AutoVacPID = StartAutoVacLauncher(); if (PgArchStartupAllowed() && PgArchPID == 0) PgArchPID = StartArchiver(); + if (SlotSyncWorkerAllowed() && SlotSyncWorkerPID == 0) + SlotSyncWorkerPID = StartSlotSyncWorker(); /* workers may be scheduled to start now */ maybe_start_bgworkers(); @@ -3211,6 +3217,22 @@ process_pm_child_exit(void) continue; } + /* + * Was it the slot sycn worker? Normal exit or FATAL exit (FATAL can + * be caused by libpqwalreceiver on receiving shutdown request by the + * startup process during promotion) can be ignored; we'll start a new + * one at the next iteration of the postmaster's main loop, if + * necessary. Any other exit condition is treated as a crash. + */ + if (pid == SlotSyncWorkerPID) + { + SlotSyncWorkerPID = 0; + if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus)) + HandleChildCrash(pid, exitstatus, + _("Slotsync worker process")); + continue; + } + /* Was it one of our background workers? */ if (CleanupBackgroundWorker(pid, exitstatus)) { @@ -3577,6 +3599,12 @@ HandleChildCrash(int pid, int exitstatus, const char *procname) else if (PgArchPID != 0 && take_action) sigquit_child(PgArchPID); + /* Take care of the slot sync worker too */ + if (pid == SlotSyncWorkerPID) + SlotSyncWorkerPID = 0; + else if (SlotSyncWorkerPID != 0 && take_action) + sigquit_child(SlotSyncWorkerPID); + /* We do NOT restart the syslogger */ if (Shutdown != ImmediateShutdown) @@ -3717,6 +3745,8 @@ PostmasterStateMachine(void) signal_child(WalReceiverPID, SIGTERM); if (WalSummarizerPID != 0) signal_child(WalSummarizerPID, SIGTERM); + if (SlotSyncWorkerPID != 0) + signal_child(SlotSyncWorkerPID, SIGTERM); /* checkpointer, archiver, stats, and syslogger may continue for now */ /* Now transition to PM_WAIT_BACKENDS state to wait for them to die */ @@ -3732,13 +3762,13 @@ PostmasterStateMachine(void) /* * PM_WAIT_BACKENDS state ends when we have no regular backends * (including autovac workers), no bgworkers (including unconnected - * ones), and no walwriter, autovac launcher or bgwriter. If we are - * doing crash recovery or an immediate shutdown then we expect the - * checkpointer to exit as well, otherwise not. The stats and - * syslogger processes are disregarded since they are not connected to - * shared memory; we also disregard dead_end children here. Walsenders - * and archiver are also disregarded, they will be terminated later - * after writing the checkpoint record. + * ones), and no walwriter, autovac launcher or bgwriter or slot sync + * worker. If we are doing crash recovery or an immediate shutdown + * then we expect the checkpointer to exit as well, otherwise not. The + * stats and syslogger processes are disregarded since they are not + * connected to shared memory; we also disregard dead_end children + * here. Walsenders and archiver are also disregarded, they will be + * terminated later after writing the checkpoint record. */ if (CountChildren(BACKEND_TYPE_ALL - BACKEND_TYPE_WALSND) == 0 && StartupPID == 0 && @@ -3748,7 +3778,8 @@ PostmasterStateMachine(void) (CheckpointerPID == 0 || (!FatalError && Shutdown < ImmediateShutdown)) && WalWriterPID == 0 && - AutoVacPID == 0) + AutoVacPID == 0 && + SlotSyncWorkerPID == 0) { if (Shutdown >= ImmediateShutdown || FatalError) { @@ -3846,6 +3877,7 @@ PostmasterStateMachine(void) Assert(CheckpointerPID == 0); Assert(WalWriterPID == 0); Assert(AutoVacPID == 0); + Assert(SlotSyncWorkerPID == 0); /* syslogger is not considered here */ pmState = PM_NO_CHILDREN; } @@ -4069,6 +4101,8 @@ TerminateChildren(int signal) signal_child(AutoVacPID, signal); if (PgArchPID != 0) signal_child(PgArchPID, signal); + if (SlotSyncWorkerPID != 0) + signal_child(SlotSyncWorkerPID, signal); } /* @@ -4881,6 +4915,7 @@ SubPostmasterMain(int argc, char *argv[]) */ if (strcmp(argv[1], "--forkbackend") == 0 || strcmp(argv[1], "--forkavlauncher") == 0 || + strcmp(argv[1], "--forkssworker") == 0 || strcmp(argv[1], "--forkavworker") == 0 || strcmp(argv[1], "--forkaux") == 0 || strcmp(argv[1], "--forkbgworker") == 0) @@ -4984,6 +5019,13 @@ SubPostmasterMain(int argc, char *argv[]) AutoVacWorkerMain(argc - 2, argv + 2); /* does not return */ } + if (strcmp(argv[1], "--forkssworker") == 0) + { + /* Restore basic shared memory pointers */ + InitShmemAccess(UsedShmemSegAddr); + + ReplSlotSyncWorkerMain(argc - 2, argv + 2); /* does not return */ + } if (strcmp(argv[1], "--forkbgworker") == 0) { /* do this as early as possible; in particular, before InitProcess() */ @@ -5806,9 +5848,6 @@ bgworker_should_start_now(BgWorkerStartTime start_time) case PM_HOT_STANDBY: if (start_time == BgWorkerStart_ConsistentState) return true; - if (start_time == BgWorkerStart_ConsistentState_HotStandby && - pmState != PM_RUN) - return true; /* fall through */ case PM_RECOVERY: diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 87d4f5e8b1..5dbb8ab4de 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -27,6 +27,8 @@ * It waits for a period of time before the next synchronization, with the * duration varying based on whether any slots were updated during the last * cycle. Refer to the comments above wait_for_slot_activity() for more details. + * + * Slot synchronization is currently not supported on the cascading standby. *--------------------------------------------------------------------------- */ @@ -40,7 +42,9 @@ #include "commands/dbcommands.h" #include "pgstat.h" #include "postmaster/bgworker.h" +#include "postmaster/fork_process.h" #include "postmaster/interrupt.h" +#include "postmaster/postmaster.h" #include "replication/logical.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" @@ -53,6 +57,8 @@ #include "utils/fmgroids.h" #include "utils/guc_hooks.h" #include "utils/pg_lsn.h" +#include "utils/ps_status.h" +#include "utils/timeout.h" #include "utils/varlena.h" /* @@ -78,12 +84,17 @@ typedef struct RemoteSlot * Struct for sharing information between startup process and slot * sync worker. * - * Slot sync worker's pid is needed by startup process in order to - * shut it down during promotion. + * Slot sync worker's pid is needed by the startup process in order to + * shut it down during promotion. Startup process shuts down the slot + * sync worker and also sets stopSignaled=true to handle the race condition + * when postmaster has not noticed the promotion yet and thus may end up + * restarting slot slyc worker. If stopSignaled is set, the worker will + * exit in such a case. */ typedef struct SlotSyncWorkerCtxStruct { pid_t pid; + bool stopSignaled; slock_t mutex; } SlotSyncWorkerCtxStruct; @@ -92,13 +103,25 @@ SlotSyncWorkerCtxStruct *SlotSyncWorker = NULL; /* GUC variable */ bool enable_syncslot = false; +/* Flag to tell if we are in an slot sync worker process */ +static bool am_slotsync_worker = false; + /* * Sleep time in ms between slot-sync cycles. * See wait_for_slot_activity() for how we adjust this */ static long sleep_ms; +/* Min and Max sleep time for slot sync worker */ +#define MIN_WORKER_NAPTIME_MS 200 +#define MAX_WORKER_NAPTIME_MS 30000 /* 30s */ + static void ProcessSlotSyncInterrupts(WalReceiverConn *wrconn); +#ifdef EXEC_BACKEND +static pid_t slotsyncworker_forkexec(void); +#endif +NON_EXEC_STATIC void ReplSlotSyncWorkerMain(int argc, char *argv[]) pg_attribute_noreturn(); + /* * If necessary, update local slot metadata based on the data from the remote @@ -683,7 +706,8 @@ synchronize_slots(WalReceiverConn *wrconn) * standbys. */ static void -check_primary_info(WalReceiverConn *wrconn, bool *am_cascading_standby) +check_primary_info(WalReceiverConn *wrconn, bool *am_cascading_standby, + bool *primary_slot_invalid) { #define PRIMARY_INFO_OUTPUT_COL_COUNT 2 WalRcvExecResult *res; @@ -698,8 +722,10 @@ check_primary_info(WalReceiverConn *wrconn, bool *am_cascading_standby) StartTransactionCommand(); Assert(am_cascading_standby != NULL); + Assert(primary_slot_invalid != NULL); *am_cascading_standby = false; /* overwritten later if cascading */ + *primary_slot_invalid = false; /* overwritten later if invalid */ initStringInfo(&cmd); appendStringInfo(&cmd, @@ -737,12 +763,15 @@ check_primary_info(WalReceiverConn *wrconn, bool *am_cascading_standby) Assert(!isnull); if (!valid) - ereport(ERROR, + { + *primary_slot_invalid = true; + ereport(LOG, errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("exiting from slot synchronization due to bad configuration"), + errmsg("skipping slot synchronization due to bad configuration"), /* translator: second %s is a GUC variable name */ errdetail("The primary server slot \"%s\" specified by %s is not valid.", PrimarySlotName, "primary_slot_name")); + } } ExecClearTuple(tupslot); @@ -752,18 +781,18 @@ check_primary_info(WalReceiverConn *wrconn, bool *am_cascading_standby) /* * Check that all necessary GUCs for slot synchronization are set - * appropriately. If not, raise an ERROR. + * appropriately. If not, log the message and pass 'valid' as false + * to the caller. * * If all checks pass, extracts the dbname from the primary_conninfo GUC and * returns it. */ static char * -validate_parameters_and_get_dbname(void) +validate_parameters_and_get_dbname(bool *valid) { char *dbname; - /* Sanity check. */ - Assert(enable_syncslot); + *valid = false; /* * A physical replication slot(primary_slot_name) is required on the @@ -772,10 +801,13 @@ validate_parameters_and_get_dbname(void) * be invalidated. */ if (PrimarySlotName == NULL || strcmp(PrimarySlotName, "") == 0) - ereport(ERROR, + { + ereport(LOG, /* translator: %s is a GUC variable name */ - errmsg("exiting from slot synchronization due to bad configuration"), + errmsg("skipping slot synchronization due to bad configuration"), errhint("%s must be defined.", "primary_slot_name")); + return NULL; + } /* * hot_standby_feedback must be enabled to cooperate with the physical @@ -783,30 +815,39 @@ validate_parameters_and_get_dbname(void) * catalog_xmin values on the standby. */ if (!hot_standby_feedback) - ereport(ERROR, + { + ereport(LOG, /* translator: %s is a GUC variable name */ - errmsg("exiting from slot synchronization due to bad configuration"), + errmsg("skipping slot synchronization due to bad configuration"), errhint("%s must be enabled.", "hot_standby_feedback")); + return NULL; + } /* * Logical decoding requires wal_level >= logical and we currently only * synchronize logical slots. */ if (wal_level < WAL_LEVEL_LOGICAL) - ereport(ERROR, + { + ereport(LOG, /* translator: %s is a GUC variable name */ - errmsg("exiting from slot synchronization due to bad configuration"), + errmsg("skipping slot synchronization due to bad configuration"), errhint("wal_level must be >= logical.")); + return NULL; + } /* * The primary_conninfo is required to make connection to primary for * getting slots information. */ if (PrimaryConnInfo == NULL || strcmp(PrimaryConnInfo, "") == 0) - ereport(ERROR, + { + ereport(LOG, /* translator: %s is a GUC variable name */ - errmsg("exiting from slot synchronization due to bad configuration"), + errmsg("skipping slot synchronization due to bad configuration"), errhint("%s must be defined.", "primary_conninfo")); + return NULL; + } /* * The slot sync worker needs a database connection for walrcv_exec to @@ -814,14 +855,61 @@ validate_parameters_and_get_dbname(void) */ dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo); if (dbname == NULL) - ereport(ERROR, + { + ereport(LOG, /* * translator: 'dbname' is a specific option; %s is a GUC variable * name */ - errmsg("exiting from slot synchronization due to bad configuration"), + errmsg("skipping slot synchronization due to bad configuration"), errhint("'dbname' must be specified in %s.", "primary_conninfo")); + return NULL; + } + + /* All good, set valid to true now */ + *valid = true; + + return dbname; +} + +/* + * Check that all necessary GUCs for slot synchronization are set + * appropriately. If not, sleep for MAX_WORKER_NAPTIME_MS and check again. + * The idea is to become no-op until we get valid GUCs values. + * + * If all checks pass, extracts the dbname from the primary_conninfo GUC and + * returns it. + */ +static char * +wait_for_valid_params_and_get_dbname(void) +{ + char *dbname; + int rc; + bool valid; + + /* Sanity check. */ + Assert(enable_syncslot); + + for (;;) + { + dbname = validate_parameters_and_get_dbname(&valid); + if (valid) + break; + else + { + ProcessSlotSyncInterrupts(NULL); + + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, + MAX_WORKER_NAPTIME_MS, + WAIT_EVENT_REPL_SLOTSYNC_MAIN); + + if (rc & WL_LATCH_SET) + ResetLatch(MyLatch); + + } + } return dbname; } @@ -837,6 +925,7 @@ slotsync_reread_config(void) { char *old_primary_conninfo = pstrdup(PrimaryConnInfo); char *old_primary_slotname = pstrdup(PrimarySlotName); + bool old_enable_syncslot = enable_syncslot; bool old_hot_standby_feedback = hot_standby_feedback; bool conninfo_changed; bool primary_slotname_changed; @@ -849,13 +938,13 @@ slotsync_reread_config(void) if (conninfo_changed || primary_slotname_changed || + old_enable_syncslot != enable_syncslot || (old_hot_standby_feedback != hot_standby_feedback)) { ereport(LOG, errmsg("slot sync worker will restart because of" " a parameter change")); - /* The exit code 1 will make postmaster restart this worker */ - proc_exit(1); + proc_exit(0); } pfree(old_primary_conninfo); @@ -872,7 +961,8 @@ ProcessSlotSyncInterrupts(WalReceiverConn *wrconn) if (ShutdownRequestPending) { - walrcv_disconnect(wrconn); + if (wrconn) + walrcv_disconnect(wrconn); ereport(LOG, errmsg("replication slot sync worker is shutting down" " on receiving SIGINT")); @@ -905,20 +995,16 @@ slotsync_worker_onexit(int code, Datum arg) * sync-cycles is reset to the minimum (200ms). */ static void -wait_for_slot_activity(bool some_slot_updated, bool am_cascading_standby) +wait_for_slot_activity(bool some_slot_updated, bool recheck_primary_info) { -#define MIN_WORKER_NAPTIME_MS 200 -#define MAX_WORKER_NAPTIME_MS 30000 /* 30s */ - int rc; - if (am_cascading_standby) + if (recheck_primary_info) { /* - * Slot synchronization is currently not supported on cascading - * standby. So if we are on the cascading standby, we will skip the - * sync and take a longer nap before we check again whether we are - * still cascading standby or not. + * If we are on the cascading standby or primary_slot_name configured + * is not valid, then we will skip the sync and take a longer nap + * before we can do check_primary_info() again. */ sleep_ms = MAX_WORKER_NAPTIME_MS; } @@ -954,44 +1040,115 @@ wait_for_slot_activity(bool some_slot_updated, bool am_cascading_standby) * It connects to the primary server, fetches logical failover slots * information periodically in order to create and sync the slots. */ -void -ReplSlotSyncWorkerMain(Datum main_arg) +NON_EXEC_STATIC void +ReplSlotSyncWorkerMain(int argc, char *argv[]) { WalReceiverConn *wrconn = NULL; char *dbname; bool am_cascading_standby; + bool primary_slot_invalid; char *err; + sigjmp_buf local_sigjmp_buf; - ereport(LOG, errmsg("replication slot sync worker started")); + am_slotsync_worker = true; - on_shmem_exit(slotsync_worker_onexit, (Datum) 0); + MyBackendType = B_SLOTSYNC_WORKER; - SpinLockAcquire(&SlotSyncWorker->mutex); + init_ps_display(NULL); + + SetProcessingMode(InitProcessing); + + /* + * Create a per-backend PGPROC struct in shared memory. We must do this + * before we access any shared memory. + */ + InitProcess(); + + /* + * Early initialization. + */ + BaseInit(); + Assert(SlotSyncWorker != NULL); + + SpinLockAcquire(&SlotSyncWorker->mutex); Assert(SlotSyncWorker->pid == InvalidPid); + /* + * Startup process signaled the slot sync worker to stop, so if meanwhile + * postmaster ended up starting the worker again, exit. + */ + if (SlotSyncWorker->stopSignaled) + { + SpinLockRelease(&SlotSyncWorker->mutex); + proc_exit(0); + } + /* Advertise our PID so that the startup process can kill us on promotion */ SlotSyncWorker->pid = MyProcPid; - SpinLockRelease(&SlotSyncWorker->mutex); + ereport(LOG, errmsg("replication slot sync worker started")); + + on_shmem_exit(slotsync_worker_onexit, (Datum) 0); + /* Setup signal handling */ pqsignal(SIGHUP, SignalHandlerForConfigReload); pqsignal(SIGINT, SignalHandlerForShutdownRequest); pqsignal(SIGTERM, die); - BackgroundWorkerUnblockSignals(); + + /* + * Establishes SIGALRM handler and initialize timeout module. It is needed + * by InitPostgres to register different timeouts. + */ + InitializeTimeouts(); /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); - dbname = validate_parameters_and_get_dbname(); + /* + * If an exception is encountered, processing resumes here. + * + * We just need to clean up, report the error, and go away. + * + * If we do not have this handling here, then since this worker process + * operates at the bottom of the exception stack, ERRORs turn into FATALs. + * Therefore, we create our own exception handler to catch ERRORs. + */ + if (sigsetjmp(local_sigjmp_buf, 1) != 0) + { + /* since not using PG_TRY, must reset error stack by hand */ + error_context_stack = NULL; + + /* Prevents interrupts while cleaning up */ + HOLD_INTERRUPTS(); + + /* Report the error to the server log */ + EmitErrorReport(); + + /* + * We can now go away. Note that because we called InitProcess, a + * callback was registered to do ProcKill, which will clean up + * necessary state. + */ + proc_exit(0); + } + + /* We can now handle ereport(ERROR) */ + PG_exception_stack = &local_sigjmp_buf; + + BackgroundWorkerUnblockSignals(); + + dbname = wait_for_valid_params_and_get_dbname(); /* * Connect to the database specified by user in primary_conninfo. We need * a database connection for walrcv_exec to work. Please see comments atop * libpqrcv_exec. */ - BackgroundWorkerInitializeConnection(dbname, NULL, 0); + InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL); + + SetProcessingMode(NormalProcessing); /* * Establish the connection to the primary server for slots @@ -1010,26 +1167,27 @@ ReplSlotSyncWorkerMain(Datum main_arg) * cascading standby and validates primary_slot_name for * non-cascading-standbys. */ - check_primary_info(wrconn, &am_cascading_standby); + check_primary_info(wrconn, &am_cascading_standby, &primary_slot_invalid); /* Main wait loop */ for (;;) { bool some_slot_updated = false; + bool recheck_primary_info = am_cascading_standby || primary_slot_invalid; ProcessSlotSyncInterrupts(wrconn); - if (!am_cascading_standby) + if (!recheck_primary_info) some_slot_updated = synchronize_slots(wrconn); - wait_for_slot_activity(some_slot_updated, am_cascading_standby); + wait_for_slot_activity(some_slot_updated, recheck_primary_info); /* * If the standby was promoted then what was previously a cascading * standby might no longer be one, so recheck each time. */ - if (am_cascading_standby) - check_primary_info(wrconn, &am_cascading_standby); + if (recheck_primary_info) + check_primary_info(wrconn, &am_cascading_standby, &primary_slot_invalid); } /* @@ -1046,7 +1204,7 @@ ReplSlotSyncWorkerMain(Datum main_arg) bool IsLogicalSlotSyncWorker(void) { - return SlotSyncWorker->pid == MyProcPid; + return am_slotsync_worker; } /* @@ -1062,6 +1220,7 @@ ShutDownSlotSync(void) return; } + SlotSyncWorker->stopSignaled = true; kill(SlotSyncWorker->pid, SIGINT); SpinLockRelease(&SlotSyncWorker->mutex); @@ -1117,42 +1276,64 @@ SlotSyncWorkerShmemInit(void) } } +#ifdef EXEC_BACKEND /* - * Register the background worker for slots synchronization provided - * enable_syncslot is ON. + * The forkexec routine for the slot sync worker process. + * + * Format up the arglist, then fork and exec. */ -void -SlotSyncWorkerRegister(void) +static pid_t +slotsyncworker_forkexec(void) { - BackgroundWorker bgw; + char *av[10]; + int ac = 0; - if (!enable_syncslot) - { - ereport(LOG, - errmsg("skipping slot synchronization"), - errdetail("enable_syncslot is disabled.")); - return; - } + av[ac++] = "postgres"; + av[ac++] = "--forkssworker"; + av[ac++] = NULL; /* filled in by postmaster_forkexec */ + av[ac] = NULL; + + Assert(ac < lengthof(av)); + + return postmaster_forkexec(ac, av); +} +#endif - memset(&bgw, 0, sizeof(bgw)); +/* + * Main entry point for slot sync worker process, to be called from the + * postmaster. + */ +int +StartSlotSyncWorker(void) +{ + pid_t pid; - /* We need database connection which needs shared-memory access as well */ - bgw.bgw_flags = BGWORKER_SHMEM_ACCESS | - BGWORKER_BACKEND_DATABASE_CONNECTION; +#ifdef EXEC_BACKEND + switch ((pid = slotsyncworker_forkexec())) +#else + switch ((pid = fork_process())) +#endif + { + case -1: + ereport(LOG, + (errmsg("could not fork slot sync worker process: %m"))); + return 0; - /* Start as soon as a consistent state has been reached in a hot standby */ - bgw.bgw_start_time = BgWorkerStart_ConsistentState_HotStandby; +#ifndef EXEC_BACKEND + case 0: + /* in postmaster child ... */ + InitPostmasterChild(); - snprintf(bgw.bgw_library_name, MAXPGPATH, "postgres"); - snprintf(bgw.bgw_function_name, BGW_MAXLEN, "ReplSlotSyncWorkerMain"); - snprintf(bgw.bgw_name, BGW_MAXLEN, - "replication slot sync worker"); - snprintf(bgw.bgw_type, BGW_MAXLEN, - "slot sync worker"); + /* Close the postmaster's sockets */ + ClosePostmasterPorts(false); - bgw.bgw_restart_time = BGW_DEFAULT_RESTART_INTERVAL; - bgw.bgw_notify_pid = 0; - bgw.bgw_main_arg = (Datum) 0; + ReplSlotSyncWorkerMain(0, NULL); + break; +#endif + default: + return (int) pid; + } - RegisterBackgroundWorker(&bgw); + /* shouldn't get here */ + return 0; } diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 4ad96beb87..ad1352bf76 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -42,6 +42,7 @@ #include "replication/slot.h" #include "replication/syncrep.h" #include "replication/walsender.h" +#include "replication/logicalworker.h" #include "storage/condition_variable.h" #include "storage/ipc.h" #include "storage/lmgr.h" @@ -364,8 +365,11 @@ InitProcess(void) * child; this is so that the postmaster can detect it if we exit without * cleaning up. (XXX autovac launcher currently doesn't participate in * this; it probably should.) + * + * Slot sync worker does not participate in it, see comments atop Backend. */ - if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess()) + if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess() && + !IsLogicalSlotSyncWorker()) MarkPostmasterChildActive(); /* @@ -935,7 +939,8 @@ ProcKill(int code, Datum arg) * way, so tell the postmaster we've cleaned up acceptably well. (XXX * autovac launcher should be included here someday) */ - if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess()) + if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess() && + !IsLogicalSlotSyncWorker()) MarkPostmasterChildInactive(); /* wake autovac launcher if needed -- see comments in FreeWorkerInfo */ diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 19b08c1b5f..1eaaf3c6c5 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -3286,17 +3286,6 @@ ProcessInterrupts(void) */ proc_exit(1); } - else if (IsLogicalSlotSyncWorker()) - { - elog(DEBUG1, - "replication slot sync worker is shutting down due to administrator command"); - - /* - * Slot sync worker can be stopped at any time. Use exit status 1 - * so the background worker is restarted. - */ - proc_exit(1); - } else if (IsBackgroundWorker) ereport(FATAL, (errcode(ERRCODE_ADMIN_SHUTDOWN), diff --git a/src/backend/utils/activity/pgstat_io.c b/src/backend/utils/activity/pgstat_io.c index 43c393d6fe..c5de528b34 100644 --- a/src/backend/utils/activity/pgstat_io.c +++ b/src/backend/utils/activity/pgstat_io.c @@ -341,6 +341,7 @@ pgstat_tracks_io_bktype(BackendType bktype) case B_STANDALONE_BACKEND: case B_STARTUP: case B_WAL_SENDER: + case B_SLOTSYNC_WORKER: return true; } diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c index 23f77a59e5..d5e16f1df4 100644 --- a/src/backend/utils/init/miscinit.c +++ b/src/backend/utils/init/miscinit.c @@ -40,6 +40,7 @@ #include "postmaster/interrupt.h" #include "postmaster/pgarch.h" #include "postmaster/postmaster.h" +#include "replication/logicalworker.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -311,6 +312,9 @@ GetBackendTypeDesc(BackendType backendType) case B_WAL_WRITER: backendDesc = "walwriter"; break; + case B_SLOTSYNC_WORKER: + backendDesc = "slotsyncworker"; + break; } return backendDesc; @@ -837,7 +841,8 @@ InitializeSessionUserIdStandalone(void) * This function should only be called in single-user mode, in autovacuum * workers, and in background workers. */ - Assert(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || IsBackgroundWorker); + Assert(!IsUnderPostmaster || IsAutoVacuumWorkerProcess() || + IsLogicalSlotSyncWorker() || IsBackgroundWorker); /* call only once */ Assert(!OidIsValid(AuthenticatedUserId)); diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 1ad3367159..a5af0f410a 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -43,6 +43,7 @@ #include "postmaster/autovacuum.h" #include "postmaster/postmaster.h" #include "replication/slot.h" +#include "replication/logicalworker.h" #include "replication/walsender.h" #include "storage/bufmgr.h" #include "storage/fd.h" @@ -874,10 +875,11 @@ InitPostgres(const char *in_dbname, Oid dboid, * Perform client authentication if necessary, then figure out our * postgres user ID, and see if we are a superuser. * - * In standalone mode and in autovacuum worker processes, we use a fixed - * ID, otherwise we figure it out from the authenticated user name. + * In standalone mode, autovacuum worker processes and slot sync worker + * process, we use a fixed ID, otherwise we figure it out from the + * authenticated user name. */ - if (bootstrap || IsAutoVacuumWorkerProcess()) + if (bootstrap || IsAutoVacuumWorkerProcess() || IsLogicalSlotSyncWorker()) { InitializeSessionUserIdStandalone(); am_superuser = true; diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 0f5ec63de1..5763454336 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -2046,7 +2046,7 @@ struct config_bool ConfigureNamesBool[] = }, { - {"enable_syncslot", PGC_POSTMASTER, REPLICATION_STANDBY, + {"enable_syncslot", PGC_SIGHUP, REPLICATION_STANDBY, gettext_noop("Enables a physical standby to synchronize logical failover slots from the primary server."), }, &enable_syncslot, diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h index 0b01c1f093..e89b8121f3 100644 --- a/src/include/miscadmin.h +++ b/src/include/miscadmin.h @@ -337,6 +337,7 @@ typedef enum BackendType B_WAL_RECEIVER, B_WAL_SENDER, B_WAL_SUMMARIZER, + B_SLOTSYNC_WORKER, B_WAL_WRITER, } BackendType; diff --git a/src/include/postmaster/bgworker.h b/src/include/postmaster/bgworker.h index 7092fc72c6..22fc49ec27 100644 --- a/src/include/postmaster/bgworker.h +++ b/src/include/postmaster/bgworker.h @@ -79,7 +79,6 @@ typedef enum BgWorkerStart_PostmasterStart, BgWorkerStart_ConsistentState, BgWorkerStart_RecoveryFinished, - BgWorkerStart_ConsistentState_HotStandby, } BgWorkerStartTime; #define BGW_DEFAULT_RESTART_INTERVAL 60 diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 2167720971..cd530d3d40 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -329,9 +329,10 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); - -extern void ReplSlotSyncWorkerMain(Datum main_arg); -extern void SlotSyncWorkerRegister(void); +#ifdef EXEC_BACKEND +extern void ReplSlotSyncWorkerMain(int argc, char *argv[]) pg_attribute_noreturn(); +#endif +extern int StartSlotSyncWorker(void); extern void ShutDownSlotSync(void); extern void SlotSyncWorkerShmemInit(void); -- 2.34.1