From a36660ae7a8de490ca0db94881fd1061ee1c63b2 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Mon, 2 Dec 2024 15:55:18 +0200 Subject: [PATCH v6 3/5] Fix lost wakeup issue in logical replication launcher Fix it by using a separate interrupt bit for subscription changes. Discussion: https://www.postgresql.org/message-id/476672e7-62f1-4cab-a822-f3a8e949dd3f@iki.fi Discussion: https://www.postgresql.org/message-id/ff0663d9-8011-420f-a169-efbf57327cb5@iki.fi --- src/backend/replication/logical/launcher.c | 23 ++++++++++++++-------- src/include/storage/interrupt.h | 5 ++++- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 6578c83194b..60b28aeb2f7 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -57,7 +57,7 @@ LogicalRepWorker *MyLogicalRepWorker = NULL; typedef struct LogicalRepCtxStruct { /* Supervisor process. */ - pid_t launcher_pid; + ProcNumber launcher_procno; /* Hash table holding last start times of subscriptions' apply workers. */ dsa_handle last_start_dsa; @@ -805,7 +805,7 @@ logicalrep_worker_cleanup(LogicalRepWorker *worker) static void logicalrep_launcher_onexit(int code, Datum arg) { - LogicalRepCtx->launcher_pid = 0; + LogicalRepCtx->launcher_procno = INVALID_PROC_NUMBER; } /* @@ -965,6 +965,7 @@ ApplyLauncherShmemInit(void) memset(LogicalRepCtx, 0, ApplyLauncherShmemSize()); + LogicalRepCtx->launcher_procno = INVALID_PROC_NUMBER; LogicalRepCtx->last_start_dsa = DSA_HANDLE_INVALID; LogicalRepCtx->last_start_dsh = DSHASH_HANDLE_INVALID; @@ -1110,8 +1111,12 @@ ApplyLauncherWakeupAtCommit(void) static void ApplyLauncherWakeup(void) { - if (LogicalRepCtx->launcher_pid != 0) - kill(LogicalRepCtx->launcher_pid, SIGUSR1); + volatile LogicalRepCtxStruct *repctx = LogicalRepCtx; + ProcNumber launcher_procno; + + launcher_procno = repctx->launcher_procno; + if (launcher_procno != INVALID_PROC_NUMBER) + SendInterrupt(INTERRUPT_SUBSCRIPTION_CHANGE, launcher_procno); } /* @@ -1125,8 +1130,8 @@ ApplyLauncherMain(Datum main_arg) before_shmem_exit(logicalrep_launcher_onexit, (Datum) 0); - Assert(LogicalRepCtx->launcher_pid == 0); - LogicalRepCtx->launcher_pid = MyProcPid; + Assert(LogicalRepCtx->launcher_procno == INVALID_PROC_NUMBER); + LogicalRepCtx->launcher_procno = MyProcNumber; /* Establish signal handlers. */ pqsignal(SIGHUP, SignalHandlerForConfigReload); @@ -1158,6 +1163,7 @@ ApplyLauncherMain(Datum main_arg) oldctx = MemoryContextSwitchTo(subctx); /* Start any missing workers for enabled subscriptions. */ + ClearInterrupt(INTERRUPT_SUBSCRIPTION_CHANGE); sublist = get_subscription_list(); foreach(lc, sublist) { @@ -1214,7 +1220,8 @@ ApplyLauncherMain(Datum main_arg) MemoryContextDelete(subctx); /* Wait for more work. */ - rc = WaitInterrupt(1 << INTERRUPT_GENERAL, + rc = WaitInterrupt(1 << INTERRUPT_GENERAL | + 1 << INTERRUPT_SUBSCRIPTION_CHANGE, WL_INTERRUPT | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, wait_time, WAIT_EVENT_LOGICAL_LAUNCHER_MAIN); @@ -1241,7 +1248,7 @@ ApplyLauncherMain(Datum main_arg) bool IsLogicalLauncher(void) { - return LogicalRepCtx->launcher_pid == MyProcPid; + return LogicalRepCtx->launcher_procno == MyProcNumber; } /* diff --git a/src/include/storage/interrupt.h b/src/include/storage/interrupt.h index 779d61a8f07..8a880cd4c29 100644 --- a/src/include/storage/interrupt.h +++ b/src/include/storage/interrupt.h @@ -110,11 +110,14 @@ typedef enum INTERRUPT_GENERAL, /* - * INTERRUPT_RECOVERY_WAKEUP is used to wake up startup process, to tell + * INTERRUPT_RECOVERY_CONTINUE is used to wake up startup process, to tell * it that it should continue WAL replay. It's sent by WAL receiver when * more WAL arrives, or when promotion is requested. */ INTERRUPT_RECOVERY_CONTINUE, + + /* sent to logical replication launcher, when a subscription changes */ + INTERRUPT_SUBSCRIPTION_CHANGE, } InterruptType; /* -- 2.39.5