From eb42223e869ddc8b4d5c7a62fe8f1e6e58eedced Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Tue, 11 Oct 2022 19:15:58 +0200 Subject: [PATCH 2/3] Compactify xlog format Reduces the xlog header size to 16B in many cases, and to <=21B in the common case, saving 3-8 bytes per record. Future improvements will save up to 2 more bytes per registered block. Note: the prepared_xacts tests are disabled for now because of an issue that I could not track down: a lock ends up waiting for terminated backends. The cause is not yet known and is left for later investigation. --- contrib/pg_walinspect/pg_walinspect.c | 4 +- src/backend/access/heap/heapam.c | 24 +- src/backend/access/heap/rewriteheap.c | 3 +- src/backend/access/transam/multixact.c | 3 +- src/backend/access/transam/twophase.c | 2 +- src/backend/access/transam/xact.c | 6 +- src/backend/access/transam/xlog.c | 90 +++- src/backend/access/transam/xloginsert.c | 141 +++++- src/backend/access/transam/xlogprefetcher.c | 2 +- src/backend/access/transam/xlogreader.c | 438 ++++++++++++------ src/backend/access/transam/xlogrecovery.c | 31 +- src/backend/catalog/storage.c | 8 +- src/backend/commands/dbcommands.c | 20 +- src/backend/replication/logical/decode.c | 6 + src/backend/replication/logical/logical.c | 2 +- .../replication/logical/logicalfuncs.c | 2 +- src/backend/replication/slotfuncs.c | 2 +- src/backend/replication/walsender.c | 2 +- src/backend/utils/cache/inval.c | 3 +- src/bin/pg_resetwal/pg_resetwal.c | 23 +- src/bin/pg_rewind/parsexlog.c | 6 +- src/bin/pg_waldump/pg_waldump.c | 2 +- src/include/access/xloginsert.h | 2 +- src/include/access/xlogprefetcher.h | 4 +- src/include/access/xlogreader.h | 6 +- src/include/access/xlogrecord.h | 173 ++++++- src/test/regress/parallel_schedule | 7 +- 27 files changed, 748 insertions(+), 264 deletions(-) diff --git a/contrib/pg_walinspect/pg_walinspect.c b/contrib/pg_walinspect/pg_walinspect.c index f4e3b40bed..0c6eeab557 100644 --- a/contrib/pg_walinspect/pg_walinspect.c +++
b/contrib/pg_walinspect/pg_walinspect.c @@ -38,7 +38,7 @@ PG_FUNCTION_INFO_V1(pg_get_wal_stats_till_end_of_wal); static bool IsFutureLSN(XLogRecPtr lsn, XLogRecPtr *curr_lsn); static XLogReaderState *InitXLogReaderState(XLogRecPtr lsn); -static XLogRecord *ReadNextXLogRecord(XLogReaderState *xlogreader); +static XLRHeaderData *ReadNextXLogRecord(XLogReaderState *xlogreader); static void GetWALRecordInfo(XLogReaderState *record, Datum *values, bool *nulls, uint32 ncols); static XLogRecPtr ValidateInputLSNs(bool till_end_of_wal, @@ -138,7 +138,7 @@ InitXLogReaderState(XLogRecPtr lsn) * encounter errors if the flush pointer falls in the middle of a record. In * that case we'll return NULL. */ -static XLogRecord * +static XLRHeaderData * ReadNextXLogRecord(XLogReaderState *xlogreader) { XLogRecord *record; diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 94e702cdb2..22b12da928 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2171,7 +2171,8 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid, /* filtering by origin on a row level is much more efficient */ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); - recptr = XLogInsert(RM_HEAP_ID, rminfo); + recptr = XLogInsertExtended(RM_HEAP_ID, XLR_HAS_XID, + rminfo, InvalidCommandId); PageSetLSN(page, recptr); } @@ -2519,7 +2520,8 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, /* filtering by origin on a row level is much more efficient */ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); - recptr = XLogInsert(RM_HEAP2_ID, rminfo); + recptr = XLogInsertExtended(RM_HEAP2_ID, XLR_HAS_XID, + rminfo, InvalidCommandId); PageSetLSN(page, recptr); } @@ -3018,7 +3020,8 @@ l1: /* filtering by origin on a row level is much more efficient */ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); - recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE); + recptr = XLogInsertExtended(RM_HEAP_ID, XLR_HAS_XID, + XLOG_HEAP_DELETE, InvalidCommandId); 
PageSetLSN(page, recptr); } @@ -5813,7 +5816,8 @@ heap_finish_speculative(Relation relation, ItemPointer tid) XLogRegisterData((char *) &xlrec, SizeOfHeapConfirm); XLogRegisterBuffer(0, buffer, REGBUF_STANDARD); - recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_CONFIRM); + recptr = XLogInsertExtended(RM_HEAP_ID, XLR_HAS_XID, + XLOG_HEAP_CONFIRM, InvalidCommandId); PageSetLSN(page, recptr); } @@ -5956,7 +5960,8 @@ heap_abort_speculative(Relation relation, ItemPointer tid) /* No replica identity & replication origin logged */ - recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE); + recptr = XLogInsertExtended(RM_HEAP_ID, XLR_HAS_XID, + XLOG_HEAP_DELETE, InvalidCommandId); PageSetLSN(page, recptr); } @@ -6067,7 +6072,8 @@ heap_inplace_update(Relation relation, HeapTuple tuple) /* inplace updates aren't decoded atm, don't log the origin */ - recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_INPLACE); + recptr = XLogInsertExtended(RM_HEAP_ID, XLR_HAS_XID, + XLOG_HEAP_INPLACE, InvalidCommandId); PageSetLSN(page, recptr); } @@ -8439,7 +8445,8 @@ log_heap_update(Relation reln, Buffer oldbuf, /* filtering by origin on a row level is much more efficient */ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); - recptr = XLogInsert(RM_HEAP_ID, rminfo); + recptr = XLogInsertExtended(RM_HEAP_ID, XLR_HAS_XID, + rminfo, InvalidCommandId); return recptr; } @@ -8513,7 +8520,8 @@ log_heap_new_cid(Relation relation, HeapTuple tup) /* will be looked at irrespective of origin */ - recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_NEW_CID); + recptr = XLogInsertExtended(RM_HEAP2_ID, XLR_HAS_XID, + XLOG_HEAP2_NEW_CID, InvalidCommandId); return recptr; } diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c index b01b39b008..0b54dec99d 100644 --- a/src/backend/access/heap/rewriteheap.c +++ b/src/backend/access/heap/rewriteheap.c @@ -927,7 +927,8 @@ logical_heap_rewrite_flush_mappings(RewriteState state) XLogRegisterData(waldata_start, len); /* write xlog record */ - 
XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_REWRITE); + XLogInsertExtended(RM_HEAP2_ID, XLR_HAS_XID, + XLOG_HEAP2_REWRITE, InvalidCommandId); pfree(waldata_start); } diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c index ca6e238542..2fe15635e9 100644 --- a/src/backend/access/transam/multixact.c +++ b/src/backend/access/transam/multixact.c @@ -835,7 +835,8 @@ MultiXactIdCreateFromMembers(int nmembers, MultiXactMember *members) XLogRegisterData((char *) (&xlrec), SizeOfMultiXactCreate); XLogRegisterData((char *) members, nmembers * sizeof(MultiXactMember)); - (void) XLogInsert(RM_MULTIXACT_ID, XLOG_MULTIXACT_CREATE_ID); + (void) XLogInsertExtended(RM_MULTIXACT_ID, XLR_HAS_XID, + XLOG_MULTIXACT_CREATE_ID, InvalidCommandId); /* Now enter the information into the OFFSETs and MEMBERs logs */ RecordNewMultiXact(multi, offset, nmembers, members); diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 98c1ef7ec5..0bae7c71ea 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1396,7 +1396,7 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok) static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len) { - XLogRecord *record; + XLRHeaderData *record; XLogReaderState *xlogreader; char *errormsg; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 8ce44abfcf..fd9ce4d41f 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -5769,7 +5769,8 @@ XactLogCommitRecord(TimestampTz commit_time, /* we allow filtering by xacts */ XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); - return XLogInsertExtended(RM_XACT_ID, info, rminfo); + return XLogInsertExtended(RM_XACT_ID, info | XLR_HAS_XID, + rminfo, InvalidCommandId); } /* @@ -5917,7 +5918,8 @@ XactLogAbortRecord(TimestampTz abort_time, if (TransactionIdIsValid(twophase_xid)) XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN); - return 
XLogInsertExtended(RM_XACT_ID, info, rminfo); + return XLogInsertExtended(RM_XACT_ID, info | XLR_HAS_XID, + rminfo, InvalidCommandId); } /* diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 3058041683..02ec968697 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -742,16 +742,16 @@ XLogInsertRecord(XLogRecData *rdata, pg_crc32c rdata_crc; bool inserted; XLogRecord *rechdr = (XLogRecord *) rdata->data; - uint8 rminfo = rechdr->xl_rminfo; - bool isLogSwitch = (rechdr->xl_rmid == RM_XLOG_ID && - rminfo == XLOG_SWITCH); + uint8 rminfo; + uint32 reclength; + bool isLogSwitch; XLogRecPtr StartPos; XLogRecPtr EndPos; bool prevDoPageWrites = doPageWrites; TimeLineID insertTLI; /* we assume that all of the record header is in the first chunk */ - Assert(rdata->len >= SizeOfXLogRecord); + Assert(rdata->len >= MinXLogHeaderSize); /* cross-check on whether we should be here or not */ if (!XLogInsertAllowed()) @@ -763,6 +763,40 @@ XLogInsertRecord(XLogRecData *rdata, */ insertTLI = XLogCtl->InsertTimeLineID; + if (rechdr->xl_info & XLR_HAS_RMINFO) + { + Size offset = MinXLogHeaderSize; + + switch (rechdr->xl_info & XLR2_LEN_MASK) + { + case XLR2_LEN_ABSENT: + break; + case XLR2_LEN_1B: + offset += 1; + break; + case XLR2_LEN_2B: + offset += 2; + break; + case XLR2_LEN_4B: + offset += 4; + break; + default: + pg_unreachable(); + } + if (rechdr->xl_info & XLR_HAS_XID) + offset += sizeof(TransactionId); + if (rechdr->xl_info & XLR_HAS_CID) + offset += sizeof(CommandId); + + rminfo = *((uint8 *) (rdata->data + offset)); + } + else + rminfo = 0; + + reclength = XLogRecordGetLength(rechdr); + + isLogSwitch = (rechdr->xl_rmid == RM_XLOG_ID && rminfo == XLOG_SWITCH); + /*---------- * * We have now done all the preparatory work we can without holding a @@ -845,7 +879,7 @@ XLogInsertRecord(XLogRecData *rdata, inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev); else { - 
ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos, + ReserveXLogInsertLocation(reclength, &StartPos, &EndPos, &rechdr->xl_prev); inserted = true; } @@ -865,7 +899,7 @@ XLogInsertRecord(XLogRecData *rdata, * All the record data, including the header, is now ready to be * inserted. Copy the record in the space reserved. */ - CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata, + CopyXLogRecordToWAL(reclength, isLogSwitch, rdata, StartPos, EndPos, insertTLI); /* @@ -896,14 +930,18 @@ XLogInsertRecord(XLogRecData *rdata, END_CRIT_SECTION(); - MarkCurrentTransactionIdLoggedIfAny(); + if (rechdr->xl_info & XLR_HAS_XID) + MarkCurrentTransactionIdLoggedIfAny(); /* * Mark top transaction id is logged (if needed) so that we should not try * to log it again with the next WAL record in the current subtransaction. */ if (topxid_included) + { + Assert(rechdr->xl_info & XLR_HAS_XID); MarkSubxactTopXidLogged(); + } /* * Update shared LogwrtRqst.Write, if we crossed page boundary. @@ -936,7 +974,8 @@ XLogInsertRecord(XLogRecData *rdata, */ if (inserted) { - EndPos = StartPos + SizeOfXLogRecord; + /* switch record is no more than a min-sized header plus rminfo */ + EndPos = StartPos + MAXALIGN(MinXLogHeaderSize + sizeof(uint8)); if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ) { uint64 offset = XLogSegmentOffset(EndPos, wal_segment_size); @@ -977,7 +1016,7 @@ XLogInsertRecord(XLogRecData *rdata, /* We also need temporary space to decode the record. */ record = (XLogRecord *) recordBuf.data; decoded = (DecodedXLogRecord *) - palloc(DecodeXLogRecordRequiredSpace(record->xl_tot_len)); + palloc(DecodeXLogRecordRequiredSpace(reclength)); if (!debug_reader) debug_reader = XLogReaderAllocate(wal_segment_size, NULL, @@ -1022,7 +1061,7 @@ XLogInsertRecord(XLogRecData *rdata, /* Report WAL traffic to the instrumentation. 
*/ if (inserted) { - pgWalUsage.wal_bytes += rechdr->xl_tot_len; + pgWalUsage.wal_bytes += reclength; pgWalUsage.wal_records++; pgWalUsage.wal_fpi += num_fpi; } @@ -1056,7 +1095,7 @@ ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos, size = MAXALIGN(size); /* All (non xlog-switch) records should contain data. */ - Assert(size > SizeOfXLogRecord); + Assert(size > MinXLogHeaderSize); /* * The duration the spinlock needs to be held is minimized by minimizing @@ -1107,7 +1146,7 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr) uint64 startbytepos; uint64 endbytepos; uint64 prevbytepos; - uint32 size = MAXALIGN(SizeOfXLogRecord); + uint32 size = MAXALIGN(MinXLogHeaderSize + sizeof(uint8)); XLogRecPtr ptr; uint32 segleft; @@ -1250,8 +1289,11 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata, */ if (isLogSwitch && XLogSegmentOffset(CurrPos, wal_segment_size) != 0) { - /* An xlog-switch record doesn't contain any data besides the header */ - Assert(write_len == SizeOfXLogRecord); + /* + * An xlog-switch record doesn't contain any data besides the basic + * header and rminfo + */ + Assert(write_len == MAXALIGN(MinXLogHeaderSize + sizeof(uint8))); /* Assert that we did reserve the right amount of space */ Assert(XLogSegmentOffset(EndPos, wal_segment_size) == 0); @@ -4672,6 +4714,9 @@ BootStrapXLOG(void) uint64 sysidentifier; struct timeval tv; pg_crc32c crc; + const uint32 rec_tot_len = MinXLogHeaderSize + sizeof(uint8) /* xl_tot_len */ + + SizeOfXLogRecordDataHeaderShort + sizeof(checkPoint); + const uint8 rec_len = rec_tot_len; /* allow ordinary WAL segment creation, like StartupXLOG() would */ SetInstallXLogFileSegmentActive(); @@ -4746,20 +4791,23 @@ BootStrapXLOG(void) recptr = ((char *) page + SizeOfXLogLongPHD); record = (XLogRecord *) recptr; record->xl_prev = 0; - record->xl_xid = InvalidTransactionId; - record->xl_tot_len = SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + 
sizeof(checkPoint); - record->xl_info = XLOG_CHECKPOINT_SHUTDOWN; + record->xl_info = XLR2_LEN_1B; record->xl_rmid = RM_XLOG_ID; - recptr += SizeOfXLogRecord; + + recptr += MinXLogHeaderSize; + + memcpy(recptr, (char *) &rec_len, sizeof(uint8)); + recptr += sizeof(uint8); + /* fill the XLogRecordDataHeaderShort struct */ *(recptr++) = (char) XLR_BLOCK_ID_DATA_SHORT; *(recptr++) = sizeof(checkPoint); - memcpy(recptr, &checkPoint, sizeof(checkPoint)); + memcpy(recptr, (char *) &checkPoint, sizeof(checkPoint)); recptr += sizeof(checkPoint); - Assert(recptr - (char *) record == record->xl_tot_len); + Assert(recptr - (char *) record == rec_len); INIT_CRC32C(crc); - COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord); + COMP_CRC32C(crc, ((char *) record) + offsetof(XLogRecord, xl_rmid), rec_len - offsetof(XLogRecord, xl_rmid)); COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc)); FIN_CRC32C(crc); record->xl_crc = crc; diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index cc4a262d8e..2ac1d0a870 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -118,7 +118,7 @@ static char *hdr_scratch = NULL; #define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char)) #define HEADER_SCRATCH_SIZE \ - (SizeOfXLogRecord + \ + (MaxXLogHeaderSize + \ MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \ SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \ SizeOfXLogTransactionId) @@ -135,10 +135,11 @@ static bool begininsert_called = false; /* Memory context to hold the registered buffer and data references. 
*/ static MemoryContext xloginsert_cxt; -static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, uint8 rminfo, - XLogRecPtr RedoRecPtr, bool doPageWrites, - XLogRecPtr *fpw_lsn, int *num_fpi, - bool *topxid_included); +static XLogRecData * +XLogRecordAssemble(RmgrId rmid, uint8 info, uint8 rminfo, + XLogRecPtr RedoRecPtr, bool doPageWrites, + XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included, + CommandId cid); static bool XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length, char *dest, uint16 *dlen); @@ -447,11 +448,15 @@ XLogSetRecordFlags(uint8 flags) * (LSN is the XLOG point up to which the XLOG must be flushed to disk * before the data page can be written out. This implements the basic * WAL rule "write the log before the data".) + * + * Note: To include the XID in the record, you have to use + * XLogInsertExtended with info = XLR_HAS_XID. To include the CID, do the + * same with XLR_HAS_CID, and specify the command ID. */ XLogRecPtr XLogInsert(RmgrId rmid, uint8 rminfo) { - return XLogInsertExtended(rmid, 0, rminfo); + return XLogInsertExtended(rmid, 0, rminfo, InvalidCommandId); } @@ -459,11 +464,15 @@ XLogInsert(RmgrId rmid, uint8 rminfo) * Insert an XLOG record having the specified RMID, info and rminfo bytes, * with the body of the record being the data and buffer references * registered earlier with XLogRegister* calls. + * + * CommandId is an argument to be supplied by the caller, while TransactionId + * (if needed) is taken from this backend's state: There is at any time + * only one running XID, while there may be more than one active CommandId. * * See also XLogInsert above. */ XLogRecPtr -XLogInsertExtended(RmgrId rmid, uint8 info, uint8 rminfo) +XLogInsertExtended(RmgrId rmid, uint8 info, uint8 rminfo, CommandId cid) { XLogRecPtr EndPos; @@ -475,8 +484,7 @@ XLogInsertExtended(RmgrId rmid, uint8 info, uint8 rminfo) * The caller can set XLR_SPECIAL_REL_UPDATE and * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
*/ - if ((info & ~(XLR_SPECIAL_REL_UPDATE | - XLR_CHECK_CONSISTENCY)) != 0) + if ((info & ~XLR_INFO_USERFLAGS) != 0) elog(PANIC, "invalid xlog info mask %02X", info); TRACE_POSTGRESQL_WAL_INSERT(rmid, info); @@ -509,7 +517,8 @@ XLogInsertExtended(RmgrId rmid, uint8 info, uint8 rminfo) GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites); rdt = XLogRecordAssemble(rmid, info, rminfo, RedoRecPtr, doPageWrites, - &fpw_lsn, &num_fpi, &topxid_included); + &fpw_lsn, &num_fpi, &topxid_included, + cid); EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi, topxid_included); @@ -538,30 +547,57 @@ XLogInsertExtended(RmgrId rmid, uint8 info, uint8 rminfo) static XLogRecData * XLogRecordAssemble(RmgrId rmid, uint8 info, uint8 rminfo, XLogRecPtr RedoRecPtr, bool doPageWrites, - XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included) + XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included, + CommandId cid) { XLogRecData *rdt; uint32 total_len = 0; int block_id; + bool only_hdr = true; pg_crc32c rdata_crc; registered_buffer *prev_regbuf = NULL; XLogRecData *rdt_datas_last; - XLogRecord *rechdr; + XLogRecord *rechdr = (XLogRecord *) hdr_scratch; char *scratch = hdr_scratch; + Assert((info & ~(XLR_INFO_USERFLAGS)) == 0); + /* * Note: this function can be called multiple times for the same record. * All the modifications we do to the rdata chains below must handle that. */ - /* The record begins with the fixed-size header */ - rechdr = (XLogRecord *) scratch; - scratch += SizeOfXLogRecord; + /* + * The record begins with the variable-size header data. We pre-allocate + * the fixed part of the xlog header section, plus the length field, as + * we'll only fill those at the end of the record. The rest can be + * pre-filled. 
+ */ + scratch += MinXLogHeaderSize + sizeof(uint32); hdr_rdt.next = NULL; rdt_datas_last = &hdr_rdt; hdr_rdt.data = hdr_scratch; + if (IsSubxactTopXidLogPending()) + info |= XLR_HAS_XID; + + if (info & XLR_HAS_XID) + { + TransactionId xid = GetCurrentTransactionIdIfAny(); + memcpy(scratch, (char *) &xid, sizeof(TransactionId)); + scratch += sizeof(TransactionId); + } + + if (info & XLR_HAS_CID) + { + memcpy(scratch, (char *) &cid, sizeof(CommandId)); + scratch += sizeof(CommandId); + } + + if (rminfo != 0) + *(scratch++) = rminfo; + /* * Enforce consistency checks for this record if user is looking for it. * Do this before at the beginning of this routine to give the possibility @@ -592,6 +628,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, uint8 rminfo, if (!regbuf->in_use) continue; + only_hdr = false; /* Determine if this block needs to be backed up */ if (regbuf->flags & REGBUF_FORCE_IMAGE) needs_backup = true; @@ -832,6 +869,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, uint8 rminfo, if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) && replorigin_session_origin != InvalidRepOriginId) { + only_hdr = false; *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN; memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin)); scratch += sizeof(replorigin_session_origin); @@ -845,6 +883,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, uint8 rminfo, /* Set the flag that the top xid is included in the WAL */ *topxid_included = true; + only_hdr = false; *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID; memcpy(scratch, &xid, sizeof(TransactionId)); scratch += sizeof(TransactionId); @@ -853,6 +892,8 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, uint8 rminfo, /* followed by main data, if any */ if (mainrdata_len > 0) { + only_hdr = false; + if (mainrdata_len > 255) { *(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG; @@ -873,16 +914,75 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, uint8 rminfo, hdr_rdt.len = (scratch - hdr_scratch); total_len += hdr_rdt.len; + + /* ensure 
that we haven't yet set the length mask */ + Assert((info & XLR2_LEN_MASK) == 0); + + /* + * Here, the full xlog header has been constructed, except for the xlog + * record length, and the constant fields in the xlog header. + */ + + /* fill the xlog header length field and mask */ + if (only_hdr) + { + Assert(total_len - sizeof(uint32) == XLRSizeOfHeader(info)); + info |= XLR2_LEN_ABSENT; + memmove(hdr_scratch + MinXLogHeaderSize, + hdr_scratch + MinXLogHeaderSize + sizeof(uint32), + hdr_rdt.len - MinXLogHeaderSize - sizeof(uint32)); + hdr_rdt.len = hdr_rdt.len - sizeof(uint32); + } + else if (total_len - sizeof(uint32) <= UINT8_MAX - sizeof(uint8)) + { + uint8 size = (uint8) (total_len - sizeof(uint32)) + sizeof(uint8); + info |= XLR2_LEN_1B; + memmove(hdr_scratch + MinXLogHeaderSize + sizeof(uint8), + hdr_scratch + MinXLogHeaderSize + sizeof(uint32), + hdr_rdt.len - MinXLogHeaderSize - sizeof(uint32)); + memcpy(hdr_scratch + MinXLogHeaderSize, (char *) &size, sizeof(uint8)); + total_len = size; + hdr_rdt.len = hdr_rdt.len - sizeof(uint32) + sizeof(uint8); + } + else if (total_len - sizeof(uint32) <= UINT16_MAX - sizeof(uint16)) + { + uint16 size = (uint16) (total_len - sizeof(uint32)) + sizeof(uint16); + info |= XLR2_LEN_2B; + memmove(hdr_scratch + MinXLogHeaderSize + sizeof(uint16), + hdr_scratch + MinXLogHeaderSize + sizeof(uint32), + hdr_rdt.len - MinXLogHeaderSize - sizeof(uint32)); + memcpy(hdr_scratch + MinXLogHeaderSize, (char *) &size, sizeof(uint16)); + total_len = size; + hdr_rdt.len = hdr_rdt.len - sizeof(uint32) + sizeof(uint16); + } + else + { + uint32 size = total_len; + info |= XLR2_LEN_4B; + memcpy(hdr_scratch + MinXLogHeaderSize, (char *) &size, sizeof(uint32)); + total_len = size; + } + + /* + * We've filled all variable-length data of the xlog header section, + * which allows us to start CRC-ing the data. 
+ */ + + rechdr->xl_rmid = rmid; + rechdr->xl_info = info; + /* * Calculate CRC of the data * * Note that the record header isn't added into the CRC initially since we * don't know the prev-link yet. Thus, the CRC will represent the CRC of - * the whole record in the order: rdata, then backup blocks, then record - * header. + * the whole record in the order: xl_rmid, xl_info, varlen header fields, + * rdata, then backup blocks, then prevptr. */ INIT_CRC32C(rdata_crc); - COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord); + COMP_CRC32C(rdata_crc, + hdr_rdt.data + offsetof(XLogRecord, xl_rmid), + hdr_rdt.len - offsetof(XLogRecord, xl_rmid)); for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next) COMP_CRC32C(rdata_crc, rdt->data, rdt->len); @@ -891,11 +991,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, uint8 rminfo, * once we know where in the WAL the record will be inserted. The CRC does * not include the record header yet. */ - rechdr->xl_xid = GetCurrentTransactionIdIfAny(); - rechdr->xl_tot_len = total_len; - rechdr->xl_info = info; - rechdr->xl_rmid = rmid; - rechdr->xl_rminfo = rminfo; rechdr->xl_prev = InvalidXLogRecPtr; rechdr->xl_crc = rdata_crc; diff --git a/src/backend/access/transam/xlogprefetcher.c b/src/backend/access/transam/xlogprefetcher.c index e5d4f36182..f885670263 100644 --- a/src/backend/access/transam/xlogprefetcher.c +++ b/src/backend/access/transam/xlogprefetcher.c @@ -983,7 +983,7 @@ XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr) * A wrapper for XLogReadRecord() that provides the same interface, but also * tries to initiate I/O for blocks referenced in future WAL records. 
*/ -XLogRecord * +XLRHeaderData * XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, char **errmsg) { DecodedXLogRecord *record; diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 9200d7f56c..56bf86cb27 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -49,7 +49,8 @@ static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, static void XLogReaderInvalReadState(XLogReaderState *state); static XLogPageReadResult XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking); static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, - XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess); + XLogRecPtr PrevRecPtr, XLogRecord *record, + bool randAccess); static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr); static void ResetDecoder(XLogReaderState *state); @@ -418,7 +419,7 @@ XLogNextRecord(XLogReaderState *state, char **errormsg) * The returned pointer (or *errormsg) points to an internal buffer that's * valid until the next call to XLogReadRecord. */ -XLogRecord * +XLRHeaderData * XLogReadRecord(XLogReaderState *state, char **errormsg) { DecodedXLogRecord *decoded; @@ -531,6 +532,83 @@ XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversi return NULL; } +/* + * Fills contdata with pointer to record continuation data of the next page, + * and len with the length of the data on that page. 
+ */ +#define ReadPageData(dataNeeded, gotLen, expectLen, strict) \ + do \ + { \ + /* Wait for the next page to become available */ \ + readOff = ReadPageInternal(state, targetPagePtr, \ + Min((dataNeeded) + SizeOfXLogShortPHD, \ + XLOG_BLCKSZ)); \ + \ + if (readOff == XLREAD_WOULDBLOCK) \ + return XLREAD_WOULDBLOCK; \ + else if (readOff < 0) \ + goto err; \ + Assert(SizeOfXLogShortPHD <= readOff); \ + pageHeader = (XLogPageHeader) state->readBuf; \ + /* + * If we were expecting a continuation record and got an + * "overwrite contrecord" flag, that means the continuation record + * was overwritten with a different record. Restart the read by + * assuming the address to read is the location where we found + * this flag; but keep track of the LSN of the record we were + * reading, for later verification. + */ \ + if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD) \ + { \ + state->overwrittenRecPtr = RecPtr; \ + RecPtr = targetPagePtr; \ + goto restart; \ + } \ + \ + /* Check that the continuation on next page looks valid */ \ + if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) \ + { \ + report_invalid_record(state, \ + "there is no contrecord flag at %X/%X", \ + LSN_FORMAT_ARGS(RecPtr)); \ + goto err; \ + } \ + /* + * Cross-check that xlp_rem_len agrees with how much of the record + * we expect there to be left. + */ \ + if (pageHeader->xlp_rem_len == 0 || ((strict) ?
( \ + (expectLen) != (pageHeader->xlp_rem_len + (gotLen)) \ + ) : ( \ + (expectLen) < (pageHeader->xlp_rem_len + (gotLen)) \ + ))) \ + { \ + report_invalid_record(state, \ + "invalid contrecord length %u (expected %lld) at %X/%X", \ + pageHeader->xlp_rem_len, \ + ((long long) (expectLen)) - (gotLen), \ + LSN_FORMAT_ARGS(RecPtr)); \ + goto err; \ + } \ + /* Append the continuation from this page to the buffer */ \ + pageHeaderSize = XLogPageHeaderSize(pageHeader); \ + \ + if (readOff < Min(pageHeaderSize + (dataNeeded), XLOG_BLCKSZ)) \ + { \ + readOff = ReadPageInternal(state, targetPagePtr, \ + Min(pageHeaderSize + (dataNeeded), XLOG_BLCKSZ)); \ + if (readOff == XLREAD_WOULDBLOCK) \ + return XLREAD_WOULDBLOCK; \ + else if (readOff < 0) \ + goto err; \ + } \ + Assert(pageHeaderSize <= readOff); \ + \ + /* point contdata at the data following the page header */ \ + contdata = (char *) state->readBuf + pageHeaderSize; \ + len = readOff - pageHeaderSize; \ + } while (false) + static XLogPageReadResult XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking) { @@ -543,10 +621,16 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking) uint32 targetRecOff; uint32 pageHeaderSize; bool assembled; - bool gotheader; int readOff; DecodedXLogRecord *decoded; char *errormsg; /* not used */ + union { + char bytes[MaxXLogHeaderSize]; + XLogRecord hdr; + } rechdr = {0}; /* record header buffer for cross-page header reads */ + uint32 rechdrsize = 0; + XLogPageHeader pageHeader; + char *contdata; /* * randAccess indicates whether to verify the previous-record pointer of @@ -602,7 +686,8 @@ restart: * fits on the same page.
*/ readOff = ReadPageInternal(state, targetPagePtr, - Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); + Min(targetRecOff + MAXALIGN(MinXLogHeaderSize), + XLOG_BLCKSZ)); if (readOff == XLREAD_WOULDBLOCK) return XLREAD_WOULDBLOCK; else if (readOff < 0) @@ -639,47 +724,126 @@ restart: /* ReadPageInternal has verified the page header */ Assert(pageHeaderSize <= readOff); - /* - * Read the record length. - * - * NB: Even though we use an XLogRecord pointer here, the whole record - * header might not fit on this page. xl_tot_len is the first field of the - * struct, so it must be on this page (the records are MAXALIGNed), but we - * cannot access any other fields until we've verified that we got the - * whole header. - */ - record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ); - total_len = record->xl_tot_len; + contdata = state->readBuf + targetRecOff; + len = state->readLen - targetRecOff; + + rechdrsize = Min(len, MinXLogHeaderSize); + + memcpy(&rechdr.bytes[0], contdata, rechdrsize); + + contdata += rechdrsize; + len -= rechdrsize; + /* - * If the whole record header is on this page, validate it immediately. - * Otherwise do just a basic sanity check on xl_tot_len, and validate the - * rest of the header after reading it from the next page. The xl_tot_len - * check is necessary here to ensure that we enter the "Need to reassemble - * record" code path below; otherwise we might fail to apply - * ValidXLogRecordHeader at all. + * If we don't yet have the minimum required data of an XLogRecord, + * we have to read from the next page. 
*/ - if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) + if (rechdrsize < MinXLogHeaderSize) { - if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, record, - randAccess)) - goto err; - gotheader = true; + int gotlen; + + /* Calculate pointer to beginning of next page */ + targetPagePtr += XLOG_BLCKSZ; + ReadPageData(MinXLogHeaderSize - rechdrsize, + rechdrsize, + MinXLogHeaderSize, + false); + + gotlen = Min(len, MinXLogHeaderSize - rechdrsize); + + memcpy(&rechdr.bytes[rechdrsize], + contdata, + gotlen); + rechdrsize += gotlen; + contdata += gotlen; + len -= gotlen; } - else + + Assert(rechdrsize >= MinXLogHeaderSize); + + /* + * We now have a minimal XLogHeader in the buffer. This is enough + * to determine the size of all header data, but we don't yet have + * all of that. + */ + + total_len = XLRSizeOfHeader(rechdr.hdr.xl_info); + + /* + * We now know the expected size of the xlog header structures + * (which includes the xl_rec_len field). + * + * If the full headers section requires more data than what we've loaded + * by now, we need to read that data.
+ */ + if (total_len > rechdrsize) { - /* XXX: more validation should be done here */ - if (total_len < SizeOfXLogRecord) + /* Consume any bytes that are still in the buffer */ + if (len > 0) { - report_invalid_record(state, - "invalid record length at %X/%X: wanted %u, got %u", - LSN_FORMAT_ARGS(RecPtr), - (uint32) SizeOfXLogRecord, total_len); - goto err; + int consumed = Min(len, total_len - rechdrsize); + + memcpy(&rechdr.bytes[rechdrsize], + contdata, + consumed); + contdata += consumed; + rechdrsize += consumed; + len -= consumed; + } + + /* Continue reading from this page if the page wasn't fully read */ + if (total_len > rechdrsize && state->readLen < XLOG_BLCKSZ) + { + int consumed; + int needed = Min(state->readLen + total_len - rechdrsize, + XLOG_BLCKSZ); + + ReadPageData(needed, rechdrsize, total_len, false); + consumed = Min(total_len - rechdrsize, len); + + memcpy(&rechdr.bytes[rechdrsize], + contdata, + consumed); + + contdata += consumed; + rechdrsize += consumed; + len -= consumed; + } + + /* If we're still not done, read from the next page */ + if (total_len > rechdrsize && state->readLen >= XLOG_BLCKSZ) + { + int consumed, + needed = total_len - rechdrsize; + targetPagePtr += XLOG_BLCKSZ; + + ReadPageData(needed, rechdrsize, total_len, false); + consumed = Min(total_len - rechdrsize, len); + + memcpy(&rechdr.bytes[rechdrsize], + contdata, + consumed); + + contdata += consumed; + rechdrsize += consumed; + len -= consumed; } - gotheader = false; } + /* + * By now, the full record header is loaded; allowing us to read the + * record's length field. + */ + total_len = XLogRecordGetLength(&rechdr.hdr); + + /* + * Because the xlog header is loaded, we should validate it as well. + */ + if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, &rechdr.hdr, + randAccess)) + goto err; + /* * Find space to decode this record. Don't allow oversized allocation if * the caller requested nonblocking. 
Otherwise, we *have* to try to @@ -705,12 +869,9 @@ restart: goto err; } - len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ; - if (total_len > len) + if (total_len > (XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ)) { /* Need to reassemble record */ - char *contdata; - XLogPageHeader pageHeader; char *buffer; uint32 gotlen; @@ -728,105 +889,51 @@ restart: goto err; } - /* Copy the first fragment of the record from the first page. */ - memcpy(state->readRecordBuf, - state->readBuf + RecPtr % XLOG_BLCKSZ, len); - buffer = state->readRecordBuf + len; - gotlen = len; - - do - { - /* Calculate pointer to beginning of next page */ - targetPagePtr += XLOG_BLCKSZ; - - /* Wait for the next page to become available */ - readOff = ReadPageInternal(state, targetPagePtr, - Min(total_len - gotlen + SizeOfXLogShortPHD, - XLOG_BLCKSZ)); + /* Copy the buffered record header fragment */ + memcpy(state->readRecordBuf, &rechdr.bytes[0], rechdrsize); - if (readOff == XLREAD_WOULDBLOCK) - return XLREAD_WOULDBLOCK; - else if (readOff < 0) - goto err; + len = Min(len, total_len - rechdrsize); - Assert(SizeOfXLogShortPHD <= readOff); + /* Copy the remaining bytes of the page */ + memcpy(state->readRecordBuf + rechdrsize, contdata, len); - pageHeader = (XLogPageHeader) state->readBuf; + buffer = state->readRecordBuf + rechdrsize + len; + gotlen = rechdrsize + len; + + /* Finish the last bytes of this page when available */ + if (state->readLen < XLOG_BLCKSZ) + { + int buffered = state->readLen; + ReadPageData((state->readLen - SizeOfXLogShortPHD) + (total_len - gotlen), + gotlen, total_len, true); - /* - * If we were expecting a continuation record and got an - * "overwrite contrecord" flag, that means the continuation record - * was overwritten with a different record. Restart the read by - * assuming the address to read is the location where we found - * this flag; but keep track of the LSN of the record we were - * reading, for later verification. 
- */ - if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD) - { - state->overwrittenRecPtr = RecPtr; - RecPtr = targetPagePtr; - goto restart; - } + if (pageHeader->xlp_rem_len < len) + len = pageHeader->xlp_rem_len; - /* Check that the continuation on next page looks valid */ - if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) - { - report_invalid_record(state, - "there is no contrecord flag at %X/%X", - LSN_FORMAT_ARGS(RecPtr)); - goto err; - } + buffer += buffered; + len -= buffered; - /* - * Cross-check that xlp_rem_len agrees with how much of the record - * we expect there to be left. - */ - if (pageHeader->xlp_rem_len == 0 || - total_len != (pageHeader->xlp_rem_len + gotlen)) - { - report_invalid_record(state, - "invalid contrecord length %u (expected %lld) at %X/%X", - pageHeader->xlp_rem_len, - ((long long) total_len) - gotlen, - LSN_FORMAT_ARGS(RecPtr)); - goto err; - } + memcpy(buffer, contdata, len); - /* Append the continuation from this page to the buffer */ - pageHeaderSize = XLogPageHeaderSize(pageHeader); + buffer += len; + gotlen += len; + } - if (readOff < pageHeaderSize) - readOff = ReadPageInternal(state, targetPagePtr, - pageHeaderSize); + do + { + /* Calculate pointer to beginning of next page */ + targetPagePtr += XLOG_BLCKSZ; - Assert(pageHeaderSize <= readOff); + ReadPageData(total_len - gotlen, gotlen, total_len, true); - contdata = (char *) state->readBuf + pageHeaderSize; - len = XLOG_BLCKSZ - pageHeaderSize; if (pageHeader->xlp_rem_len < len) len = pageHeader->xlp_rem_len; - if (readOff < pageHeaderSize + len) - readOff = ReadPageInternal(state, targetPagePtr, - pageHeaderSize + len); - - memcpy(buffer, (char *) contdata, len); + memcpy(buffer, contdata, len); buffer += len; gotlen += len; - - /* If we just reassembled the record header, validate it. 
*/ - if (!gotheader) - { - record = (XLogRecord *) state->readRecordBuf; - if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, - record, randAccess)) - goto err; - gotheader = true; - } } while (gotlen < total_len); - Assert(gotheader); - record = (XLogRecord *) state->readRecordBuf; if (!ValidXLogRecord(state, record, RecPtr)) goto err; @@ -846,6 +953,8 @@ restart: else if (readOff < 0) goto err; + record = (XLogRecord *) (state->readBuf + targetRecOff); + /* Record does not cross a page boundary */ if (!ValidXLogRecord(state, record, RecPtr)) goto err; @@ -859,7 +968,7 @@ restart: * Special processing if it's an XLOG SWITCH record */ if (record->xl_rmid == RM_XLOG_ID && - record->xl_rminfo == XLOG_SWITCH) + XLogRecordGetRMInfo(record) == XLOG_SWITCH) { /* Pretend it extends to end of segment */ state->NextRecPtr += state->segcxt.ws_segsize - 1; @@ -1116,12 +1225,13 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess) { - if (record->xl_tot_len < SizeOfXLogRecord) + uint32 reclen = XLogRecordGetLength(record); + if (reclen < MinXLogHeaderSize) { report_invalid_record(state, "invalid record length at %X/%X: wanted %u, got %u", LSN_FORMAT_ARGS(RecPtr), - (uint32) SizeOfXLogRecord, record->xl_tot_len); + (uint32) MinXLogHeaderSize, reclen); return false; } if (!RmgrIdIsValid(record->xl_rmid)) @@ -1184,16 +1294,28 @@ ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr) /* Calculate the CRC */ INIT_CRC32C(crc); - COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord); - /* include the record header last */ + COMP_CRC32C(crc, + ((char *) record) + offsetof(XLogRecord, xl_rmid), + XLogRecordGetLength(record) - offsetof(XLogRecord, xl_rmid)); + /* include the xl_prev pointer last */ COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc)); FIN_CRC32C(crc); if (!EQ_CRC32C(record->xl_crc, crc)) { 
report_invalid_record(state, - "incorrect resource manager data checksum in record at %X/%X", - LSN_FORMAT_ARGS(recptr)); + "incorrect resource manager data checksum in record at %X/%X\n" + "xl_crc: record: %d; calculated: %d\n" + "xl_info: %x\n" + "xl_prev: %X/%X\n" + "xl_rmid: %X\n", + LSN_FORMAT_ARGS(recptr), + record->xl_crc, + crc, + record->xl_info, + LSN_FORMAT_ARGS(record->xl_prev), + record->xl_rmid); + Assert(false); return false; } @@ -1657,11 +1779,11 @@ DecodeXLogRecord(XLogReaderState *state, */ #define COPY_HEADER_FIELD(_dst, _size) \ do { \ - if (remaining < _size) \ + if (remaining < (_size)) \ goto shortdata_err; \ - memcpy(_dst, ptr, _size); \ - ptr += _size; \ - remaining -= _size; \ + memcpy((_dst), ptr, (_size)); \ + ptr += (_size); \ + remaining -= (_size); \ } while(0) char *ptr; @@ -1671,7 +1793,7 @@ DecodeXLogRecord(XLogReaderState *state, RelFileLocator *rlocator = NULL; uint8 block_id; - decoded->header = *record; + decoded->header = (XLRHeaderData) {0}; decoded->lsn = lsn; decoded->next = NULL; decoded->record_origin = InvalidRepOriginId; @@ -1679,11 +1801,53 @@ DecodeXLogRecord(XLogReaderState *state, decoded->main_data = NULL; decoded->main_data_len = 0; decoded->max_block_id = -1; + + decoded->header.xl_prev = record->xl_prev; + decoded->header.xl_crc = record->xl_crc; + decoded->header.xl_rmid = record->xl_rmid; + decoded->header.xl_info = record->xl_info; + decoded->header.xl_tot_len = XLogRecordGetLength(record); + + remaining = decoded->header.xl_tot_len - MinXLogHeaderSize; ptr = (char *) record; - ptr += SizeOfXLogRecord; - remaining = record->xl_tot_len - SizeOfXLogRecord; + ptr += MinXLogHeaderSize; + + switch (record->xl_info & XLR2_LEN_MASK) + { + case XLR2_LEN_ABSENT: + break; + case XLR2_LEN_1B: + ptr += 1; + remaining -= 1; + break; + case XLR2_LEN_2B: + ptr += 2; + remaining -= 2; + break; + case XLR2_LEN_4B: + ptr += 4; + remaining -= 4; + break; + default: + pg_unreachable(); + } + + if (record->xl_info & XLR_HAS_XID) 
+ COPY_HEADER_FIELD(&decoded->header.xl_xid, sizeof(TransactionId)); + else + decoded->header.xl_xid = InvalidTransactionId; + + if (record->xl_info & XLR_HAS_CID) + COPY_HEADER_FIELD(&decoded->header.xl_cid, sizeof(CommandId)); + else + decoded->header.xl_cid = InvalidCommandId; + + if (record->xl_info & XLR_HAS_RMINFO) + COPY_HEADER_FIELD(&decoded->header.xl_rminfo, sizeof(uint8)); + else + decoded->header.xl_rminfo = 0; - /* Decode the headers */ + /* Decode the block headers */ datatotal = 0; while (remaining > datatotal) { @@ -1933,7 +2097,7 @@ DecodeXLogRecord(XLogReaderState *state, /* Report the actual size we used. */ decoded->size = MAXALIGN(out - (char *) decoded); - Assert(DecodeXLogRecordRequiredSpace(record->xl_tot_len) >= + Assert(DecodeXLogRecordRequiredSpace(decoded->header.xl_tot_len) >= decoded->size); return true; diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c index 96d12994b5..d8c380f694 100644 --- a/src/backend/access/transam/xlogrecovery.c +++ b/src/backend/access/transam/xlogrecovery.c @@ -384,7 +384,7 @@ static char recoveryStopName[MAXFNAMELEN]; static bool recoveryStopAfter; /* prototypes for local functions */ -static void ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *replayTLI); +static void ApplyWalRecord(XLogReaderState *xlogreader, XLRHeaderData *record, TimeLineID *replayTLI); static void readRecoverySignalFile(void); static void validateRecoveryParameters(void); @@ -412,9 +412,9 @@ static void recoveryPausesHere(bool endOfRecovery); static bool recoveryApplyDelay(XLogReaderState *record); static void ConfirmRecoveryPaused(void); -static XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher, - int emode, bool fetching_ckpt, - TimeLineID replayTLI); +static XLRHeaderData *ReadRecord(XLogPrefetcher *xlogprefetcher, + int emode, bool fetching_ckpt, + TimeLineID replayTLI); static int XLogPageRead(XLogReaderState *xlogreader, XLogRecPtr targetPagePtr, int 
reqLen, XLogRecPtr targetRecPtr, char *readBuf); @@ -426,8 +426,8 @@ static XLogPageReadResult WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, XLogRecPtr replayLSN, bool nonblocking); static int emode_for_corrupt_record(int emode, XLogRecPtr RecPtr); -static XLogRecord *ReadCheckpointRecord(XLogPrefetcher *xlogprefetcher, - XLogRecPtr RecPtr, TimeLineID replayTLI); +static XLRHeaderData *ReadCheckpointRecord(XLogPrefetcher *xlogprefetcher, + XLogRecPtr RecPtr, TimeLineID replayTLI); static bool rescanLatestTimeLine(TimeLineID replayTLI, XLogRecPtr replayLSN); static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli, XLogSource source, bool notfoundOk); @@ -497,7 +497,7 @@ InitWalRecovery(ControlFileData *ControlFile, bool *wasShutdown_ptr, XLogPageReadPrivate *private; struct stat st; bool wasShutdown; - XLogRecord *record; + XLRHeaderData *record; DBState dbstate_at_startup; bool haveTblspcMap = false; bool haveBackupLabel = false; @@ -1566,7 +1566,7 @@ ShutdownWalRecovery(void) void PerformWalRecovery(void) { - XLogRecord *record; + XLRHeaderData *record; bool reachedRecoveryTarget = false; TimeLineID replayTLI; @@ -1811,7 +1811,7 @@ PerformWalRecovery(void) * Subroutine of PerformWalRecovery, to apply one WAL record. */ static void -ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *replayTLI) +ApplyWalRecord(XLogReaderState *xlogreader, XLRHeaderData *record, TimeLineID *replayTLI) { ErrorContextCallback errcallback; bool switchedTLI = false; @@ -3004,11 +3004,11 @@ ConfirmRecoveryPaused(void) * (emode must be either PANIC, LOG). In standby mode, retries until a valid * record is available. 
*/ -static XLogRecord * +static XLRHeaderData * ReadRecord(XLogPrefetcher *xlogprefetcher, int emode, bool fetching_ckpt, TimeLineID replayTLI) { - XLogRecord *record; + XLRHeaderData *record; XLogReaderState *xlogreader = XLogPrefetcherGetReader(xlogprefetcher); XLogPageReadPrivate *private = (XLogPageReadPrivate *) xlogreader->private_data; @@ -3912,11 +3912,11 @@ emode_for_corrupt_record(int emode, XLogRecPtr RecPtr) /* * Subroutine to try to fetch and validate a prior checkpoint record. */ -static XLogRecord * +static XLRHeaderData * ReadCheckpointRecord(XLogPrefetcher *xlogprefetcher, XLogRecPtr RecPtr, TimeLineID replayTLI) { - XLogRecord *record; + XLRHeaderData *record; uint8 rminfo; Assert(xlogreader != NULL); @@ -3951,7 +3951,10 @@ ReadCheckpointRecord(XLogPrefetcher *xlogprefetcher, XLogRecPtr RecPtr, (errmsg("invalid xl_info in checkpoint record"))); return NULL; } - if (record->xl_tot_len != SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(CheckPoint)) + if (record->xl_tot_len != (MinXLogHeaderSize + + sizeof(uint8) /* length field */ + + SizeOfXLogRecordDataHeaderShort + + sizeof(CheckPoint))) { ereport(LOG, (errmsg("invalid length of checkpoint record"))); diff --git a/src/backend/catalog/storage.c b/src/backend/catalog/storage.c index 4ef46a1855..fa4be2c68b 100644 --- a/src/backend/catalog/storage.c +++ b/src/backend/catalog/storage.c @@ -194,7 +194,8 @@ log_smgrcreate(const RelFileLocator *rlocator, ForkNumber forkNum) XLogBeginInsert(); XLogRegisterData((char *) &xlrec, sizeof(xlrec)); - XLogInsertExtended(RM_SMGR_ID, XLR_SPECIAL_REL_UPDATE, XLOG_SMGR_CREATE); + XLogInsertExtended(RM_SMGR_ID, XLR_SPECIAL_REL_UPDATE | XLR_HAS_XID, + XLOG_SMGR_CREATE, InvalidCommandId); } /* @@ -376,8 +377,9 @@ RelationTruncate(Relation rel, BlockNumber nblocks) XLogRegisterData((char *) &xlrec, sizeof(xlrec)); lsn = XLogInsertExtended(RM_SMGR_ID, - XLR_SPECIAL_REL_UPDATE, - XLOG_SMGR_TRUNCATE); + XLR_SPECIAL_REL_UPDATE | XLR_HAS_XID, + 
XLOG_SMGR_TRUNCATE, + InvalidCommandId); /* * Flush, because otherwise the truncation of the main relation might diff --git a/src/backend/commands/dbcommands.c b/src/backend/commands/dbcommands.c index 25c4672917..fea070d768 100644 --- a/src/backend/commands/dbcommands.c +++ b/src/backend/commands/dbcommands.c @@ -624,9 +624,8 @@ CreateDatabaseUsingFileCopy(Oid src_dboid, Oid dst_dboid, Oid src_tsid, XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_file_copy_rec)); - (void) XLogInsertExtended(RM_DBASE_ID, - XLR_SPECIAL_REL_UPDATE, - XLOG_DBASE_CREATE_FILE_COPY); + (void) XLogInsertExtended(RM_DBASE_ID, XLR_SPECIAL_REL_UPDATE, + XLOG_DBASE_CREATE_FILE_COPY, InvalidCommandId); } pfree(srcpath); pfree(dstpath); @@ -2022,9 +2021,8 @@ movedb(const char *dbname, const char *tblspcname) XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_create_file_copy_rec)); - (void) XLogInsertExtended(RM_DBASE_ID, - XLR_SPECIAL_REL_UPDATE, - XLOG_DBASE_CREATE_FILE_COPY); + (void) XLogInsertExtended(RM_DBASE_ID, XLR_SPECIAL_REL_UPDATE, + XLOG_DBASE_CREATE_FILE_COPY, InvalidCommandId); } /* @@ -2117,9 +2115,8 @@ movedb(const char *dbname, const char *tblspcname) XLogRegisterData((char *) &xlrec, sizeof(xl_dbase_drop_rec)); XLogRegisterData((char *) &src_tblspcoid, sizeof(Oid)); - (void) XLogInsertExtended(RM_DBASE_ID, - XLR_SPECIAL_REL_UPDATE, - XLOG_DBASE_DROP); + (void) XLogInsertExtended(RM_DBASE_ID, XLR_SPECIAL_REL_UPDATE, + XLOG_DBASE_DROP, InvalidCommandId); } /* Now it's safe to release the database lock */ @@ -2837,9 +2834,8 @@ remove_dbtablespaces(Oid db_id) XLogRegisterData((char *) &xlrec, MinSizeOfDbaseDropRec); XLogRegisterData((char *) tablespace_ids, ntblspc * sizeof(Oid)); - (void) XLogInsertExtended(RM_DBASE_ID, - XLR_SPECIAL_REL_UPDATE, - XLOG_DBASE_DROP); + (void) XLogInsertExtended(RM_DBASE_ID, XLR_SPECIAL_REL_UPDATE, + XLOG_DBASE_DROP, InvalidCommandId); } list_free(ltblspc); diff --git a/src/backend/replication/logical/decode.c 
b/src/backend/replication/logical/decode.c index d6abeb9a9d..7cbbb9b73a 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -200,7 +200,10 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ParseCommitRecord(XLogRecGetRmInfo(buf->record), xlrec, &parsed); if (!TransactionIdIsValid(parsed.twophase_xid)) + { + Assert(info == XLOG_XACT_COMMIT); xid = XLogRecGetXid(r); + } else xid = parsed.twophase_xid; @@ -228,7 +231,10 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ParseAbortRecord(XLogRecGetRmInfo(buf->record), xlrec, &parsed); if (!TransactionIdIsValid(parsed.twophase_xid)) + { + Assert(info == XLOG_XACT_ABORT); xid = XLogRecGetXid(r); + } else xid = parsed.twophase_xid; diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 625a7f4273..6faa519a35 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -600,7 +600,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) /* Wait for a consistent starting point */ for (;;) { - XLogRecord *record; + XLRHeaderData *record; char *err = NULL; /* the read_page callback waits for new WAL */ diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 7fa2b2cba7..a2a13c8ae4 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -256,7 +256,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin /* Decode until we run out of records */ while (ctx->reader->EndRecPtr < end_of_wal) { - XLogRecord *record; + XLRHeaderData *record; char *errm = NULL; record = XLogReadRecord(ctx->reader, &errm); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index ca945994ef..6ffecd0fcc 100644 --- a/src/backend/replication/slotfuncs.c +++ 
b/src/backend/replication/slotfuncs.c @@ -495,7 +495,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) while (ctx->reader->EndRecPtr < moveto) { char *errm = NULL; - XLogRecord *record; + XLRHeaderData *record; /* * Read records. No changes are generated in fast_forward mode, diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index e9ba500a15..8129a2552b 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -3019,7 +3019,7 @@ retry: static void XLogSendLogical(void) { - XLogRecord *record; + XLRHeaderData *record; char *errm; /* diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c index eb5782f82a..78f17b5d09 100644 --- a/src/backend/utils/cache/inval.c +++ b/src/backend/utils/cache/inval.c @@ -1632,6 +1632,7 @@ LogLogicalInvalidations(void) ProcessMessageSubGroupMulti(group, RelCacheMsgs, XLogRegisterData((char *) msgs, n * sizeof(SharedInvalidationMessage))); - XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS); + XLogInsertExtended(RM_XACT_ID, XLR_HAS_XID, + XLOG_XACT_INVALIDATIONS, InvalidCommandId); } } diff --git a/src/bin/pg_resetwal/pg_resetwal.c b/src/bin/pg_resetwal/pg_resetwal.c index 089063f471..380d0a260c 100644 --- a/src/bin/pg_resetwal/pg_resetwal.c +++ b/src/bin/pg_resetwal/pg_resetwal.c @@ -1046,6 +1046,9 @@ WriteEmptyXLOG(void) int fd; int nbytes; char *recptr; + const uint32 rec_tot_len = MinXLogHeaderSize + sizeof(uint8) /* xl_tot_len */ + + SizeOfXLogRecordDataHeaderShort + sizeof(CheckPoint); + const uint8 rec_len = rec_tot_len; memset(buffer.data, 0, XLOG_BLCKSZ); @@ -1060,23 +1063,33 @@ WriteEmptyXLOG(void) longpage->xlp_seg_size = WalSegSz; longpage->xlp_xlog_blcksz = XLOG_BLCKSZ; + /* + * Ensure that the length actually fits within the 8-bit length field in + * the header. 
+ */ + Assert(rec_tot_len < UINT8_MAX); + /* Insert the initial checkpoint record */ recptr = (char *) page + SizeOfXLogLongPHD; record = (XLogRecord *) recptr; record->xl_prev = 0; - record->xl_xid = InvalidTransactionId; - record->xl_tot_len = SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(CheckPoint); - record->xl_info = XLOG_CHECKPOINT_SHUTDOWN; + record->xl_info = XLR2_LEN_1B; record->xl_rmid = RM_XLOG_ID; - recptr += SizeOfXLogRecord; + recptr += MinXLogHeaderSize; + + memcpy(recptr, (char *) &rec_len, sizeof(uint8)); + recptr += sizeof(uint8); + *(recptr++) = (char) XLR_BLOCK_ID_DATA_SHORT; *(recptr++) = sizeof(CheckPoint); memcpy(recptr, &ControlFile.checkPointCopy, sizeof(CheckPoint)); INIT_CRC32C(crc); - COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord); + COMP_CRC32C(crc, + ((char *) record) + offsetof(XLogRecord, xl_rmid), + rec_len - offsetof(XLogRecord, xl_rmid)); COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc)); FIN_CRC32C(crc); record->xl_crc = crc; diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 132c4db65e..fc2dccb63c 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -66,7 +66,7 @@ void extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex, XLogRecPtr endpoint, const char *restoreCommand) { - XLogRecord *record; + XLRHeaderData *record; XLogReaderState *xlogreader; char *errormsg; XLogPageReadPrivate private; @@ -124,7 +124,7 @@ XLogRecPtr readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex, const char *restoreCommand) { - XLogRecord *record; + XLRHeaderData *record; XLogReaderState *xlogreader; char *errormsg; XLogPageReadPrivate private; @@ -170,7 +170,7 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex, XLogRecPtr *lastchkptredo, const char *restoreCommand) { /* Walk backwards, starting from the given record */ - XLogRecord *record; + XLRHeaderData 
*record; XLogRecPtr searchptr; XLogReaderState *xlogreader; char *errormsg; diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 70886beedd..a821d7b40a 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -696,7 +696,7 @@ main(int argc, char **argv) XLogDumpPrivate private; XLogDumpConfig config; XLogStats stats; - XLogRecord *record; + XLRHeaderData *record; XLogRecPtr first_record; char *waldir = NULL; char *errormsg; diff --git a/src/include/access/xloginsert.h b/src/include/access/xloginsert.h index cfe53c7175..f41bca6cea 100644 --- a/src/include/access/xloginsert.h +++ b/src/include/access/xloginsert.h @@ -42,7 +42,7 @@ extern void XLogBeginInsert(void); extern void XLogSetRecordFlags(uint8 flags); extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 rminfo); -extern XLogRecPtr XLogInsertExtended(RmgrId rmid, uint8 info, uint8 rminfo); +extern XLogRecPtr XLogInsertExtended(RmgrId rmid, uint8 info, uint8 rminfo, CommandId cid); extern void XLogEnsureRecordSpace(int max_block_id, int ndatas); extern void XLogRegisterData(char *data, uint32 len); extern void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags); diff --git a/src/include/access/xlogprefetcher.h b/src/include/access/xlogprefetcher.h index fdd67fcedd..fbfd9e02a4 100644 --- a/src/include/access/xlogprefetcher.h +++ b/src/include/access/xlogprefetcher.h @@ -47,8 +47,8 @@ extern XLogReaderState *XLogPrefetcherGetReader(XLogPrefetcher *prefetcher); extern void XLogPrefetcherBeginRead(XLogPrefetcher *prefetcher, XLogRecPtr recPtr); -extern XLogRecord *XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, - char **errmsg); +extern XLRHeaderData *XLogPrefetcherReadRecord(XLogPrefetcher *prefetcher, + char **errmsg); extern void XLogPrefetcherComputeStats(XLogPrefetcher *prefetcher); diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index fb6cae08ad..9a31aec414 100644 --- a/src/include/access/xlogreader.h +++ 
b/src/include/access/xlogreader.h @@ -163,7 +163,7 @@ typedef struct DecodedXLogRecord /* Public members. */ XLogRecPtr lsn; /* location */ XLogRecPtr next_lsn; /* location of next record */ - XLogRecord header; /* header */ + XLRHeaderData header; /* header */ RepOriginId record_origin; TransactionId toplevel_xid; /* XID of top-level transaction */ char *main_data; /* record's main data portion */ @@ -355,8 +355,8 @@ typedef enum XLogPageReadResult } XLogPageReadResult; /* Read the next XLog record. Returns NULL on end-of-WAL or failure */ -extern struct XLogRecord *XLogReadRecord(XLogReaderState *state, - char **errormsg); +extern struct XLRHeaderData *XLogReadRecord(XLogReaderState *state, + char **errormsg); /* Consume the next record or error. */ extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state, diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h index 17093d93b6..94125f9878 100644 --- a/src/include/access/xlogrecord.h +++ b/src/include/access/xlogrecord.h @@ -19,7 +19,11 @@ /* * The overall layout of an XLOG record is: - * Fixed-size header (XLogRecord struct) + * Fixed-size header (XLogRecord struct) + variable-sized header data: + * - xl_xid (0 or 4 bytes) + * - xl_cid (0 or 4 bytes) + * - xl_rmgr_flags (0 or 1 byte) + * - xl_len (0, 1, 2 or 4 bytes) * XLogRecordBlockHeader struct * XLogRecordBlockHeader struct * ... @@ -37,23 +41,97 @@ * The XLogRecordBlockHeader, XLogRecordDataHeaderShort and * XLogRecordDataHeaderLong structs all begin with a single 'id' byte. It's * used to distinguish between block references, and the main data structs. + * + * The smallest size that XLogRecord header takes up is now 14 bytes: 8 bytes + * in xl_prev, 4 in checksum, and 1 in xl_rmid and xl_info each, while the + * max-sized xlog header now takes up 27 bytes; with 4 bytes each in + * xl_tot_len, xl_xid and xl_cid, plus one in xl_rminfo. 
*/ -typedef struct XLogRecord -{ - uint32 xl_tot_len; /* total len of entire record */ - TransactionId xl_xid; /* xact id */ - XLogRecPtr xl_prev; /* ptr to previous record in log */ - uint8 xl_info; /* flag bits, see below */ - RmgrId xl_rmid; /* resource manager for this record */ - uint8 xl_rminfo; /* flag bits for rmgr use */ - /* 1 byte of padding here, initialize to zero */ - pg_crc32c xl_crc; /* CRC for this record */ - - /* XLogRecordBlockHeaders and XLogRecordDataHeader follow, no padding */ +typedef struct XLogRecord { + XLogRecPtr xl_prev; + pg_crc32c xl_crc; + RmgrId xl_rmid; + /* Flags for record handling and variable-length header fields */ + uint8 xl_info; + /* + * Without padding: + * - depending on flags, length field follows (0, 1, 2 or 4 bytes) + * - if HAS_XID, TransactionId follows + * - if HAS_CID, CommandID follows + * - if HAS_RMINFO, uint8 with rminfo flags follows + * - XLogRecordBlockHeaders and XLogRecordDataHeader follow + */ } XLogRecord; -#define SizeOfXLogRecord (offsetof(XLogRecord, xl_crc) + sizeof(pg_crc32c)) +/* + * + */ +typedef struct XLRHeaderData { + XLogRecPtr xl_prev; + pg_crc32c xl_crc; + RmgrId xl_rmid; + uint8 xl_info; + TransactionId xl_xid; + CommandId xl_cid; + uint8 xl_rminfo; + uint32 xl_tot_len; +} XLRHeaderData; + +#define MinXLogHeaderSize ( \ + offsetof(XLogRecord, xl_info) \ + + sizeof(uint8) /* xl_info */ \ +) + +#define MaxXLogHeaderSize ( \ + MinXLogHeaderSize \ + + sizeof(TransactionId) /* xl_xid */ \ + + sizeof(CommandId) /* xl_cid */ \ + + sizeof(uint8) /* xl_rminfo */ \ + + sizeof(uint32) /* xl_len */ \ +) + +/* + * Mask for getting the size of the length field + */ +#define XLR2_LEN_MASK (0x03) + +/* + * IFF the record does not contain any registered data, the length field will + * be absent (as the size of a plain record is knowable from just the + * fixed-size struct's flags) + */ +#define XLR2_LEN_ABSENT 0x00 +/* + * Size of the xlog record is <= 255 bytes + */ +#define XLR2_LEN_1B 0x01 +/* + * Size 
of the xlog record is <= (2^16 - 1) + */ +#define XLR2_LEN_2B 0x02 +/* + * Size of the xlog record is <= (2^32 - 1) + */ +#define XLR2_LEN_4B 0x03 + +/* + * Does this record contain an XID? This must be included if the data has + * transactional visibility. + */ +#define XLR_HAS_XID 0x04 + +/* + * Does this record contain a CID? This must be included if the data has + * transactional visibility, and remote snapshot transfer support is enabled. + */ +#define XLR_HAS_CID 0x08 + +/* + * If the resource manager needs non-zero bits in the header to discern different + * types of WAL records, this flag is set. + */ +#define XLR_HAS_RMINFO 0x10 /* * If a WAL record modifies any relation files, in ways not covered by the @@ -61,7 +139,7 @@ typedef struct XLogRecord * by PostgreSQL itself, but it allows external tools that read WAL and keep * track of modified blocks to recognize such special record types. */ -#define XLR_SPECIAL_REL_UPDATE 0x01 +#define XLR_SPECIAL_REL_UPDATE 0x20 /* * Enforces consistency checks of replayed WAL at recovery. If enabled, @@ -70,7 +148,68 @@ typedef struct XLogRecord * of XLogInsert can use this value if necessary, but if * wal_consistency_checking is enabled for a rmgr this is set unconditionally. */ -#define XLR_CHECK_CONSISTENCY 0x02 +#define XLR_CHECK_CONSISTENCY 0x40 + +#define XLR_INFO_USERFLAGS ( \ + XLR_HAS_XID \ + | XLR_HAS_CID \ + | XLR_SPECIAL_REL_UPDATE \ + | XLR_CHECK_CONSISTENCY \ +) + +#define XLRSizeOfHeader(infomask) ( \ + MinXLogHeaderSize \ + + ((1 << ((infomask) & XLR2_LEN_MASK)) >> 1) \ + + (((infomask) & XLR_HAS_XID) ? sizeof(TransactionId) : 0) \ + + (((infomask) & XLR_HAS_CID) ? sizeof(CommandId) : 0) \ + + (((infomask) & XLR_HAS_RMINFO) ? 
sizeof(uint8) : 0) \ +) + +static inline uint32 +XLogRecordGetLength(XLogRecord *record) +{ + char *lenptr = (char *) record; + uint8 len8; + uint16 len16; + uint32 len32; + + lenptr += MinXLogHeaderSize; + + switch ((record->xl_info) & XLR2_LEN_MASK) { + case XLR2_LEN_ABSENT: + return XLRSizeOfHeader(record->xl_info); + case XLR2_LEN_1B: + memcpy(&len8, lenptr, sizeof(uint8)); + return (uint32) len8; + case XLR2_LEN_2B: + memcpy(&len16, lenptr, sizeof(uint16)); + return (uint32) len16; + case XLR2_LEN_4B: + memcpy(&len32, lenptr, sizeof(uint32)); + return (uint32) len32; + default: + pg_unreachable(); + } +} + +static inline int8 +XLogRecordGetRMInfo(XLogRecord *record) +{ + int infooff = MinXLogHeaderSize; + + if (!(record->xl_info & XLR_HAS_RMINFO)) + return 0; + + infooff += (1 << (record->xl_info & XLR2_LEN_MASK)) >> 1; + + if (record->xl_info & XLR_HAS_XID) + infooff += sizeof(TransactionId); + + if (record->xl_info & XLR_HAS_CID) + infooff += sizeof(CommandId); + + return *(((uint8 *) record) + infooff); +} /* * Header info for block data appended to an XLOG record. 
@@ -145,7 +284,7 @@ typedef struct XLogRecordBlockImageHeader #define BKPIMAGE_COMPRESS_ZSTD 0x10 #define BKPIMAGE_COMPRESSED(info) \ - ((info & (BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | \ + (((info) & (BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | \ BKPIMAGE_COMPRESS_ZSTD)) != 0) /* diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule index 9f644a0c1b..dfafbc36df 100644 --- a/src/test/regress/parallel_schedule +++ b/src/test/regress/parallel_schedule @@ -69,7 +69,12 @@ ignore: random # aggregates depends on create_aggregate # join depends on create_misc # ---------- -test: select_into select_distinct select_distinct_on select_implicit select_having subselect union case join aggregates transactions random portals arrays btree_index hash_index update delete namespace prepared_xacts +test: select_into select_distinct select_distinct_on select_implicit select_having subselect union case join aggregates transactions random portals arrays btree_index hash_index update delete namespace + +# ---------- +# Another group of parallel tests +# ---------- +# test: prepared_xacts # ---------- # Another group of parallel tests -- 2.30.2