From 0a30fab6610a1197f58124d276369ac89f7cba99 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Fri, 22 Mar 2024 07:54:21 +0000 Subject: [PATCH v14 6/6] Add inactive_timeout based replication slot invalidation --- doc/src/sgml/func.sgml | 12 +- doc/src/sgml/system-views.sgml | 10 +- .../replication/logical/logicalfuncs.c | 4 +- src/backend/replication/logical/slotsync.c | 8 +- src/backend/replication/slot.c | 240 ++++++++++++++++-- src/backend/replication/slotfuncs.c | 27 +- src/backend/replication/walsender.c | 12 +- src/backend/tcop/postgres.c | 2 +- src/backend/utils/adt/pg_upgrade_support.c | 4 +- src/include/replication/slot.h | 11 +- src/test/recovery/meson.build | 1 + src/test/recovery/t/050_invalidate_slots.pl | 170 +++++++++++++ 12 files changed, 455 insertions(+), 46 deletions(-) create mode 100644 src/test/recovery/t/050_invalidate_slots.pl diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 22c8e0d39c..4826e45c7d 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -28393,8 +28393,8 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset released upon any error. The optional fourth parameter, inactive_timeout, when set to a non-zero value, specifies the amount of time in seconds the slot is - allowed to be inactive. This function corresponds to the replication - protocol command + allowed to be inactive before getting invalidated. + This function corresponds to the replication protocol command CREATE_REPLICATION_SLOT ... PHYSICAL. @@ -28439,12 +28439,12 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset failover, when set to true, specifies that this slot is enabled to be synced to the standbys so that logical replication can be resumed after - failover. The optional sixth parameter, + failover. The optional sixth parameter, inactive_timeout, when set to a non-zero value, specifies the amount of time in seconds the slot is - allowed to be inactive. 
A call to this function has the same effect as - the replication protocol command - CREATE_REPLICATION_SLOT ... LOGICAL. + allowed to be inactive before getting invalidated. + A call to this function has the same effect as the replication protocol + command CREATE_REPLICATION_SLOT ... LOGICAL. diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index f8838b1a23..8e7d9c9105 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2563,6 +2563,13 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx perform logical decoding. It is set only for logical slots. + + + inactive_timeout means that the slot has been + inactive for the duration specified by slot's + inactive_timeout parameter. + + @@ -2767,7 +2774,8 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx inactive_timeout integer - The amount of time in seconds the slot is allowed to be inactive. + The amount of time in seconds the slot is allowed to be inactive before + getting invalidated. 
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index b4dd5cce75..53cf8bbd42 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin else end_of_wal = GetXLogReplayRecPtr(NULL); - ReplicationSlotAcquire(NameStr(*name), true); + ReplicationSlotAcquire(NameStr(*name), true, true); PG_TRY(); { @@ -309,7 +309,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin /* free context, call shutdown callback */ FreeDecodingContext(ctx); - ReplicationSlotRelease(); + ReplicationSlotRelease(true); InvalidateSystemCaches(); } PG_CATCH(); diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index c01876ceeb..5aba117e2b 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -319,7 +319,7 @@ drop_local_obsolete_slots(List *remote_slot_list) if (synced_slot) { - ReplicationSlotAcquire(NameStr(local_slot->data.name), true); + ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false); ReplicationSlotDropAcquired(); } @@ -529,7 +529,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) * InvalidatePossiblyObsoleteSlot() where it invalidates slot directly * if the slot is not acquired by other processes. 
*/ - ReplicationSlotAcquire(remote_slot->name, true); + ReplicationSlotAcquire(remote_slot->name, true, false); Assert(slot == MyReplicationSlot); @@ -554,7 +554,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) /* Skip the sync of an invalidated slot */ if (slot->data.invalidated != RS_INVAL_NONE) { - ReplicationSlotRelease(); + ReplicationSlotRelease(false); return slot_updated; } @@ -640,7 +640,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) slot_updated = true; } - ReplicationSlotRelease(); + ReplicationSlotRelease(false); return slot_updated; } diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 3680a608c3..0acf1d1960 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -107,10 +107,11 @@ const char *const SlotInvalidationCauses[] = { [RS_INVAL_WAL_REMOVED] = "wal_removed", [RS_INVAL_HORIZON] = "rows_removed", [RS_INVAL_WAL_LEVEL] = "wal_level_insufficient", + [RS_INVAL_INACTIVE_TIMEOUT] = "inactive_timeout", }; /* Maximum number of invalidation causes */ -#define RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL +#define RS_INVAL_MAX_CAUSES RS_INVAL_INACTIVE_TIMEOUT StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1), "array length mismatch"); @@ -158,6 +159,9 @@ static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr; static void ReplicationSlotShmemExit(int code, Datum arg); static void ReplicationSlotDropPtr(ReplicationSlot *slot); +static bool InvalidateSlotForInactiveTimeout(ReplicationSlot *slot, + bool need_control_lock, + bool need_mutex); /* internal persistency functions */ static void RestoreSlotFromDisk(const char *name); @@ -233,7 +237,7 @@ ReplicationSlotShmemExit(int code, Datum arg) { /* Make sure active replication slots are released */ if (MyReplicationSlot != NULL) - ReplicationSlotRelease(); + ReplicationSlotRelease(true); /* Also cleanup all the temporary slots. 
*/ ReplicationSlotCleanup(); @@ -424,7 +428,19 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->candidate_restart_valid = InvalidXLogRecPtr; slot->candidate_restart_lsn = InvalidXLogRecPtr; slot->last_saved_confirmed_flush = InvalidXLogRecPtr; - slot->last_inactive_at = 0; + + /* + * We set last_inactive_at after creation of the slot so that the + * inactive_timeout if set is honored. + * + * There's no point in allowing failover slots to get invalidated based on + * slot's inactive_timeout parameter on standby. The failover slots simply + * get synced from the primary on the standby. + */ + if (!(RecoveryInProgress() && slot->data.failover)) + slot->last_inactive_at = GetCurrentTimestamp(); + else + slot->last_inactive_at = 0; /* * Create the slot on disk. We haven't actually marked the slot allocated @@ -550,9 +566,14 @@ ReplicationSlotName(int index, Name name) * * An error is raised if nowait is true and the slot is currently in use. If * nowait is false, we sleep until the slot is released by the owning process. + * + * If check_for_invalidation is true, the slot is checked for invalidation + * based on its inactive_timeout parameter and an error is raised after making + * the slot ours. */ void -ReplicationSlotAcquire(const char *name, bool nowait) +ReplicationSlotAcquire(const char *name, bool nowait, + bool check_for_invalidation) { ReplicationSlot *s; int active_pid; @@ -630,6 +651,42 @@ retry: /* We made this slot active, so it's ours now. */ MyReplicationSlot = s; + /* + * Check if the given slot can be invalidated based on its + * inactive_timeout parameter. If yes, persist the invalidated state to + * disk and then error out. We do this only after making the slot ours to + * avoid anyone else acquiring it while we check for its invalidation. 
+ */ + if (check_for_invalidation) + { + /* The slot is ours by now */ + Assert(s->active_pid == MyProcPid); + + /* + * Well, the slot is not yet ours really unless we check for the + * invalidation below. + */ + s->active_pid = 0; + if (InvalidateReplicationSlotForInactiveTimeout(s, true, true, true)) + { + /* + * If the slot has been invalidated, recalculate the resource + * limits. + */ + ReplicationSlotsComputeRequiredXmin(false); + ReplicationSlotsComputeRequiredLSN(); + + /* Might need it for slot clean up on error, so restore it */ + s->active_pid = MyProcPid; + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot acquire invalidated replication slot \"%s\"", + NameStr(MyReplicationSlot->data.name)), + errdetail("This slot has been invalidated because of its inactive_timeout parameter."))); + } + s->active_pid = MyProcPid; + } + /* * The call to pgstat_acquire_replslot() protects against stats for a * different slot, from before a restart or such, being present during @@ -663,7 +720,7 @@ retry: * Resources this slot requires will be preserved. */ void -ReplicationSlotRelease(void) +ReplicationSlotRelease(bool set_last_inactive_at) { ReplicationSlot *slot = MyReplicationSlot; char *slotname = NULL; /* keep compiler quiet */ @@ -714,11 +771,20 @@ ReplicationSlotRelease(void) ConditionVariableBroadcast(&slot->active_cv); } - if (slot->data.persistency == RS_PERSISTENT) + if (set_last_inactive_at && + slot->data.persistency == RS_PERSISTENT) { - SpinLockAcquire(&slot->mutex); - slot->last_inactive_at = GetCurrentTimestamp(); - SpinLockRelease(&slot->mutex); + /* + * There's no point in allowing failover slots to get invalidated + * based on slot's inactive_timeout parameter on standby. The failover + * slots simply get synced from the primary on the standby. 
+ */ + if (!(RecoveryInProgress() && slot->data.failover)) + { + SpinLockAcquire(&slot->mutex); + slot->last_inactive_at = GetCurrentTimestamp(); + SpinLockRelease(&slot->mutex); + } } MyReplicationSlot = NULL; @@ -788,7 +854,7 @@ ReplicationSlotDrop(const char *name, bool nowait) { Assert(MyReplicationSlot == NULL); - ReplicationSlotAcquire(name, nowait); + ReplicationSlotAcquire(name, nowait, false); /* * Do not allow users to drop the slots which are currently being synced @@ -813,7 +879,7 @@ ReplicationSlotAlter(const char *name, bool failover, int inactive_timeout) Assert(MyReplicationSlot == NULL); - ReplicationSlotAcquire(name, false); + ReplicationSlotAcquire(name, false, true); if (SlotIsPhysical(MyReplicationSlot)) ereport(ERROR, @@ -889,7 +955,7 @@ ReplicationSlotAlter(const char *name, bool failover, int inactive_timeout) ReplicationSlotSave(); } - ReplicationSlotRelease(); + ReplicationSlotRelease(true); } @@ -1542,6 +1608,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, case RS_INVAL_WAL_LEVEL: appendStringInfoString(&err_detail, _("Logical decoding on standby requires wal_level >= logical on the primary server.")); break; + case RS_INVAL_INACTIVE_TIMEOUT: + appendStringInfoString(&err_detail, _("The slot has been inactive for more than the time specified by slot's inactive_timeout parameter.")); + break; case RS_INVAL_NONE: pg_unreachable(); } @@ -1655,6 +1724,10 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, if (SlotIsLogical(s)) invalidation_cause = cause; break; + case RS_INVAL_INACTIVE_TIMEOUT: + if (InvalidateReplicationSlotForInactiveTimeout(s, false, false, false)) + invalidation_cause = cause; + break; case RS_INVAL_NONE: pg_unreachable(); } @@ -1781,7 +1854,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, /* Make sure the invalidated state persists across server restart */ ReplicationSlotMarkDirty(); ReplicationSlotSave(); - ReplicationSlotRelease(); + 
ReplicationSlotRelease(true); ReportSlotInvalidation(invalidation_cause, false, active_pid, slotname, restart_lsn, @@ -1808,6 +1881,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given * db; dboid may be InvalidOid for shared relations * - RS_INVAL_WAL_LEVEL: is logical + * - RS_INVAL_INACTIVE_TIMEOUT: inactive slot timeout occurs * * NB - this runs as part of checkpoint, so avoid raising errors if possible. */ @@ -1859,6 +1933,110 @@ restart: return invalidated; } +/* + * Invalidate given slot based on its inactive_timeout parameter. + * + * Returns true if the slot has got invalidated. + * + * NB - this function also runs as part of checkpoint, so avoid raising errors + * if possible. + */ +bool +InvalidateReplicationSlotForInactiveTimeout(ReplicationSlot *slot, + bool need_control_lock, + bool need_mutex, + bool persist_state) +{ + if (!InvalidateSlotForInactiveTimeout(slot, need_control_lock, need_mutex)) + return false; + + Assert(slot->active_pid == 0); + + SpinLockAcquire(&slot->mutex); + slot->data.invalidated = RS_INVAL_INACTIVE_TIMEOUT; + + /* Make sure the invalidated state persists across server restart */ + slot->just_dirtied = true; + slot->dirty = true; + SpinLockRelease(&slot->mutex); + + if (persist_state) + { + char path[MAXPGPATH]; + + sprintf(path, "pg_replslot/%s", NameStr(slot->data.name)); + ReplicationSlotSaveToPath(slot, path, ERROR); + } + + ReportSlotInvalidation(RS_INVAL_INACTIVE_TIMEOUT, false, 0, + slot->data.name, InvalidXLogRecPtr, + InvalidXLogRecPtr, InvalidTransactionId); + + return true; +} + +/* + * Helper for InvalidateReplicationSlotForInactiveTimeout + */ +static bool +InvalidateSlotForInactiveTimeout(ReplicationSlot *slot, + bool need_control_lock, + bool need_mutex) +{ + ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE; + + if (slot->last_inactive_at == 0 || + slot->data.inactive_timeout == 0) + return false; 
+ + /* inactive_timeout is only tracked for permanent slots */ + if (slot->data.persistency != RS_PERSISTENT) + return false; + + /* + * There's no point in allowing failover slots to get invalidated based on + * slot's inactive_timeout parameter on standby. The failover slots simply + * get synced from the primary on the standby. + */ + if (RecoveryInProgress() && slot->data.failover) + return false; + + if (need_control_lock) + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + + Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED)); + + /* + * Check if the slot needs to be invalidated due to inactive_timeout. We + * do this with the spinlock held to avoid race conditions. The deadline + * is computed in 64-bit microseconds so a large timeout cannot overflow. + */ + if (need_mutex) + SpinLockAcquire(&slot->mutex); + + if (slot->last_inactive_at > 0 && + slot->data.inactive_timeout > 0) + { + TimestampTz now; + + /* last_inactive_at is only tracked for inactive slots */ + Assert(slot->active_pid == 0); + + now = GetCurrentTimestamp(); + if (now >= TimestampTzPlusSeconds(slot->last_inactive_at, + slot->data.inactive_timeout)) + invalidation_cause = RS_INVAL_INACTIVE_TIMEOUT; + } + + if (need_mutex) + SpinLockRelease(&slot->mutex); + + if (need_control_lock) + LWLockRelease(ReplicationSlotControlLock); + + return (invalidation_cause == RS_INVAL_INACTIVE_TIMEOUT); +} + /* * Flush all replication slots to disk. * @@ -1871,6 +2049,7 @@ void CheckPointReplicationSlots(bool is_shutdown) { int i; + bool invalidated = false; elog(DEBUG1, "performing replication slot checkpoint"); @@ -1892,10 +2071,11 @@ CheckPointReplicationSlots(bool is_shutdown) continue; /* - * Save the slot to disk, locking is handled in - * ReplicationSlotSaveToPath. + * Here's an opportunity to invalidate inactive replication slots + * based on timeout, so let's do it. 
*/ - sprintf(path, "pg_replslot/%s", NameStr(s->data.name)); + if (InvalidateReplicationSlotForInactiveTimeout(s, true, true, false)) + invalidated = true; /* * Slot's data is not flushed each time the confirmed_flush LSN is @@ -1920,9 +2100,21 @@ CheckPointReplicationSlots(bool is_shutdown) SpinLockRelease(&s->mutex); } + /* + * Save the slot to disk, locking is handled in + * ReplicationSlotSaveToPath. + */ + sprintf(path, "pg_replslot/%s", NameStr(s->data.name)); ReplicationSlotSaveToPath(s, path, LOG); } LWLockRelease(ReplicationSlotAllocationLock); + + /* If the slot has been invalidated, recalculate the resource limits */ + if (invalidated) + { + ReplicationSlotsComputeRequiredXmin(false); + ReplicationSlotsComputeRequiredLSN(); + } } /* @@ -2404,7 +2596,21 @@ RestoreSlotFromDisk(const char *name) slot->in_use = true; slot->active_pid = 0; - slot->last_inactive_at = 0; + + /* + * We set last_inactive_at only if inactive_timeout of the slot is + * specified so that the timeout is honored after the slot is restored + * from the disk. + * + * There's no point in allowing failover slots to get invalidated + * based on slot's inactive_timeout parameter on standby. The failover + * slots simply get synced from the primary on the standby. 
+ */ + if (slot->data.inactive_timeout > 0 && + !(RecoveryInProgress() && slot->data.failover)) + slot->last_inactive_at = GetCurrentTimestamp(); + else + slot->last_inactive_at = 0; restored = true; break; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index d6ef14fba6..7cc5c8bdf6 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -111,7 +111,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) tuple = heap_form_tuple(tupdesc, values, nulls); result = HeapTupleGetDatum(tuple); - ReplicationSlotRelease(); + ReplicationSlotRelease(false); PG_RETURN_DATUM(result); } @@ -224,7 +224,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) /* ok, slot is now fully created, mark it as persistent if needed */ if (!temporary) ReplicationSlotPersist(); - ReplicationSlotRelease(); + ReplicationSlotRelease(false); PG_RETURN_DATUM(result); } @@ -257,6 +257,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; + bool invalidated = false; /* * We don't require any special permission to see this function's data @@ -287,6 +288,13 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) slot_contents = *slot; SpinLockRelease(&slot->mutex); + /* + * Here's an opportunity to invalidate inactive replication slots + * based on timeout, so let's do it. 
+ */ + if (InvalidateReplicationSlotForInactiveTimeout(slot, false, true, true)) + invalidated = true; + memset(values, 0, sizeof(values)); memset(nulls, 0, sizeof(nulls)); @@ -465,6 +473,15 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) LWLockRelease(ReplicationSlotControlLock); + /* + * If the slot has been invalidated, recalculate the resource limits + */ + if (invalidated) + { + ReplicationSlotsComputeRequiredXmin(false); + ReplicationSlotsComputeRequiredLSN(); + } + return (Datum) 0; } @@ -667,7 +684,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) moveto = Min(moveto, GetXLogReplayRecPtr(NULL)); /* Acquire the slot so we "own" it */ - ReplicationSlotAcquire(NameStr(*slotname), true); + ReplicationSlotAcquire(NameStr(*slotname), true, true); /* A slot whose restart_lsn has never been reserved cannot be advanced */ if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) @@ -710,7 +727,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS) ReplicationSlotsComputeRequiredXmin(false); ReplicationSlotsComputeRequiredLSN(); - ReplicationSlotRelease(); + ReplicationSlotRelease(true); /* Return the reached position. 
*/ values[1] = LSNGetDatum(endlsn); @@ -954,7 +971,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) tuple = heap_form_tuple(tupdesc, values, nulls); result = HeapTupleGetDatum(tuple); - ReplicationSlotRelease(); + ReplicationSlotRelease(false); PG_RETURN_DATUM(result); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 0420274247..b6795048cc 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -334,7 +334,7 @@ WalSndErrorCleanup(void) wal_segment_close(xlogreader); if (MyReplicationSlot != NULL) - ReplicationSlotRelease(); + ReplicationSlotRelease(true); ReplicationSlotCleanup(); @@ -846,7 +846,7 @@ StartReplication(StartReplicationCmd *cmd) if (cmd->slotname) { - ReplicationSlotAcquire(cmd->slotname, true); + ReplicationSlotAcquire(cmd->slotname, true, true); if (SlotIsLogical(MyReplicationSlot)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -992,7 +992,7 @@ StartReplication(StartReplicationCmd *cmd) } if (cmd->slotname) - ReplicationSlotRelease(); + ReplicationSlotRelease(true); /* * Copy is finished now. 
Send a single-row result set indicating the next @@ -1407,7 +1407,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) do_tup_output(tstate, values, nulls); end_tup_output(tstate); - ReplicationSlotRelease(); + ReplicationSlotRelease(false); } /* @@ -1483,7 +1483,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) Assert(!MyReplicationSlot); - ReplicationSlotAcquire(cmd->slotname, true); + ReplicationSlotAcquire(cmd->slotname, true, true); /* * Force a disconnect, so that the decoding code doesn't need to care @@ -1545,7 +1545,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) WalSndLoop(XLogSendLogical); FreeDecodingContext(logical_decoding_ctx); - ReplicationSlotRelease(); + ReplicationSlotRelease(true); replication_active = false; if (got_STOPPING) diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index fd4199a098..749de2741e 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -4407,7 +4407,7 @@ PostgresMain(const char *dbname, const char *username) * callback ensuring correct cleanup on FATAL errors. */ if (MyReplicationSlot != NULL) - ReplicationSlotRelease(); + ReplicationSlotRelease(true); /* We also want to cleanup temporary slots on error. 
*/ ReplicationSlotCleanup(); diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c index c54b08fe18..d56ecf4137 100644 --- a/src/backend/utils/adt/pg_upgrade_support.c +++ b/src/backend/utils/adt/pg_upgrade_support.c @@ -299,7 +299,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS) slot_name = PG_GETARG_NAME(0); /* Acquire the given slot */ - ReplicationSlotAcquire(NameStr(*slot_name), true); + ReplicationSlotAcquire(NameStr(*slot_name), true, false); Assert(SlotIsLogical(MyReplicationSlot)); @@ -310,7 +310,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS) found_pending_wal = LogicalReplicationSlotHasPendingWal(end_of_wal); /* Clean up */ - ReplicationSlotRelease(); + ReplicationSlotRelease(false); PG_RETURN_BOOL(!found_pending_wal); } diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 9cd4bf98e5..bd4ad48ce8 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -53,6 +53,8 @@ typedef enum ReplicationSlotInvalidationCause RS_INVAL_HORIZON, /* wal_level insufficient for slot */ RS_INVAL_WAL_LEVEL, + /* inactive slot timeout has occurred */ + RS_INVAL_INACTIVE_TIMEOUT, } ReplicationSlotInvalidationCause; extern PGDLLIMPORT const char *const SlotInvalidationCauses[]; @@ -249,8 +251,9 @@ extern void ReplicationSlotDropAcquired(void); extern void ReplicationSlotAlter(const char *name, bool failover, int inactive_timeout); -extern void ReplicationSlotAcquire(const char *name, bool nowait); -extern void ReplicationSlotRelease(void); +extern void ReplicationSlotAcquire(const char *name, bool nowait, + bool check_for_invalidation); +extern void ReplicationSlotRelease(bool set_last_inactive_at); extern void ReplicationSlotCleanup(void); extern void ReplicationSlotSave(void); extern void ReplicationSlotSaveToPath(ReplicationSlot *slot, const char *dir, @@ -270,6 +273,10 @@ extern bool 
InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon); +extern bool InvalidateReplicationSlotForInactiveTimeout(ReplicationSlot *slot, + bool need_control_lock, + bool need_mutex, + bool persist_state); extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock); extern int ReplicationSlotIndex(ReplicationSlot *slot); extern bool ReplicationSlotName(int index, Name name); diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index b1eb77b1ec..708a2a3798 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -51,6 +51,7 @@ tests += { 't/040_standby_failover_slots_sync.pl', 't/041_checkpoint_at_promote.pl', 't/042_low_level_backup.pl', + 't/050_invalidate_slots.pl', ], }, } diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl new file mode 100644 index 0000000000..6adaa1d648 --- /dev/null +++ b/src/test/recovery/t/050_invalidate_slots.pl @@ -0,0 +1,170 @@ +# Copyright (c) 2024, PostgreSQL Global Development Group + +# Test for replication slots invalidation +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Utils; +use PostgreSQL::Test::Cluster; +use Test::More; +use Time::HiRes qw(usleep); + +# Check for invalidation of slot in server log. 
+sub check_slots_invalidation_in_server_log +{ + my ($node, $slot_name, $offset) = @_; + my $invalidated = 0; + + for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++) + { + $node->safe_psql('postgres', "CHECKPOINT"); + if ($node->log_contains( + "invalidating obsolete replication slot \"$slot_name\"", $offset)) + { + $invalidated = 1; + last; + } + usleep(100_000); + } + ok($invalidated, "check that slot $slot_name invalidation has been logged"); +} + +# ============================================================================= +# Testcase start: Invalidate streaming standby's slot due to inactive_timeout +# + +# Initialize primary node +my $primary = PostgreSQL::Test::Cluster->new('primary'); +$primary->init(allows_streaming => 'logical'); + +# Avoid checkpoint during the test, otherwise, the test can get unpredictable +$primary->append_conf( + 'postgresql.conf', q{ +checkpoint_timeout = 1h +autovacuum = off +}); +$primary->start; + +# Take backup +my $backup_name = 'my_backup'; +$primary->backup($backup_name); + +# Create a standby linking to the primary using the replication slot +my $standby1 = PostgreSQL::Test::Cluster->new('standby1'); +$standby1->init_from_backup($primary, $backup_name, has_streaming => 1); +$standby1->append_conf( + 'postgresql.conf', q{ +primary_slot_name = 'sb1_slot' +}); + +# Set timeout so that the slot when inactive will get invalidated after the +# timeout. 
+my $inactive_timeout = 1; +$primary->safe_psql( + 'postgres', qq[ + SELECT pg_create_physical_replication_slot(slot_name := 'sb1_slot', inactive_timeout := $inactive_timeout); +]); + +$standby1->start; + +# Wait until standby has replayed enough data +$primary->wait_for_catchup($standby1); + +# The inactive replication slot info should be null when the slot is active +my $result = $primary->safe_psql( + 'postgres', qq[ + SELECT last_inactive_at IS NULL, inactive_timeout = $inactive_timeout + FROM pg_replication_slots WHERE slot_name = 'sb1_slot'; +]); +is($result, "t|t", + 'check the inactive replication slot info for an active slot'); + +my $logstart = -s $primary->logfile; + +# Stop standby to make the replication slot on primary inactive +$standby1->stop; + +# Wait for the inactive replication slot info to be updated +$primary->poll_query_until( + 'postgres', qq[ + SELECT COUNT(slot_name) = 1 FROM pg_replication_slots + WHERE last_inactive_at IS NOT NULL + AND slot_name = 'sb1_slot' + AND inactive_timeout = $inactive_timeout; +]) + or die + "Timed out while waiting for inactive replication slot info to be updated"; + +check_slots_invalidation_in_server_log($primary, 'sb1_slot', $logstart); + +# Wait for the inactive replication slots to be invalidated. 
+$primary->poll_query_until( + 'postgres', qq[ + SELECT COUNT(slot_name) = 1 FROM pg_replication_slots + WHERE slot_name = 'sb1_slot' AND + invalidation_reason = 'inactive_timeout'; +]) + or die + "Timed out while waiting for inactive replication slot sb1_slot to be invalidated"; + +# Testcase end: Invalidate streaming standby's slot due to inactive_timeout +# ============================================================================= + +# ============================================================================= +# Testcase start: Invalidate logical subscriber's slot due to inactive_timeout +my $publisher = $primary; + +# Create subscriber node +my $subscriber = PostgreSQL::Test::Cluster->new('sub'); +$subscriber->init; +$subscriber->start; + +# Create tables +$publisher->safe_psql('postgres', "CREATE TABLE test_tbl (id int)"); +$subscriber->safe_psql('postgres', "CREATE TABLE test_tbl (id int)"); + +# Insert some data on the publisher so the initial copy has rows to sync +$publisher->safe_psql('postgres', + "INSERT INTO test_tbl VALUES (generate_series(1, 5));"); + +# Setup logical replication +my $publisher_connstr = $publisher->connstr . 
' dbname=postgres'; +$publisher->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES"); +$subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub WITH (slot_name = 'lsub1_slot')" +); + +$subscriber->wait_for_subscription_sync($publisher, 'sub'); + +$result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl"); + +is($result, qq(5), "check initial copy was done"); + +# Alter slot to set inactive_timeout +$publisher->safe_psql( + 'postgres', qq[ + SELECT pg_alter_replication_slot(slot_name := 'lsub1_slot', inactive_timeout := $inactive_timeout); +]); + +$logstart = -s $publisher->logfile; + +# Stop subscriber to make the replication slot on publisher inactive +$subscriber->stop; + +# Wait for the inactive replication slot info to be updated +$publisher->poll_query_until( + 'postgres', qq[ + SELECT COUNT(slot_name) = 1 FROM pg_replication_slots + WHERE last_inactive_at IS NOT NULL + AND slot_name = 'lsub1_slot' + AND inactive_timeout = $inactive_timeout; +]) + or die + "Timed out while waiting for inactive replication slot info to be updated"; + +check_slots_invalidation_in_server_log($publisher, 'lsub1_slot', $logstart); + +# Testcase end: Invalidate logical subscriber's slot due to inactive_timeout +# ============================================================================= + +done_testing(); -- 2.34.1