From d7133f583cf5fd101777b91bd85d4f0ecad8897a Mon Sep 17 00:00:00 2001 From: Shlok Kyal Date: Fri, 21 Nov 2025 12:35:46 +0530 Subject: [PATCH v10 1/2] Add slotsync skip statistics This patch introduces two new columns, slotsync_skip_count and slotsync_last_skip_at, to the pg_stat_replication_slots view. These columns indicate the number of times slot synchronization was skipped and the time at which it was last skipped. --- contrib/test_decoding/expected/stats.out | 12 +- doc/src/sgml/monitoring.sgml | 25 +++ src/backend/catalog/system_views.sql | 2 + src/backend/replication/logical/slotsync.c | 49 +++++ src/backend/utils/activity/pgstat_replslot.c | 25 +++ src/backend/utils/adt/pgstatfuncs.c | 18 +- src/include/catalog/pg_proc.dat | 6 +- src/include/pgstat.h | 3 + src/test/recovery/meson.build | 1 + .../recovery/t/050_slotsync_skip_stats.pl | 176 ++++++++++++++++++ src/test/regress/expected/rules.out | 4 +- 11 files changed, 307 insertions(+), 14 deletions(-) create mode 100644 src/test/recovery/t/050_slotsync_skip_stats.pl diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out index 28da9123cc8..d0749bc0daf 100644 --- a/contrib/test_decoding/expected/stats.out +++ b/contrib/test_decoding/expected/stats.out @@ -78,17 +78,17 @@ SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, -- verify accessing/resetting stats for non-existent slot does something reasonable SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); - slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | stats_reset ---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+------------- - do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | + slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | 
total_txns | total_bytes | slotsync_skip_count | slotsync_last_skip_at | stats_reset +--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+---------------------+-----------------------+------------- + do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | (1 row) SELECT pg_stat_reset_replication_slot('do-not-exist'); ERROR: replication slot "do-not-exist" does not exist SELECT * FROM pg_stat_get_replication_slot('do-not-exist'); - slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | stats_reset ---------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+------------- - do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | + slot_name | spill_txns | spill_count | spill_bytes | stream_txns | stream_count | stream_bytes | mem_exceeded_count | total_txns | total_bytes | slotsync_skip_count | slotsync_last_skip_at | stats_reset +--------------+------------+-------------+-------------+-------------+--------------+--------------+--------------------+------------+-------------+---------------------+-----------------------+------------- + do-not-exist | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 | | (1 row) -- spilling the xact diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 436ef0e8bd0..b054ebb4ade 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1659,6 +1659,31 @@ description | Waiting for a newly initialized WAL file to reach durable storage + + + slotsync_skip_countbigint + + + Number of times the slot synchronization is skipped. The value of this + column has no meaning on the primary server; it defaults to 0 for all + slots, but may (if leftover from a promoted standby) also have a + positive value. 
+ + + + + + + slotsync_last_skip_attimestamp with time zone + + + Time at which last slot synchronization was skipped. The value of this + column has no meaning on the primary server; it defaults to NULL for all + slots, but may (if leftover from a promoted standby) contain a timestamp. + + + + stats_reset timestamp with time zone diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 95ad29a64b9..c77a5d15a2e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1076,6 +1076,8 @@ CREATE VIEW pg_stat_replication_slots AS s.mem_exceeded_count, s.total_txns, s.total_bytes, + s.slotsync_skip_count, + s.slotsync_last_skip_at, s.stats_reset FROM pg_replication_slots as r, LATERAL pg_stat_get_replication_slot(slot_name) as s diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 8b4afd87dc9..052117f0481 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -64,6 +64,7 @@ #include "storage/procarray.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/injection_point.h" #include "utils/pg_lsn.h" #include "utils/ps_status.h" #include "utils/timeout.h" @@ -218,6 +219,9 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, LSN_FORMAT_ARGS(slot->data.restart_lsn), slot->data.catalog_xmin)); + /* Update slot sync skip stats */ + pgstat_report_replslotsync_skip(slot); + if (remote_slot_precedes) *remote_slot_precedes = true; @@ -277,6 +281,17 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, errdetail_internal("Remote slot has LSN %X/%08X but local slot has LSN %X/%08X.", LSN_FORMAT_ARGS(remote_slot->confirmed_lsn), LSN_FORMAT_ARGS(slot->data.confirmed_flush))); + + /* + * If found_consistent_snapshot is not NULL, a true value means + * the slot synchronization was successful, while a false value + * means it was skipped (see + * 
update_and_persist_local_synced_slot()). If + * found_consistent_snapshot is NULL, no such check exists, + * indicating slot synchronization is successful. + */ + if (found_consistent_snapshot && !(*found_consistent_snapshot)) + pgstat_report_replslotsync_skip(slot); } updated_xmin_or_lsn = true; @@ -580,6 +595,9 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) * current location when recreating the slot in the next cycle. It may * take more time to create such a slot. Therefore, we keep this slot * and attempt the synchronization in the next cycle. + * + * We do not need to update the slot sync skip stats here as it will + * already be updated in function update_local_synced_slot. */ return false; } @@ -590,6 +608,10 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) */ if (!found_consistent_snapshot) { + /* + * We do not need to update the slot sync skip stats here as it will + * already be updated in function update_local_synced_slot. + */ ereport(LOG, errmsg("could not synchronize replication slot \"%s\"", remote_slot->name), errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.", @@ -600,6 +622,10 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) ReplicationSlotPersist(); + /* + * For the success case we do not update the slot sync skip stats here as + * it is already updated in update_local_synced_slot. 
+ */ ereport(LOG, errmsg("newly created replication slot \"%s\" is sync-ready now", remote_slot->name)); @@ -634,6 +660,25 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) latestFlushPtr = GetStandbyFlushRecPtr(NULL); if (remote_slot->confirmed_lsn > latestFlushPtr) { + /* If slot is present on the local, update the slot sync skip stats */ + if ((slot = SearchNamedReplicationSlot(remote_slot->name, true))) + { + bool synced; + + SpinLockAcquire(&slot->mutex); + synced = slot->data.synced; + SpinLockRelease(&slot->mutex); + + if (synced) + { + ReplicationSlotAcquire(NameStr(slot->data.name), true, false); + + pgstat_report_replslotsync_skip(slot); + + ReplicationSlotRelease(); + } + } + /* * Can get here only if GUC 'synchronized_standby_slots' on the * primary server was not configured correctly. @@ -707,6 +752,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) /* Skip the sync of an invalidated slot */ if (slot->data.invalidated != RS_INVAL_NONE) { + pgstat_report_replslotsync_skip(slot); + ReplicationSlotRelease(); return slot_updated; } @@ -939,6 +986,8 @@ synchronize_slots(WalReceiverConn *wrconn) if (started_tx) CommitTransactionCommand(); + INJECTION_POINT("slot-sync-skip", NULL); + return some_slot_updated; } diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c index d210c261ac6..d9cc4ec2314 100644 --- a/src/backend/utils/activity/pgstat_replslot.c +++ b/src/backend/utils/activity/pgstat_replslot.c @@ -102,6 +102,31 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re pgstat_unlock_entry(entry_ref); } +/* + * Report replication slot sync skip statistics. + * + * We can rely on the stats for the slot to exist and to belong to this + * slot. We can only get here if pgstat_create_replslot() or + * pgstat_acquire_replslot() have already been called. 
+ */ +void +pgstat_report_replslotsync_skip(ReplicationSlot *slot) +{ + PgStat_EntryRef *entry_ref; + PgStatShared_ReplSlot *shstatent; + PgStat_StatReplSlotEntry *statent; + + entry_ref = pgstat_get_entry_ref_locked(PGSTAT_KIND_REPLSLOT, InvalidOid, + ReplicationSlotIndex(slot), false); + shstatent = (PgStatShared_ReplSlot *) entry_ref->shared_stats; + statent = &shstatent->stats; + + statent->slotsync_skip_count += 1; + statent->slotsync_last_skip_at = GetCurrentTimestamp(); + + pgstat_unlock_entry(entry_ref); +} + /* * Report replication slot creation. * diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 3d98d064a94..46e103ce7c3 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2129,7 +2129,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) Datum pg_stat_get_replication_slot(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_REPLICATION_SLOT_COLS 11 +#define PG_STAT_GET_REPLICATION_SLOT_COLS 13 text *slotname_text = PG_GETARG_TEXT_P(0); NameData slotname; TupleDesc tupdesc; @@ -2160,7 +2160,11 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 10, "total_bytes", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 11, "slotsync_skip_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 12, "slotsync_last_skip_at", + TIMESTAMPTZOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 13, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -2186,11 +2190,17 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) values[7] = Int64GetDatum(slotent->mem_exceeded_count); values[8] = Int64GetDatum(slotent->total_txns); values[9] = Int64GetDatum(slotent->total_bytes); + values[10] = Int64GetDatum(slotent->slotsync_skip_count); + + if (slotent->slotsync_last_skip_at == 0) + nulls[11] = true; + else + values[11] = 
TimestampTzGetDatum(slotent->slotsync_last_skip_at); if (slotent->stat_reset_timestamp == 0) - nulls[10] = true; + nulls[12] = true; else - values[10] = TimestampTzGetDatum(slotent->stat_reset_timestamp); + values[12] = TimestampTzGetDatum(slotent->stat_reset_timestamp); /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index aaadfd8c748..b10809ba9b6 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5691,9 +5691,9 @@ { oid => '6169', descr => 'statistics: information about replication slot', proname => 'pg_stat_get_replication_slot', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'text', - proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,stats_reset}', + proallargtypes => '{text,text,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,mem_exceeded_count,total_txns,total_bytes,slotsync_skip_count,slotsync_last_skip_at,stats_reset}', prosrc => 'pg_stat_get_replication_slot' }, { oid => '6230', descr => 'statistics: check if a stats object exists', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index a68e725259a..144042c1940 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -400,6 +400,8 @@ typedef struct PgStat_StatReplSlotEntry PgStat_Counter mem_exceeded_count; PgStat_Counter total_txns; PgStat_Counter total_bytes; + PgStat_Counter slotsync_skip_count; + TimestampTz slotsync_last_skip_at; TimestampTz 
stat_reset_timestamp; } PgStat_StatReplSlotEntry; @@ -745,6 +747,7 @@ extern PgStat_TableStatus *find_tabstat_entry(Oid rel_id); extern void pgstat_reset_replslot(const char *name); struct ReplicationSlot; extern void pgstat_report_replslot(struct ReplicationSlot *slot, const PgStat_StatReplSlotEntry *repSlotStat); +extern void pgstat_report_replslotsync_skip(struct ReplicationSlot *slot); extern void pgstat_create_replslot(struct ReplicationSlot *slot); extern void pgstat_acquire_replslot(struct ReplicationSlot *slot); extern void pgstat_drop_replslot(struct ReplicationSlot *slot); diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index 523a5cd5b52..17551cf114a 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -58,6 +58,7 @@ tests += { 't/047_checkpoint_physical_slot.pl', 't/048_vacuum_horizon_floor.pl', 't/049_wait_for_lsn.pl', + 't/050_slotsync_skip_stats.pl', ], }, } diff --git a/src/test/recovery/t/050_slotsync_skip_stats.pl b/src/test/recovery/t/050_slotsync_skip_stats.pl new file mode 100644 index 00000000000..39ce9ef702b --- /dev/null +++ b/src/test/recovery/t/050_slotsync_skip_stats.pl @@ -0,0 +1,176 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group + +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +# Skip all tests if injection points are not supported in this build +if ($ENV{enable_injection_points} ne 'yes') +{ + plan skip_all => 'Injection points not supported by this build'; +} + +# Initialize the primary cluster +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(allows_streaming => 'logical'); +$primary->append_conf( + 'postgresql.conf', qq{ +autovacuum = off +}); +$primary->start; + +# Check if the extension injection_points is available, as it may be +# possible that this script is run with installcheck, where the module +# would not be installed by default. 
+if (!$primary->check_extension('injection_points')) +{ + plan skip_all => 'Extension injection_points not installed'; +} + +# Load the injection_points extension +$primary->safe_psql('postgres', q(CREATE EXTENSION injection_points)); + +# Take a backup of the primary for standby initialization +my $backup_name = 'backup'; +$primary->backup($backup_name); + +# Initialize standby from primary backup +my $standby = PostgreSQL::Test::Cluster->new('standby'); +$standby->init_from_backup($primary, $backup_name, has_streaming => 1); + +my $connstr = $primary->connstr; +$standby->append_conf( + 'postgresql.conf', qq( +hot_standby_feedback = on +primary_slot_name = 'sb1_slot' +primary_conninfo = '$connstr dbname=postgres' +)); + +# Create a physical replication slot on primary for standby +$primary->safe_psql('postgres', + q{SELECT pg_create_physical_replication_slot('sb1_slot');}); + +$standby->start; + +# Create a logical replication slot on primary for testing +$primary->safe_psql('postgres', + "SELECT pg_create_logical_replication_slot('slot_sync', 'test_decoding', false, false, true)" +); + +# Wait for standby to catch up +$primary->wait_for_replay_catchup($standby); + +# Initial sync of replication slots +$standby->safe_psql('postgres', "SELECT pg_sync_replication_slots();"); + +# Verify slot is synced successfully +my $result = $standby->safe_psql('postgres', + "SELECT slotsync_skip_count FROM pg_stat_replication_slots WHERE slot_name = 'slot_sync'" +); +is($result, '0', "check slot sync skip count after initial sync"); + +# Update pg_hba.conf and restart the primary to reject streaming replication +# connections. WAL records won't be replicated to the standby until the +# configuration is restored. +unlink($primary->data_dir . 
'/pg_hba.conf'); +$primary->append_conf( + 'pg_hba.conf', qq{ +local all all trust +host all all 127.0.0.1/32 trust +host all all ::1/128 trust +}); +$primary->restart; + +# Advance the failover slot so that confirmed flush LSN of remote slot become +# ahead of standby's flushed LSN +$primary->safe_psql( + 'postgres', qq( + CREATE TABLE t1(a int); + INSERT INTO t1 VALUES(1); + SELECT pg_replication_slot_advance('slot_sync', pg_current_wal_lsn()); +)); + +my ($stdout, $stderr); +# Attempt to sync replication slots while standby is behind +($result, $stdout, $stderr) = + $standby->psql('postgres', "SELECT pg_sync_replication_slots();"); + +# Verify pg_sync_replication_slots is failing +ok( $stderr =~ + qr/skipping slot synchronization because the received slot sync.*is ahead of the standby position/, + 'pg_sync_replication_slots failed as expected'); + +# Check slot sync skip count when standby is behind +$result = $standby->safe_psql('postgres', + "SELECT slotsync_skip_count FROM pg_stat_replication_slots WHERE slot_name = 'slot_sync'" +); +is($result, '1', "check slot sync skip count"); + +# Repeat sync to ensure skip count increments +($result, $stdout, $stderr) = + $standby->psql('postgres', "SELECT pg_sync_replication_slots();"); + +$result = $standby->safe_psql('postgres', + "SELECT slotsync_skip_count FROM pg_stat_replication_slots WHERE slot_name = 'slot_sync'" +); +is($result, '2', "check slot sync skip count"); + +# Restore streaming replication connection +$primary->append_conf( + 'pg_hba.conf', qq{ +local replication all trust +host replication all 127.0.0.1/32 trust +host replication all ::1/128 trust +}); +$primary->restart; + +# Wait for standby to catch up +$primary->wait_for_replay_catchup($standby); + +# Cleanup: drop the logical slot and ensure standby catches up +$primary->safe_psql('postgres', + "SELECT pg_drop_replication_slot('slot_sync')"); +$primary->wait_for_replay_catchup($standby); + +$standby->safe_psql('postgres', "SELECT 
pg_sync_replication_slots();"); + +# Test for case when slot sync is skipped when the remote slot is +# behind the local slot. +$primary->safe_psql('postgres', + "SELECT pg_create_logical_replication_slot('slot_sync', 'test_decoding', false, false, true)" +); + +# Attach injection point to simulate wait +my $standby_psql = $standby->background_psql('postgres'); +$standby_psql->query_safe( + q(select injection_points_attach('slot-sync-skip','wait'))); + +# Initiate sync of failover slots +$standby_psql->query_until( + qr/slot_sync/, + q( +\echo slot_sync +select pg_sync_replication_slots(); +)); + +# Wait for backend to reach injection point +$standby->wait_for_event('client backend', 'slot-sync-skip'); + +# Logical slot is temporary and sync will skip because remote is behind +$result = $standby->safe_psql('postgres', + "SELECT slotsync_skip_count FROM pg_stat_replication_slots WHERE slot_name = 'slot_sync'" +); +is($result, '1', "check slot sync skip count"); + +# Detach injection point +$standby->safe_psql( + 'postgres', q{ + SELECT injection_points_detach('slot-sync-skip'); + SELECT injection_points_wakeup('slot-sync-skip'); +}); + +$standby_psql->quit; + +done_testing(); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 372a2188c22..adda7f425e2 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2151,9 +2151,11 @@ pg_stat_replication_slots| SELECT s.slot_name, s.mem_exceeded_count, s.total_txns, s.total_bytes, + s.slotsync_skip_count, + s.slotsync_last_skip_at, s.stats_reset FROM pg_replication_slots r, - LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, mem_exceeded_count, total_txns, total_bytes, stats_reset) + LATERAL pg_stat_get_replication_slot((r.slot_name)::text) s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, 
mem_exceeded_count, total_txns, total_bytes, slotsync_skip_count, slotsync_last_skip_at, stats_reset) WHERE (r.datoid IS NOT NULL); pg_stat_slru| SELECT name, blks_zeroed, -- 2.34.1