From 60a8101d03fe5780402b563f166ace619ddad374 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Wed, 24 Jan 2024 20:14:52 +0800 Subject: [PATCH v68 1/8] Add the failover property to replication slot. This commit adds the failover property to the replication slot. The failover property indicates whether the slot will be synced to the standby servers, enabling the resumption of corresponding logical replication after failover. But note that this commit does not yet include the capability to actually sync the replication slot; the subsequent commit will address that. In addition, a new parameter 'failover' is added to the pg_create_logical_replication_slot function. The value of the 'failover' flag is displayed as part of the pg_replication_slots view. --- contrib/test_decoding/expected/slot.out | 58 ++++++++++++++++++++++++ contrib/test_decoding/sql/slot.sql | 13 ++++++ doc/src/sgml/func.sgml | 11 +++-- doc/src/sgml/system-views.sgml | 11 +++++ src/backend/catalog/system_functions.sql | 1 + src/backend/catalog/system_views.sql | 3 +- src/backend/replication/slot.c | 8 +++- src/backend/replication/slotfuncs.c | 16 +++++-- src/backend/replication/walsender.c | 4 +- src/bin/pg_upgrade/info.c | 5 +- src/bin/pg_upgrade/pg_upgrade.c | 6 ++- src/bin/pg_upgrade/pg_upgrade.h | 2 + src/include/catalog/pg_proc.dat | 14 +++--- src/include/replication/slot.h | 8 +++- src/test/regress/expected/rules.out | 5 +- 15 files changed, 141 insertions(+), 24 deletions(-) diff --git a/contrib/test_decoding/expected/slot.out b/contrib/test_decoding/expected/slot.out index 63a9940f73..261d8886d3 100644 --- a/contrib/test_decoding/expected/slot.out +++ b/contrib/test_decoding/expected/slot.out @@ -406,3 +406,61 @@ SELECT pg_drop_replication_slot('copied_slot2_notemp'); (1 row) +-- Test failover option of slots. +SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'test_decoding', false, false, true); + ?column? 
+---------- + init +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false); + ?column? +---------- + init +(1 row) + +SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot'); + ?column? +---------- + init +(1 row) + +SELECT slot_name, slot_type, failover FROM pg_replication_slots; + slot_name | slot_type | failover +-----------------------+-----------+---------- + failover_true_slot | logical | t + failover_false_slot | logical | f + failover_default_slot | logical | f + physical_slot | physical | f +(4 rows) + +SELECT pg_drop_replication_slot('failover_true_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('failover_false_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('failover_default_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_drop_replication_slot('physical_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/slot.sql b/contrib/test_decoding/sql/slot.sql index 1aa27c5667..45aeae7fd5 100644 --- a/contrib/test_decoding/sql/slot.sql +++ b/contrib/test_decoding/sql/slot.sql @@ -176,3 +176,16 @@ ORDER BY o.slot_name, c.slot_name; SELECT pg_drop_replication_slot('orig_slot2'); SELECT pg_drop_replication_slot('copied_slot2_no_change'); SELECT pg_drop_replication_slot('copied_slot2_notemp'); + +-- Test failover option of slots. 
+SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'test_decoding', false, false, true); +SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false); +SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false); +SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot'); + +SELECT slot_name, slot_type, failover FROM pg_replication_slots; + +SELECT pg_drop_replication_slot('failover_true_slot'); +SELECT pg_drop_replication_slot('failover_false_slot'); +SELECT pg_drop_replication_slot('failover_default_slot'); +SELECT pg_drop_replication_slot('physical_slot'); diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml index 210c7c0b02..0ee76835bc 100644 --- a/doc/src/sgml/func.sgml +++ b/doc/src/sgml/func.sgml @@ -27655,7 +27655,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset pg_create_logical_replication_slot - pg_create_logical_replication_slot ( slot_name name, plugin name , temporary boolean, twophase boolean ) + pg_create_logical_replication_slot ( slot_name name, plugin name , temporary boolean, twophase boolean, failover boolean ) record ( slot_name name, lsn pg_lsn ) @@ -27670,8 +27670,13 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset released upon any error. The optional fourth parameter, twophase, when set to true, specifies that the decoding of prepared transactions is enabled for this - slot. A call to this function has the same effect as the replication - protocol command CREATE_REPLICATION_SLOT ... LOGICAL. + slot. The optional fifth parameter, + failover, when set to true, + specifies that this slot is enabled to be synced to the + standbys so that logical replication can be resumed after + failover. A call to this function has the same effect as + the replication protocol command + CREATE_REPLICATION_SLOT ... LOGICAL. 
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml index 72d01fc624..88fe4b6341 100644 --- a/doc/src/sgml/system-views.sgml +++ b/doc/src/sgml/system-views.sgml @@ -2555,6 +2555,17 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx + + + + failover bool + + + True if this is a logical slot enabled to be synced to the standbys + so that logical replication can be resumed from the new primary + after failover. Always false for physical slots. + + diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index f315fecf18..346cfb98a0 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -479,6 +479,7 @@ CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot( IN slot_name name, IN plugin name, IN temporary boolean DEFAULT false, IN twophase boolean DEFAULT false, + IN failover boolean DEFAULT false, OUT slot_name name, OUT lsn pg_lsn) RETURNS RECORD LANGUAGE INTERNAL diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index e43e36f5ac..d0f711a619 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1023,7 +1023,8 @@ CREATE VIEW pg_replication_slots AS L.wal_status, L.safe_wal_size, L.two_phase, - L.conflict_reason + L.conflict_reason, + L.failover FROM pg_get_replication_slots() AS L LEFT JOIN pg_database D ON (L.datoid = D.oid); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 52da694c79..02a14ec210 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -90,7 +90,7 @@ typedef struct ReplicationSlotOnDisk sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize #define SLOT_MAGIC 0x1051CA1 /* format identifier */ -#define SLOT_VERSION 3 /* version for new files */ +#define SLOT_VERSION 4 /* version for new files */ /* Control array for replication slot management */ ReplicationSlotCtlData 
*ReplicationSlotCtl = NULL; @@ -248,10 +248,13 @@ ReplicationSlotValidateName(const char *name, int elevel) * during getting changes, if the two_phase option is enabled it can skip * prepare because by that time start decoding point has been moved. So the * user will only get commit prepared. + * failover: If enabled, allows the slot to be synced to standbys so + * that logical replication can be resumed after failover. */ void ReplicationSlotCreate(const char *name, bool db_specific, - ReplicationSlotPersistency persistency, bool two_phase) + ReplicationSlotPersistency persistency, + bool two_phase, bool failover) { ReplicationSlot *slot = NULL; int i; @@ -311,6 +314,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, slot->data.persistency = persistency; slot->data.two_phase = two_phase; slot->data.two_phase_at = InvalidXLogRecPtr; + slot->data.failover = failover; /* and then data only present in shared memory */ slot->just_dirtied = false; diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index cad35dce7f..eb685089b3 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -42,7 +42,8 @@ create_physical_replication_slot(char *name, bool immediately_reserve, /* acquire replication slot, this will check for conflicting names */ ReplicationSlotCreate(name, false, - temporary ? RS_TEMPORARY : RS_PERSISTENT, false); + temporary ? RS_TEMPORARY : RS_PERSISTENT, false, + false); if (immediately_reserve) { @@ -117,6 +118,7 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS) static void create_logical_replication_slot(char *name, char *plugin, bool temporary, bool two_phase, + bool failover, XLogRecPtr restart_lsn, bool find_startpoint) { @@ -133,7 +135,8 @@ create_logical_replication_slot(char *name, char *plugin, * error as well. */ ReplicationSlotCreate(name, true, - temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase); + temporary ? 
RS_TEMPORARY : RS_EPHEMERAL, two_phase, + failover); /* * Create logical decoding context to find start point or, if we don't @@ -171,6 +174,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) Name plugin = PG_GETARG_NAME(1); bool temporary = PG_GETARG_BOOL(2); bool two_phase = PG_GETARG_BOOL(3); + bool failover = PG_GETARG_BOOL(4); Datum result; TupleDesc tupdesc; HeapTuple tuple; @@ -188,6 +192,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) NameStr(*plugin), temporary, two_phase, + failover, InvalidXLogRecPtr, true); @@ -232,7 +237,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS) Datum pg_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_GET_REPLICATION_SLOTS_COLS 15 +#define PG_GET_REPLICATION_SLOTS_COLS 16 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; XLogRecPtr currlsn; int slotno; @@ -426,6 +431,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) } } + values[i++] = BoolGetDatum(slot_contents.data.failover); + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, @@ -693,6 +700,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) XLogRecPtr src_restart_lsn; bool src_islogical; bool temporary; + bool failover; char *plugin; Datum values[2]; bool nulls[2]; @@ -748,6 +756,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) src_islogical = SlotIsLogical(&first_slot_contents); src_restart_lsn = first_slot_contents.data.restart_lsn; temporary = (first_slot_contents.data.persistency == RS_TEMPORARY); + failover = first_slot_contents.data.failover; plugin = logical_slot ? 
NameStr(first_slot_contents.data.plugin) : NULL; /* Check type of replication slot */ @@ -787,6 +796,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) plugin, temporary, false, + failover, src_restart_lsn, false); } diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 087031e9dc..aa80f3de20 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1212,7 +1212,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) { ReplicationSlotCreate(cmd->slotname, false, cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT, - false); + false, false); if (reserve_wal) { @@ -1243,7 +1243,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) */ ReplicationSlotCreate(cmd->slotname, true, cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL, - two_phase); + two_phase, false); /* * Do options check early so that we can bail before calling the diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c index 74e02b3f82..183c2f84eb 100644 --- a/src/bin/pg_upgrade/info.c +++ b/src/bin/pg_upgrade/info.c @@ -666,7 +666,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) * started and stopped several times causing any temporary slots to be * removed. 
*/ - res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, " + res = executeQueryOrDie(conn, "SELECT slot_name, plugin, two_phase, failover, " "%s as caught_up, conflict_reason IS NOT NULL as invalid " "FROM pg_catalog.pg_replication_slots " "WHERE slot_type = 'logical' AND " @@ -684,6 +684,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) int i_slotname; int i_plugin; int i_twophase; + int i_failover; int i_caught_up; int i_invalid; @@ -692,6 +693,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) i_slotname = PQfnumber(res, "slot_name"); i_plugin = PQfnumber(res, "plugin"); i_twophase = PQfnumber(res, "two_phase"); + i_failover = PQfnumber(res, "failover"); i_caught_up = PQfnumber(res, "caught_up"); i_invalid = PQfnumber(res, "invalid"); @@ -702,6 +704,7 @@ get_old_cluster_logical_slot_infos(DbInfo *dbinfo, bool live_check) curr->slotname = pg_strdup(PQgetvalue(res, slotnum, i_slotname)); curr->plugin = pg_strdup(PQgetvalue(res, slotnum, i_plugin)); curr->two_phase = (strcmp(PQgetvalue(res, slotnum, i_twophase), "t") == 0); + curr->failover = (strcmp(PQgetvalue(res, slotnum, i_failover), "t") == 0); curr->caught_up = (strcmp(PQgetvalue(res, slotnum, i_caught_up), "t") == 0); curr->invalid = (strcmp(PQgetvalue(res, slotnum, i_invalid), "t") == 0); } diff --git a/src/bin/pg_upgrade/pg_upgrade.c b/src/bin/pg_upgrade/pg_upgrade.c index 14a36f0503..10c94a6c1f 100644 --- a/src/bin/pg_upgrade/pg_upgrade.c +++ b/src/bin/pg_upgrade/pg_upgrade.c @@ -916,8 +916,10 @@ create_logical_replication_slots(void) appendStringLiteralConn(query, slot_info->slotname, conn); appendPQExpBuffer(query, ", "); appendStringLiteralConn(query, slot_info->plugin, conn); - appendPQExpBuffer(query, ", false, %s);", - slot_info->two_phase ? "true" : "false"); + + appendPQExpBuffer(query, ", false, %s, %s);", + slot_info->two_phase ? "true" : "false", + slot_info->failover ? 
"true" : "false"); PQclear(executeQueryOrDie(conn, "%s", query->data)); diff --git a/src/bin/pg_upgrade/pg_upgrade.h b/src/bin/pg_upgrade/pg_upgrade.h index a1d08c3dab..d9a848cbfd 100644 --- a/src/bin/pg_upgrade/pg_upgrade.h +++ b/src/bin/pg_upgrade/pg_upgrade.h @@ -160,6 +160,8 @@ typedef struct bool two_phase; /* can the slot decode 2PC? */ bool caught_up; /* has the slot caught up to latest changes? */ bool invalid; /* if true, the slot is unusable */ + bool failover; /* is the slot designated to be synced to the + * physical standby? */ } LogicalSlotInfo; typedef struct diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index ad74e07dbb..e6e4bbdfb9 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11123,17 +11123,17 @@ proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', prorettype => 'record', proargtypes => '', - proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text}', - proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason}', + proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,text,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflict_reason,failover}', prosrc => 'pg_get_replication_slots' }, { oid => '3786', descr => 'set up a logical replication slot', proname => 'pg_create_logical_replication_slot', provolatile => 'v', proparallel => 'u', prorettype => 'record', - proargtypes => 'name name bool bool', - proallargtypes => '{name,name,bool,bool,name,pg_lsn}', - proargmodes => '{i,i,i,i,o,o}', - 
proargnames => '{slot_name,plugin,temporary,twophase,slot_name,lsn}', + proargtypes => 'name name bool bool bool', + proallargtypes => '{name,name,bool,bool,bool,name,pg_lsn}', + proargmodes => '{i,i,i,i,i,o,o}', + proargnames => '{slot_name,plugin,temporary,twophase,failover,slot_name,lsn}', prosrc => 'pg_create_logical_replication_slot' }, { oid => '4222', descr => 'copy a logical replication slot, changing temporality and plugin', diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h index 9e39aaf303..f2501be553 100644 --- a/src/include/replication/slot.h +++ b/src/include/replication/slot.h @@ -111,6 +111,12 @@ typedef struct ReplicationSlotPersistentData /* plugin name */ NameData plugin; + + /* + * Is this a failover slot (sync candidate for standbys)? Only + * relevant for logical slots on the primary server. + */ + bool failover; } ReplicationSlotPersistentData; /* @@ -218,7 +224,7 @@ extern void ReplicationSlotsShmemInit(void); /* management of individual slots */ extern void ReplicationSlotCreate(const char *name, bool db_specific, ReplicationSlotPersistency persistency, - bool two_phase); + bool two_phase, bool failover); extern void ReplicationSlotPersist(void); extern void ReplicationSlotDrop(const char *name, bool nowait); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 55f2e95352..57884d3fec 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1473,8 +1473,9 @@ pg_replication_slots| SELECT l.slot_name, l.wal_status, l.safe_wal_size, l.two_phase, - l.conflict_reason - FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason) + l.conflict_reason, + l.failover + FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, 
restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflict_reason, failover) LEFT JOIN pg_database d ON ((l.datoid = d.oid))); pg_roles| SELECT pg_authid.rolname, pg_authid.rolsuper, -- 2.30.0.windows.2