From 7f6995a26b15ffd5220536cee023ad7472d7e6cb Mon Sep 17 00:00:00 2001
From: Amit Khandekar
Date: Mon, 4 Mar 2019 12:22:50 +0530
Subject: [PATCH 3/3] New TAP test for logical decoding on standby.

new file: recovery/t/016_logical_decoding_on_replica.pl

Tests originally written by Craig Ringer, with some WIP changes from Amit
Khandekar.
---
 .../recovery/t/016_logical_decoding_on_replica.pl  | 358 +++++++++++++++++++++
 1 file changed, 358 insertions(+)
 create mode 100644 src/test/recovery/t/016_logical_decoding_on_replica.pl

diff --git a/src/test/recovery/t/016_logical_decoding_on_replica.pl b/src/test/recovery/t/016_logical_decoding_on_replica.pl
new file mode 100644
index 0000000..8cc029b
--- /dev/null
+++ b/src/test/recovery/t/016_logical_decoding_on_replica.pl
@@ -0,0 +1,358 @@
+# Test logical decoding on a standby: slot creation and decoding on the
+# replica, xmin/catalog_xmin feedback to the master's physical slot, and
+# slot handling when a database is dropped on the master.
+#
+use strict;
+use warnings;
+use 5.8.0;
+
+use PostgresNode;
+use TestLib;
+use Test::More tests => 52;
+use RecursiveCopy;
+use File::Copy;
+
+my ($stdout, $stderr, $ret, $handle, $return);
+
+# Initialize master node
+my $node_master = get_new_node('master');
+$node_master->init(allows_streaming => 1, has_archiving => 1);
+$node_master->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+# send status rapidly so we promptly advance xmin on master
+wal_receiver_status_interval = 1
+# very promptly terminate conflicting backends
+max_standby_streaming_delay = '2s'
+});
+$node_master->dump_info;
+$node_master->start;
+
+$node_master->safe_psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_master->safe_psql('testdb', q[SELECT * FROM pg_create_physical_replication_slot('decoding_standby');]);
+my $backup_name = 'b1';
+my $backup_dir = $node_master->backup_dir . "/" . $backup_name;
+TestLib::system_or_bail('pg_basebackup', '-D', $backup_dir, '-d', $node_master->connstr('testdb'), '--slot=decoding_standby');
+
+# Return (xmin, catalog_xmin) of the master's physical slot; prints nothing.
+sub print_phys_xmin
+{
+	my $slot = $node_master->slot('decoding_standby');
+	return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
+}
+
+my ($xmin, $catalog_xmin) = print_phys_xmin();
+# After slot creation, xmins must be null
+is($xmin, '', "xmin null");
+is($catalog_xmin, '', "catalog_xmin null");
+
+my $node_replica = get_new_node('replica');
+$node_replica->init_from_backup(
+	$node_master, $backup_name,
+	has_streaming => 1, has_restoring => 1);
+$node_replica->append_conf('postgresql.conf',
+	q[primary_slot_name = 'decoding_standby']);
+
+$node_replica->start;
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+# with hot_standby_feedback off, xmin and catalog_xmin must still be null
+($xmin, $catalog_xmin) = print_phys_xmin();
+is($xmin, '', "xmin null after replica join");
+is($catalog_xmin, '', "catalog_xmin null after replica join");
+
+$node_replica->append_conf('postgresql.conf',q[
+hot_standby_feedback = on
+]);
+$node_replica->restart;
+sleep(2); # ensure walreceiver feedback sent
+
+# If no slot on standby exists to hold down catalog_xmin it must follow xmin,
+# (which is nextXid when no xacts are running on the standby).
+($xmin, $catalog_xmin) = print_phys_xmin();
+ok($xmin, "xmin not null");
+is($xmin, $catalog_xmin, "xmin and catalog_xmin equal");
+
+# We need catalog_xmin advance to take effect on the master and be replayed
+# on standby.
+$node_master->safe_psql('postgres', 'CHECKPOINT');
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+# Create new slots on the replica, ignoring the ones on the master completely.
+#
+# This must succeed since we know we have a catalog_xmin reservation. We
+# might've already sent hot standby feedback to advance our physical slot's
+# catalog_xmin but not received the corresponding xlog for the catalog xmin
+# advance, in which case we'll create a slot that isn't usable. The calling
+# application can prevent this by creating a temporary slot on the master to
+# lock in its catalog_xmin. For a truly race-free solution we'd need
+# master-to-standby hot_standby_feedback replies.
+#
+# In this case it won't race because there's no concurrent activity on the
+# master.
+#
+is($node_replica->psql('testdb', qq[SELECT * FROM pg_create_logical_replication_slot('standby_logical', 'test_decoding')]),
+	0, 'logical slot creation on standby succeeded')
+	or BAIL_OUT('cannot continue if slot creation fails, see logs');
+
+# Return (xmin, catalog_xmin) of the replica's logical slot; prints nothing.
+sub print_logical_xmin
+{
+	my $slot = $node_replica->slot('standby_logical');
+	return ($slot->{'xmin'}, $slot->{'catalog_xmin'});
+}
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+($xmin, $catalog_xmin) = print_logical_xmin();
+is($xmin, '', "logical xmin null");
+isnt($catalog_xmin, '', "logical catalog_xmin not null");
+
+$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)');
+$node_master->safe_psql('testdb', q[INSERT INTO test_table(blah) values ('itworks')]);
+$node_master->safe_psql('testdb', 'DROP TABLE test_table');
+$node_master->safe_psql('testdb', 'VACUUM');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+isnt($xmin, '', "physical xmin not null");
+isnt($catalog_xmin, '', "physical catalog_xmin not null");
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+# Should show the inserts even when the table is dropped on master
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($stderr, '', 'stderr is empty');
+is($ret, 0, 'replay from slot succeeded')
+	or BAIL_OUT('cannot continue if slot replay fails');
+is($stdout, q{BEGIN
+table public.test_table: INSERT: id[integer]:1 blah[text]:'itworks'
+COMMIT}, 'replay results match');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($physical_xmin, $physical_catalog_xmin) = print_phys_xmin();
+isnt($physical_xmin, '', "physical xmin not null");
+isnt($physical_catalog_xmin, '', "physical catalog_xmin not null");
+
+my ($logical_xmin, $logical_catalog_xmin) = print_logical_xmin();
+is($logical_xmin, '', "logical xmin null");
+isnt($logical_catalog_xmin, '', "logical catalog_xmin not null");
+
+# Ok, do a pile of tx's and make sure xmin advances.
+# Ideally we'd just hold catalog_xmin, but since hs_feedback currently uses the slot,
+# we hold down xmin.
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_1();]);
+$node_master->safe_psql('testdb', 'CREATE TABLE test_table(id serial primary key, blah text)');
+for my $i (0 .. 2000)
+{
+	$node_master->safe_psql('testdb', qq[INSERT INTO test_table(blah) VALUES ('entry $i')]);
+}
+$node_master->safe_psql('testdb', qq[CREATE TABLE catalog_increase_2();]);
+$node_master->safe_psql('testdb', 'VACUUM');
+
+my ($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+cmp_ok($new_logical_catalog_xmin, "==", $logical_catalog_xmin, "logical slot catalog_xmin hasn't advanced before get_changes");
+
+($ret, $stdout, $stderr) = $node_replica->psql('testdb', qq[SELECT data FROM pg_logical_slot_get_changes('standby_logical', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-timestamp', '0')]);
+is($ret, 0, 'replay of big series succeeded');
+isnt($stdout, '', 'replayed some rows');
+
+($new_logical_xmin, $new_logical_catalog_xmin) = print_logical_xmin();
+is($new_logical_xmin, '', "logical xmin null");
+isnt($new_logical_catalog_xmin, '', "logical slot catalog_xmin not null");
+cmp_ok($new_logical_catalog_xmin, ">", $logical_catalog_xmin, "logical slot catalog_xmin advanced after get_changes");
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($new_physical_xmin, $new_physical_catalog_xmin) = print_phys_xmin();
+isnt($new_physical_xmin, '', "physical xmin not null");
+# hot standby feedback should advance phys catalog_xmin now the standby's slot
+# doesn't hold it down as far.
+isnt($new_physical_catalog_xmin, '', "physical catalog_xmin not null");
+cmp_ok($new_physical_catalog_xmin, ">", $physical_catalog_xmin, "physical catalog_xmin advanced");
+
+cmp_ok($new_physical_catalog_xmin, "<=", $new_logical_catalog_xmin, 'upstream physical slot catalog_xmin not past downstream catalog_xmin with hs_feedback on');
+
+#########################################################
+# Upstream catalog retention
+#########################################################
+
+# Burn xids and VACUUM FREEZE every database on the master, checkpoint, then
+# read pg_controldata.  Returns (oldestXid, oldestCatalogXmin) as reported for
+# the latest checkpoint; oldestCatalogXmin stays '' if no such line is found.
+sub test_catalog_xmin_retention
+{
+	# First burn some xids on the master in another DB, so we push the master's
+	# nextXid ahead.
+	foreach my $i (1 .. 100)
+	{
+		$node_master->safe_psql('postgres', 'SELECT txid_current()');
+	}
+
+	# Force vacuum freeze on the master and ensure its oldestXmin doesn't advance
+	# past our needed xmin. The only way we have visibility into that is to force
+	# a checkpoint.
+	$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = true WHERE datname = 'template0'");
+	foreach my $dbname ('template1', 'postgres', 'testdb', 'template0')
+	{
+		$node_master->safe_psql($dbname, 'VACUUM FREEZE');
+	}
+	sleep(1);
+	$node_master->safe_psql('postgres', 'CHECKPOINT');
+	IPC::Run::run(['pg_controldata', $node_master->data_dir()], '>', \$stdout)
+	  or die "pg_controldata failed with $?";
+
+	# Pull the xid horizons out of the "Latest checkpoint's ..." lines.
+	my ($oldestXid, $oldestCatalogXmin, $nextXid) = ('', '', '');
+	foreach my $line (split(/\n/, $stdout))
+	{
+		$nextXid = $1
+		  if $line =~ /^Latest checkpoint's NextXID:\s+\d+:(\d+)/;
+		$oldestXid = $1
+		  if $line =~ /^Latest checkpoint's oldestXID:\s+(\d+)/;
+		$oldestCatalogXmin = $1
+		  if $line =~ /^Latest checkpoint's oldestCatalogXmin:\s*(\d+)/;
+	}
+	die 'no oldestXID found in checkpoint' unless $oldestXid;
+
+	# The slot positions are only needed for the diagnostic note below.
+	my (undef, $phys_catalog_xmin) = print_phys_xmin();
+	my (undef, $logical_catalog_xmin) = print_logical_xmin();
+
+	note "upstream oldestXid $oldestXid, oldestCatalogXmin $oldestCatalogXmin, nextXid $nextXid, phys slot catalog_xmin $phys_catalog_xmin, downstream catalog_xmin $logical_catalog_xmin";
+
+	# Make template0 non-connectable again now that it has been frozen.
+	$node_master->safe_psql('postgres', "UPDATE pg_database SET datallowconn = false WHERE datname = 'template0'");
+
+	return ($oldestXid, $oldestCatalogXmin);
+}
+
+my ($oldestXid, $oldestCatalogXmin) = test_catalog_xmin_retention();
+
+cmp_ok($oldestXid, "<=", $new_logical_catalog_xmin, 'upstream oldestXid not past downstream catalog_xmin with hs_feedback on');
+
+##################################################
+# Drop slot
+##################################################
+#
+is($node_replica->safe_psql('postgres', 'SHOW hot_standby_feedback'), 'on', 'hs_feedback is on');
+
+($xmin, $catalog_xmin) = print_phys_xmin();
+
+# Make sure slots on replicas are droppable, and properly clear the upstream's xmin
+$node_replica->psql('testdb', q[SELECT pg_drop_replication_slot('standby_logical')]);
+
+is($node_replica->slot('standby_logical')->{'slot_type'}, '', 'slot on standby dropped manually');
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+sleep(2); # ensure walreceiver feedback sent
+
+my ($new_xmin, $new_catalog_xmin) = print_phys_xmin();
+# We're now back to the old behaviour of hot_standby_feedback
+# reporting nextXid for both thresholds
+ok($new_catalog_xmin, "physical catalog_xmin still non-null");
+cmp_ok($new_catalog_xmin, '==', $new_xmin,
+	'xmin and catalog_xmin equal after slot drop');
+
+
+##################################################
+# Recovery: drop database drops idle slots
+##################################################
+
+# Create a couple of slots on the DB to ensure they are dropped when we drop
+# the DB on the upstream if they're on the right DB, or not dropped if on
+# another DB.
+
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb'), '-P', 'test_decoding', '-S', 'dodropslot', '--create-slot'], 'pg_recvlogical created dodropslot');
+$node_replica->command_ok(['pg_recvlogical', '-v', '-d', $node_replica->connstr('postgres'), '-P', 'test_decoding', '-S', 'otherslot', '--create-slot'], 'pg_recvlogical created otherslot');
+
+# "command_ok(...) or BAIL_OUT(...)" bailed out even when pg_recvlogical
+# succeeded, so bail out from the slot existence checks instead.
+is($node_replica->slot('dodropslot')->{'slot_type'}, 'logical', 'slot dodropslot on standby created')
+	or BAIL_OUT('slot creation failed, subsequent results would be meaningless');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'slot otherslot on standby created')
+	or BAIL_OUT('slot creation failed, subsequent results would be meaningless');
+
+# dropdb on the master to verify slots are dropped on standby
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+	'database dropped on standby');
+
+is($node_replica->slot('dodropslot')->{'slot_type'}, '', 'slot on standby dropped');
+is($node_replica->slot('otherslot')->{'slot_type'}, 'logical', 'otherslot on standby not dropped');
+
+
+##################################################
+# Recovery: drop database drops in-use slots
+##################################################
+
+# This time, have the slot in-use on the downstream DB when we drop it.
+note "Testing dropdb when downstream slot is in-use";
+$node_master->safe_psql('postgres', q[CREATE DATABASE testdb2]);
+# The replica must have replayed CREATE DATABASE before we can connect to it.
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+note "creating slot dodropslot2";
+$node_replica->command_ok(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-P', 'test_decoding', '-S', 'dodropslot2', '--create-slot'],
+	'pg_recvlogical created dodropslot2');
+is($node_replica->slot('dodropslot2')->{'slot_type'}, 'logical', 'slot dodropslot2 on standby created');
+
+# make sure the slot is in use
+note "starting pg_recvlogical";
+$handle = IPC::Run::start(['pg_recvlogical', '-d', $node_replica->connstr('testdb2'), '-S', 'dodropslot2', '-f', '-', '--no-loop', '--start'], '>', \$stdout, '2>', \$stderr);
+$node_replica->poll_query_until('postgres', q[SELECT active FROM pg_replication_slots WHERE slot_name = 'dodropslot2']);
+
+is($node_replica->slot('dodropslot2')->{'active'}, 't', 'slot on standby is active')
+	or BAIL_OUT("slot not active on standby, cannot continue. pg_recvlogical exited with '$stdout', '$stderr'");
+
+# Master doesn't know the replica's slot is busy so dropdb should succeed
+$node_master->safe_psql('postgres', q[DROP DATABASE testdb2]);
+ok(1, 'dropdb finished');
+
+my $waits = 0;
+while ($node_replica->slot('dodropslot2')->{'active_pid'})
+{
+	BAIL_OUT('timed out waiting for walsender to exit') if ++$waits > 180;
+	sleep(1);
+	note "waiting for walsender to exit";
+}
+
+note "walsender exited, waiting for pg_recvlogical to exit";
+
+# our client should've terminated in response to the walsender error
+eval { $handle->finish; };
+$return = $?;
+is($return, 256, "pg_recvlogical terminated by server");
+like($stderr, qr/terminating connection due to conflict with recovery/, 'recvlogical recovery conflict');
+like($stderr, qr/User was connected to a database that must be dropped\./, 'recvlogical recovery conflict db');
+
+is($node_replica->slot('dodropslot2')->{'active_pid'}, '', 'walsender backend exited');
+
+# The slot should be dropped by recovery now
+$node_master->wait_for_catchup($node_replica, 'replay', $node_master->lsn('flush'));
+
+is($node_replica->safe_psql('postgres', q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb2')]), 'f',
+	'database dropped on standby');
+
+is($node_replica->slot('dodropslot2')->{'slot_type'}, '', 'slot on standby dropped');
-- 
2.1.4