From d4faaecca0e2790b9c0279f292865c780b16099c Mon Sep 17 00:00:00 2001 From: Zhijie Hou Date: Fri, 17 Jan 2025 14:08:02 +0800 Subject: [PATCH v33 1/7] Maintain the oldest non-removable transaction ID by apply worker This set of patches aims to support the detection of update_deleted conflicts, which occur when the apply worker cannot find the target tuple to be updated (e.g., the tuple has been removed by a different origin). To detect this conflict consistently and correctly, we must ensure that tuples deleted by other origins are not prematurely removed by VACUUM before conflict detection. If these tuples are removed too soon, a different conflict might arise and be resolved incorrectly, causing data inconsistency between nodes. To achieve this, we will retain the dead tuples on the subscriber for some period. The concept is that dead tuples are useful for detecting conflicts only during the application of concurrent transactions from remote nodes. After applying and flushing all remote transactions that occurred concurrently with the tuple DELETE, any subsequent UPDATE from a remote node should have a later timestamp. In such cases, it is acceptable to detect an update_missing scenario and convert the UPDATE to an INSERT when applying it. But, for concurrent remote transactions with earlier timestamps than the DELETE, detecting update_deleted is necessary, as the UPDATEs in remote transactions should be ignored if their timestamp is earlier than that of the dead tuples. We assume that the appropriate resolution for update_deleted conflicts, to achieve eventual consistency, is the last-update-win strategy. This means that when detecting the update_deleted conflict, and the remote update has a later timestamp, the resolution would be to convert the UPDATE to an INSERT. Remote updates with earlier timestamps compared to the dead tuples will be disregarded. To implement this, an additional replication slot named pg_conflict_detection will be created on the subscriber side and maintained by the launcher. This slot will be used to retain dead tuples. Each apply worker will maintain its own non-removable transaction ID, while the launcher collects these IDs to determine whether to advance the xmin value of the replication slot. The process of advancing the non-removable transaction ID in the apply worker involves: 1) Call GetOldestActiveTransactionId() to take oldestRunningXid as the candidate xid. 2) Send a message to the walsender requesting the publisher status, which includes the latest WAL write position and information about transactions that are in the commit phase. 3) Wait for the status from the walsender. After receiving the first status, do not proceed if there are concurrent remote transactions that are still in the commit phase. These transactions might have been assigned an earlier commit timestamp but have not yet written the commit WAL record. Continue to request the publisher status until all these transactions have completed. 4) Advance the non-removable transaction ID if the current flush location has reached or surpassed the last received WAL position. These steps are repeated at intervals that are dynamically adjusted based on whether a new transaction ID has been assigned since the last advancement. This mechanism ensures that dead tuples are not removed until all concurrent transactions have been applied. It works for both bidirectional and non-bidirectional replication scenarios. Since the mechanism relies on a single replication slot, it not only assists in retaining dead tuples but also preserves commit timestamps and origin data. These information will be displayed in the additional logs generated for logical replication conflicts. Furthermore, the preserved commit timestamps and origin data are essential for consistently detecting update_origin_differs and delete_origin_differs conflicts. These detections require comparing the origin data of local tuples with remote ones. If the origin data is removed prematurely due to vacuum freeze, it is impossible to detect these conflicts. This patch allows each apply worker to maintain the non-removable transaction ID in the shared memory following the steps described above. The actual replication slot management is implemented in the following patches. --- doc/src/sgml/protocol.sgml | 86 ++++ src/backend/access/transam/twophase.c | 26 +- src/backend/access/transam/xact.c | 12 +- src/backend/access/transam/xlog.c | 2 +- src/backend/replication/logical/launcher.c | 1 + src/backend/replication/logical/worker.c | 520 ++++++++++++++++++++- src/backend/replication/walsender.c | 56 +++ src/backend/storage/ipc/procarray.c | 11 +- src/include/replication/worker_internal.h | 11 + src/include/storage/proc.h | 8 + src/include/storage/procarray.h | 2 +- src/tools/pgindent/typedefs.list | 2 + 12 files changed, 724 insertions(+), 13 deletions(-) diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index c4d3853cbf2..ed56df85ca7 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -2643,6 +2643,65 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + + Primary status update (B) + + + + Byte1('s') + + + Identifies the message as a primary status update. + + + + + + Int64 + + + The latest WAL write position on the server. + + + + + + Int64 + + + The oldest transaction ID that is currently in the commit phase on + the server, along with its epoch. The most significant 32 bits are + the epoch. The least significant 32 bits are the transaction ID. + If no transactions are active on the server, this number will be + the next transaction ID to be assigned. + + + + + + Int64 + + + The next transaction ID to be assigned on the server, along with + its epoch. The most significant 32 bits are the epoch. The least + significant 32 bits are the transaction ID. + + + + + + Int64 + + + The server's system clock at the time of transmission, as + microseconds since midnight on 2000-01-01. + + + + + + @@ -2787,6 +2846,33 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;" + + + Request primary status update (F) + + + + Byte1('p') + + + Identifies the message as a request for a primary status update. + + + + + + Int64 + + + The client's system clock at the time of transmission, as + microseconds since midnight on 2000-01-01. + + + + + + + diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 73a80559194..ca9e349e099 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1181,7 +1181,11 @@ EndPrepare(GlobalTransaction gxact) * starting immediately after the WAL record is inserted could complete * without fsync'ing our state file. (This is essentially the same kind * of race condition as the COMMIT-to-clog-write case that - * RecordTransactionCommit uses DELAY_CHKPT_START for; see notes there.) + * RecordTransactionCommit uses DELAY_CHKPT_IN_COMMIT for; see notes + * there.) Note that DELAY_CHKPT_IN_COMMIT is used to find transactions in + * the critical commit section. We need to know about such transactions + * for conflict detection in logical replication. See + * GetOldestActiveTransactionId(true) and its use. * * We save the PREPARE record's location in the gxact for later use by * CheckPointTwoPhase. @@ -2286,7 +2290,7 @@ ProcessTwoPhaseBuffer(TransactionId xid, * RecordTransactionCommitPrepared * * This is basically the same as RecordTransactionCommit (q.v. if you change - * this function): in particular, we must set DELAY_CHKPT_START to avoid a + * this function): in particular, we must set DELAY_CHKPT_IN_COMMIT to avoid a * race condition. * * We know the transaction made at least one XLOG entry (its PREPARE), @@ -2306,7 +2310,7 @@ RecordTransactionCommitPrepared(TransactionId xid, const char *gid) { XLogRecPtr recptr; - TimestampTz committs = GetCurrentTimestamp(); + TimestampTz committs; bool replorigin; /* @@ -2319,8 +2323,18 @@ RecordTransactionCommitPrepared(TransactionId xid, START_CRIT_SECTION(); /* See notes in RecordTransactionCommit */ - Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0); - MyProc->delayChkptFlags |= DELAY_CHKPT_START; + Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0); + MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT; + + /* + * Note it is important to set committs value after marking ourselves as + * in the commit critical section (DELAY_CHKPT_IN_COMMIT). This is because + * we want to ensure all transactions that have acquired commit timestamp + * are finished before we allow the logical replication client to advance + * its xid which is used to hold back dead rows for conflict detection. + * See maybe_advance_nonremovable_xid. + */ + committs = GetCurrentTimestamp(); /* * Emit the XLOG commit record. Note that we mark 2PC commits as @@ -2369,7 +2383,7 @@ RecordTransactionCommitPrepared(TransactionId xid, TransactionIdCommitTree(xid, nchildren, children); /* Checkpoint can proceed now */ - MyProc->delayChkptFlags &= ~DELAY_CHKPT_START; + MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT; END_CRIT_SECTION(); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 2e67e998adb..fa3f5a4315c 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1459,10 +1459,16 @@ RecordTransactionCommit(void) * without holding the ProcArrayLock, since we're the only one * modifying it. This makes checkpoint's determination of which xacts * are delaying the checkpoint a bit fuzzy, but it doesn't matter. + * + * Note, it is important to get the commit timestamp after marking the + * transaction in the commit critical section. See + * RecordTransactionCommitPrepared. */ - Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0); + Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0); START_CRIT_SECTION(); - MyProc->delayChkptFlags |= DELAY_CHKPT_START; + MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT; + + Assert(xactStopTimestamp == 0); /* * Insert the commit XLOG record. @@ -1565,7 +1571,7 @@ RecordTransactionCommit(void) */ if (markXidCommitted) { - MyProc->delayChkptFlags &= ~DELAY_CHKPT_START; + MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT; END_CRIT_SECTION(); } diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 1914859b2ee..5f19981f9c3 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7142,7 +7142,7 @@ CreateCheckPoint(int flags) * starting snapshot of locks and transactions. */ if (!shutdown && XLogStandbyInfoActive()) - checkPoint.oldestActiveXid = GetOldestActiveTransactionId(); + checkPoint.oldestActiveXid = GetOldestActiveTransactionId(false); else checkPoint.oldestActiveXid = InvalidTransactionId; diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 10677da56b2..9b155705dbb 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -441,6 +441,7 @@ retry: worker->stream_fileset = NULL; worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid; worker->parallel_apply = is_parallel_apply_worker; + worker->oldest_nonremovable_xid = InvalidTransactionId; worker->last_lsn = InvalidXLogRecPtr; TIMESTAMP_NOBEGIN(worker->last_send_time); TIMESTAMP_NOBEGIN(worker->last_recv_time); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index a23262957ac..a4f4772fdd5 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -179,6 +179,7 @@ #include "storage/buffile.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "storage/procarray.h" #include "tcop/tcopprot.h" #include "utils/acl.h" #include "utils/dynahash.h" @@ -275,6 +276,62 @@ typedef enum TRANS_PARALLEL_APPLY, } TransApplyAction; +/* + * The phases involved in advancing the non-removable transaction ID. + * + * See maybe_advance_nonremovable_xid() for details of the transition + * between these phases. + */ +typedef enum +{ + RCI_GET_CANDIDATE_XID, + RCI_REQUEST_PUBLISHER_STATUS, + RCI_WAIT_FOR_PUBLISHER_STATUS, + RCI_WAIT_FOR_LOCAL_FLUSH +} RetainConflictInfoPhase; + +/* + * Critical information for managing phase transitions within the + * RetainConflictInfoPhase. + */ +typedef struct RetainConflictInfoData +{ + RetainConflictInfoPhase phase; /* current phase */ + XLogRecPtr remote_lsn; /* WAL write position on the publisher */ + FullTransactionId remote_oldestxid; /* oldest transaction ID that was in + * the commit phase on the publisher */ + FullTransactionId remote_nextxid; /* next transaction ID to be assigned + * on the publisher */ + TimestampTz reply_time; /* when the publisher responds with status */ + FullTransactionId remote_wait_for; /* publisher transaction ID that must + * be awaited to complete before + * entering the final phase + * (RCI_WAIT_FOR_LOCAL_FLUSH) */ + TransactionId candidate_xid; /* candidate for the non-removable + * transaction ID */ + TimestampTz flushpos_update_time; /* when the remote flush position was + * updated in final phase + * (RCI_WAIT_FOR_LOCAL_FLUSH) */ + + /* + * The following fields are used to determine the timing for the next + * round of transaction ID advancement. + */ + TimestampTz last_recv_time; /* when the last message was received */ + TimestampTz candidate_xid_time; /* when the candidate_xid is decided */ + int xid_advance_interval; /* how much time (ms) to wait before + * attempting to advance the + * non-removable transaction ID */ +} RetainConflictInfoData; + +/* + * The minimum (100ms) and maximum (3 minutes) intervals for advancing + * non-removable transaction IDs. The maximum interval is a bit arbitrary but + * is sufficient to not cause any undue network traffic. + */ +#define MIN_XID_ADVANCE_INTERVAL 100 +#define MAX_XID_ADVANCE_INTERVAL 180000 + /* errcontext tracker */ static ApplyErrorCallbackArg apply_error_callback_arg = { @@ -339,6 +396,13 @@ static XLogRecPtr skip_xact_finish_lsn = InvalidXLogRecPtr; /* BufFile handle of the current streaming file */ static BufFile *stream_fd = NULL; +/* + * The remote WAL position that has been applied and flushed locally. We record + * and use this information both while sending feedback to the server and + * advancing oldest_nonremovable_xid. + */ +static XLogRecPtr last_flushpos = InvalidXLogRecPtr; + typedef struct SubXactInfo { TransactionId xid; /* XID of the subxact */ @@ -379,6 +443,18 @@ static void stream_close_file(void); static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply); +static void maybe_advance_nonremovable_xid(RetainConflictInfoData *rci_data, + bool status_received); +static void process_rci_phase_transition(RetainConflictInfoData *rci_data, + bool status_received); +static void get_candidate_xid(RetainConflictInfoData *rci_data); +static void request_publisher_status(RetainConflictInfoData *rci_data); +static void wait_for_publisher_status(RetainConflictInfoData *rci_data, + bool status_received); +static void wait_for_local_flush(RetainConflictInfoData *rci_data); +static void adjust_xid_advance_interval(RetainConflictInfoData *rci_data, + bool new_xid_found); + static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo, @@ -3584,6 +3660,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received) bool ping_sent = false; TimeLineID tli; ErrorContextCallback errcallback; + RetainConflictInfoData rci_data = {0}; /* * Init the ApplyMessageContext which we clean up after each replication @@ -3662,6 +3739,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) last_recv_timestamp = GetCurrentTimestamp(); ping_sent = false; + rci_data.last_recv_time = last_recv_timestamp; + /* Ensure we are reading the data into our memory context. */ MemoryContextSwitchTo(ApplyMessageContext); @@ -3688,6 +3767,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) UpdateWorkerStats(last_received, send_time, false); apply_dispatch(&s); + + maybe_advance_nonremovable_xid(&rci_data, false); } else if (c == 'k') { @@ -3703,8 +3784,31 @@ LogicalRepApplyLoop(XLogRecPtr last_received) last_received = end_lsn; send_feedback(last_received, reply_requested, false); + + maybe_advance_nonremovable_xid(&rci_data, false); + UpdateWorkerStats(last_received, timestamp, true); } + else if (c == 's') /* Primary status update */ + { + rci_data.remote_lsn = pq_getmsgint64(&s); + rci_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s)); + rci_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s)); + rci_data.reply_time = pq_getmsgint64(&s); + + /* + * This should never happen, see + * ProcessStandbyPSRequestMessage. But if it happens + * due to a bug, we don't want to proceed as it can + * incorrectly advance oldest_nonremovable_xid. + */ + if (XLogRecPtrIsInvalid(rci_data.remote_lsn)) + elog(ERROR, "cannot get the latest WAL position from the publisher"); + + maybe_advance_nonremovable_xid(&rci_data, true); + + UpdateWorkerStats(last_received, rci_data.reply_time, false); + } /* other message types are purposefully ignored */ MemoryContextReset(ApplyMessageContext); @@ -3717,6 +3821,11 @@ LogicalRepApplyLoop(XLogRecPtr last_received) /* confirm all writes so far */ send_feedback(last_received, false, false); + /* Reset the timestamp if no message was received */ + rci_data.last_recv_time = 0; + + maybe_advance_nonremovable_xid(&rci_data, false); + if (!in_remote_transaction && !in_streamed_transaction) { /* @@ -3751,6 +3860,14 @@ LogicalRepApplyLoop(XLogRecPtr last_received) else wait_time = NAPTIME_PER_CYCLE; + /* + * Ensure to wake up when it's possible to advance the non-removable + * transaction ID. + */ + if (rci_data.phase == RCI_GET_CANDIDATE_XID && + rci_data.xid_advance_interval) + wait_time = Min(wait_time, rci_data.xid_advance_interval); + rc = WaitLatchOrSocket(MyLatch, WL_SOCKET_READABLE | WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, @@ -3814,6 +3931,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) send_feedback(last_received, requestReply, requestReply); + maybe_advance_nonremovable_xid(&rci_data, false); + /* * Force reporting to ensure long idle periods don't lead to * arbitrarily delayed stats. Stats can only be reported outside @@ -3849,7 +3968,6 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) static XLogRecPtr last_recvpos = InvalidXLogRecPtr; static XLogRecPtr last_writepos = InvalidXLogRecPtr; - static XLogRecPtr last_flushpos = InvalidXLogRecPtr; XLogRecPtr writepos; XLogRecPtr flushpos; @@ -3927,6 +4045,406 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply) last_flushpos = flushpos; } +/* + * Attempt to advance the non-removable transaction ID. + * + * The oldest_nonremovable_xid is maintained in shared memory to prevent dead + * rows from being removed prematurely when the apply worker still needs them + * to detect update_deleted conflicts. Additionally, this helps to retain + * the required commit_ts module information, which further helps to detect + * update_origin_differs and delete_origin_differs conflicts reliably, as + * otherwise, vacuum freeze could remove the required information. + * + * The non-removable transaction ID is advanced to the oldest running + * transaction ID once all concurrent transactions on the publisher have been + * applied and flushed locally. The process involves: + * + * - RCI_GET_CANDIDATE_XID: + * Call GetOldestActiveTransactionId() to take oldestRunningXid as the + * candidate xid. + * + * - RCI_REQUEST_PUBLISHER_STATUS: + * Send a message to the walsender requesting the publisher status, which + * includes the latest WAL write position and information about transactions + * that are in the commit phase. + * + * - RCI_WAIT_FOR_PUBLISHER_STATUS: + * Wait for the status from the walsender. After receiving the first status, + * do not proceed if there are concurrent remote transactions that are still + * in the commit phase. These transactions might have been assigned an + * earlier commit timestamp but have not yet written the commit WAL record. + * Continue to request the publisher status (RCI_REQUEST_PUBLISHER_STATUS) + * until all these transactions have completed. + * + * - RCI_WAIT_FOR_LOCAL_FLUSH: + * Advance the non-removable transaction ID if the current flush location has + * reached or surpassed the last received WAL position. + * + * The overall state progression is: GET_CANDIDATE_XID -> + * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to + * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) -> + * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID. + * + * Retaining the dead tuples for this period is sufficient for ensuring + * eventual consistency using last-update-wins strategy, as dead tuples are + * useful for detecting conflicts only during the application of concurrent + * transactions from remote nodes. After applying and flushing all remote + * transactions that occurred concurrently with the tuple DELETE, any + * subsequent UPDATE from a remote node should have a later timestamp. In such + * cases, it is acceptable to detect an update_missing scenario and convert the + * UPDATE to an INSERT when applying it. But, for concurrent remote + * transactions with earlier timestamps than the DELETE, detecting + * update_deleted is necessary, as the UPDATEs in remote transactions should be + * ignored if their timestamp is earlier than that of the dead tuples. + * + * Note that advancing the non-removable transaction ID is not supported if the + * publisher is also a physical standby. This is because the logical walsender + * on the standby can only get the WAL replay position but there may be more + * WALs that are being replicated from the primary and those WALs could have + * earlier commit timestamp. + * + * XXX It seems feasible to get the latest commit's WAL location from the + * publisher and wait till that is applied. However, we can't do that + * because commit timestamps can regress as a commit with a later LSN is not + * guaranteed to have a later timestamp than those with earlier LSNs. Having + * said that, even if that is possible, it won't improve performance much as + * the apply always lag and moves slowly as compared with the transactions + * on the publisher. + */ +static void +maybe_advance_nonremovable_xid(RetainConflictInfoData *rci_data, + bool status_received) +{ + /* + * It is sufficient to manage non-removable transaction ID for a + * subscription by the main apply worker to detect update_deleted conflict + * even for table sync or parallel apply workers. + */ + if (!am_leader_apply_worker()) + return; + + process_rci_phase_transition(rci_data, status_received); +} + +/* + * Process phase transitions during the non-removable transaction ID + * advancement. See comments atop of maybe_advance_nonremovable_xid() for + * details of the transition. + */ +static void +process_rci_phase_transition(RetainConflictInfoData *rci_data, + bool status_received) +{ + switch (rci_data->phase) + { + case RCI_GET_CANDIDATE_XID: + get_candidate_xid(rci_data); + break; + case RCI_REQUEST_PUBLISHER_STATUS: + request_publisher_status(rci_data); + break; + case RCI_WAIT_FOR_PUBLISHER_STATUS: + wait_for_publisher_status(rci_data, status_received); + break; + case RCI_WAIT_FOR_LOCAL_FLUSH: + wait_for_local_flush(rci_data); + break; + } +} + +/* + * Workhorse for the RCI_GET_CANDIDATE_XID phase. + */ +static void +get_candidate_xid(RetainConflictInfoData *rci_data) +{ + TransactionId oldest_running_xid; + TimestampTz now; + + /* + * Use last_recv_time when applying changes in the loop to avoid + * unnecessary system time retrieval. If last_recv_time is not available, + * obtain the current timestamp. + */ + now = rci_data->last_recv_time ? rci_data->last_recv_time : GetCurrentTimestamp(); + + /* + * Compute the candidate_xid and request the publisher status at most once + * per xid_advance_interval. Refer to adjust_xid_advance_interval() for + * details on how this value is dynamically adjusted. This is to avoid + * using CPU and network resources without making much progress. + */ + if (!TimestampDifferenceExceeds(rci_data->candidate_xid_time, now, + rci_data->xid_advance_interval)) + return; + + /* + * Immediately update the timer, even if the function returns later + * without setting candidate_xid due to inactivity on the subscriber. This + * avoids frequent calls to GetOldestActiveTransactionId. + */ + rci_data->candidate_xid_time = now; + + oldest_running_xid = GetOldestActiveTransactionId(false); + + /* + * Oldest active transaction ID (oldest_running_xid) can't be behind any of + * its previously computed value. + */ + Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid, + oldest_running_xid)); + + /* Return if the oldest_nonremovable_xid cannot be advanced */ + if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid, + oldest_running_xid)) + { + adjust_xid_advance_interval(rci_data, false); + return; + } + + adjust_xid_advance_interval(rci_data, true); + + rci_data->candidate_xid = oldest_running_xid; + rci_data->phase = RCI_REQUEST_PUBLISHER_STATUS; + + /* process the next phase */ + process_rci_phase_transition(rci_data, false); +} + +/* + * Workhorse for the RCI_REQUEST_PUBLISHER_STATUS phase. + */ +static void +request_publisher_status(RetainConflictInfoData *rci_data) +{ + static StringInfo request_message = NULL; + + if (!request_message) + { + MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext); + + request_message = makeStringInfo(); + MemoryContextSwitchTo(oldctx); + } + else + resetStringInfo(request_message); + + /* + * Send the current time to update the remote walsender's latest reply + * message received time. + */ + pq_sendbyte(request_message, 'p'); + pq_sendint64(request_message, GetCurrentTimestamp()); + + elog(DEBUG2, "sending publisher status request message"); + + /* Send a request for the publisher status */ + walrcv_send(LogRepWorkerWalRcvConn, + request_message->data, request_message->len); + + rci_data->phase = RCI_WAIT_FOR_PUBLISHER_STATUS; + + /* + * Skip calling maybe_advance_nonremovable_xid() since further transition + * is possible only once we receive the publisher status message. + */ +} + +/* + * Workhorse for the RCI_WAIT_FOR_PUBLISHER_STATUS phase. + */ +static void +wait_for_publisher_status(RetainConflictInfoData *rci_data, + bool status_received) +{ + /* + * Return if we have requested but not yet received the publisher status. + */ + if (!status_received) + return; + + if (!FullTransactionIdIsValid(rci_data->remote_wait_for)) + rci_data->remote_wait_for = rci_data->remote_nextxid; + + /* + * Check if all remote concurrent transactions that were active at the + * first status request have now completed. If completed, proceed to the + * next phase; otherwise, continue checking the publisher status until + * these transactions finish. + * + * It's possible that transactions in the commit phase during the last + * cycle have now finished committing, but remote_oldestxid remains older + * than remote_wait_for. This can happen if some old transaction came in the + * commit phase when we requested status in this cycle. We do not handle + * this case explicitly as it's rare and the benefit doesn't justify the + * required complexity. Tracking would require either caching all xids at + * the publisher or sending them to subscribers. The condition will + * resolve naturally once the remaining transactions are finished. + * + * Directly advancing the non-removable transaction ID is possible if + * there are no activities on the publisher since the last advancement + * cycle. However, it requires maintaining two fields, last_remote_nextxid + * and last_remote_lsn, within the structure for comparison with the + * current cycle's values. Considering the minimal cost of continuing in + * RCI_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to + * advance the transaction ID here. + */ + if (FullTransactionIdPrecedesOrEquals(rci_data->remote_wait_for, + rci_data->remote_oldestxid)) + rci_data->phase = RCI_WAIT_FOR_LOCAL_FLUSH; + else + rci_data->phase = RCI_REQUEST_PUBLISHER_STATUS; + + /* process the next phase */ + process_rci_phase_transition(rci_data, false); +} + +/* + * Workhorse for the RCI_WAIT_FOR_LOCAL_FLUSH phase. + */ +static void +wait_for_local_flush(RetainConflictInfoData *rci_data) +{ + Assert(!XLogRecPtrIsInvalid(rci_data->remote_lsn) && + TransactionIdIsValid(rci_data->candidate_xid)); + + /* + * We expect the publisher and subscriber clocks to be in sync using time + * sync service like NTP. Otherwise, we will advance this worker's + * oldest_nonremovable_xid prematurely, leading to the removal of rows + * required to detect update_delete conflict. This check primarily + * addresses scenarios where the publisher's clock falls behind; if the + * publisher's clock is ahead, subsequent transactions will naturally bear + * later commit timestamps, conforming to the design outlined atop of + * maybe_advance_nonremovable_xid(). + * + * XXX Consider waiting for the publisher's clock to catch up with the + * subscriber's before proceeding to the next phase. + */ + if (TimestampDifferenceExceeds(rci_data->reply_time, + rci_data->candidate_xid_time, 0)) + ereport(ERROR, + errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"), + errdetail_internal("The clock on the publisher is behind that of the subscriber.")); + + /* + * Do not attempt to advance the non-removable transaction ID when table + * sync is in progress. During this time, changes from a single + * transaction may be applied by multiple table sync workers corresponding + * to the target tables. So, it's necessary for all table sync workers to + * apply and flush the corresponding changes before advancing the + * transaction ID, otherwise, dead tuples that are still needed for + * conflict detection in table sync workers could be removed prematurely. + * However, confirming the apply and flush progress across all table sync + * workers is complex and not worth the effort, so we simply return if not + * all tables are in the READY state. + * + * It is safe to add new tables with initial states to the subscription + * after this check because any changes applied to these tables should + * have a WAL position greater than the rci_data->remote_lsn. + */ + if (!AllTablesyncsReady()) + return; + + /* + * Update and check the remote flush position if we are applying changes + * in a loop. This is done at most once per WalWriterDelay to avoid + * performing costly operations in get_flush_position() too frequently + * during change application. + */ + if (last_flushpos < rci_data->remote_lsn && rci_data->last_recv_time && + TimestampDifferenceExceeds(rci_data->flushpos_update_time, + rci_data->last_recv_time, WalWriterDelay)) + { + XLogRecPtr writepos; + XLogRecPtr flushpos; + bool have_pending_txes; + + /* Fetch the latest remote flush position */ + get_flush_position(&writepos, &flushpos, &have_pending_txes); + + if (flushpos > last_flushpos) + last_flushpos = flushpos; + + rci_data->flushpos_update_time = rci_data->last_recv_time; + } + + /* Return to wait for the changes to be applied */ + if (last_flushpos < rci_data->remote_lsn) + return; + + /* + * Reaching here means the remote WAL position has been received, and all + * transactions up to that position on the publisher have been applied and + * flushed locally. So, we can advance the non-removable transaction ID. + */ + SpinLockAcquire(&MyLogicalRepWorker->relmutex); + MyLogicalRepWorker->oldest_nonremovable_xid = rci_data->candidate_xid; + SpinLockRelease(&MyLogicalRepWorker->relmutex); + + elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u", + LSN_FORMAT_ARGS(rci_data->remote_lsn), + rci_data->candidate_xid); + + /* + * Reset all data fields except those used to determine the timing for the + * next round of transaction ID advancement. We can even use + * flushpos_update_time in the next round to decide whether to get the + * latest flush position. + */ + rci_data->phase = RCI_GET_CANDIDATE_XID; + rci_data->remote_lsn = InvalidXLogRecPtr; + rci_data->remote_oldestxid = InvalidFullTransactionId; + rci_data->remote_nextxid = InvalidFullTransactionId; + rci_data->reply_time = 0; + rci_data->remote_wait_for = InvalidFullTransactionId; + rci_data->candidate_xid = InvalidTransactionId; + + /* process the next phase */ + process_rci_phase_transition(rci_data, false); +} + +/* + * Adjust the interval for advancing non-removable transaction IDs. + * + * We double the interval to try advancing the non-removable transaction IDs + * if there is no activity on the node. The maximum value of the interval is + * capped by wal_receiver_status_interval if it is not zero, otherwise to a + * 3 minutes which should be sufficient to avoid using CPU or network + * resources without much benefit. + * + * The interval is reset to a minimum value of 100ms once there is some + * activity on the node. + * + * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can + * consider the other interval or a separate GUC if the need arises. + */ +static void +adjust_xid_advance_interval(RetainConflictInfoData *rci_data, bool new_xid_found) +{ + if (!new_xid_found && rci_data->xid_advance_interval) + { + int max_interval = wal_receiver_status_interval + ? wal_receiver_status_interval * 1000 + : MAX_XID_ADVANCE_INTERVAL; + + /* + * No new transaction ID has been assigned since the last check, so + * double the interval, but not beyond the maximum allowable value. + */ + rci_data->xid_advance_interval = Min(rci_data->xid_advance_interval * 2, + max_interval); + } + else + { + /* + * A new transaction ID was found or the interval is not yet + * initialized, so set the interval to the minimum value. + */ + rci_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL; + } +} + /* * Exit routine for apply workers due to subscription parameter changes. */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index f2c33250e8b..466bef3e927 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -84,6 +84,7 @@ #include "storage/ipc.h" #include "storage/pmsignal.h" #include "storage/proc.h" +#include "storage/procarray.h" #include "tcop/dest.h" #include "tcop/tcopprot.h" #include "utils/acl.h" @@ -258,6 +259,7 @@ static void StartLogicalReplication(StartReplicationCmd *cmd); static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); +static void ProcessStandbyPSRequestMessage(void); static void ProcessRepliesIfAny(void); static void ProcessPendingWrites(void); static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr); @@ -2355,6 +2357,10 @@ ProcessStandbyMessage(void) ProcessStandbyHSFeedbackMessage(); break; + case 'p': + ProcessStandbyPSRequestMessage(); + break; + default: ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -2701,6 +2707,56 @@ ProcessStandbyHSFeedbackMessage(void) } } +/* + * Process the request for a primary status update message. + */ +static void +ProcessStandbyPSRequestMessage(void) +{ + XLogRecPtr lsn = InvalidXLogRecPtr; + TransactionId oldestXidInCommit; + FullTransactionId nextFullXid; + FullTransactionId fullOldestXidInCommit; + WalSnd *walsnd = MyWalSnd; + TimestampTz replyTime; + + /* + * This shouldn't happen because we don't support getting primary status + * message from standby. + */ + if (RecoveryInProgress()) + elog(ERROR, "the primary status is unavailable during recovery"); + + replyTime = pq_getmsgint64(&reply_message); + + /* + * Update shared state for this WalSender process based on reply data from + * standby. + */ + SpinLockAcquire(&walsnd->mutex); + walsnd->replyTime = replyTime; + SpinLockRelease(&walsnd->mutex); + + oldestXidInCommit = GetOldestActiveTransactionId(true); + nextFullXid = ReadNextFullTransactionId(); + fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid, + oldestXidInCommit); + lsn = GetXLogWriteRecPtr(); + + elog(DEBUG2, "sending primary status"); + + /* construct the message... */ + resetStringInfo(&output_message); + pq_sendbyte(&output_message, 's'); + pq_sendint64(&output_message, lsn); + pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit)); + pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid)); + pq_sendint64(&output_message, GetCurrentTimestamp()); + + /* ... and send it wrapped in CopyData */ + pq_putmessage_noblock('d', output_message.data, output_message.len); +} + /* * Compute how long send/receive loops should sleep. * diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index e5b945a9ee3..ce87f40e986 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -2875,9 +2875,12 @@ GetRunningTransactionData(void) * We don't worry about updating other counters, we want to keep this as * simple as possible and leave GetSnapshotData() as the primary code for * that bookkeeping. + * + * inCommitOnly indicates getting the oldestActiveXid among the transactions + * in the commit critical section. */ TransactionId -GetOldestActiveTransactionId(void) +GetOldestActiveTransactionId(bool inCommitOnly) { ProcArrayStruct *arrayP = procArray; TransactionId *other_xids = ProcGlobal->xids; @@ -2904,6 +2907,8 @@ GetOldestActiveTransactionId(void) for (index = 0; index < arrayP->numProcs; index++) { TransactionId xid; + int pgprocno = arrayP->pgprocnos[index]; + PGPROC *proc = &allProcs[pgprocno]; /* Fetch xid just once - see GetNewTransactionId */ xid = UINT32_ACCESS_ONCE(other_xids[index]); @@ -2911,6 +2916,10 @@ GetOldestActiveTransactionId(void) if (!TransactionIdIsNormal(xid)) continue; + if (inCommitOnly && + (proc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0) + continue; + if (TransactionIdPrecedes(xid, oldestRunningXid)) oldestRunningXid = xid; diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 30b2775952c..c9ef5259b68 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -86,6 +86,17 @@ typedef struct LogicalRepWorker /* Indicates whether apply can be performed in parallel. */ bool parallel_apply; + /* + * The changes made by this and later transactions shouldn't be removed. + * This allows the detection of update_deleted conflicts when applying + * changes in this logical replication worker. + * + * The logical replication launcher manages an internal replication slot + * named "pg_conflict_detection". It asynchronously collects this ID to + * decide when to advance the xmin value of the slot. + */ + TransactionId oldest_nonremovable_xid; + /* Stats. */ XLogRecPtr last_lsn; TimestampTz last_send_time; diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 9f9b3fcfbf1..c6f5ebceefd 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -130,9 +130,17 @@ extern PGDLLIMPORT int FastPathLockGroupsPerBackend; * the checkpoint are actually destroyed on disk. Replay can cope with a file * or block that doesn't exist, but not with a block that has the wrong * contents. + * + * Setting DELAY_CHKPT_IN_COMMIT is similar to setting DELAY_CHKPT_START, but + * it explicitly indicates that the reason for delaying the checkpoint is due + * to a transaction being within a critical commit section. We need this new + * flag to ensure all the transactions that have acquired commit timestamp are + * finished before we allow the logical replication client to advance its xid + * which is used to hold back dead rows for conflict detection. */ #define DELAY_CHKPT_START (1<<0) #define DELAY_CHKPT_COMPLETE (1<<1) +#define DELAY_CHKPT_IN_COMMIT (DELAY_CHKPT_START | 1<<2) typedef enum { diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index ef0b733ebe8..e61d77534a0 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -56,7 +56,7 @@ extern bool TransactionIdIsInProgress(TransactionId xid); extern bool TransactionIdIsActive(TransactionId xid); extern TransactionId GetOldestNonRemovableTransactionId(Relation rel); extern TransactionId GetOldestTransactionIdConsideredRunning(void); -extern TransactionId GetOldestActiveTransactionId(void); +extern TransactionId GetOldestActiveTransactionId(bool inCommitOnly); extern TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly); extern void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index a8346cda633..b77253e35e4 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2554,6 +2554,8 @@ RestrictInfo Result ResultRelInfo ResultState +RetainConflictInfoData +RetainConflictInfoPhase ReturnSetInfo ReturnStmt ReturningClause -- 2.30.0.windows.2