From cb8b90ee61abf99e02007d378c38849f63114d4d Mon Sep 17 00:00:00 2001 From: "shiy.fnst" Date: Fri, 13 May 2022 14:50:30 +0800 Subject: [PATCH v54 3/5] Test streaming parallel option in tap test Change all TAP tests using the SUBSCRIPTION "streaming" parameter, so they now test both 'on' and 'parallel' values. --- src/test/subscription/meson.build | 1 + src/test/subscription/t/015_stream.pl | 232 ++++++--- src/test/subscription/t/016_stream_subxact.pl | 143 ++++-- src/test/subscription/t/017_stream_ddl.pl | 221 ++++++--- .../subscription/t/018_stream_subxact_abort.pl | 233 ++++++--- .../subscription/t/019_stream_subxact_ddl_abort.pl | 134 +++-- src/test/subscription/t/022_twophase_cascade.pl | 394 +++++++++------ src/test/subscription/t/023_twophase_stream.pl | 540 ++++++++++++--------- .../subscription/t/032_stream_parallel_conflict.pl | 152 ++++++ 9 files changed, 1405 insertions(+), 645 deletions(-) create mode 100644 src/test/subscription/t/032_stream_parallel_conflict.pl diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build index 85d1dd9..b951d97 100644 --- a/src/test/subscription/meson.build +++ b/src/test/subscription/meson.build @@ -36,6 +36,7 @@ tests += { 't/029_on_error.pl', 't/030_origin.pl', 't/031_column_list.pl', + 't/032_stream_parallel_conflict.pl', 't/100_bugs.pl', ], }, diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl index cbaa327..38941ff 100644 --- a/src/test/subscription/t/015_stream.pl +++ b/src/test/subscription/t/015_stream.pl @@ -8,6 +8,149 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +# Check the log that the streamed transaction was completed successfully +# reported by parallel apply worker. +sub check_parallel_log +{ + my ($node_subscriber, $offset, $is_parallel, $type) = @_; + + if ($is_parallel) + { + $node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? 
finished processing the STREAM $type command/, + $offset); + } +} + +# Encapsulate all the common test steps which are related to "streaming" +# parameter so the same code can be run both for the streaming=on and +# streaming=parallel cases. +sub test_streaming +{ + my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + + # Interleave a pair of transactions, each exceeding the 64kB limit. + my $in = ''; + my $out = ''; + + my $offset = 0; + + my $timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default); + + my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer, + on_error_stop => 0); + + # If "streaming" parameter is specified as "parallel", we need to check + # that streamed transaction was applied using a parallel apply worker. + # We have to look for the DEBUG1 log messages about that, so bump up the + # log verbosity. + if ($is_parallel) + { + $node_subscriber->append_conf('postgresql.conf', + "log_min_messages = debug1"); + $node_subscriber->reload; + } + + # Check the subscriber log from now on. 
+ $offset = -s $node_subscriber->logfile; + + $in .= q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + }; + $h->pump_nb; + + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 9999) s(i); + DELETE FROM test_tab WHERE a > 5000; + COMMIT; + }); + + $in .= q{ + COMMIT; + \q + }; + $h->finish; # errors make the next test fail, so ignore them here + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(3334|3334|3334), + 'check extra columns contain local defaults'); + + # Test the streaming in binary mode + $node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET (binary = on)"); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # Insert, update and delete enough rows to exceed the 64kB limit. + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 10000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + COMMIT; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(6667|6667|6667), + 'check extra columns contain local defaults'); + + # Change the local values of the extra columns on the subscriber, + # update publisher, and check that subscriber retains the expected + # values. 
This is to ensure that non-streaming transactions behave + # properly after a streaming transaction. + $node_subscriber->safe_psql('postgres', + "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'" + ); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + $node_publisher->safe_psql('postgres', + "UPDATE test_tab SET b = md5(a::text)"); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab" + ); + is($result, qq(6667|6667|6667), + 'check extra columns contain locally changed data'); + + # Cleanup the test data + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE (a > 2)"); + $node_publisher->wait_for_catchup($appname); + + # Reset the log verbosity. + if ($is_parallel) + { + $node_subscriber->append_conf('postgresql.conf', + "log_min_messages = warning"); + $node_subscriber->reload; + } +} + # Create publisher node my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); @@ -37,6 +180,10 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); my $appname = 'tap_sub'; + +################################ +# Test using streaming mode 'on' +################################ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); @@ -49,82 +196,25 @@ my $result = "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(2|2|2), 'check initial data was copied to subscriber'); -# Interleave a pair of transactions, each exceeding the 64kB limit. 
-my $in = ''; -my $out = ''; - -my $timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default); - -my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer, - on_error_stop => 0); - -$in .= q{ -BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -}; -$h->pump_nb; - -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 9999) s(i); -DELETE FROM test_tab WHERE a > 5000; -COMMIT; -}); - -$in .= q{ -COMMIT; -\q -}; -$h->finish; # errors make the next test fail, so ignore them here - -$node_publisher->wait_for_catchup($appname); - -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(3334|3334|3334), 'check extra columns contain local defaults'); +test_streaming($node_publisher, $node_subscriber, $appname, 0); -# Test the streaming in binary mode -$node_subscriber->safe_psql('postgres', - "ALTER SUBSCRIPTION tap_sub SET (binary = on)"); - -# Insert, update and delete enough rows to exceed the 64kB limit. 
-$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001, 10000) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -COMMIT; -}); - -$node_publisher->wait_for_catchup($appname); - -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(6667|6667|6667), 'check extra columns contain local defaults'); +###################################### +# Test using streaming mode 'parallel' +###################################### +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" +); -# Change the local values of the extra columns on the subscriber, -# update publisher, and check that subscriber retains the expected -# values. This is to ensure that non-streaming transactions behave -# properly after a streaming transaction. 
$node_subscriber->safe_psql('postgres', - "UPDATE test_tab SET c = 'epoch'::timestamptz + 987654321 * interval '1s'" -); -$node_publisher->safe_psql('postgres', - "UPDATE test_tab SET b = md5(a::text)"); + "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel, binary = off)"); -$node_publisher->wait_for_catchup($appname); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(extract(epoch from c) = 987654321), count(d = 999) FROM test_tab" -); -is($result, qq(6667|6667|6667), - 'check extra columns contain locally changed data'); +test_streaming($node_publisher, $node_subscriber, $appname, 1); $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/016_stream_subxact.pl b/src/test/subscription/t/016_stream_subxact.pl index bc0a9cd..36a1803 100644 --- a/src/test/subscription/t/016_stream_subxact.pl +++ b/src/test/subscription/t/016_stream_subxact.pl @@ -8,6 +8,94 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +# Check the log that the streamed transaction was completed successfully +# reported by parallel apply worker. +sub check_parallel_log +{ + my ($node_subscriber, $offset, $is_parallel, $type) = @_; + + if ($is_parallel) + { + $node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/, + $offset); + } +} + +# Encapsulate all the common test steps which are related to "streaming" +# parameter so the same code can be run both for the streaming=on and +# streaming=parallel cases. 
+sub test_streaming +{ + my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + + my $offset = 0; + + # If "streaming" parameter is specified as "parallel", we need to check + # that streamed transaction was applied using a parallel apply worker. + # We have to look for the DEBUG1 log messages about that, so bump up the + # log verbosity. + if ($is_parallel) + { + $node_subscriber->append_conf('postgresql.conf', + "log_min_messages = debug1"); + $node_subscriber->reload; + } + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # Insert, update and delete enough rows to exceed 64kB limit. + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 500) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + SAVEPOINT s1; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501, 1000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + SAVEPOINT s2; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001, 1500) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + SAVEPOINT s3; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501, 2000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + SAVEPOINT s4; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001, 2500) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + COMMIT; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(1667|1667|1667), + 'check data was copied to 
subscriber in streaming mode and extra columns contain local defaults' + ); + + # Cleanup the test data + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE (a > 2)"); + $node_publisher->wait_for_catchup($appname); + + # Reset the log verbosity. + if ($is_parallel) + { + $node_subscriber->append_conf('postgresql.conf', + "log_min_messages = warning"); + $node_subscriber->reload; + } +} + # Create publisher node my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); @@ -37,6 +125,10 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); my $appname = 'tap_sub'; + +################################ +# Test using streaming mode 'on' +################################ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); @@ -49,41 +141,26 @@ my $result = "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(2|2|2), 'check initial data was copied to subscriber'); -# Insert, update and delete enough rows to exceed 64kB limit. 
-$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series( 3, 500) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -SAVEPOINT s1; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501, 1000) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -SAVEPOINT s2; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001, 1500) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -SAVEPOINT s3; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501, 2000) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -SAVEPOINT s4; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001, 2500) s(i); -UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; -DELETE FROM test_tab WHERE mod(a,3) = 0; -COMMIT; -}); - -$node_publisher->wait_for_catchup($appname); - -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(1667|1667|1667), - 'check data was copied to subscriber in streaming mode and extra columns contain local defaults' +test_streaming($node_publisher, $node_subscriber, $appname, 0); + +###################################### +# Test using streaming mode 'parallel' +###################################### +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" ); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)"); + +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing 
SUBSCRIPTION"; + +test_streaming($node_publisher, $node_subscriber, $appname, 1); + $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/017_stream_ddl.pl b/src/test/subscription/t/017_stream_ddl.pl index 866f151..5e71682 100644 --- a/src/test/subscription/t/017_stream_ddl.pl +++ b/src/test/subscription/t/017_stream_ddl.pl @@ -8,6 +8,138 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +# Check the log that the streamed transaction was completed successfully +# reported by parallel apply worker. +sub check_parallel_log +{ + my ($node_subscriber, $offset, $is_parallel, $type) = @_; + + if ($is_parallel) + { + $node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/, + $offset); + } +} + +# Encapsulate all the common test steps which are related to "streaming" +# parameter so the same code can be run both for the streaming=on and +# streaming=parallel cases. +sub test_streaming +{ + my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + + my $offset = 0; + + # a small (non-streamed) transaction with DDL and DML + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab VALUES (3, md5(3::text)); + ALTER TABLE test_tab ADD COLUMN c INT; + SAVEPOINT s1; + INSERT INTO test_tab VALUES (4, md5(4::text), -4); + COMMIT; + }); + + # If "streaming" parameter is specified as "parallel", we need to check + # that streamed transaction was applied using a parallel apply worker. + # We have to look for the DEBUG1 log messages about that, so bump up the + # log verbosity. + if ($is_parallel) + { + $node_subscriber->append_conf('postgresql.conf', + "log_min_messages = debug1"); + $node_subscriber->reload; + } + + # Check the subscriber log from now on. 
+ $offset = -s $node_subscriber->logfile; + + # large (streamed) transaction with DDL and DML + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(5, 1000) s(i); + ALTER TABLE test_tab ADD COLUMN d INT; + SAVEPOINT s1; + INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001, 2000) s(i); + COMMIT; + }); + + # a small (non-streamed) transaction with DDL and DML + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab VALUES (2001, md5(2001::text), -2001, 2*2001); + ALTER TABLE test_tab ADD COLUMN e INT; + SAVEPOINT s1; + INSERT INTO test_tab VALUES (2002, md5(2002::text), -2002, 2*2002, -3*2002); + COMMIT; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d), count(e) FROM test_tab"); + is($result, qq(2002|1999|1002|1), + 'check data was copied to subscriber in streaming mode and extra columns contain local defaults' + ); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # A large (streamed) transaction with DDL and DML. One of the DDL is performed + # after DML to ensure that we invalidate the schema sent for test_tab so that + # the next transaction has to send the schema again. + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(2003,5000) s(i); + ALTER TABLE test_tab ADD COLUMN f INT; + COMMIT; + }); + + # A small transaction that won't get streamed. This is just to ensure that we + # send the schema again to reflect the last column added in the previous test. 
+ $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i, 4*i FROM generate_series(5001,5005) s(i); + COMMIT; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d), count(e), count(f) FROM test_tab" + ); + is($result, qq(5005|5002|4005|3004|5), + 'check data was copied to subscriber for both streaming and non-streaming transactions' + ); + + # Cleanup the test data + $node_publisher->safe_psql( + 'postgres', q{ + DELETE FROM test_tab WHERE (a > 2); + ALTER TABLE test_tab DROP COLUMN c, DROP COLUMN d, DROP COLUMN e, DROP COLUMN f; + }); + $node_publisher->wait_for_catchup($appname); + + # Reset the log verbosity. + if ($is_parallel) + { + $node_subscriber->append_conf('postgresql.conf', + "log_min_messages = warning"); + $node_subscriber->reload; + } +} + # Create publisher node my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); @@ -37,6 +169,10 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); my $appname = 'tap_sub'; + +################################ +# Test using streaming mode 'on' +################################ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); @@ -49,77 +185,26 @@ my $result = "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(2|0|0), 'check initial data was copied to subscriber'); -# a small (non-streamed) transaction with DDL and DML -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab VALUES (3, md5(3::text)); -ALTER TABLE test_tab ADD COLUMN c INT; -SAVEPOINT s1; -INSERT INTO test_tab VALUES (4, md5(4::text), -4); -COMMIT; -}); - -# 
large (streamed) transaction with DDL and DML -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(5, 1000) s(i); -ALTER TABLE test_tab ADD COLUMN d INT; -SAVEPOINT s1; -INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001, 2000) s(i); -COMMIT; -}); - -# a small (non-streamed) transaction with DDL and DML -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab VALUES (2001, md5(2001::text), -2001, 2*2001); -ALTER TABLE test_tab ADD COLUMN e INT; -SAVEPOINT s1; -INSERT INTO test_tab VALUES (2002, md5(2002::text), -2002, 2*2002, -3*2002); -COMMIT; -}); - -$node_publisher->wait_for_catchup($appname); - -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d), count(e) FROM test_tab"); -is($result, qq(2002|1999|1002|1), - 'check data was copied to subscriber in streaming mode and extra columns contain local defaults' -); +test_streaming($node_publisher, $node_subscriber, $appname, 0); -# A large (streamed) transaction with DDL and DML. One of the DDL is performed -# after DML to ensure that we invalidate the schema sent for test_tab so that -# the next transaction has to send the schema again. -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(2003,5000) s(i); -ALTER TABLE test_tab ADD COLUMN f INT; -COMMIT; -}); - -# A small transaction that won't get streamed. This is just to ensure that we -# send the schema again to reflect the last column added in the previous test. 
-$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i, 4*i FROM generate_series(5001,5005) s(i); -COMMIT; -}); - -$node_publisher->wait_for_catchup($appname); - -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d), count(e), count(f) FROM test_tab"); -is($result, qq(5005|5002|4005|3004|5), - 'check data was copied to subscriber for both streaming and non-streaming transactions' +###################################### +# Test using streaming mode 'parallel' +###################################### +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" ); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)"); + +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; + +test_streaming($node_publisher, $node_subscriber, $appname, 1); + $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/018_stream_subxact_abort.pl b/src/test/subscription/t/018_stream_subxact_abort.pl index 551f16d..b404fed 100644 --- a/src/test/subscription/t/018_stream_subxact_abort.pl +++ b/src/test/subscription/t/018_stream_subxact_abort.pl @@ -8,6 +8,145 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +# Check the log that the streamed transaction was completed successfully +# reported by parallel apply worker. +sub check_parallel_log +{ + my ($node_subscriber, $offset, $is_parallel, $type) = @_; + + if ($is_parallel) + { + $node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? 
finished processing the STREAM $type command/, + $offset); + } +} + +# Encapsulate all the common test steps which are related to "streaming" +# parameter so the same code can be run both for the streaming=on and +# streaming=parallel cases. +sub test_streaming +{ + my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + + my $offset = 0; + + # If "streaming" parameter is specified as "parallel", we need to check + # that streamed transaction was applied using a parallel apply worker. + # We have to look for the DEBUG1 log messages about that, so bump up the + # log verbosity. + if ($is_parallel) + { + $node_subscriber->append_conf('postgresql.conf', + "log_min_messages = debug1"); + $node_subscriber->reload; + } + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # large (streamed) transaction with DDL, DML and ROLLBACKs + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i); + SAVEPOINT s1; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501,1000) s(i); + SAVEPOINT s2; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001,1500) s(i); + SAVEPOINT s3; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501,2000) s(i); + ROLLBACK TO s2; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001,2500) s(i); + ROLLBACK TO s1; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2501,3000) s(i); + SAVEPOINT s4; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3001,3500) s(i); + SAVEPOINT s5; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3501,4000) s(i); + COMMIT; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); + is($result, qq(2000|0), + 
'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults' + ); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # large (streamed) transaction with subscriber receiving out of order + # subtransaction ROLLBACKs + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(4001,4500) s(i); + SAVEPOINT s1; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001,5500) s(i); + SAVEPOINT s2; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6001,6500) s(i); + SAVEPOINT s3; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(7001,7500) s(i); + RELEASE s2; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8001,8500) s(i); + ROLLBACK TO s1; + COMMIT; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); + is($result, qq(2500|0), + 'check rollback to savepoint was reflected on subscriber'); + + # Check the subscriber log from now on. 
+ $offset = -s $node_subscriber->logfile; + + # large (streamed) transaction with subscriber receiving rollback + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8501,9000) s(i); + SAVEPOINT s1; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9001,9500) s(i); + SAVEPOINT s2; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9501,10000) s(i); + ROLLBACK; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'ABORT'); + + $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); + is($result, qq(2500|0), 'check rollback was reflected on subscriber'); + + # Cleanup the test data + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE (a > 2)"); + $node_publisher->wait_for_catchup($appname); + + # Reset the log verbosity. + if ($is_parallel) + { + $node_subscriber->append_conf('postgresql.conf', + "log_min_messages = warning"); + $node_subscriber->reload; + } +} + # Create publisher node my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); @@ -36,6 +175,10 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); my $appname = 'tap_sub'; + +################################ +# Test using streaming mode 'on' +################################ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); @@ -48,81 +191,25 @@ my $result = "SELECT count(*), count(c) FROM test_tab"); is($result, qq(2|0), 'check initial data was copied to subscriber'); -# large (streamed) transaction with DDL, DML and ROLLBACKs -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i); 
-SAVEPOINT s1; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(501,1000) s(i); -SAVEPOINT s2; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1001,1500) s(i); -SAVEPOINT s3; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1501,2000) s(i); -ROLLBACK TO s2; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2001,2500) s(i); -ROLLBACK TO s1; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(2501,3000) s(i); -SAVEPOINT s4; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3001,3500) s(i); -SAVEPOINT s5; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3501,4000) s(i); -COMMIT; -}); - -$node_publisher->wait_for_catchup($appname); - -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c) FROM test_tab"); -is($result, qq(2000|0), - 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults' +test_streaming($node_publisher, $node_subscriber, $appname, 0); + +###################################### +# Test using streaming mode 'parallel' +###################################### +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" ); -# large (streamed) transaction with subscriber receiving out of order -# subtransaction ROLLBACKs -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(4001,4500) s(i); -SAVEPOINT s1; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(5001,5500) s(i); -SAVEPOINT s2; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(6001,6500) s(i); -SAVEPOINT s3; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(7001,7500) s(i); -RELEASE s2; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8001,8500) s(i); -ROLLBACK TO s1; -COMMIT; -}); - 
-$node_publisher->wait_for_catchup($appname); - -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c) FROM test_tab"); -is($result, qq(2500|0), - 'check rollback to savepoint was reflected on subscriber'); - -# large (streamed) transaction with subscriber receiving rollback -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(8501,9000) s(i); -SAVEPOINT s1; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9001,9500) s(i); -SAVEPOINT s2; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(9501,10000) s(i); -ROLLBACK; -}); - -$node_publisher->wait_for_catchup($appname); - -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c) FROM test_tab"); -is($result, qq(2500|0), 'check rollback was reflected on subscriber'); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)"); + +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; + +test_streaming($node_publisher, $node_subscriber, $appname, 1); $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl index 4d7da82..c05d882 100644 --- a/src/test/subscription/t/019_stream_subxact_ddl_abort.pl +++ b/src/test/subscription/t/019_stream_subxact_ddl_abort.pl @@ -9,6 +9,91 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +# Check the log that the streamed transaction was completed successfully +# reported by parallel apply worker. +sub check_parallel_log +{ + my ($node_subscriber, $offset, $is_parallel, $type) = @_; + + if ($is_parallel) + { + $node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? 
finished processing the STREAM $type command/, + $offset); + } +} + +# Encapsulate all the common test steps which are related to "streaming" +# parameter so the same code can be run both for the streaming=on and +# streaming=parallel cases. +sub test_streaming +{ + my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + + my $offset = 0; + + # If "streaming" parameter is specified as "parallel", we need to check + # that streamed transaction was applied using a parallel apply worker. + # We have to look for the DEBUG1 log messages about that, so bump up the + # log verbosity. + if ($is_parallel) + { + $node_subscriber->append_conf('postgresql.conf', + "log_min_messages = debug1"); + $node_subscriber->reload; + } + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # large (streamed) transaction with DDL, DML and ROLLBACKs + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i); + ALTER TABLE test_tab ADD COLUMN c INT; + SAVEPOINT s1; + INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(501,1000) s(i); + ALTER TABLE test_tab ADD COLUMN d INT; + SAVEPOINT s2; + INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001,1500) s(i); + ALTER TABLE test_tab ADD COLUMN e INT; + SAVEPOINT s3; + INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(1501,2000) s(i); + ALTER TABLE test_tab DROP COLUMN c; + ROLLBACK TO s1; + INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(501,1000) s(i); + COMMIT; + }); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'COMMIT'); + + my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c) FROM test_tab"); + is($result, qq(1000|500), + 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults' + ); + + # 
Cleanup the test data + $node_publisher->safe_psql( + 'postgres', q{ + DELETE FROM test_tab WHERE (a > 2); + ALTER TABLE test_tab DROP COLUMN c; + }); + $node_publisher->wait_for_catchup($appname); + + # Reset the log verbosity. + if ($is_parallel) + { + $node_subscriber->append_conf('postgresql.conf', + "log_min_messages = warning"); + $node_subscriber->reload; + } +} + # Create publisher node my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); @@ -37,6 +122,10 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); my $appname = 'tap_sub'; + +################################ +# Test using streaming mode 'on' +################################ $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" ); @@ -49,35 +138,26 @@ my $result = "SELECT count(*), count(c) FROM test_tab"); is($result, qq(2|0), 'check initial data was copied to subscriber'); -# large (streamed) transaction with DDL, DML and ROLLBACKs -$node_publisher->safe_psql( - 'postgres', q{ -BEGIN; -INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3,500) s(i); -ALTER TABLE test_tab ADD COLUMN c INT; -SAVEPOINT s1; -INSERT INTO test_tab SELECT i, md5(i::text), -i FROM generate_series(501,1000) s(i); -ALTER TABLE test_tab ADD COLUMN d INT; -SAVEPOINT s2; -INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i FROM generate_series(1001,1500) s(i); -ALTER TABLE test_tab ADD COLUMN e INT; -SAVEPOINT s3; -INSERT INTO test_tab SELECT i, md5(i::text), -i, 2*i, -3*i FROM generate_series(1501,2000) s(i); -ALTER TABLE test_tab DROP COLUMN c; -ROLLBACK TO s1; -INSERT INTO test_tab SELECT i, md5(i::text), i FROM generate_series(501,1000) s(i); -COMMIT; -}); - -$node_publisher->wait_for_catchup($appname); - -$result = - $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c) FROM 
test_tab"); -is($result, qq(1000|500), - 'check rollback to savepoint was reflected on subscriber and extra columns contain local defaults' +test_streaming($node_publisher, $node_subscriber, $appname, 0); + +###################################### +# Test using streaming mode 'parallel' +###################################### +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" ); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)"); + +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; + +test_streaming($node_publisher, $node_subscriber, $appname, 1); + $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/022_twophase_cascade.pl b/src/test/subscription/t/022_twophase_cascade.pl index 7a797f3..d76fd83 100644 --- a/src/test/subscription/t/022_twophase_cascade.pl +++ b/src/test/subscription/t/022_twophase_cascade.pl @@ -11,6 +11,239 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +# Check the log that the streamed transaction was completed successfully +# reported by parallel apply worker. +sub check_parallel_log +{ + my ($node_subscriber, $offset, $streaming_mode, $type) = @_; + + if ($streaming_mode eq 'parallel') + { + $node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/, + $offset); + } +} + +# Encapsulate all the common test steps which are related to "streaming" parameter +# so the same code can be run both for the streaming=on and streaming=parallel +# cases. 
+sub test_streaming +{ + my ($node_A, $node_B, $node_C, $appname_B, $appname_C, $streaming_mode) = + @_; + + my $offset_B = 0; + my $offset_C = 0; + + my $oldpid_B = $node_A->safe_psql( + 'postgres', " + SELECT pid FROM pg_stat_replication + WHERE application_name = '$appname_B' AND state = 'streaming';"); + my $oldpid_C = $node_B->safe_psql( + 'postgres', " + SELECT pid FROM pg_stat_replication + WHERE application_name = '$appname_C' AND state = 'streaming';"); + + # Setup logical replication streaming mode + + $node_B->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION tap_sub_B + SET (streaming = $streaming_mode);"); + $node_C->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION tap_sub_C + SET (streaming = $streaming_mode)"); + + # Wait for subscribers to finish initialization + + $node_A->poll_query_until( + 'postgres', " + SELECT pid != $oldpid_B FROM pg_stat_replication + WHERE application_name = '$appname_B' AND state = 'streaming';" + ) or die "Timed out while waiting for apply to restart"; + $node_B->poll_query_until( + 'postgres', " + SELECT pid != $oldpid_C FROM pg_stat_replication + WHERE application_name = '$appname_C' AND state = 'streaming';" + ) or die "Timed out while waiting for apply to restart"; + + ############################### + # Test 2PC PREPARE / COMMIT PREPARED. + # 1. Data is streamed as a 2PC transaction. + # 2. Then do commit prepared. + # + # Expect all data is replicated on subscriber(s) after the commit. + ############################### + + # If "streaming" parameter is specified as "parallel", we need to check + # that streamed transaction was prepared using a parallel apply worker. + # We have to look for the DEBUG1 log messages about that, so bump up the + # log verbosity. 
+ if ($streaming_mode eq 'parallel') + { + $node_B->append_conf('postgresql.conf', "log_min_messages = debug1"); + $node_B->reload; + + $node_C->append_conf('postgresql.conf', "log_min_messages = debug1"); + $node_C->reload; + } + + # Check the subscriber log from now on. + $offset_B = -s $node_B->logfile; + $offset_C = -s $node_C->logfile; + + # Insert, update and delete enough rows to exceed the 64kB limit. + # Then 2PC PREPARE + $node_A->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + + $node_A->wait_for_catchup($appname_B); + $node_B->wait_for_catchup($appname_C); + + check_parallel_log($node_B, $offset_B, $streaming_mode, 'PREPARE'); + check_parallel_log($node_C, $offset_C, $streaming_mode, 'PREPARE'); + + # check the transaction state is prepared on subscriber(s) + my $result = + $node_B->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(1), 'transaction is prepared on subscriber B'); + $result = + $node_C->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(1), 'transaction is prepared on subscriber C'); + + # 2PC COMMIT + $node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); + + $node_A->wait_for_catchup($appname_B); + $node_B->wait_for_catchup($appname_C); + + # check that transaction was committed on subscriber(s) + $result = $node_B->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(3334|3334|3334), + 'Rows inserted by 2PC have committed on subscriber B, and extra columns have local defaults' + ); + $result = $node_C->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(3334|3334|3334), + 'Rows inserted by 2PC have committed on subscriber C, and extra columns 
have local defaults' + ); + + # check the transaction state is ended on subscriber(s) + $result = + $node_B->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is committed on subscriber B'); + $result = + $node_C->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is committed on subscriber C'); + + ############################### + # Test 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT. + # 0. Cleanup from previous test leaving only 2 rows. + # 1. Insert one more row. + # 2. Record a SAVEPOINT. + # 3. Data is streamed using 2PC. + # 4. Do rollback to SAVEPOINT prior to the streamed inserts. + # 5. Then COMMIT PREPARED. + # + # Expect data after the SAVEPOINT is aborted leaving only 3 rows (= 2 original + 1 from step 1). + ############################### + + # First, delete the data except for 2 rows (delete will be replicated) + $node_A->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); + + # Check the subscriber log from now on. 
+ $offset_B = -s $node_B->logfile; + $offset_C = -s $node_C->logfile; + + # 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT + $node_A->safe_psql( + 'postgres', " + BEGIN; + INSERT INTO test_tab VALUES (9999, 'foobar'); + SAVEPOINT sp_inner; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + ROLLBACK TO SAVEPOINT sp_inner; + PREPARE TRANSACTION 'outer'; + "); + + $node_A->wait_for_catchup($appname_B); + $node_B->wait_for_catchup($appname_C); + + check_parallel_log($node_B, $offset_B, $streaming_mode, 'PREPARE'); + check_parallel_log($node_C, $offset_C, $streaming_mode, 'PREPARE'); + + # check the transaction state prepared on subscriber(s) + $result = + $node_B->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(1), 'transaction is prepared on subscriber B'); + $result = + $node_C->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(1), 'transaction is prepared on subscriber C'); + + # 2PC COMMIT + $node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';"); + + $node_A->wait_for_catchup($appname_B); + $node_B->wait_for_catchup($appname_C); + + # check the transaction state is ended on subscriber + $result = + $node_B->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is ended on subscriber B'); + $result = + $node_C->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is ended on subscriber C'); + + # check inserts are visible at subscriber(s). + # All the streamed data (prior to the SAVEPOINT) should be rolled back. + # (9999, 'foobar') should be committed. 
+ $result = $node_B->safe_psql('postgres', + "SELECT count(*) FROM test_tab where b = 'foobar';"); + is($result, qq(1), 'Rows committed are present on subscriber B'); + $result = + $node_B->safe_psql('postgres', "SELECT count(*) FROM test_tab;"); + is($result, qq(3), 'Rows committed are present on subscriber B'); + $result = $node_C->safe_psql('postgres', + "SELECT count(*) FROM test_tab where b = 'foobar';"); + is($result, qq(1), 'Rows committed are present on subscriber C'); + $result = + $node_C->safe_psql('postgres', "SELECT count(*) FROM test_tab;"); + is($result, qq(3), 'Rows committed are present on subscriber C'); + + # Cleanup the test data + $node_A->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); + $node_A->wait_for_catchup($appname_B); + $node_B->wait_for_catchup($appname_C); + + # Reset the log verbosity. + if ($streaming_mode eq 'parallel') + { + $node_B->append_conf('postgresql.conf', "log_min_messages = warning"); + $node_B->reload; + + $node_C->append_conf('postgresql.conf', "log_min_messages = warning"); + $node_C->reload; + } +} + ############################### # Setup a cascade of pub/sub nodes. 
# node_A -> node_B -> node_C @@ -260,160 +493,15 @@ is($result, qq(21), 'Rows committed are present on subscriber C'); # 2PC + STREAMING TESTS # --------------------- -my $oldpid_B = $node_A->safe_psql( - 'postgres', " - SELECT pid FROM pg_stat_replication - WHERE application_name = '$appname_B' AND state = 'streaming';"); -my $oldpid_C = $node_B->safe_psql( - 'postgres', " - SELECT pid FROM pg_stat_replication - WHERE application_name = '$appname_C' AND state = 'streaming';"); - -# Setup logical replication (streaming = on) - -$node_B->safe_psql( - 'postgres', " - ALTER SUBSCRIPTION tap_sub_B - SET (streaming = on);"); -$node_C->safe_psql( - 'postgres', " - ALTER SUBSCRIPTION tap_sub_C - SET (streaming = on)"); - -# Wait for subscribers to finish initialization - -$node_A->poll_query_until( - 'postgres', " - SELECT pid != $oldpid_B FROM pg_stat_replication - WHERE application_name = '$appname_B' AND state = 'streaming';" -) or die "Timed out while waiting for apply to restart"; -$node_B->poll_query_until( - 'postgres', " - SELECT pid != $oldpid_C FROM pg_stat_replication - WHERE application_name = '$appname_C' AND state = 'streaming';" -) or die "Timed out while waiting for apply to restart"; - -############################### -# Test 2PC PREPARE / COMMIT PREPARED. -# 1. Data is streamed as a 2PC transaction. -# 2. Then do commit prepared. -# -# Expect all data is replicated on subscriber(s) after the commit. -############################### - -# Insert, update and delete enough rows to exceed the 64kB limit. 
-# Then 2PC PREPARE -$node_A->safe_psql( - 'postgres', q{ - BEGIN; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - PREPARE TRANSACTION 'test_prepared_tab';}); - -$node_A->wait_for_catchup($appname_B); -$node_B->wait_for_catchup($appname_C); +################################ +# Test using streaming mode 'on' +################################ +test_streaming($node_A, $node_B, $node_C, $appname_B, $appname_C, 'on'); -# check the transaction state is prepared on subscriber(s) -$result = - $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber B'); -$result = - $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber C'); - -# 2PC COMMIT -$node_A->safe_psql('postgres', "COMMIT PREPARED 'test_prepared_tab';"); - -$node_A->wait_for_catchup($appname_B); -$node_B->wait_for_catchup($appname_C); - -# check that transaction was committed on subscriber(s) -$result = $node_B->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(3334|3334|3334), - 'Rows inserted by 2PC have committed on subscriber B, and extra columns have local defaults' -); -$result = $node_C->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(3334|3334|3334), - 'Rows inserted by 2PC have committed on subscriber C, and extra columns have local defaults' -); - -# check the transaction state is ended on subscriber(s) -$result = - $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is committed on subscriber B'); -$result = - $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is committed on subscriber C'); - 
-############################### -# Test 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT. -# 0. Cleanup from previous test leaving only 2 rows. -# 1. Insert one more row. -# 2. Record a SAVEPOINT. -# 3. Data is streamed using 2PC. -# 4. Do rollback to SAVEPOINT prior to the streamed inserts. -# 5. Then COMMIT PREPARED. -# -# Expect data after the SAVEPOINT is aborted leaving only 3 rows (= 2 original + 1 from step 1). -############################### - -# First, delete the data except for 2 rows (delete will be replicated) -$node_A->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); - -# 2PC PREPARE with a nested ROLLBACK TO SAVEPOINT -$node_A->safe_psql( - 'postgres', " - BEGIN; - INSERT INTO test_tab VALUES (9999, 'foobar'); - SAVEPOINT sp_inner; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - ROLLBACK TO SAVEPOINT sp_inner; - PREPARE TRANSACTION 'outer'; - "); - -$node_A->wait_for_catchup($appname_B); -$node_B->wait_for_catchup($appname_C); - -# check the transaction state prepared on subscriber(s) -$result = - $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber B'); -$result = - $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber C'); - -# 2PC COMMIT -$node_A->safe_psql('postgres', "COMMIT PREPARED 'outer';"); - -$node_A->wait_for_catchup($appname_B); -$node_B->wait_for_catchup($appname_C); - -# check the transaction state is ended on subscriber -$result = - $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is ended on subscriber B'); -$result = - $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is ended on subscriber C'); - -# check inserts are 
visible at subscriber(s). -# All the streamed data (prior to the SAVEPOINT) should be rolled back. -# (9999, 'foobar') should be committed. -$result = $node_B->safe_psql('postgres', - "SELECT count(*) FROM test_tab where b = 'foobar';"); -is($result, qq(1), 'Rows committed are present on subscriber B'); -$result = $node_B->safe_psql('postgres', "SELECT count(*) FROM test_tab;"); -is($result, qq(3), 'Rows committed are present on subscriber B'); -$result = $node_C->safe_psql('postgres', - "SELECT count(*) FROM test_tab where b = 'foobar';"); -is($result, qq(1), 'Rows committed are present on subscriber C'); -$result = $node_C->safe_psql('postgres', "SELECT count(*) FROM test_tab;"); -is($result, qq(3), 'Rows committed are present on subscriber C'); +###################################### +# Test using streaming mode 'parallel' +###################################### +test_streaming($node_A, $node_B, $node_C, $appname_B, $appname_C, 'parallel'); ############################### # check all the cleanup diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl index 9b45410..c966ac9 100644 --- a/src/test/subscription/t/023_twophase_stream.pl +++ b/src/test/subscription/t/023_twophase_stream.pl @@ -8,6 +8,308 @@ use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +# Check the log that the streamed transaction was completed successfully +# reported by parallel apply worker. +sub check_parallel_log +{ + my ($node_subscriber, $offset, $is_parallel, $type) = @_; + + if ($is_parallel) + { + $node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? finished processing the STREAM $type command/, + $offset); + } +} + +# Encapsulate all the common test steps which are related to "streaming" +# parameter so the same code can be run both for the streaming=on and +# streaming=parallel cases. 
+sub test_streaming +{ + my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + + my $offset = 0; + + ############################### + # Test 2PC PREPARE / COMMIT PREPARED. + # 1. Data is streamed as a 2PC transaction. + # 2. Then do commit prepared. + # + # Expect all data is replicated on subscriber side after the commit. + ############################### + + # If "streaming" parameter is specified as "parallel", we need to check + # that streamed transaction was prepared using a parallel apply worker. + # We have to look for the DEBUG1 log messages about that, so bump up the + # log verbosity. + if ($is_parallel) + { + $node_subscriber->append_conf('postgresql.conf', + "log_min_messages = debug1"); + $node_subscriber->reload; + } + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # check that 2PC gets replicated to subscriber + # Insert, update and delete enough rows to exceed the 64kB limit. + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE'); + + # check that transaction is in prepared state on subscriber + my $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(1), 'transaction is prepared on subscriber'); + + # 2PC transaction gets committed + $node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); + + $node_publisher->wait_for_catchup($appname); + + # check that transaction is committed on subscriber + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(3334|3334|3334), + 'Rows inserted by 2PC 
have committed on subscriber, and extra columns contain local defaults' + ); + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is committed on subscriber'); + + ############################### + # Test 2PC PREPARE / ROLLBACK PREPARED. + # 1. Table is deleted back to 2 rows which are replicated on subscriber. + # 2. Data is streamed using 2PC. + # 3. Do rollback prepared. + # + # Expect data rolls back leaving only the original 2 rows. + ############################### + + # First, delete the data except for 2 rows (will be replicated) + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE a > 2;"); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # Then insert, update and delete enough rows to exceed the 64kB limit. + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE'); + + # check that transaction is in prepared state on subscriber + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(1), 'transaction is prepared on subscriber'); + + # 2PC transaction gets aborted + $node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED 'test_prepared_tab';"); + + $node_publisher->wait_for_catchup($appname); + + # check that transaction is aborted on subscriber + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(2|2|2), + 'Rows inserted by 2PC are rolled back, leaving only the original 2 rows' + ); + + $result = $node_subscriber->safe_psql('postgres', + 
"SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is aborted on subscriber'); + + ############################### + # Check that 2PC COMMIT PREPARED is decoded properly on crash restart. + # 1. insert, update and delete enough rows to exceed the 64kB limit. + # 2. Then server crashes before the 2PC transaction is committed. + # 3. After servers are restarted the pending transaction is committed. + # + # Expect all data is replicated on subscriber side after the commit. + # Note: both publisher and subscriber do crash/restart. + ############################### + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + + $node_subscriber->stop('immediate'); + $node_publisher->stop('immediate'); + + $node_publisher->start; + $node_subscriber->start; + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE'); + + # commit post the restart + $node_publisher->safe_psql('postgres', + "COMMIT PREPARED 'test_prepared_tab';"); + $node_publisher->wait_for_catchup($appname); + + # check inserts are visible + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(3334|3334|3334), + 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults' + ); + + ############################### + # Do INSERT after the PREPARE but before ROLLBACK PREPARED. + # 1. Table is deleted back to 2 rows which are replicated on subscriber. + # 2. Data is streamed using 2PC. + # 3. A single row INSERT is done which is after the PREPARE. + # 4. Then do a ROLLBACK PREPARED. 
+ # + # Expect the 2PC data rolls back leaving only 3 rows on the subscriber + # (the original 2 + inserted 1). + ############################### + + # First, delete the data except for 2 rows (will be replicated) + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE a > 2;"); + + # Check the subscriber log from now on. + $offset = -s $node_subscriber->logfile; + + # Then insert, update and delete enough rows to exceed the 64kB limit. + $node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); + UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; + DELETE FROM test_tab WHERE mod(a,3) = 0; + PREPARE TRANSACTION 'test_prepared_tab';}); + + $node_publisher->wait_for_catchup($appname); + + check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE'); + + # check that transaction is in prepared state on subscriber + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(1), 'transaction is prepared on subscriber'); + + # Insert a different record (now we are outside of the 2PC transaction) + # Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key + $node_publisher->safe_psql('postgres', + "INSERT INTO test_tab VALUES (99999, 'foobar')"); + + # 2PC transaction gets aborted + $node_publisher->safe_psql('postgres', + "ROLLBACK PREPARED 'test_prepared_tab';"); + + $node_publisher->wait_for_catchup($appname); + + # check that transaction is aborted on subscriber, + # but the extra INSERT outside of the 2PC still was replicated + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*), count(c), count(d = 999) FROM test_tab"); + is($result, qq(3|3|3), + 'check the outside insert was copied to subscriber'); + + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is aborted on 
 subscriber');
+
+	###############################
+	# Do INSERT after the PREPARE but before COMMIT PREPARED.
+	# 1. Table is deleted back to 2 rows which are replicated on subscriber.
+	# 2. Data is streamed using 2PC.
+	# 3. A single row INSERT is done which is after the PREPARE.
+	# 4. Then do a COMMIT PREPARED.
+	#
+	# Expect 2PC data + the extra row are on the subscriber
+	# (the 3334 + inserted 1 = 3335).
+	###############################
+
+	# First, delete the data except for 2 rows (will be replicated)
+	$node_publisher->safe_psql('postgres',
+		"DELETE FROM test_tab WHERE a > 2;");
+
+	# Check the subscriber log from now on.
+	$offset = -s $node_subscriber->logfile;
+
+	# Then insert, update and delete enough rows to exceed the 64kB limit.
+	$node_publisher->safe_psql(
+		'postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i);
+	UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0;
+	DELETE FROM test_tab WHERE mod(a,3) = 0;
+	PREPARE TRANSACTION 'test_prepared_tab';});
+
+	$node_publisher->wait_for_catchup($appname);
+
+	check_parallel_log($node_subscriber, $offset, $is_parallel, 'PREPARE');
+
+	# check that transaction is in prepared state on subscriber
+	$result = $node_subscriber->safe_psql('postgres',
+		"SELECT count(*) FROM pg_prepared_xacts;");
+	is($result, qq(1), 'transaction is prepared on subscriber');
+
+	# Insert a different record (now we are outside of the 2PC transaction)
+	# Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key
+	$node_publisher->safe_psql('postgres',
+		"INSERT INTO test_tab VALUES (99999, 'foobar')");
+
+	# 2PC transaction gets committed
+	$node_publisher->safe_psql('postgres',
+		"COMMIT PREPARED 'test_prepared_tab';");
+
+	$node_publisher->wait_for_catchup($appname);
+
+	# check that transaction is committed on subscriber
+	$result = $node_subscriber->safe_psql('postgres',
+		"SELECT count(*), count(c), count(d = 999) FROM
test_tab"); + is($result, qq(3335|3335|3335), + 'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults' + ); + + $result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); + is($result, qq(0), 'transaction is committed on subscriber'); + + # Cleanup the test data + $node_publisher->safe_psql('postgres', + "DELETE FROM test_tab WHERE a > 2;"); + $node_publisher->wait_for_catchup($appname); + + # Reset the log verbosity. + if ($is_parallel) + { + $node_subscriber->append_conf('postgresql.conf', + "log_min_messages = warning"); + $node_subscriber->reload; + } +} + ############################### # Setup ############################### @@ -48,6 +350,10 @@ $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); my $appname = 'tap_sub'; + +################################ +# Test using streaming mode 'on' +################################ $node_subscriber->safe_psql( 'postgres', " CREATE SUBSCRIPTION tap_sub @@ -64,236 +370,30 @@ my $twophase_query = $node_subscriber->poll_query_until('postgres', $twophase_query) or die "Timed out while waiting for subscriber to enable twophase"; -############################### # Check initial data was copied to subscriber -############################### my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(2|2|2), 'check initial data was copied to subscriber'); -############################### -# Test 2PC PREPARE / COMMIT PREPARED. -# 1. Data is streamed as a 2PC transaction. -# 2. Then do commit prepared. -# -# Expect all data is replicated on subscriber side after the commit. -############################### +test_streaming($node_publisher, $node_subscriber, $appname, 0); -# check that 2PC gets replicated to subscriber -# Insert, update and delete enough rows to exceed the 64kB limit. 
-$node_publisher->safe_psql( - 'postgres', q{ - BEGIN; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - PREPARE TRANSACTION 'test_prepared_tab';}); - -$node_publisher->wait_for_catchup($appname); - -# check that transaction is in prepared state on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber'); - -# 2PC transaction gets committed -$node_publisher->safe_psql('postgres', - "COMMIT PREPARED 'test_prepared_tab';"); - -$node_publisher->wait_for_catchup($appname); - -# check that transaction is committed on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(3334|3334|3334), - 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults' +###################################### +# Test using streaming mode 'parallel' +###################################### +my $oldpid = $node_publisher->safe_psql('postgres', + "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" ); -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is committed on subscriber'); - -############################### -# Test 2PC PREPARE / ROLLBACK PREPARED. -# 1. Table is deleted back to 2 rows which are replicated on subscriber. -# 2. Data is streamed using 2PC. -# 3. Do rollback prepared. -# -# Expect data rolls back leaving only the original 2 rows. -############################### - -# First, delete the data except for 2 rows (will be replicated) -$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); - -# Then insert, update and delete enough rows to exceed the 64kB limit. 
-$node_publisher->safe_psql( - 'postgres', q{ - BEGIN; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - PREPARE TRANSACTION 'test_prepared_tab';}); -$node_publisher->wait_for_catchup($appname); - -# check that transaction is in prepared state on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber'); - -# 2PC transaction gets aborted -$node_publisher->safe_psql('postgres', - "ROLLBACK PREPARED 'test_prepared_tab';"); - -$node_publisher->wait_for_catchup($appname); - -# check that transaction is aborted on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(2|2|2), - 'Rows inserted by 2PC are rolled back, leaving only the original 2 rows'); - -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is aborted on subscriber'); - -############################### -# Check that 2PC COMMIT PREPARED is decoded properly on crash restart. -# 1. insert, update and delete enough rows to exceed the 64kB limit. -# 2. Then server crashes before the 2PC transaction is committed. -# 3. After servers are restarted the pending transaction is committed. -# -# Expect all data is replicated on subscriber side after the commit. -# Note: both publisher and subscriber do crash/restart. 
-############################### - -$node_publisher->safe_psql( - 'postgres', q{ - BEGIN; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - PREPARE TRANSACTION 'test_prepared_tab';}); - -$node_subscriber->stop('immediate'); -$node_publisher->stop('immediate'); - -$node_publisher->start; -$node_subscriber->start; - -# commit post the restart -$node_publisher->safe_psql('postgres', - "COMMIT PREPARED 'test_prepared_tab';"); -$node_publisher->wait_for_catchup($appname); - -# check inserts are visible -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(3334|3334|3334), - 'Rows inserted by 2PC have committed on subscriber, and extra columns contain local defaults' -); - -############################### -# Do INSERT after the PREPARE but before ROLLBACK PREPARED. -# 1. Table is deleted back to 2 rows which are replicated on subscriber. -# 2. Data is streamed using 2PC. -# 3. A single row INSERT is done which is after the PREPARE. -# 4. Then do a ROLLBACK PREPARED. -# -# Expect the 2PC data rolls back leaving only 3 rows on the subscriber -# (the original 2 + inserted 1). -############################### - -# First, delete the data except for 2 rows (will be replicated) -$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); - -# Then insert, update and delete enough rows to exceed the 64kB limit. 
-$node_publisher->safe_psql( - 'postgres', q{ - BEGIN; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - PREPARE TRANSACTION 'test_prepared_tab';}); - -$node_publisher->wait_for_catchup($appname); - -# check that transaction is in prepared state on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber'); - -# Insert a different record (now we are outside of the 2PC transaction) -# Note: the 2PC transaction still holds row locks so make sure this insert is for a separate primary key -$node_publisher->safe_psql('postgres', - "INSERT INTO test_tab VALUES (99999, 'foobar')"); - -# 2PC transaction gets aborted -$node_publisher->safe_psql('postgres', - "ROLLBACK PREPARED 'test_prepared_tab';"); - -$node_publisher->wait_for_catchup($appname); - -# check that transaction is aborted on subscriber, -# but the extra INSERT outside of the 2PC still was replicated -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(3|3|3), 'check the outside insert was copied to subscriber'); - -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(0), 'transaction is aborted on subscriber'); - -############################### -# Do INSERT after the PREPARE but before COMMIT PREPARED. -# 1. Table is deleted back to 2 rows which are replicated on subscriber. -# 2. Data is streamed using 2PC. -# 3. A single row INSERT is done which is after the PREPARE. -# 4. Then do a COMMIT PREPARED. -# -# Expect 2PC data + the extra row are on the subscriber -# (the 3334 + inserted 1 = 3335). 
-############################### - -# First, delete the data except for 2 rows (will be replicated) -$node_publisher->safe_psql('postgres', "DELETE FROM test_tab WHERE a > 2;"); - -# Then insert, update and delete enough rows to exceed the 64kB limit. -$node_publisher->safe_psql( - 'postgres', q{ - BEGIN; - INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); - UPDATE test_tab SET b = md5(b) WHERE mod(a,2) = 0; - DELETE FROM test_tab WHERE mod(a,3) = 0; - PREPARE TRANSACTION 'test_prepared_tab';}); - -$node_publisher->wait_for_catchup($appname); - -# check that transaction is in prepared state on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM pg_prepared_xacts;"); -is($result, qq(1), 'transaction is prepared on subscriber'); - -# Insert a different record (now we are outside of the 2PC transaction) -# Note: the 2PC transaction still holds row locks so make sure this insert is for a separare primary key -$node_publisher->safe_psql('postgres', - "INSERT INTO test_tab VALUES (99999, 'foobar')"); - -# 2PC transaction gets committed -$node_publisher->safe_psql('postgres', - "COMMIT PREPARED 'test_prepared_tab';"); - -$node_publisher->wait_for_catchup($appname); +$node_subscriber->safe_psql('postgres', + "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)"); -# check that transaction is committed on subscriber -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*), count(c), count(d = 999) FROM test_tab"); -is($result, qq(3335|3335|3335), - 'Rows inserted by 2PC (as well as outside insert) have committed on subscriber, and extra columns contain local defaults' -); +$node_publisher->poll_query_until('postgres', + "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" + ) + or die + "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; -$result = $node_subscriber->safe_psql('postgres', - "SELECT count(*) FROM 
pg_prepared_xacts;"); -is($result, qq(0), 'transaction is committed on subscriber'); +test_streaming($node_publisher, $node_subscriber, $appname, 1); ############################### # check all the cleanup diff --git a/src/test/subscription/t/032_stream_parallel_conflict.pl b/src/test/subscription/t/032_stream_parallel_conflict.pl new file mode 100644 index 0000000..10efd2d --- /dev/null +++ b/src/test/subscription/t/032_stream_parallel_conflict.pl @@ -0,0 +1,152 @@ +# Copyright (c) 2022, PostgreSQL Global Development Group + +# Test for deadlock in streaming mode "parallel" in logical replication. + +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $offset = 0; + +# Create publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; + +# Check if any streaming chunks are applied using the parallel apply worker. We +# have to look for the DEBUG1 log messages about that, so bump up the log +# verbosity. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); + +$node_subscriber->start; + +# Setup structure on publisher +$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab (a int)"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', "CREATE TABLE test_tab (a int)"); +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX idx_tab on test_tab(a)"); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub + WITH (streaming = parallel, copy_data = false)"); + +$node_publisher->wait_for_catchup($appname); + +# Interleave a pair of transactions, each exceeding the 64kB limit. +my $in = ''; +my $out = ''; + +my $timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default); + +my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer, + on_error_stop => 0); + +# ============================================================================ +# Confirm if a deadlock between the leader apply worker and the parallel apply +# worker can be detected. +# ============================================================================ + +$in .= q{ +BEGIN; +INSERT INTO test_tab SELECT i FROM generate_series(1, 5000) s(i); +}; +$h->pump_nb; + +# Ensure that the parallel apply worker executes the insert command before the +# leader worker. +$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? applied [0-9]+ changes in the streaming chunk/, + $offset); + +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab values(1)"); + +$in .= q{ +COMMIT; +\q +}; +$h->finish; + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/, + $offset); + +# Drop the unique index on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab"); + +# Wait for this streaming transaction to be applied in the apply worker. +$node_publisher->wait_for_catchup($appname); + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(5001), 'data replicated to subscriber after dropping index'); + +# Clean up test data from the environment. 
+$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab"); +$node_publisher->wait_for_catchup($appname); +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX idx_tab on test_tab(a)"); + +# ============================================================================ +# Confirm if a deadlock between two parallel apply workers can be detected. +# ============================================================================ + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$in .= q{ +BEGIN; +INSERT INTO test_tab SELECT i FROM generate_series(1, 5000) s(i); +}; +$h->pump_nb; + +# Ensure that the first parallel apply worker executes the insert command +# before the second one. +$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? applied [0-9]+ changes in the streaming chunk/, + $offset); + +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab SELECT i FROM generate_series(1, 5000) s(i)"); + +$in .= q{ +COMMIT; +\q +}; +$h->finish; + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? deadlock detected/, + $offset); + +# Drop the unique index on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab"); + +# Wait for this streaming transaction to be applied in the apply worker. +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(10000), 'data replicated to subscriber after dropping index'); + +$node_subscriber->stop; +$node_publisher->stop; + +done_testing(); -- 2.7.2.windows.1