From 12dd3434ef13609b324bbbbe68a3f0e2a48934a2 Mon Sep 17 00:00:00 2001 From: ChangAo Chen Date: Fri, 21 Nov 2025 15:19:22 +0800 Subject: [PATCH v6 1/2] Track transactions committed in BUILDING_SNAPSHOT. The historic snapshot previously didn't track transactions committed in BUILDING_SNAPSHOT, this might result in a transaction taking an incorrect snapshot and logical decoding being interrupted. So we need to track these transactions. We also need to handle the xlog which means a catalog change in BUILDING_SNAPSHOT because the historic snapshot only tracks catalog modifying transactions. --- src/backend/replication/logical/decode.c | 33 ++++++++++++++++++++---- 1 file changed, 28 insertions(+), 5 deletions(-) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index cc03f0706e9..de1bed30781 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -206,12 +206,16 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK; /* - * If the snapshot isn't yet fully built, we cannot decode anything, so - * bail out. + * If the snapshot hasn't started building yet, the transaction won't be + * decoded or tracked by the snapshot, so bail out. */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) + if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT) return; + /* + * Note that if the snapshot isn't yet fully built, the xlog is only used + * to build the snapshot and won't be decoded. + */ switch (info) { case XLOG_XACT_COMMIT: @@ -282,18 +286,24 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { TransactionId xid; xl_xact_invals *invals; + bool has_snapshot; xid = XLogRecGetXid(r); invals = (xl_xact_invals *) XLogRecGetData(r); + has_snapshot = + SnapBuildCurrentState(builder) >= SNAPBUILD_FULL_SNAPSHOT; /* * Execute the invalidations for xid-less transactions, * otherwise, accumulate them so that they can be processed at * the commit time. + * + * Note that we only need to do this when we are not fast-forwarding + * and there is a snapshot. */ if (TransactionIdIsValid(xid)) { - if (!ctx->fast_forward) + if (!ctx->fast_forward && has_snapshot) ReorderBufferAddInvalidations(reorder, xid, buf->origptr, invals->nmsgs, @@ -301,7 +311,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); } - else if (!ctx->fast_forward) + else if (!ctx->fast_forward && has_snapshot) ReorderBufferImmediateInvalidation(ctx->reorder, invals->nmsgs, invals->msgs); @@ -419,7 +429,19 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * SnapBuildProcessRunningXacts(). */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) + { + /* + * If we are building snapshot and the xlog means a catalog + * change, we need to mark it in the reorder buffer. + * + * Now only XLOG_HEAP2_NEW_CID means a catalog change. + */ + if (SnapBuildCurrentState(builder) >= SNAPBUILD_BUILDING_SNAPSHOT && + TransactionIdIsValid(xid) && info == XLOG_HEAP2_NEW_CID) + ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); + return; + } switch (info) { @@ -1306,6 +1328,7 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, Oid txn_dbid, RepOriginId origin_id) { if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || + SnapBuildCurrentState(ctx->snapshot_builder) < SNAPBUILD_CONSISTENT || (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) || FilterByOrigin(ctx, origin_id)) return true; -- 2.34.1