From 4878b0e8b6294be817a6662af4ddb95eee80d046 Mon Sep 17 00:00:00 2001 From: Hou Zhijie Date: Mon, 6 Jan 2025 15:56:23 +0800 Subject: [PATCH v28 6/7] Add a tap test to verify the management of the new replication slot --- src/test/subscription/meson.build | 1 + .../t/035_retain_conflict_info.pl | 163 ++++++++++++++++++ 2 files changed, 164 insertions(+) create mode 100644 src/test/subscription/t/035_retain_conflict_info.pl diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index d40b49714f6..8bf4a83ea67 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -41,6 +41,7 @@ tests += { 't/032_subscribe_use_index.pl', 't/033_run_as_table_owner.pl', 't/034_temporal.pl', + 't/035_retain_conflict_info.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/035_retain_conflict_info.pl b/src/test/subscription/t/035_retain_conflict_info.pl new file mode 100644 index 00000000000..75539b2cba9 --- /dev/null +++ b/src/test/subscription/t/035_retain_conflict_info.pl @@ -0,0 +1,163 @@ + +# Copyright (c) 2025, PostgreSQL Global Development Group + +# Test the management of the replication slot 'pg_conflict_detection'. +use strict; +use warnings FATAL => 'all'; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $subname_AB = 'tap_sub_a_b'; +my $subname_BA = 'tap_sub_b_a'; + +############################################################################### +# Setup a bidirectional logical replication between node_A & node_B +############################################################################### + +# Initialize nodes. + +# node_A. Increase the log_min_messages setting to DEBUG2 to debug test +# failures. Disable autovacuum to avoid generating xid that could affect the +# replication slot's xmin value. +my $node_A = PostgreSQL::Test::Cluster->new('node_A'); +$node_A->init(allows_streaming => 'logical'); +$node_A->append_conf( + 'postgresql.conf', + qq{autovacuum = off + log_min_messages = 'debug2'}); +$node_A->start; + +# node_B +my $node_B = PostgreSQL::Test::Cluster->new('node_B'); +$node_B->init(allows_streaming => 'logical'); +$node_B->append_conf('postgresql.conf', "track_commit_timestamp = on"); +$node_B->start; + +# Create table on node_A +$node_A->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY, b int)"); + +# Create the same table on node_B +$node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY, b int)"); + +# Setup logical replication +# node_A (pub) -> node_B (sub) +my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; +$node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab"); +$node_B->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION $subname_BA + CONNECTION '$node_A_connstr application_name=$subname_BA' + PUBLICATION tap_pub_A + WITH (origin = none, retain_conflict_info = true)"); + +# node_B (pub) -> node_A (sub) +my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; +$node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab"); +$node_A->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION $subname_AB + CONNECTION '$node_B_connstr application_name=$subname_AB' + PUBLICATION tap_pub_B + WITH (origin = none, copy_data = off, retain_conflict_info = true)"); + +# Wait for initial table sync to finish +$node_A->wait_for_subscription_sync($node_B, $subname_AB); +$node_B->wait_for_subscription_sync($node_A, $subname_BA); + +is(1, 1, 'Bidirectional replication setup is complete'); + +# Confirm that the additional replication slot is created on both nodes and the +# xmin value is valid. + +ok( $node_A->poll_query_until( + 'postgres', + "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the xmin value of slot 'pg_conflict_detection' is valid on Node A"); + +ok( $node_B->poll_query_until( + 'postgres', + "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the xmin value of slot 'pg_conflict_detection' is valid on Node B"); + +my $result = $node_A->safe_psql('postgres', + "SELECT retain_conflict_info FROM pg_stat_subscription;"); +is($result, qq(t), 'worker on node A retains conflict information'); +$result = $node_B->safe_psql('postgres', + "SELECT retain_conflict_info FROM pg_stat_subscription;"); +is($result, qq(t), 'worker on node B retains conflict information'); + +############################################################################### +# Check that dead tuples on node A cannot be cleaned by VACUUM until the +# concurrent transactions on Node B have been applied and flushed on Node A. +############################################################################### + +# Insert a record +$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (1, 1), (2, 2);"); +$node_A->wait_for_catchup($subname_BA); + +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab;"); +is($result, qq(1|1 +2|2), 'check replicated insert on node B'); + +# Disable the logical replication from node B to node A +$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE"); + +my $log_location = -s $node_B->logfile; + +$node_B->safe_psql('postgres', "UPDATE tab SET b = 3 WHERE a = 1;"); +$node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 1;"); + +$node_A->wait_for_catchup($subname_BA); + +my ($cmdret, $stdout, $stderr) = $node_A->psql( + 'postgres', qq(VACUUM (verbose) public.tab;) +); + +ok( $stderr =~ + qr/1 are dead but not yet removable/, + 'the deleted column is non-removable'); + +# Remember the next transaction ID to be assigned +my $next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;"); + +$node_A->safe_psql( + 'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;"); +$node_B->wait_for_catchup($subname_AB); + +# Account for the transaction ID increment caused by enabling the subscription +$next_xid++; + +# Confirm that the xmin value is updated +ok( $node_A->poll_query_until( + 'postgres', + "SELECT xmin = $next_xid from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the xmin value of slot 'pg_conflict_detection' is updated on Node A"); + +############################################################################### +# Check that the replication slot pg_conflict_detection is dropped after +# removing all the subscriptions. +############################################################################### + +$node_B->safe_psql( + 'postgres', "DROP SUBSCRIPTION $subname_BA"); + +ok( $node_B->poll_query_until( + 'postgres', + "SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the slot 'pg_conflict_detection' has been dropped on Node B"); + +$node_A->safe_psql( + 'postgres', "DROP SUBSCRIPTION $subname_AB"); + +ok( $node_A->poll_query_until( + 'postgres', + "SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'pg_conflict_detection'" + ), + "the slot 'pg_conflict_detection' has been dropped on Node A"); + +done_testing(); -- 2.30.0.windows.2