From f70b1e79f8dcb23f1acffd5a29758e7952e61d9c Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Thu, 26 Sep 2019 17:07:31 +0200 Subject: [PATCH 02/13] Immediately WAL-log assignments The logical decoding infrastructure needs to know which top-level transaction the subxact belongs to, in order to decode all the changes. Until now that might be delayed until commit, due to the caching (GPROC_MAX_CACHED_SUBXIDS), preventing features requiring incremental decoding. So instead we write the assignment info into WAL immediately, as part of the next WAL record (to minimize overhead). --- src/backend/access/rmgrdesc/xactdesc.c | 26 ------ src/backend/access/transam/xact.c | 152 +++++++++---------------------- src/backend/access/transam/xlog.c | 2 - src/backend/access/transam/xloginsert.c | 22 ++++- src/backend/access/transam/xlogreader.c | 5 + src/backend/replication/logical/decode.c | 39 ++++---- src/include/access/xact.h | 15 +-- src/include/access/xlog.h | 2 + src/include/access/xlogreader.h | 3 + src/include/access/xlogrecord.h | 1 + src/tools/pgindent/typedefs.list | 1 - 11 files changed, 96 insertions(+), 172 deletions(-) diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c index a61f38d..66fc8fb 100644 --- a/src/backend/access/rmgrdesc/xactdesc.c +++ b/src/backend/access/rmgrdesc/xactdesc.c @@ -293,17 +293,6 @@ xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec) } } -static void -xact_desc_assignment(StringInfo buf, xl_xact_assignment *xlrec) -{ - int i; - - appendStringInfoString(buf, "subxacts:"); - - for (i = 0; i < xlrec->nsubxacts; i++) - appendStringInfo(buf, " %u", xlrec->xsub[i]); -} - void xact_desc(StringInfo buf, XLogReaderState *record) { @@ -323,18 +312,6 @@ xact_desc(StringInfo buf, XLogReaderState *record) xact_desc_abort(buf, XLogRecGetInfo(record), xlrec); } - else if (info == XLOG_XACT_ASSIGNMENT) - { - xl_xact_assignment *xlrec = (xl_xact_assignment *) rec; - - /* - * Note that we ignore the WAL record's xid, since we're more - * interested in the top-level xid that issued the record and which - * xids are being reported here. - */ - appendStringInfo(buf, "xtop %u: ", xlrec->xtop); - xact_desc_assignment(buf, xlrec); - } } const char * @@ -359,9 +336,6 @@ xact_identify(uint8 info) case XLOG_XACT_ABORT_PREPARED: id = "ABORT_PREPARED"; break; - case XLOG_XACT_ASSIGNMENT: - id = "ASSIGNMENT"; - break; } return id; diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 9162286..33141fb 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -188,9 +188,9 @@ typedef struct TransactionStateData int prevSecContext; /* previous SecurityRestrictionContext */ bool prevXactReadOnly; /* entry-time xact r/o state */ bool startedInRecovery; /* did we start in recovery? */ - bool didLogXid; /* has xid been included in WAL record? */ int parallelModeLevel; /* Enter/ExitParallelMode counter */ bool chain; /* start a new block after this one */ + bool assigned; /* assigned to toplevel XID */ struct TransactionStateData *parent; /* back link to parent */ } TransactionStateData; @@ -225,13 +225,6 @@ static TransactionStateData TopTransactionStateData = { .blockState = TBLOCK_DEFAULT, }; -/* - * unreportedXids holds XIDs of all subtransactions that have not yet been - * reported in an XLOG_XACT_ASSIGNMENT record. - */ -static int nUnreportedXids; -static TransactionId unreportedXids[PGPROC_MAX_CACHED_SUBXIDS]; - static TransactionState CurrentTransactionState = &TopTransactionStateData; /* @@ -502,19 +495,6 @@ GetCurrentFullTransactionIdIfAny(void) } /* - * MarkCurrentTransactionIdLoggedIfAny - * - * Remember that the current xid - if it is assigned - now has been wal logged. - */ -void -MarkCurrentTransactionIdLoggedIfAny(void) -{ - if (FullTransactionIdIsValid(CurrentTransactionState->fullTransactionId)) - CurrentTransactionState->didLogXid = true; -} - - -/* * GetStableLatestTransactionId * * Get the transaction's XID if it has one, else read the next-to-be-assigned @@ -555,7 +535,6 @@ AssignTransactionId(TransactionState s) { bool isSubXact = (s->parent != NULL); ResourceOwner currentOwner; - bool log_unknown_top = false; /* Assert that caller didn't screw up */ Assert(!FullTransactionIdIsValid(s->fullTransactionId)); @@ -598,20 +577,6 @@ AssignTransactionId(TransactionState s) } /* - * When wal_level=logical, guarantee that a subtransaction's xid can only - * be seen in the WAL stream if its toplevel xid has been logged before. - * If necessary we log an xact_assignment record with fewer than - * PGPROC_MAX_CACHED_SUBXIDS. Note that it is fine if didLogXid isn't set - * for a transaction even though it appears in a WAL record, we just might - * superfluously log something. That can happen when an xid is included - * somewhere inside a wal record, but not in XLogRecord->xl_xid, like in - * xl_standby_locks. - */ - if (isSubXact && XLogLogicalInfoActive() && - !TopTransactionStateData.didLogXid) - log_unknown_top = true; - - /* * Generate a new FullTransactionId and record its xid in PG_PROC and * pg_subtrans. * @@ -646,59 +611,6 @@ AssignTransactionId(TransactionState s) XactLockTableInsert(XidFromFullTransactionId(s->fullTransactionId)); CurrentResourceOwner = currentOwner; - - /* - * Every PGPROC_MAX_CACHED_SUBXIDS assigned transaction ids within each - * top-level transaction we issue a WAL record for the assignment. We - * include the top-level xid and all the subxids that have not yet been - * reported using XLOG_XACT_ASSIGNMENT records. - * - * This is required to limit the amount of shared memory required in a hot - * standby server to keep track of in-progress XIDs. See notes for - * RecordKnownAssignedTransactionIds(). - * - * We don't keep track of the immediate parent of each subxid, only the - * top-level transaction that each subxact belongs to. This is correct in - * recovery only because aborted subtransactions are separately WAL - * logged. - * - * This is correct even for the case where several levels above us didn't - * have an xid assigned as we recursed up to them beforehand. - */ - if (isSubXact && XLogStandbyInfoActive()) - { - unreportedXids[nUnreportedXids] = XidFromFullTransactionId(s->fullTransactionId); - nUnreportedXids++; - - /* - * ensure this test matches similar one in - * RecoverPreparedTransactions() - */ - if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS || - log_unknown_top) - { - xl_xact_assignment xlrec; - - /* - * xtop is always set by now because we recurse up transaction - * stack to the highest unassigned xid and then come back down - */ - xlrec.xtop = GetTopTransactionId(); - Assert(TransactionIdIsValid(xlrec.xtop)); - xlrec.nsubxacts = nUnreportedXids; - - XLogBeginInsert(); - XLogRegisterData((char *) &xlrec, MinSizeOfXactAssignment); - XLogRegisterData((char *) unreportedXids, - nUnreportedXids * sizeof(TransactionId)); - - (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT); - - nUnreportedXids = 0; - /* mark top, not current xact as having been logged */ - TopTransactionStateData.didLogXid = true; - } - } } /* @@ -1792,13 +1704,6 @@ AtSubAbort_childXids(void) s->childXids = NULL; s->nChildXids = 0; s->maxChildXids = 0; - - /* - * We could prune the unreportedXids array here. But we don't bother. That - * would potentially reduce number of XLOG_XACT_ASSIGNMENT records but it - * would likely introduce more CPU time into the more common paths, so we - * choose not to do that. - */ } /* ---------------------------------------------------------------- @@ -1963,12 +1868,6 @@ StartTransaction(void) currentCommandIdUsed = false; /* - * initialize reported xid accounting - */ - nUnreportedXids = 0; - s->didLogXid = false; - - /* * must initialize resource-management stuff first */ AtStart_Memory(); @@ -5095,6 +4994,7 @@ PushTransaction(void) GetUserIdAndSecContext(&s->prevUser, &s->prevSecContext); s->prevXactReadOnly = XactReadOnly; s->parallelModeLevel = 0; + s->assigned = false; CurrentTransactionState = s; @@ -5990,14 +5890,46 @@ xact_redo(XLogReaderState *record) XLogRecGetOrigin(record)); LWLockRelease(TwoPhaseStateLock); } - else if (info == XLOG_XACT_ASSIGNMENT) - { - xl_xact_assignment *xlrec = (xl_xact_assignment *) XLogRecGetData(record); - - if (standbyState >= STANDBY_INITIALIZED) - ProcArrayApplyXidAssignment(xlrec->xtop, - xlrec->nsubxacts, xlrec->xsub); - } else elog(PANIC, "xact_redo: unknown op code %u", info); } + +/* + * IsSubTransactionAssignmentPending + * + * This returns true if we are inside a valid substransaction, for which + * the assignment was not yet written to any WAL record. + */ +bool +IsSubTransactionAssignmentPending(void) +{ + /* we need to be in a transaction state */ + if (!IsTransactionState()) + return false; + + /* it has to be a subtransaction */ + if (!IsSubTransaction()) + return false; + + /* the subtransaction has to have a XID assigned */ + if (!TransactionIdIsValid(GetCurrentTransactionIdIfAny())) + return false; + + /* and it needs to have 'assigned' */ + return !CurrentTransactionState->assigned; + +} + +/* + * MarkSubTransactionAssigned + * + * Mark the subtransaction assignment as completed. + */ +void +MarkSubTransactionAssigned(void) +{ + Assert(IsSubTransactionAssignmentPending()); + + CurrentTransactionState->assigned = true; + +} diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 790e2c8..b1daa05 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1121,8 +1121,6 @@ XLogInsertRecord(XLogRecData *rdata, */ WALInsertLockRelease(); - MarkCurrentTransactionIdLoggedIfAny(); - END_CRIT_SECTION(); /* diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index 3ec67d4..15ce79c 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -88,11 +88,13 @@ static XLogRecData hdr_rdt; static char *hdr_scratch = NULL; #define SizeOfXlogOrigin (sizeof(RepOriginId) + sizeof(char)) +#define SizeOfTransactionId (sizeof(TransactionId) + sizeof(char)) #define HEADER_SCRATCH_SIZE \ (SizeOfXLogRecord + \ MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \ - SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin) + SizeOfXLogRecordDataHeaderLong + SizeOfXlogOrigin + \ + SizeOfTransactionId) /* * An array of XLogRecData structs, to hold registered data. @@ -194,6 +196,10 @@ XLogResetInsertion(void) { int i; + /* reset the subxact assignment flag (if needed) */ + if (curinsert_flags & XLOG_INCLUDE_XID) + MarkSubTransactionAssigned(); + for (i = 0; i < max_registered_block_id; i++) registered_buffers[i].in_use = false; @@ -397,7 +403,7 @@ void XLogSetRecordFlags(uint8 flags) { Assert(begininsert_called); - curinsert_flags = flags; + curinsert_flags |= flags; } /* @@ -743,6 +749,18 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, scratch += sizeof(replorigin_session_origin); } + /* followed by toplevel XID, if not already included in previous record */ + if (IsSubTransactionAssignmentPending()) + { + TransactionId xid = GetTopTransactionIdIfAny(); + + /* update the flag (later used by XLogInsertRecord) */ + curinsert_flags |= XLOG_INCLUDE_XID; + *(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID; + memcpy(scratch, &xid, sizeof(TransactionId)); + scratch += sizeof(TransactionId); + } + /* followed by main data, if any */ if (mainrdata_len > 0) { diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index c8b0d23..3b02fbf 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -1072,6 +1072,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) state->decoded_record = record; state->record_origin = InvalidRepOriginId; + state->toplevel_xid = InvalidTransactionId; ptr = (char *) record; ptr += SizeOfXLogRecord; @@ -1110,6 +1111,10 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) { COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId)); } + else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID) + { + COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId)); + } else if (block_id <= XLR_MAX_BLOCK_ID) { /* XLogRecordBlockHeader */ diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c53e7e2..ff74c65 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -96,12 +96,28 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record) { - XLogRecordBuffer buf; + XLogRecordBuffer buf; + TransactionId txid; buf.origptr = ctx->reader->ReadRecPtr; buf.endptr = ctx->reader->EndRecPtr; buf.record = record; + txid = XLogRecGetTopXid(record); + + /* + * If the toplevel_xid is valid, we need to assign the subxact to the + * toplevel transaction. We need to do this for all records, hence we + * do it before the switch. + */ + if (TransactionIdIsValid(txid)) + { + ReorderBufferAssignChild(ctx->reorder, + record->toplevel_xid, + record->decoded_record->xl_xid, + buf.origptr); + } + /* cast so we get a warning when new rmgrs are added */ switch ((RmgrIds) XLogRecGetRmid(record)) { @@ -220,12 +236,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * If the snapshot isn't yet fully built, we cannot decode anything, so * bail out. * - * However, it's critical to process XLOG_XACT_ASSIGNMENT records even + * However, it's critical to process records with subxid assignment even * when the snapshot is being built: it is possible to get later records * that require subxids to be properly assigned. */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT && - info != XLOG_XACT_ASSIGNMENT) + !TransactionIdIsValid(r->toplevel_xid)) return; switch (info) @@ -266,23 +282,6 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) DecodeAbort(ctx, buf, &parsed, xid); break; } - case XLOG_XACT_ASSIGNMENT: - { - xl_xact_assignment *xlrec; - int i; - TransactionId *sub_xid; - - xlrec = (xl_xact_assignment *) XLogRecGetData(r); - - sub_xid = &xlrec->xsub[0]; - - for (i = 0; i < xlrec->nsubxacts; i++) - { - ReorderBufferAssignChild(reorder, xlrec->xtop, - *(sub_xid++), buf->origptr); - } - break; - } case XLOG_XACT_PREPARE: /* diff --git a/src/include/access/xact.h b/src/include/access/xact.h index d714551..7553f84 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -145,7 +145,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XLOG_XACT_ABORT 0x20 #define XLOG_XACT_COMMIT_PREPARED 0x30 #define XLOG_XACT_ABORT_PREPARED 0x40 -#define XLOG_XACT_ASSIGNMENT 0x50 +/* free opcode 0x50 */ /* free opcode 0x60 */ /* free opcode 0x70 */ @@ -188,15 +188,6 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid, #define XactCompletionForceSyncCommit(xinfo) \ ((xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT) != 0) -typedef struct xl_xact_assignment -{ - TransactionId xtop; /* assigned XID's top-level XID */ - int nsubxacts; /* number of subtransaction XIDs */ - TransactionId xsub[FLEXIBLE_ARRAY_MEMBER]; /* assigned subxids */ -} xl_xact_assignment; - -#define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub) - /* * Commit and abort records can contain a lot of information. But a large * portion of the records won't need all possible pieces of information. So we @@ -363,7 +354,6 @@ extern FullTransactionId GetTopFullTransactionId(void); extern FullTransactionId GetTopFullTransactionIdIfAny(void); extern FullTransactionId GetCurrentFullTransactionId(void); extern FullTransactionId GetCurrentFullTransactionIdIfAny(void); -extern void MarkCurrentTransactionIdLoggedIfAny(void); extern bool SubTransactionIsActive(SubTransactionId subxid); extern CommandId GetCurrentCommandId(bool used); extern void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts); @@ -410,6 +400,9 @@ extern void UnregisterXactCallback(XactCallback callback, void *arg); extern void RegisterSubXactCallback(SubXactCallback callback, void *arg); extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg); +extern bool IsSubTransactionAssignmentPending(void); +extern void MarkSubTransactionAssigned(void); + extern int xactGetCommittedChildren(TransactionId **ptr); extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time, diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index d519252..060901d 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -227,6 +227,8 @@ extern bool XLOG_DEBUG; */ #define XLOG_INCLUDE_ORIGIN 0x01 /* include the replication origin */ #define XLOG_MARK_UNIMPORTANT 0x02 /* record not important for durability */ +#define XLOG_INCLUDE_XID 0x04 /* include XID of toplevel xact */ +#define XLOG_INCLUDE_INVALS 0x08 /* include invalidations */ /* Checkpoint statistics */ diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 1bbee38..c37a83d 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -148,6 +148,8 @@ struct XLogReaderState RepOriginId record_origin; + TransactionId toplevel_xid; /* XID of toplevel transaction */ + /* information about blocks referenced by the record. */ DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1]; @@ -243,6 +245,7 @@ extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, #define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid) #define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid) #define XLogRecGetOrigin(decoder) ((decoder)->record_origin) +#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid) #define XLogRecGetData(decoder) ((decoder)->main_data) #define XLogRecGetDataLen(decoder) ((decoder)->main_data_len) #define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0) diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h index 9375e54..bcfba0a 100644 --- a/src/include/access/xlogrecord.h +++ b/src/include/access/xlogrecord.h @@ -223,5 +223,6 @@ typedef struct XLogRecordDataHeaderLong #define XLR_BLOCK_ID_DATA_SHORT 255 #define XLR_BLOCK_ID_DATA_LONG 254 #define XLR_BLOCK_ID_ORIGIN 253 +#define XLR_BLOCK_ID_TOPLEVEL_XID 252 #endif /* XLOGRECORD_H */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 60c76cb..d08c08a 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3423,7 +3423,6 @@ xl_standby_locks xl_tblspc_create_rec xl_tblspc_drop_rec xl_xact_abort -xl_xact_assignment xl_xact_commit xl_xact_dbinfo xl_xact_invals -- 1.8.3.1