From 73444c7e6979e6493043f011600a3c49bcea2aa6 Mon Sep 17 00:00:00 2001 From: Euler Taveira Date: Thu, 19 Nov 2020 21:53:29 -0300 Subject: [PATCH 6/6] Overhaul tests --- src/test/subscription/t/020_messages.pl | 160 +++++++++++------------- 1 file changed, 75 insertions(+), 85 deletions(-) diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl index 8e59e324e3..d9123ed3ef 100644 --- a/src/test/subscription/t/020_messages.pl +++ b/src/test/subscription/t/020_messages.pl @@ -1,158 +1,148 @@ -# Tests that logical decoding messages are emitted and that -# they do not break subscribers +# Tests that logical decoding messages use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 7; +use Test::More tests => 5; +# Create publisher node my $node_publisher = get_new_node('publisher'); $node_publisher->init(allows_streaming => 'logical'); $node_publisher->start; +# Create subscriber node my $node_subscriber = get_new_node('subscriber'); $node_subscriber->init(allows_streaming => 'logical'); $node_subscriber->start; +# Create some preexisting content on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_test (a int primary key)"); + +# Setup structure on subscriber +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab_test (a int primary key)"); + +# Setup logical replication my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; -# -$node_publisher->safe_psql('postgres', - "CREATE TABLE tab (a int PRIMARY KEY)"); +$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_test"); + $node_subscriber->safe_psql('postgres', - "CREATE TABLE tab (a int PRIMARY KEY)"); -$node_publisher->safe_psql('postgres', - "CREATE PUBLICATION pub FOR TABLE tab"); -$node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION sub CONNECTION '$publisher_connstr' PUBLICATION pub" + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub" ); -# ensure a transactional logical decoding message shows up on the slot -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE"); +# Ensure a transactional logical decoding message shows up on the slot +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); $node_publisher->safe_psql('postgres', - "select pg_logical_emit_message(true, 'a prefix', 'a transactional message')" + "SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')" ); -my $slot_codes_with_message = $node_publisher->safe_psql( +my $result = $node_publisher->safe_psql( 'postgres', qq( - select get_byte(data, 0) - from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + SELECT get_byte(data, 0) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, 'proto_version', '1', - 'publication_names', 'pub', + 'publication_names', 'tap_pub', 'messages', 'true') )); # 66 77 67 == B M C == BEGIN MESSAGE COMMIT -is($slot_codes_with_message, "66\n77\n67", +is($result, qq(66 +77 +67), 'messages on slot are B M C with message option'); -my $transactional_message_flags = $node_publisher->safe_psql( +$result = $node_publisher->safe_psql( 'postgres', qq( - select get_byte(data, 1) - from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + SELECT get_byte(data, 1), encode(substr(data, 11, 8), 'escape') + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, 'proto_version', '1', - 'publication_names', 'pub', + 'publication_names', 'tap_pub', 'messages', 'true') - offset 1 limit 1 + OFFSET 1 LIMIT 1 )); -is($transactional_message_flags, "1", - "transactional message flags are set to 1"); +is($result, qq(1|pgoutput), + "flag transactional is set to 1 and prefix is pgoutput"); -my $slot_codes_without_message = $node_publisher->safe_psql( +$result = $node_publisher->safe_psql( 'postgres', qq( - select get_byte(data, 0) - from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + SELECT get_byte(data, 0) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, 'proto_version', '1', - 'publication_names', 'pub') + 'publication_names', 'tap_pub') )); # 66 67 == B C == BEGIN COMMIT -is($slot_codes_without_message, "66\n67", - 'messages on slot are B C without message option'); +is($result, qq(66 +67), + 'option messages defaults to false so message (M) is not available on slot'); -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); -$node_publisher->wait_for_catchup('sub'); +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE"); +$node_publisher->wait_for_catchup('tap_sub'); # ensure a non-transactional logical decoding message shows up on the slot -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE"); +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); -$node_publisher->safe_psql('postgres', "INSERT INTO tab VALUES (3)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)"); my $message_lsn = $node_publisher->safe_psql('postgres', - "select pg_logical_emit_message(false, 'prefix', 'nontransactional')"); + "SELECT pg_logical_emit_message(false, 'pgoutput', 'a non-transactional message')"); -$node_publisher->safe_psql('postgres', "INSERT INTO tab VALUES (4)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (2)"); -my $slot_message_code = $node_publisher->safe_psql( +$result = $node_publisher->safe_psql( 'postgres', qq( - select get_byte(data, 0) - from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + SELECT get_byte(data, 0), get_byte(data, 1) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, 'proto_version', '1', - 'publication_names', 'pub', + 'publication_names', 'tap_pub', 'messages', 'true') - where lsn = '$message_lsn' and xid = 0 + WHERE lsn = '$message_lsn' AND xid = 0 )); -is($slot_message_code, "77", "non-transactional message on slot is M"); +is($result, qq(77|0), 'non-transactional message on slot is M'); -my $nontransactional_message_flags = $node_publisher->safe_psql( - 'postgres', qq( - select get_byte(data, 1) - from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, - 'proto_version', '1', - 'publication_names', 'pub', - 'messages', 'true') - offset 1 limit 1 -)); +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE"); +$node_publisher->wait_for_catchup('tap_sub'); -is($nontransactional_message_flags, - "0", "non-transactional message flags are set to 0"); - -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); -$node_publisher->wait_for_catchup('sub'); - -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub DISABLE"); -$node_subscriber->safe_psql('postgres', "checkpoint;"); +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); # wait for the replication connection to drop from the publisher $node_publisher->poll_query_until('postgres', - 'SELECT count(*) from pg_catalog.pg_stat_replication', 0); + 'SELECT COUNT(*) FROM pg_catalog.pg_stat_replication', 0); -# ensure a non-transactional logical decoding message shows up on the slot -# when it is emitted within an aborted transaction. the message won't emit -# until something advances the LSN, which we intentionally do here with a -# checkpoint. +# Ensure a non-transactional logical decoding message shows up on the slot when +# it is emitted within an aborted transaction. The message won't emit until +# something advances the LSN, hence, we intentionally forces the server to +# switch to a new WAL file. $node_publisher->safe_psql( 'postgres', qq( BEGIN; - SELECT pg_logical_emit_message(false, 'prefix', - 'nontransactional aborted 1'); - INSERT INTO tab VALUES (5); - SELECT pg_logical_emit_message(false, 'prefix', - 'nontransactional aborted 2'); + SELECT pg_logical_emit_message(false, 'pgoutput', + 'a non-transactional message is available even if the transaction is aborted 1'); + INSERT INTO tab_test VALUES (3); + SELECT pg_logical_emit_message(true, 'pgoutput', + 'a transactional message is not available if the transaction is aborted'); + SELECT pg_logical_emit_message(false, 'pgoutput', + 'a non-transactional message is available even if the transaction is aborted 2'); ROLLBACK; - CHECKPOINT; + SELECT pg_switch_wal(); )); -my $aborted_txn_message_codes = $node_publisher->safe_psql( +$result = $node_publisher->safe_psql( 'postgres', qq( - select get_byte(data, 0) - from pg_logical_slot_peek_binary_changes('sub', NULL, NULL, + SELECT get_byte(data, 0), get_byte(data, 1) + FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL, 'proto_version', '1', - 'publication_names', 'pub', + 'publication_names', 'tap_pub', 'messages', 'true') )); -is($aborted_txn_message_codes, "77\n77", - "non-transactional message on slot from aborted transaction is M"); - -$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION sub ENABLE"); -$node_publisher->wait_for_catchup('sub'); - -my $result = - $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab"); -is($result, qq(2), 'rows move'); +is($result, qq(77|0 +77|0), + 'non-transactional message on slot from aborted transaction is M'); $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.20.1