From c6cee7b246583c05e55b1ed5b14d4d786c2d8ddd Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Fri, 12 Apr 2024 15:00:05 +0000 Subject: [PATCH v39 2/2] Add XID age based replication slot invalidation. Till now, postgres has the ability to invalidate inactive replication slots based on the amount of WAL (set via max_slot_wal_keep_size GUC) that will be needed for the slots in case they become active. However, choosing a default value for max_slot_wal_keep_size is tricky, because the amount of WAL a customer generates and their allocated storage vary greatly in production, making it difficult to pin down a one-size-fits-all value. It is often easy for developers to set an XID age (age of the slot's xmin or catalog_xmin) of, say, 1 or 1.5 billion, after which the slots get invalidated. To achieve the above, postgres introduces a GUC allowing users to set the slot XID age. The replication slots whose xmin or catalog_xmin has reached the age specified by this setting get invalidated. The invalidation check happens at various locations so that it is as timely as possible; these locations include the following: - Whenever the slot is acquired; the slot acquisition errors out if the slot is invalidated. 
- During checkpoint Author: Bharath Rupireddy Reviewed-by: Bertrand Drouvot, Amit Kapila, Shveta Malik Discussion: https://www.postgresql.org/message-id/CALj2ACW4aUe-_uFQOjdWCEN-xXoLGhmvRFnL8SNw_TZ5nJe+aw@mail.gmail.com Discussion: https://www.postgresql.org/message-id/20240327150557.GA3994937%40nathanxps13 Discussion: https://www.postgresql.org/message-id/CA%2BTgmoaRECcnyqxAxUhP5dk2S4HX%3DpGh-p-PkA3uc%2BjG_9hiMw%40mail.gmail.com --- doc/src/sgml/config.sgml | 26 ++ doc/src/sgml/system-views.sgml | 8 + src/backend/replication/slot.c | 160 +++++++++- src/backend/utils/misc/guc_tables.c | 10 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/replication/slot.h | 3 + src/test/recovery/t/050_invalidate_slots.pl | 296 +++++++++++++++++- 7 files changed, 490 insertions(+), 14 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index a73677b98b..f7aee4663f 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -4580,6 +4580,32 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"' # Windows + + replication_slot_xid_age (integer) + + replication_slot_xid_age configuration parameter + + + + + Invalidate replication slots whose xmin (the oldest + transaction that this slot needs the database to retain) or + catalog_xmin (the oldest transaction affecting the + system catalogs that this slot needs the database to retain) has reached + the age specified by this setting. A value of zero (which is the default) + disables this feature. Users can set this value anywhere from zero to + two billion. This parameter can only be set in the + postgresql.conf file or on the server command + line. + + + + This invalidation check happens either when the slot is acquired + for use or during a checkpoint. 
+ + + + track_commit_timestamp (boolean) diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 063638beda..05a11a0fe3 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2587,6 +2587,14 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx parameter. + + + xid_aged means that the slot's + xmin or catalog_xmin + has reached the age specified by + parameter. + + diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 7cfbc2dfff..2029efe5a6 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -108,10 +108,11 @@ const char *const SlotInvalidationCauses[] = { [RS_INVAL_HORIZON] = "rows_removed", [RS_INVAL_WAL_LEVEL] = "wal_level_insufficient", [RS_INVAL_INACTIVE_TIMEOUT] = "inactive_timeout", + [RS_INVAL_XID_AGE] = "xid_aged", }; /* Maximum number of invalidation causes */ -#define RS_INVAL_MAX_CAUSES RS_INVAL_INACTIVE_TIMEOUT +#define RS_INVAL_MAX_CAUSES RS_INVAL_XID_AGE StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1), "array length mismatch"); @@ -142,6 +143,7 @@ ReplicationSlot *MyReplicationSlot = NULL; int max_replication_slots = 10; /* the maximum number of replication * slots */ int replication_slot_inactive_timeout = 0; +int replication_slot_xid_age = 0; /* * This GUC lists streaming replication standby server slot names that @@ -160,6 +162,9 @@ static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr; static void ReplicationSlotShmemExit(int code, Datum arg); static void ReplicationSlotDropPtr(ReplicationSlot *slot); +static bool ReplicationSlotIsXIDAged(ReplicationSlot *slot, + TransactionId *xmin, + TransactionId *catalog_xmin); static bool InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, ReplicationSlot *s, @@ -636,8 +641,8 @@ retry: * it gets invalidated now or has been invalidated previously, because * there's no use in acquiring the invalidated slot. 
* - * XXX: Currently we check for inactive_timeout invalidation here. We - * might need to check for other invalidations too. + * XXX: Currently we check for inactive_timeout and xid_aged invalidations + * here. We might need to check for other invalidations too. */ if (check_for_invalidation) { @@ -648,6 +653,22 @@ retry: InvalidTransactionId, &invalidated); + if (!invalidated && released_lock) + { + /* The slot is still ours */ + Assert(s->active_pid == MyProcPid); + + /* Reacquire the ControlLock */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + released_lock = false; + } + + if (!invalidated) + released_lock = InvalidatePossiblyObsoleteSlot(RS_INVAL_XID_AGE, + s, 0, InvalidOid, + InvalidTransactionId, + &invalidated); + /* * If the slot has been invalidated, recalculate the resource limits. */ @@ -657,7 +678,8 @@ retry: ReplicationSlotsComputeRequiredLSN(); } - if (s->data.invalidated == RS_INVAL_INACTIVE_TIMEOUT) + if (s->data.invalidated == RS_INVAL_INACTIVE_TIMEOUT || + s->data.invalidated == RS_INVAL_XID_AGE) { /* * Release the lock if it's not yet to keep the cleanup path on @@ -665,7 +687,10 @@ retry: */ if (!released_lock) LWLockRelease(ReplicationSlotControlLock); + } + if (s->data.invalidated == RS_INVAL_INACTIVE_TIMEOUT) + { Assert(s->inactive_since > 0); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -675,6 +700,20 @@ retry: timestamptz_to_str(s->inactive_since), replication_slot_inactive_timeout))); } + + if (s->data.invalidated == RS_INVAL_XID_AGE) + { + Assert(TransactionIdIsValid(s->data.xmin) || + TransactionIdIsValid(s->data.catalog_xmin)); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("can no longer get changes from replication slot \"%s\"", + NameStr(s->data.name)), + errdetail("The slot's xmin %u or catalog_xmin %u has reached the age %d specified by replication_slot_xid_age.", + s->data.xmin, + s->data.catalog_xmin, + replication_slot_xid_age))); + } } if (!released_lock) 
@@ -1542,7 +1581,9 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, TransactionId snapshotConflictHorizon, - TimestampTz inactive_since) + TimestampTz inactive_since, + TransactionId xmin, + TransactionId catalog_xmin) { StringInfoData err_detail; bool hint = false; @@ -1579,6 +1620,27 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, timestamptz_to_str(inactive_since), replication_slot_inactive_timeout); break; + case RS_INVAL_XID_AGE: + Assert(TransactionIdIsValid(xmin) || + TransactionIdIsValid(catalog_xmin)); + + if (TransactionIdIsValid(xmin)) + { + appendStringInfo(&err_detail, _("The slot's xmin %u has reached the age %d specified by replication_slot_xid_age."), + xmin, + replication_slot_xid_age); + break; + } + + if (TransactionIdIsValid(catalog_xmin)) + { + appendStringInfo(&err_detail, _("The slot's catalog_xmin %u has reached the age %d specified by replication_slot_xid_age."), + catalog_xmin, + replication_slot_xid_age); + break; + } + + break; case RS_INVAL_NONE: pg_unreachable(); } @@ -1623,6 +1685,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr; ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE; TimestampTz inactive_since = 0; + TransactionId aged_xmin = InvalidTransactionId; + TransactionId aged_catalog_xmin = InvalidTransactionId; for (;;) { @@ -1739,6 +1803,16 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, Assert(s->active_pid == 0); } break; + case RS_INVAL_XID_AGE: + if (ReplicationSlotIsXIDAged(s, &aged_xmin, &aged_catalog_xmin)) + { + Assert(TransactionIdIsValid(aged_xmin) || + TransactionIdIsValid(aged_catalog_xmin)); + + invalidation_cause = cause; + break; + } + break; case RS_INVAL_NONE: pg_unreachable(); } @@ -1827,7 +1901,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, 
ReportSlotInvalidation(invalidation_cause, true, active_pid, slotname, restart_lsn, oldestLSN, snapshotConflictHorizon, - inactive_since); + inactive_since, aged_xmin, + aged_catalog_xmin); if (MyBackendType == B_STARTUP) (void) SendProcSignal(active_pid, @@ -1874,7 +1949,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, ReportSlotInvalidation(invalidation_cause, false, active_pid, slotname, restart_lsn, oldestLSN, snapshotConflictHorizon, - inactive_since); + inactive_since, aged_xmin, + aged_catalog_xmin); /* done with this slot for now */ break; @@ -1898,6 +1974,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, * db; dboid may be InvalidOid for shared relations * - RS_INVAL_WAL_LEVEL: is logical * - RS_INVAL_INACTIVE_TIMEOUT: inactive timeout occurs + * - RS_INVAL_XID_AGE: slot's xmin or catalog_xmin has reached the age * * NB - this runs as part of checkpoint, so avoid raising errors if possible. */ @@ -2031,14 +2108,20 @@ CheckPointReplicationSlots(bool is_shutdown) * * - Avoid saving slot info to disk two times for each invalidated slot. * - * XXX: Should we move inactive_timeout inavalidation check closer to - * wal_removed in CreateCheckPoint and CreateRestartPoint? + * XXX: Should we move inactive_timeout and xid_aged inavalidation checks + * closer to wal_removed in CreateCheckPoint and CreateRestartPoint? */ invalidated = InvalidateObsoleteReplicationSlots(RS_INVAL_INACTIVE_TIMEOUT, 0, InvalidOid, InvalidTransactionId); + if (!invalidated) + invalidated = InvalidateObsoleteReplicationSlots(RS_INVAL_XID_AGE, + 0, + InvalidOid, + InvalidTransactionId); + if (invalidated) { /* @@ -2050,6 +2133,65 @@ CheckPointReplicationSlots(bool is_shutdown) } } +/* + * Returns true if the given replication slot's xmin or catalog_xmin age is + * more than replication_slot_xid_age. 
+ * + * Note that the caller must hold the replication slot's spinlock to avoid + * race conditions while this function reads xmin and catalog_xmin. + */ +static bool +ReplicationSlotIsXIDAged(ReplicationSlot *slot, TransactionId *xmin, + TransactionId *catalog_xmin) +{ + TransactionId cutoff; + TransactionId curr; + + /* Set the output parameters on every path, including the disabled one. */ + *xmin = InvalidTransactionId; + *catalog_xmin = InvalidTransactionId; + + if (replication_slot_xid_age == 0) + return false; + + /* + * NOTE(review): the caller holds a spinlock; if ReadNextTransactionId() + * takes an LWLock (XidGenLock), this read belongs in the caller; confirm. + */ + curr = ReadNextTransactionId(); + + /* + * XIDs are circular, so compare with TransactionIdPrecedesOrEquals(); a + * plain "<=" would fail spuriously after the XID counter wraps around. + */ + Assert(TransactionIdPrecedesOrEquals(slot->data.xmin, curr)); + Assert(TransactionIdPrecedesOrEquals(slot->data.catalog_xmin, curr)); + + /* + * The cutoff lies replication_slot_xid_age XIDs behind the next XID; a slot + * whose xmin or catalog_xmin is at or behind the cutoff has reached the age. + */ + cutoff = curr - replication_slot_xid_age; + if (!TransactionIdIsNormal(cutoff)) + cutoff = FirstNormalTransactionId; + + if (TransactionIdIsNormal(slot->data.xmin) && + TransactionIdPrecedesOrEquals(slot->data.xmin, cutoff)) + { + *xmin = slot->data.xmin; + return true; + } + + if (TransactionIdIsNormal(slot->data.catalog_xmin) && + TransactionIdPrecedesOrEquals(slot->data.catalog_xmin, cutoff)) + { + *catalog_xmin = slot->data.catalog_xmin; + return true; + } + + return false; +} + /* * Load all replication slots from disk into memory at server startup. This * needs to be run before we start crash recovery. 
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 79e7637ec9..ea70e83350 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -2994,6 +2994,16 @@ struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"replication_slot_xid_age", PGC_SIGHUP, REPLICATION_SENDING, + gettext_noop("Age of the transaction ID at which a replication slot gets invalidated."), + gettext_noop("The transaction is the oldest transaction (including the one affecting the system catalogs) that a replication slot needs the database to retain.") + }, + &replication_slot_xid_age, + 0, 0, 2000000000, + NULL, NULL, NULL + }, + { {"commit_delay", PGC_SUSET, WAL_SETTINGS, gettext_noop("Sets the delay in microseconds between transaction commit and " diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 819310b0a7..a2387ebd33 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -336,6 +336,7 @@ #track_commit_timestamp = off # collect timestamp of transaction commit # (change requires restart) #replication_slot_inactive_timeout = 0 # in seconds; 0 disables +#replication_slot_xid_age = 0 # - Primary Server - diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 8727b7b58b..19e5dbfb36 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -55,6 +55,8 @@ typedef enum ReplicationSlotInvalidationCause RS_INVAL_WAL_LEVEL, /* inactive slot timeout has occurred */ RS_INVAL_INACTIVE_TIMEOUT, + /* slot's xmin or catalog_xmin has reached the age */ + RS_INVAL_XID_AGE, } ReplicationSlotInvalidationCause; extern PGDLLIMPORT const char *const SlotInvalidationCauses[]; @@ -233,6 +235,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot; extern PGDLLIMPORT int max_replication_slots; extern PGDLLIMPORT char *standby_slot_names; extern PGDLLIMPORT 
int replication_slot_inactive_timeout; +extern PGDLLIMPORT int replication_slot_xid_age; /* shmem initialization functions */ extern Size ReplicationSlotsShmemSize(void); diff --git a/src/test/recovery/t/050_invalidate_slots.pl b/src/test/recovery/t/050_invalidate_slots.pl index 4663019c16..da05350df4 100644 --- a/src/test/recovery/t/050_invalidate_slots.pl +++ b/src/test/recovery/t/050_invalidate_slots.pl @@ -89,7 +89,7 @@ $primary->reload; # that nobody has acquired that slot yet, so due to # replication_slot_inactive_timeout setting above it must get invalidated. wait_for_slot_invalidation($primary, 'lsub1_sync_slot', $logstart, - $inactive_timeout); + $inactive_timeout, 'inactive_timeout'); # Set timeout on the standby also to check the synced slots don't get # invalidated due to timeout on the standby. @@ -129,7 +129,7 @@ $standby1->stop; # Wait for the standby's replication slot to become inactive wait_for_slot_invalidation($primary, 'sb1_slot', $logstart, - $inactive_timeout); + $inactive_timeout, 'inactive_timeout'); # Testcase end: Invalidate streaming standby's slot as well as logical failover # slot on primary due to replication_slot_inactive_timeout. Also, check the @@ -197,15 +197,280 @@ $subscriber->stop; # Wait for the replication slot to become inactive and then invalidated due to # timeout. wait_for_slot_invalidation($publisher, 'lsub1_slot', $logstart, - $inactive_timeout); + $inactive_timeout, 'inactive_timeout'); # Testcase end: Invalidate logical subscriber's slot due to # replication_slot_inactive_timeout. # ============================================================================= +# ============================================================================= +# Testcase start: Invalidate streaming standby's slot due to replication_slot_xid_age +# GUC. 
+ +# Prepare for the next test +$primary->safe_psql( + 'postgres', qq[ + ALTER SYSTEM SET replication_slot_inactive_timeout TO '0'; +]); +$primary->reload; + +# Create a standby linking to the primary using the replication slot +my $standby2 = PostgreSQL::Test::Cluster->new('standby2'); +$standby2->init_from_backup($primary, $backup_name, has_streaming => 1); + +# Enable hs_feedback. The slot should gain an xmin. We set the status interval +# so we'll see the results promptly. +$standby2->append_conf( + 'postgresql.conf', q{ +primary_slot_name = 'sb2_slot' +hot_standby_feedback = on +wal_receiver_status_interval = 1 +}); + +$primary->safe_psql( + 'postgres', qq[ + SELECT pg_create_physical_replication_slot(slot_name := 'sb2_slot', immediately_reserve := true); +]); + +$standby2->start; + +# Create some content on primary to move xmin +$primary->safe_psql('postgres', + "CREATE TABLE tab_int AS SELECT generate_series(1,10) AS a"); + +# Wait until standby has replayed enough data +$primary->wait_for_catchup($standby2); + +$primary->poll_query_until( + 'postgres', qq[ + SELECT xmin IS NOT NULL AND catalog_xmin IS NULL + FROM pg_catalog.pg_replication_slots + WHERE slot_name = 'sb2_slot'; +]) or die "Timed out waiting for slot sb2_slot xmin to advance"; + +$primary->safe_psql( + 'postgres', qq[ + ALTER SYSTEM SET replication_slot_xid_age = 500; +]); +$primary->reload; + +# Stop standby to make the replication slot's xmin on primary to age +$standby2->stop; + +$logstart = -s $primary->logfile; + +# Do some work to advance xids on primary +advance_xids($primary, 'tab_int'); + +# Wait for the replication slot to become inactive and then invalidated due to +# XID age. +wait_for_slot_invalidation($primary, 'sb2_slot', $logstart, 0, 'xid_aged'); + +# Testcase end: Invalidate streaming standby's slot due to replication_slot_xid_age +# GUC. 
+# ============================================================================= + +# ============================================================================= +# Testcase start: Invalidate logical subscriber's slot due to +# replication_slot_xid_age GUC. + +$publisher = $primary; +$publisher->safe_psql( + 'postgres', qq[ + ALTER SYSTEM SET replication_slot_xid_age = 500; +]); +$publisher->reload; + +$subscriber->append_conf( + 'postgresql.conf', qq( +hot_standby_feedback = on +wal_receiver_status_interval = 1 +)); +$subscriber->start; + +# Create tables +$publisher->safe_psql('postgres', "CREATE TABLE test_tbl2 (id int)"); +$subscriber->safe_psql('postgres', "CREATE TABLE test_tbl2 (id int)"); + +# Insert some data +$publisher->safe_psql('postgres', + "INSERT INTO test_tbl2 VALUES (generate_series(1, 5));"); + +# Setup logical replication +$publisher_connstr = $publisher->connstr . ' dbname=postgres'; +$publisher->safe_psql('postgres', + "CREATE PUBLICATION pub2 FOR TABLE test_tbl2"); + +$subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2 WITH (slot_name = 'lsub2_slot')" +); + +$subscriber->wait_for_subscription_sync($publisher, 'sub2'); + +$result = + $subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl2"); + +is($result, qq(5), "check initial copy was done"); + +$publisher->poll_query_until( + 'postgres', qq[ + SELECT xmin IS NULL AND catalog_xmin IS NOT NULL + FROM pg_catalog.pg_replication_slots + WHERE slot_name = 'lsub2_slot'; +]) or die "Timed out waiting for slot lsub2_slot catalog_xmin to advance"; + +$logstart = -s $publisher->logfile; + +# Stop subscriber to make the replication slot on publisher inactive +$subscriber->stop; + +# Do some work to advance xids on publisher +advance_xids($publisher, 'test_tbl2'); + +# Wait for the replication slot to become inactive and then invalidated due to +# XID age. 
+wait_for_slot_invalidation($publisher, 'lsub2_slot', $logstart, 0, + 'xid_aged'); + +# Testcase end: Invalidate logical subscriber's slot due to +# replication_slot_xid_age GUC. +# ============================================================================= + +# ============================================================================= +# Testcase start: Invalidate logical slot on standby that's being synced from +# the primary due to replication_slot_xid_age GUC. + +$publisher = $primary; + +# Prepare for the next test +$publisher->safe_psql( + 'postgres', qq[ + ALTER SYSTEM SET replication_slot_xid_age = 0; +]); +$publisher->reload; + +# Create a standby linking to the primary using the replication slot +my $standby3 = PostgreSQL::Test::Cluster->new('standby3'); +$standby3->init_from_backup($primary, $backup_name, has_streaming => 1); + +$standby3->append_conf( + 'postgresql.conf', qq( +hot_standby_feedback = on +primary_slot_name = 'sb3_slot' +primary_conninfo = '$connstr_1 dbname=postgres' +)); + +$primary->safe_psql( + 'postgres', qq[ + SELECT pg_create_physical_replication_slot(slot_name := 'sb3_slot', immediately_reserve := true); +]); + +$standby3->start; + +my $standby3_logstart = -s $standby3->logfile; + +# Wait until standby has replayed enough data +$primary->wait_for_catchup($standby3); + +$subscriber->append_conf( + 'postgresql.conf', qq( +hot_standby_feedback = on +wal_receiver_status_interval = 1 +)); +$subscriber->start; + +# Create tables +$publisher->safe_psql('postgres', "CREATE TABLE test_tbl3 (id int)"); +$subscriber->safe_psql('postgres', "CREATE TABLE test_tbl3 (id int)"); + +# Insert some data +$publisher->safe_psql('postgres', + "INSERT INTO test_tbl3 VALUES (generate_series(1, 5));"); + +# Setup logical replication +$publisher->safe_psql('postgres', + "CREATE PUBLICATION pub3 FOR TABLE test_tbl3"); + +$subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub3 CONNECTION '$publisher_connstr' PUBLICATION pub3 WITH (slot_name = 
'lsub3_sync_slot', failover = true)" +); + +$subscriber->wait_for_subscription_sync($publisher, 'sub3'); + +$result = + $subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tbl3"); + +is($result, qq(5), "check initial copy was done"); + +$publisher->poll_query_until( + 'postgres', qq[ + SELECT xmin IS NULL AND catalog_xmin IS NOT NULL + FROM pg_catalog.pg_replication_slots + WHERE slot_name = 'lsub3_sync_slot'; +]) + or die "Timed out waiting for slot lsub3_sync_slot catalog_xmin to advance"; + +# Synchronize the primary server slots to the standby +$standby3->safe_psql('postgres', "SELECT pg_sync_replication_slots();"); + +# Confirm that the logical failover slot is created on the standby and is +# flagged as 'synced' and has got catalog_xmin from the primary. +is( $standby3->safe_psql( + 'postgres', + q{SELECT count(*) = 1 FROM pg_replication_slots + WHERE slot_name = 'lsub3_sync_slot' AND synced AND NOT temporary AND + xmin IS NULL AND catalog_xmin IS NOT NULL;} + ), + "t", + 'logical slot has synced as true on standby'); + +my $primary_catalog_xmin = $primary->safe_psql('postgres', + "SELECT catalog_xmin FROM pg_replication_slots WHERE slot_name = 'lsub3_sync_slot' AND catalog_xmin IS NOT NULL;" +); + +my $stabdby3_catalog_xmin = $standby3->safe_psql('postgres', + "SELECT catalog_xmin FROM pg_replication_slots WHERE slot_name = 'lsub3_sync_slot' AND catalog_xmin IS NOT NULL;" +); + +is($primary_catalog_xmin, $stabdby3_catalog_xmin, + "check catalog_xmin are same for primary slot and synced slot"); + +# Enable XID age based invalidation on the standby. Note that we disabled the +# same on the primary to check if the invalidation occurs for synced slot on +# the standby. 
+$standby3->safe_psql( + 'postgres', qq[ + ALTER SYSTEM SET replication_slot_xid_age = 500; +]); +$standby3->reload; + +$logstart = -s $standby3->logfile; + +# Do some work to advance xids on primary +advance_xids($primary, 'test_tbl3'); + +# Wait for standby to catch up with the above work +$primary->wait_for_catchup($standby3); + +# Wait for the replication slot to become inactive and then invalidated due to +# XID age. +wait_for_slot_invalidation($standby3, 'lsub3_sync_slot', $logstart, 0, + 'xid_aged'); + +# Note that the replication slot on the primary is still active +$result = $primary->safe_psql('postgres', + "SELECT COUNT(slot_name) = 1 FROM pg_replication_slots WHERE slot_name = 'lsub3_sync_slot' AND invalidation_reason IS NULL;" +); + +is($result, 't', "check lsub3_sync_slot is still active on primary"); + +# Testcase end: Invalidate logical slot on standby that's being synced from +# the primary due to replication_slot_xid_age GUC. +# ============================================================================= + sub wait_for_slot_invalidation { - my ($node, $slot_name, $offset, $inactive_timeout) = @_; + my ($node, $slot_name, $offset, $inactive_timeout, $reason) = @_; my $name = $node->name; # Wait for the replication slot to become inactive @@ -238,7 +503,7 @@ sub wait_for_slot_invalidation 'postgres', qq[ SELECT COUNT(slot_name) = 1 FROM pg_replication_slots WHERE slot_name = '$slot_name' AND - invalidation_reason = 'inactive_timeout'; + invalidation_reason = '$reason'; ]) or die "Timed out while waiting for inactive slot $slot_name to be invalidated on node $name"; @@ -283,4 +548,25 @@ sub check_for_slot_invalidation_in_server_log ); } +# Advance xids on the given node: insert the values 10000..11000 into $table_name, each insert inside its own PL/pgSQL exception block +sub advance_xids +{ + my ($node, $table_name) = @_; + + $node->safe_psql( + 'postgres', qq[ + do \$\$ + begin + for i in 10000..11000 loop + -- use an exception block so that each iteration eats an XID + begin + insert into $table_name values (i); + exception + when 
division_by_zero then null; + end; + end loop; + end\$\$; + ]); +} + done_testing(); -- 2.34.1