From 15ccf4a8c4dff76fd729dd6922162be34f57c69c Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Thu, 28 Mar 2024 20:19:22 +0800 Subject: [PATCH v4] advance the restart_lsn of synced slots using logical decoding --- src/backend/replication/logical/logical.c | 148 +++++++++++++++++- src/backend/replication/logical/slotsync.c | 75 ++++++--- src/backend/replication/slotfuncs.c | 118 +------------- src/include/replication/logical.h | 2 + .../t/040_standby_failover_slots_sync.pl | 23 ++- 5 files changed, 208 insertions(+), 158 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 51ffb623c0..bbc7cdaf50 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -36,6 +36,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/reorderbuffer.h" +#include "replication/slotsync.h" #include "replication/snapbuild.h" #include "storage/proc.h" #include "storage/procarray.h" @@ -516,17 +517,24 @@ CreateDecodingContext(XLogRecPtr start_lsn, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot use physical replication slot for logical decoding"))); - if (slot->data.database != MyDatabaseId) + /* + * Do not allow decoding if the replication slot belongs to a different + * database unless we are in fast-forward mode. In fast-forward mode, we + * ignore storage-level changes and do not need to access the database + * object. + */ + if (slot->data.database != MyDatabaseId && !fast_forward) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("replication slot \"%s\" was not created in this database", NameStr(slot->data.name)))); /* - * Do not allow consumption of a "synchronized" slot until the standby - * gets promoted. 
+ * Do not allow consumption of a "synchronized" slot until the standby gets + * promoted unless we are syncing replication slots, in which case we need + * to advance the LSN and xmin of the slot during decoding. */ - if (RecoveryInProgress() && slot->data.synced) + if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots()) ereport(ERROR, errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot use replication slot \"%s\" for logical decoding", @@ -2034,3 +2042,135 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal) return has_pending_wal; } + +/* + * Helper function for advancing our logical replication slot forward. + * + * The slot's restart_lsn is used as start point for reading records, while + * confirmed_flush is used as base point for the decoding context. + * + * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush, + * because we need to digest WAL to advance restart_lsn allowing to recycle + * WAL and removal of old catalog tuples. As decoding is done in fast_forward + * mode, no changes are generated anyway. + * + * *ready_for_decoding will be set to true if the logical decoding reaches + * the consistent point; otherwise, it will be set to false. + */ +XLogRecPtr +LogicalSlotAdvanceAndCheckReadynessForDecoding(XLogRecPtr moveto, + bool *ready_for_decoding) +{ + LogicalDecodingContext *ctx; + ResourceOwner old_resowner = CurrentResourceOwner; + XLogRecPtr retlsn; + + Assert(moveto != InvalidXLogRecPtr); + + if (ready_for_decoding) + *ready_for_decoding = false; + + PG_TRY(); + { + /* + * Create our decoding context in fast_forward mode, passing start_lsn + * as InvalidXLogRecPtr, so that we start processing from my slot's + * confirmed_flush. 
+ */ + ctx = CreateDecodingContext(InvalidXLogRecPtr, + NIL, + true, /* fast_forward */ + XL_ROUTINE(.page_read = read_local_xlog_page, + .segment_open = wal_segment_open, + .segment_close = wal_segment_close), + NULL, NULL, NULL); + + /* + * Wait for specified streaming replication standby servers (if any) + * to confirm receipt of WAL up to moveto lsn. + */ + WaitForStandbyConfirmation(moveto); + + /* + * Start reading at the slot's restart_lsn, which we know to point to + * a valid record. + */ + XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn); + + /* invalidate non-timetravel entries */ + InvalidateSystemCaches(); + + /* Decode records until we reach the requested target */ + while (ctx->reader->EndRecPtr < moveto) + { + char *errm = NULL; + XLogRecord *record; + + /* + * Read records. No changes are generated in fast_forward mode, + * but snapbuilder/slot statuses are updated properly. + */ + record = XLogReadRecord(ctx->reader, &errm); + if (errm) + elog(ERROR, "could not find record while advancing replication slot: %s", + errm); + + /* + * Process the record. Storage-level changes are ignored in + * fast_forward mode, but other modules (such as snapbuilder) + * might still have critical updates to do. + */ + if (record) + LogicalDecodingProcessRecord(ctx, ctx->reader); + + CHECK_FOR_INTERRUPTS(); + } + + if (DecodingContextReady(ctx) && ready_for_decoding) + *ready_for_decoding = true; + + /* + * Logical decoding could have clobbered CurrentResourceOwner during + * transaction management, so restore the executor's value. (This is + * a kluge, but it's not worth cleaning up right now.) + */ + CurrentResourceOwner = old_resowner; + + if (ctx->reader->EndRecPtr != InvalidXLogRecPtr) + { + LogicalConfirmReceivedLocation(moveto); + + /* + * If only the confirmed_flush LSN has changed the slot won't get + * marked as dirty by the above. 
Callers on the walsender + * interface are expected to keep track of their own progress and + * don't need it written out. But SQL-interface users cannot + * specify their own start positions and it's harder for them to + * keep track of their progress, so we should make more of an + * effort to save it for them. + * + * Dirty the slot so it is written out at the next checkpoint. The + * LSN position advanced to may still be lost on a crash but this + * makes the data consistent after a clean shutdown. + */ + ReplicationSlotMarkDirty(); + } + + retlsn = MyReplicationSlot->data.confirmed_flush; + + /* free context, call shutdown callback */ + FreeDecodingContext(ctx); + + InvalidateSystemCaches(); + } + PG_CATCH(); + { + /* clear all timetravel entries */ + InvalidateSystemCaches(); + + PG_RE_THROW(); + } + PG_END_TRY(); + + return retlsn; +} diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 30480960c5..df8b76c8ab 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -25,6 +25,12 @@ * which slot sync worker can perform the sync periodically or user can call * pg_sync_replication_slots() periodically to perform the syncs. * + * If synchronized slots fail to build a consistent snapshot from the + * restart_lsn, they would become unreliable after promotion due to potential + * data loss from changes before reaching a consistent point. So, we mark such + * slots as RS_TEMPORARY. Once they successfully reach the consistent point, + * they will be marked as RS_PERSISTENT. + * * The slot sync worker waits for some time before the next synchronization, * with the duration varying based on whether any slots were updated during * the last cycle. 
Refer to the comments above wait_for_slot_activity() for @@ -49,7 +55,7 @@ #include "postmaster/fork_process.h" #include "postmaster/interrupt.h" #include "postmaster/postmaster.h" -#include "replication/slot.h" +#include "replication/logical.h" #include "replication/slotsync.h" #include "storage/ipc.h" #include "storage/lmgr.h" @@ -147,28 +153,46 @@ static void slotsync_failure_callback(int code, Datum arg); * * If no update was needed (the data of the remote slot is the same as the * local slot) return false, otherwise true. + * + * If the LSN of the slot is modified, the ready_for_decoding will be set to + * true if the slot can reach a consistent point; otherwise, it will be set to + * false. */ static bool -update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) +update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, + bool *ready_for_decoding) { ReplicationSlot *slot = MyReplicationSlot; - bool xmin_changed; - bool restart_lsn_changed; NameData plugin_name; + bool updated_lsn = false; Assert(slot->data.invalidated == RS_INVAL_NONE); - xmin_changed = (remote_slot->catalog_xmin != slot->data.catalog_xmin); - restart_lsn_changed = (remote_slot->restart_lsn != slot->data.restart_lsn); + if (remote_slot->confirmed_lsn != slot->data.confirmed_flush) + { + /* + * By advancing the restart_lsn, confirmed_lsn, and xmin using + * fast-forward logical decoding, we can verify whether a consistent + * snapshot can be built. This process also involves saving necessary + * snapshots to disk during decoding, ensuring that logical decoding + * efficiently reaches a consistent point at the restart_lsn without + * the potential loss of data during snapshot creation. + * + * XXX we could optimize this by skipping logical decoding advancement + * if a logical snapshot at restart_lsn is already saved on disk. 
+ */ + LogicalSlotAdvanceAndCheckReadynessForDecoding(remote_slot->confirmed_lsn, + ready_for_decoding); + ReplicationSlotsComputeRequiredXmin(false); + ReplicationSlotsComputeRequiredLSN(); + updated_lsn = true; + } - if (!xmin_changed && - !restart_lsn_changed && - remote_dbid == slot->data.database && + if (remote_dbid == slot->data.database && remote_slot->two_phase == slot->data.two_phase && remote_slot->failover == slot->data.failover && - remote_slot->confirmed_lsn == slot->data.confirmed_flush && strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0) - return false; + return updated_lsn; /* Avoid expensive operations while holding a spinlock. */ namestrcpy(&plugin_name, remote_slot->plugin); @@ -178,18 +202,8 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) slot->data.database = remote_dbid; slot->data.two_phase = remote_slot->two_phase; slot->data.failover = remote_slot->failover; - slot->data.restart_lsn = remote_slot->restart_lsn; - slot->data.confirmed_flush = remote_slot->confirmed_lsn; - slot->data.catalog_xmin = remote_slot->catalog_xmin; - slot->effective_catalog_xmin = remote_slot->catalog_xmin; SpinLockRelease(&slot->mutex); - if (xmin_changed) - ReplicationSlotsComputeRequiredXmin(false); - - if (restart_lsn_changed) - ReplicationSlotsComputeRequiredLSN(); - return true; } @@ -413,6 +427,7 @@ static bool update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) { ReplicationSlot *slot = MyReplicationSlot; + bool ready_for_decoding = false; /* * Check if the primary server has caught up. 
Refer to the comment atop @@ -443,9 +458,19 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) return false; } - /* First time slot update, the function must return true */ - if (!update_local_synced_slot(remote_slot, remote_dbid)) - elog(ERROR, "failed to update slot"); + (void) update_local_synced_slot(remote_slot, remote_dbid, + &ready_for_decoding); + + /* + * Don't persist the slot if it cannot reach the consistent point from the + * restart_lsn. + */ + if (!ready_for_decoding) + { + elog(DEBUG1, "The synced slot could not find consistent point from %X/%X", + LSN_FORMAT_ARGS(slot->data.restart_lsn)); + return false; + } ReplicationSlotPersist(); @@ -578,7 +603,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) LSN_FORMAT_ARGS(remote_slot->restart_lsn)); /* Make sure the slot changes persist across server restart */ - if (update_local_synced_slot(remote_slot, remote_dbid)) + if (update_local_synced_slot(remote_slot, remote_dbid, NULL)) { ReplicationSlotMarkDirty(); ReplicationSlotSave(); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index da57177c25..4ef93ea05b 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -492,125 +492,13 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto) } /* - * Helper function for advancing our logical replication slot forward. - * - * The slot's restart_lsn is used as start point for reading records, while - * confirmed_flush is used as base point for the decoding context. - * - * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush, - * because we need to digest WAL to advance restart_lsn allowing to recycle - * WAL and removal of old catalog tuples. As decoding is done in fast_forward - * mode, no changes are generated anyway. + * Advance our logical replication slot forward. See + * LogicalSlotAdvanceAndCheckReadynessForDecoding for details. 
*/ static XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto) { - LogicalDecodingContext *ctx; - ResourceOwner old_resowner = CurrentResourceOwner; - XLogRecPtr retlsn; - - Assert(moveto != InvalidXLogRecPtr); - - PG_TRY(); - { - /* - * Create our decoding context in fast_forward mode, passing start_lsn - * as InvalidXLogRecPtr, so that we start processing from my slot's - * confirmed_flush. - */ - ctx = CreateDecodingContext(InvalidXLogRecPtr, - NIL, - true, /* fast_forward */ - XL_ROUTINE(.page_read = read_local_xlog_page, - .segment_open = wal_segment_open, - .segment_close = wal_segment_close), - NULL, NULL, NULL); - - /* - * Wait for specified streaming replication standby servers (if any) - * to confirm receipt of WAL up to moveto lsn. - */ - WaitForStandbyConfirmation(moveto); - - /* - * Start reading at the slot's restart_lsn, which we know to point to - * a valid record. - */ - XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn); - - /* invalidate non-timetravel entries */ - InvalidateSystemCaches(); - - /* Decode records until we reach the requested target */ - while (ctx->reader->EndRecPtr < moveto) - { - char *errm = NULL; - XLogRecord *record; - - /* - * Read records. No changes are generated in fast_forward mode, - * but snapbuilder/slot statuses are updated properly. - */ - record = XLogReadRecord(ctx->reader, &errm); - if (errm) - elog(ERROR, "could not find record while advancing replication slot: %s", - errm); - - /* - * Process the record. Storage-level changes are ignored in - * fast_forward mode, but other modules (such as snapbuilder) - * might still have critical updates to do. - */ - if (record) - LogicalDecodingProcessRecord(ctx, ctx->reader); - - CHECK_FOR_INTERRUPTS(); - } - - /* - * Logical decoding could have clobbered CurrentResourceOwner during - * transaction management, so restore the executor's value. (This is - * a kluge, but it's not worth cleaning up right now.) 
- */ - CurrentResourceOwner = old_resowner; - - if (ctx->reader->EndRecPtr != InvalidXLogRecPtr) - { - LogicalConfirmReceivedLocation(moveto); - - /* - * If only the confirmed_flush LSN has changed the slot won't get - * marked as dirty by the above. Callers on the walsender - * interface are expected to keep track of their own progress and - * don't need it written out. But SQL-interface users cannot - * specify their own start positions and it's harder for them to - * keep track of their progress, so we should make more of an - * effort to save it for them. - * - * Dirty the slot so it is written out at the next checkpoint. The - * LSN position advanced to may still be lost on a crash but this - * makes the data consistent after a clean shutdown. - */ - ReplicationSlotMarkDirty(); - } - - retlsn = MyReplicationSlot->data.confirmed_flush; - - /* free context, call shutdown callback */ - FreeDecodingContext(ctx); - - InvalidateSystemCaches(); - } - PG_CATCH(); - { - /* clear all timetravel entries */ - InvalidateSystemCaches(); - - PG_RE_THROW(); - } - PG_END_TRY(); - - return retlsn; + return LogicalSlotAdvanceAndCheckReadynessForDecoding(moveto, NULL); } /* diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index dc2df4ce92..f0abac2c75 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -149,5 +149,7 @@ extern void ResetLogicalStreamingState(void); extern void UpdateDecodingStats(LogicalDecodingContext *ctx); extern bool LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal); +extern XLogRecPtr LogicalSlotAdvanceAndCheckReadynessForDecoding(XLogRecPtr moveto, + bool *ready_for_decoding); #endif diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl index f47bfd78eb..e7021050fc 100644 --- a/src/test/recovery/t/040_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl @@ -280,11 +280,11 
@@ is( $standby1->safe_psql( 'logical slot is re-synced'); # Reset the log_min_messages to the default value. -$primary->append_conf('postgresql.conf', "log_min_messages = 'warning'"); -$primary->reload; +#$primary->append_conf('postgresql.conf', "log_min_messages = 'warning'"); +#$primary->reload; -$standby1->append_conf('postgresql.conf', "log_min_messages = 'warning'"); -$standby1->reload; +#$standby1->append_conf('postgresql.conf', "log_min_messages = 'warning'"); +#$standby1->reload; ################################################## # Test that a synchronized slot can not be decoded, altered or dropped by the @@ -479,8 +479,8 @@ $subscriber1->safe_psql( $subscriber1->wait_for_subscription_sync; -# Do not allow any further advancement of the restart_lsn and -# confirmed_flush_lsn for the lsub1_slot. +# Do not allow any further advancement of the confirmed_flush_lsn for the +# lsub1_slot. $subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 DISABLE"); # Wait for the replication slot to become inactive on the publisher @@ -489,20 +489,15 @@ $primary->poll_query_until( "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'", 1); -# Get the restart_lsn for the logical slot lsub1_slot on the primary -my $primary_restart_lsn = $primary->safe_psql('postgres', - "SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"); - # Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary my $primary_flush_lsn = $primary->safe_psql('postgres', "SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';"); -# Confirm that restart_lsn and confirmed_flush_lsn of lsub1_slot slot are synced -# to the standby +# Confirm that confirmed_flush_lsn of lsub1_slot slot is synced to the standby ok( $standby1->poll_query_until( 'postgres', - "SELECT '$primary_restart_lsn' = restart_lsn AND '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE 
slot_name = 'lsub1_slot' AND synced AND NOT temporary;"), - 'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby'); + "SELECT '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"), + 'confirmed_flush_lsn of slot lsub1_slot synced to standby'); ################################################## # Test that logical failover replication slots wait for the specified -- 2.34.1