From b1e31a64e36ae27017813d7f1f602a0067c7cc39 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Wed, 20 Mar 2024 22:37:40 +0000 Subject: [PATCH v13 2/6] Track last_inactive_at for replication slots in shared memory --- src/backend/catalog/system_views.sql | 3 ++- src/backend/replication/slot.c | 16 ++++++++++++++++ src/backend/replication/slotfuncs.c | 7 ++++++- src/include/catalog/pg_proc.dat | 6 +++--- src/include/replication/slot.h | 3 +++ src/test/regress/expected/rules.out | 5 +++-- 6 files changed, 33 insertions(+), 7 deletions(-) diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index cd22dad959..2fa4272006 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1026,7 +1026,8 @@ CREATE VIEW pg_replication_slots AS L.conflicting, L.failover, L.synced, - L.invalidation_reason + L.invalidation_reason, + L.last_inactive_at FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index cdf0c450c5..146f0fbf84 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -409,6 +409,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->candidate_restart_valid = InvalidXLogRecPtr; slot->candidate_restart_lsn = InvalidXLogRecPtr; slot->last_saved_confirmed_flush = InvalidXLogRecPtr; + slot->last_inactive_at = 0; /* * Create the slot on disk. We haven't actually marked the slot allocated @@ -622,6 +623,13 @@ retry: if (SlotIsLogical(s)) pgstat_acquire_replslot(s); + if (s->data.persistency == RS_PERSISTENT) + { + SpinLockAcquire(&s->mutex); + s->last_inactive_at = 0; + SpinLockRelease(&s->mutex); + } + if (am_walsender) { ereport(log_replication_commands ? LOG : DEBUG1, @@ -691,6 +699,13 @@ ReplicationSlotRelease(void) ConditionVariableBroadcast(&slot->active_cv); } + if (slot->data.persistency == RS_PERSISTENT) + { + SpinLockAcquire(&slot->mutex); + slot->last_inactive_at = GetCurrentTimestamp(); + SpinLockRelease(&slot->mutex); + } + MyReplicationSlot = NULL; /* might not have been set when we've been a plain slot */ @@ -2341,6 +2356,7 @@ RestoreSlotFromDisk(const char *name) slot->in_use = true; slot->active_pid = 0; + slot->last_inactive_at = 0; restored = true; break; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index dfaac999f1..2c33cc0c16 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -239,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 18 +#define PG_GET_REPLICATION_SLOTS_COLS 19 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -436,6 +436,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) else values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]); + if (slot_contents.last_inactive_at > 0) + values[i++] = TimestampTzGetDatum(slot_contents.last_inactive_at); + else + nulls[i++] = true; + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index cf116bc548..d89a223a60 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11133,9 +11133,9 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool,bool,text}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover,synced,invalidation_reason}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool,bool,text,timestamptz}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover,synced,invalidation_reason,last_inactive_at}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 7f25a083ee..b4bb7f5e99 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -201,6 +201,9 @@ typedef struct ReplicationSlot * forcibly flushed or not. */ XLogRecPtr last_saved_confirmed_flush; + + /* When did this slot become inactive last time? */ + TimestampTz last_inactive_at; } ReplicationSlot; #define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid) diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 19c44c0cb7..88fbd6a53c 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1476,8 +1476,9 @@ pg_replication_slots| SELECT l.slot_name, l.conflicting, l.failover, l.synced, - l.invalidation_reason - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover, synced, invalidation_reason) + l.invalidation_reason, + l.last_inactive_at + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover, synced, invalidation_reason, last_inactive_at) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, -- 2.34.1