From 7e9da4f2427cca7784e1fdb504450e6987071cb1 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Fri, 17 Jan 2025 14:08:02 +0800 Subject: [PATCH v28 1/4] Maintain the oldest non removeable tranasction ID by apply worker This set of patches aims to support the detection of update_deleted conflicts, which occur when the apply worker cannot find the target tuple to be updated (e.g., the tuple has been removed by a different origin). To detect this conflict consistently and correctly, we must ensure that tuples deleted by other origins are not prematurely removed by VACUUM before conflict detection. If these tuples are removed too soon, a different conflict might arise and be resolved incorrectly, causing data inconsistency between nodes. To achieve this, we will retain the dead tuples on the subscriber for some period. The concept is that dead tuples are useful for detecting conflicts only during the application of concurrent transactions from remote nodes. After applying and flushing all remote transactions that occurred concurrently with the tuple DELETE, any subsequent UPDATE from a remote node should have a later timestamp. In such cases, it is acceptable to detect an update_missing scenario and convert the UPDATE to an INSERT when applying it. But, for concurrent remote transactions with earlier timestamps than the DELETE, detecting update_deleted is necessary, as the UPDATEs in remote transactions should be ignored if their timestamp is earlier than that of the dead tuples. We assume that the appropriate resolution for update_deleted conflicts, to achieve eventual consistency, is the last-update-win strategy. This means that when detecting the update_deleted conflict, and the remote update has a later timestamp, the resolution would be to convert the UPDATE to an INSERT. Remote updates with earlier timestamps compared to the dead tuples will be disregarded. To implement this, an additional replication slot named pg_conflict_detection will be created on the subscriber side and maintained by the launcher. This slot will be used to retain dead tuples. Each apply worker will maintain its own non-removable transaction ID, while the launcher collects these IDs to determine whether to advance the xmin value of the replication slot. The process of advancing the non-removable transaction ID in the apply worker involves: 1) Call GetOldestActiveTransactionId() to take oldestRunningXid as the candidate xid. 2) Send a message to the walsender requesting the publisher status, which includes the latest WAL write position and information about transactions that are in the commit phase. 3) Wait for the status from the walsender. After receiving the first status, do not proceed if there are concurrent remote transactions that are still in the commit phase. These transactions might have been assigned an earlier commit timestamp but have not yet written the commit WAL record. Continue to request the publisher status until all these transactions have completed. 4) Advance the non-removable transaction ID if the current flush location has reached or surpassed the last received WAL position. These steps are repeated at intervals that are dynamically adjusted based on whether a new transaction ID has been assigned since the last advancement. This mechanism ensures that dead tuples are not removed until all concurrent transactions have been applied. It works for both bidirectional and non-bidirectional replication scenarios. Since the mechanism relies on a single replication slot, it not only assists in retaining dead tuples but also preserves commit timestamps and origin data. These information will be displayed in the additional logs generated for logical replication conflicts. Furthermore, the preserved commit timestamps and origin data are essential for consistently detecting update_origin_differs conflicts. This patch allows each apply worker to maintain the non-removable transaction ID in the shared memory following the steps described above. The actual replication slot management is implemented in the following patches. --- doc/src/sgml/protocol.sgml | 90 ++++ src/backend/access/transam/twophase.c | 26 +- src/backend/access/transam/xact.c | 6 +- src/backend/replication/logical/launcher.c | 1 + src/backend/replication/logical/worker.c | 491 ++++++++++++++++++++- src/backend/replication/walsender.c | 54 +++ src/backend/storage/ipc/procarray.c | 59 +++ src/include/replication/worker_internal.h | 17 + src/include/storage/proc.h | 8 + src/include/storage/procarray.h | 1 + src/tools/pgindent/typedefs.list | 2 + 11 files changed, 745 insertions(+), 10 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index fb5dec1172e..49a4d3ab1b8 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2450,6 +2450,69 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + + Primary status update (B) + + + + Byte1('s') + + + Identifies the message as a primary status update. + + + + + + Int64 + + + The latest WAL write position on the server. + + + + + + Int32 + + + The oldest transaction ID that is currently in the commit + phase on the server. + + + + + + Int32 + + + The next transaction ID to be assigned on the server. + + + + + + Int32 + + + The epoch of the next transaction ID to be assigned. + + + + + + Int64 + + + The server's system clock at the time of transmission, as + microseconds since midnight on 2000-01-01. + + + + + + @@ -2594,6 +2657,33 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + + Request primary status update (F) + + + + Byte1('p') + + + Identifies the message as a request for a primary status update. + + + + + + Int64 + + + The client's system clock at the time of transmission, as + microseconds since midnight on 2000-01-01. + + + + + + + diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 73a80559194..41fb4fc5025 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1181,7 +1181,11 @@ EndPrepare(GlobalTransaction gxact) * starting immediately after the WAL record is inserted could complete * without fsync'ing our state file. (This is essentially the same kind * of race condition as the COMMIT-to-clog-write case that - * RecordTransactionCommit uses DELAY_CHKPT_START for; see notes there.) + * RecordTransactionCommit uses DELAY_CHKPT_IN_COMMIT for; see notes + * there.). Note that DELAY_CHKPT_IN_COMMIT is used to find transactions + * in the critical commit section. We need to know about such transactions + * for conflict detection and resolution in logical replication. See + * GetOldestTransactionIdInCommit and its use. * * We save the PREPARE record's location in the gxact for later use by * CheckPointTwoPhase. @@ -2286,7 +2290,7 @@ ProcessTwoPhaseBuffer(TransactionId xid, * RecordTransactionCommitPrepared * * This is basically the same as RecordTransactionCommit (q.v. if you change - * this function): in particular, we must set DELAY_CHKPT_START to avoid a + * this function): in particular, we must set DELAY_CHKPT_IN_COMMIT to avoid a * race condition. * * We know the transaction made at least one XLOG entry (its PREPARE), @@ -2306,7 +2310,7 @@ RecordTransactionCommitPrepared(TransactionId xid, const char *gid) { XLogRecPtr recptr; - TimestampTz committs = GetCurrentTimestamp(); + TimestampTz committs; bool replorigin; /* @@ -2319,8 +2323,18 @@ RecordTransactionCommitPrepared(TransactionId xid, START_CRIT_SECTION(); /* See notes in RecordTransactionCommit */ - Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0); - MyProc->delayChkptFlags |= DELAY_CHKPT_START; + Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0); + MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT; + + /* + * Note it is important to set committs value after marking ourselves as + * in the commit critical section (DELAY_CHKPT_IN_COMMIT). This is because + * we want to ensure all such transactions are finished before we allow + * the logical replication client to advance its xid which is used to hold + * back dead rows for conflict detection. See + * maybe_advance_nonremovable_xid. + */ + committs = GetCurrentTimestamp(); /* * Emit the XLOG commit record. Note that we mark 2PC commits as @@ -2369,7 +2383,7 @@ RecordTransactionCommitPrepared(TransactionId xid, TransactionIdCommitTree(xid, nchildren, children); /* Checkpoint can proceed now */ - MyProc->delayChkptFlags &= ~DELAY_CHKPT_START; + MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT; END_CRIT_SECTION(); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 1b4f21a88d3..93668a81a34 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1431,9 +1431,9 @@ RecordTransactionCommit(void) * modifying it. This makes checkpoint's determination of which xacts * are delaying the checkpoint a bit fuzzy, but it doesn't matter. */ - Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0); + Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0); START_CRIT_SECTION(); - MyProc->delayChkptFlags |= DELAY_CHKPT_START; + MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT; /* * Insert the commit XLOG record. @@ -1536,7 +1536,7 @@ RecordTransactionCommit(void) */ if (markXidCommitted) { - MyProc->delayChkptFlags &= ~DELAY_CHKPT_START; + MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT; END_CRIT_SECTION(); } diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index a3c7adbf1a8..31ebef831c2 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -441,6 +441,7 @@ retry: worker->stream_fileset = NULL; worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; + worker->oldest_nonremovable_xid = InvalidFullTransactionId; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 31ab69ea13a..82d5515b0cd 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -179,6 +179,7 @@ #include "storage/buffile.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "storage/procarray.h" #include "tcop/tcopprot.h" #include "utils/acl.h" #include "utils/dynahash.h" @@ -275,6 +276,63 @@ typedef enum TRANS_PARALLEL_APPLY, } TransApplyAction; +/* + * The phases involved in advancing the non-removable transaction ID. + * + * See maybe_advance_nonremovable_xid() for details of the transition + * between these phases. + */ +typedef enum +{ + RCI_GET_CANDIDATE_XID, + RCI_REQUEST_PUBLISHER_STATUS, + RCI_WAIT_FOR_PUBLISHER_STATUS, + RCI_WAIT_FOR_LOCAL_FLUSH +} RetainConflictInfoPhase; + +/* + * Critical information for managing phase transitions within the + * RetainConflictInfoPhase. + */ +typedef struct RetainConflictInfoData +{ + RetainConflictInfoPhase phase; /* current phase */ + XLogRecPtr remote_lsn; /* WAL write position on the publisher */ + TransactionId remote_oldestxid; /* oldest transaction ID that was in the + * commit phase on the publisher */ + TransactionId remote_nextxid; /* next transaction ID to be assigned on + * the publisher */ + uint32 remote_epoch; /* epoch of remote_nextxid */ + FullTransactionId last_phase_at; /* publisher transaction ID that must + * be awaited to complete before + * entering the final phase + * (RCI_WAIT_FOR_LOCAL_FLUSH) */ + FullTransactionId candidate_xid; /* candidate for the non-removable + * transaction ID */ + TimestampTz reply_time; /* when the publisher responds with status */ + TimestampTz flushpos_update_time; /* when the remote flush position was + * updated in final phase + * (RCI_WAIT_FOR_LOCAL_FLUSH) */ + + /* + * The following fields are used to determine the timing for the next + * round of transaction ID advancement. + */ + TimestampTz last_recv_time; /* when the last message was received */ + TimestampTz candidate_xid_time; /* when the candidate_xid is decided */ + int xid_advance_interval; /* how much time (ms) to wait before + * attempting to advance the + * non-removable transaction ID */ +} RetainConflictInfoData; + +/* + * The minimum (100ms) and maximum (3 minutes) intervals for advancing + * non-removable transaction IDs. The maximum interval is a bit arbitrary but + * is sufficient to not cause any undue network traffic. + */ +#define MIN_XID_ADVANCE_INTERVAL 100 +#define MAX_XID_ADVANCE_INTERVAL 180000 + /* errcontext tracker */ static ApplyErrorCallbackArg apply_error_callback_arg = { @@ -339,6 +397,13 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; +/* + * The remote WAL position that has been applied and flushed locally. We + * record this information while sending feedback to the server and use this + * both while sending feedback and advancing oldest_nonremovable_xid. + */ +static XLogRecPtr last_flushpos = InvalidXLogRecPtr; + typedef struct SubXactInfo { TransactionId xid; /* XID of the subxact */ @@ -379,6 +444,16 @@ static void stream_close_file(void); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); +static void maybe_advance_nonremovable_xid(RetainConflictInfoData *data, + bool status_received); +static void get_candidate_xid(RetainConflictInfoData *data); +static void request_publisher_status(RetainConflictInfoData *data); +static void wait_for_publisher_status(RetainConflictInfoData *data, + bool status_received); +static void wait_for_local_flush(RetainConflictInfoData *data); +static void adjust_xid_advance_interval(RetainConflictInfoData *data, + bool new_xid_found); + static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, @@ -3586,6 +3661,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) bool ping_sent = false; TimeLineID tli; ErrorContextCallback errcallback; + RetainConflictInfoData data = {0}; /* * Init the ApplyMessageContext which we clean up after each replication @@ -3664,6 +3740,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) last_recv_timestamp = GetCurrentTimestamp(); ping_sent = false; + data.last_recv_time = last_recv_timestamp; + /* Ensure we are reading the data into our memory context. */ MemoryContextSwitchTo(ApplyMessageContext); @@ -3690,6 +3768,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) UpdateWorkerStats(last_received, send_time, false); apply_dispatch(&s); + + maybe_advance_nonremovable_xid(&data, false); } else if (c == 'k') { @@ -3705,8 +3785,32 @@ LogicalRepApplyLoop(XLogRecPtr last_received) last_received = end_lsn; send_feedback(last_received, reply_requested, false); + + maybe_advance_nonremovable_xid(&data, false); + UpdateWorkerStats(last_received, timestamp, true); } + else if (c == 's') /* Primary status update */ + { + data.remote_lsn = pq_getmsgint64(&s); + data.remote_oldestxid = pq_getmsgint(&s, 4); + data.remote_nextxid = pq_getmsgint(&s, 4); + data.remote_epoch = pq_getmsgint(&s, 4); + data.reply_time = pq_getmsgint64(&s); + + /* + * This should never happen, see + * ProcessStandbyPSRequestMessage. But if it happens + * due to a bug, we don't want to proceed as it can + * incorrectly advance oldest_nonremovable_xid. + */ + if (XLogRecPtrIsInvalid(data.remote_lsn)) + elog(ERROR, "cannot get the latest WAL position from the publisher"); + + maybe_advance_nonremovable_xid(&data, true); + + UpdateWorkerStats(last_received, data.reply_time, false); + } /* other message types are purposefully ignored */ MemoryContextReset(ApplyMessageContext); @@ -3719,6 +3823,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* confirm all writes so far */ send_feedback(last_received, false, false); + /* Reset the timestamp if no message was received */ + data.last_recv_time = 0; + + maybe_advance_nonremovable_xid(&data, false); + if (!in_remote_transaction && !in_streamed_transaction) { /* @@ -3753,6 +3862,13 @@ LogicalRepApplyLoop(XLogRecPtr last_received) else wait_time = NAPTIME_PER_CYCLE; + /* + * Ensure to wake up when it's possible to attempt to advance the + * non-removable transaction ID. + */ + if (data.phase == RCI_GET_CANDIDATE_XID && data.xid_advance_interval) + wait_time = Min(wait_time, data.xid_advance_interval); + rc = WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, @@ -3816,6 +3932,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) send_feedback(last_received, requestReply, requestReply); + maybe_advance_nonremovable_xid(&data, false); + /* * Force reporting to ensure long idle periods don't lead to * arbitrarily delayed stats. Stats can only be reported outside @@ -3851,7 +3969,6 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) static XLogRecPtr last_recvpos = InvalidXLogRecPtr; static XLogRecPtr last_writepos = InvalidXLogRecPtr; - static XLogRecPtr last_flushpos = InvalidXLogRecPtr; XLogRecPtr writepos; XLogRecPtr flushpos; @@ -3929,6 +4046,378 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) last_flushpos = flushpos; } +/* + * Attempt to advance the non-removable transaction ID. + * + * The oldest_nonremovable_xid is maintained in shared memory to prevent dead + * rows from being removed prematurely when the apply worker still needs them + * to detect update_deleted conflicts. + * + * The non-removable transaction ID is advanced to the oldest running + * transaction ID once all concurrent transactions on the publisher have been + * applied and flushed locally. The process involves: + * + * - RCI_GET_CANDIDATE_XID: + * Call GetOldestActiveTransactionId() to take oldestRunningXid as the + * candidate xid. + * + * - RCI_REQUEST_PUBLISHER_STATUS: + * Send a message to the walsender requesting the publisher status, which + * includes the latest WAL write position and information about transactions + * that are in the commit phase. + * + * - RCI_WAIT_FOR_PUBLISHER_STATUS: + * Wait for the status from the walsender. After receiving the first status, + * do not proceed if there are concurrent remote transactions that are still + * in the commit phase. These transactions might have been assigned an + * earlier commit timestamp but have not yet written the commit WAL record. + * Continue to request the publisher status (RCI_REQUEST_PUBLISHER_STATUS) + * until all these transactions have completed. + * + * - RCI_WAIT_FOR_LOCAL_FLUSH: + * Advance the non-removable transaction ID if the current flush location has + * reached or surpassed the last received WAL position. + * + * The overall state progression is: GET_CANDIDATE_XID -> + * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to + * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) -> + * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID. + * + * Retaining the dead tuples for this period is sufficient for ensuring + * eventual consistency using last-update-wins strategy, as dead tuples are + * useful for detecting conflicts only during the application of concurrent + * transactions from remote nodes. After applying and flushing all remote + * transactions that occurred concurrently with the tuple DELETE, any + * subsequent UPDATE from a remote node should have a later timestamp. In such + * cases, it is acceptable to detect an update_missing scenario and convert the + * UPDATE to an INSERT when applying it. But, for concurrent remote + * transactions with earlier timestamps than the DELETE, detecting + * update_deleted is necessary, as the UPDATEs in remote transactions should be + * ignored if their timestamp is earlier than that of the dead tuples. + * + * Note that advancing the non-removable transaction ID is not supported if the + * publisher is also a physical standby. This is because the logical walsender + * on the standby can only get the WAL replay position but there may be more + * WALs that are being replicated from the primary and those WALs could have + * earlier commit timestamp. + * + * XXX It might seem feasible to track the latest commit timestamp on the + * publisher and send the WAL position once the timestamp exceeds that on the + * subscriber. However, commit timestamps can regress since a commit with a + * later LSN is not guaranteed to have a later timestamp than those with + * earlier LSNs. + */ +static void +maybe_advance_nonremovable_xid(RetainConflictInfoData *data, + bool status_received) +{ + /* + * It is sufficient to manage non-removable transaction ID for a + * subscription by the main apply worker to detect update_deleted conflict + * even for table sync or parallel apply workers. + */ + if (!am_leader_apply_worker()) + return; + + switch (data->phase) + { + case RCI_GET_CANDIDATE_XID: + get_candidate_xid(data); + break; + case RCI_REQUEST_PUBLISHER_STATUS: + request_publisher_status(data); + break; + case RCI_WAIT_FOR_PUBLISHER_STATUS: + wait_for_publisher_status(data, status_received); + break; + case RCI_WAIT_FOR_LOCAL_FLUSH: + wait_for_local_flush(data); + break; + } +} + +/* + * Workhorse for the RCI_GET_CANDIDATE_XID phase. + */ +static void +get_candidate_xid(RetainConflictInfoData *data) +{ + TransactionId oldest_running_xid; + FullTransactionId next_full_xid; + FullTransactionId full_xid; + TimestampTz now; + + /* + * Use last_recv_time when applying changes in the loop to avoid + * unnecessary system time retrieval. If last_recv_time is not available, + * obtain the current timestamp. + */ + now = data->last_recv_time ? data->last_recv_time : GetCurrentTimestamp(); + + /* + * Compute the candidate_xid and request the publisher status at most once + * per xid_advance_interval. Refer to adjust_xid_advance_interval() for + * details on how this value is dynamically adjusted. This is to avoid + * using CPU and network resources without making much progress. + */ + if (!TimestampDifferenceExceeds(data->candidate_xid_time, now, + data->xid_advance_interval)) + return; + + data->candidate_xid_time = now; + + oldest_running_xid = GetOldestActiveTransactionId(); + next_full_xid = ReadNextFullTransactionId(); + + /* + * Compute FullTransactionId for the oldest running transaction ID. This + * handles the case where transaction ID wraparound has occurred. + */ + full_xid = FullTransactionIdFromAllowableAt(next_full_xid, oldest_running_xid); + + /* Return if the oldest_nonremovable_xid cannot be advanced */ + if (FullTransactionIdFollowsOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid, + full_xid)) + { + adjust_xid_advance_interval(data, false); + return; + } + + adjust_xid_advance_interval(data, true); + + data->candidate_xid = full_xid; + data->phase = RCI_REQUEST_PUBLISHER_STATUS; + + /* process the next phase */ + maybe_advance_nonremovable_xid(data, false); +} + +/* + * Workhorse for the RCI_REQUEST_PUBLISHER_STATUS phase. + */ +static void +request_publisher_status(RetainConflictInfoData *data) +{ + static StringInfo request_message = NULL; + + if (!request_message) + { + MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext); + + request_message = makeStringInfo(); + MemoryContextSwitchTo(oldctx); + } + else + resetStringInfo(request_message); + + /* + * Send the current time to update the remote walsender's latest reply + * message received time. + */ + pq_sendbyte(request_message, 'p'); + pq_sendint64(request_message, GetCurrentTimestamp()); + + elog(DEBUG2, "sending publisher status request message"); + + /* Send a request for the publisher status */ + walrcv_send(LogRepWorkerWalRcvConn, + request_message->data, request_message->len); + + data->phase = RCI_WAIT_FOR_PUBLISHER_STATUS; + + /* + * Skip calling maybe_advance_nonremovable_xid() since further transition + * is possible only once we receive the publisher status message. + */ +} + +/* + * Workhorse for the RCI_WAIT_FOR_PUBLISHER_STATUS phase. + */ +static void +wait_for_publisher_status(RetainConflictInfoData *data, bool status_received) +{ + FullTransactionId remote_full_xid; + FullTransactionId remote_next_full_xid; + + /* + * Return if we have requested but not yet received the publisher status. + */ + if (!status_received) + return; + + remote_next_full_xid = FullTransactionIdFromEpochAndXid(data->remote_epoch, + data->remote_nextxid); + + if (!FullTransactionIdIsValid(data->last_phase_at)) + data->last_phase_at = remote_next_full_xid; + + /* + * Compute FullTransactionId for the remote oldest committing transaction + * ID. This handles the case where transaction ID wraparound has occurred. + */ + remote_full_xid = FullTransactionIdFromAllowableAt(remote_next_full_xid, + data->remote_oldestxid); + + /* + * Check if all remote concurrent transactions that were active at the + * first status request have now completed. If completed, proceed to the + * next phase; otherwise, continue checking the publisher status until + * these transactions finish. + */ + if (FullTransactionIdPrecedesOrEquals(data->last_phase_at, + remote_full_xid)) + data->phase = RCI_WAIT_FOR_LOCAL_FLUSH; + else + data->phase = RCI_REQUEST_PUBLISHER_STATUS; + + /* process the next phase */ + maybe_advance_nonremovable_xid(data, false); +} + +/* + * Workhorse for the RCI_WAIT_FOR_LOCAL_FLUSH phase. + */ +static void +wait_for_local_flush(RetainConflictInfoData *data) +{ + Assert(!XLogRecPtrIsInvalid(data->remote_lsn) && + FullTransactionIdIsValid(data->candidate_xid)); + + /* + * We expect the publisher and subscriber clocks to be in sync using time + * sync service like NTP. Otherwise, we will advance this worker's + * oldest_nonremovable_xid prematurely, leading to the removal of rows + * required to detect update_delete conflict. + * + * XXX Consider waiting for the publisher's clock to catch up with the + * subscriber's before proceeding to the next phase. + */ + if (TimestampDifferenceExceeds(data->reply_time, + data->candidate_xid_time, 0)) + ereport(ERROR, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("oldest_nonremovable_xid transaction ID may be advanced prematurely"), + errdetail("The clock on the publisher is behind that of the subscriber.")); + + /* + * Do not attempt to advance the non-removable transaction ID when table + * sync is in progress. During this time, changes from a single + * transaction may be applied by multiple table sync workers corresponding + * to the target tables. So, it's necessary for all table sync workers to + * apply and flush the corresponding changes before advancing the + * transaction ID, otherwise, dead tuples that are still needed for + * conflict detection in table sync workers could be removed prematurely. + * However, confirming the apply and flush progress across all table sync + * workers is complex and not worth the effort, so we simply return if not + * all tables are in the READY state. + * + * It is safe to add new tables with initial states to the subscription + * after this check because any changes applied to these tables should + * have a WAL position greater than the data->remote_lsn. + */ + if (!AllTablesyncsReady()) + return; + + /* + * Update and check the remote flush position if we are applying changes + * in a loop. This is done at most once per WalWriterDelay to avoid + * performing costy operations in get_flush_position() too frequently + * during change application. + */ + if (data->last_recv_time && + TimestampDifferenceExceeds(data->flushpos_update_time, + data->last_recv_time, WalWriterDelay)) + { + XLogRecPtr writepos; + XLogRecPtr flushpos; + bool have_pending_txes; + + /* Fetch the latest remote flush position */ + get_flush_position(&writepos, &flushpos, &have_pending_txes); + + if (flushpos > last_flushpos) + last_flushpos = flushpos; + + data->flushpos_update_time = data->last_recv_time; + } + + /* Return to wait for the changes to be applied */ + if (last_flushpos < data->remote_lsn) + return; + + /* + * Reaching here means the remote WAL position has been received, and all + * transactions up to that position on the publisher have been applied and + * flushed locally. So, we can advance the non-removable transaction ID. + */ + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = data->candidate_xid; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + elog(DEBUG2, "confirmed remote flush up to %X/%X: new oldest_nonremovable_xid %u", + LSN_FORMAT_ARGS(data->remote_lsn), + XidFromFullTransactionId(data->candidate_xid)); + + /* + * Reset all data fields except those used to determine the timing for the + * next round of transaction ID advancement. + */ + data->phase = RCI_GET_CANDIDATE_XID; + data->remote_lsn = InvalidXLogRecPtr; + data->remote_oldestxid = InvalidTransactionId; + data->remote_nextxid = InvalidTransactionId; + data->remote_epoch = 0; + data->last_phase_at = InvalidFullTransactionId; + data->candidate_xid = InvalidFullTransactionId; + data->reply_time = 0; + data->flushpos_update_time = 0; + + /* process the next phase */ + maybe_advance_nonremovable_xid(data, false); +} + +/* + * Adjust the interval for advancing non-removable transaction IDs. + * + * We double the interval to try advancing the non-removable transaction IDs + * if there is no activity on the node. The maximum value of the interval is + * capped by wal_receiver_status_interval if it is not zero, otherwise to a + * 3 minutes which should be sufficient to avoid using CPU or network + * resources without much benefit. + * + * The interval is reset to a minimum value of 100ms once there is some + * activity on the node. + * + * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can + * consider the other interval or a separate GUC if the need arises. + */ +static void +adjust_xid_advance_interval(RetainConflictInfoData *data, bool new_xid_found) +{ + if (!new_xid_found && data->xid_advance_interval) + { + int max_interval = wal_receiver_status_interval + ? wal_receiver_status_interval * 1000 + : MAX_XID_ADVANCE_INTERVAL; + + /* + * No new transaction ID has been assigned since the last check, so + * double the interval, but not beyond the maximum allowable value. + */ + data->xid_advance_interval = Min(data->xid_advance_interval * 2, + max_interval); + } + else + { + /* + * A new transaction ID was found or the interval is not yet + * initialized, so set the interval to the minimum value. + */ + data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL; + } +} + /* * Exit routine for apply workers due to subscription parameter changes. */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 446d10c1a7d..9d27121a2e4 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -83,6 +83,7 @@ #include "storage/ipc.h" #include "storage/pmsignal.h" #include "storage/proc.h" +#include "storage/procarray.h" #include "tcop/dest.h" #include "tcop/tcopprot.h" #include "utils/acl.h" @@ -253,6 +254,7 @@ static void StartLogicalReplication(StartReplicationCmd *cmd); static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); +static void ProcessStandbyPSRequestMessage(void); static void ProcessRepliesIfAny(void); static void ProcessPendingWrites(void); static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr); @@ -2313,6 +2315,10 @@ ProcessStandbyMessage(void) ProcessStandbyHSFeedbackMessage(); break; + case 'p': + ProcessStandbyPSRequestMessage(); + break; + default: ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -2659,6 +2665,54 @@ ProcessStandbyHSFeedbackMessage(void) } } +/* + * Process the request for a primary status update message. + */ +static void +ProcessStandbyPSRequestMessage(void) +{ + XLogRecPtr lsn = InvalidXLogRecPtr; + TransactionId oldestXidInCommit; + FullTransactionId nextFullXid; + WalSnd *walsnd = MyWalSnd; + TimestampTz replyTime; + + /* + * This shouldn't happen because we don't support getting primary status + * message from standby. + */ + if (RecoveryInProgress()) + elog(ERROR, "the primary status is unavailable during recovery"); + + replyTime = pq_getmsgint64(&reply_message); + + /* + * Update shared state for this WalSender process based on reply data from + * standby. + */ + SpinLockAcquire(&walsnd->mutex); + walsnd->replyTime = replyTime; + SpinLockRelease(&walsnd->mutex); + + oldestXidInCommit = GetOldestTransactionIdInCommit(); + nextFullXid = ReadNextFullTransactionId(); + lsn = GetXLogWriteRecPtr(); + + elog(DEBUG2, "sending primary status"); + + /* construct the message... */ + resetStringInfo(&output_message); + pq_sendbyte(&output_message, 's'); + pq_sendint64(&output_message, lsn); + pq_sendint32(&output_message, oldestXidInCommit); + pq_sendint32(&output_message, XidFromFullTransactionId(nextFullXid)); + pq_sendint32(&output_message, EpochFromFullTransactionId(nextFullXid)); + pq_sendint64(&output_message, GetCurrentTimestamp()); + + /* ... and send it wrapped in CopyData */ + pq_putmessage_noblock('d', output_message.data, output_message.len); +} + /* * Compute how long send/receive loops should sleep. * diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 2e54c11f880..064841b482d 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -2925,6 +2925,65 @@ GetOldestActiveTransactionId(void) return oldestRunningXid; } + +/* + * GetOldestTransactionIdInCommit() + * + * Similar to GetOldestActiveTransactionId but returns the oldest transaction ID + * that is currently in the commit phase. + */ +TransactionId +GetOldestTransactionIdInCommit(void) +{ + ProcArrayStruct *arrayP = procArray; + TransactionId *other_xids = ProcGlobal->xids; + TransactionId oldestXidInCommit; + int index; + + Assert(!RecoveryInProgress()); + + /* + * Read nextXid, as the upper bound of what's still active. + * + * Reading a TransactionId is atomic, but we must grab the lock to make + * sure that all XIDs < nextXid are already present in the proc array (or + * have already completed), when we spin over it. + */ + LWLockAcquire(XidGenLock, LW_SHARED); + oldestXidInCommit = XidFromFullTransactionId(TransamVariables->nextXid); + LWLockRelease(XidGenLock); + + /* + * Spin over procArray collecting all xids and subxids. + */ + LWLockAcquire(ProcArrayLock, LW_SHARED); + for (index = 0; index < arrayP->numProcs; index++) + { + TransactionId xid; + int pgprocno = arrayP->pgprocnos[index]; + PGPROC *proc = &allProcs[pgprocno]; + + /* Fetch xid just once - see GetNewTransactionId */ + xid = UINT32_ACCESS_ONCE(other_xids[index]); + + if (!TransactionIdIsNormal(xid)) + continue; + + if ((proc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) != 0 && + TransactionIdPrecedes(xid, oldestXidInCommit)) + oldestXidInCommit = xid; + + /* + * Top-level XID of a transaction is always less than any of its + * subxids, so we don't need to check if any of the subxids are + * smaller than oldestXidInCommit + */ + } + LWLockRelease(ProcArrayLock); + + return oldestXidInCommit; +} + /* * GetOldestSafeDecodingTransactionId -- lowest xid not affected by vacuum * diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 30b2775952c..b09486017f4 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -86,6 +86,23 @@ typedef struct LogicalRepWorker /* Indicates whether apply can be performed in parallel. */ bool parallel_apply; + /* + * The changes made by this and later transactions shouldn't be removed. + * This allows the detection of update_deleted conflicts when applying + * changes in this logical replication worker. + * + * The logical replication launcher manages an internal replication slot + * named "pg_conflict_detection". It asynchronously collects this ID to + * decide when to advance the xmin value of the slot. + * + * It's necessary to use FullTransactionId here to mitigate potential race + * conditions. Such scenarios might occur if the replication slot is not + * yet created by the launcher while the apply worker has already + * initialized this field. During this period, a transaction ID wraparound + * could falsely make this ID appear as if it originates from the future. + */ + FullTransactionId oldest_nonremovable_xid; + /* Stats. */ XLogRecPtr last_lsn; TimestampTz last_send_time; diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 20777f7d5ae..d2452904ddb 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -121,9 +121,17 @@ extern PGDLLIMPORT int FastPathLockGroupsPerBackend; * the checkpoint are actually destroyed on disk. Replay can cope with a file * or block that doesn't exist, but not with a block that has the wrong * contents. + * + * Setting DELAY_CHKPT_IN_COMMIT is similar to setting DELAY_CHKPT_START, but + * it explicitly indicates that the reason for delaying the checkpoint is due + * to a transaction being within a critical commit section. We need this to + * ensure all such transactions are finished before we allow the logical + * replication client to advance its xid which is used to hold back dead rows + * for conflict detection. */ #define DELAY_CHKPT_START (1<<0) #define DELAY_CHKPT_COMPLETE (1<<1) +#define DELAY_CHKPT_IN_COMMIT (DELAY_CHKPT_START | 1<<2) typedef enum { diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index ef0b733ebe8..bd8b17a6d0d 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -57,6 +57,7 @@ extern bool TransactionIdIsActive(TransactionId xid); extern TransactionId GetOldestNonRemovableTransactionId(Relation rel); extern TransactionId GetOldestTransactionIdConsideredRunning(void); extern TransactionId GetOldestActiveTransactionId(void); +extern TransactionId GetOldestTransactionIdInCommit(void); extern TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly); extern void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index fb39c915d76..0bc5552b2b1 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2483,6 +2483,8 @@ RestrictInfo Result ResultRelInfo ResultState +RetainConflictInfoData +RetainConflictInfoPhase ReturnSetInfo ReturnStmt ReturningClause -- 2.30.0.windows.2