From 98ffc5a3705f3cf99f958c696541896825fb2a01 Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Fri, 1 Jun 2018 14:37:50 -0400 Subject: [PATCH 2/2] Fix a couple of bugs with replication slot advancing feature A review of the code has shown a couple of issues fixed by this commit: - Physical slots have been using the confirmed LSN position as a start comparison point, which is always 0/0; instead use the restart LSN position (logical slots need to use the confirmed LSN position, which was correct). - The actual slot update was incorrect for both physical and logical slots. Physical slots need to use their restart_lsn as the base comparison point (confirmed_flush was used because of the previous point), and logical slots need to begin reading WAL from restart_lsn (confirmed_flush was used as well), while confirmed_flush is compiled depending on the decoding context and record read. - Never return 0/0 if a slot cannot be advanced. This way, if a slot is advanced while the activity is idle, then the same position is returned to the caller over and over without raising an error. Instead return the LSN the slot has been advanced to. With repetitive calls, the same position is returned. Note that as the slot is owned by the backend advancing it, reads of those fields are fine without holding a lock, while updates need to happen while the slot mutex is held, so fix that on the way. 
Author: Michael Paquier Reviewed-by: Peter Jelinek, Simon Riggs Discussion: https://postgr.es/m/CANP8+jLyS=X-CAk59BJnsxKQfjwrmKicHQykyn52Qj-Q=9GLCw@mail.gmail.com Discussion: https://www.postgresql.org/message-id/2840048a-1184-417a-9da8-3299d207a1d7%40postgrespro.ru --- src/backend/replication/slotfuncs.c | 50 +++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index d9e10263bb..63cada0786 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -318,32 +318,43 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) /* * Helper function for advancing physical replication slot forward. + * The LSN position to move to is compared simply to the slot's + * restart_lsn, knowing that any position older than that would be + * removed by successive checkpoints. */ static XLogRecPtr -pg_physical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) +pg_physical_replication_slot_advance(XLogRecPtr moveto) { - XLogRecPtr retlsn = InvalidXLogRecPtr; + XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn; + XLogRecPtr retlsn = startlsn; - SpinLockAcquire(&MyReplicationSlot->mutex); - if (MyReplicationSlot->data.restart_lsn < moveto) + if (startlsn < moveto) { + SpinLockAcquire(&MyReplicationSlot->mutex); MyReplicationSlot->data.restart_lsn = moveto; + SpinLockRelease(&MyReplicationSlot->mutex); retlsn = moveto; } - SpinLockRelease(&MyReplicationSlot->mutex); return retlsn; } /* * Helper function for advancing logical replication slot forward. + * The slot's restart_lsn is used as start point for reading records, + * while confirmed_lsn is used as base point for the decoding context. + * The LSN position to move to is checked by doing a per-record scan and + * logical decoding which makes sure that confirmed_lsn is updated to a + * LSN which allows the future slot consumer to get consistent logical + * changes. 
*/ static XLogRecPtr -pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) +pg_logical_replication_slot_advance(XLogRecPtr moveto) { LogicalDecodingContext *ctx; ResourceOwner old_resowner = CurrentResourceOwner; - XLogRecPtr retlsn = InvalidXLogRecPtr; + XLogRecPtr startlsn = MyReplicationSlot->data.restart_lsn; + XLogRecPtr retlsn = startlsn; PG_TRY(); { @@ -384,7 +395,7 @@ pg_logical_replication_slot_advance(XLogRecPtr startlsn, XLogRecPtr moveto) if (record != NULL) LogicalDecodingProcessRecord(ctx, ctx->reader); - /* check limits */ + /* Stop once the moving point wanted by caller has been reached */ if (moveto <= ctx->reader->EndRecPtr) break; @@ -441,7 +452,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) Name slotname = PG_GETARG_NAME(0); XLogRecPtr moveto = PG_GETARG_LSN(1); XLogRecPtr endlsn; - XLogRecPtr startlsn; + XLogRecPtr minlsn; TupleDesc tupdesc; Datum values[2]; bool nulls[2]; @@ -472,21 +483,32 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) /* Acquire the slot so we "own" it */ ReplicationSlotAcquire(NameStr(*slotname), true); - startlsn = MyReplicationSlot->data.confirmed_flush; - if (moveto < startlsn) + /* + * Check if the slot is not moving backwards. Physical slots rely simply + * on restart_lsn as a minimum point, while logical slots have confirmed + * consumption up to confirmed_lsn, meaning that in both cases data older + * than that is not available anymore. 
+ */ + if (OidIsValid(MyReplicationSlot->data.database)) + minlsn = MyReplicationSlot->data.confirmed_flush; + else + minlsn = MyReplicationSlot->data.restart_lsn; + + if (moveto < minlsn) { ReplicationSlotRelease(); ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot move slot to %X/%X, minimum is %X/%X", (uint32) (moveto >> 32), (uint32) moveto, - (uint32) (startlsn >> 32), (uint32) startlsn))); + (uint32) (minlsn >> 32), (uint32) minlsn))); } + /* Do the actual slot update, depending on the slot type */ if (OidIsValid(MyReplicationSlot->data.database)) - endlsn = pg_logical_replication_slot_advance(startlsn, moveto); + endlsn = pg_logical_replication_slot_advance(moveto); else - endlsn = pg_physical_replication_slot_advance(startlsn, moveto); + endlsn = pg_physical_replication_slot_advance(moveto); values[0] = NameGetDatum(&MyReplicationSlot->data.name); nulls[0] = false; -- 2.17.1