From 68382d9b7c2734f42679d069f892d2e055265ee6 Mon Sep 17 00:00:00 2001 From: Takashi Menjo Date: Tue, 23 Mar 2021 11:45:44 +0900 Subject: [PATCH v7 07/15] Map WAL segment files on PMEM as WAL buffers Fixes introduced in patchset v2: - Keep openLogSegNo even if wal_pmem_map=true - Fix sync issue of PmemXLogCreate - Fix unmapping issue of PmemXLogUnmap - Remove unused XLogPageOffset --- src/backend/access/transam/Makefile | 1 + src/backend/access/transam/xlog.c | 153 +++++++++---- src/backend/access/transam/xlogpmem.c | 297 ++++++++++++++++++++++++++ src/include/access/xlogpmem.h | 59 +++++ 4 files changed, 474 insertions(+), 36 deletions(-) create mode 100644 src/backend/access/transam/xlogpmem.c create mode 100644 src/include/access/xlogpmem.h diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile index 595e02de72..3a29583bc0 100644 --- a/src/backend/access/transam/Makefile +++ b/src/backend/access/transam/Makefile @@ -31,6 +31,7 @@ OBJS = \ xlogarchive.o \ xlogfuncs.o \ xloginsert.o \ + xlogpmem.o \ xlogreader.o \ xlogutils.o diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index ee000bf181..62f08cb50b 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -35,6 +35,7 @@ #include "access/xlog_internal.h" #include "access/xlogarchive.h" #include "access/xloginsert.h" +#include "access/xlogpmem.h" #include "access/xlogreader.h" #include "access/xlogutils.h" #include "catalog/catversion.h" @@ -2027,7 +2028,14 @@ GetXLogBuffer(XLogRecPtr ptr, TimeLineID tli) * offset within the page. */ cachedPage = ptr / XLOG_BLCKSZ; - cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ; + if (wal_pmem_map) + { + openLogTLI = tli; + openLogSegNo = PmemXLogEnsurePrevMapped(endptr, tli); + cachedPos = PmemXLogGetBufferPages() + idx * (Size) XLOG_BLCKSZ; + } + else + cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ; Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC); Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ)); @@ -2261,7 +2269,14 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx); - NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ); + if (wal_pmem_map) + { + openLogTLI = tli; + openLogSegNo = PmemXLogEnsurePrevMapped(NewPageEndPtr, tli); + NewPage = (XLogPageHeader) (PmemXLogGetBufferPages() + nextidx * (Size) XLOG_BLCKSZ); + } + else + NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ); /* * Be sure to re-zero the buffer so that bytes beyond what we've @@ -2480,6 +2495,8 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) int npages; int startidx; uint32 startoffset; + bool isfirstpage; + XLogRecPtr startpageptr; /* We should always be inside a critical section here */ Assert(CritSectionCount > 0); @@ -2502,6 +2519,10 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) startidx = 0; startoffset = 0; + /* Those are used actually only if wal_pmem_map=true */ + isfirstpage = true; + startpageptr = 0; + /* * Within the loop, curridx is the cache block index of the page to * consider writing. Begin at the buffer containing the next unwritten @@ -2527,33 +2548,36 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) LogwrtResult.Write = EndPtr; ispartialpage = WriteRqst.Write < LogwrtResult.Write; - if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo, - wal_segment_size)) + if (!wal_pmem_map) { - /* - * Switch to new logfile segment. We cannot have any pending - * pages here (since we dump what we have at segment end). - */ - Assert(npages == 0); - if (openLogFile >= 0) - XLogFileClose(); - XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo, - wal_segment_size); - openLogTLI = tli; + if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo, + wal_segment_size)) + { + /* + * Switch to new logfile segment. We cannot have any pending + * pages here (since we dump what we have at segment end). + */ + Assert(npages == 0); + if (openLogFile >= 0) + XLogFileClose(); + XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo, + wal_segment_size); + openLogTLI = tli; - /* create/use new log file */ - openLogFile = XLogFileInit(openLogSegNo, tli); - ReserveExternalFD(); - } + /* create/use new log file */ + openLogFile = XLogFileInit(openLogSegNo, tli); + ReserveExternalFD(); + } - /* Make sure we have the current logfile open */ - if (openLogFile < 0) - { - XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo, - wal_segment_size); - openLogTLI = tli; - openLogFile = XLogFileOpen(openLogSegNo, tli); - ReserveExternalFD(); + /* Make sure we have the current logfile open */ + if (openLogFile < 0) + { + XLByteToPrevSeg(LogwrtResult.Write, openLogSegNo, + wal_segment_size); + openLogTLI = tli; + openLogFile = XLogFileOpen(openLogSegNo, tli); + ReserveExternalFD(); + } } /* Add current page to the set of pending pages-to-dump */ @@ -2561,8 +2585,8 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) { /* first of group */ startidx = curridx; - startoffset = XLogSegmentOffset(LogwrtResult.Write - XLOG_BLCKSZ, - wal_segment_size); + startpageptr = LogwrtResult.Write - XLOG_BLCKSZ; + startoffset = XLogSegmentOffset(startpageptr, wal_segment_size); } npages++; @@ -2600,7 +2624,38 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) INSTR_TIME_SET_CURRENT(start); pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE); - written = pg_pwrite(openLogFile, from, nleft, startoffset); + + /* + * If we use a WAL segment file as WAL buffers, we cache-flush + * records on the buffers byte by byte, not page by page. To do + * so, here we fix the range being cache-flushed. + */ + if (wal_pmem_map) + { + XLogRecPtr startbyteptr; + XLogRecPtr endbyteptr; + + startbyteptr = (isfirstpage) + ? XLogCtl->LogwrtResult.Write + : startpageptr; + + endbyteptr = (ispartialpage) + ? WriteRqst.Write + : LogwrtResult.Write; + + /* Now we cache-flush records */ + openLogTLI = tli; + openLogSegNo = PmemXLogEnsurePrevMapped(endbyteptr, tli); + PmemXLogFlush(startbyteptr, endbyteptr); + + /* Mark the first page is consumed */ + isfirstpage = false; + + /* Tell all the "pages" have been written successfully */ + written = nleft; + } + else + written = pg_pwrite(openLogFile, from, nleft, startoffset); pgstat_report_wait_end(); /* @@ -2658,7 +2713,10 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) */ if (finishing_seg) { - issue_xlog_fsync(openLogFile, openLogSegNo, tli); + if (wal_pmem_map) + PmemXLogSync(); + else + issue_xlog_fsync(openLogFile, openLogSegNo, tli); /* signal that we need to wakeup walsenders later */ WalSndWakeupRequest(); @@ -2709,12 +2767,14 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) LogwrtResult.Flush < LogwrtResult.Write) { + if (wal_pmem_map) + PmemXLogSync(); /* * Could get here without iterating above loop, in which case we might * have no open file or the wrong one. However, we do not need to * fsync more than one file. */ - if (sync_method != SYNC_METHOD_OPEN && + else if (sync_method != SYNC_METHOD_OPEN && sync_method != SYNC_METHOD_OPEN_DSYNC) { if (openLogFile >= 0 && @@ -8102,11 +8162,32 @@ StartupXLOG(void) firstIdx = XLogRecPtrToBufIdx(EndOfLog); - /* Copy the valid part of the last block, and zero the rest */ - page = &XLogCtl->pages[firstIdx * XLOG_BLCKSZ]; - len = EndOfLog % XLOG_BLCKSZ; - memcpy(page, xlogreader->readBuf, len); - memset(page + len, 0, XLOG_BLCKSZ - len); + if (wal_pmem_map) + { + /* + * Keep the valid part of the last block, and zero the rest. + * Note that "len" indicates the size of the valid part. + * + * TODO how about if (newTLI != replayTLI) ? + */ + openLogTLI = newTLI; + openLogSegNo = PmemXLogEnsurePrevMapped(EndOfLog, newTLI); + page = PmemXLogGetBufferPages() + firstIdx * (Size) XLOG_BLCKSZ; + len = EndOfLog % XLOG_BLCKSZ; + memset(page + len, 0, XLOG_BLCKSZ - len); + + /* Cache-flush and sync now */ + PmemXLogFlush(EndOfLog, pageBeginPtr + XLOG_BLCKSZ); + PmemXLogSync(); + } + else + { + /* Copy the valid part of the last block, and zero the rest */ + page = &XLogCtl->pages[firstIdx * XLOG_BLCKSZ]; + len = EndOfLog % XLOG_BLCKSZ; + memcpy(page, xlogreader->readBuf, len); + memset(page + len, 0, XLOG_BLCKSZ - len); + } XLogCtl->xlblocks[firstIdx] = pageBeginPtr + XLOG_BLCKSZ; XLogCtl->InitializedUpTo = pageBeginPtr + XLOG_BLCKSZ; diff --git a/src/backend/access/transam/xlogpmem.c b/src/backend/access/transam/xlogpmem.c new file mode 100644 index 0000000000..5b50ba80a7 --- /dev/null +++ b/src/backend/access/transam/xlogpmem.c @@ -0,0 +1,297 @@ +#include "postgres.h" + +#ifdef USE_LIBPMEM + +#include +#include /* INT_MAX */ +#include /* size_t */ +#include /* uintptr_t */ +#include /* getpid, unlink */ + +#include + +#include "c.h" /* bool, Size */ +#include "access/xlog.h" +#include "access/xlog_internal.h" /* XLogFilePath, XLByteToSeg */ +#include "access/xlogpmem.h" +#include "common/file_perm.h" /* pg_file_create_mode */ +#include "miscadmin.h" /* enableFsync */ +#include "pgstat.h" + +static char *mappedPages = NULL; +static XLogSegNo mappedSegNo = 0; + +#define PG_DAX_HUGEPAGE_SIZE (((uintptr_t) 1) << 21) +#define PG_DAX_HUGEPAGE_MASK (~(PG_DAX_HUGEPAGE_SIZE - 1)) + +static XLogSegNo PmemXLogMap(XLogSegNo segno, TimeLineID tli); +static void PmemXLogCreate(XLogSegNo segno, TimeLineID tli); +static void PmemXLogUnmap(void); + +static void *PmemCreateMapFile(const char *path, size_t len); +static void *PmemOpenMapFile(const char *path, size_t expected_len); +static void *PmemTryOpenMapFile(const char *path, size_t expected_len); +static void *PmemMapFile(const char *path, size_t expected_len, int flags, + bool try_open); +static void PmemUnmapForError(void *addr, size_t len); + +/* + * Ensures the WAL segment containg {ptr-1} to be mapped. + * + * Returns mapped XLogSegNo. + */ +XLogSegNo +PmemXLogEnsurePrevMapped(XLogRecPtr ptr, TimeLineID tli) +{ + XLogSegNo segno; + + Assert(wal_pmem_map); + + XLByteToPrevSeg(ptr, segno, wal_segment_size); + + if (mappedPages != NULL) + { + /* Fast return: The segment we need is already mapped */ + if (mappedSegNo == segno) + return mappedSegNo; + + /* Unmap the current segment we don't need */ + PmemXLogUnmap(); + } + + return PmemXLogMap(segno, tli); +} + +/* + * Creates a new XLOG file segment, or open a pre-existing one, for WAL buffers. + * + * Returns mapped XLogSegNo. + * + * See also XLogFileInit in xlog.c. + */ +static XLogSegNo +PmemXLogMap(XLogSegNo segno, TimeLineID tli) +{ + char path[MAXPGPATH]; + + Assert(mappedPages == NULL); + + XLogFilePath(path, tli, segno, wal_segment_size); + + /* PmemTryOpenMapFile will handle error except ENOENT */ + mappedPages = PmemTryOpenMapFile(path, wal_segment_size); + + /* Fast return if already exists */ + if (mappedPages != NULL) + { + mappedSegNo = segno; + return mappedSegNo; + } + + elog(DEBUG2, "creating and filling new WAL file"); + PmemXLogCreate(segno, tli); + + /* PmemCreateMapFile will handle error */ + mappedPages = PmemOpenMapFile(path, wal_segment_size); + mappedSegNo = segno; + + elog(DEBUG2, "done creating and filling new WAL file"); + return mappedSegNo; +} + +/* + * Creates a new XLOG file segment. + * + * See also XLogFileInit in xlog.c. + */ +static void +PmemXLogCreate(XLogSegNo segno, TimeLineID tli) +{ + char *addr; + char tmppath[MAXPGPATH]; + XLogSegNo inst_segno; + XLogSegNo max_segno; + + snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid()); + unlink(tmppath); + + /* PmemCreateMapFile will handle error */ + addr = PmemCreateMapFile(tmppath, wal_segment_size); + + /* + * Initialize whole the buffers. + * + * Note that we don't put any single byte if not wal_init_zero. It's okay + * because we already have a new segment file truncated to the proper size. + */ + pgstat_report_wait_start(WAIT_EVENT_WAL_INIT_WRITE); + if (wal_init_zero) + pmem_memset_nodrain(addr, 0, wal_segment_size); + pgstat_report_wait_end(); + + pgstat_report_wait_start(WAIT_EVENT_WAL_INIT_SYNC); + if (enableFsync) + pmem_drain(); + pgstat_report_wait_end(); + + if (pmem_unmap(addr, wal_segment_size) < 0) + elog(ERROR, "could not pmem_unmap temporal WAL buffers: %m"); + + inst_segno = segno; + max_segno = segno + CheckPointSegments; + if (!InstallXLogFileSegment(&inst_segno, tmppath, true, max_segno, tli)) + unlink(tmppath); +} + +/* + * Unmaps the current WAL segment file if mapped. + */ +static void +PmemXLogUnmap(void) +{ + /* Fast return if not mapped */ + if (mappedPages == NULL) + return; + + if (pmem_unmap(mappedPages, wal_segment_size) < 0) + elog(ERROR, "could not pmem_unmap WAL buffers: %m"); + + mappedPages = NULL; +} + +/* + * Gets the head address of the WAL buffers. + */ +char * +PmemXLogGetBufferPages(void) +{ + Assert(wal_pmem_map); + Assert(mappedPages != NULL); + + return mappedPages; +} + +/* + * Flushes records in the given range [start, end) within a single segment. + */ +void +PmemXLogFlush(XLogRecPtr start, XLogRecPtr end) +{ + Size off; + + Assert(wal_pmem_map); + Assert(start < end); + Assert(mappedPages != NULL); + Assert(XLByteInSeg(start, mappedSegNo, wal_segment_size)); + Assert(XLByteInPrevSeg(end, mappedSegNo, wal_segment_size)); + + off = XLogSegmentOffset(start, wal_segment_size); + pmem_flush(mappedPages + off, end - start); +} + +/* + * Wait for cache-flush to finish. + */ +void +PmemXLogSync(void) +{ + Assert(wal_pmem_map); + + /* Fast return */ + if (!enableFsync) + return; + + pmem_drain(); +} + +/* + * Wrappers for pmem_map_file. + */ +static void * +PmemCreateMapFile(const char *path, size_t len) +{ + return PmemMapFile(path, len, PMEM_FILE_CREATE | PMEM_FILE_EXCL, false); +} + +static void * +PmemOpenMapFile(const char *path, size_t expected_len) +{ + return PmemMapFile(path, expected_len, 0, false); +} + +static void * +PmemTryOpenMapFile(const char *path, size_t expected_len) +{ + return PmemMapFile(path, expected_len, 0, true); +} + +static void * +PmemMapFile(const char *path, size_t expected_len, int flags, bool try_open) +{ + size_t param_len; + int mode; + size_t mapped_len; + int is_pmem; + void *addr; + + Assert(expected_len > 0); + Assert(expected_len <= INT_MAX); + + param_len = (flags & PMEM_FILE_CREATE) ? expected_len : 0; + mode = (flags & PMEM_FILE_CREATE) ? pg_file_create_mode : 0; + + mapped_len = 0; + is_pmem = 0; + addr = pmem_map_file(path, param_len, flags, mode, &mapped_len, &is_pmem); + + if (addr == NULL) + { + if (try_open && errno == ENOENT) + return NULL; + + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not pmem_map_file \"%s\": %m", path))); + } + + if (mapped_len > INT_MAX) + { + PmemUnmapForError(addr, mapped_len); + elog(ERROR, + "unexpected file size: path \"%s\" actual (greater than %d) expected %d", + path, INT_MAX, (int) expected_len); + } + + if (mapped_len != expected_len) + { + PmemUnmapForError(addr, mapped_len); + elog(ERROR, + "unexpected file size: path \"%s\" actual %d expected %d", + path, (int) mapped_len, (int) expected_len); + } + + if (!is_pmem) + { + PmemUnmapForError(addr, mapped_len); + elog(ERROR, "file not on PMEM: path \"%s\"", path); + } + + if ((uintptr_t) addr & ~PG_DAX_HUGEPAGE_MASK) + elog(WARNING, + "file not mapped on DAX hugepage boundary: path \"%s\" addr %p", + path, addr); + + return addr; +} + +static void +PmemUnmapForError(void *addr, size_t len) +{ + int saved_errno; + + saved_errno = errno; + (void) pmem_unmap(addr, len); + errno = saved_errno; +} + +#endif /* USE_LIBPMEM */ diff --git a/src/include/access/xlogpmem.h b/src/include/access/xlogpmem.h new file mode 100644 index 0000000000..3978640b82 --- /dev/null +++ b/src/include/access/xlogpmem.h @@ -0,0 +1,59 @@ +/* + * xlogpmem.h + * + * Definitions for PMEM-mapped WAL buffers. + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/access/xlogpmem.h + */ +#ifndef XLOGPMEM_H +#define XLOGPMEM_H + +#include "postgres.h" + +#include "c.h" /* Size */ +#include "access/xlogdefs.h" /* XLogRecPtr, XLogSegNo */ + +#ifdef USE_LIBPMEM + +/* Prototypes */ +extern XLogSegNo PmemXLogEnsurePrevMapped(XLogRecPtr ptr, TimeLineID tli); +extern char *PmemXLogGetBufferPages(void); +extern void PmemXLogFlush(XLogRecPtr start, XLogRecPtr end); +extern void PmemXLogSync(void); + +#else /* USE_LIBPMEM */ + +#include /* abort */ + +static inline XLogSegNo +PmemXLogEnsurePrevMapped(XLogRecPtr ptr, tli) +{ + abort(); + return 0; +} + +static inline char * +PmemXLogGetBufferPages(void) +{ + abort(); + return NULL; +} + +static inline void +PmemXLogFlush(XLogRecPtr start, XLogRecPtr end) +{ + abort(); +} + +static inline void +PmemXLogSync(void) +{ + abort(); +} + +#endif /* USE_LIBPMEM */ + +#endif /* XLOGPMEM_H */ -- 2.25.1