From d6ee893d72931392d7fe01f2f2ce4b1340451f5d Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Sun, 27 Feb 2022 09:44:03 +0000 Subject: [PATCH v4] Add LOG messages when replication slots become active and inactive These logs will be extremely useful on production servers to debug and analyze inactive replication slot issues. --- doc/src/sgml/config.sgml | 6 +-- .../replication/logical/logicalfuncs.c | 4 +- src/backend/replication/slot.c | 38 ++++++++++++++----- src/backend/replication/slotfuncs.c | 10 ++--- src/backend/replication/walsender.c | 12 +++--- src/backend/tcop/postgres.c | 2 +- src/backend/utils/misc/guc.c | 2 +- src/include/replication/slot.h | 4 +- 8 files changed, 48 insertions(+), 30 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 7ed8c82a9d..b4e616bf52 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -7322,9 +7322,9 @@ log_line_prefix = '%m [%p] %q%u@%d/%a ' - Causes each replication command to be logged in the server log. - See <xref linkend="protocol-replication"/> for more information about - replication command. The default value is off. + Causes each replication command and related activity to be logged in + the server log. See <xref linkend="protocol-replication"/> for more + information about replication command. The default value is off. Only superusers can change this setting. 
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 3bd770a3ba..87e73b37cc 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -216,7 +216,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin else end_of_wal = GetXLogReplayRecPtr(NULL); - ReplicationSlotAcquire(NameStr(*name), true); + ReplicationSlotAcquire(NameStr(*name), true, true); PG_TRY(); { @@ -330,7 +330,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin /* free context, call shutdown callback */ FreeDecodingContext(ctx); - ReplicationSlotRelease(); + ReplicationSlotRelease(true); InvalidateSystemCaches(); } PG_CATCH(); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index caa6b29756..838e4019c1 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -183,7 +183,7 @@ ReplicationSlotShmemExit(int code, Datum arg) /* Make sure active replication slots are released */ if (MyReplicationSlot != NULL) - ReplicationSlotRelease(); + ReplicationSlotRelease(true); /* Also cleanup all the temporary slots. */ ReplicationSlotCleanup(); @@ -366,6 +366,10 @@ ReplicationSlotCreate(const char *name, bool db_specific, /* Let everybody know we've modified this slot */ ConditionVariableBroadcast(&slot->active_cv); + + ereport(log_replication_commands ? LOG : DEBUG3, + (errmsg("created replication slot \"%s\"", + NameStr(slot->data.name)))); } /* @@ -406,7 +410,7 @@ SearchNamedReplicationSlot(const char *name, bool need_lock) * nowait is false, we sleep until the slot is released by the owning process. */ void -ReplicationSlotAcquire(const char *name, bool nowait) +ReplicationSlotAcquire(const char *name, bool nowait, bool msg_ok) { ReplicationSlot *s; int active_pid; @@ -486,6 +490,11 @@ retry: /* We made this slot active, so it's ours now. 
*/ MyReplicationSlot = s; + + if (msg_ok) + ereport(log_replication_commands ? LOG : DEBUG3, + (errmsg("acquired replication slot \"%s\"", + NameStr(s->data.name)))); } /* @@ -495,7 +504,7 @@ retry: * Resources this slot requires will be preserved. */ void -ReplicationSlotRelease(void) +ReplicationSlotRelease(bool msg_ok) { ReplicationSlot *slot = MyReplicationSlot; @@ -545,6 +554,11 @@ ReplicationSlotRelease(void) MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING; ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags; LWLockRelease(ProcArrayLock); + + if (msg_ok) + ereport(log_replication_commands ? LOG : DEBUG3, + (errmsg("released replication slot \"%s\"", + NameStr(slot->data.name)))); } /* @@ -602,7 +616,7 @@ ReplicationSlotDrop(const char *name, bool nowait) { Assert(MyReplicationSlot == NULL); - ReplicationSlotAcquire(name, nowait); + ReplicationSlotAcquire(name, nowait, false); ReplicationSlotDropAcquired(); } @@ -634,7 +648,9 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) char tmppath[MAXPGPATH]; /* temp debugging aid to analyze 019_replslot_limit failures */ - elog(DEBUG3, "replication slot drop: %s: begin", NameStr(slot->data.name)); + ereport(log_replication_commands ? LOG : DEBUG3, + (errmsg("replication slot drop: %s: begin", + NameStr(slot->data.name)))); /* * If some other backend ran this code concurrently with us, we might try @@ -686,8 +702,9 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) path, tmppath))); } - elog(DEBUG3, "replication slot drop: %s: removed on-disk", - NameStr(slot->data.name)); + ereport(log_replication_commands ? LOG : DEBUG3, + (errmsg("replication slot drop: %s: removed on-disk", + NameStr(slot->data.name)))); /* * The slot is definitely gone. Lock out concurrent scans of the array @@ -745,8 +762,9 @@ ReplicationSlotDropPtr(ReplicationSlot *slot) */ LWLockRelease(ReplicationSlotAllocationLock); - elog(DEBUG3, "replication slot drop: %s: done", - NameStr(slot->data.name)); + ereport(log_replication_commands ? 
LOG : DEBUG3, + (errmsg("replication slot drop: %s: done", + NameStr(slot->data.name)))); } /* @@ -1341,7 +1359,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN, /* Make sure the invalidated state persists across server restart */ ReplicationSlotMarkDirty(); ReplicationSlotSave(); - ReplicationSlotRelease(); + ReplicationSlotRelease(true); ereport(LOG, (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size", diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 886899afd2..86572f9be9 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -100,7 +100,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) tuple = heap_form_tuple(tupdesc, values, nulls); result = HeapTupleGetDatum(tuple); - ReplicationSlotRelease(); + ReplicationSlotRelease(false); PG_RETURN_DATUM(result); } @@ -202,7 +202,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) /* ok, slot is now fully created, mark it as persistent if needed */ if (!temporary) ReplicationSlotPersist(); - ReplicationSlotRelease(); + ReplicationSlotRelease(false); PG_RETURN_DATUM(result); } @@ -630,7 +630,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) moveto = Min(moveto, GetXLogReplayRecPtr(NULL)); /* Acquire the slot so we "own" it */ - ReplicationSlotAcquire(NameStr(*slotname), true); + ReplicationSlotAcquire(NameStr(*slotname), true, true); /* A slot whose restart_lsn has never been reserved cannot be advanced */ if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) @@ -673,7 +673,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) ReplicationSlotsComputeRequiredXmin(false); ReplicationSlotsComputeRequiredLSN(); - ReplicationSlotRelease(); + ReplicationSlotRelease(true); /* Return the reached position. 
*/ values[1] = LSNGetDatum(endlsn); @@ -904,7 +904,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) tuple = heap_form_tuple(tupdesc, values, nulls); result = HeapTupleGetDatum(tuple); - ReplicationSlotRelease(); + ReplicationSlotRelease(false); PG_RETURN_DATUM(result); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 5a718b1fe9..ebd036a8ab 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -305,7 +305,7 @@ WalSndErrorCleanup(void) wal_segment_close(xlogreader); if (MyReplicationSlot != NULL) - ReplicationSlotRelease(); + ReplicationSlotRelease(true); ReplicationSlotCleanup(); @@ -696,7 +696,7 @@ StartReplication(StartReplicationCmd *cmd) if (cmd->slotname) { - ReplicationSlotAcquire(cmd->slotname, true); + ReplicationSlotAcquire(cmd->slotname, true, true); if (SlotIsLogical(MyReplicationSlot)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -842,7 +842,7 @@ StartReplication(StartReplicationCmd *cmd) } if (cmd->slotname) - ReplicationSlotRelease(); + ReplicationSlotRelease(true); /* * Copy is finished now. 
Send a single-row result set indicating the next @@ -1221,7 +1221,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) do_tup_output(tstate, values, nulls); end_tup_output(tstate); - ReplicationSlotRelease(); + ReplicationSlotRelease(false); } /* @@ -1248,7 +1248,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) Assert(!MyReplicationSlot); - ReplicationSlotAcquire(cmd->slotname, true); + ReplicationSlotAcquire(cmd->slotname, true, true); if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) ereport(ERROR, @@ -1317,7 +1317,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) WalSndLoop(XLogSendLogical); FreeDecodingContext(logical_decoding_ctx); - ReplicationSlotRelease(); + ReplicationSlotRelease(true); replication_active = false; if (got_STOPPING) diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 3c7d08209f..d159982df7 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -4273,7 +4273,7 @@ PostgresMain(const char *dbname, const char *username) * callback ensuring correct cleanup on FATAL errors. */ if (MyReplicationSlot != NULL) - ReplicationSlotRelease(); + ReplicationSlotRelease(true); /* We also want to cleanup temporary slots on error. 
*/ ReplicationSlotCleanup(); diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 1e3650184b..3480890f0e 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1378,7 +1378,7 @@ static struct config_bool ConfigureNamesBool[] = }, { {"log_replication_commands", PGC_SUSET, LOGGING_WHAT, - gettext_noop("Logs each replication command."), + gettext_noop("Logs each replication command and related activity."), NULL }, &log_replication_commands, diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 24b30210c3..2ec6032cbf 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -199,8 +199,8 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific, extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); -extern void ReplicationSlotAcquire(const char *name, bool nowait); -extern void ReplicationSlotRelease(void); +extern void ReplicationSlotAcquire(const char *name, bool nowait, bool msg_ok); +extern void ReplicationSlotRelease(bool msg_ok); extern void ReplicationSlotCleanup(void); extern void ReplicationSlotSave(void); extern void ReplicationSlotMarkDirty(void); -- 2.25.1