From 635bdfb7ac1d0fe57a8a0949290d34e5d2a35427 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Fri, 29 Apr 2022 09:58:33 +0000 Subject: [PATCH v5] Add LOG messages when replication slots become active and inactive These logs will be extremely useful on production servers to debug and analyze inactive replication slot issues. --- doc/src/sgml/config.sgml | 7 ++-- .../replication/logical/logicalfuncs.c | 4 +- src/backend/replication/slot.c | 38 ++++++++++++++----- src/backend/replication/slotfuncs.c | 10 ++--- src/backend/replication/walsender.c | 12 +++--- src/backend/tcop/postgres.c | 2 +- src/backend/utils/misc/guc.c | 2 +- src/include/replication/slot.h | 4 +- 8 files changed, 49 insertions(+), 30 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 03986946a8..b6b7b4d838 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -7461,9 +7461,10 @@ log_line_prefix = '%m [%p] %q%u@%d/%a ' - Causes each replication command to be logged in the server log. - See for more information about - replication command. The default value is off. + Causes each replication command and related activity to be logged in + the server log. See for more + information about replication command. The default value is + off. Only superusers and users with the appropriate SET privilege can change this setting. diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 6058d36e0d..11e66a5821 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -201,7 +201,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin else end_of_wal = GetXLogReplayRecPtr(NULL); - ReplicationSlotAcquire(NameStr(*name), true); + ReplicationSlotAcquire(NameStr(*name), true, true); PG_TRY(); { @@ -315,7 +315,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin /* free context, call shutdown callback */ FreeDecodingContext(ctx); - ReplicationSlotRelease(); + ReplicationSlotRelease(true); InvalidateSystemCaches(); } PG_CATCH(); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 5c778f5333..d9246e2ebc 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -183,7 +183,7 @@ ReplicationSlotShmemExit(int code, Datum arg) /* Make sure active replication slots are released */ if (MyReplicationSlot != NULL) - ReplicationSlotRelease(); + ReplicationSlotRelease(true); /* Also cleanup all the temporary slots. */ ReplicationSlotCleanup(); @@ -366,6 +366,10 @@ ReplicationSlotCreate(const char *name, bool db_specific, /* Let everybody know we've modified this slot */ ConditionVariableBroadcast(&slot->active_cv); + + ereport(log_replication_commands ? LOG : DEBUG3, + (errmsg("created replication slot \"%s\"", + NameStr(slot->data.name)))); } /* @@ -422,7 +426,7 @@ ReplicationSlotIndex(ReplicationSlot *slot) * nowait is false, we sleep until the slot is released by the owning process. */ void -ReplicationSlotAcquire(const char *name, bool nowait) +ReplicationSlotAcquire(const char *name, bool nowait, bool msg_ok) { ReplicationSlot *s; int active_pid; @@ -503,6 +507,11 @@ retry: /* We made this slot active, so it's ours now. */ MyReplicationSlot = s; + if (msg_ok) + ereport(log_replication_commands ? LOG : DEBUG3, + (errmsg("acquired replication slot \"%s\"", + NameStr(s->data.name)))); + /* * The call to pgstat_acquire_replslot() protects against stats for * a different slot, from before a restart or such, being present during @@ -519,7 +528,7 @@ retry: * Resources this slot requires will be preserved. */ void -ReplicationSlotRelease(void) +ReplicationSlotRelease(bool msg_ok) { ReplicationSlot *slot = MyReplicationSlot; @@ -569,6 +578,11 @@ ReplicationSlotRelease(void) MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING; ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags; LWLockRelease(ProcArrayLock); + + if (msg_ok) + ereport(log_replication_commands ? LOG : DEBUG3, + (errmsg("released replication slot \"%s\"", + NameStr(slot->data.name)))); } /* @@ -626,7 +640,7 @@ ReplicationSlotDrop(const char *name, bool nowait) { Assert(MyReplicationSlot == NULL); - ReplicationSlotAcquire(name, nowait); + ReplicationSlotAcquire(name, nowait, false); ReplicationSlotDropAcquired(); } @@ -658,7 +672,9 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) char tmppath[MAXPGPATH]; /* temp debugging aid to analyze 019_replslot_limit failures */ - elog(DEBUG3, "replication slot drop: %s: begin", NameStr(slot->data.name)); + ereport(log_replication_commands ? LOG : DEBUG3, + (errmsg("replication slot drop: %s: begin", + NameStr(slot->data.name)))); /* * If some other backend ran this code concurrently with us, we might try @@ -710,8 +726,9 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) path, tmppath))); } - elog(DEBUG3, "replication slot drop: %s: removed on-disk", - NameStr(slot->data.name)); + ereport(log_replication_commands ? LOG : DEBUG3, + (errmsg("replication slot drop: %s: removed on-disk", + NameStr(slot->data.name)))); /* * The slot is definitely gone. Lock out concurrent scans of the array @@ -768,8 +785,9 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) */ LWLockRelease(ReplicationSlotAllocationLock); - elog(DEBUG3, "replication slot drop: %s: done", - NameStr(slot->data.name)); + ereport(log_replication_commands ? LOG : DEBUG3, + (errmsg("replication slot drop: %s: done", + NameStr(slot->data.name)))); } /* @@ -1364,7 +1382,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, /* Make sure the invalidated state persists across server restart */ ReplicationSlotMarkDirty(); ReplicationSlotSave(); - ReplicationSlotRelease(); + ReplicationSlotRelease(true); ereport(LOG, (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size", diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index ca945994ef..368ab444b4 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -100,7 +100,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) tuple = heap_form_tuple(tupdesc, values, nulls); result = HeapTupleGetDatum(tuple); - ReplicationSlotRelease(); + ReplicationSlotRelease(false); PG_RETURN_DATUM(result); } @@ -202,7 +202,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) /* ok, slot is now fully created, mark it as persistent if needed */ if (!temporary) ReplicationSlotPersist(); - ReplicationSlotRelease(); + ReplicationSlotRelease(false); PG_RETURN_DATUM(result); } @@ -605,7 +605,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) moveto = Min(moveto, GetXLogReplayRecPtr(NULL)); /* Acquire the slot so we "own" it */ - ReplicationSlotAcquire(NameStr(*slotname), true); + ReplicationSlotAcquire(NameStr(*slotname), true, true); /* A slot whose restart_lsn has never been reserved cannot be advanced */ if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) @@ -648,7 +648,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) ReplicationSlotsComputeRequiredXmin(false); ReplicationSlotsComputeRequiredLSN(); - ReplicationSlotRelease(); + ReplicationSlotRelease(true); /* Return the reached position. */ values[1] = LSNGetDatum(endlsn); @@ -879,7 +879,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) tuple = heap_form_tuple(tupdesc, values, nulls); result = HeapTupleGetDatum(tuple); - ReplicationSlotRelease(); + ReplicationSlotRelease(false); PG_RETURN_DATUM(result); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 63a818140b..e4879a43f5 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -322,7 +322,7 @@ WalSndErrorCleanup(void) wal_segment_close(xlogreader); if (MyReplicationSlot != NULL) - ReplicationSlotRelease(); + ReplicationSlotRelease(true); ReplicationSlotCleanup(); @@ -713,7 +713,7 @@ StartReplication(StartReplicationCmd *cmd) if (cmd->slotname) { - ReplicationSlotAcquire(cmd->slotname, true); + ReplicationSlotAcquire(cmd->slotname, true, true); if (SlotIsLogical(MyReplicationSlot)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -859,7 +859,7 @@ StartReplication(StartReplicationCmd *cmd) } if (cmd->slotname) - ReplicationSlotRelease(); + ReplicationSlotRelease(true); /* * Copy is finished now. Send a single-row result set indicating the next @@ -1237,7 +1237,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) do_tup_output(tstate, values, nulls); end_tup_output(tstate); - ReplicationSlotRelease(); + ReplicationSlotRelease(false); } /* @@ -1264,7 +1264,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) Assert(!MyReplicationSlot); - ReplicationSlotAcquire(cmd->slotname, true); + ReplicationSlotAcquire(cmd->slotname, true, true); if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) ereport(ERROR, @@ -1333,7 +1333,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) WalSndLoop(XLogSendLogical); FreeDecodingContext(logical_decoding_ctx); - ReplicationSlotRelease(); + ReplicationSlotRelease(true); replication_active = false; if (got_STOPPING) diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 304cce135a..144d081715 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -4295,7 +4295,7 @@ PostgresMain(const char *dbname, const char *username) * callback ensuring correct cleanup on FATAL errors. */ if (MyReplicationSlot != NULL) - ReplicationSlotRelease(); + ReplicationSlotRelease(true); /* We also want to cleanup temporary slots on error. */ ReplicationSlotCleanup(); diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 8e9b71375c..50dfd5cd3c 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1424,7 +1424,7 @@ static struct config_bool ConfigureNamesBool[] = }, { {"log_replication_commands", PGC_SUSET, LOGGING_WHAT, - gettext_noop("Logs each replication command."), + gettext_noop("Logs each replication command and related activity."), NULL }, &log_replication_commands, diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 1ee63c4cf4..70871c6785 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -199,8 +199,8 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific, extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); -extern void ReplicationSlotAcquire(const char *name, bool nowait); -extern void ReplicationSlotRelease(void); +extern void ReplicationSlotAcquire(const char *name, bool nowait, bool msg_ok); +extern void ReplicationSlotRelease(bool msg_ok); extern void ReplicationSlotCleanup(void); extern void ReplicationSlotSave(void); extern void ReplicationSlotMarkDirty(void); -- 2.25.1