From 3e83a7e99b7e4d5abed63dac79112b797b1d2a84 Mon Sep 17 00:00:00 2001 From: "Chao Li (Evan)" Date: Tue, 30 Dec 2025 12:55:31 +0800 Subject: [PATCH v9 2/2] Consolidate replication origin session globals into a single struct. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit moves the separate global variables for replication origin state into a single RepOriginXactState struct. This groups logically related variables, which improves code readability and simplifies state management (e.g., resetting the state) by handling them as a unit. Author: Chao Li Suggested-by: Álvaro Herrera Reviewed-by: Ashutosh Bapat Discussion: https://postgr.es/m/CAEoWx2=pYvfRthXHTzSrOsf5_FfyY4zJyK4zV2v4W=yjUij1cA@mail.gmail.com --- src/backend/access/transam/twophase.c | 32 ++++++------- src/backend/access/transam/xact.c | 44 ++++++++++-------- src/backend/access/transam/xloginsert.c | 12 +++-- .../replication/logical/applyparallelworker.c | 6 +-- src/backend/replication/logical/origin.c | 46 +++++++++++++------ src/backend/replication/logical/tablesync.c | 4 +- src/backend/replication/logical/worker.c | 28 ++++++----- src/include/replication/origin.h | 15 ++++-- src/tools/pgindent/typedefs.list | 1 + 9 files changed, 113 insertions(+), 75 deletions(-) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index e50abb331cc..329c6bbf3c7 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1157,13 +1157,13 @@ EndPrepare(GlobalTransaction gxact) Assert(hdr->magic == TWOPHASE_MAGIC); hdr->total_len = records.total_len + sizeof(pg_crc32c); - replorigin = (replorigin_session_origin != InvalidRepOriginId && - replorigin_session_origin != DoNotReplicateId); + replorigin = (replorigin_xact_state.origin != InvalidRepOriginId && + replorigin_xact_state.origin != DoNotReplicateId); if (replorigin) { - hdr->origin_lsn = replorigin_session_origin_lsn; - hdr->origin_timestamp = replorigin_session_origin_timestamp; + hdr->origin_lsn = replorigin_xact_state.origin_lsn; + hdr->origin_timestamp = replorigin_xact_state.origin_timestamp; } /* @@ -1211,7 +1211,7 @@ EndPrepare(GlobalTransaction gxact) if (replorigin) { /* Move LSNs forward for this replication origin */ - replorigin_session_advance(replorigin_session_origin_lsn, + replorigin_session_advance(replorigin_xact_state.origin_lsn, gxact->prepare_end_lsn); } @@ -2330,8 +2330,8 @@ RecordTransactionCommitPrepared(TransactionId xid, * Are we using the replication origins feature? Or, in other words, are * we replaying remote actions? */ - replorigin = (replorigin_session_origin != InvalidRepOriginId && - replorigin_session_origin != DoNotReplicateId); + replorigin = (replorigin_xact_state.origin != InvalidRepOriginId && + replorigin_xact_state.origin != DoNotReplicateId); /* Load the injection point before entering the critical section */ INJECTION_POINT_LOAD("commit-after-delay-checkpoint"); @@ -2376,23 +2376,23 @@ RecordTransactionCommitPrepared(TransactionId xid, if (replorigin) /* Move LSNs forward for this replication origin */ - replorigin_session_advance(replorigin_session_origin_lsn, + replorigin_session_advance(replorigin_xact_state.origin_lsn, XactLastRecEnd); /* * Record commit timestamp. The value comes from plain commit timestamp * if replorigin is not enabled, or replorigin already set a value for us - * in replorigin_session_origin_timestamp otherwise. + * in replorigin_xact_state.origin_timestamp otherwise. * * We don't need to WAL-log anything here, as the commit record written * above already contains the data. */ - if (!replorigin || replorigin_session_origin_timestamp == 0) - replorigin_session_origin_timestamp = committs; + if (!replorigin || replorigin_xact_state.origin_timestamp == 0) + replorigin_xact_state.origin_timestamp = committs; TransactionTreeSetCommitTsData(xid, nchildren, children, - replorigin_session_origin_timestamp, - replorigin_session_origin); + replorigin_xact_state.origin_timestamp, + replorigin_xact_state.origin); /* * We don't currently try to sleep before flush here ... nor is there any @@ -2445,8 +2445,8 @@ RecordTransactionAbortPrepared(TransactionId xid, * Are we using the replication origins feature? Or, in other words, are * we replaying remote actions? */ - replorigin = (replorigin_session_origin != InvalidRepOriginId && - replorigin_session_origin != DoNotReplicateId); + replorigin = (replorigin_xact_state.origin != InvalidRepOriginId && + replorigin_xact_state.origin != DoNotReplicateId); /* * Catch the scenario where we aborted partway through @@ -2472,7 +2472,7 @@ RecordTransactionAbortPrepared(TransactionId xid, if (replorigin) /* Move LSNs forward for this replication origin */ - replorigin_session_advance(replorigin_session_origin_lsn, + replorigin_session_advance(replorigin_xact_state.origin_lsn, XactLastRecEnd); /* Always flush, since we're about to remove the 2PC state file */ diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index c857e23552f..c5c3f21c9a7 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1413,8 +1413,8 @@ RecordTransactionCommit(void) * Are we using the replication origins feature? Or, in other words, * are we replaying remote actions? */ - replorigin = (replorigin_session_origin != InvalidRepOriginId && - replorigin_session_origin != DoNotReplicateId); + replorigin = (replorigin_xact_state.origin != InvalidRepOriginId && + replorigin_xact_state.origin != DoNotReplicateId); /* * Mark ourselves as within our "commit critical section". This @@ -1462,25 +1462,25 @@ RecordTransactionCommit(void) if (replorigin) /* Move LSNs forward for this replication origin */ - replorigin_session_advance(replorigin_session_origin_lsn, + replorigin_session_advance(replorigin_xact_state.origin_lsn, XactLastRecEnd); /* * Record commit timestamp. The value comes from plain commit * timestamp if there's no replication origin; otherwise, the - * timestamp was already set in replorigin_session_origin_timestamp by - * replication. + * timestamp was already set in replorigin_xact_state.origin_timestamp + * by replication. * * We don't need to WAL-log anything here, as the commit record * written above already contains the data. */ - if (!replorigin || replorigin_session_origin_timestamp == 0) - replorigin_session_origin_timestamp = GetCurrentTransactionStopTimestamp(); + if (!replorigin || replorigin_xact_state.origin_timestamp == 0) + replorigin_xact_state.origin_timestamp = GetCurrentTransactionStopTimestamp(); TransactionTreeSetCommitTsData(xid, nchildren, children, - replorigin_session_origin_timestamp, - replorigin_session_origin); + replorigin_xact_state.origin_timestamp, + replorigin_xact_state.origin); } /* @@ -1810,8 +1810,8 @@ RecordTransactionAbort(bool isSubXact) * Are we using the replication origins feature? Or, in other words, are * we replaying remote actions? */ - replorigin = (replorigin_session_origin != InvalidRepOriginId && - replorigin_session_origin != DoNotReplicateId); + replorigin = (replorigin_xact_state.origin != InvalidRepOriginId && + replorigin_xact_state.origin != DoNotReplicateId); /* Fetch the data we need for the abort record */ nrels = smgrGetPendingDeletes(false, &rels); @@ -1838,7 +1838,7 @@ RecordTransactionAbort(bool isSubXact) if (replorigin) /* Move LSNs forward for this replication origin */ - replorigin_session_advance(replorigin_session_origin_lsn, + replorigin_session_advance(replorigin_xact_state.origin_lsn, XactLastRecEnd); /* @@ -5927,13 +5927,17 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xinfo.xinfo |= XACT_XINFO_HAS_GID; } - /* dump transaction origin information */ - if (replorigin_session_origin != InvalidRepOriginId) + /* + * Dump transaction origin information + * + * Note that DoNotReplicateId is intentionally excluded here. + */ + if (replorigin_xact_state.origin != InvalidRepOriginId) { xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN; - xl_origin.origin_lsn = replorigin_session_origin_lsn; - xl_origin.origin_timestamp = replorigin_session_origin_timestamp; + xl_origin.origin_lsn = replorigin_xact_state.origin_lsn; + xl_origin.origin_timestamp = replorigin_xact_state.origin_timestamp; } if (xl_xinfo.xinfo != 0) @@ -6080,13 +6084,15 @@ XactLogAbortRecord(TimestampTz abort_time, /* * Dump transaction origin information. We need this during recovery to * update the replication origin progress. + * + * Note that DoNotReplicateId is intentionally excluded here. */ - if (replorigin_session_origin != InvalidRepOriginId) + if (replorigin_xact_state.origin != InvalidRepOriginId) { xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN; - xl_origin.origin_lsn = replorigin_session_origin_lsn; - xl_origin.origin_timestamp = replorigin_session_origin_timestamp; + xl_origin.origin_lsn = replorigin_xact_state.origin_lsn; + xl_origin.origin_timestamp = replorigin_xact_state.origin_timestamp; } if (xl_xinfo.xinfo != 0) diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index 92c48e768c3..64534e45216 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -859,13 +859,17 @@ XLogRecordAssemble(RmgrId rmid, uint8 info, scratch += sizeof(BlockNumber); } - /* followed by the record's origin, if any */ + /* + * followed by the record's origin, if any + * + * DoNotReplicateId is intentionally excluded here + */ if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) && - replorigin_session_origin != InvalidRepOriginId) + replorigin_xact_state.origin != InvalidRepOriginId) { *(scratch++) = (char) XLR_BLOCK_ID_ORIGIN; - memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin)); - scratch += sizeof(replorigin_session_origin); + memcpy(scratch, &replorigin_xact_state.origin, sizeof(replorigin_xact_state.origin)); + scratch += sizeof(replorigin_xact_state.origin); } /* followed by toplevel XID, if not already included in previous record */ diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 5ebd2353fed..232f0447b30 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -962,7 +962,7 @@ ParallelApplyWorkerMain(Datum main_arg) * origin which was already acquired by its leader process. */ replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid); - replorigin_session_origin = originid; + replorigin_xact_set_origin(originid); CommitTransactionCommand(); /* @@ -1430,8 +1430,8 @@ pa_stream_abort(LogicalRepStreamAbortData *abort_data) * Update origin state so we can restart streaming from correct position * in case of crash. */ - replorigin_session_origin_lsn = abort_data->abort_lsn; - replorigin_session_origin_timestamp = abort_data->abort_time; + replorigin_xact_set_lsn_timestamp(abort_data->abort_lsn, + abort_data->abort_time); /* * If the two XIDs are the same, it's in fact abort of toplevel xact, so diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 09616641903..1dcc20b00db 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -159,10 +159,12 @@ typedef struct ReplicationStateCtl ReplicationState states[FLEXIBLE_ARRAY_MEMBER]; } ReplicationStateCtl; -/* external variables */ -RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */ -XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr; -TimestampTz replorigin_session_origin_timestamp = 0; +/* Global variable for per-transaction replication origin state */ +RepOriginXactState replorigin_xact_state = { + .origin = InvalidRepOriginId, /* assumed identity */ + .origin_lsn = InvalidXLogRecPtr, + .origin_timestamp = 0 +}; /* * Base address into a shared memory array of replication states of size @@ -896,7 +898,7 @@ replorigin_redo(XLogReaderState *record) * Tell the replication origin progress machinery that a commit from 'node' * that originated at the LSN remote_commit on the remote node was replayed * successfully and that we don't need to do so again. In combination with - * setting up replorigin_session_origin_lsn and replorigin_session_origin + * setting up replorigin_xact_state {.origin_lsn, .origin_timestamp} * that ensures we won't lose knowledge about that after a crash if the * transaction had a persistent effect (think of asynchronous commits). * @@ -1287,6 +1289,26 @@ replorigin_session_get_progress(bool flush) return remote_lsn; } +/* + * Set the per-transaction replication origin state. + */ +void +replorigin_xact_set_origin(RepOriginId origin) +{ + replorigin_xact_state.origin = origin; +} + +/* + * Set the per-transaction replication origin LSN and timestamp. + */ +void +replorigin_xact_set_lsn_timestamp(XLogRecPtr origin_lsn, + TimestampTz origin_timestamp) +{ + replorigin_xact_state.origin_lsn = origin_lsn; + replorigin_xact_state.origin_timestamp = origin_timestamp; +} + /* * Clear the per-transaction replication origin state. * @@ -1295,13 +1317,12 @@ replorigin_session_get_progress(bool flush) void replorigin_xact_clear(bool clear_origin) { - replorigin_session_origin_lsn = InvalidXLogRecPtr; - replorigin_session_origin_timestamp = 0; + replorigin_xact_state.origin_lsn = InvalidXLogRecPtr; + replorigin_xact_state.origin_timestamp = 0; if (clear_origin) - replorigin_session_origin = InvalidRepOriginId; + replorigin_xact_state.origin = InvalidRepOriginId; } - /* --------------------------------------------------------------------------- * SQL functions for working with replication origin. * @@ -1408,7 +1429,7 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS) pid = PG_GETARG_INT32(1); replorigin_session_setup(origin, pid); - replorigin_session_origin = origin; + replorigin_xact_set_origin(origin); pfree(name); @@ -1438,7 +1459,7 @@ pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS) { replorigin_check_prerequisites(false, false); - PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId); + PG_RETURN_BOOL(replorigin_xact_state.origin != InvalidRepOriginId); } @@ -1482,8 +1503,7 @@ pg_replication_origin_xact_setup(PG_FUNCTION_ARGS) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("no replication origin is configured"))); - replorigin_session_origin_lsn = location; - replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1); + replorigin_xact_set_lsn_timestamp(location, PG_GETARG_TIMESTAMPTZ(1)); PG_RETURN_VOID(); } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 73a0d2838cf..e34377a559c 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1318,7 +1318,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) */ originid = replorigin_by_name(originname, false); replorigin_session_setup(originid, 0); - replorigin_session_origin = originid; + replorigin_xact_set_origin(originid); *origin_startpos = replorigin_session_get_progress(false); CommitTransactionCommand(); @@ -1405,7 +1405,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); replorigin_session_setup(originid, 0); - replorigin_session_origin = originid; + replorigin_xact_set_origin(originid); /* * If the user did not opt to run as the owner of the subscription diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c1aceabbdf5..d5e23854250 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1318,8 +1318,7 @@ apply_handle_prepare_internal(LogicalRepPreparedTxnData *prepare_data) * Update origin state so we can restart streaming from correct position * in case of crash. */ - replorigin_session_origin_lsn = prepare_data->end_lsn; - replorigin_session_origin_timestamp = prepare_data->prepare_time; + replorigin_xact_set_lsn_timestamp(prepare_data->end_lsn, prepare_data->prepare_time); PrepareTransactionBlock(gid); } @@ -1421,8 +1420,7 @@ apply_handle_commit_prepared(StringInfo s) * Update origin state so we can restart streaming from correct position * in case of crash. */ - replorigin_session_origin_lsn = prepare_data.end_lsn; - replorigin_session_origin_timestamp = prepare_data.commit_time; + replorigin_xact_set_lsn_timestamp(prepare_data.end_lsn, prepare_data.commit_time); FinishPreparedTransaction(gid, true); end_replication_step(); @@ -1479,8 +1477,8 @@ apply_handle_rollback_prepared(StringInfo s) * Update origin state so we can restart streaming from correct * position in case of crash. */ - replorigin_session_origin_lsn = rollback_data.rollback_end_lsn; - replorigin_session_origin_timestamp = rollback_data.rollback_time; + replorigin_xact_set_lsn_timestamp(rollback_data.rollback_end_lsn, + rollback_data.rollback_time); /* There is no transaction when ABORT/ROLLBACK PREPARED is called */ begin_replication_step(); @@ -2526,8 +2524,8 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data) * Update origin state so we can restart streaming from correct * position in case of crash. */ - replorigin_session_origin_lsn = commit_data->end_lsn; - replorigin_session_origin_timestamp = commit_data->committime; + replorigin_xact_set_lsn_timestamp(commit_data->end_lsn, + commit_data->committime); CommitTransactionCommand(); @@ -2940,7 +2938,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, */ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, &conflicttuple.origin, &conflicttuple.ts) && - conflicttuple.origin != replorigin_session_origin) + conflicttuple.origin != replorigin_xact_state.origin) { TupleTableSlot *newslot; @@ -2982,7 +2980,7 @@ apply_handle_update_internal(ApplyExecutionData *edata, &conflicttuple.xmin, &conflicttuple.origin, &conflicttuple.ts) && - conflicttuple.origin != replorigin_session_origin) + conflicttuple.origin != replorigin_xact_state.origin) type = CT_UPDATE_DELETED; else type = CT_UPDATE_MISSING; @@ -3135,7 +3133,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata, */ if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, &conflicttuple.origin, &conflicttuple.ts) && - conflicttuple.origin != replorigin_session_origin) + conflicttuple.origin != replorigin_xact_state.origin) { conflicttuple.slot = localslot; ReportApplyConflict(estate, relinfo, LOG, CT_DELETE_ORIGIN_DIFFERS, @@ -3477,7 +3475,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, &conflicttuple.xmin, &conflicttuple.origin, &conflicttuple.ts) && - conflicttuple.origin != replorigin_session_origin) + conflicttuple.origin != replorigin_xact_state.origin) type = CT_UPDATE_DELETED; else type = CT_UPDATE_MISSING; @@ -3503,7 +3501,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, if (GetTupleTransactionInfo(localslot, &conflicttuple.xmin, &conflicttuple.origin, &conflicttuple.ts) && - conflicttuple.origin != replorigin_session_origin) + conflicttuple.origin != replorigin_xact_state.origin) { TupleTableSlot *newslot; @@ -5652,7 +5650,7 @@ run_apply_worker(void) if (!OidIsValid(originid)) originid = replorigin_create(originname); replorigin_session_setup(originid, 0); - replorigin_session_origin = originid; + replorigin_xact_set_origin(originid); origin_startpos = replorigin_session_get_progress(false); CommitTransactionCommand(); @@ -5869,7 +5867,7 @@ InitializeLogRepWorker(void) } /* - * Callback on exit to reset the origin state. + * Callback on exit to clear transaction-level replication origin state. */ static void on_exit_clear_state(int code, Datum arg) diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h index 1eaabacde03..c953785f6a4 100644 --- a/src/include/replication/origin.h +++ b/src/include/replication/origin.h @@ -40,9 +40,14 @@ typedef struct xl_replorigin_drop */ #define MAX_RONAME_LEN 512 -extern PGDLLIMPORT RepOriginId replorigin_session_origin; -extern PGDLLIMPORT XLogRecPtr replorigin_session_origin_lsn; -extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp; +typedef struct RepOriginXactState +{ + RepOriginId origin; + XLogRecPtr origin_lsn; + TimestampTz origin_timestamp; +} RepOriginXactState; + +extern PGDLLIMPORT RepOriginXactState replorigin_xact_state; /* GUCs */ extern PGDLLIMPORT int max_active_replication_origins; @@ -67,6 +72,10 @@ extern void replorigin_session_setup(RepOriginId node, int acquired_by); extern void replorigin_session_reset(void); extern XLogRecPtr replorigin_session_get_progress(bool flush); +/* Per-transaction replication origin state manipulation */ +extern void replorigin_xact_set_origin(RepOriginId origin); +extern void replorigin_xact_set_lsn_timestamp(XLogRecPtr origin_lsn, + TimestampTz origin_timestamp); extern void replorigin_xact_clear(bool clear_origin); /* Checkpoint/Startup integration */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 09e7f1d420e..94a1dbed466 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2569,6 +2569,7 @@ ReorderBufferTupleCidKey ReorderBufferUpdateProgressTxnCB ReorderTuple RepOriginId +RepOriginXactState ReparameterizeForeignPathByChild_function ReplaceVarsFromTargetList_context ReplaceVarsNoMatchOption -- 2.39.5 (Apple Git-154)