From 0258212ea524a5e0250939407f705121e0597a10 Mon Sep 17 00:00:00 2001 From: Hayato Kuroda Date: Mon, 24 Apr 2023 11:03:28 +0000 Subject: [PATCH v15 4/4] Change the method used to check logical replication slots during the live check When a live check is requested, there is a possibility of additional changes occurring, which may cause the current WAL position to exceed the confirmed_flush_lsn of the slot. As a result, we check the confirmed_flush_lsn of each logical slot instead. This is sufficient as all the WAL records will be sent during thepublisher's shutdown. Author: Hayato Kuroda Reviewed-by: Wang Wei --- src/bin/pg_upgrade/check.c | 68 ++++++++++++++++++- .../t/003_logical_replication_slots.pl | 66 +++++++++++++++++- 2 files changed, 130 insertions(+), 4 deletions(-) diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c index 391144570f..b3da2e9193 100644 --- a/src/bin/pg_upgrade/check.c +++ b/src/bin/pg_upgrade/check.c @@ -35,6 +35,7 @@ static void check_for_new_tablespace_dir(ClusterInfo *new_cluster); static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster); static void check_for_parameter_settings(ClusterInfo *new_cluster); static void check_for_confirmed_flush_lsn(ClusterInfo *cluster); +static void check_are_logical_slots_active(ClusterInfo *cluster); /* * fix_path_separator @@ -112,7 +113,19 @@ check_and_dump_old_cluster(bool live_check) check_for_reg_data_type_usage(&old_cluster); check_for_isn_and_int8_passing_mismatch(&old_cluster); if (user_opts.include_logical_slots) - check_for_confirmed_flush_lsn(&old_cluster); + { + /* + * The method used to check logical replication slots is dependent on + * the value of the live_check parameter. This change was implemented + * because, during a live check, it is possible for additional changes + * to occur at the old node, which could cause the current WAL position + * to exceed the confirmed_flush_lsn of the slot. + */ + if (live_check) + check_are_logical_slots_active(&old_cluster); + else + check_for_confirmed_flush_lsn(&old_cluster); + } /* * PG 16 increased the size of the 'aclitem' type, which breaks the @@ -1479,6 +1492,59 @@ check_for_parameter_settings(ClusterInfo *new_cluster) check_ok(); } +/* + * Verify that all logical replication slots are active + */ +static void +check_are_logical_slots_active(ClusterInfo *cluster) +{ + int i, + ntups, + i_slotname; + bool is_error = false; + PGresult *res; + DbInfo *active_db = &cluster->dbarr.dbs[0]; + PGconn *conn = connectToServer(cluster, active_db->db_name); + + Assert(user_opts.include_logical_slots); + + /* --include-logical-replication-slots can be used since PG16. */ + if (GET_MAJOR_VERSION(cluster->major_version) <= 1500) + return; + + prep_status("Checking for logical replication slots"); + + res = executeQueryOrDie(conn, + "SELECT slot_name FROM pg_catalog.pg_replication_slots " + "WHERE active IS FALSE " + "AND temporary = false AND wal_status IN ('reserved', 'extended');"); + + ntups = PQntuples(res); + i_slotname = PQfnumber(res, "slot_name"); + + for (i = 0; i < ntups; i++) + { + char *slotname; + + is_error = true; + + slotname = PQgetvalue(res, i, i_slotname); + + pg_log(PG_WARNING, + "\nWARNING: logical replication slot \"%s\" is not active", + slotname); + } + + PQclear(res); + PQfinish(conn); + + if (is_error) + pg_fatal("--include-logical-replication-slots with --check requires that " + "all logical replication slots are active"); + + check_ok(); +} + /* * Verify that all logical replication slots consumed all WALs, except a * CHECKPOINT_SHUTDOWN record. diff --git a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl index 21fefca084..d7fd864bd7 100644 --- a/src/bin/pg_upgrade/t/003_logical_replication_slots.pl +++ b/src/bin/pg_upgrade/t/003_logical_replication_slots.pl @@ -19,6 +19,10 @@ my $old_node = PostgreSQL::Test::Cluster->new('old_node'); $old_node->init(allows_streaming => 'logical'); $old_node->start; +# Initialize subscriber, which will be used only for --check +my $subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$subscriber->init(allows_streaming => 'logical'); + # Initialize new node my $new_node = PostgreSQL::Test::Cluster->new('new_node'); $new_node->init(allows_streaming => 1); @@ -76,15 +80,71 @@ rmtree($new_node->data_dir . "/pg_upgrade_output.d"); # non-zero value to succeed the pg_upgrade $new_node->append_conf('postgresql.conf', "max_replication_slots = 10"); -# Create a slot on old node, and generate WALs +# Setup logical replication $old_node->start; +$old_node->safe_psql('postgres', + "CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a"); +$old_node->safe_psql('postgres', "CREATE PUBLICATION pub FOR ALL TABLES"); +my $old_connstr = $old_node->connstr . ' dbname=postgres'; + +$subscriber->start; +$subscriber->safe_psql('postgres', "CREATE TABLE tbl (a int)"); +$subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION sub CONNECTION '$old_connstr' PUBLICATION pub WITH (copy_data = true)" +); + +# Wait for initial table sync to finish +$subscriber->wait_for_subscription_sync($old_node, 'sub'); + +my $result = $subscriber->safe_psql('postgres', "SELECT count(*) FROM tbl"); +is($result, qq(10), 'check initial rows on subscriber'); + +# Start a background session and open a transaction (not committed yet) +my $bsession = $old_node->background_psql('postgres'); +$bsession->query_safe( + q{ +BEGIN; +INSERT INTO tbl VALUES (generate_series(11, 20)) +}); + +$result = $old_node->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots WHERE pg_current_wal_insert_lsn() > confirmed_flush_lsn" +); +is($result, qq(1), + 'check the current WAL position exceeds confirmed_flush_lsn'); + +# Run pg_upgrade --check. In the command the status of each logical slots will +# be checked and then this will be succeeded. +command_ok( + [ + 'pg_upgrade', '--no-sync', + '-d', $old_node->data_dir, + '-D', $new_node->data_dir, + '-b', $bindir, + '-B', $bindir, + '-s', $new_node->host, + '-p', $old_node->port, + '-P', $new_node->port, + $mode, '--include-logical-replication-slots', + '--check' + ], + 'run of pg_upgrade of old node'); +ok( !-d $old_node->data_dir . "/pg_upgrade_output.d", + "pg_upgrade_output.d/ removed after pg_upgrade success"); + +# Cleanup +$bsession->query_safe("ABORT"); +$bsession->quit; +$subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub"); + +# Create a slot on old node, and generate WALs $old_node->safe_psql( 'postgres', qq[ SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding', false, true); - CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a; + INSERT INTO tbl VALUES (generate_series(11, 20)); ]); -my $result = $old_node->safe_psql('postgres', +$result = $old_node->safe_psql('postgres', "SELECT count(*) FROM pg_logical_slot_peek_changes('test_slot', NULL, NULL)" ); -- 2.27.0