From 9339d8203cafc03f7590c6770e7966321b8e3ea5 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Wed, 20 Mar 2024 22:37:54 +0000 Subject: [PATCH v13 3/6] Allow setting inactive_timeout for replication slots via SQL API --- contrib/test_decoding/expected/slot.out | 102 ++++++++++++++++++ contrib/test_decoding/sql/slot.sql | 34 ++++++ doc/src/sgml/func.sgml | 18 ++-- doc/src/sgml/system-views.sgml | 9 ++ src/backend/catalog/system_functions.sql | 2 + src/backend/catalog/system_views.sql | 3 +- src/backend/replication/logical/slotsync.c | 17 ++- src/backend/replication/slot.c | 20 +++- src/backend/replication/slotfuncs.c | 31 +++++- src/backend/replication/walsender.c | 4 +- src/bin/pg_upgrade/info.c | 6 +- src/bin/pg_upgrade/pg_upgrade.c | 5 +- src/bin/pg_upgrade/pg_upgrade.h | 2 + src/include/catalog/pg_proc.dat | 22 ++-- src/include/replication/slot.h | 5 +- .../t/040_standby_failover_slots_sync.pl | 11 +- src/test/regress/expected/rules.out | 5 +- 17 files changed, 257 insertions(+), 39 deletions(-) diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out index 349ab2d380..6771520afb 100644 --- a/contrib/test_decoding/expected/slot.out +++ b/contrib/test_decoding/expected/slot.out @@ -466,3 +466,105 @@ SELECT pg_drop_replication_slot('physical_slot'); (1 row) +-- Test negative value for inactive_timeout option for slots. +SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_fail_slot', inactive_timeout := -300); -- error +ERROR: "inactive_timeout" must not be negative +SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_fail_slot', plugin := 'test_decoding', inactive_timeout := -600); -- error +ERROR: "inactive_timeout" must not be negative +-- Test inactive_timeout option for temporary slots. +SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_fail_slot', temporary := true, inactive_timeout := 300); -- error +ERROR: cannot set inactive_timeout for a temporary replication slot +SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_fail_slot', plugin := 'test_decoding', temporary := true, inactive_timeout := 600); -- error +ERROR: cannot set inactive_timeout for a temporary replication slot +-- Test inactive_timeout option of physical slots. +SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_phy_slot1', immediately_reserve := true, inactive_timeout := 300); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_phy_slot2'); + ?column? +---------- + init +(1 row) + +-- Copy physical slot with inactive_timeout option set. +SELECT 'copy' FROM pg_copy_physical_replication_slot(src_slot_name := 'it_phy_slot1', dst_slot_name := 'it_phy_slot3'); + ?column? +---------- + copy +(1 row) + +SELECT slot_name, slot_type, inactive_timeout FROM pg_replication_slots ORDER BY 1; + slot_name | slot_type | inactive_timeout +--------------+-----------+------------------ + it_phy_slot1 | physical | 300 + it_phy_slot2 | physical | 0 + it_phy_slot3 | physical | 300 +(3 rows) + +SELECT pg_drop_replication_slot('it_phy_slot1'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('it_phy_slot2'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('it_phy_slot3'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +-- Test inactive_timeout option of logical slots. +SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_log_slot1', plugin := 'test_decoding', inactive_timeout := 600); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_log_slot2', plugin := 'test_decoding'); + ?column? +---------- + init +(1 row) + +-- Copy logical slot with inactive_timeout option set. +SELECT 'copy' FROM pg_copy_logical_replication_slot(src_slot_name := 'it_log_slot1', dst_slot_name := 'it_log_slot3'); + ?column? +---------- + copy +(1 row) + +SELECT slot_name, slot_type, inactive_timeout FROM pg_replication_slots ORDER BY 1; + slot_name | slot_type | inactive_timeout +--------------+-----------+------------------ + it_log_slot1 | logical | 600 + it_log_slot2 | logical | 0 + it_log_slot3 | logical | 600 +(3 rows) + +SELECT pg_drop_replication_slot('it_log_slot1'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('it_log_slot2'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('it_log_slot3'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql index 580e3ae3be..443e91da07 100644 --- a/contrib/test_decoding/sql/slot.sql +++ b/contrib/test_decoding/sql/slot.sql @@ -190,3 +190,37 @@ SELECT pg_drop_replication_slot('failover_true_slot'); SELECT pg_drop_replication_slot('failover_false_slot'); SELECT pg_drop_replication_slot('failover_default_slot'); SELECT pg_drop_replication_slot('physical_slot'); + +-- Test negative value for inactive_timeout option for slots. +SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_fail_slot', inactive_timeout := -300); -- error +SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_fail_slot', plugin := 'test_decoding', inactive_timeout := -600); -- error + +-- Test inactive_timeout option for temporary slots. +SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_fail_slot', temporary := true, inactive_timeout := 300); -- error +SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_fail_slot', plugin := 'test_decoding', temporary := true, inactive_timeout := 600); -- error + +-- Test inactive_timeout option of physical slots. +SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_phy_slot1', immediately_reserve := true, inactive_timeout := 300); +SELECT 'init' FROM pg_create_physical_replication_slot(slot_name := 'it_phy_slot2'); + +-- Copy physical slot with inactive_timeout option set. +SELECT 'copy' FROM pg_copy_physical_replication_slot(src_slot_name := 'it_phy_slot1', dst_slot_name := 'it_phy_slot3'); + +SELECT slot_name, slot_type, inactive_timeout FROM pg_replication_slots ORDER BY 1; + +SELECT pg_drop_replication_slot('it_phy_slot1'); +SELECT pg_drop_replication_slot('it_phy_slot2'); +SELECT pg_drop_replication_slot('it_phy_slot3'); + +-- Test inactive_timeout option of logical slots. +SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_log_slot1', plugin := 'test_decoding', inactive_timeout := 600); +SELECT 'init' FROM pg_create_logical_replication_slot(slot_name := 'it_log_slot2', plugin := 'test_decoding'); + +-- Copy logical slot with inactive_timeout option set. +SELECT 'copy' FROM pg_copy_logical_replication_slot(src_slot_name := 'it_log_slot1', dst_slot_name := 'it_log_slot3'); + +SELECT slot_name, slot_type, inactive_timeout FROM pg_replication_slots ORDER BY 1; + +SELECT pg_drop_replication_slot('it_log_slot1'); +SELECT pg_drop_replication_slot('it_log_slot2'); +SELECT pg_drop_replication_slot('it_log_slot3'); diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 030ea8affd..9467ff86b3 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -28163,7 +28163,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset pg_create_physical_replication_slot - pg_create_physical_replication_slot ( slot_name name , immediately_reserve boolean, temporary boolean ) + pg_create_physical_replication_slot ( slot_name name , immediately_reserve boolean, temporary boolean, inactive_timeout integer ) record ( slot_name name, lsn pg_lsn ) @@ -28180,9 +28180,12 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset parameter, temporary, when set to true, specifies that the slot should not be permanently stored to disk and is only meant for use by the current session. Temporary slots are also - released upon any error. This function corresponds - to the replication protocol command CREATE_REPLICATION_SLOT - ... PHYSICAL. + released upon any error. The optional fourth + parameter, inactive_timeout, when set to a + non-zero value, specifies the amount of time in seconds the slot is + allowed to be inactive. This function corresponds to the replication + protocol command + CREATE_REPLICATION_SLOT ... PHYSICAL. @@ -28207,7 +28210,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset pg_create_logical_replication_slot - pg_create_logical_replication_slot ( slot_name name, plugin name , temporary boolean, twophase boolean, failover boolean ) + pg_create_logical_replication_slot ( slot_name name, plugin name , temporary boolean, twophase boolean, failover boolean, inactive_timeout integer ) record ( slot_name name, lsn pg_lsn ) @@ -28226,7 +28229,10 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset failover, when set to true, specifies that this slot is enabled to be synced to the standbys so that logical replication can be resumed after - failover. A call to this function has the same effect as + failover. The optional sixth parameter, + inactive_timeout, when set to a + non-zero value, specifies the amount of time in seconds the slot is + allowed to be inactive. A call to this function has the same effect as the replication protocol command CREATE_REPLICATION_SLOT ... LOGICAL. diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 95355743ca..ec60c43038 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2750,6 +2750,15 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx ID of role + + + + inactive_timeout integer + + + The amount of time in seconds the slot is allowed to be inactive. + + diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index fe2bb50f46..af27616657 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -469,6 +469,7 @@ AS 'pg_logical_emit_message_bytea'; CREATE OR REPLACE FUNCTION pg_create_physical_replication_slot( IN slot_name name, IN immediately_reserve boolean DEFAULT false, IN temporary boolean DEFAULT false, + IN inactive_timeout int DEFAULT 0, OUT slot_name name, OUT lsn pg_lsn) RETURNS RECORD LANGUAGE INTERNAL @@ -480,6 +481,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( IN temporary boolean DEFAULT false, IN twophase boolean DEFAULT false, IN failover boolean DEFAULT false, + IN inactive_timeout int DEFAULT 0, OUT slot_name name, OUT lsn pg_lsn) RETURNS RECORD LANGUAGE INTERNAL diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 2fa4272006..a43048ae93 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1027,7 +1027,8 @@ CREATE VIEW pg_replication_slots AS L.failover, L.synced, L.invalidation_reason, - L.last_inactive_at + L.last_inactive_at, + L.inactive_timeout FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 30480960c5..c01876ceeb 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -131,6 +131,7 @@ typedef struct RemoteSlot char *database; bool two_phase; bool failover; + int inactive_timeout; XLogRecPtr restart_lsn; XLogRecPtr confirmed_lsn; TransactionId catalog_xmin; @@ -167,7 +168,8 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) remote_slot->two_phase == slot->data.two_phase && remote_slot->failover == slot->data.failover && remote_slot->confirmed_lsn == slot->data.confirmed_flush && - strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0) + strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0 && + remote_slot->inactive_timeout == slot->data.inactive_timeout) return false; /* Avoid expensive operations while holding a spinlock. */ @@ -182,6 +184,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) slot->data.confirmed_flush = remote_slot->confirmed_lsn; slot->data.catalog_xmin = remote_slot->catalog_xmin; slot->effective_catalog_xmin = remote_slot->catalog_xmin; + slot->data.inactive_timeout = remote_slot->inactive_timeout; SpinLockRelease(&slot->mutex); if (xmin_changed) @@ -607,7 +610,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) ReplicationSlotCreate(remote_slot->name, true, RS_TEMPORARY, remote_slot->two_phase, remote_slot->failover, - true); + true, 0); /* For shorter lines. */ slot = MyReplicationSlot; @@ -627,6 +630,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) SpinLockAcquire(&slot->mutex); slot->effective_catalog_xmin = xmin_horizon; slot->data.catalog_xmin = xmin_horizon; + slot->data.inactive_timeout = remote_slot->inactive_timeout; SpinLockRelease(&slot->mutex); ReplicationSlotsComputeRequiredXmin(true); LWLockRelease(ProcArrayLock); @@ -652,9 +656,9 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) static bool synchronize_slots(WalReceiverConn *wrconn) { -#define SLOTSYNC_COLUMN_COUNT 9 +#define SLOTSYNC_COLUMN_COUNT 10 Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, - LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID}; + LSNOID, XIDOID, BOOLOID, BOOLOID, TEXTOID, TEXTOID, INT4OID}; WalRcvExecResult *res; TupleTableSlot *tupslot; @@ -663,7 +667,7 @@ synchronize_slots(WalReceiverConn *wrconn) bool started_tx = false; const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn," " restart_lsn, catalog_xmin, two_phase, failover," - " database, invalidation_reason" + " database, invalidation_reason, inactive_timeout" " FROM pg_catalog.pg_replication_slots" " WHERE failover and NOT temporary"; @@ -743,6 +747,9 @@ synchronize_slots(WalReceiverConn *wrconn) remote_slot->invalidated = isnull ? RS_INVAL_NONE : GetSlotInvalidationCause(TextDatumGetCString(d)); + remote_slot->inactive_timeout = DatumGetInt32(slot_getattr(tupslot, ++col, + &isnull)); + /* Sanity check */ Assert(col == SLOTSYNC_COLUMN_COUNT); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 146f0fbf84..195771920f 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -129,7 +129,7 @@ StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1), sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize #define SLOT_MAGIC 0x1051CA1 /* format identifier */ -#define SLOT_VERSION 5 /* version for new files */ +#define SLOT_VERSION 6 /* version for new files */ /* Control array for replication slot management */ ReplicationSlotCtlData *ReplicationSlotCtl = NULL; @@ -304,11 +304,14 @@ ReplicationSlotValidateName(const char *name, int elevel) * failover: If enabled, allows the slot to be synced to standbys so * that logical replication can be resumed after failover. * synced: True if the slot is synchronized from the primary server. + * inactive_timeout: The amount of time in seconds the slot is allowed to be + * inactive. */ void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase, bool failover, bool synced) + bool two_phase, bool failover, bool synced, + int inactive_timeout) { ReplicationSlot *slot = NULL; int i; @@ -345,6 +348,18 @@ ReplicationSlotCreate(const char *name, bool db_specific, errmsg("cannot enable failover for a temporary replication slot")); } + if (inactive_timeout > 0) + { + /* + * Do not allow users to set inactive_timeout for temporary slots, + * because temporary slots will not be saved to the disk. + */ + if (persistency == RS_TEMPORARY) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot set inactive_timeout for a temporary replication slot")); + } + /* * If some other backend ran this code concurrently with us, we'd likely * both allocate the same slot, and that would be bad. We'd also be at @@ -398,6 +413,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.two_phase_at = InvalidXLogRecPtr; slot->data.failover = failover; slot->data.synced = synced; + slot->data.inactive_timeout = inactive_timeout; /* and then data only present in shared memory */ slot->just_dirtied = false; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 2c33cc0c16..55ff73cc78 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -38,14 +38,15 @@ */ static void create_physical_replication_slot(char *name, bool immediately_reserve, - bool temporary, XLogRecPtr restart_lsn) + bool temporary, int inactive_timeout, + XLogRecPtr restart_lsn) { Assert(!MyReplicationSlot); /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, temporary ? RS_TEMPORARY : RS_PERSISTENT, false, - false, false); + false, false, inactive_timeout); if (immediately_reserve) { @@ -71,6 +72,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) Name name = PG_GETARG_NAME(0); bool immediately_reserve = PG_GETARG_BOOL(1); bool temporary = PG_GETARG_BOOL(2); + int inactive_timeout = PG_GETARG_INT32(3); Datum values[2]; bool nulls[2]; TupleDesc tupdesc; @@ -84,9 +86,15 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) CheckSlotRequirements(); + if (inactive_timeout < 0) + ereport(ERROR, + (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("\"inactive_timeout\" must not be negative"))); + create_physical_replication_slot(NameStr(*name), immediately_reserve, temporary, + inactive_timeout, InvalidXLogRecPtr); values[0] = NameGetDatum(&MyReplicationSlot->data.name); @@ -120,7 +128,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, - bool failover, + bool failover, int inactive_timeout, XLogRecPtr restart_lsn, bool find_startpoint) { @@ -138,7 +146,7 @@ create_logical_replication_slot(char *name, char *plugin, */ ReplicationSlotCreate(name, true, temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase, - failover, false); + failover, false, inactive_timeout); /* * Create logical decoding context to find start point or, if we don't @@ -177,6 +185,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) bool temporary = PG_GETARG_BOOL(2); bool two_phase = PG_GETARG_BOOL(3); bool failover = PG_GETARG_BOOL(4); + int inactive_timeout = PG_GETARG_INT32(5); Datum result; TupleDesc tupdesc; HeapTuple tuple; @@ -190,11 +199,17 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) CheckLogicalDecodingRequirements(); + if (inactive_timeout < 0) + ereport(ERROR, + (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE), + errmsg("\"inactive_timeout\" must not be negative"))); + create_logical_replication_slot(NameStr(*name), NameStr(*plugin), temporary, two_phase, failover, + inactive_timeout, InvalidXLogRecPtr, true); @@ -239,7 +254,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 19 +#define PG_GET_REPLICATION_SLOTS_COLS 20 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -441,6 +456,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) else nulls[i++] = true; + values[i++] = Int32GetDatum(slot_contents.data.inactive_timeout); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, @@ -720,6 +737,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) XLogRecPtr src_restart_lsn; bool src_islogical; bool temporary; + int inactive_timeout; char *plugin; Datum values[2]; bool nulls[2]; @@ -776,6 +794,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) src_restart_lsn = first_slot_contents.data.restart_lsn; temporary = (first_slot_contents.data.persistency == RS_TEMPORARY); plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL; + inactive_timeout = first_slot_contents.data.inactive_timeout; /* Check type of replication slot */ if (src_islogical != logical_slot) @@ -823,6 +842,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) temporary, false, false, + inactive_timeout, src_restart_lsn, false); } @@ -830,6 +850,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) create_physical_replication_slot(NameStr(*dst_name), true, temporary, + inactive_timeout, src_restart_lsn); /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index bc40c454de..5315c08650 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1221,7 +1221,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false, false, false); + false, false, false, 0); if (reserve_wal) { @@ -1252,7 +1252,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - two_phase, failover, false); + two_phase, failover, false, 0); /* * Do options check early so that we can bail before calling the diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index 34a157f792..6817e9be67 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -676,7 +676,8 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) * removed. */ res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, " - "%s as caught_up, invalidation_reason IS NOT NULL as invalid " + "%s as caught_up, invalidation_reason IS NOT NULL as invalid, " + "inactive_timeout " "FROM pg_catalog.pg_replication_slots " "WHERE slot_type = 'logical' AND " "database = current_database() AND " @@ -696,6 +697,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) int i_failover; int i_caught_up; int i_invalid; + int i_inactive_timeout; slotinfos = (LogicalSlotInfo *) pg_malloc(sizeof(LogicalSlotInfo) * num_slots); @@ -705,6 +707,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) i_failover = PQfnumber(res, "failover"); i_caught_up = PQfnumber(res, "caught_up"); i_invalid = PQfnumber(res, "invalid"); + i_inactive_timeout = PQfnumber(res, "inactive_timeout"); for (int slotnum = 0; slotnum < num_slots; slotnum++) { @@ -716,6 +719,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) curr->failover = (strcmp(PQgetvalue(res, slotnum, i_failover), "t") == 0); curr->caught_up = (strcmp(PQgetvalue(res, slotnum, i_caught_up), "t") == 0); curr->invalid = (strcmp(PQgetvalue(res, slotnum, i_invalid), "t") == 0); + curr->inactive_timeout = atooid(PQgetvalue(res, slotnum, i_inactive_timeout)); } } diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c index f6143b6bc4..2656056103 100644 --- a/src/bin/pg_upgrade/pg_upgrade.c +++ b/src/bin/pg_upgrade/pg_upgrade.c @@ -931,9 +931,10 @@ create_logical_replication_slots(void) appendPQExpBuffer(query, ", "); appendStringLiteralConn(query, slot_info->plugin, conn); - appendPQExpBuffer(query, ", false, %s, %s);", + appendPQExpBuffer(query, ", false, %s, %s, %d);", slot_info->two_phase ? "true" : "false", - slot_info->failover ? "true" : "false"); + slot_info->failover ? "true" : "false", + slot_info->inactive_timeout); PQclear(executeQueryOrDie(conn, "%s", query->data)); diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index 92bcb693fb..eb86d000b1 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -162,6 +162,8 @@ typedef struct bool invalid; /* if true, the slot is unusable */ bool failover; /* is the slot designated to be synced to the * physical standby? */ + int inactive_timeout; /* The amount of time in seconds the slot + * is allowed to be inactive. */ } LogicalSlotInfo; typedef struct diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index d89a223a60..80c281c8a5 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11105,10 +11105,10 @@ # replication slots { oid => '3779', descr => 'create a physical replication slot', proname => 'pg_create_physical_replication_slot', provolatile => 'v', - proparallel => 'u', prorettype => 'record', proargtypes => 'name bool bool', - proallargtypes => '{name,bool,bool,name,pg_lsn}', - proargmodes => '{i,i,i,o,o}', - proargnames => '{slot_name,immediately_reserve,temporary,slot_name,lsn}', + proparallel => 'u', prorettype => 'record', proargtypes => 'name bool bool int4', + proallargtypes => '{name,bool,bool,int4,name,pg_lsn}', + proargmodes => '{i,i,i,i,o,o}', + proargnames => '{slot_name,immediately_reserve,temporary,inactive_timeout,slot_name,lsn}', prosrc => 'pg_create_physical_replication_slot' }, { oid => '4220', descr => 'copy a physical replication slot, changing temporality', @@ -11133,17 +11133,17 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool,bool,text,timestamptz}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover,synced,invalidation_reason,last_inactive_at}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool,bool,bool,text,timestamptz,int4}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting,failover,synced,invalidation_reason,last_inactive_at,inactive_timeout}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'record', - proargtypes => 'name name bool bool bool', - proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}', - proargmodes => '{i,i,i,i,i,o,o}', - proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}', + proargtypes => 'name name bool bool bool int4', + proallargtypes => '{name,name,bool,bool,bool,int4,name,pg_lsn}', + proargmodes => '{i,i,i,i,i,i,o,o}', + proargnames => '{slot_name,plugin,temporary,twophase,failover,inactive_timeout,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, { oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin', diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index b4bb7f5e99..ff62542b03 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -127,6 +127,9 @@ typedef struct ReplicationSlotPersistentData * for logical slots on the primary server. */ bool failover; + + /* The amount of time in seconds the slot is allowed to be inactive. */ + int inactive_timeout; } ReplicationSlotPersistentData; /* @@ -239,7 +242,7 @@ extern void ReplicationSlotsShmemInit(void); extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, bool two_phase, bool failover, - bool synced); + bool synced, int inactive_timeout); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); extern void ReplicationSlotDropAcquired(void); diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl index f47bfd78eb..e4e244effb 100644 --- a/src/test/recovery/t/040_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl @@ -153,7 +153,7 @@ $primary->append_conf('postgresql.conf', "log_min_messages = 'debug2'"); $primary->reload; $primary->psql('postgres', - q{SELECT pg_create_logical_replication_slot('lsub2_slot', 'test_decoding', false, false, true);} + q{SELECT pg_create_logical_replication_slot('lsub2_slot', 'test_decoding', false, false, true, 3600);} ); $primary->psql('postgres', @@ -190,6 +190,15 @@ is( $standby1->safe_psql( "t", 'logical slots have synced as true on standby'); +# Confirm that the synced slot on the standby has got inactive_timeout from the +# primary. +is( $standby1->safe_psql( + 'postgres', + q{SELECT inactive_timeout FROM pg_replication_slots WHERE slot_name = 'lsub2_slot' AND synced AND NOT temporary;} + ), + "3600", + 'synced logical slot has got inactive_timeout on standby'); + ################################################## # Test that the synchronized slot will be dropped if the corresponding remote # slot on the primary server has been dropped. diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 88fbd6a53c..1c683ceaca 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1477,8 +1477,9 @@ pg_replication_slots| SELECT l.slot_name, l.failover, l.synced, l.invalidation_reason, - l.last_inactive_at - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover, synced, invalidation_reason, last_inactive_at) + l.last_inactive_at, + l.inactive_timeout + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting, failover, synced, invalidation_reason, last_inactive_at, inactive_timeout) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, -- 2.34.1