From 961da3bf64b6d91949db83b9de72dcd72df9aa7e Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Mon, 27 Nov 2023 18:48:45 +0100 Subject: [PATCH v20231128 8/8] log XID instead of a boolean flag --- src/backend/access/rmgrdesc/seqdesc.c | 4 ++-- src/backend/access/transam/xact.c | 20 ++++++++++++++++++ src/backend/commands/sequence.c | 21 ++++++++++++------- src/backend/replication/logical/decode.c | 11 +++++++++- .../replication/logical/reorderbuffer.c | 5 +++++ src/include/access/xact.h | 1 + src/include/commands/sequence.h | 2 +- 7 files changed, 52 insertions(+), 12 deletions(-) diff --git a/src/backend/access/rmgrdesc/seqdesc.c b/src/backend/access/rmgrdesc/seqdesc.c index ba60544085e..296fa5d9169 100644 --- a/src/backend/access/rmgrdesc/seqdesc.c +++ b/src/backend/access/rmgrdesc/seqdesc.c @@ -25,9 +25,9 @@ seq_desc(StringInfo buf, XLogReaderState *record) xl_seq_rec *xlrec = (xl_seq_rec *) rec; if (info == XLOG_SEQ_LOG) - appendStringInfo(buf, "rel %u/%u/%u", + appendStringInfo(buf, "rel %u/%u/%u xid %u", xlrec->locator.spcOid, xlrec->locator.dbOid, - xlrec->locator.relNumber); + xlrec->locator.relNumber, xlrec->xid); } const char * diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 536edb3792f..ba0522d14b2 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -805,6 +805,26 @@ SubTransactionIsActive(SubTransactionId subxid) return false; } +/* + * SubTransactionIsActive + * + * Test if the specified subxact ID is still active. Note caller is + * responsible for checking whether this ID is relevant to the current xact. + */ +TransactionId +SubTransactionGetXid(SubTransactionId subxid) +{ + TransactionState s; + + for (s = CurrentTransactionState; s != NULL; s = s->parent) + { + if (s->state == TRANS_ABORT) + continue; + if (s->subTransactionId == subxid) + return XidFromFullTransactionId(s->fullTransactionId); + } + return InvalidTransactionId; +} /* * GetCurrentCommandId diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c index 9da9c8270d9..f3e1a7a462c 100644 --- a/src/backend/commands/sequence.c +++ b/src/backend/commands/sequence.c @@ -112,7 +112,7 @@ static void init_params(ParseState *pstate, List *options, bool for_identity, List **owned_by); static void do_setval(Oid relid, int64 next, bool iscalled); static void process_owned_by(Relation seqrel, List *owned_by, bool for_identity); -static inline bool is_sequence_transactional(Relation seqrel); +static inline TransactionId sequence_xid(Relation seqrel); /* @@ -256,11 +256,16 @@ DefineSequence(ParseState *pstate, CreateSeqStmt *seq) return address; } -static inline bool -is_sequence_transactional(Relation seqrel) +static inline TransactionId +sequence_xid(Relation seqrel) { - return (seqrel->rd_newRelfilelocatorSubid != InvalidSubTransactionId) || - (seqrel->rd_createSubid != InvalidSubTransactionId); + if (seqrel->rd_newRelfilelocatorSubid != InvalidSubTransactionId) + return SubTransactionGetXid(seqrel->rd_newRelfilelocatorSubid); + + if (seqrel->rd_createSubid != InvalidSubTransactionId) + return SubTransactionGetXid(seqrel->rd_createSubid); + + return InvalidTransactionId; } /* @@ -609,7 +614,7 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum) XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT); xlrec.locator = rel->rd_locator; - xlrec.is_transactional = is_sequence_transactional(rel); + xlrec.xid = sequence_xid(rel); XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); XLogRegisterData((char *) tuple->t_data, tuple->t_len); @@ -1074,7 +1079,7 @@ nextval_internal(Oid relid, bool check_permissions) seq->log_cnt = 0; xlrec.locator = seqrel->rd_locator; - xlrec.is_transactional = is_sequence_transactional(seqrel); + xlrec.xid = sequence_xid(seqrel); XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len); @@ -1276,7 +1281,7 @@ do_setval(Oid relid, int64 next, bool iscalled) XLogRegisterBuffer(0, buf, REGBUF_WILL_INIT); xlrec.locator = seqrel->rd_locator; - xlrec.is_transactional = is_sequence_transactional(seqrel); + xlrec.xid = sequence_xid(seqrel); XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec)); XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index c58b5b45f29..ff2f3f4e1aa 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -1390,6 +1390,7 @@ seq_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) Snapshot snapshot = NULL; RepOriginId origin_id = XLogRecGetOrigin(r); bool transactional; + TransactionId sequence_xid; xl_seq_rec *xlrec; /* ignore sequences when the plugin does not have the callbacks */ @@ -1436,7 +1437,8 @@ seq_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * XXX: can xlrec be combined with tupledata? */ xlrec = (xl_seq_rec *) XLogRecGetData(r); - transactional = xlrec->is_transactional; + sequence_xid = xlrec->xid; + transactional = TransactionIdIsValid(sequence_xid); /* Skip the change if already processed (per the snapshot). */ if (transactional && @@ -1478,6 +1480,13 @@ seq_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (!transactional) snapshot = SnapBuildGetOrBuildSnapshot(builder); + /* + * FIXME can we override the xid like this? or should we pass both the + * original XID and the XID we recorded. + */ + if (transactional) + xid = sequence_xid; + /* Queue the change (or send immediately if not transactional). */ ReorderBufferQueueSequence(ctx->reorder, xid, snapshot, buf->endptr, origin_id, target_locator, transactional, diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 4121269a097..82fbfce8031 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1016,6 +1016,11 @@ ReorderBufferQueueSequence(ReorderBuffer *rb, TransactionId xid, /* non-transactional changes require a valid snapshot */ Assert(snapshot_now); + /* + * FIXME can this actually be InvalidTransactionId? We get txn=NULL + * and then rb->sequence() fails in pgoutput_sequence(), as it tries + * to do maybe_send_schema(). + */ if (xid != InvalidTransactionId) txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); diff --git a/src/include/access/xact.h b/src/include/access/xact.h index cb90f227ceb..fdb38c598bc 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -444,6 +444,7 @@ extern FullTransactionId GetCurrentFullTransactionId(void); extern FullTransactionId GetCurrentFullTransactionIdIfAny(void); extern void MarkCurrentTransactionIdLoggedIfAny(void); extern bool SubTransactionIsActive(SubTransactionId subxid); +extern TransactionId SubTransactionGetXid(SubTransactionId subxid); extern CommandId GetCurrentCommandId(bool used); extern void SetParallelStartTimestamps(TimestampTz xact_ts, TimestampTz stmt_ts); extern TimestampTz GetCurrentTransactionStartTimestamp(void); diff --git a/src/include/commands/sequence.h b/src/include/commands/sequence.h index b863f9cda36..11dfe57f04b 100644 --- a/src/include/commands/sequence.h +++ b/src/include/commands/sequence.h @@ -48,7 +48,7 @@ typedef FormData_pg_sequence_data *Form_pg_sequence_data; typedef struct xl_seq_rec { RelFileLocator locator; - bool is_transactional; + TransactionId xid; /* valid XID, if transactional */ /* SEQUENCE TUPLE DATA FOLLOWS AT THE END */ } xl_seq_rec; -- 2.41.0