diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index c17c6be040..ac95240366 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -157,6 +157,13 @@ static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr; */ static XLogRecPtr sentPtr = 0; +/* + * These variables are used to avoid updating physical slot's restart_lsn + * if the record is crossing a boundary + */ +static XLogSegNo restartSegNo = 0; +static XLogRecPtr firstLsnInRestartSegNo = InvalidXLogRecPtr; + /* Buffers for constructing outgoing messages and processing reply messages. */ static StringInfoData output_message; static StringInfoData reply_message; @@ -1729,6 +1736,31 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn) ReplicationSlot *slot = MyReplicationSlot; Assert(lsn != InvalidXLogRecPtr); + + /* + * Avoid updating restart_lsn if the lsn we received is for a record that + * is in a new segNo. This is to avoid the restart_lsn incorrectly + * advancing if the previous record gets rewritten after an engine crash. + */ + if (!XLByteInSeg(lsn, restartSegNo)) + { + XLByteToSeg(lsn, restartSegNo); + firstLsnInRestartSegNo = lsn; + elog(DEBUG2, "lsn is not in restartSegNo, update to match"); + return; + } + + if (lsn <= firstLsnInRestartSegNo) + { + /* + * This means flushLsn hasn't advanced the past first flushLsn we received in the + * current restartSegNo so there's a chance it crossed a WAL boundary so we avoid updating + * restart_lsn until we get a new complete record that's been flushed across + */ + elog(DEBUG2, "lsn is not greater than firstLsnInFlushSeg, wait for a new record before updating slot's restart_lsn"); + return; + } + SpinLockAcquire(&slot->mutex); if (slot->data.restart_lsn != lsn) {