From 5c965e485f0abb3e7c55484b136a986597975ef6 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Tue, 6 Feb 2024 16:27:12 +0000 Subject: [PATCH v4 4/4] Add inactive_timeout based replication slot invalidation Currently postgres has the ability to invalidate inactive replication slots based on the amount of WAL (set via max_slot_wal_keep_size GUC) that will be needed for the slots in case they become active. However, choosing a default value for max_slot_wal_keep_size is tricky, because the amount of WAL a customer generates and their allocated storage vary greatly in production, making it difficult to pin down a one-size-fits-all value. It is often easier for developers to set a timeout of, say, 1, 2 or 3 days, after which the inactive slots get invalidated. To achieve the above, postgres uses the replication slot metric last_inactive_at (the time at which the slot became inactive), and a new GUC inactive_replication_slot_timeout. The checkpointer then looks at all replication slots and invalidates the inactive ones based on the configured timeout. --- doc/src/sgml/config.sgml | 18 +++++ src/backend/access/transam/xlog.c | 10 +++ src/backend/replication/slot.c | 19 ++++++ src/backend/replication/slotfuncs.c | 3 + src/backend/utils/misc/guc_tables.c | 12 ++++ src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/replication/slot.h | 3 + src/test/recovery/meson.build | 1 + src/test/recovery/t/050_invalidate_slots.pl | 68 +++++++++++++++++++ 9 files changed, 135 insertions(+) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index bc8c039b06..0ae3a15400 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4426,6 +4426,24 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + inactive_replication_slot_timeout (integer) + + inactive_replication_slot_timeout configuration parameter + + + + + Invalidate replication slots that are inactive for longer than this + amount of time at the next checkpoint. 
If this value is specified + without units, it is taken as seconds. A value of zero (which is + the default) disables the timeout mechanism. This parameter can only be + set in the postgresql.conf file or on the server + command line. + + + + track_commit_timestamp (boolean) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index dbf2fa5911..4f5ee71638 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7056,6 +7056,11 @@ CreateCheckPoint(int flags) InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0, InvalidOid, InvalidTransactionId); + /* Invalidate inactive replication slots based on timeout */ + if (inactive_replication_slot_timeout > 0) + InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0, + InvalidOid, InvalidTransactionId); + /* * Delete old log files, those no longer needed for last checkpoint to * prevent the disk holding the xlog from growing full. @@ -7505,6 +7510,11 @@ CreateRestartPoint(int flags) InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, 0, InvalidOid, InvalidTransactionId); + /* Invalidate inactive replication slots based on timeout */ + if (inactive_replication_slot_timeout > 0) + InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0, + InvalidOid, InvalidTransactionId); + /* * Retreat _logSegNo using the current end of xlog replayed or received, * whichever is later. 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 112b52a6dc..94b232189b 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -102,6 +102,7 @@ ReplicationSlot *MyReplicationSlot = NULL; int max_replication_slots = 10; /* the maximum number of replication * slots */ int max_slot_xid_age = 0; +int inactive_replication_slot_timeout = 0; static void ReplicationSlotShmemExit(int code, Datum arg); static void ReplicationSlotDropAcquired(void); @@ -1369,6 +1370,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, case RS_INVAL_XID_AGE: appendStringInfoString(&err_detail, _("The replication slot's xmin or catalog_xmin reached the age specified by max_slot_xid_age.")); break; + case RS_INVAL_INACTIVE_TIMEOUT: + appendStringInfoString(&err_detail, _("The slot has been inactive for more than the time specified by inactive_replication_slot_timeout.")); + break; case RS_INVAL_NONE: pg_unreachable(); } @@ -1503,6 +1507,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, } } break; + case RS_INVAL_INACTIVE_TIMEOUT: + if (s->data.last_inactive_at > 0) + { + TimestampTz now; + + Assert(s->data.persistency == RS_PERSISTENT); + Assert(s->active_pid == 0); + + now = GetCurrentTimestamp(); + if (TimestampDifferenceExceeds(s->data.last_inactive_at, now, + inactive_replication_slot_timeout * 1000)) + conflict = cause; + } + break; case RS_INVAL_NONE: pg_unreachable(); } @@ -1649,6 +1667,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, * db; dboid may be InvalidOid for shared relations * - RS_INVAL_WAL_LEVEL: is logical * - RS_INVAL_XID_AGE: slot's xmin or catalog_xmin has reached the age + * - RS_INVAL_INACTIVE_TIMEOUT: inactive slot timeout occurs * * NB - this runs as part of checkpoint, so avoid raising errors if possible. 
*/ diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index c402fa4c82..5cc6752265 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -429,6 +429,9 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) case RS_INVAL_XID_AGE: values[i++] = CStringGetTextDatum("xid_aged"); break; + case RS_INVAL_INACTIVE_TIMEOUT: + values[i++] = CStringGetTextDatum("inactive_timeout"); + break; } values[i++] = BoolGetDatum(slot_contents.data.failover); diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 2a6ad9abbb..2232e62e4b 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -2912,6 +2912,18 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"inactive_replication_slot_timeout", PGC_SIGHUP, REPLICATION_SENDING, + gettext_noop("Sets the amount of time to wait before invalidating an " + "inactive replication slot."), + NULL, + GUC_UNIT_S + }, + &inactive_replication_slot_timeout, + 0, 0, INT_MAX, + NULL, NULL, NULL + }, + { {"commit_delay", PGC_SUSET, WAL_SETTINGS, gettext_noop("Sets the delay in microseconds between transaction commit and " diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 6bd8959849..a0b4f309fc 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -252,6 +252,7 @@ #recovery_prefetch = try # prefetch pages referenced in the WAL? 
#wal_decode_buffer_size = 512kB # lookahead window used for prefetching # (change requires restart) +#inactive_replication_slot_timeout = 0 # in seconds; 0 disables # - Archiving - diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 0de54c3d65..c14ae5f6c0 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -52,6 +52,8 @@ typedef enum ReplicationSlotInvalidationCause RS_INVAL_WAL_LEVEL, /* slot's xmin or catalog_xmin has reached the age */ RS_INVAL_XID_AGE, + /* inactive slot timeout has occurred */ + RS_INVAL_INACTIVE_TIMEOUT, } ReplicationSlotInvalidationCause; /* @@ -225,6 +227,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot; /* GUCs */ extern PGDLLIMPORT int max_replication_slots; extern PGDLLIMPORT int max_slot_xid_age; +extern PGDLLIMPORT int inactive_replication_slot_timeout; /* shmem initialization functions */ extern Size ReplicationSlotsShmemSize(void); diff --git a/src/test/recovery/meson.build b/src/test/recovery/meson.build index bf087ac2a9..e07b941d73 100644 --- a/src/test/recovery/meson.build +++ b/src/test/recovery/meson.build @@ -46,6 +46,7 @@ tests += { 't/038_save_logical_slots_shutdown.pl', 't/039_end_of_wal.pl', 't/040_standby_failover_slots_sync.pl', + 't/050_invalidate_slots.pl', ], }, } diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl index e2d0cb5993..3c9b3e6a7e 100644 --- a/src/test/recovery/t/050_invalidate_slots.pl +++ b/src/test/recovery/t/050_invalidate_slots.pl @@ -98,4 +98,72 @@ $primary->poll_query_until('postgres', qq[ invalidation_reason = 'xid_aged'; ]) or die "Timed out while waiting for replication slot sb1_slot to be invalidated"; +$primary->safe_psql('postgres', qq[ + SELECT pg_create_physical_replication_slot('sb2_slot'); +]); + +$primary->safe_psql('postgres', qq[ + ALTER SYSTEM SET max_slot_xid_age = 0; +]); +$primary->reload; + +# Create a standby linking to the primary using the replication slot 
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup($primary, $backup_name, + has_streaming => 1); +$standby2->append_conf('postgresql.conf', q{ +primary_slot_name = 'sb2_slot' +}); +$standby2->start; + +# Wait until standby has replayed enough data +$primary->wait_for_catchup($standby2); + +# The inactive replication slot info should be null when the slot is active +my $result = $primary->safe_psql('postgres', qq[ + SELECT last_inactive_at IS NULL, inactive_count = 0 AS OK + FROM pg_replication_slots WHERE slot_name = 'sb2_slot'; +]); +is($result, "t|t", 'check the inactive replication slot info for an active slot'); + +# Set timeout so that the next checkpoint will invalidate the inactive +# replication slot. +$primary->safe_psql('postgres', qq[ + ALTER SYSTEM SET inactive_replication_slot_timeout TO '1s'; +]); +$primary->reload; + +$logstart = -s $primary->logfile; + +# Stop standby to make the replication slot on primary inactive +$standby2->stop; + +# Wait for the inactive replication slot info to be updated +$primary->poll_query_until('postgres', qq[ + SELECT COUNT(slot_name) = 1 FROM pg_replication_slots + WHERE last_inactive_at IS NOT NULL AND + inactive_count = 1 AND slot_name = 'sb2_slot'; +]) or die "Timed out while waiting for inactive replication slot info to be updated"; + +$invalidated = 0; +for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++) +{ + $primary->safe_psql('postgres', "CHECKPOINT"); + if ($primary->log_contains( + 'invalidating obsolete replication slot "sb2_slot"', $logstart)) + { + $invalidated = 1; + last; + } + usleep(100_000); +} +ok($invalidated, 'check that slot sb2_slot invalidation has been logged'); + +# Wait for the inactive replication slots to be invalidated. 
+$primary->poll_query_until('postgres', qq[ + SELECT COUNT(slot_name) = 1 FROM pg_replication_slots + WHERE slot_name = 'sb2_slot' AND + invalidation_reason = 'inactive_timeout'; +]) or die "Timed out while waiting for inactive replication slot sb2_slot to be invalidated"; + done_testing(); -- 2.34.1