From f8239516407569e1e4b4c96507975f02dd9400ce Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Fri, 5 Jun 2020 09:03:16 +0530 Subject: [PATCH v27] Immediately WAL-log subtransaction and top-level XID association. The logical decoding infrastructure needs to know which top-level transaction the subxact belongs to, in order to decode all the changes. Until now that might be delayed until commit, due to the caching (PGPROC_MAX_CACHED_SUBXIDS), preventing features requiring incremental decoding. So we also write the assignment info into WAL immediately, as part of the next WAL record (to minimize overhead). However, we cannot remove the existing XLOG_XACT_ASSIGNMENT WAL record, as it is required for avoiding overflow in the hot standby snapshot. Author: Tomas Vondra, Dilip Kumar, Amit Kapila Reviewed-by: Amit Kapila Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com --- src/backend/access/transam/xact.c | 50 ++++++++++++++++++++++++++++++++ src/backend/access/transam/xloginsert.c | 23 +++++++++++++-- src/backend/access/transam/xlogreader.c | 5 ++++ src/backend/replication/logical/decode.c | 44 ++++++++++++++-------------- src/include/access/xact.h | 3 ++ src/include/access/xlog.h | 1 + src/include/access/xlogreader.h | 3 ++ src/include/access/xlogrecord.h | 1 + 8 files changed, 107 insertions(+), 23 deletions(-) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index cd30b62..04fd5ca 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -191,6 +191,7 @@ typedef struct TransactionStateData bool didLogXid; /* has xid been included in WAL record? 
*/ int parallelModeLevel; /* Enter/ExitParallelMode counter */ bool chain; /* start a new block after this one */ + bool assigned; /* assigned to top-level XID */ struct TransactionStateData *parent; /* back link to parent */ } TransactionStateData; @@ -223,6 +224,7 @@ typedef struct SerializedTransactionState static TransactionStateData TopTransactionStateData = { .state = TRANS_DEFAULT, .blockState = TBLOCK_DEFAULT, + .assigned = false, }; /* @@ -5118,6 +5120,7 @@ PushTransaction(void) GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext); s->prevXactReadOnly = XactReadOnly; s->parallelModeLevel = 0; + s->assigned = false; CurrentTransactionState = s; @@ -6020,3 +6023,50 @@ xact_redo(XLogReaderState *record) else elog(PANIC, "xact_redo: unknown op code %u", info); } + +/* + * IsSubTransactionAssignmentPending + * + * This is used to decide whether we need to WAL log the top-level XID for + * operation in a subtransaction. We require that for logical decoding, see + * LogicalDecodingProcessRecord. + * + * This returns true if wal_level >= logical and we are inside a valid + * subtransaction, for which the assignment was not yet written to any WAL + * record. + */ +bool +IsSubTransactionAssignmentPending(void) +{ + /* wal_level has to be logical */ + if (!XLogLogicalInfoActive()) + return false; + + /* we need to be in a transaction state */ + if (!IsTransactionState()) + return false; + + /* it has to be a subtransaction */ + if (!IsSubTransaction()) + return false; + + /* the subtransaction has to have a XID assigned */ + if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny())) + return false; + + /* and it should not be already 'assigned' */ + return !CurrentTransactionState->assigned; +} + +/* + * MarkSubTransactionAssigned + * + * Mark the subtransaction assignment as completed. 
+ */ +void +MarkSubTransactionAssigned(void) +{ + Assert(IsSubTransactionAssignmentPending()); + + CurrentTransactionState->assigned = true; +} diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index b21679f..c526bb1 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -89,11 +89,13 @@ static XLogRecData hdr_rdt; static char *hdr_scratch = NULL; #define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char)) +#define SizeOfXLogTransactionId (sizeof(TransactionId) + sizeof(char)) #define HEADER_SCRATCH_SIZE \ (SizeOfXLogRecord + \ MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \ - SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin) + SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \ + SizeOfXLogTransactionId) /* * An array of XLogRecData structs, to hold registered data. @@ -195,6 +197,10 @@ XLogResetInsertion(void) { int i; + /* reset the subxact assignment flag (if needed) */ + if (curinsert_flags & XLOG_INCLUDE_XID) + MarkSubTransactionAssigned(); + for (i = 0; i < max_registered_block_id; i++) registered_buffers[i].in_use = false; @@ -398,7 +404,7 @@ void XLogSetRecordFlags(uint8 flags) { Assert(begininsert_called); - curinsert_flags = flags; + curinsert_flags |= flags; } /* @@ -748,6 +754,19 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, scratch += sizeof(replorigin_session_origin); } + /* followed by toplevel XID, if not already included in previous record */ + if (IsSubTransactionAssignmentPending()) + { + TransactionId xid = GetTopTransactionIdIfAny(); + + /* update the flag (later used by XLogResetInsertion) */ + XLogSetRecordFlags(XLOG_INCLUDE_XID); + + *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID; + memcpy(scratch, &xid, sizeof(TransactionId)); + scratch += sizeof(TransactionId); + } + /* followed by main data, if any */ if (mainrdata_len > 0) { diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 
5995798..560ec27 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1195,6 +1195,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) state->decoded_record = record; state->record_origin = InvalidRepOriginId; + state->toplevel_xid = InvalidTransactionId; ptr = (char *) record; ptr += SizeOfXLogRecord; @@ -1233,6 +1234,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) { COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId)); } + else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID) + { + COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId)); + } else if (block_id <= XLR_MAX_BLOCK_ID) { /* XLogRecordBlockHeader */ diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c2e5e3a..0c0c371 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -94,11 +94,27 @@ void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record) { XLogRecordBuffer buf; + TransactionId txid; buf.origptr = ctx->reader->ReadRecPtr; buf.endptr = ctx->reader->EndRecPtr; buf.record = record; + txid = XLogRecGetTopXid(record); + + /* + * If the top-level xid is valid, we need to assign the subxact to the + * top-level xact. We need to do this for all records, hence we do it + * before the switch. + */ + if (TransactionIdIsValid(txid)) + { + ReorderBufferAssignChild(ctx->reorder, + txid, + record->decoded_record->xl_xid, + buf.origptr); + } + /* cast so we get a warning when new rmgrs are added */ switch ((RmgrId) XLogRecGetRmid(record)) { @@ -216,13 +232,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * If the snapshot isn't yet fully built, we cannot decode anything, so * bail out. 
- * - * However, it's critical to process XLOG_XACT_ASSIGNMENT records even - * when the snapshot is being built: it is possible to get later records - * that require subxids to be properly assigned. */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT && - info != XLOG_XACT_ASSIGNMENT) + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; switch (info) @@ -264,22 +275,13 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) break; } case XLOG_XACT_ASSIGNMENT: - { - xl_xact_assignment *xlrec; - int i; - TransactionId *sub_xid; - xlrec = (xl_xact_assignment *) XLogRecGetData(r); - - sub_xid = &xlrec->xsub[0]; - - for (i = 0; i < xlrec->nsubxacts; i++) - { - ReorderBufferAssignChild(reorder, xlrec->xtop, - *(sub_xid++), buf->origptr); - } - break; - } + /* + * We assign subxact to the toplevel xact while processing each + * record if required. So, we don't need to do anything here. + * See LogicalDecodingProcessRecord. + */ + break; case XLOG_XACT_PREPARE: /* diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 7ee04ba..8645b38 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -428,6 +428,9 @@ extern void UnregisterXactCallback(XactCallback callback, void *arg); extern void RegisterSubXactCallback(SubXactCallback callback, void *arg); extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg); +extern bool IsSubTransactionAssignmentPending(void); +extern void MarkSubTransactionAssigned(void); + extern int xactGetCommittedChildren(TransactionId **ptr); extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time, diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index e917dfe..05cc2b6 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -237,6 +237,7 @@ extern bool XLOG_DEBUG; */ #define XLOG_INCLUDE_ORIGIN 0x01 /* include the replication origin */ #define XLOG_MARK_UNIMPORTANT 0x02 /* record not important for durability */ 
+#define XLOG_INCLUDE_XID 0x04 /* include XID of top-level xact */ /* Checkpoint statistics */ diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index d930fe9..24a4c44 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -191,6 +191,8 @@ struct XLogReaderState RepOriginId record_origin; + TransactionId toplevel_xid; /* XID of top-level transaction */ + /* information about blocks referenced by the record. */ DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1]; @@ -308,6 +310,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, #define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid) #define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid) #define XLogRecGetOrigin(decoder) ((decoder)->record_origin) +#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid) #define XLogRecGetData(decoder) ((decoder)->main_data) #define XLogRecGetDataLen(decoder) ((decoder)->main_data_len) #define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0) diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h index acd9af0..2f0c8bf 100644 --- a/src/include/access/xlogrecord.h +++ b/src/include/access/xlogrecord.h @@ -223,5 +223,6 @@ typedef struct XLogRecordDataHeaderLong #define XLR_BLOCK_ID_DATA_SHORT 255 #define XLR_BLOCK_ID_DATA_LONG 254 #define XLR_BLOCK_ID_ORIGIN 253 +#define XLR_BLOCK_ID_TOPLEVEL_XID 252 #endif /* XLOGRECORD_H */ -- 1.8.3.1