From 2425bf35af1e7bc690830fd0875fc7e08f274ae2 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 2 Dec 2024 15:55:18 +0200 Subject: [PATCH v7 2/4] Fix lost wakeup issue in logical replication launcher Fix it by using a separate interrupt bit for subscription changes. Discussion: https://www.postgresql.org/message-id/476672e7-62f1-4cab-a822-f3a8e949dd3f@iki.fi Discussion: https://www.postgresql.org/message-id/ff0663d9-8011-420f-a169-efbf57327cb5@iki.fi --- src/backend/replication/logical/launcher.c | 22 ++++++++++++++-------- src/include/postmaster/interrupt.h | 5 ++++- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 071d93ce816..d915b9a9fce 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -56,7 +56,7 @@ LogicalRepWorker *MyLogicalRepWorker = NULL; typedef struct LogicalRepCtxStruct { /* Supervisor process. */ - pid_t launcher_pid; + ProcNumber launcher_procno; /* Hash table holding last start times of subscriptions' apply workers. */ dsa_handle last_start_dsa; @@ -804,7 +804,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker) static void logicalrep_launcher_onexit(int code, Datum arg) { - LogicalRepCtx->launcher_pid = 0; + LogicalRepCtx->launcher_procno = INVALID_PROC_NUMBER; } /* @@ -964,6 +964,7 @@ ApplyLauncherShmemInit(void) memset(LogicalRepCtx, 0, ApplyLauncherShmemSize()); + LogicalRepCtx->launcher_procno = INVALID_PROC_NUMBER; LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID; LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID; @@ -1109,8 +1110,12 @@ ApplyLauncherWakeupAtCommit(void) static void ApplyLauncherWakeup(void) { - if (LogicalRepCtx->launcher_pid != 0) - kill(LogicalRepCtx->launcher_pid, SIGUSR1); + volatile LogicalRepCtxStruct *repctx = LogicalRepCtx; + ProcNumber launcher_procno; + + launcher_procno = repctx->launcher_procno; + if (launcher_procno != INVALID_PROC_NUMBER) + SendInterrupt(INTERRUPT_SUBSCRIPTION_CHANGE, launcher_procno); } /* @@ -1124,8 +1129,8 @@ ApplyLauncherMain(Datum main_arg) before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0); - Assert(LogicalRepCtx->launcher_pid == 0); - LogicalRepCtx->launcher_pid = MyProcPid; + Assert(LogicalRepCtx->launcher_procno == INVALID_PROC_NUMBER); + LogicalRepCtx->launcher_procno = MyProcNumber; /* Establish signal handlers. */ pqsignal(SIGHUP, SignalHandlerForConfigReload); @@ -1157,6 +1162,7 @@ ApplyLauncherMain(Datum main_arg) oldctx = MemoryContextSwitchTo(subctx); /* Start any missing workers for enabled subscriptions. */ + ClearInterrupt(INTERRUPT_SUBSCRIPTION_CHANGE); sublist = get_subscription_list(); foreach(lc, sublist) { @@ -1213,7 +1219,7 @@ ApplyLauncherMain(Datum main_arg) MemoryContextDelete(subctx); /* Wait for more work. */ - rc = WaitInterrupt(INTERRUPT_GENERAL, + rc = WaitInterrupt(INTERRUPT_GENERAL | INTERRUPT_SUBSCRIPTION_CHANGE, WL_INTERRUPT | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, wait_time, WAIT_EVENT_LOGICAL_LAUNCHER_MAIN); @@ -1240,7 +1246,7 @@ ApplyLauncherMain(Datum main_arg) bool IsLogicalLauncher(void) { - return LogicalRepCtx->launcher_pid == MyProcPid; + return LogicalRepCtx->launcher_procno == MyProcNumber; } /* diff --git a/src/include/postmaster/interrupt.h b/src/include/postmaster/interrupt.h index 346f9cae8e1..09dc6f0330d 100644 --- a/src/include/postmaster/interrupt.h +++ b/src/include/postmaster/interrupt.h @@ -113,11 +113,14 @@ typedef enum INTERRUPT_GENERAL = 1 << 1, /* - * INTERRUPT_RECOVERY_WAKEUP is used to wake up startup process, to tell + * INTERRUPT_RECOVERY_CONTINUE is used to wake up startup process, to tell * it that it should continue WAL replay. It's sent by WAL receiver when * more WAL arrives, or when promotion is requested. */ INTERRUPT_RECOVERY_CONTINUE = 1 << 2, + + /* sent to logical replication launcher, when a subscription changes */ + INTERRUPT_SUBSCRIPTION_CHANGE = 1 << 3, } InterruptType; /* -- 2.39.5