From c1e72ebc3ca840cb75d3fd004abba1944a028304 Mon Sep 17 00:00:00 2001 From: Heikki Linnakangas Date: Thu, 6 Nov 2025 11:21:39 +0200 Subject: [PATCH v3 1/3] Escalate ERRORs during async notify processing to FATAL Previously, if async notify processing encountered an error, we would report the error to the client and advance our read position past the offending entry to prevent trying to process it over and over again. Trying to continue after an error has a few problems, however: - We have no way of telling the client that a notification was lost. It's not clear if keeping the connection alive after losing a notification is a good thing. Depending on the application logic, missing a notification could, for example, cause the application to get stuck waiting. - If the connection is idle, PqCommReadingMsg is set and any ERROR is turned into FATAL anyway. - We bailed out of the notification processing loop on the first error without processing any subsequent notifications, until another notify interrupt arrived. For example, if there were two notifications pending, and processing the first one caused an ERROR, the second notification would not be processed until someone sent a new NOTIFY. This commit changes the behavior so that any ERROR while processing async notifications is turned into FATAL, causing the client connection to be dropped. That makes the behavior more consistent as that's what happened in the idle state already, and dropping the connection is a clear signal to the application that it might've missed some notifications. The reason to do this now is that the next commits will change the notification processing code in a way that would make it harder to skip over just the offending notification entry on error.
Discussion: https://www.postgresql.org/message-id/fedbd908-4571-4bbe-b48e-63bfdcc38f64@iki.fi --- src/backend/commands/async.c | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4bd37d5beb5..6b844808ef3 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -446,7 +446,7 @@ static double asyncQueueUsage(void); static void asyncQueueFillWarning(void); static void SignalBackends(void); static void asyncQueueReadAllNotifications(void); -static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, +static bool asyncQueueProcessPageEntries(QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot); @@ -1850,7 +1850,7 @@ ProcessNotifyInterrupt(bool flush) static void asyncQueueReadAllNotifications(void) { - volatile QueuePosition pos; + QueuePosition pos; QueuePosition head; Snapshot snapshot; @@ -1920,16 +1920,25 @@ asyncQueueReadAllNotifications(void) * It is possible that we fail while trying to send a message to our * frontend (for example, because of encoding conversion failure). If * that happens it is critical that we not try to send the same message - * over and over again. Therefore, we place a PG_TRY block here that will - * forcibly advance our queue position before we lose control to an error. - * (We could alternatively retake NotifyQueueLock and move the position - * before handling each individual message, but that seems like too much - * lock traffic.) + * over and over again. Therefore, we set ExitOnAnyError to upgrade any + * ERRORs to FATAL, causing the client connection to be closed on error. + * + * We used to skip over only the offending message and try to soldier + * on, but it was a little questionable to lose a notification and give + * the client ERRORs instead. A client application would not be prepared + * for that and can't tell that a notification was missed. 
It was also + * not very useful in practice because notifications are often processed + * while a connection is idle and reading a message from the client, and + * in that state, any error is upgraded to FATAL anyway. Closing the + * connection is a clear signal to the application that it might have + * missed notifications. */ - PG_TRY(); { + bool save_ExitOnAnyError = ExitOnAnyError; bool reachedStop; + ExitOnAnyError = true; + do { int64 curpage = QUEUE_POS_PAGE(pos); @@ -1982,15 +1991,14 @@ asyncQueueReadAllNotifications(void) page_buffer.buf, snapshot); } while (!reachedStop); - } - PG_FINALLY(); - { + /* Update shared state */ LWLockAcquire(NotifyQueueLock, LW_SHARED); QUEUE_BACKEND_POS(MyProcNumber) = pos; LWLockRelease(NotifyQueueLock); + + ExitOnAnyError = save_ExitOnAnyError; } - PG_END_TRY(); /* Done with snapshot */ UnregisterSnapshot(snapshot); @@ -2013,7 +2021,7 @@ asyncQueueReadAllNotifications(void) * The QueuePosition *current is advanced past all processed messages. */ static bool -asyncQueueProcessPageEntries(volatile QueuePosition *current, +asyncQueueProcessPageEntries(QueuePosition *current, QueuePosition stop, char *page_buffer, Snapshot snapshot) -- 2.47.3