From 2c46ebcb95954580da3ece4bd8ce5d5b1d824694 Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy
Date: Fri, 3 Mar 2023 10:33:06 +0000
Subject: [PATCH v7] Improve WALRead() to suck data directly from WAL buffers

---
 src/backend/access/transam/xlog.c       | 140 ++++++++++++++++++++++++
 src/backend/access/transam/xlogreader.c |  45 +++++++-
 src/include/access/xlog.h               |   6 +
 3 files changed, 189 insertions(+), 2 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 87af608d15..51dd101d12 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1639,6 +1639,146 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli)
 	return cachedPos + ptr % XLOG_BLCKSZ;
 }
 
+/*
+ * Read WAL from WAL buffers.
+ *
+ * Read 'count' bytes of WAL from WAL buffers into 'buf', starting at location
+ * 'startptr', on timeline 'tli' and set the read bytes to 'read_bytes'.
+ *
+ * Note that this function reads as much as it can from WAL buffers, meaning,
+ * it may not read all the requested 'count' bytes (possibly none at all). The
+ * caller must be aware of this and deal with it.
+ */
+void
+XLogReadFromBuffers(XLogRecPtr startptr,
+					TimeLineID tli,
+					Size count,
+					char *buf,
+					Size *read_bytes)
+{
+	XLogRecPtr	ptr;
+	char	   *dst;
+	Size		nbytes;
+
+	Assert(!XLogRecPtrIsInvalid(startptr));
+	Assert(count > 0);
+	Assert(startptr <= GetFlushRecPtr(NULL));
+	Assert(!RecoveryInProgress());
+	Assert(tli == GetWALInsertionTimeLine());
+
+	ptr = startptr;
+	nbytes = count;
+	dst = buf;
+	*read_bytes = 0;
+
+	/*
+	 * Holding WALBufMappingLock ensures inserters don't overwrite this value
+	 * while we are reading it. We try to acquire it in shared mode so that the
+	 * concurrent WAL readers are also allowed. We try to do as little work as
+	 * possible while holding the lock to not impact concurrent WAL writers
+	 * much. We quickly exit to not cause any contention, if the lock isn't
+	 * immediately available.
+	 */
+	if (!LWLockConditionalAcquire(WALBufMappingLock, LW_SHARED))
+		return;
+
+	while (nbytes > 0)
+	{
+		XLogRecPtr	expectedEndPtr;
+		XLogRecPtr	endptr;
+		int			idx;
+
+		idx = XLogRecPtrToBufIdx(ptr);
+		expectedEndPtr = ptr;
+		expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ;
+
+		endptr = XLogCtl->xlblocks[idx];
+
+		if (expectedEndPtr == endptr)
+		{
+			char	   *page;
+			char	   *data;
+			XLogPageHeader phdr;
+
+			/*
+			 * We found WAL buffer page containing given XLogRecPtr. Get
+			 * starting address of the page and a pointer to the right location
+			 * of given XLogRecPtr in that page.
+			 */
+			page = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ;
+			data = page + ptr % XLOG_BLCKSZ;
+
+			/*
+			 * The fact that we acquire WALBufMappingLock while reading the WAL
+			 * buffer page itself guarantees that no one else initializes it or
+			 * makes it ready for next use in AdvanceXLInsertBuffer().
+			 *
+			 * However, we perform basic page header checks for ensuring that
+			 * we are not reading a page that just got initialized. Callers
+			 * will anyway perform extensive page-level and record-level
+			 * checks.
+			 *
+			 * Do this before copying, so that bytes of a page that doesn't
+			 * look valid are never returned nor counted in 'read_bytes'.
+			 */
+			phdr = (XLogPageHeader) page;
+
+			if (!(phdr->xlp_magic == XLOG_PAGE_MAGIC &&
+				  phdr->xlp_pageaddr == (ptr - (ptr % XLOG_BLCKSZ)) &&
+				  phdr->xlp_tli == tli))
+			{
+				/*
+				 * WAL buffer page doesn't look valid, so return with what we
+				 * have read so far.
+				 */
+				break;
+			}
+
+			/* Read what is wanted, not the whole page. */
+			if ((data + nbytes) <= (page + XLOG_BLCKSZ))
+			{
+				/* All the bytes are in one page. */
+				memcpy(dst, data, nbytes);
+				*read_bytes += nbytes;
+				nbytes = 0;
+			}
+			else
+			{
+				Size		nread;
+
+				/*
+				 * All the bytes are not in one page. Read available bytes on
+				 * the current page, copy them over to output buffer and
+				 * continue to read remaining bytes.
+				 */
+				nread = XLOG_BLCKSZ - (data - page);
+				Assert(nread > 0 && nread <= nbytes);
+				memcpy(dst, data, nread);
+				ptr += nread;
+				nbytes -= nread;
+				dst += nread;
+				*read_bytes += nread;
+			}
+		}
+		else
+		{
+			/*
+			 * Requested WAL isn't available in WAL buffers, so return with
+			 * what we have read so far.
+			 */
+			break;
+		}
+	}
+
+	LWLockRelease(WALBufMappingLock);
+
+	/* We never read more than what the caller has asked for. */
+	Assert(*read_bytes <= count);
+
+	elog(DEBUG1, "read %zu bytes out of %zu bytes from WAL buffers for given LSN %X/%X, Timeline ID %u",
+		 *read_bytes, count, LSN_FORMAT_ARGS(startptr), tli);
+}
+
 /*
  * Converts a "usable byte position" to XLogRecPtr. A usable byte position
  * is the position starting from the beginning of WAL, excluding all WAL
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index cadea21b37..bd11df448a 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1486,8 +1486,7 @@ err:
  * Returns true if succeeded, false if an error occurs, in which case
  * 'errinfo' receives error details.
  *
- * XXX probably this should be improved to suck data directly from the
- * WAL buffers when possible.
+ * When possible, this function reads data directly from WAL buffers.
  */
 bool
 WALRead(XLogReaderState *state,
@@ -1498,6 +1497,48 @@ WALRead(XLogReaderState *state,
 	XLogRecPtr	recptr;
 	Size		nbytes;
 
+#ifndef FRONTEND
+	/* Frontend tools have no idea of WAL buffers. */
+	Size		read_bytes;
+
+	/*
+	 * When possible, read WAL from WAL buffers. We skip this step and continue
+	 * the usual way, that is to read from WAL file, either when server is in
+	 * recovery (standby mode, archive or crash recovery), in which case the
+	 * WAL buffers are not used or when the server is inserting in a different
+	 * timeline from that of the timeline that we're trying to read WAL from.
+	 */
+	if (!RecoveryInProgress() &&
+		tli == GetWALInsertionTimeLine())
+	{
+		XLogReadFromBuffers(startptr, tli, count, buf, &read_bytes);
+
+		/*
+		 * Check if we have read fully (hit), partially (partial hit) or
+		 * nothing (miss) from WAL buffers. If we have read either partially or
+		 * nothing, then continue to read the remaining bytes the usual way,
+		 * that is, read from WAL file.
+		 */
+		if (count == read_bytes)
+		{
+			/* Buffer hit, so return. */
+			return true;
+		}
+		else if (read_bytes > 0 && count > read_bytes)
+		{
+			/*
+			 * Buffer partial hit, so reset the state to count the read bytes
+			 * and continue.
+			 */
+			buf += read_bytes;
+			startptr += read_bytes;
+			count -= read_bytes;
+		}
+
+		/* Buffer miss i.e., read_bytes = 0, so continue */
+	}
+#endif							/* FRONTEND */
+
 	p = buf;
 	recptr = startptr;
 	nbytes = count;
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index cfe5409738..c9941aa001 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -247,6 +247,12 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
 
+extern void XLogReadFromBuffers(XLogRecPtr startptr,
+								TimeLineID tli,
+								Size count,
+								char *buf,
+								Size *read_bytes);
+
 /*
  * Routines used by xlogrecovery.c to call back into xlog.c during recovery.
  */
-- 
2.34.1