From d37772e3337a083e719be2fc9d93474792bc8c8d Mon Sep 17 00:00:00 2001 From: rbagga Date: Sun, 7 Sep 2025 16:55:57 -0700 Subject: [PATCH v4] Implement WAL-based async notifications for improved throughput - Added WAL logging for async notifications to improve scalability - Implemented async resource manager for WAL-based notification handling - Added new async descriptor files for pg_waldump support - Updated makefiles and build configuration for new components --- src/backend/access/rmgrdesc/Makefile | 1 + src/backend/access/rmgrdesc/asyncdesc.c | 47 ++ src/backend/access/rmgrdesc/meson.build | 1 + src/backend/access/rmgrdesc/xactdesc.c | 13 + src/backend/access/transam/rmgr.c | 1 + src/backend/access/transam/xact.c | 48 +- src/backend/commands/async.c | 800 ++++++++++++------------ src/bin/pg_rewind/parsexlog.c | 1 + src/bin/pg_waldump/rmgrdesc.c | 2 + src/include/access/async_xlog.h | 43 ++ src/include/access/rmgrlist.h | 1 + src/include/access/xact.h | 9 + src/include/commands/async.h | 25 + src/include/storage/proc.h | 3 + 14 files changed, 591 insertions(+), 404 deletions(-) create mode 100644 src/backend/access/rmgrdesc/asyncdesc.c create mode 100644 src/include/access/async_xlog.h diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile index cd95eec37f1..6e6e75b12bd 100644 --- a/src/backend/access/rmgrdesc/Makefile +++ b/src/backend/access/rmgrdesc/Makefile @@ -9,6 +9,7 @@ top_builddir = ../../../.. 
include $(top_builddir)/src/Makefile.global OBJS = \ + asyncdesc.o \ brindesc.o \ clogdesc.o \ committsdesc.o \ diff --git a/src/backend/access/rmgrdesc/asyncdesc.c b/src/backend/access/rmgrdesc/asyncdesc.c new file mode 100644 index 00000000000..7f322849ff1 --- /dev/null +++ b/src/backend/access/rmgrdesc/asyncdesc.c @@ -0,0 +1,47 @@ +/*------------------------------------------------------------------------- + * + * asyncdesc.c + * rmgr descriptor routines for commands/async.c + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/rmgrdesc/asyncdesc.c + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/async_xlog.h" + +void +async_desc(StringInfo buf, XLogReaderState *record) +{ + char *rec = XLogRecGetData(record); + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + if (info == XLOG_ASYNC_NOTIFY_DATA) + { + xl_async_notify_data *xlrec = (xl_async_notify_data *) rec; + + appendStringInfo(buf, "notify data: db %u xid %u pid %d notifications %u", + xlrec->dbid, xlrec->xid, xlrec->srcPid, xlrec->nnotifications); + } +} + +const char * +async_identify(uint8 info) +{ + const char *id = NULL; + + switch (info & ~XLR_INFO_MASK) + { + case XLOG_ASYNC_NOTIFY_DATA: + id = "NOTIFY_DATA"; + break; + } + + return id; +} diff --git a/src/backend/access/rmgrdesc/meson.build b/src/backend/access/rmgrdesc/meson.build index 96c98e800c2..38bef2e87f6 100644 --- a/src/backend/access/rmgrdesc/meson.build +++ b/src/backend/access/rmgrdesc/meson.build @@ -2,6 +2,7 @@ # used by frontend programs like pg_waldump rmgr_desc_sources = files( + 'asyncdesc.c', 'brindesc.c', 'clogdesc.c', 'committsdesc.c', diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index f0f696855b9..4f32f7fc591 100644 --- 
a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -135,6 +135,19 @@ ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *pars data += sizeof(xl_xact_origin); } + + if (parsed->xinfo & XACT_XINFO_HAS_NOTIFY) + { + xl_xact_notify xl_notify; + + /* no alignment is guaranteed, so copy onto stack */ + memcpy(&xl_notify, data, sizeof(xl_notify)); + + parsed->notify_lsn = xl_notify.notify_lsn; + + data += sizeof(xl_xact_notify); + } + } void diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c index 1b7499726eb..f8c25e6597a 100644 --- a/src/backend/access/transam/rmgr.c +++ b/src/backend/access/transam/rmgr.c @@ -19,6 +19,7 @@ /* includes needed for "access/rmgrlist.h" */ /* IWYU pragma: begin_keep */ +#include "access/async_xlog.h" #include "access/brin_xlog.h" #include "access/clog.h" #include "access/commit_ts.h" diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index b46e7e9c2a6..33b16ff4746 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5841,10 +5841,24 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xact_invals xl_invals; xl_xact_twophase xl_twophase; xl_xact_origin xl_origin; + xl_xact_notify xl_notify; uint8 info; + XLogRecPtr result; Assert(CritSectionCount > 0); + /* + * Handle notification commit ordering: if this transaction has pending + * notifications, we must write the queue entry just before the commit + * record while holding NotifyQueueLock to ensure proper ordering. 
+ */ + if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn)) + { + TransactionId xid = GetCurrentTransactionId(); + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + asyncQueueAddCompactEntry(MyDatabaseId, xid, MyProc->notifyCommitLsn); + } + xl_xinfo.xinfo = 0; /* decide between a plain and 2pc commit */ @@ -5926,9 +5940,17 @@ XactLogCommitRecord(TimestampTz commit_time, xl_origin.origin_timestamp = replorigin_session_origin_timestamp; } + /* include notification information if present */ + if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn)) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_NOTIFY; + xl_notify.notify_lsn = MyProc->notifyCommitLsn; + } + if (xl_xinfo.xinfo != 0) info |= XLOG_XACT_HAS_INFO; + /* Then include all the collected data into the commit record. */ XLogBeginInsert(); @@ -5982,10 +6004,27 @@ XactLogCommitRecord(TimestampTz commit_time, if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) XLogRegisterData(&xl_origin, sizeof(xl_xact_origin)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_NOTIFY) + XLogRegisterData(&xl_notify, sizeof(xl_xact_notify)); + /* we allow filtering by xacts */ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); - return XLogInsert(RM_XACT_ID, info); + /* Insert the commit record */ + result = XLogInsert(RM_XACT_ID, info); + + /* + * Release NotifyQueueLock if we held it. The queue entry is now + * associated with a committed transaction, so readers can process it. 
+ * + * Listeners are signalled later by AtCommit_Notify(). Calling + * SignalBackends() here as well would be redundant, and we are still + * inside the critical section asserted at the top of this function. + */ + if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn)) + LWLockRelease(NotifyQueueLock); + + return result; } /* @@ -6227,6 +6266,17 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, false /* backward */ , false /* WAL */ ); } + /* + * Add notification queue entry if this commit has notifications. + * asyncQueueAddCompactEntry() requires NotifyQueueLock held exclusively. + */ + if (parsed->xinfo & XACT_XINFO_HAS_NOTIFY) + { + LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); + asyncQueueAddCompactEntry(parsed->dbId, xid, parsed->notify_lsn); + LWLockRelease(NotifyQueueLock); + } + /* Make sure files supposed to be dropped are dropped */ if (parsed->nrels > 0) { diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c index 4bd37d5beb5..57fa732e9b8 100644 --- a/src/backend/commands/async.c +++ b/src/backend/commands/async.c @@ -133,6 +133,12 @@ #include "access/slru.h" #include "access/transam.h" #include "access/xact.h" +#include "access/xlog.h" +#include "access/xloginsert.h" +#include "access/xlogreader.h" +#include "access/xlogutils.h" +#include "access/xlogrecovery.h" +#include "access/xlog_internal.h" #include "catalog/pg_database.h" #include "commands/async.h" #include "common/hashfn.h" @@ -151,6 +157,29 @@ #include "utils/snapmgr.h" #include "utils/timestamp.h" +/* Missing definitions for WAL-based notification system */ +#define AsyncQueueEntryEmptySize ASYNC_QUEUE_ENTRY_SIZE +#define SLRU_PAGE_SIZE BLCKSZ +#define AsyncCtl NotifyCtl + +/* WAL record types (NOTE(review): asyncdesc.c gets these names from access/async_xlog.h; confirm and drop the copy here) */ +#define XLOG_ASYNC_NOTIFY_DATA 0x00 + +/* + * WAL record for notification data (written in PreCommit_Notify) + */ +typedef struct xl_async_notify_data +{ + Oid dbid; /* database ID */ + TransactionId xid; /* transaction ID */ + int32 srcPid; /* source backend PID */ + uint32 nnotifications; /* number of notifications */ + /* followed by serialized notification data */ +} xl_async_notify_data; + +#define SizeOfAsyncNotifyData (offsetof(xl_async_notify_data, nnotifications) + sizeof(uint32)) + + /* * Maximum size of a NOTIFY payload, including terminating 
NULL. This @@ -163,30 +192,13 @@ #define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128) /* - * Struct representing an entry in the global notify queue - * - * This struct declaration has the maximal length, but in a real queue entry - * the data area is only big enough for the actual channel and payload strings - * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible - * entry size, if both channel and payload strings are empty (but note it - * doesn't include alignment padding). - * - * The "length" field should always be rounded up to the next QUEUEALIGN - * multiple so that all fields are properly aligned. + * NOTE: The AsyncQueueEntry structure is now defined in commands/async.h + * as a compact metadata-only structure for the new WAL-based notification system. + * The old variable-length structure with full notification content is no longer used. */ -typedef struct AsyncQueueEntry -{ - int length; /* total allocated length of entry */ - Oid dboid; /* sender's database OID */ - TransactionId xid; /* sender's XID */ - int32 srcPid; /* sender's PID */ - char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH]; -} AsyncQueueEntry; - -/* Currently, no field of AsyncQueueEntry requires more than int alignment */ -#define QUEUEALIGN(len) INTALIGN(len) -#define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2) +/* Queue alignment is still needed for SLRU page management */ +#define QUEUEALIGN(len) INTALIGN(len) /* * Struct describing a queue position, and assorted macros for working with it @@ -438,18 +450,13 @@ static void Exec_UnlistenCommit(const char *channel); static void Exec_UnlistenAllCommit(void); static bool IsListeningOn(const char *channel); static void asyncQueueUnregister(void); -static bool asyncQueueIsFull(void); static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength); -static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe); -static ListCell 
*asyncQueueAddEntries(ListCell *nextNotify); static double asyncQueueUsage(void); -static void asyncQueueFillWarning(void); -static void SignalBackends(void); +void SignalBackends(void); static void asyncQueueReadAllNotifications(void); static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, - char *page_buffer, - Snapshot snapshot); + char *page_buffer); static void asyncQueueAdvanceTail(void); static void ProcessIncomingNotify(bool flush); static bool AsyncExistsPendingNotify(Notification *n); @@ -457,6 +464,7 @@ static void AddEventToPendingNotifies(Notification *n); static uint32 notification_hash(const void *key, Size keysize); static int notification_match(const void *key1, const void *key2, Size keysize); static void ClearPendingActionsAndNotifies(void); +static void processNotificationFromWAL(XLogRecPtr notify_lsn); /* * Compute the difference between two queue page numbers. @@ -890,65 +898,75 @@ PreCommit_Notify(void) } } - /* Queue any pending notifies (must happen after the above) */ + /* Write notification data to WAL if we have any */ if (pendingNotifies) { - ListCell *nextNotify; + TransactionId currentXid; + ListCell *l; + size_t total_size = 0; + uint32 nnotifications = 0; + char *notifications_data; + char *ptr; + XLogRecPtr notify_lsn; /* * Make sure that we have an XID assigned to the current transaction. * GetCurrentTransactionId is cheap if we already have an XID, but not - * so cheap if we don't, and we'd prefer not to do that work while - * holding NotifyQueueLock. + * so cheap if we don't. */ - (void) GetCurrentTransactionId(); + currentXid = GetCurrentTransactionId(); /* - * Serialize writers by acquiring a special lock that we hold till - * after commit. This ensures that queue entries appear in commit - * order, and in particular that there are never uncommitted queue - * entries ahead of committed ones, so an uncommitted transaction - * can't block delivery of deliverable notifications. 
- * - * We use a heavyweight lock so that it'll automatically be released - * after either commit or abort. This also allows deadlocks to be - * detected, though really a deadlock shouldn't be possible here. - * - * The lock is on "database 0", which is pretty ugly but it doesn't - * seem worth inventing a special locktag category just for this. - * (Historical note: before PG 9.0, a similar lock on "database 0" was - * used by the flatfiles mechanism.) + * Step 1: Write notification data to WAL. + * This can be done in parallel with other transactions since we're + * not holding any global locks yet. */ - LockSharedObject(DatabaseRelationId, InvalidOid, 0, - AccessExclusiveLock); + + /* First pass: calculate total size needed for serialization */ + foreach(l, pendingNotifies->events) + { + Notification *n = (Notification *) lfirst(l); + + /* Size: 2 bytes for channel_len + 2 bytes for payload_len + strings */ + total_size += 4 + n->channel_len + 1 + n->payload_len + 1; + nnotifications++; + } - /* Now push the notifications into the queue */ - nextNotify = list_head(pendingNotifies->events); - while (nextNotify != NULL) + /* Allocate buffer for notification data */ + notifications_data = palloc(total_size); + ptr = notifications_data; + + /* Second pass: serialize all notifications */ + foreach(l, pendingNotifies->events) { - /* - * Add the pending notifications to the queue. We acquire and - * release NotifyQueueLock once per page, which might be overkill - * but it does allow readers to get in while we're doing this. - * - * A full queue is very uncommon and should really not happen, - * given that we have so much space available in the SLRU pages. - * Nevertheless we need to deal with this possibility. Note that - * when we get here we are in the process of committing our - * transaction, but we have not yet committed to clog, so at this - * point in time we can still roll the transaction back. 
- */ - LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); - asyncQueueFillWarning(); - if (asyncQueueIsFull()) - ereport(ERROR, - (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), - errmsg("too many notifications in the NOTIFY queue"))); - nextNotify = asyncQueueAddEntries(nextNotify); - LWLockRelease(NotifyQueueLock); + Notification *n = (Notification *) lfirst(l); + char *channel = n->data; + char *payload = n->data + n->channel_len + 1; + + /* Write channel length, payload length, channel, and payload */ + memcpy(ptr, &n->channel_len, 2); + ptr += 2; + memcpy(ptr, &n->payload_len, 2); + ptr += 2; + memcpy(ptr, channel, n->channel_len + 1); + ptr += n->channel_len + 1; + memcpy(ptr, payload, n->payload_len + 1); + ptr += n->payload_len + 1; } - /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */ + /* Write notification data to WAL */ + notify_lsn = LogAsyncNotifyData(MyDatabaseId, currentXid, MyProcPid, + nnotifications, total_size, + notifications_data); + + pfree(notifications_data); + + /* + * Step 2: Store the notification LSN in PROC for use during commit. + * The queue entry will be written just before the commit record + * while holding NotifyQueueLock to ensure proper ordering. + */ + MyProc->notifyCommitLsn = notify_lsn; } } @@ -1006,12 +1024,19 @@ AtCommit_Notify(void) asyncQueueUnregister(); /* - * Send signals to listening backends. We need do this only if there are - * pending notifies, which were previously added to the shared queue by - * PreCommit_Notify(). + * If we had notifications, PreCommit_Notify wrote their data to WAL and + * the queue entry was added when the commit record was logged. Now that + * we've committed, signal listening backends to check the queue. Readers + * no longer apply a snapshot test to queue entries. 
*/ - if (pendingNotifies != NULL) + if (!XLogRecPtrIsInvalid(MyProc->notifyCommitLsn)) + { + /* Signal listening backends to check the queue */ SignalBackends(); + + /* Clear the flag now that we're done */ + MyProc->notifyCommitLsn = InvalidXLogRecPtr; + } /* * If it's time to try to advance the global tail pointer, do that. @@ -1263,21 +1288,6 @@ asyncQueueUnregister(void) amRegisteredListener = false; } -/* - * Test whether there is room to insert more notification messages. - * - * Caller must hold at least shared NotifyQueueLock. - */ -static bool -asyncQueueIsFull(void) -{ - int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD); - int64 tailPage = QUEUE_POS_PAGE(QUEUE_TAIL); - int64 occupied = headPage - tailPage; - - return occupied >= max_notify_queue_pages; -} - /* * Advance the QueuePosition to the next entry, assuming that the current * entry is of length entryLength. If we jump to a new page the function @@ -1313,166 +1323,6 @@ asyncQueueAdvance(volatile QueuePosition *position, int entryLength) return pageJump; } -/* - * Fill the AsyncQueueEntry at *qe with an outbound notification message. - */ -static void -asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) -{ - size_t channellen = n->channel_len; - size_t payloadlen = n->payload_len; - int entryLength; - - Assert(channellen < NAMEDATALEN); - Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH); - - /* The terminators are already included in AsyncQueueEntryEmptySize */ - entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen; - entryLength = QUEUEALIGN(entryLength); - qe->length = entryLength; - qe->dboid = MyDatabaseId; - qe->xid = GetCurrentTransactionId(); - qe->srcPid = MyProcPid; - memcpy(qe->data, n->data, channellen + payloadlen + 2); -} - -/* - * Add pending notifications to the queue. - * - * We go page by page here, i.e. we stop once we have to go to a new page but - * we will be called again and then fill that next page. 
If an entry does not - * fit into the current page, we write a dummy entry with an InvalidOid as the - * database OID in order to fill the page. So every page is always used up to - * the last byte which simplifies reading the page later. - * - * We are passed the list cell (in pendingNotifies->events) containing the next - * notification to write and return the first still-unwritten cell back. - * Eventually we will return NULL indicating all is done. - * - * We are holding NotifyQueueLock already from the caller and grab - * page specific SLRU bank lock locally in this function. - */ -static ListCell * -asyncQueueAddEntries(ListCell *nextNotify) -{ - AsyncQueueEntry qe; - QueuePosition queue_head; - int64 pageno; - int offset; - int slotno; - LWLock *prevlock; - - /* - * We work with a local copy of QUEUE_HEAD, which we write back to shared - * memory upon exiting. The reason for this is that if we have to advance - * to a new page, SimpleLruZeroPage might fail (out of disk space, for - * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise, - * subsequent insertions would try to put entries into a page that slru.c - * thinks doesn't exist yet.) So, use a local position variable. Note - * that if we do fail, any already-inserted queue entries are forgotten; - * this is okay, since they'd be useless anyway after our transaction - * rolls back. - */ - queue_head = QUEUE_HEAD; - - /* - * If this is the first write since the postmaster started, we need to - * initialize the first page of the async SLRU. Otherwise, the current - * page should be initialized already, so just fetch it. 
- */ - pageno = QUEUE_POS_PAGE(queue_head); - prevlock = SimpleLruGetBankLock(NotifyCtl, pageno); - - /* We hold both NotifyQueueLock and SLRU bank lock during this operation */ - LWLockAcquire(prevlock, LW_EXCLUSIVE); - - if (QUEUE_POS_IS_ZERO(queue_head)) - slotno = SimpleLruZeroPage(NotifyCtl, pageno); - else - slotno = SimpleLruReadPage(NotifyCtl, pageno, true, - InvalidTransactionId); - - /* Note we mark the page dirty before writing in it */ - NotifyCtl->shared->page_dirty[slotno] = true; - - while (nextNotify != NULL) - { - Notification *n = (Notification *) lfirst(nextNotify); - - /* Construct a valid queue entry in local variable qe */ - asyncQueueNotificationToEntry(n, &qe); - - offset = QUEUE_POS_OFFSET(queue_head); - - /* Check whether the entry really fits on the current page */ - if (offset + qe.length <= QUEUE_PAGESIZE) - { - /* OK, so advance nextNotify past this item */ - nextNotify = lnext(pendingNotifies->events, nextNotify); - } - else - { - /* - * Write a dummy entry to fill up the page. Actually readers will - * only check dboid and since it won't match any reader's database - * OID, they will ignore this entry and move on. - */ - qe.length = QUEUE_PAGESIZE - offset; - qe.dboid = InvalidOid; - qe.data[0] = '\0'; /* empty channel */ - qe.data[1] = '\0'; /* empty payload */ - } - - /* Now copy qe into the shared buffer page */ - memcpy(NotifyCtl->shared->page_buffer[slotno] + offset, - &qe, - qe.length); - - /* Advance queue_head appropriately, and detect if page is full */ - if (asyncQueueAdvance(&(queue_head), qe.length)) - { - LWLock *lock; - - pageno = QUEUE_POS_PAGE(queue_head); - lock = SimpleLruGetBankLock(NotifyCtl, pageno); - if (lock != prevlock) - { - LWLockRelease(prevlock); - LWLockAcquire(lock, LW_EXCLUSIVE); - prevlock = lock; - } - - /* - * Page is full, so we're done here, but first fill the next page - * with zeroes. 
The reason to do this is to ensure that slru.c's - * idea of the head page is always the same as ours, which avoids - * boundary problems in SimpleLruTruncate. The test in - * asyncQueueIsFull() ensured that there is room to create this - * page without overrunning the queue. - */ - slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head)); - - /* - * If the new page address is a multiple of QUEUE_CLEANUP_DELAY, - * set flag to remember that we should try to advance the tail - * pointer (we don't want to actually do that right here). - */ - if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0) - tryAdvanceTail = true; - - /* And exit the loop */ - break; - } - } - - /* Success, so update the global QUEUE_HEAD */ - QUEUE_HEAD = queue_head; - - LWLockRelease(prevlock); - - return nextNotify; -} - /* * SQL function to return the fraction of the notification queue currently * occupied. @@ -1515,52 +1365,6 @@ asyncQueueUsage(void) return (double) occupied / (double) max_notify_queue_pages; } -/* - * Check whether the queue is at least half full, and emit a warning if so. - * - * This is unlikely given the size of the queue, but possible. - * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL. - * - * Caller must hold exclusive NotifyQueueLock. 
- */ -static void -asyncQueueFillWarning(void) -{ - double fillDegree; - TimestampTz t; - - fillDegree = asyncQueueUsage(); - if (fillDegree < 0.5) - return; - - t = GetCurrentTimestamp(); - - if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn, - t, QUEUE_FULL_WARN_INTERVAL)) - { - QueuePosition min = QUEUE_HEAD; - int32 minPid = InvalidPid; - - for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) - { - Assert(QUEUE_BACKEND_PID(i) != InvalidPid); - min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); - if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i))) - minPid = QUEUE_BACKEND_PID(i); - } - - ereport(WARNING, - (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100), - (minPid != InvalidPid ? - errdetail("The server process with PID %d is among those with the oldest transactions.", minPid) - : 0), - (minPid != InvalidPid ? - errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.") - : 0))); - - asyncQueueControl->lastQueueFillWarn = t; - } -} /* * Send signals to listening backends. @@ -1577,7 +1381,7 @@ asyncQueueFillWarning(void) * This is called during CommitTransaction(), so it's important for it * to have very low probability of failure. */ -static void +void SignalBackends(void) { int32 *pids; @@ -1844,15 +1648,13 @@ ProcessNotifyInterrupt(bool flush) /* * Read all pending notifications from the queue, and deliver appropriate - * ones to my frontend. Stop when we reach queue head or an uncommitted - * notification. + * ones to my frontend. Stop when we reach queue head. */ static void asyncQueueReadAllNotifications(void) { volatile QueuePosition pos; QueuePosition head; - Snapshot snapshot; /* page_buffer must be adequately aligned, so use a union */ union @@ -1875,46 +1677,6 @@ asyncQueueReadAllNotifications(void) return; } - /*---------- - * Get snapshot we'll use to decide which xacts are still in progress. 
- * This is trickier than it might seem, because of race conditions. - * Consider the following example: - * - * Backend 1: Backend 2: - * - * transaction starts - * UPDATE foo SET ...; - * NOTIFY foo; - * commit starts - * queue the notify message - * transaction starts - * LISTEN foo; -- first LISTEN in session - * SELECT * FROM foo WHERE ...; - * commit to clog - * commit starts - * add backend 2 to array of listeners - * advance to queue head (this code) - * commit to clog - * - * Transaction 2's SELECT has not seen the UPDATE's effects, since that - * wasn't committed yet. Ideally we'd ensure that client 2 would - * eventually get transaction 1's notify message, but there's no way - * to do that; until we're in the listener array, there's no guarantee - * that the notify message doesn't get removed from the queue. - * - * Therefore the coding technique transaction 2 is using is unsafe: - * applications must commit a LISTEN before inspecting database state, - * if they want to ensure they will see notifications about subsequent - * changes to that state. - * - * What we do guarantee is that we'll see all notifications from - * transactions committing after the snapshot we take here. - * Exec_ListenPreCommit has already added us to the listener array, - * so no not-yet-committed messages can be removed from the queue - * before we see them. - *---------- - */ - snapshot = RegisterSnapshot(GetLatestSnapshot()); /* * It is possible that we fail while trying to send a message to our @@ -1979,8 +1741,7 @@ asyncQueueReadAllNotifications(void) * while sending the notifications to the frontend. */ reachedStop = asyncQueueProcessPageEntries(&pos, head, - page_buffer.buf, - snapshot); + page_buffer.buf); } while (!reachedStop); } PG_FINALLY(); @@ -1992,8 +1753,6 @@ asyncQueueReadAllNotifications(void) } PG_END_TRY(); - /* Done with snapshot */ - UnregisterSnapshot(snapshot); } /* @@ -2004,19 +1763,17 @@ asyncQueueReadAllNotifications(void) * memory. 
(We could access the page right in shared memory, but that * would imply holding the SLRU bank lock throughout this routine.) * - * We stop if we reach the "stop" position, or reach a notification from an - * uncommitted transaction, or reach the end of the page. + * We stop if we reach the "stop" position or reach the end of the page. * - * The function returns true once we have reached the stop position or an - * uncommitted notification, and false if we have finished with the page. + * The function returns true once we have reached the stop position, and false + * if we have finished with the page. * In other words: once it returns true there is no need to look further. * The QueuePosition *current is advanced past all processed messages. */ static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, QueuePosition stop, - char *page_buffer, - Snapshot snapshot) + char *page_buffer) { bool reachedStop = false; bool reachedEndOfPage; @@ -2032,60 +1789,24 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry)); /* - * Advance *current over this message, possibly to the next page. As - * noted in the comments for asyncQueueReadAllNotifications, we must - * do this before possibly failing while processing the message. + * Advance *current over this compact entry. The new compact entries are + * fixed-size, making this much simpler than the old variable-length entries. */ - reachedEndOfPage = asyncQueueAdvance(current, qe->length); + reachedEndOfPage = asyncQueueAdvance(current, sizeof(AsyncQueueEntry)); /* Ignore messages destined for other databases */ - if (qe->dboid == MyDatabaseId) + if (qe->dbid == MyDatabaseId) { - if (XidInMVCCSnapshot(qe->xid, snapshot)) - { - /* - * The source transaction is still in progress, so we can't - * process this message yet. Break out of the loop, but first - * back up *current so we will reprocess the message next - * time. 
(Note: it is unlikely but not impossible for - * TransactionIdDidCommit to fail, so we can't really avoid - * this advance-then-back-up behavior when dealing with an - * uncommitted message.) - * - * Note that we must test XidInMVCCSnapshot before we test - * TransactionIdDidCommit, else we might return a message from - * a transaction that is not yet visible to snapshots; compare - * the comments at the head of heapam_visibility.c. - * - * Also, while our own xact won't be listed in the snapshot, - * we need not check for TransactionIdIsCurrentTransactionId - * because our transaction cannot (yet) have queued any - * messages. - */ - *current = thisentry; - reachedStop = true; - break; - } - else if (TransactionIdDidCommit(qe->xid)) - { - /* qe->data is the null-terminated channel name */ - char *channel = qe->data; - - if (IsListeningOn(channel)) - { - /* payload follows channel name */ - char *payload = qe->data + strlen(channel) + 1; - - NotifyMyFrontEnd(channel, payload, qe->srcPid); - } - } - else - { - /* - * The source transaction aborted or crashed, so we just - * ignore its notifications. - */ - } + /* + * Since queue entries are written atomically with commit records + * while holding NotifyQueueLock exclusively, all entries in the queue + * are guaranteed to be from committed transactions. + * + * Step 5: Read notification data from WAL using stored LSN. + * The compact entry only contains metadata; actual notification + * content is retrieved from WAL on demand. + */ + processNotificationFromWAL(qe->notify_lsn); } /* Loop back if we're not at end of page */ @@ -2097,6 +1818,235 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current, return reachedStop; } +/* + * processNotificationFromWAL + * + * Fetch notification data from WAL using the stored LSN and process + * the individual notifications for delivery to listening frontend. + * This implements Step 5 of the new WAL-based notification system. 
+ * + * notify_lsn is the start LSN of the NOTIFY_DATA record to read. An ERROR is + * raised if the record cannot be read or is not a NOTIFY_DATA record. + */ +static void +processNotificationFromWAL(XLogRecPtr notify_lsn) +{ + XLogReaderState *xlogreader; + XLogRecord *record; + xl_async_notify_data *xlrec; + char *ptr; + uint32 remaining; + int srcPid; + char *errormsg; + + /* + * Create XLog reader to fetch the notification data record. + * We use a temporary reader since this is called during normal + * notification processing, not during recovery. + */ + xlogreader = XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.page_read = &read_local_xlog_page, + .segment_open = &wal_segment_open, + .segment_close = &wal_segment_close), + NULL); + if (!xlogreader) + elog(ERROR, "failed to allocate XLog reader for notification data"); + + /* + * Every exit path, including an ERROR raised below or while sending to + * the frontend, must release the reader. + */ + PG_TRY(); + { + /* Start reading exactly at the NOTIFY_DATA record begin LSN */ + XLogBeginRead(xlogreader, notify_lsn); + + /* Read the NOTIFY_DATA record */ + record = XLogReadRecord(xlogreader, &errormsg); + if (record == NULL) + elog(ERROR, "failed to read notification data from WAL at %X/%X: %s", + LSN_FORMAT_ARGS(notify_lsn), errormsg ? errormsg : "no error message"); + + /* Verify this is the expected record type */ + if (XLogRecGetRmid(xlogreader) != RM_ASYNC_ID || + (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK) != XLOG_ASYNC_NOTIFY_DATA) + elog(ERROR, "expected NOTIFY_DATA at %X/%X, found rmgr %u info %u", + LSN_FORMAT_ARGS(notify_lsn), + XLogRecGetRmid(xlogreader), + (XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK)); + + /* The fixed-size header must be present before we subtract its size */ + if (XLogRecGetDataLen(xlogreader) < SizeOfAsyncNotifyData) + elog(ERROR, "NOTIFY_DATA record at %X/%X is too short", + LSN_FORMAT_ARGS(notify_lsn)); + + /* Extract the notification data from the WAL record */ + xlrec = (xl_async_notify_data *) XLogRecGetData(xlogreader); + srcPid = xlrec->srcPid; + ptr = (char *) xlrec + SizeOfAsyncNotifyData; + remaining = XLogRecGetDataLen(xlogreader) - SizeOfAsyncNotifyData; + + /* + * Process each notification in the serialized data. + * The format is: 2-byte channel_len, 2-byte payload_len, + * null-terminated channel, null-terminated payload. 
+ */ + for (uint32 i = 0; i < xlrec->nnotifications && remaining >= 4; i++) + { + uint16 channel_len; + uint16 payload_len; + char *channel; + char *payload; + + /* Read lengths */ + memcpy(&channel_len, ptr, 2); + ptr += 2; + memcpy(&payload_len, ptr, 2); + ptr += 2; + remaining -= 4; + + /* Stop quietly if the record ends before this entry does */ + if (remaining < channel_len + 1 + payload_len + 1) + break; + + /* Extract channel and payload strings */ + channel = ptr; + ptr += channel_len + 1; + payload = ptr; + ptr += payload_len + 1; + remaining -= (channel_len + 1 + payload_len + 1); + + /* Deliver notification if we're listening on this channel */ + if (IsListeningOn(channel)) + NotifyMyFrontEnd(channel, payload, srcPid); + } + } + PG_FINALLY(); + { + XLogReaderFree(xlogreader); + } + PG_END_TRY(); +} + + +/* + * asyncQueueAddCompactEntry + * + * Add a compact entry to the notification SLRU queue containing only + * metadata (dbid, xid, notify_lsn) that points to the full notification + * data in WAL. This is much more efficient than the old approach of + * storing complete notification content in the SLRU queue. + */ +void +asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn) +{ + AsyncQueueEntry entry; + QueuePosition queue_head; + int64 pageno; + int offset; + int slotno; + LWLock *banklock; + + /* + * Fill in the compact entry with just the metadata. + * No payload data is stored here - it's all in WAL. + */ + entry.dbid = dbid; + entry.xid = xid; + entry.notify_lsn = notify_lsn; + + /* Caller should already hold NotifyQueueLock in exclusive mode */ + queue_head = QUEUE_HEAD; + + /* + * Get the current page. If this is the first write since postmaster + * started, initialize the first page. 
+ */ + pageno = QUEUE_POS_PAGE(queue_head); + banklock = SimpleLruGetBankLock(NotifyCtl, pageno); + + LWLockAcquire(banklock, LW_EXCLUSIVE); + + if (QUEUE_POS_IS_ZERO(queue_head)) + slotno = SimpleLruZeroPage(NotifyCtl, pageno); + else + slotno = SimpleLruReadPage(NotifyCtl, pageno, true, + InvalidTransactionId); + + /* Mark the page dirty before writing */ + NotifyCtl->shared->page_dirty[slotno] = true; + + offset = QUEUE_POS_OFFSET(queue_head); + + /* Check if the compact entry fits on the current page */ + if (offset + sizeof(AsyncQueueEntry) <= QUEUE_PAGESIZE) + { + /* Copy the compact entry to the shared buffer */ + memcpy(NotifyCtl->shared->page_buffer[slotno] + offset, + &entry, + sizeof(AsyncQueueEntry)); + + /* Advance queue head by the size of our compact entry */ + if (asyncQueueAdvance(&queue_head, sizeof(AsyncQueueEntry))) + { + /* + * Page became full. Initialize the next page to ensure SLRU + * consistency (similar to what asyncQueueAddEntries does). + */ + LWLock *nextlock; + + pageno = QUEUE_POS_PAGE(queue_head); + nextlock = SimpleLruGetBankLock(NotifyCtl, pageno); + if (nextlock != banklock) + { + LWLockRelease(banklock); + LWLockAcquire(nextlock, LW_EXCLUSIVE); + } + SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head)); + if (nextlock != banklock) + { + LWLockRelease(nextlock); + LWLockAcquire(banklock, LW_EXCLUSIVE); + } + + /* Set cleanup flag if appropriate */ + if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0) + tryAdvanceTail = true; + } + + /* Update the global queue head */ + QUEUE_HEAD = queue_head; + } + else + { + /* + * Entry doesn't fit on current page. This should be very rare with + * our small compact entries, but handle it by padding the page and + * writing to the next page. 
+ */ + AsyncQueueEntry padding; + + memset(&padding, 0, sizeof(padding)); + padding.dbid = InvalidOid; /* Mark as padding */ + + /* Fill the rest of the page with padding */ + memcpy(NotifyCtl->shared->page_buffer[slotno] + offset, + &padding, + QUEUE_PAGESIZE - offset); + + /* Advance to next page */ + asyncQueueAdvance(&queue_head, QUEUE_PAGESIZE - offset); + + /* Recursively add the entry on the new page */ + QUEUE_HEAD = queue_head; + LWLockRelease(banklock); + asyncQueueAddCompactEntry(dbid, xid, notify_lsn); + return; + } + + LWLockRelease(banklock); +} + /* * Advance the shared queue tail variable to the minimum of all the * per-backend tail pointers. Truncate pg_notify space if possible. @@ -2395,3 +2330,62 @@ check_notify_buffers(int *newval, void **extra, GucSource source) { return check_slru_buffers("notify_buffers", newval); } + +/* + * Write a WAL record containing async notification data + * + * This logs notification data to WAL, allowing us to release locks earlier + * and maintain commit ordering through WAL's natural ordering guarantees. + */ +XLogRecPtr +LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid, + uint32 nnotifications, Size data_len, char *data) +{ + xl_async_notify_data xlrec; + + + xlrec.dbid = dboid; + xlrec.xid = xid; + xlrec.srcPid = srcPid; + xlrec.nnotifications = nnotifications; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfAsyncNotifyData); + XLogRegisterData(data, data_len); + + (void) XLogInsert(RM_ASYNC_ID, XLOG_ASYNC_NOTIFY_DATA); + + /* Return the begin LSN of the record we just inserted. */ + return ProcLastRecPtr; +} + + + + +/* + * Redo function for async notification WAL records + * + * During recovery, we need to replay notification records. For now, + * we'll add them to the traditional notification queue. In a complete + * implementation, replaying backends would read directly from WAL. 
+ */ +void +async_redo(XLogReaderState *record) +{ + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + + switch (info) + { + case XLOG_ASYNC_NOTIFY_DATA: + /* + * For notification data records, we don't need to do anything + * during recovery since listeners will read directly from WAL. + * The data is already durably stored in the WAL record itself. + */ + break; + + + default: + elog(PANIC, "async_redo: unknown op code %u", info); + } +} diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 8f4b282c6b1..b35b007e51c 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -19,6 +19,7 @@ #include "access/xlogreader.h" #include "catalog/pg_control.h" #include "catalog/storage_xlog.h" +#include "commands/async.h" #include "commands/dbcommands_xlog.h" #include "fe_utils/archive.h" #include "filemap.h" diff --git a/src/bin/pg_waldump/rmgrdesc.c b/src/bin/pg_waldump/rmgrdesc.c index fac509ed134..03e73ae33c9 100644 --- a/src/bin/pg_waldump/rmgrdesc.c +++ b/src/bin/pg_waldump/rmgrdesc.c @@ -8,6 +8,7 @@ #define FRONTEND 1 #include "postgres.h" +#include "access/async_xlog.h" #include "access/brin_xlog.h" #include "access/clog.h" #include "access/commit_ts.h" @@ -23,6 +24,7 @@ #include "access/xact.h" #include "access/xlog_internal.h" #include "catalog/storage_xlog.h" +#include "commands/async.h" #include "commands/dbcommands_xlog.h" #include "commands/sequence.h" #include "commands/tablespace.h" diff --git a/src/include/access/async_xlog.h b/src/include/access/async_xlog.h new file mode 100644 index 00000000000..d4c0c828e84 --- /dev/null +++ b/src/include/access/async_xlog.h @@ -0,0 +1,43 @@ +/*------------------------------------------------------------------------- + * + * async_xlog.h + * Async notification WAL definitions + * + * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/async_xlog.h +
* + *------------------------------------------------------------------------- + */ +#ifndef ASYNC_XLOG_H +#define ASYNC_XLOG_H + +#include "access/xlogdefs.h" +#include "access/xlogreader.h" +#include "lib/stringinfo.h" + +/* + * WAL record types for async notifications + */ +#define XLOG_ASYNC_NOTIFY_DATA 0x00 /* notification data */ + +/* + * WAL record for notification data (written in PreCommit_Notify) + */ +typedef struct xl_async_notify_data +{ + Oid dbid; /* database ID */ + TransactionId xid; /* transaction ID */ + int32 srcPid; /* source backend PID */ + uint32 nnotifications; /* number of notifications */ + /* followed by serialized notification data */ +} xl_async_notify_data; + +#define SizeOfAsyncNotifyData (offsetof(xl_async_notify_data, nnotifications) + sizeof(uint32)) + +extern void async_redo(XLogReaderState *record); +extern void async_desc(StringInfo buf, XLogReaderState *record); +extern const char *async_identify(uint8 info); + +#endif /* ASYNC_XLOG_H */ diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h index 8e7fc9db877..58293e05165 100644 --- a/src/include/access/rmgrlist.h +++ b/src/include/access/rmgrlist.h @@ -47,3 +47,4 @@ PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_i PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL, NULL, NULL) PG_RMGR(RM_GENERIC_ID, "Generic", generic_redo, generic_desc, generic_identify, NULL, NULL, generic_mask, NULL) PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL, NULL, logicalmsg_decode) +PG_RMGR(RM_ASYNC_ID, "Async", async_redo, async_desc, async_identify, NULL, NULL, NULL, NULL) diff --git a/src/include/access/xact.h b/src/include/access/xact.h index b2bc10ee041..aa1e2733976 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -194,6 +194,7 @@ typedef struct
SavedTransactionCharacteristics #define XACT_XINFO_HAS_AE_LOCKS (1U << 6) #define XACT_XINFO_HAS_GID (1U << 7) #define XACT_XINFO_HAS_DROPPED_STATS (1U << 8) +#define XACT_XINFO_HAS_NOTIFY (1U << 9) /* * Also stored in xinfo, these indicating a variety of additional actions that @@ -317,6 +318,11 @@ typedef struct xl_xact_origin TimestampTz origin_timestamp; } xl_xact_origin; +typedef struct xl_xact_notify +{ + XLogRecPtr notify_lsn; /* LSN of notification data in WAL */ +} xl_xact_notify; + typedef struct xl_xact_commit { TimestampTz xact_time; /* time of commit */ @@ -330,6 +336,7 @@ typedef struct xl_xact_commit /* xl_xact_twophase follows if XINFO_HAS_TWOPHASE */ /* twophase_gid follows if XINFO_HAS_GID. As a null-terminated string. */ /* xl_xact_origin follows if XINFO_HAS_ORIGIN, stored unaligned! */ + /* xl_xact_notify follows if XINFO_HAS_NOTIFY, stored unaligned! */ } xl_xact_commit; #define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz)) @@ -403,6 +410,8 @@ typedef struct xl_xact_parsed_commit XLogRecPtr origin_lsn; TimestampTz origin_timestamp; + + XLogRecPtr notify_lsn; /* LSN of notification data */ } xl_xact_parsed_commit; typedef xl_xact_parsed_commit xl_xact_parsed_prepare; diff --git a/src/include/commands/async.h b/src/include/commands/async.h index f75c3df9556..7e9f10cb84b 100644 --- a/src/include/commands/async.h +++ b/src/include/commands/async.h @@ -14,11 +14,25 @@ #define ASYNC_H #include <signal.h> +#include "access/xlogreader.h" +#include "lib/stringinfo.h" extern PGDLLIMPORT bool Trace_notify; extern PGDLLIMPORT int max_notify_queue_pages; extern PGDLLIMPORT volatile sig_atomic_t notifyInterruptPending; +/* + * Compact SLRU queue entry - stores metadata pointing to WAL data + */ +typedef struct AsyncQueueEntry +{ + Oid dbid; /* database ID for quick filtering */ + TransactionId xid; /* transaction ID */ + XLogRecPtr notify_lsn; /* LSN of notification data in WAL */ +} AsyncQueueEntry; + +#define ASYNC_QUEUE_ENTRY_SIZE
sizeof(AsyncQueueEntry) + extern Size AsyncShmemSize(void); extern void AsyncShmemInit(void); @@ -46,4 +60,15 @@ extern void HandleNotifyInterrupt(void); /* process interrupts */ extern void ProcessNotifyInterrupt(bool flush); +/* WAL-based notification functions */ +extern XLogRecPtr LogAsyncNotifyData(Oid dboid, TransactionId xid, int32 srcPid, + uint32 nnotifications, Size data_len, char *data); +extern void async_redo(XLogReaderState *record); +extern void async_desc(StringInfo buf, XLogReaderState *record); +extern const char *async_identify(uint8 info); + +/* notification queue functions */ +extern void asyncQueueAddCompactEntry(Oid dbid, TransactionId xid, XLogRecPtr notify_lsn); +extern void SignalBackends(void); + #endif /* ASYNC_H */ diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index c6f5ebceefd..71459fe5529 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -301,6 +301,9 @@ struct PGPROC uint32 wait_event_info; /* proc's wait information */ + /* Support for async notifications */ + XLogRecPtr notifyCommitLsn; /* LSN of notification data for current xact */ + /* Support for group transaction status update. */ bool clogGroupMember; /* true, if member of clog group */ pg_atomic_uint32 clogGroupNext; /* next clog group member */ -- 2.39.3 (Apple Git-145)