From 14a48226ba86fa98ea00f2e7b9c2fb9f98dffc32 Mon Sep 17 00:00:00 2001 From: rbagga Date: Sun, 7 Sep 2025 16:55:57 -0700 Subject: [PATCH] Implement WAL-based async notifications for improved throughput - Added WAL logging for async notifications to improve scalability - Implemented async resource manager for WAL-based notification handling - Added new async descriptor files for pg_waldump support - Updated makefiles and build configuration for new components - Pin WAL files if they contain unread notifications - Maintain min lsn across committed / in flight notifications - Ensure space in queue before entering critical section - Added TAP test to ensure error if queue is full - Added TAP test to ensure WAL records necessary for notifications are pinned --- src/backend/access/rmgrdesc/Makefile | 1 + src/backend/access/rmgrdesc/asyncdesc.c | 47 + src/backend/access/rmgrdesc/meson.build | 1 + src/backend/access/rmgrdesc/xactdesc.c | 13 + src/backend/access/transam/rmgr.c | 1 + src/backend/access/transam/xact.c | 44 +- src/backend/access/transam/xlog.c | 45 +- src/backend/commands/async.c | 1257 +++++++++++------ src/bin/pg_rewind/parsexlog.c | 1 + src/bin/pg_waldump/rmgrdesc.c | 2 + src/include/access/async_xlog.h | 43 + src/include/access/rmgrlist.h | 1 + src/include/access/xact.h | 9 + src/include/commands/async.h | 27 + src/include/storage/proc.h | 3 + src/test/modules/test_listen_notify/Makefile | 17 + .../modules/test_listen_notify/meson.build | 13 + .../test_listen_notify/t/002_queue_full.pl | 89 ++ .../test_listen_notify/t/003_wal_pin_test.pl | 102 ++ 19 files changed, 1267 insertions(+), 449 deletions(-) create mode 100644 src/backend/access/rmgrdesc/asyncdesc.c create mode 100644 src/include/access/async_xlog.h create mode 100644 src/test/modules/test_listen_notify/Makefile create mode 100644 src/test/modules/test_listen_notify/meson.build create mode 100644 src/test/modules/test_listen_notify/t/002_queue_full.pl create mode 100644 
src/test/modules/test_listen_notify/t/003_wal_pin_test.pl diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index cd95eec37f1..6e6e75b12bd 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -9,6 +9,7 @@ top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global OBJS = \ + asyncdesc.o \ brindesc.o \ clogdesc.o \ committsdesc.o \ diff --git a/src/backend/access/rmgrdesc/asyncdesc.c b/src/backend/access/rmgrdesc/asyncdesc.c new file mode 100644 index 00000000000..7f322849ff1 --- /dev/null +++ b/src/backend/access/rmgrdesc/asyncdesc.c @@ -0,0 +1,47 @@ +/*------------------------------------------------------------------------- + * + * asyncdesc.c + * rmgr descriptor routines for access/async.c + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/rmgrdesc/asyncdesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/async_xlog.h" + +void +async_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_ASYNC_NOTIFY_DATA) + { + xl_async_notify_data *xlrec = (xl_async_notify_data *) rec; + + appendStringInfo(buf, "notify data: db %u xid %u pid %d notifications %u", + xlrec->dbid, xlrec->xid, xlrec->srcPid, xlrec->nnotifications); + } +} + +const char * +async_identify(uint8 info) +{ + const char *id = NULL; + + switch (info & ~XLR_INFO_MASK) + { + case XLOG_ASYNC_NOTIFY_DATA: + id = "NOTIFY_DATA"; + break; + } + + return id; +} \ No newline at end of file diff --git a/src/backend/access/rmgrdesc/meson.build b/src/backend/access/rmgrdesc/meson.build index 96c98e800c2..38bef2e87f6 100644 --- a/src/backend/access/rmgrdesc/meson.build +++ 
b/src/backend/access/rmgrdesc/meson.build @@ -2,6 +2,7 @@ # used by frontend programs like pg_waldump rmgr_desc_sources = files( + 'asyncdesc.c', 'brindesc.c', 'clogdesc.c', 'committsdesc.c', diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index f0f696855b9..4f32f7fc591 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -135,6 +135,19 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars data += sizeof(xl_xact_origin); } + + if (parsed->xinfo & XACT_XINFO_HAS_NOTIFY) + { + xl_xact_notify xl_notify; + + /* no alignment is guaranteed, so copy onto stack */ + memcpy(&xl_notify, data, sizeof(xl_notify)); + + parsed->notify_lsn = xl_notify.notify_lsn; + + data += sizeof(xl_xact_notify); + } + } void diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 1b7499726eb..f8c25e6597a 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -19,6 +19,7 @@ /* includes needed for "access/rmgrlist.h" */ /* IWYU pragma: begin_keep */ +#include "access/async_xlog.h" #include "access/brin_xlog.h" #include "access/clog.h" #include "access/commit_ts.h" diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 2cf3d4e92b7..1f18e84f583 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5841,10 +5841,24 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xact_invals xl_invals; xl_xact_twophase xl_twophase; xl_xact_origin xl_origin; + xl_xact_notify xl_notify; uint8 info; + XLogRecPtr result; Assert(CritSectionCount > 0); + /* + * Handle notification commit ordering: if this transaction has pending + * notifications, we must write the queue entry just before the commit + * record while holding NotifyQueueLock to ensure proper ordering. 
+ */ + if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn)) + { + TransactionId xid = GetCurrentTransactionId(); + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + asyncQueueAddCompactEntry(MyDatabaseId, xid, MyProc->notifyCommitLsn); + } + xl_xinfo.xinfo = 0; /* decide between a plain and 2pc commit */ @@ -5926,6 +5940,13 @@ XactLogCommitRecord(TimestampTz commit_time, xl_origin.origin_timestamp = replorigin_session_origin_timestamp; } + /* include notification information if present */ + if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn)) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_NOTIFY; + xl_notify.notify_lsn = MyProc->notifyCommitLsn; + } + if (xl_xinfo.xinfo != 0) info |= XLOG_XACT_HAS_INFO; @@ -5982,10 +6003,25 @@ XactLogCommitRecord(TimestampTz commit_time, if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) XLogRegisterData(&xl_origin, sizeof(xl_xact_origin)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_NOTIFY) + XLogRegisterData(&xl_notify, sizeof(xl_xact_notify)); + /* we allow filtering by xacts */ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); - return XLogInsert(RM_XACT_ID, info); + /* Insert the commit record */ + result = XLogInsert(RM_XACT_ID, info); + + /* + * Release NotifyQueueLock if we held it. The queue entry is now + * associated with a committed transaction, so readers can process it. 
+ */ + if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn)) + { + LWLockRelease(NotifyQueueLock); + } + + return result; } /* @@ -6227,6 +6263,12 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, false /* backward */ , false /* WAL */ ); } + /* Add notification queue entry if this commit has notifications */ + if (parsed->xinfo & XACT_XINFO_HAS_NOTIFY) + { + asyncQueueAddCompactEntry(parsed->dbId, xid, parsed->notify_lsn); + } + /* Make sure files supposed to be dropped are dropped */ if (parsed->nrels > 0) { diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index eac1de75ed0..c880df3df85 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -54,6 +54,7 @@ #include "access/subtrans.h" #include "access/timeline.h" #include "access/transam.h" +#include "commands/async.h" #include "access/twophase.h" #include "access/xact.h" #include "access/xlog_internal.h" @@ -3876,7 +3877,7 @@ RemoveTempXlogFiles(void) */ static void RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr lastredoptr, XLogRecPtr endptr, - TimeLineID insertTLI) + TimeLineID insertTLI) { DIR *xldir; struct dirent *xlde; @@ -3898,6 +3899,48 @@ RemoveOldXlogFiles(XLogSegNo segno, XLogRecPtr lastredoptr, XLogRecPtr endptr, elog(DEBUG2, "attempting to remove WAL segments older than log file %s", lastoff); + /* + * Pin WAL needed by NOTIFY delivery: adjust segno so that we do not + * remove segments older than the one containing the oldest NOTIFY entry + * still present in the queue. This prevents recycling WAL that listeners + * may still need to read NOTIFY payloads from. 
+ */ + { + XLogRecPtr notify_oldest; + if (AsyncNotifyOldestRequiredLSN(&notify_oldest)) + { + XLogSegNo notifySegNo; + /* Segment containing the oldest required LSN */ + XLByteToSeg(notify_oldest, notifySegNo, wal_segment_size); + if (Trace_notify) + elog(DEBUG1, "async notify: checking WAL pin; oldest notify LSN %X/%X (seg %lu)", + LSN_FORMAT_ARGS(notify_oldest), (unsigned long) notifySegNo); + /* + * Last removable must be strictly before notifySegNo. If + * notifySegNo == 0, there is no valid "previous" segment, so do + * not reduce segno at all in that case. + */ + if (notifySegNo > 0) + { + XLogSegNo cutoff = notifySegNo - 1; + if (cutoff < segno) + { + segno = cutoff; + if (Trace_notify) + { + char lastoff[MAXFNAMELEN]; + XLogFileName(lastoff, 0, segno, wal_segment_size); + elog(DEBUG1, "async notify: WAL recycle cutoff adjusted to segno %lu (lastoff %s)", + (unsigned long) segno, lastoff); + } + } + } + } + } + + /* Recompute cutoff filename after any segno adjustment above */ + XLogFileName(lastoff, 0, segno, wal_segment_size); + xldir = AllocateDir(XLOGDIR); while ((xlde = ReadDir(xldir, XLOGDIR)) != NULL) diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4bd37d5beb5..a649fc9d1cc 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -133,6 +133,14 @@ #include "access/slru.h" #include "access/transam.h" #include "access/xact.h" +#include "access/xlog.h" +#include "access/xloginsert.h" +#include "access/xlogreader.h" +#include "access/xlogutils.h" +#include "access/xlogrecovery.h" +#include "access/xlog_internal.h" +#include "access/xlogdefs.h" +#include "access/xact.h" #include "catalog/pg_database.h" #include "commands/async.h" #include "common/hashfn.h" @@ -143,6 +151,7 @@ #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/procsignal.h" +#include "storage/procarray.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/guc_hooks.h" @@ -150,6 +159,31 @@ #include 
"utils/ps_status.h" #include "utils/snapmgr.h" #include "utils/timestamp.h" +#include "storage/fd.h" +#include + +/* Missing definitions for WAL-based notification system */ +#define AsyncQueueEntryEmptySize ASYNC_QUEUE_ENTRY_SIZE +#define SLRU_PAGE_SIZE BLCKSZ +#define AsyncCtl NotifyCtl + +/* WAL record types */ +#define XLOG_ASYNC_NOTIFY_DATA 0x00 + +/* + * WAL record for notification data (written in PreCommit_Notify) + */ +typedef struct xl_async_notify_data +{ + Oid dbid; /* database ID */ + TransactionId xid; /* transaction ID */ + int32 srcPid; /* source backend PID */ + uint32 nnotifications; /* number of notifications */ + /* followed by serialized notification data */ +} xl_async_notify_data; + +#define SizeOfAsyncNotifyData (offsetof(xl_async_notify_data, nnotifications) + sizeof(uint32)) + /* @@ -163,66 +197,51 @@ #define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128) /* - * Struct representing an entry in the global notify queue - * - * This struct declaration has the maximal length, but in a real queue entry - * the data area is only big enough for the actual channel and payload strings - * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible - * entry size, if both channel and payload strings are empty (but note it - * doesn't include alignment padding). - * - * The "length" field should always be rounded up to the next QUEUEALIGN - * multiple so that all fields are properly aligned. + * AsyncQueueEntry is defined in commands/async.h as a compact metadata-only + * structure; notification content is stored in WAL. 
*/ -typedef struct AsyncQueueEntry -{ - int length; /* total allocated length of entry */ - Oid dboid; /* sender's database OID */ - TransactionId xid; /* sender's XID */ - int32 srcPid; /* sender's PID */ - char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH]; -} AsyncQueueEntry; - -/* Currently, no field of AsyncQueueEntry requires more than int alignment */ + +/* Queue alignment is still needed for SLRU page management */ #define QUEUEALIGN(len) INTALIGN(len) -#define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2) +/* + * QueuePosition is a scalar entry index. Derive page and byte offset from the + * index using the fixed AsyncQueueEntry size. + */ +typedef int64 QueuePosition; + +#define ASYNC_ENTRY_SIZE ((int) sizeof(AsyncQueueEntry)) +#define ASYNC_ENTRIES_PER_PAGE (BLCKSZ / ASYNC_ENTRY_SIZE) /* - * Struct describing a queue position, and assorted macros for working with it + * One SLRU page must contain an integral number of compact entries. That is + * required for the index→page/offset mapping below (division/modulo by the + * per-page entry count) and for unambiguous page-boundary detection. + * + * AsyncQueueEntry is currently 16 bytes (Oid 4 + TransactionId 4 + XLogRecPtr + * 8) with natural alignment and no padding. BLCKSZ (QUEUE_PAGESIZE) is always + * a multiple of 1024, so this assertion holds for standard builds. If the + * entry layout changes in the future, this compile-time check ensures we fail + * early rather than producing incorrect indexing math at runtime. 
*/ -typedef struct QueuePosition -{ - int64 page; /* SLRU page number */ - int offset; /* byte offset within page */ -} QueuePosition; +StaticAssertDecl(BLCKSZ % sizeof(AsyncQueueEntry) == 0, + "AsyncQueueEntry size must divide QUEUE_PAGESIZE"); -#define QUEUE_POS_PAGE(x) ((x).page) -#define QUEUE_POS_OFFSET(x) ((x).offset) +#define QUEUE_POS_PAGE(x) ((x) / ASYNC_ENTRIES_PER_PAGE) +#define QUEUE_POS_OFFSET(x) ((int)(((x) % ASYNC_ENTRIES_PER_PAGE) * ASYNC_ENTRY_SIZE)) #define SET_QUEUE_POS(x,y,z) \ do { \ - (x).page = (y); \ - (x).offset = (z); \ + (x) = ((int64) (y)) * ASYNC_ENTRIES_PER_PAGE + ((z) / ASYNC_ENTRY_SIZE); \ } while (0) -#define QUEUE_POS_EQUAL(x,y) \ - ((x).page == (y).page && (x).offset == (y).offset) +#define QUEUE_POS_EQUAL(x,y) ((x) == (y)) -#define QUEUE_POS_IS_ZERO(x) \ - ((x).page == 0 && (x).offset == 0) +#define QUEUE_POS_IS_ZERO(x) ((x) == 0) -/* choose logically smaller QueuePosition */ -#define QUEUE_POS_MIN(x,y) \ - (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \ - (x).page != (y).page ? (y) : \ - (x).offset < (y).offset ? (x) : (y)) - -/* choose logically larger QueuePosition */ -#define QUEUE_POS_MAX(x,y) \ - (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \ - (x).page != (y).page ? (x) : \ - (x).offset > (y).offset ? (x) : (y)) +/* choose logically smaller/larger positions */ +#define QUEUE_POS_MIN(x,y) ((x) <= (y) ? (x) : (y)) +#define QUEUE_POS_MAX(x,y) ((x) >= (y) ? 
(x) : (y)) /* * Parameter determining how often we try to advance the tail pointer: @@ -285,6 +304,7 @@ typedef struct AsyncQueueControl * listening backend */ int64 stopPage; /* oldest unrecycled page; must be <= * tail.page */ + int64 reservedEntries; /* number of entries reserved pre-commit */ ProcNumber firstListener; /* id of first listener, or * INVALID_PROC_NUMBER */ TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ @@ -420,6 +440,8 @@ static bool amRegisteredListener = false; /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */ static bool tryAdvanceTail = false; +/* true if this backend reserved one compact entry pre-commit */ +static bool notifyEntryReserved = false; /* GUC parameters */ bool Trace_notify = false; @@ -438,18 +460,13 @@ static void Exec_UnlistenCommit(const char *channel); static void Exec_UnlistenAllCommit(void); static bool IsListeningOn(const char *channel); static void asyncQueueUnregister(void); -static bool asyncQueueIsFull(void); static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength); -static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe); -static ListCell *asyncQueueAddEntries(ListCell *nextNotify); static double asyncQueueUsage(void); -static void asyncQueueFillWarning(void); static void SignalBackends(void); static void asyncQueueReadAllNotifications(void); static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, - char *page_buffer, - Snapshot snapshot); + char *page_buffer); static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(bool flush); static bool AsyncExistsPendingNotify(Notification *n); @@ -457,6 +474,141 @@ static void AddEventToPendingNotifies(Notification *n); static uint32 notification_hash(const void *key, Size keysize); static int notification_match(const void *key1, const void *key2, Size keysize); static void ClearPendingActionsAndNotifies(void); +static void 
processNotificationFromWAL(XLogRecPtr notify_lsn); +/* prototype provided in commands/async.h */ + +/* + * Per-page committed minimum notify LSNs. Indexed by page_no % max_notify_queue_pages + * and tagged with the exact page_no to avoid modulo aliasing. + */ +typedef struct NotifyPageMinEntry +{ + int64 page_no; /* absolute queue page number or -1 if invalid */ + XLogRecPtr min_lsn; /* minimum notify_data_lsn for committed entries on page */ +} NotifyPageMinEntry; + +static NotifyPageMinEntry *NotifyPageMins = NULL; /* shmem array of length max_notify_queue_pages */ + +/* + * Uncommitted NOTIFY tracker. One node per top-level xact that has emitted + * NOTIFY WAL but not yet committed. This is kept small; use a fixed-size + * array with at most MaxBackends entries. Protected by NotifyQueueLock. + */ +typedef struct UncommittedNotifyEntry +{ + FullTransactionId fxid; + XLogRecPtr write_lsn; + bool in_use; +} UncommittedNotifyEntry; + +static UncommittedNotifyEntry *UncommittedNotifies = NULL; /* shmem array [MaxBackends] */ + +/* Helpers to update per-page mins; caller must hold NotifyQueueLock. */ +static inline void +NotifyPageMinUpdateForPage(int64 page_no, XLogRecPtr lsn) +{ + int idx; + NotifyPageMinEntry *e; + + if (NotifyPageMins == NULL || page_no < 0) + return; + + idx = (int) (page_no % max_notify_queue_pages); + e = &NotifyPageMins[idx]; + if (e->page_no != page_no) + { + e->page_no = page_no; + e->min_lsn = lsn; + } + else + { + if (XLogRecPtrIsInvalid(e->min_lsn) || (lsn < e->min_lsn)) + e->min_lsn = lsn; + } +} + +/* Invalidate [from_page, to_page) entries; caller must hold NotifyQueueLock. 
*/ +static inline void +NotifyPageMinInvalidateRange(int64 from_page, int64 to_page) +{ + int64 p; + int idx; + + if (NotifyPageMins == NULL) + return; + for (p = from_page; p < to_page; p++) + { + idx = (int) (p % max_notify_queue_pages); + if (NotifyPageMins[idx].page_no == p) + { + NotifyPageMins[idx].page_no = -1; + NotifyPageMins[idx].min_lsn = InvalidXLogRecPtr; + } + } +} + +/* Uncommitted list maintenance; protected by NotifyQueueLock. */ +/* Internal helper: caller must hold NotifyQueueLock EXCLUSIVE */ +/* + * Add or update the uncommitted NOTIFY pin for a top-level transaction. + * Caller must hold NotifyQueueLock EXCLUSIVE. + */ +static void +UncommittedNotifyAdd(FullTransactionId fxid, XLogRecPtr lsn) +{ + int free_slot = -1; + int i; + + if (UncommittedNotifies == NULL || !FullTransactionIdIsValid(fxid)) + return; + + /* If already present (shouldn't happen), update; else insert in a free slot. */ + for (i = 0; i < MaxBackends; i++) + { + if (UncommittedNotifies[i].in_use) + { + if (FullTransactionIdEquals(UncommittedNotifies[i].fxid, fxid)) + { + /* Keep the minimum write_lsn per top-level xact */ + if (XLogRecPtrIsInvalid(UncommittedNotifies[i].write_lsn) || + (lsn < UncommittedNotifies[i].write_lsn)) + UncommittedNotifies[i].write_lsn = lsn; + return; + } + } + else if (free_slot < 0) + free_slot = i; + } + if (free_slot >= 0) + { + UncommittedNotifies[free_slot].fxid = fxid; + UncommittedNotifies[free_slot].write_lsn = lsn; + UncommittedNotifies[free_slot].in_use = true; + } + else + { + /* Extremely unlikely: fallback to no-op rather than ERROR. 
*/ + } +} + +/* wrapper removed: all call sites hold NotifyQueueLock already */ + +static void +UncommittedNotifyRemoveByFullXid(FullTransactionId fxid) +{ + if (UncommittedNotifies == NULL || !FullTransactionIdIsValid(fxid)) + return; + for (int i = 0; i < MaxBackends; i++) + { + if (UncommittedNotifies[i].in_use && FullTransactionIdEquals(UncommittedNotifies[i].fxid, fxid)) + { + UncommittedNotifies[i].in_use = false; + UncommittedNotifies[i].fxid = InvalidFullTransactionId; + UncommittedNotifies[i].write_lsn = InvalidXLogRecPtr; + break; + } + } +} /* * Compute the difference between two queue page numbers. @@ -492,6 +644,11 @@ AsyncShmemSize(void) size = add_size(size, SimpleLruShmemSize(notify_buffers, 0)); + /* Per-page committed mins */ + size = add_size(size, mul_size(max_notify_queue_pages, sizeof(NotifyPageMinEntry))); + /* Uncommitted list (MaxBackends entries) */ + size = add_size(size, mul_size(MaxBackends, sizeof(UncommittedNotifyEntry))); + return size; } @@ -519,6 +676,7 @@ AsyncShmemInit(void) SET_QUEUE_POS(QUEUE_HEAD, 0, 0); SET_QUEUE_POS(QUEUE_TAIL, 0, 0); QUEUE_STOP_PAGE = 0; + asyncQueueControl->reservedEntries = 0; QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER; asyncQueueControl->lastQueueFillWarn = 0; for (int i = 0; i < MaxBackends; i++) @@ -546,6 +704,41 @@ AsyncShmemInit(void) */ (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL); } + + /* Allocate/attach per-page committed mins array */ + { + bool found2 = false; + NotifyPageMins = (NotifyPageMinEntry *) + ShmemInitStruct("Notify Per-Page Min Array", + sizeof(NotifyPageMinEntry) * (Size) max_notify_queue_pages, + &found2); + if (!found2) + { + for (int i = 0; i < max_notify_queue_pages; i++) + { + NotifyPageMins[i].page_no = -1; + NotifyPageMins[i].min_lsn = InvalidXLogRecPtr; + } + } + } + + /* Allocate/attach uncommitted list */ + { + bool found3 = false; + UncommittedNotifies = (UncommittedNotifyEntry *) + ShmemInitStruct("Notify Uncommitted List", + 
sizeof(UncommittedNotifyEntry) * (Size) MaxBackends, + &found3); + if (!found3) + { + for (int i = 0; i < MaxBackends; i++) + { + UncommittedNotifies[i].fxid = InvalidFullTransactionId; + UncommittedNotifies[i].write_lsn = InvalidXLogRecPtr; + UncommittedNotifies[i].in_use = false; + } + } + } } @@ -890,65 +1083,137 @@ PreCommit_Notify(void) } } - /* Queue any pending notifies (must happen after the above) */ + /* Write notification data to WAL if we have any */ if (pendingNotifies) { - ListCell *nextNotify; + TransactionId currentXid; + ListCell *l; + size_t total_size = 0; + uint32 nnotifications = 0; + char *notifications_data; + char *ptr; + XLogRecPtr notify_lsn; /* * Make sure that we have an XID assigned to the current transaction. * GetCurrentTransactionId is cheap if we already have an XID, but not - * so cheap if we don't, and we'd prefer not to do that work while - * holding NotifyQueueLock. + * so cheap if we don't. */ - (void) GetCurrentTransactionId(); + currentXid = GetCurrentTransactionId(); /* - * Serialize writers by acquiring a special lock that we hold till - * after commit. This ensures that queue entries appear in commit - * order, and in particular that there are never uncommitted queue - * entries ahead of committed ones, so an uncommitted transaction - * can't block delivery of deliverable notifications. - * - * We use a heavyweight lock so that it'll automatically be released - * after either commit or abort. This also allows deadlocks to be - * detected, though really a deadlock shouldn't be possible here. - * - * The lock is on "database 0", which is pretty ugly but it doesn't - * seem worth inventing a special locktag category just for this. - * (Historical note: before PG 9.0, a similar lock on "database 0" was - * used by the flatfiles mechanism.) + * Step 1: Reserve space in the in-memory queue for the compact entry. 
*/ - LockSharedObject(DatabaseRelationId, InvalidOid, 0, - AccessExclusiveLock); - - /* Now push the notifications into the queue */ - nextNotify = list_head(pendingNotifies->events); - while (nextNotify != NULL) + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); { - /* - * Add the pending notifications to the queue. We acquire and - * release NotifyQueueLock once per page, which might be overkill - * but it does allow readers to get in while we're doing this. - * - * A full queue is very uncommon and should really not happen, - * given that we have so much space available in the SLRU pages. - * Nevertheless we need to deal with this possibility. Note that - * when we get here we are in the process of committing our - * transaction, but we have not yet committed to clog, so at this - * point in time we can still roll the transaction back. - */ - LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); - asyncQueueFillWarning(); - if (asyncQueueIsFull()) + QueuePosition reserved_head = QUEUE_HEAD + asyncQueueControl->reservedEntries; + int64 headPage = QUEUE_POS_PAGE(reserved_head); + int headSlot = (int) (reserved_head % ASYNC_ENTRIES_PER_PAGE); + int64 tailPage = QUEUE_POS_PAGE(QUEUE_TAIL); + /* Also cap total entries irrespective of page math */ + int64 max_total_entries = (int64) max_notify_queue_pages * ASYNC_ENTRIES_PER_PAGE; + int64 current_total_entries = (reserved_head - QUEUE_TAIL) + 1; /* +1 for our reservation */ + LWLock *nextbank; + + /* Check page-window fullness first */ + if (asyncQueuePageDiff(headPage, tailPage) >= max_notify_queue_pages) + { + LWLockRelease(NotifyQueueLock); ereport(ERROR, - (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), - errmsg("too many notifications in the NOTIFY queue"))); - nextNotify = asyncQueueAddEntries(nextNotify); - LWLockRelease(NotifyQueueLock); + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not queue notification before commit"), + errdetail("asynchronous notification queue is full"))); + } + + /* If at last slot, ensure 
advancing to next page is allowed */ + if (headSlot == ASYNC_ENTRIES_PER_PAGE - 1) + { + if (asyncQueuePageDiff(headPage + 1, tailPage) >= max_notify_queue_pages) + { + LWLockRelease(NotifyQueueLock); + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not queue notification before commit"), + errdetail("asynchronous notification queue is full"))); + } + + /* Pre-initialize the next page so commit path doesn't fault it in */ + nextbank = SimpleLruGetBankLock(NotifyCtl, headPage + 1); + LWLockAcquire(nextbank, LW_EXCLUSIVE); + (void) SimpleLruZeroPage(NotifyCtl, headPage + 1); + LWLockRelease(nextbank); + } + + /* Also enforce a strict entry-count limit */ + if (current_total_entries > max_total_entries) + { + LWLockRelease(NotifyQueueLock); + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not queue notification before commit"), + errdetail("asynchronous notification queue is full"))); + } + + /* Reserve one entry */ + asyncQueueControl->reservedEntries++; + notifyEntryReserved = true; + } + LWLockRelease(NotifyQueueLock); + + /* + * Step 2: Write notification data to WAL. + */ + /* First pass: calculate total size needed for serialization */ + foreach(l, pendingNotifies->events) + { + Notification *n = (Notification *) lfirst(l); + + /* Size: 2 bytes for channel_len + 2 bytes for payload_len + strings */ + total_size += 4 + n->channel_len + 1 + n->payload_len + 1; + nnotifications++; } - /* Note that we don't clear pendingNotifies; AtCommit_Notify will. 
*/ + /* Allocate buffer for notification data */ + notifications_data = palloc(total_size); + ptr = notifications_data; + + /* Second pass: serialize all notifications */ + foreach(l, pendingNotifies->events) + { + Notification *n = (Notification *) lfirst(l); + char *channel = n->data; + char *payload = n->data + n->channel_len + 1; + + /* Write channel length, payload length, channel, and payload */ + memcpy(ptr, &n->channel_len, 2); + ptr += 2; + memcpy(ptr, &n->payload_len, 2); + ptr += 2; + memcpy(ptr, channel, n->channel_len + 1); + ptr += n->channel_len + 1; + memcpy(ptr, payload, n->payload_len + 1); + ptr += n->payload_len + 1; + } + + /* + * To avoid a race with WAL recycling, hold NotifyQueueLock EXCLUSIVE + * across the WAL insert and publication of the uncommitted pin. + */ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + + /* Write notification data to WAL */ + notify_lsn = LogAsyncNotifyData(MyDatabaseId, currentXid, MyProcPid, + nnotifications, total_size, + notifications_data); + + pfree(notifications_data); + + MyProc->notifyCommitLsn = notify_lsn; + UncommittedNotifyAdd(GetTopFullTransactionId(), notify_lsn); + + LWLockRelease(NotifyQueueLock); + + /* Notification payloads are now read directly from WAL at delivery time. */ } } @@ -1006,13 +1271,20 @@ AtCommit_Notify(void) asyncQueueUnregister(); /* - * Send signals to listening backends. We need do this only if there are - * pending notifies, which were previously added to the shared queue by - * PreCommit_Notify(). + * If we had notifications, they were already written to the queue in + * PreCommit_Notify. After commit, signal listening backends to check the + * queue. The transaction visibility logic will see our + * XID as committed and process the notifications. 
*/ - if (pendingNotifies != NULL) + if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn)) + { + /* Signal listening backends to check the queue */ SignalBackends(); + /* Clear the flag after signaling */ + MyProc->notifyCommitLsn = InvalidXLogRecPtr; + } + /* * If it's time to try to advance the global tail pointer, do that. * @@ -1030,6 +1302,9 @@ AtCommit_Notify(void) /* And clean up */ ClearPendingActionsAndNotifies(); + + /* Reset local reservation flag if set (reservation consumed at commit). */ + notifyEntryReserved = false; } /* @@ -1263,21 +1538,6 @@ asyncQueueUnregister(void) amRegisteredListener = false; } -/* - * Test whether there is room to insert more notification messages. - * - * Caller must hold at least shared NotifyQueueLock. - */ -static bool -asyncQueueIsFull(void) -{ - int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD); - int64 tailPage = QUEUE_POS_PAGE(QUEUE_TAIL); - int64 occupied = headPage - tailPage; - - return occupied >= max_notify_queue_pages; -} - /* * Advance the QueuePosition to the next entry, assuming that the current * entry is of length entryLength. If we jump to a new page the function @@ -1286,193 +1546,17 @@ asyncQueueIsFull(void) static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength) { - int64 pageno = QUEUE_POS_PAGE(*position); - int offset = QUEUE_POS_OFFSET(*position); - bool pageJump = false; - - /* - * Move to the next writing position: First jump over what we have just - * written or read. - */ - offset += entryLength; - Assert(offset <= QUEUE_PAGESIZE); - - /* - * In a second step check if another entry can possibly be written to the - * page. If so, stay here, we have reached the next position. If not, then - * we need to move on to the next page. 
- */ - if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE) - { - pageno++; - offset = 0; - pageJump = true; - } - - SET_QUEUE_POS(*position, pageno, offset); + int64 idx; + bool pageJump; + + /* With fixed-size entries, advancing is just +1 entry. */ + Assert(entryLength == (int) sizeof(AsyncQueueEntry)); + idx = *position; + pageJump = ((idx % ASYNC_ENTRIES_PER_PAGE) == (ASYNC_ENTRIES_PER_PAGE - 1)); + *position = idx + 1; return pageJump; } -/* - * Fill the AsyncQueueEntry at *qe with an outbound notification message. - */ -static void -asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) -{ - size_t channellen = n->channel_len; - size_t payloadlen = n->payload_len; - int entryLength; - - Assert(channellen < NAMEDATALEN); - Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH); - - /* The terminators are already included in AsyncQueueEntryEmptySize */ - entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen; - entryLength = QUEUEALIGN(entryLength); - qe->length = entryLength; - qe->dboid = MyDatabaseId; - qe->xid = GetCurrentTransactionId(); - qe->srcPid = MyProcPid; - memcpy(qe->data, n->data, channellen + payloadlen + 2); -} - -/* - * Add pending notifications to the queue. - * - * We go page by page here, i.e. we stop once we have to go to a new page but - * we will be called again and then fill that next page. If an entry does not - * fit into the current page, we write a dummy entry with an InvalidOid as the - * database OID in order to fill the page. So every page is always used up to - * the last byte which simplifies reading the page later. - * - * We are passed the list cell (in pendingNotifies->events) containing the next - * notification to write and return the first still-unwritten cell back. - * Eventually we will return NULL indicating all is done. - * - * We are holding NotifyQueueLock already from the caller and grab - * page specific SLRU bank lock locally in this function. 
- */ -static ListCell * -asyncQueueAddEntries(ListCell *nextNotify) -{ - AsyncQueueEntry qe; - QueuePosition queue_head; - int64 pageno; - int offset; - int slotno; - LWLock *prevlock; - - /* - * We work with a local copy of QUEUE_HEAD, which we write back to shared - * memory upon exiting. The reason for this is that if we have to advance - * to a new page, SimpleLruZeroPage might fail (out of disk space, for - * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise, - * subsequent insertions would try to put entries into a page that slru.c - * thinks doesn't exist yet.) So, use a local position variable. Note - * that if we do fail, any already-inserted queue entries are forgotten; - * this is okay, since they'd be useless anyway after our transaction - * rolls back. - */ - queue_head = QUEUE_HEAD; - - /* - * If this is the first write since the postmaster started, we need to - * initialize the first page of the async SLRU. Otherwise, the current - * page should be initialized already, so just fetch it. 
- */ - pageno = QUEUE_POS_PAGE(queue_head); - prevlock = SimpleLruGetBankLock(NotifyCtl, pageno); - - /* We hold both NotifyQueueLock and SLRU bank lock during this operation */ - LWLockAcquire(prevlock, LW_EXCLUSIVE); - - if (QUEUE_POS_IS_ZERO(queue_head)) - slotno = SimpleLruZeroPage(NotifyCtl, pageno); - else - slotno = SimpleLruReadPage(NotifyCtl, pageno, true, - InvalidTransactionId); - - /* Note we mark the page dirty before writing in it */ - NotifyCtl->shared->page_dirty[slotno] = true; - - while (nextNotify != NULL) - { - Notification *n = (Notification *) lfirst(nextNotify); - - /* Construct a valid queue entry in local variable qe */ - asyncQueueNotificationToEntry(n, &qe); - - offset = QUEUE_POS_OFFSET(queue_head); - - /* Check whether the entry really fits on the current page */ - if (offset + qe.length <= QUEUE_PAGESIZE) - { - /* OK, so advance nextNotify past this item */ - nextNotify = lnext(pendingNotifies->events, nextNotify); - } - else - { - /* - * Write a dummy entry to fill up the page. Actually readers will - * only check dboid and since it won't match any reader's database - * OID, they will ignore this entry and move on. - */ - qe.length = QUEUE_PAGESIZE - offset; - qe.dboid = InvalidOid; - qe.data[0] = '\0'; /* empty channel */ - qe.data[1] = '\0'; /* empty payload */ - } - - /* Now copy qe into the shared buffer page */ - memcpy(NotifyCtl->shared->page_buffer[slotno] + offset, - &qe, - qe.length); - - /* Advance queue_head appropriately, and detect if page is full */ - if (asyncQueueAdvance(&(queue_head), qe.length)) - { - LWLock *lock; - - pageno = QUEUE_POS_PAGE(queue_head); - lock = SimpleLruGetBankLock(NotifyCtl, pageno); - if (lock != prevlock) - { - LWLockRelease(prevlock); - LWLockAcquire(lock, LW_EXCLUSIVE); - prevlock = lock; - } - - /* - * Page is full, so we're done here, but first fill the next page - * with zeroes. 
The reason to do this is to ensure that slru.c's - * idea of the head page is always the same as ours, which avoids - * boundary problems in SimpleLruTruncate. The test in - * asyncQueueIsFull() ensured that there is room to create this - * page without overrunning the queue. - */ - slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head)); - - /* - * If the new page address is a multiple of QUEUE_CLEANUP_DELAY, - * set flag to remember that we should try to advance the tail - * pointer (we don't want to actually do that right here). - */ - if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0) - tryAdvanceTail = true; - - /* And exit the loop */ - break; - } - } - - /* Success, so update the global QUEUE_HEAD */ - QUEUE_HEAD = queue_head; - - LWLockRelease(prevlock); - - return nextNotify; -} - /* * SQL function to return the fraction of the notification queue currently * occupied. @@ -1515,52 +1599,6 @@ asyncQueueUsage(void) return (double) occupied / (double) max_notify_queue_pages; } -/* - * Check whether the queue is at least half full, and emit a warning if so. - * - * This is unlikely given the size of the queue, but possible. - * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL. - * - * Caller must hold exclusive NotifyQueueLock. 
- */ -static void -asyncQueueFillWarning(void) -{ - double fillDegree; - TimestampTz t; - - fillDegree = asyncQueueUsage(); - if (fillDegree < 0.5) - return; - - t = GetCurrentTimestamp(); - - if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn, - t, QUEUE_FULL_WARN_INTERVAL)) - { - QueuePosition min = QUEUE_HEAD; - int32 minPid = InvalidPid; - - for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) - { - Assert(QUEUE_BACKEND_PID(i) != InvalidPid); - min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); - if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i))) - minPid = QUEUE_BACKEND_PID(i); - } - - ereport(WARNING, - (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100), - (minPid != InvalidPid ? - errdetail("The server process with PID %d is among those with the oldest transactions.", minPid) - : 0), - (minPid != InvalidPid ? - errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.") - : 0))); - - asyncQueueControl->lastQueueFillWarn = t; - } -} /* * Send signals to listening backends. @@ -1678,6 +1716,16 @@ AtAbort_Notify(void) if (amRegisteredListener && listenChannels == NIL) asyncQueueUnregister(); + /* Release any reserved queue entry */ + if (notifyEntryReserved) + { + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + if (asyncQueueControl->reservedEntries > 0) + asyncQueueControl->reservedEntries--; + LWLockRelease(NotifyQueueLock); + notifyEntryReserved = false; + } + /* And clean up */ ClearPendingActionsAndNotifies(); } @@ -1844,15 +1892,13 @@ ProcessNotifyInterrupt(bool flush) /* * Read all pending notifications from the queue, and deliver appropriate - * ones to my frontend. Stop when we reach queue head or an uncommitted - * notification. + * ones to my frontend. Stop when we reach queue head. 
*/ static void asyncQueueReadAllNotifications(void) { volatile QueuePosition pos; QueuePosition head; - Snapshot snapshot; /* page_buffer must be adequately aligned, so use a union */ union @@ -1875,46 +1921,6 @@ asyncQueueReadAllNotifications(void) return; } - /*---------- - * Get snapshot we'll use to decide which xacts are still in progress. - * This is trickier than it might seem, because of race conditions. - * Consider the following example: - * - * Backend 1: Backend 2: - * - * transaction starts - * UPDATE foo SET ...; - * NOTIFY foo; - * commit starts - * queue the notify message - * transaction starts - * LISTEN foo; -- first LISTEN in session - * SELECT * FROM foo WHERE ...; - * commit to clog - * commit starts - * add backend 2 to array of listeners - * advance to queue head (this code) - * commit to clog - * - * Transaction 2's SELECT has not seen the UPDATE's effects, since that - * wasn't committed yet. Ideally we'd ensure that client 2 would - * eventually get transaction 1's notify message, but there's no way - * to do that; until we're in the listener array, there's no guarantee - * that the notify message doesn't get removed from the queue. - * - * Therefore the coding technique transaction 2 is using is unsafe: - * applications must commit a LISTEN before inspecting database state, - * if they want to ensure they will see notifications about subsequent - * changes to that state. - * - * What we do guarantee is that we'll see all notifications from - * transactions committing after the snapshot we take here. - * Exec_ListenPreCommit has already added us to the listener array, - * so no not-yet-committed messages can be removed from the queue - * before we see them. - *---------- - */ - snapshot = RegisterSnapshot(GetLatestSnapshot()); /* * It is possible that we fail while trying to send a message to our @@ -1979,8 +1985,7 @@ asyncQueueReadAllNotifications(void) * while sending the notifications to the frontend. 
*/ reachedStop = asyncQueueProcessPageEntries(&pos, head, - page_buffer.buf, - snapshot); + page_buffer.buf); } while (!reachedStop); } PG_FINALLY(); @@ -1992,8 +1997,6 @@ asyncQueueReadAllNotifications(void) } PG_END_TRY(); - /* Done with snapshot */ - UnregisterSnapshot(snapshot); } /* @@ -2004,19 +2007,17 @@ asyncQueueReadAllNotifications(void) * memory. (We could access the page right in shared memory, but that * would imply holding the SLRU bank lock throughout this routine.) * - * We stop if we reach the "stop" position, or reach a notification from an - * uncommitted transaction, or reach the end of the page. + * We stop if we reach the "stop" position or reach the end of the page. * - * The function returns true once we have reached the stop position or an - * uncommitted notification, and false if we have finished with the page. + * The function returns true once we have reached the stop position, and false + * if we have finished with the page. * In other words: once it returns true there is no need to look further. * The QueuePosition *current is advanced past all processed messages. */ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, - char *page_buffer, - Snapshot snapshot) + char *page_buffer) { bool reachedStop = false; bool reachedEndOfPage; @@ -2031,70 +2032,374 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry)); - /* - * Advance *current over this message, possibly to the next page. As - * noted in the comments for asyncQueueReadAllNotifications, we must - * do this before possibly failing while processing the message. - */ - reachedEndOfPage = asyncQueueAdvance(current, qe->length); + /* Advance *current by one fixed-size compact entry. 
*/ + reachedEndOfPage = asyncQueueAdvance(current, sizeof(AsyncQueueEntry)); /* Ignore messages destined for other databases */ if (qe->dboid == MyDatabaseId) { - if (XidInMVCCSnapshot(qe->xid, snapshot)) + /* + * Since queue entries are written atomically with commit records + * while holding NotifyQueueLock exclusively, all entries in the queue + * are guaranteed to be from committed transactions. + * + * Step 5: Read notification data using stored LSN from WAL. + * The compact entry only contains metadata. + */ + processNotificationFromWAL(qe->notify_lsn); + } + + /* Loop back if we're not at end of page */ + } while (!reachedEndOfPage); + + if (QUEUE_POS_EQUAL(*current, stop)) + reachedStop = true; + + return reachedStop; +} + +/* + * processNotificationFromWAL + * + * Fetch notification data from WAL using the stored LSN and process + * the individual notifications for delivery to listening frontend. + * This implements Step 5 of the new WAL-based notification system. + */ +static void +processNotificationFromWAL(XLogRecPtr notify_lsn) +{ + XLogReaderState *xlogreader; + DecodedXLogRecord *record; + xl_async_notify_data *xlrec; + char *data; + char *ptr; + uint32_t remaining; + int srcPid; + char *errormsg; + Oid dboid; + uint32 nnotifications; + + /* + * Create XLog reader to fetch the notification data record. + * We use a temporary reader since this is called during normal + * notification processing, not during recovery. 
+ */ + xlogreader = XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.page_read = &read_local_xlog_page, + .segment_open = &wal_segment_open, + .segment_close = &wal_segment_close), + NULL); + if (!xlogreader) + elog(ERROR, "failed to allocate XLog reader for notification data"); + + /* Start reading exactly at the NOTIFY_DATA record begin LSN */ + XLogBeginRead(xlogreader, notify_lsn); + + /* Read the NOTIFY_DATA record */ + record = (DecodedXLogRecord *) XLogReadRecord(xlogreader, &errormsg); + if (record == NULL) + { + XLogReaderFree(xlogreader); + elog(ERROR, "failed to read notification data from WAL at %X/%X: %s", + LSN_FORMAT_ARGS(notify_lsn), errormsg ? errormsg : "no error message"); + } + + /* Verify this is the expected record type */ + if (XLogRecGetRmid(xlogreader) != RM_ASYNC_ID || + (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK) != XLOG_ASYNC_NOTIFY_DATA) + elog(ERROR, "expected NOTIFY_DATA at %X/%X, found rmgr %u info %u", + LSN_FORMAT_ARGS(notify_lsn), + XLogRecGetRmid(xlogreader), + (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK)); + + /* Extract the notification data from the WAL record */ + xlrec = (xl_async_notify_data *) XLogRecGetData(xlogreader); + srcPid = xlrec->srcPid; + dboid = xlrec->dbid; + data = (char *) xlrec + SizeOfAsyncNotifyData; + ptr = data; + remaining = XLogRecGetDataLen(xlogreader) - SizeOfAsyncNotifyData; + nnotifications = xlrec->nnotifications; + + /* + * Process each notification in the serialized data. + * The format is: 2-byte channel_len, 2-byte payload_len, + * null-terminated channel, null-terminated payload. 
+ */ + for (uint32_t i = 0; i < nnotifications && remaining >= 4; i++) + { + uint16 channel_len; + uint16 payload_len; + char *channel; + char *payload; + + /* Read lengths */ + memcpy(&channel_len, ptr, 2); + ptr += 2; + memcpy(&payload_len, ptr, 2); + ptr += 2; + remaining -= 4; + + /* Verify we have enough data */ + if (remaining < channel_len + 1 + payload_len + 1) + break; + + /* Extract channel and payload strings */ + channel = ptr; + ptr += channel_len + 1; + payload = ptr; + ptr += payload_len + 1; + remaining -= (channel_len + 1 + payload_len + 1); + + /* Deliver notification if we're listening on this channel */ + if (dboid == MyDatabaseId && IsListeningOn(channel)) + NotifyMyFrontEnd(channel, payload, srcPid); + } + + /* Clean up */ + XLogReaderFree(xlogreader); +} + + +/* + * AsyncNotifyOldestRequiredLSN + * + * Compute the oldest WAL LSN that must be retained for NOTIFY delivery. + * Returns true and sets *oldest_lsn when a committed or in-flight + * notification LSN is recorded; otherwise returns false. + * + * The result is the minimum over the per-page minimum LSNs of committed + * queue entries and the write LSNs of notifications whose transactions are + * still in progress; stale in-flight entries are pruned during the scan. + */ +bool +AsyncNotifyOldestRequiredLSN(XLogRecPtr *oldest_lsn) +{ + XLogRecPtr committed_min = InvalidXLogRecPtr; + XLogRecPtr uncommitted_min = InvalidXLogRecPtr; + bool have_any = false; + int i; + TransactionId xid; + + /* + * Hold a single EXCLUSIVE lock while reading both structures to avoid + * races with the commit path that updates page mins and removes + * uncommitted entries atomically under the same lock.
+ */ + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + + /* Committed side: scan per-page mins */ + if (NotifyPageMins != NULL) + { + for (i = 0; i < max_notify_queue_pages; i++) + { + if (NotifyPageMins[i].page_no >= 0 && !XLogRecPtrIsInvalid(NotifyPageMins[i].min_lsn)) { - /* - * The source transaction is still in progress, so we can't - * process this message yet. Break out of the loop, but first - * back up *current so we will reprocess the message next - * time. (Note: it is unlikely but not impossible for - * TransactionIdDidCommit to fail, so we can't really avoid - * this advance-then-back-up behavior when dealing with an - * uncommitted message.) - * - * Note that we must test XidInMVCCSnapshot before we test - * TransactionIdDidCommit, else we might return a message from - * a transaction that is not yet visible to snapshots; compare - * the comments at the head of heapam_visibility.c. - * - * Also, while our own xact won't be listed in the snapshot, - * we need not check for TransactionIdIsCurrentTransactionId - * because our transaction cannot (yet) have queued any - * messages. 
- */ - *current = thisentry; - reachedStop = true; - break; + if (XLogRecPtrIsInvalid(committed_min) || (NotifyPageMins[i].min_lsn < committed_min)) + committed_min = NotifyPageMins[i].min_lsn; } - else if (TransactionIdDidCommit(qe->xid)) + } + } + + /* Uncommitted side: prune stale and compute min */ + if (UncommittedNotifies != NULL) + { + for (i = 0; i < MaxBackends; i++) + { + if (!UncommittedNotifies[i].in_use) + continue; + xid = XidFromFullTransactionId(UncommittedNotifies[i].fxid); + if (!TransactionIdIsInProgress(xid)) { - /* qe->data is the null-terminated channel name */ - char *channel = qe->data; + /* Stale entry, drop it */ + UncommittedNotifies[i].in_use = false; + UncommittedNotifies[i].fxid = InvalidFullTransactionId; + UncommittedNotifies[i].write_lsn = InvalidXLogRecPtr; + continue; + } + if (XLogRecPtrIsInvalid(uncommitted_min) || (UncommittedNotifies[i].write_lsn < uncommitted_min)) + uncommitted_min = UncommittedNotifies[i].write_lsn; + } + } - if (IsListeningOn(channel)) - { - /* payload follows channel name */ - char *payload = qe->data + strlen(channel) + 1; + /* Choose the minimum among committed and uncommitted (ignoring Invalid) */ + if (!XLogRecPtrIsInvalid(committed_min)) + { + *oldest_lsn = committed_min; + have_any = true; + } + if (!XLogRecPtrIsInvalid(uncommitted_min)) + { + if (!have_any || (uncommitted_min < *oldest_lsn)) + *oldest_lsn = uncommitted_min; + have_any = true; + } - NotifyMyFrontEnd(channel, payload, qe->srcPid); - } + LWLockRelease(NotifyQueueLock); + return have_any; +} + + +/* + * asyncQueueAddCompactEntry + * + * Add a compact entry to the notification SLRU queue containing only + * metadata (dbid, xid, notify_lsn) that points to the full notification + * data in WAL. This is much more efficient than the old approach of + * storing complete notification content in the SLRU queue. 
+ */ +void +asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn) +{ + AsyncQueueEntry entry; + QueuePosition queue_head; + int64 pageno; + int64 entry_pageno = -1; /* page where the entry is written */ + int offset; + int slotno; + LWLock *banklock; + + /* + * Fill in the compact entry with just the metadata. + * No payload data is stored here - it's all in WAL. + */ + entry.dboid = dbid; + entry.xid = xid; + entry.notify_lsn = notify_lsn; + + /* Caller should already hold NotifyQueueLock in exclusive mode */ + queue_head = QUEUE_HEAD; + + /* Capacity was reserved in PreCommit_Notify. Just write the entry. */ + + /* + * Get the current page. If this is the first write since postmaster + * started, initialize the first page. + */ + pageno = QUEUE_POS_PAGE(queue_head); + banklock = SimpleLruGetBankLock(NotifyCtl, pageno); + + LWLockAcquire(banklock, LW_EXCLUSIVE); + + if (QUEUE_POS_IS_ZERO(queue_head)) + slotno = SimpleLruZeroPage(NotifyCtl, pageno); + else + slotno = SimpleLruReadPage(NotifyCtl, pageno, true, + InvalidTransactionId); + + /* Mark the page dirty before writing */ + NotifyCtl->shared->page_dirty[slotno] = true; + + offset = QUEUE_POS_OFFSET(queue_head); + + /* Check if the compact entry fits on the current page */ + if (offset + sizeof(AsyncQueueEntry) <= QUEUE_PAGESIZE) + { + /* Copy the compact entry to the shared buffer */ + memcpy(NotifyCtl->shared->page_buffer[slotno] + offset, + &entry, + sizeof(AsyncQueueEntry)); + + entry_pageno = pageno; + + /* Advance queue head by the size of our compact entry */ + if (asyncQueueAdvance(&queue_head, sizeof(AsyncQueueEntry))) + { + /* + * Page became full. Zero the next page so that slru.c's idea of the + * head page matches ours, avoiding trouble in SimpleLruTruncate.
+ */ + LWLock *nextlock; + + pageno = QUEUE_POS_PAGE(queue_head); + nextlock = SimpleLruGetBankLock(NotifyCtl, pageno); + if (nextlock != banklock) + { + LWLockRelease(banklock); + LWLockAcquire(nextlock, LW_EXCLUSIVE); } - else + SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head)); + if (nextlock != banklock) { - /* - * The source transaction aborted or crashed, so we just - * ignore its notifications. - */ + LWLockRelease(nextlock); + LWLockAcquire(banklock, LW_EXCLUSIVE); } + + /* Set cleanup flag if appropriate */ + if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0) + tryAdvanceTail = true; } - /* Loop back if we're not at end of page */ - } while (!reachedEndOfPage); + /* Update the global queue head and consume reservation (not in recovery) */ + QUEUE_HEAD = queue_head; + if (!RecoveryInProgress()) + { + Assert(asyncQueueControl->reservedEntries > 0); + asyncQueueControl->reservedEntries--; + } + } + else + { + /* + * No room on current page. Move to the next page and write entry at + * offset 0; padding is unnecessary with fixed-size entries and bounded + * scans that stop at QUEUE_HEAD. 
+ */ + LWLockRelease(banklock); - if (QUEUE_POS_EQUAL(*current, stop)) - reachedStop = true; + /* Move head to the start of the next page */ + SET_QUEUE_POS(queue_head, QUEUE_POS_PAGE(queue_head) + 1, 0); - return reachedStop; + /* Ensure next page is present */ + banklock = SimpleLruGetBankLock(NotifyCtl, QUEUE_POS_PAGE(queue_head)); + LWLockAcquire(banklock, LW_EXCLUSIVE); + slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head)); + NotifyCtl->shared->page_dirty[slotno] = true; + + /* Write entry at beginning of the new page */ + memcpy(NotifyCtl->shared->page_buffer[slotno], &entry, sizeof(AsyncQueueEntry)); + + entry_pageno = QUEUE_POS_PAGE(queue_head); + + /* Advance queue head and initialize subsequent page if needed */ + if (asyncQueueAdvance(&queue_head, sizeof(AsyncQueueEntry))) + { + LWLock *nextlock; + pageno = QUEUE_POS_PAGE(queue_head); + nextlock = SimpleLruGetBankLock(NotifyCtl, pageno); + if (nextlock != banklock) + { + LWLockRelease(banklock); + LWLockAcquire(nextlock, LW_EXCLUSIVE); + } + SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head)); + if (nextlock != banklock) + { + LWLockRelease(nextlock); + LWLockAcquire(banklock, LW_EXCLUSIVE); + } + if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0) + tryAdvanceTail = true; + } + + /* Update the global queue head and consume reservation (not in recovery) */ + QUEUE_HEAD = queue_head; + if (!RecoveryInProgress()) + { + Assert(asyncQueueControl->reservedEntries > 0); + asyncQueueControl->reservedEntries--; + } + } + + /* Update per-page minimum and remove from uncommitted list under locks. */ + if (entry_pageno >= 0) + { + /* Caller holds NotifyQueueLock EXCLUSIVE (see xact.c commit path). 
*/ + NotifyPageMinUpdateForPage(entry_pageno, notify_lsn); + UncommittedNotifyRemoveByFullXid(GetTopFullTransactionIdIfAny()); + } + + LWLockRelease(banklock); } /* @@ -2161,6 +2466,8 @@ asyncQueueAdvanceTail(void) SimpleLruTruncate(NotifyCtl, newtailpage); LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + /* Invalidate per-page mins for pages we are truncating */ + NotifyPageMinInvalidateRange(oldtailpage, newtailpage); QUEUE_STOP_PAGE = newtailpage; LWLockRelease(NotifyQueueLock); } @@ -2395,3 +2702,59 @@ check_notify_buffers(int *newval, void **extra, GucSource source) { return check_slru_buffers("notify_buffers", newval); } + +/* + * Write a WAL record containing async notification data + * + * This logs notification data to WAL, allowing us to release locks earlier + * and maintain commit ordering through WAL's natural ordering guarantees. + */ +XLogRecPtr +LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid, + uint32 nnotifications, Size data_len, char *data) +{ + xl_async_notify_data xlrec; + + + xlrec.dbid = dboid; + xlrec.xid = xid; + xlrec.srcPid = srcPid; + xlrec.nnotifications = nnotifications; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfAsyncNotifyData); + XLogRegisterData(data, data_len); + + (void) XLogInsert(RM_ASYNC_ID, XLOG_ASYNC_NOTIFY_DATA); + + /* Return the begin LSN of the record we just inserted. */ + return ProcLastRecPtr; +} + +/* + * Redo function for async notification WAL records + * + * Nothing needs to be replayed for notification data records: the payload + * is durably stored in the WAL record itself and listeners read it directly + * from WAL. Any other record type is reported with a PANIC. + */ +void +async_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + switch (info) + { + case XLOG_ASYNC_NOTIFY_DATA: + /* + * For notification data records, we don't need to do anything + * during recovery since listeners will read directly from WAL.
+ * The data is already durably stored in the WAL record itself. + */ + break; + + + default: + elog(PANIC, "async_redo: unknown op code %u", info); + } +} diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 8f4b282c6b1..b35b007e51c 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -19,6 +19,7 @@ #include "access/xlogreader.h" #include "catalog/pg_control.h" #include "catalog/storage_xlog.h" +#include "commands/async.h" #include "commands/dbcommands_xlog.h" #include "fe_utils/archive.h" #include "filemap.h" diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index fac509ed134..03e73ae33c9 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -8,6 +8,7 @@ #define FRONTEND 1 #include "postgres.h" +#include "access/async_xlog.h" #include "access/brin_xlog.h" #include "access/clog.h" #include "access/commit_ts.h" @@ -23,6 +24,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/storage_xlog.h" +#include "commands/async.h" #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" diff --git a/src/include/access/async_xlog.h b/src/include/access/async_xlog.h new file mode 100644 index 00000000000..d4c0c828e84 --- /dev/null +++ b/src/include/access/async_xlog.h @@ -0,0 +1,43 @@ +/*------------------------------------------------------------------------- + * + * async_xlog.h + * Async notification WAL definitions + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/async_xlog.h + * + *------------------------------------------------------------------------- + */ +#ifndef ASYNC_XLOG_H +#define ASYNC_XLOG_H + +#include "access/xlogdefs.h" +#include "access/xlogreader.h" +#include "lib/stringinfo.h" + +/* + * WAL record types for async notifications + */ +#define 
XLOG_ASYNC_NOTIFY_DATA 0x00 /* notification data */ + +/* + * WAL record for notification data (written in PreCommit_Notify) + */ +typedef struct xl_async_notify_data +{ + Oid dbid; /* database ID */ + TransactionId xid; /* transaction ID */ + int32 srcPid; /* source backend PID */ + uint32 nnotifications; /* number of notifications */ + /* followed by serialized notification data */ +} xl_async_notify_data; + +#define SizeOfAsyncNotifyData (offsetof(xl_async_notify_data, nnotifications) + sizeof(uint32)) + +extern void async_redo(XLogReaderState *record); +extern void async_desc(StringInfo buf, XLogReaderState *record); +extern const char *async_identify(uint8 info); + +#endif /* ASYNC_XLOG_H */ \ No newline at end of file diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 8e7fc9db877..58293e05165 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL) PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL) PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode) +PG_RMGR(RM_ASYNC_ID, "Async", async_redo, async_desc, async_identify, NULL, NULL, NULL, NULL) diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 4528e51829e..2d8709ba51e 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -195,6 +195,7 @@ typedef struct SavedTransactionCharacteristics #define XACT_XINFO_HAS_AE_LOCKS (1U << 6) #define XACT_XINFO_HAS_GID (1U << 7) #define XACT_XINFO_HAS_DROPPED_STATS (1U << 8) +#define XACT_XINFO_HAS_NOTIFY (1U << 9) /* * Also stored in xinfo, these indicating a variety of additional actions that @@ -318,6 +319,11 @@ typedef 
struct xl_xact_origin TimestampTz origin_timestamp; } xl_xact_origin; +typedef struct xl_xact_notify +{ + XLogRecPtr notify_lsn; /* LSN of notification data in WAL */ +} xl_xact_notify; + typedef struct xl_xact_commit { TimestampTz xact_time; /* time of commit */ @@ -331,6 +337,7 @@ typedef struct xl_xact_commit /* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */ /* twophase_gid follows if XINFO_HAS_GID. As a null-terminated string. */ /* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! */ + /* xl_xact_notify follows if XINFO_HAS_NOTIFY, stored unaligned! */ } xl_xact_commit; #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz)) @@ -404,6 +411,8 @@ typedef struct xl_xact_parsed_commit XLogRecPtr origin_lsn; TimestampTz origin_timestamp; + + XLogRecPtr notify_lsn; /* LSN of notification data */ } xl_xact_parsed_commit; typedef xl_xact_parsed_commit xl_xact_parsed_prepare; diff --git a/src/include/commands/async.h b/src/include/commands/async.h index f75c3df9556..404eb62c0a9 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -14,11 +14,25 @@ #define ASYNC_H #include +#include "access/xlogreader.h" +#include "lib/stringinfo.h" extern PGDLLIMPORT bool Trace_notify; extern PGDLLIMPORT int max_notify_queue_pages; extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending; +/* + * Compact SLRU queue entry - stores metadata pointing to WAL data + */ +typedef struct AsyncQueueEntry +{ + Oid dboid; /* database ID for quick filtering */ + TransactionId xid; /* transaction ID */ + XLogRecPtr notify_lsn; /* LSN of notification data in WAL */ +} AsyncQueueEntry; + +#define ASYNC_QUEUE_ENTRY_SIZE sizeof(AsyncQueueEntry) + extern Size AsyncShmemSize(void); extern void AsyncShmemInit(void); @@ -46,4 +60,17 @@ extern void HandleNotifyInterrupt(void); /* process interrupts */ extern void ProcessNotifyInterrupt(bool flush); +/* WAL-based notification functions */ +extern XLogRecPtr LogAsyncNotifyData(Oid 
dboid, TransactionId xid, int32 srcPid, + uint32 nnotifications, Size data_len, char *data); +extern void async_redo(XLogReaderState *record); +extern void async_desc(StringInfo buf, XLogReaderState *record); +extern const char *async_identify(uint8 info); + +/* notification queue functions */ +extern void asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn); + +/* To be called before WAL recycle; presumably reports the oldest notify LSN still needed via *oldest_lsn (NOTE(review): meaning of the bool result is not visible here -- confirm in async.c) */ +extern bool AsyncNotifyOldestRequiredLSN(XLogRecPtr *oldest_lsn); + #endif /* ASYNC_H */ diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index c6f5ebceefd..71459fe5529 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -301,6 +301,9 @@ struct PGPROC uint32 wait_event_info; /* proc's wait information */ + /* Support for async notifications */ + XLogRecPtr notifyCommitLsn; /* LSN of notification data for current xact */ + /* Support for group transaction status update. */ bool clogGroupMember; /* true, if member of clog group */ pg_atomic_uint32 clogGroupNext; /* next clog group member */ diff --git a/src/test/modules/test_listen_notify/Makefile b/src/test/modules/test_listen_notify/Makefile new file mode 100644 index 00000000000..da1bf5bb1b7 --- /dev/null +++ b/src/test/modules/test_listen_notify/Makefile @@ -0,0 +1,17 @@ +# src/test/modules/test_listen_notify/Makefile + +MODULE = test_listen_notify +PGFILEDESC = "test_listen_notify - regression testing for LISTEN/NOTIFY support" + +TAP_TESTS = 1 + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = src/test/modules/test_listen_notify +top_builddir = ../../../.. 
+include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/src/test/modules/test_listen_notify/meson.build b/src/test/modules/test_listen_notify/meson.build new file mode 100644 index 00000000000..8119e6c761f --- /dev/null +++ b/src/test/modules/test_listen_notify/meson.build @@ -0,0 +1,14 @@ +# Copyright (c) 2022-2025, PostgreSQL Global Development Group + +tests += { + 'name': 'test_listen_notify', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'tap': { + 'tests': [ + 't/002_queue_full.pl', + 't/003_wal_pin_test.pl', + ], + }, +} + diff --git a/src/test/modules/test_listen_notify/t/002_queue_full.pl b/src/test/modules/test_listen_notify/t/002_queue_full.pl new file mode 100644 index 00000000000..a255dc20fa9 --- /dev/null +++ b/src/test/modules/test_listen_notify/t/002_queue_full.pl @@ -0,0 +1,89 @@ +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use PostgreSQL::Test::BackgroundPsql; +use IPC::Run qw(start pump finish timeout); +use Test::More; + +# Verify that NOTIFY errors when the notification queue reaches the configured +# maximum page distance. We set max_notify_queue_pages to 64 so we can reach +# the limit quickly. With fixed-size compact entries, the last slot on the page +# cannot be used because advancing would require preparing the next page, which +# exceeds the allowed window. + +my $node = PostgreSQL::Test::Cluster->new('t_queue_full'); +$node->init; +$node->append_conf('postgresql.conf', qq{ +max_notify_queue_pages = 64 +fsync = off +synchronous_commit = off +full_page_writes = off +autovacuum = off +}); +$node->start; + +# Create a listener that registers and then stays in a transaction so it does +# not process incoming notifications, preventing the queue tail from advancing. 
+my $listener = $node->background_psql('postgres'); +$listener->query_safe('LISTEN tap_queue_full'); +$listener->query_safe('BEGIN'); + +# Launch multiple concurrent psql senders that issue many autocommit NOTIFY +# statements. Stop as soon as one fails with the expected error. +my $n_senders = 4; +my @handles; +my @in; my @out; my @err; +my $full_seen = 0; +my $stderr_msg = ''; + +for my $i (0..$n_senders-1) { + my @cmd = ( + $node->installed_command('psql'), + '--no-psqlrc', '--no-align', '--tuples-only', '--quiet', + '--dbname' => $node->connstr('postgres'), + '--set' => 'ON_ERROR_STOP=1', + '--file' => '-', + ); + $in[$i] = ''; + $out[$i] = ''; + $err[$i] = ''; + my $tmo = timeout($PostgreSQL::Test::Utils::timeout_default); + $handles[$i] = start \@cmd, '<' => \$in[$i], '>' => \$out[$i], '2>' => \$err[$i], $tmo; +} + +my $chunk = ("NOTIFY tap_queue_full, 'x';\n" x 1000); +my $iterations = 0; +ITER: while (!$full_seen && $iterations < 1000) { + $iterations++; + for my $i (0..$#handles) { + next if $full_seen; # defensive only: never true here, since 'last ITER' fires as soon as $full_seen is set + # feed another chunk of 1000 NOTIFY statements to this sender + $in[$i] .= $chunk; + $handles[$i]->pump(); + # if this sender's stderr shows the queue-full error, keep the message and stop + if ($err[$i] =~ /asynchronous notification queue is full/i) { + $stderr_msg = $err[$i]; + $full_seen = 1; + last ITER; + } + } +} + +# Terminate all senders +for my $i (0..$#handles) { + # try to finish gracefully; ignore errors + eval { $handles[$i]->finish; 1 } or do {}; +} + +ok($full_seen, 'NOTIFY fails once queue reaches configured maximum'); +like($stderr_msg, qr/asynchronous notification queue is full/i, + 'error message mentions full NOTIFY queue'); + +# Cleanup listener: end its open transaction, then close the session +$listener->query('ROLLBACK'); +$listener->quit(); + +$node->stop('fast'); + +done_testing(); diff --git a/src/test/modules/test_listen_notify/t/003_wal_pin_test.pl b/src/test/modules/test_listen_notify/t/003_wal_pin_test.pl new file mode 100644 index 00000000000..cb36ef26832 --- /dev/null +++ 
b/src/test/modules/test_listen_notify/t/003_wal_pin_test.pl @@ -0,0 +1,106 @@ +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use IPC::Run qw(start pump finish timeout); +use Test::More; + +# Goal: Verify that NOTIFY pins WAL segments via notify LSN so WAL recycling +# does not remove needed segments. With a listener idle-in-transaction, queued +# NOTIFY entries remain unconsumed, so RemoveOldXlogFiles should keep older +# segments. After releasing the listener, notifications are delivered and WAL +# recycling can proceed. + +my $node = PostgreSQL::Test::Cluster->new('wal_pin'); +# Use 1MB WAL segments so the segments retained while pinned stay cheap on disk. +$node->init(extra => ['--wal-segsize=1']); +$node->append_conf('postgresql.conf', qq{ +fsync = off +synchronous_commit = off +full_page_writes = off +autovacuum = off +wal_level = replica +max_wal_size = '64MB' +min_wal_size = '32MB' +checkpoint_timeout = '30s' +trace_notify = on +log_min_messages = debug1 +}); +$node->start; + +# Helper to count WAL segment files in pg_wal (24-hex-digit filenames); not yet used by any assertion below. +sub count_wal_files { + my $wal_dir = $node->data_dir . '/pg_wal'; + opendir(my $dh, $wal_dir) or die "cannot open pg_wal: $!"; + my @segs = grep { /^[0-9A-F]{24}$/ } readdir($dh); + closedir($dh); + return scalar(@segs); +} + +# Start a psql session that LISTENs and then pins by going idle-in-xact. +my $psql = [ $node->installed_command('psql'), '--no-psqlrc', '--quiet', '--dbname' => $node->connstr('postgres') ]; +my ($in, $out, $err) = ('','',''); +my $tmo = timeout($PostgreSQL::Test::Utils::timeout_default); +my $h = start $psql, '<' => \$in, '>' => \$out, '2>' => \$err, $tmo; + +$in .= "\\set ON_ERROR_STOP 1\nLISTEN wal_pin_t;\n"; +$h->pump(); + +# Commit the LISTEN (autocommit), then begin a transaction to be idle-in-xact. +# Echo a marker and wait for it, so the NOTIFYs below cannot race ahead of BEGIN. +$in .= "BEGIN;\n\\echo listener_ready\n"; +$h->pump() until $out =~ /listener_ready/; + +# Produce a bunch of NOTIFYs in autocommit to populate the queue and WAL. 
+my $notify_count = 2000; +my $sql = ("NOTIFY wal_pin_t, 'p';\n" x $notify_count); +my ($ret, $stdout, $stderr) = $node->psql('postgres', "\\set ON_ERROR_STOP 1\n$sql"); +is($ret, 0, 'NOTIFY batch succeeded'); + +# Force WAL generation and recycling attempts. pg_switch_wal() is a no-op when +# nothing was written since the previous switch, so emit a small WAL record +# before each switch to make every iteration really start a new segment. +for my $i (1..150) { + $node->safe_psql('postgres', q{SELECT pg_logical_emit_message(false, '', 'x'); SELECT pg_switch_wal()}); +} +for my $i (1..20) { + $node->safe_psql('postgres', 'CHECKPOINT'); +} + +# Assert that NOTIFY WAL pinning path was invoked. +my $log = $node->logfile; +open my $lfh, '<', $log or die "cannot open postmaster log $log: $!"; +my $logtxt = do { local $/; <$lfh> }; +close $lfh; +like($logtxt, qr/async notify: (?:WAL recycle pinned by oldest notify LSN|checking WAL pin; oldest notify LSN)/i, + 'saw WAL pinning path invoked'); + +note('completed WAL switch/checkpoint churn with listener pinned'); + +# Release listener so it can process notifications, and force a round trip. +$in .= "ROLLBACK;\nSELECT 1;\n"; +$h->pump(); + +# Pump to capture async outputs printed by psql +my $saw = 0; +for my $i (1..1000) { + eval { $h->pump(); 1 } or do { }; + if ($out =~ /Asynchronous notification/i) { $saw = 1; last; } + select(undef, undef, undef, 0.02); +} +ok($saw, 'notifications printed after releasing listener'); + +# Encourage tail advancement and WAL cleanup. +for my $i (1..20) { + $node->safe_psql('postgres', 'SELECT pg_notification_queue_usage()'); + $node->safe_psql('postgres', 'CHECKPOINT'); +} + +note('post-release: queue advanced and checkpoints executed'); + +$in .= "\\q\n"; +eval { $h->finish; 1 } or do {}; + +$node->stop('fast'); + +done_testing(); -- 2.47.1