From 3f2e86bd1efbe0707d02705bb641c5629716c598 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Tue, 23 Jul 2024 07:38:35 +0000 Subject: [PATCH 1/2] Add XactLastPrepareEnd to indicate the last PREPARE record We are using XactLastCommitEnd to track transaction activities. This approach works well for regular transactions, but it has been wrongly re-used for prepared transactions. This commit adds a new global variable, XactLastPrepareEnd, to track more appropriately. Originally reported by Wang Wei --- src/backend/access/transam/twophase.c | 3 +++ src/backend/access/transam/xlog.c | 1 + src/backend/replication/logical/worker.c | 10 +++++----- src/include/access/xlog.h | 1 + src/include/replication/worker_internal.h | 4 ++-- 5 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 9a8257fcaf..2d7e15c2c3 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1269,6 +1269,9 @@ EndPrepare(GlobalTransaction gxact) */ SyncRepWaitForLSN(gxact->prepare_end_lsn, false); + /* remember end of last prepare record */ + XactLastPrepareEnd = gxact->prepare_end_lsn; + records.tail = records.head = NULL; records.num_chunks = 0; } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 473a9c5c2f..3dfae5e7d9 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -251,6 +251,7 @@ static int LocalXLogInsertAllowed = -1; XLogRecPtr ProcLastRecPtr = InvalidXLogRecPtr; XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr; XLogRecPtr XactLastCommitEnd = InvalidXLogRecPtr; +XLogRecPtr XactLastPrepareEnd = InvalidXLogRecPtr; /* * RedoRecPtr is this backend's local copy of the REDO record pointer diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c0bda6269b..46f7a5c3a5 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1136,7 +1136,7 @@ apply_handle_prepare(StringInfo s) CommitTransactionCommand(); pgstat_report_stat(false); - store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); + store_flush_position(prepare_data.end_lsn, XactLastPrepareEnd); in_remote_transaction = false; @@ -1193,7 +1193,7 @@ apply_handle_commit_prepared(StringInfo s) CommitTransactionCommand(); pgstat_report_stat(false); - store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); + store_flush_position(prepare_data.end_lsn, XactLastPrepareEnd); in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ @@ -1309,7 +1309,7 @@ apply_handle_stream_prepare(StringInfo s) CommitTransactionCommand(); - store_flush_position(prepare_data.end_lsn, XactLastCommitEnd); + store_flush_position(prepare_data.end_lsn, XactLastPrepareEnd); in_remote_transaction = false; @@ -1367,7 +1367,7 @@ apply_handle_stream_prepare(StringInfo s) CommitTransactionCommand(); - MyParallelShared->last_commit_end = XactLastCommitEnd; + MyParallelShared->last_commit_end = XactLastPrepareEnd; pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED); pa_unlock_transaction(MyParallelShared->xid, AccessExclusiveLock); @@ -3447,7 +3447,7 @@ store_flush_position(XLogRecPtr remote_lsn, XLogRecPtr local_lsn) /* Need to do this in permanent context */ MemoryContextSwitchTo(ApplyContext); - /* Track commit lsn */ + /* Track record lsn */ flushpos = (FlushPosition *) palloc(sizeof(FlushPosition)); flushpos->local_end = local_lsn; flushpos->remote_end = remote_lsn; diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index c40fd56b29..9530338209 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -32,6 +32,7 @@ extern PGDLLIMPORT int wal_sync_method; extern PGDLLIMPORT XLogRecPtr ProcLastRecPtr; extern PGDLLIMPORT XLogRecPtr XactLastRecEnd; extern PGDLLIMPORT XLogRecPtr XactLastCommitEnd; +extern PGDLLIMPORT XLogRecPtr XactLastPrepareEnd; /* these variables are GUC parameters related to XLOG */ extern PGDLLIMPORT int wal_segment_size; diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 515aefd519..bdc73d2374 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -163,8 +163,8 @@ typedef struct ParallelApplyWorkerShared pg_atomic_uint32 pending_stream_count; /* - * XactLastCommitEnd from the parallel apply worker. This is required by - * the leader worker so it can update the lsn_mappings. + * XactLastCommitEnd or XactLastPrepareEnd from the parallel apply worker. + * This is required by the leader worker so it can update the lsn_mappings. */ XLogRecPtr last_commit_end; -- 2.43.0