From 53d079af84b8ac44497f27eab52c73cfc2bbd298 Mon Sep 17 00:00:00 2001 From: ChangAo Chen Date: Mon, 10 Jun 2024 20:56:37 +0800 Subject: [PATCH v1] Track transactions committed in BUILDING_SNAPSHOT. Transactions committed in BUILDING_SNAPSHOT state are useful for building historic snapshot, but we didn't track them previously. This results in a transaction taking an incorrect snapshot and logical replication being interrupted. So it's necessary to track these transactions. --- src/backend/replication/logical/decode.c | 27 +++++++++++++-------- src/backend/replication/logical/snapbuild.c | 7 ++++++ 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 8ec5adfd90..6159beca4b 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -206,10 +206,11 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK; /* - * If the snapshot isn't yet fully built, we cannot decode anything, so - * bail out. + * If the snapshot isn't yet fully built, we cannot decode anything. + * But we need to track transactions committed in BUILDING_SNAPSHOT + * state, or the snapshot will be incorrect. */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) + if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT) return; switch (info) @@ -282,9 +283,11 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { TransactionId xid; xl_xact_invals *invals; + bool has_snapshot = false; xid = XLogRecGetXid(r); invals = (xl_xact_invals *) XLogRecGetData(r); + has_snapshot = SnapBuildCurrentState(builder) >= SNAPBUILD_FULL_SNAPSHOT; /* * Execute the invalidations for xid-less transactions, @@ -293,7 +296,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) */ if (TransactionIdIsValid(xid)) { - if (!ctx->fast_forward) + if (!ctx->fast_forward && has_snapshot) ReorderBufferAddInvalidations(reorder, xid, buf->origptr, invals->nmsgs, @@ -301,12 +304,13 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); } - else if ((!ctx->fast_forward)) + else if (!ctx->fast_forward && has_snapshot) ReorderBufferImmediateInvalidation(ctx->reorder, invals->nmsgs, invals->msgs); + + break; } - break; case XLOG_XACT_PREPARE: { xl_xact_parsed_prepare parsed; @@ -411,9 +415,10 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding changes. + * point in decoding changes. If we are building snapshot, we need to + * handle XLOG_HEAP2_NEW_CID which mark a transaction as catalog modifying. */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT || ctx->fast_forward) return; @@ -470,9 +475,10 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) /* * If we don't have snapshot or we are just fast-forwarding, there is no - * point in decoding data changes. + * point in decoding data changes. If we are building snapshot, we need to + * handle XLOG_HEAP_INPLACE which mark a transaction as catalog modifying. */ - if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT || + if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT || ctx->fast_forward) return; @@ -1300,6 +1306,7 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, Oid txn_dbid, RepOriginId origin_id) { if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || + SnapBuildCurrentState(ctx->snapshot_builder) < SNAPBUILD_CONSISTENT || (txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) || FilterByOrigin(ctx, origin_id)) return true; diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index e37e22f441..8eacede275 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -826,6 +826,13 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, */ ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn); + /* + * If we don't have snapshot, the transaction won't be + * replayed, just return here. + */ + if (builder->state < SNAPBUILD_FULL_SNAPSHOT) + return; + ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn, xlrec->target_locator, xlrec->target_tid, xlrec->cmin, xlrec->cmax, -- 2.34.1