From 44d6a8528e22e5882319d1308c74946b95922bfd Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Fri, 24 Mar 2023 03:23:15 +0000 Subject: [PATCH v9] Add LOG messages when replication slots become active and inactive These logs will be extremely useful on production servers to debug and analyze inactive replication slot issues. --- doc/src/sgml/config.sgml | 7 +++-- src/backend/replication/slot.c | 49 +++++++++++++++++++++++++++++ src/backend/replication/walsender.c | 15 +++++++++ src/backend/utils/misc/guc_tables.c | 2 +- src/include/replication/slot.h | 2 ++ 5 files changed, 71 insertions(+), 4 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 481f93cea1..f9b0d3c9a9 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -7593,9 +7593,10 @@ log_line_prefix = '%m [%p] %q%u@%d/%a ' - Causes each replication command to be logged in the server log. - See for more information about - replication command. The default value is off. + Causes each replication command and related activity to be logged in + the server log. See for more + information about replication command. The default value is + off. Only superusers and users with the appropriate SET privilege can change this setting. diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 2293c0c6fc..3e363eaad8 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -180,7 +180,14 @@ ReplicationSlotShmemExit(int code, Datum arg) { /* Make sure active replication slots are released */ if (MyReplicationSlot != NULL) + { + char *slotname; + + slotname = pstrdup(NameStr(MyReplicationSlot->data.name)); ReplicationSlotRelease(); + LogReplicationSlotRelease(SlotIsPhysical(MyReplicationSlot), slotname); + pfree(slotname); + } /* Also cleanup all the temporary slots. */ ReplicationSlotCleanup(); @@ -445,6 +452,9 @@ ReplicationSlotName(int index, Name name) * * An error is raised if nowait is true and the slot is currently in use. If * nowait is false, we sleep until the slot is released by the owning process. + * + * Note: use LogReplicationSlotAcquire() if needed, to log a message after + * acquiring the replication slot. */ void ReplicationSlotAcquire(const char *name, bool nowait) @@ -542,6 +552,9 @@ retry: * * This or another backend can re-acquire the slot later. * Resources this slot requires will be preserved. + * + * Note: use LogReplicationSlotRelease() if needed, to log a message after + * releasing the replication slot. */ void ReplicationSlotRelease(void) @@ -596,6 +609,42 @@ ReplicationSlotRelease(void) LWLockRelease(ProcArrayLock); } +/* + * Helper function to log a message after replication slot acquisition. + */ +void +LogReplicationSlotAcquire(bool is_physical, const char *slotname) +{ + Assert(MyReplicationSlot != NULL); + + if (is_physical) + ereport(log_replication_commands ? LOG : DEBUG3, + errmsg("physical replication slot \"%s\" is acquired", + slotname)); + else + ereport(log_replication_commands ? LOG : DEBUG3, + errmsg("logical replication slot \"%s\" is acquired", + slotname)); +} + +/* + * Helper function to log a message after replication slot release. + */ +void +LogReplicationSlotRelease(bool is_physical, const char *slotname) +{ + Assert(MyReplicationSlot == NULL); + + if (is_physical) + ereport(log_replication_commands ? LOG : DEBUG3, + errmsg("physical replication slot \"%s\" is released", + slotname)); + else + ereport(log_replication_commands ? LOG : DEBUG3, + errmsg("logical replication slot \"%s\" is released", + slotname)); +} + /* * Cleanup all temporary slots created in current session. */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 75e8363e24..97ecb0b7d5 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -322,7 +322,14 @@ WalSndErrorCleanup(void) wal_segment_close(xlogreader); if (MyReplicationSlot != NULL) + { + char *slotname; + + slotname = pstrdup(NameStr(MyReplicationSlot->data.name)); ReplicationSlotRelease(); + LogReplicationSlotRelease(SlotIsPhysical(MyReplicationSlot), slotname); + pfree(slotname); + } ReplicationSlotCleanup(); @@ -703,6 +710,8 @@ StartReplication(StartReplicationCmd *cmd) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot use a logical replication slot for physical replication"))); + LogReplicationSlotAcquire(true, cmd->slotname); + /* * We don't need to verify the slot's restart_lsn here; instead we * rely on the caller requesting the starting point to use. If the @@ -843,7 +852,10 @@ StartReplication(StartReplicationCmd *cmd) } if (cmd->slotname) + { ReplicationSlotRelease(); + LogReplicationSlotRelease(true, cmd->slotname); + } /* * Copy is finished now. Send a single-row result set indicating the next @@ -1260,6 +1272,8 @@ StartLogicalReplication(StartReplicationCmd *cmd) cmd->slotname), errdetail("This slot has been invalidated because it exceeded the maximum reserved size."))); + LogReplicationSlotAcquire(false, cmd->slotname); + /* * Force a disconnect, so that the decoding code doesn't need to care * about an eventual switch from running in recovery, to running in a @@ -1321,6 +1335,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) FreeDecodingContext(logical_decoding_ctx); ReplicationSlotRelease(); + LogReplicationSlotRelease(false, cmd->slotname); replication_active = false; if (got_STOPPING) diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 1c0583fe26..ea41ccc7e2 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -1203,7 +1203,7 @@ struct config_bool ConfigureNamesBool[] = }, { {"log_replication_commands", PGC_SUSET, LOGGING_WHAT, - gettext_noop("Logs each replication command."), + gettext_noop("Logs each replication command and related activity."), NULL }, &log_replication_commands, diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 8872c80cdf..d5faf89368 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -202,6 +202,8 @@ extern void ReplicationSlotDrop(const char *name, bool nowait); extern void ReplicationSlotAcquire(const char *name, bool nowait); extern void ReplicationSlotRelease(void); +extern void LogReplicationSlotAcquire(bool is_physical, const char *slotname); +extern void LogReplicationSlotRelease(bool is_physical, const char *slotname); extern void ReplicationSlotCleanup(void); extern void ReplicationSlotSave(void); extern void ReplicationSlotMarkDirty(void); -- 2.34.1