From 560cdfa444a3b05a0e6b8054f3cfeadf56e059fc Mon Sep 17 00:00:00 2001 From: Kyotaro Horiguchi Date: Thu, 5 Sep 2019 20:21:55 +0900 Subject: [PATCH v18 1/3] Move callback-call from ReadPageInternal to XLogReadRecord. The current WAL record reader reads page data using a call back function. Redesign the interface so that it asks the caller for more data when required. This model works better for proposed projects that encryption, prefetching and other new features that would require extending the callback interface for each case. As the first step of that change, this patch moves the page reader function out of ReadPageInternal(), then the remaining tasks of the function are taken over by the new function XLogNeedData(). Author: Kyotaro HORIGUCHI Author: Heikki Linnakangas Reviewed-by: Antonin Houska Reviewed-by: Alvaro Herrera Reviewed-by: Takashi Menjo Reviewed-by: Thomas Munro Discussion: https://postgr.es/m/20190418.210257.43726183.horiguchi.kyotaro%40lab.ntt.co.jp --- src/backend/access/transam/xlog.c | 16 +- src/backend/access/transam/xlogreader.c | 277 ++++++++++++++---------- src/backend/access/transam/xlogutils.c | 12 +- src/backend/replication/walsender.c | 10 +- src/bin/pg_rewind/parsexlog.c | 21 +- src/bin/pg_waldump/pg_waldump.c | 8 +- src/include/access/xlogreader.h | 33 +-- src/include/access/xlogutils.h | 2 +- 8 files changed, 224 insertions(+), 155 deletions(-) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 6f8810e149..449e17d2fa 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -920,7 +920,7 @@ static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, XLogSource source, bool notfoundOk); static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source); -static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, +static bool XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf); static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, bool fetching_ckpt, XLogRecPtr tliRecPtr); @@ -4375,7 +4375,6 @@ ReadRecord(XLogReaderState *xlogreader, int emode, XLogRecord *record; XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; - /* Pass through parameters to XLogPageRead */ private->fetching_ckpt = fetching_ckpt; private->emode = emode; private->randAccess = (xlogreader->ReadRecPtr == InvalidXLogRecPtr); @@ -12111,7 +12110,7 @@ CancelBackup(void) * XLogPageRead() to try fetching the record from another source, or to * sleep and retry. */ -static int +static bool XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf) { @@ -12170,7 +12169,8 @@ retry: readLen = 0; readSource = XLOG_FROM_ANY; - return -1; + xlogreader->readLen = -1; + return false; } } @@ -12265,7 +12265,8 @@ retry: goto next_record_is_invalid; } - return readLen; + xlogreader->readLen = readLen; + return true; next_record_is_invalid: lastSourceFailed = true; @@ -12279,8 +12280,9 @@ next_record_is_invalid: /* In standby-mode, keep trying */ if (StandbyMode) goto retry; - else - return -1; + + xlogreader->readLen = -1; + return false; } /* diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 42738eb940..6ac8fc4b80 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -36,8 +36,8 @@ static void report_invalid_record(XLogReaderState *state, const char *fmt,...) pg_attribute_printf(2, 3); static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); -static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, - int reqLen); +static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, + int reqLen, bool header_inclusive); static void XLogReaderInvalReadState(XLogReaderState *state); static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess); @@ -276,7 +276,6 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) uint32 targetRecOff; uint32 pageHeaderSize; bool gotheader; - int readOff; /* * randAccess indicates whether to verify the previous-record pointer of @@ -326,14 +325,20 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) * byte to cover the whole record header, or at least the part of it that * fits on the same page. */ - readOff = ReadPageInternal(state, targetPagePtr, - Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); - if (readOff < 0) + while (XLogNeedData(state, targetPagePtr, + Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ), + targetRecOff != 0)) + { + if (!state->routine.page_read(state, state->readPagePtr, state->readLen, + RecPtr, state->readBuf)) + break; + } + + if (!state->page_verified) goto err; /* - * ReadPageInternal always returns at least the page header, so we can - * examine it now. + * We have at least the page header, so we can examine it now. */ pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); if (targetRecOff == 0) @@ -359,8 +364,8 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) goto err; } - /* ReadPageInternal has verified the page header */ - Assert(pageHeaderSize <= readOff); + /* XLogNeedData has verified the page header */ + Assert(pageHeaderSize <= state->readLen); /* * Read the record length. @@ -432,18 +437,27 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) do { + int rest_len = total_len - gotlen; + /* Calculate pointer to beginning of next page */ targetPagePtr += XLOG_BLCKSZ; /* Wait for the next page to become available */ - readOff = ReadPageInternal(state, targetPagePtr, - Min(total_len - gotlen + SizeOfXLogShortPHD, - XLOG_BLCKSZ)); + while (XLogNeedData(state, targetPagePtr, + Min(rest_len, XLOG_BLCKSZ), + false)) + { + if (!state->routine.page_read(state, state->readPagePtr, + state->readLen, + state->ReadRecPtr, + state->readBuf)) + break; + } - if (readOff < 0) + if (!state->page_verified) goto err; - Assert(SizeOfXLogShortPHD <= readOff); + Assert(SizeOfXLogShortPHD <= state->readLen); /* Check that the continuation on next page looks valid */ pageHeader = (XLogPageHeader) state->readBuf; @@ -473,21 +487,14 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) /* Append the continuation from this page to the buffer */ pageHeaderSize = XLogPageHeaderSize(pageHeader); - if (readOff < pageHeaderSize) - readOff = ReadPageInternal(state, targetPagePtr, - pageHeaderSize); - - Assert(pageHeaderSize <= readOff); + Assert(pageHeaderSize <= state->readLen); contdata = (char *) state->readBuf + pageHeaderSize; len = XLOG_BLCKSZ - pageHeaderSize; if (pageHeader->xlp_rem_len < len) len = pageHeader->xlp_rem_len; - if (readOff < pageHeaderSize + len) - readOff = ReadPageInternal(state, targetPagePtr, - pageHeaderSize + len); - + Assert(pageHeaderSize + len <= state->readLen); memcpy(buffer, (char *) contdata, len); buffer += len; gotlen += len; @@ -517,9 +524,16 @@ XLogReadRecord(XLogReaderState *state, char **errormsg) else { /* Wait for the record data to become available */ - readOff = ReadPageInternal(state, targetPagePtr, - Min(targetRecOff + total_len, XLOG_BLCKSZ)); - if (readOff < 0) + while (XLogNeedData(state, targetPagePtr, + Min(targetRecOff + total_len, XLOG_BLCKSZ), true)) + { + if (!state->routine.page_read(state, state->readPagePtr, + state->readLen, + state->ReadRecPtr, state->readBuf)) + break; + } + + if (!state->page_verified) goto err; /* Record does not cross a page boundary */ @@ -562,109 +576,138 @@ err: } /* - * Read a single xlog page including at least [pageptr, reqLen] of valid data - * via the page_read() callback. + * Checks that an xlog page loaded in state->readBuf is including at least + * [pageptr, reqLen] and the page is valid. header_inclusive indicates that + * reqLen is calculated including page header length. * - * Returns -1 if the required page cannot be read for some reason; errormsg_buf - * is set in that case (unless the error occurs in the page_read callback). + * Returns false if the buffer already contains the requested data, or found + * error. state->page_verified is set to true for the former and false for the + * latter. * - * We fetch the page from a reader-local cache if we know we have the required - * data and if there hasn't been any error since caching the data. + * Otherwise returns true and requests data loaded onto state->readBuf by + * state->readPagePtr and state->readLen. The caller shall call this function + * again after filling the buffer at least with that portion of data and set + * state->readLen to the length of actually loaded data. + * + * If header_inclusive is false, corrects reqLen internally by adding the + * actual page header length and may request caller for new data. */ -static int -ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen) +static bool +XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen, + bool header_inclusive) { - int readLen; uint32 targetPageOff; XLogSegNo targetSegNo; - XLogPageHeader hdr; + uint32 addLen = 0; - Assert((pageptr % XLOG_BLCKSZ) == 0); + /* Some data is loaded, but page header is not verified yet. */ + if (!state->page_verified && + !XLogRecPtrIsInvalid(state->readPagePtr) && state->readLen >= 0) + { + uint32 pageHeaderSize; - XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize); - targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize); + /* just loaded new data so needs to verify page header */ - /* check whether we have all the requested data already */ - if (targetSegNo == state->seg.ws_segno && - targetPageOff == state->segoff && reqLen <= state->readLen) - return state->readLen; + /* The caller must have loaded at least page header */ + Assert(state->readLen >= SizeOfXLogShortPHD); - /* - * Data is not in our buffer. - * - * Every time we actually read the segment, even if we looked at parts of - * it before, we need to do verification as the page_read callback might - * now be rereading data from a different source. - * - * Whenever switching to a new WAL segment, we read the first page of the - * file and validate its header, even if that's not where the target - * record is. This is so that we can check the additional identification - * info that is present in the first page's "long" header. - */ - if (targetSegNo != state->seg.ws_segno && targetPageOff != 0) - { - XLogRecPtr targetSegmentPtr = pageptr - targetPageOff; + /* + * We have enough data to check the header length. Recheck the loaded + * length against the actual header length. + */ + pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ, - state->currRecPtr, - state->readBuf); - if (readLen < 0) - goto err; + /* Request more data if we don't have the full header. */ + if (state->readLen < pageHeaderSize) + { + state->readLen = pageHeaderSize; + return true; + } - /* we can be sure to have enough WAL available, we scrolled back */ - Assert(readLen == XLOG_BLCKSZ); + /* Now that we know we have the full header, validate it. */ + if (!XLogReaderValidatePageHeader(state, state->readPagePtr, + (char *) state->readBuf)) + { + /* That's bad. Force reading the page again. */ + XLogReaderInvalReadState(state); - if (!XLogReaderValidatePageHeader(state, targetSegmentPtr, - state->readBuf)) - goto err; + return false; + } + + state->page_verified = true; + + XLByteToSeg(state->readPagePtr, state->seg.ws_segno, + state->segcxt.ws_segsize); } /* - * First, read the requested data length, but at least a short page header - * so that we can validate it. + * The loaded page may not be the one caller is supposing to read when we + * are verifying the first page of new segment. In that case, skip further + * verification and immediately load the target page. */ - readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD), - state->currRecPtr, - state->readBuf); - if (readLen < 0) - goto err; + if (state->page_verified && pageptr == state->readPagePtr) + { + /* + * calculate additional length for page header keeping the total + * length within the block size. + */ + if (!header_inclusive) + { + uint32 pageHeaderSize = + XLogPageHeaderSize((XLogPageHeader) state->readBuf); - Assert(readLen <= XLOG_BLCKSZ); + addLen = pageHeaderSize; + if (reqLen + pageHeaderSize <= XLOG_BLCKSZ) + addLen = pageHeaderSize; + else + addLen = XLOG_BLCKSZ - reqLen; - /* Do we have enough data to check the header length? */ - if (readLen <= SizeOfXLogShortPHD) - goto err; + Assert(addLen >= 0); + } - Assert(readLen >= reqLen); + /* Return if we already have it. */ + if (reqLen + addLen <= state->readLen) + return false; + } + + /* Data is not in our buffer, request the caller for it. */ + XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize); + targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize); + Assert((pageptr % XLOG_BLCKSZ) == 0); - hdr = (XLogPageHeader) state->readBuf; + /* + * Every time we request to load new data of a page to the caller, even if + * we looked at a part of it before, we need to do verification on the + * next invocation as the caller might now be rereading data from a + * different source. + */ + state->page_verified = false; - /* still not enough */ - if (readLen < XLogPageHeaderSize(hdr)) + /* + * Whenever switching to a new WAL segment, we read the first page of the + * file and validate its header, even if that's not where the target + * record is. This is so that we can check the additional identification + * info that is present in the first page's "long" header. Don't do this + * if the caller requested the first page in the segment. + */ + if (targetSegNo != state->seg.ws_segno && targetPageOff != 0) { - readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr), - state->currRecPtr, - state->readBuf); - if (readLen < 0) - goto err; + /* + * Then we'll see that the targetSegNo now matches the ws_segno, and + * will not come back here, but will request the actual target page. + */ + state->readPagePtr = pageptr - targetPageOff; + state->readLen = XLOG_BLCKSZ; + return true; } /* - * Now that we know we have the full header, validate it. + * Request the caller to load the page. We need at least a short page + * header so that we can validate it. */ - if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr)) - goto err; - - /* update read state information */ - state->seg.ws_segno = targetSegNo; - state->segoff = targetPageOff; - state->readLen = readLen; - - return readLen; - -err: - XLogReaderInvalReadState(state); - return -1; + state->readPagePtr = pageptr; + state->readLen = Max(reqLen + addLen, SizeOfXLogShortPHD); + return true; } /* @@ -673,9 +716,7 @@ err: static void XLogReaderInvalReadState(XLogReaderState *state) { - state->seg.ws_segno = 0; - state->segoff = 0; - state->readLen = 0; + state->readPagePtr = InvalidXLogRecPtr; } /* @@ -953,7 +994,6 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) XLogRecPtr targetPagePtr; int targetRecOff; uint32 pageHeaderSize; - int readLen; /* * Compute targetRecOff. It should typically be equal or greater than @@ -961,27 +1001,32 @@ XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr) * that, except when caller has explicitly specified the offset that * falls somewhere there or when we are skipping multi-page * continuation record. It doesn't matter though because - * ReadPageInternal() is prepared to handle that and will read at - * least short page-header worth of data + * XLogNeedData() is prepared to handle that and will read at least + * short page-header worth of data */ targetRecOff = tmpRecPtr % XLOG_BLCKSZ; /* scroll back to page boundary */ targetPagePtr = tmpRecPtr - targetRecOff; - /* Read the page containing the record */ - readLen = ReadPageInternal(state, targetPagePtr, targetRecOff); - if (readLen < 0) + while (XLogNeedData(state, targetPagePtr, targetRecOff, + targetRecOff != 0)) + { + if (!state->routine.page_read(state, state->readPagePtr, + state->readLen, + state->ReadRecPtr, state->readBuf)) + break; + } + + if (!state->page_verified) goto err; header = (XLogPageHeader) state->readBuf; pageHeaderSize = XLogPageHeaderSize(header); - /* make sure we have enough data for the page header */ - readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize); - if (readLen < 0) - goto err; + /* we should have read the page header */ + Assert(state->readLen >= pageHeaderSize); /* skip over potential continuation data */ if (header->xlp_info & XLP_FIRST_IS_CONTRECORD) diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index d17d660f46..46eda33f25 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -686,8 +686,8 @@ XLogTruncateRelation(RelFileNode rnode, ForkNumber forkNum, void XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wantLength) { - const XLogRecPtr lastReadPage = (state->seg.ws_segno * - state->segcxt.ws_segsize + state->segoff); + const XLogRecPtr lastReadPage = state->seg.ws_segno * + state->segcxt.ws_segsize + state->readLen; Assert(wantPage != InvalidXLogRecPtr && wantPage % XLOG_BLCKSZ == 0); Assert(wantLength <= XLOG_BLCKSZ); @@ -824,7 +824,7 @@ wal_segment_close(XLogReaderState *state) * exists for normal backends, so we have to do a check/sleep/repeat style of * loop for now. */ -int +bool read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page) { @@ -926,7 +926,8 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, else if (targetPagePtr + reqLen > read_upto) { /* not enough data there */ - return -1; + state->readLen = -1; + return false; } else { @@ -944,7 +945,8 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, WALReadRaiseError(&errinfo); /* number of valid bytes in the buffer */ - return count; + state->readLen = count; + return true; } /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 4bf8a18e01..a4d6f30957 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -806,7 +806,7 @@ StartReplication(StartReplicationCmd *cmd) * which has to do a plain sleep/busy loop, because the walsender's latch gets * set every time WAL is flushed. */ -static int +static bool logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page) { @@ -826,7 +826,10 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req /* fail if not (implies we are going to shut down) */ if (flushptr < targetPagePtr + reqLen) - return -1; + { + state->readLen = -1; + return false; + } if (targetPagePtr + XLOG_BLCKSZ <= flushptr) count = XLOG_BLCKSZ; /* more than one block available */ @@ -854,7 +857,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize); CheckXLogRemoved(segno, state->seg.ws_tli); - return count; + state->readLen = count; + return true; } /* diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 59ebac7d6a..cf119848b0 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -47,7 +47,7 @@ typedef struct XLogPageReadPrivate int tliIndex; } XLogPageReadPrivate; -static int SimpleXLogPageRead(XLogReaderState *xlogreader, +static bool SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf); @@ -246,7 +246,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, } /* XLogReader callback function, to read a WAL page */ -static int +static bool SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *readBuf) { @@ -306,7 +306,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, if (private->restoreCommand == NULL) { pg_log_error("could not open file \"%s\": %m", xlogfpath); - return -1; + xlogreader->readLen = -1; + return false; } /* @@ -319,7 +320,10 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, private->restoreCommand); if (xlogreadfd < 0) - return -1; + { + xlogreader->readLen = -1; + return false; + } else pg_log_debug("using file \"%s\" restored from archive", xlogfpath); @@ -335,7 +339,8 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, if (lseek(xlogreadfd, (off_t) targetPageOff, SEEK_SET) < 0) { pg_log_error("could not seek in file \"%s\": %m", xlogfpath); - return -1; + xlogreader->readLen = -1; + return false; } @@ -348,13 +353,15 @@ SimpleXLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, pg_log_error("could not read file \"%s\": read %d of %zu", xlogfpath, r, (Size) XLOG_BLCKSZ); - return -1; + xlogreader->readLen = -1; + return false; } Assert(targetSegNo == xlogreadsegno); xlogreader->seg.ws_tli = targetHistory[private->tliIndex].tli; - return XLOG_BLCKSZ; + xlogreader->readLen = XLOG_BLCKSZ; + return true; } /* diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index f8b8afe4a7..75ece5c658 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -331,7 +331,7 @@ WALDumpCloseSegment(XLogReaderState *state) } /* pg_waldump's XLogReaderRoutine->page_read callback */ -static int +static bool WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetPtr, char *readBuff) { @@ -348,7 +348,8 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, else { private->endptr_reached = true; - return -1; + state->readLen = -1; + return false; } } @@ -373,7 +374,8 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, (Size) errinfo.wre_req); } - return count; + state->readLen = count; + return true; } /* diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 21d200d3df..5b66cab875 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -57,12 +57,12 @@ typedef struct WALSegmentContext typedef struct XLogReaderState XLogReaderState; -/* Function type definitions for various xlogreader interactions */ -typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader, - XLogRecPtr targetPagePtr, - int reqLen, - XLogRecPtr targetRecPtr, - char *readBuf); +/* Function type definition for the read_page callback */ +typedef bool (*XLogPageReadCB) (XLogReaderState *xlogreader, + XLogRecPtr targetPagePtr, + int reqLen, + XLogRecPtr targetRecPtr, + char *readBuf); typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader, XLogSegNo nextSegNo, TimeLineID *tli_p); @@ -175,6 +175,20 @@ struct XLogReaderState XLogRecPtr ReadRecPtr; /* start of last record read */ XLogRecPtr EndRecPtr; /* end+1 of last record read */ + /* ---------------------------------------- + * Communication with page reader + * readBuf is XLOG_BLCKSZ bytes, valid up to at least readLen bytes. + * ---------------------------------------- + */ + /* variables to communicate with page reader */ + XLogRecPtr readPagePtr; /* page pointer to read */ + int32 readLen; /* bytes requested to reader, or actual bytes + * read by reader, which must be larger than + * the request, or -1 on error */ + TimeLineID readPageTLI; /* TLI for data currently in readBuf */ + char *readBuf; /* buffer to store data */ + bool page_verified; /* is the page on the buffer verified? */ + /* ---------------------------------------- * Decoded representation of current record @@ -203,13 +217,6 @@ struct XLogReaderState * ---------------------------------------- */ - /* - * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least - * readLen bytes) - */ - char *readBuf; - uint32 readLen; - /* last read XLOG position for data currently in readBuf */ WALSegmentContext segcxt; WALOpenSegment seg; diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h index 9ac602b674..364a21c4ea 100644 --- a/src/include/access/xlogutils.h +++ b/src/include/access/xlogutils.h @@ -47,7 +47,7 @@ extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, extern Relation CreateFakeRelcacheEntry(RelFileNode rnode); extern void FreeFakeRelcacheEntry(Relation fakerel); -extern int read_local_xlog_page(XLogReaderState *state, +extern bool read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen, XLogRecPtr targetRecPtr, char *cur_page); extern void wal_segment_open(XLogReaderState *state, -- 2.30.1