From 1924142ec8194dc661fd1f230e77bed6d3f9d201 Mon Sep 17 00:00:00 2001 From: vignesh Date: Thu, 1 Apr 2021 20:50:10 +0530 Subject: [PATCH v1 4/4] Handle replication slot statistics overwrite issue. There is a rare scenario in which one of the replication slots is dropped but the drop-slot statistics message is not received by the statistics collector process. If max_replication_slots is then reduced to the number of replication slots actually in use and the publisher is restarted, the statistics collector process will not be aware of the change and will write beyond the available slots. Fix this by skipping the statistics of any replication slots beyond max_replication_slots. --- src/backend/postmaster/pgstat.c | 24 +++++++++- src/test/subscription/t/020_repl_stats.pl | 57 +++++++++++++++++++++-- 2 files changed, 76 insertions(+), 5 deletions(-) diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index b497fbed37..c65f57889e 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -5699,6 +5699,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) */ for (;;) { + PgStat_ReplSlotStats replSlotStat; switch (fgetc(fpin)) { /* @@ -5787,15 +5788,34 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) * slot follows. */ case 'R': - if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin) + if (fread(&replSlotStat, 1, sizeof(PgStat_ReplSlotStats), fpin) != sizeof(PgStat_ReplSlotStats)) { ereport(pgStatRunningInCollector ? 
LOG : WARNING, (errmsg("corrupted statistics file \"%s\"", statfile))); - memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats)); goto done; } + + /* + * There is a rare scenario where one of the replication slots + * is dropped and the drop slot statistics message is not + * received by the statistics collector process. If + * max_replication_slots is then reduced to the actual number of + * replication slots that are in use and the publisher is + * restarted, the statistics collector will not be aware of + * this. To avoid writing beyond max_replication_slots, the + * statistics of the remaining replication slots are skipped. + */ + if (max_replication_slots == nReplSlotStats) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("skipping \"%s\" replication slot's statistic as the statistic collector process does not have enough statistic slots", + replSlotStat.slotname))); + goto done; + } + + memcpy(&replSlotStats[nReplSlotStats], &replSlotStat, sizeof(PgStat_ReplSlotStats)); nReplSlotStats++; break; diff --git a/src/test/subscription/t/020_repl_stats.pl b/src/test/subscription/t/020_repl_stats.pl index 4be58d417a..b0674f2d41 100644 --- a/src/test/subscription/t/020_repl_stats.pl +++ b/src/test/subscription/t/020_repl_stats.pl @@ -2,9 +2,10 @@ # view for logical replication. use strict; use warnings; +use File::Path qw(rmtree); use PostgresNode; use TestLib; -use Test::More tests => 5; +use Test::More tests => 7; # Create publisher node. my $node_publisher = get_new_node('publisher'); @@ -25,6 +26,10 @@ my $node_subscriber3 = get_new_node('subscriber3'); $node_subscriber3->init(allows_streaming => 'logical'); $node_subscriber3->start; +my $node_subscriber4 = get_new_node('subscriber4'); +$node_subscriber4->init(allows_streaming => 'logical'); +$node_subscriber4->start; + # Create table on publisher. 
$node_publisher->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b varchar)"); @@ -33,6 +38,7 @@ $node_publisher->safe_psql('postgres', $node_subscriber1->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); $node_subscriber2->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); $node_subscriber3->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); +$node_subscriber4->safe_psql('postgres', "CREATE TABLE test_tab (a int primary key, b text, c timestamptz DEFAULT now(), d bigint DEFAULT 999)"); # Setup logical replication. my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; @@ -62,9 +68,18 @@ $node_subscriber3->safe_psql('postgres', PUBLICATION tap_pub WITH (streaming = off)" ); +my $appname4 = 'tap_sub4'; +$node_subscriber4->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub4 + CONNECTION '$publisher_connstr + application_name=$appname4' + PUBLICATION tap_pub WITH (streaming = off)" +); + $node_publisher->wait_for_catchup($appname1); $node_publisher->wait_for_catchup($appname2); $node_publisher->wait_for_catchup($appname3); +$node_publisher->wait_for_catchup($appname4); # Interleave a pair of transactions, each exceeding the 64kB limit. my $in = ''; @@ -100,6 +115,7 @@ $h->finish; # errors make the next test fail, so ignore them here $node_publisher->wait_for_catchup($appname1); $node_publisher->wait_for_catchup($appname2); $node_publisher->wait_for_catchup($appname3); +$node_publisher->wait_for_catchup($appname4); # Verify data is replicated to the subscribers. 
my $result = @@ -114,6 +130,10 @@ $result = $node_subscriber3->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(3332|3332|3332), 'check publisher data is replicated to the subscriber'); +$result = + $node_subscriber4->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); +is($result, qq(3332|3332|3332), 'check publisher data is replicated to the subscriber'); + # Test to verify replication statistics data is updated in # pg_stat_replication_slots statistics view. $result = $node_publisher->safe_psql('postgres', @@ -129,13 +149,43 @@ $result = $node_publisher->safe_psql('postgres', ); is($result, qq(tap_sub1|f|f|f|t|t|t| tap_sub2|t|t|t|f|f|f| -tap_sub3|t|t|t|f|f|f|), 'check replication statistics are updated'); +tap_sub3|t|t|t|f|f|f| +tap_sub4|t|t|t|f|f|f|), 'check replication statistics are updated'); # Test to drop one of the subscribers and verify replication statistics data is # fine after publisher is restarted. -$node_subscriber3->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub3;"); +$node_subscriber4->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub4;"); + +$node_publisher->stop; +$node_publisher->start; + +# Verify statistics data present in pg_stat_replication_slots are sane after +# publisher is restarted +$result = $node_publisher->safe_psql('postgres', + "SELECT slot_name, + spill_txns > 0 AS spill_txns, + spill_count > 0 AS spill_count, + spill_bytes > 0 AS spill_bytes, + stream_txns > 0 AS stream_txns, + stream_count > 0 AS stream_count, + stream_bytes > 0 AS stream_bytes, + stats_reset + FROM pg_stat_replication_slots ORDER BY slot_name" +); +is($result, qq(tap_sub1|f|f|f|t|t|t| +tap_sub2|t|t|t|f|f|f| +tap_sub3|t|t|t|f|f|f|), 'check replication statistics are updated'); +# Test to remove one of the replication slots and adjust max_replication_slots +# accordingly to the number of slots and verify replication statistics data is +# fine after publisher is restarted. 
$node_publisher->stop; +my $publisher_data = $node_publisher->data_dir; +my $subscriber3_replslotdir = "$publisher_data/pg_replslot/tap_sub3"; + +rmtree($subscriber3_replslotdir); + +$node_publisher->append_conf('postgresql.conf', 'max_replication_slots = 2'); $node_publisher->start; # Verify statistics data present in pg_stat_replication_slots are sane after @@ -158,4 +208,5 @@ tap_sub2|t|t|t|f|f|f|), 'check replication statistics are updated'); $node_subscriber1->stop; $node_subscriber2->stop; $node_subscriber3->stop; +$node_subscriber4->stop; $node_publisher->stop; -- 2.25.1