diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 5e5001b..fe337e2 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -586,53 +586,72 @@ restart: state->nonblocking = nonblocking; state->currRecPtr = RecPtr; assembled = false; + gotheader = false; targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ); targetRecOff = RecPtr % XLOG_BLCKSZ; /* - * Read the page containing the record into state->readBuf. Request enough - * byte to cover the whole record header, or at least the part of it that - * fits on the same page. + * If this record spans multiple pages, it's possible that the first + * ReadPageInternal call succeeded while one of the following returned + * XLREAD_WOUDBLOCK. In that case, we already have the data from some + * of the calls in state->readRecordBuf, so skip re-reading it. + * This is especially important if the pages are in different WALs, + * in which case going back to a previous WAL might require re-fetching + * it from an archive - a potentially slow operation. */ - readOff = ReadPageInternal(state, targetPagePtr, - Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); - if (readOff == XLREAD_WOULDBLOCK) - return XLREAD_WOULDBLOCK; - else if (readOff < 0) - goto err; - - /* - * ReadPageInternal always returns at least the page header, so we can - * examine it now. - */ - pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - if (targetRecOff == 0) - { + if (state->readRecordBufUsed == 0) { /* - * At page start, so skip over page header. + * Read the page containing the record into state->readBuf. Request + * enough bytes to cover the whole record header, or at least the part + * of it that fits on the same page. */ - RecPtr += pageHeaderSize; - targetRecOff = pageHeaderSize; - } - else if (targetRecOff < pageHeaderSize) - { - report_invalid_record(state, "invalid record offset at %X/%08X: expected at least %u, got %u", - LSN_FORMAT_ARGS(RecPtr), - pageHeaderSize, targetRecOff); - goto err; - } + readOff = ReadPageInternal(state, targetPagePtr, + Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); + if (readOff == XLREAD_WOULDBLOCK) + return XLREAD_WOULDBLOCK; + else if (readOff < 0) + goto err; - if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && - targetRecOff == pageHeaderSize) - { - report_invalid_record(state, "contrecord is requested by %X/%08X", - LSN_FORMAT_ARGS(RecPtr)); - goto err; - } + /* + * ReadPageInternal always returns at least the page header, so we can + * examine it now. + */ + pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); + if (targetRecOff == 0) + { + /* + * At page start, so skip over page header. + */ + RecPtr += pageHeaderSize; + targetRecOff = pageHeaderSize; + } + else if (targetRecOff < pageHeaderSize) + { + report_invalid_record(state, + "invalid record offset at %X/%08X: expected at least %u, got %u", + LSN_FORMAT_ARGS(RecPtr), + pageHeaderSize, targetRecOff); + goto err; + } - /* ReadPageInternal has verified the page header */ - Assert(pageHeaderSize <= readOff); + if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && + targetRecOff == pageHeaderSize) + { + report_invalid_record(state, "contrecord is requested by %X/%08X", + LSN_FORMAT_ARGS(RecPtr)); + goto err; + } + + /* ReadPageInternal has verified the page header */ + Assert(pageHeaderSize <= readOff); + + record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ); + } else { + record = (XLogRecord *) state->readRecordBuf; + /* If we have the header, then it has already been validated. */ + gotheader = state->readRecordBufUsed >= SizeOfXLogRecord; + } /* * Read the record length. @@ -643,28 +662,25 @@ restart: * cannot access any other fields until we've verified that we got the * whole header. */ - record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ); total_len = record->xl_tot_len; - /* - * If the whole record header is on this page, validate it immediately. - * Otherwise do just a basic sanity check on xl_tot_len, and validate the - * rest of the header after reading it from the next page. The xl_tot_len - * check is necessary here to ensure that we enter the "Need to reassemble - * record" code path below; otherwise we might fail to apply - * ValidXLogRecordHeader at all. - */ - if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) - { - if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, record, - randAccess)) - goto err; - gotheader = true; - } - else - { - /* There may be no next page if it's too small. */ - if (total_len < SizeOfXLogRecord) + if (!gotheader) { + /* + * If the whole record header is on this page, validate it immediately. + * Otherwise do just a basic sanity check on xl_tot_len, and validate + * the rest of the header after reading it from the next page. The + * xl_tot_len check is necessary here to ensure that we enter the "Need + * to reassemble record" code path below; otherwise we might fail to + * apply ValidXLogRecordHeader at all. + */ + if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) + { + if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, + record, randAccess)) + goto err; + gotheader = true; + } + else if (total_len < SizeOfXLogRecord) { report_invalid_record(state, "invalid record length at %X/%08X: expected at least %u, got %u", @@ -672,8 +688,6 @@ restart: (uint32) SizeOfXLogRecord, total_len); goto err; } - /* We'll validate the header once we have the next page. */ - gotheader = false; } /* @@ -712,11 +726,20 @@ restart: Assert(state->readRecordBufSize >= XLOG_BLCKSZ * 2); Assert(state->readRecordBufSize >= len); - /* Copy the first fragment of the record from the first page. */ - memcpy(state->readRecordBuf, - state->readBuf + RecPtr % XLOG_BLCKSZ, len); - buffer = state->readRecordBuf + len; - gotlen = len; + if (state->readRecordBufUsed == 0) { + /* Copy the first fragment of the record from the first page. */ + memcpy(state->readRecordBuf, + state->readBuf + RecPtr % XLOG_BLCKSZ, len); + state->readRecordBufUsed = len; + state->readRecordBufPage = targetPagePtr; + buffer = state->readRecordBuf + len; + gotlen = len; + record = (XLogRecord *) state->readRecordBuf; + } else { + gotlen = state->readRecordBufUsed; + buffer = state->readRecordBuf + gotlen; + targetPagePtr = state->readRecordBufPage; + } do { @@ -749,6 +772,7 @@ restart: if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD) { state->overwrittenRecPtr = RecPtr; + state->readRecordBufUsed = 0; RecPtr = targetPagePtr; goto restart; } @@ -839,10 +863,12 @@ restart: memcpy(state->readRecordBuf, save_copy, gotlen); buffer = state->readRecordBuf + gotlen; } + + state->readRecordBufUsed = gotlen; + state->readRecordBufPage = targetPagePtr; } while (gotlen < total_len); Assert(gotheader); - record = (XLogRecord *) state->readRecordBuf; if (!ValidXLogRecord(state, record, RecPtr)) goto err; @@ -870,6 +896,8 @@ restart: state->DecodeRecPtr = RecPtr; } + state->readRecordBufUsed = 0; + /* * Special processing if it's an XLOG SWITCH record */ @@ -1126,6 +1154,7 @@ XLogReaderInvalReadState(XLogReaderState *state) state->seg.ws_segno = 0; state->segoff = 0; state->readLen = 0; + state->readRecordBufUsed = 0; } /* @@ -1626,6 +1655,7 @@ ResetDecoder(XLogReaderState *state) state->decode_queue_tail = NULL; state->decode_queue_head = NULL; state->record = NULL; + state->readRecordBufUsed = 0; /* Reset the decode buffer to empty. */ state->decode_buffer_tail = state->decode_buffer; diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index ae2398d..d4e6820 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -3167,9 +3167,6 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, private->randAccess = !XLogRecPtrIsValid(xlogreader->ReadRecPtr); private->replayTLI = replayTLI; - /* This is the first attempt to read this page. */ - lastSourceFailed = false; - for (;;) { char *errormsg; diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index dfabbbd..1b30be4 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -304,7 +304,9 @@ struct XLogReaderState * crosses a page boundary. */ char *readRecordBuf; - uint32 readRecordBufSize; + uint32 readRecordBufSize; + uint32 readRecordBufUsed; + XLogRecPtr readRecordBufPage; /* Buffer to hold error message */ char *errormsg_buf;