From 95ddb15af81bca46e9d0739b96351167cad06e6c Mon Sep 17 00:00:00 2001 From: Ashutosh Bapat Date: Tue, 23 Sep 2025 16:43:33 +0530 Subject: [PATCH 2/2] Address second round of comments from Shveta Malik Add tests for plugin_filtered_bytes in the logical replication case. We cannot test the exact number of bytes filtered because unavoidable background transaction activity is also counted in the filtered bytes. --- doc/src/sgml/logicaldecoding.sgml | 13 ++++++------ src/backend/replication/logical/logical.c | 10 +++++----- .../replication/logical/logicalfuncs.c | 1 + src/backend/utils/activity/pgstat_replslot.c | 10 +++++----- src/backend/utils/adt/pgstatfuncs.c | 10 +++++----- src/include/pgstat.h | 10 +++++----- src/test/recovery/t/006_logical_decoding.pl | 6 +++--- .../t/035_standby_logical_decoding.pl | 4 ++-- src/test/subscription/t/001_rep_changes.pl | 11 ++++++++++ src/test/subscription/t/010_truncate.pl | 20 +++++++++++++++++++ src/test/subscription/t/028_row_filter.pl | 11 ++++++++++ 11 files changed, 75 insertions(+), 31 deletions(-) diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 3952f68e806..c02d4a88d57 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -956,12 +956,13 @@ typedef struct OutputPluginStats } OutputPluginStats; sentTxns is the number of transactions sent downstream - by the output plugin. sentBytes is the amount of data, in bytes, - sent downstream by the output plugin. - OutputPluginWrite will update this counter - if ctx->stats is initialized by the output plugin. - filteredBytes is the size of changes, in bytes, that are - filtered out by the output plugin. Function + by the output plugin. sentBytes is the amount of data, + in bytes, sent downstream by the output plugin. + filteredBytes is the size of changes, in bytes, that + are filtered out by the output plugin. 
+ OutputPluginWrite will update + sentBytes if ctx->stats is + initialized by the output plugin. Function ReorderBufferChangeSize may be used to find the size of filtered ReorderBufferChange. diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index b26ac29e32f..1435873101f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1980,14 +1980,14 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) repSlotStat.stream_txns = rb->streamTxns; repSlotStat.stream_count = rb->streamCount; repSlotStat.stream_bytes = rb->streamBytes; - repSlotStat.total_txns = rb->totalTxns; - repSlotStat.total_bytes = rb->totalBytes; + repSlotStat.total_wal_txns = rb->totalTxns; + repSlotStat.total_wal_bytes = rb->totalBytes; if (stats) { repSlotStat.plugin_has_stats = true; - repSlotStat.sent_txns = stats->sentTxns; - repSlotStat.sent_bytes = stats->sentBytes; - repSlotStat.filtered_bytes = stats->filteredBytes; + repSlotStat.plugin_sent_txns = stats->sentTxns; + repSlotStat.plugin_sent_bytes = stats->sentBytes; + repSlotStat.plugin_filtered_bytes = stats->filteredBytes; } else repSlotStat.plugin_has_stats = false; diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 788967e2ab1..d2ab41de438 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -96,6 +96,7 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi */ if (ctx->stats) ctx->stats->sentBytes += ctx->out->len + sizeof(XLogRecPtr) + sizeof(TransactionId); + p->returned_rows++; } diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c index ed055324a99..895940f4eb9 100644 --- a/src/backend/utils/activity/pgstat_replslot.c +++ b/src/backend/utils/activity/pgstat_replslot.c @@ -94,14 +94,14 @@ pgstat_report_replslot(ReplicationSlot *slot, 
const PgStat_StatReplSlotEntry *re REPLSLOT_ACC(stream_txns); REPLSLOT_ACC(stream_count); REPLSLOT_ACC(stream_bytes); - REPLSLOT_ACC(total_txns); - REPLSLOT_ACC(total_bytes); + REPLSLOT_ACC(total_wal_txns); + REPLSLOT_ACC(total_wal_bytes); statent->plugin_has_stats = repSlotStat->plugin_has_stats; if (repSlotStat->plugin_has_stats) { - REPLSLOT_ACC(sent_txns); - REPLSLOT_ACC(sent_bytes); - REPLSLOT_ACC(filtered_bytes); + REPLSLOT_ACC(plugin_sent_txns); + REPLSLOT_ACC(plugin_sent_bytes); + REPLSLOT_ACC(plugin_filtered_bytes); } #undef REPLSLOT_ACC diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 15bafe63b24..588b49059b2 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2158,13 +2158,13 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) values[4] = Int64GetDatum(slotent->stream_txns); values[5] = Int64GetDatum(slotent->stream_count); values[6] = Int64GetDatum(slotent->stream_bytes); - values[7] = Int64GetDatum(slotent->total_txns); - values[8] = Int64GetDatum(slotent->total_bytes); + values[7] = Int64GetDatum(slotent->total_wal_txns); + values[8] = Int64GetDatum(slotent->total_wal_bytes); if (slotent->plugin_has_stats) { - values[9] = Int64GetDatum(slotent->filtered_bytes); - values[10] = Int64GetDatum(slotent->sent_txns); - values[11] = Int64GetDatum(slotent->sent_bytes); + values[9] = Int64GetDatum(slotent->plugin_filtered_bytes); + values[10] = Int64GetDatum(slotent->plugin_sent_txns); + values[11] = Int64GetDatum(slotent->plugin_sent_bytes); } else { diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 87afeaed8a5..33a031c79b4 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -393,12 +393,12 @@ typedef struct PgStat_StatReplSlotEntry PgStat_Counter stream_txns; PgStat_Counter stream_count; PgStat_Counter stream_bytes; - PgStat_Counter total_txns; - PgStat_Counter total_bytes; + PgStat_Counter total_wal_txns; + PgStat_Counter total_wal_bytes; bool 
plugin_has_stats; - PgStat_Counter sent_txns; - PgStat_Counter sent_bytes; - PgStat_Counter filtered_bytes; + PgStat_Counter plugin_sent_txns; + PgStat_Counter plugin_sent_bytes; + PgStat_Counter plugin_filtered_bytes; TimestampTz stat_reset_timestamp; } PgStat_StatReplSlotEntry; diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index b04a0d9f8db..92e42bec6a9 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -212,7 +212,7 @@ my $stats_test_slot2 = 'logical_slot'; # Stats exist for stats test slot 1 is( $node_primary->safe_psql( 'postgres', - qq(SELECT total_bytes > 0, plugin_sent_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') + qq(SELECT total_wal_bytes > 0, plugin_sent_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') ), qq(t|t|t), qq(Total bytes and plugin sent bytes are both > 0 and stats_reset is NULL for slot '$stats_test_slot1'.) @@ -233,10 +233,10 @@ $node_primary->safe_psql('postgres', is( $node_primary->safe_psql( 'postgres', - qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0, plugin_sent_bytes is NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') + qq(SELECT stats_reset > '$reset1'::timestamptz, total_wal_bytes = 0, plugin_sent_bytes is NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') ), qq(t|t|t), - qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes and plugin_sent_bytes were set to 0 and NULL respectively.) + qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_wal_bytes and plugin_sent_bytes were set to 0 and NULL respectively.) 
); # Check that test slot 2 has NULL in reset timestamp diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl index c9c182892cf..c8577794eec 100644 --- a/src/test/recovery/t/035_standby_logical_decoding.pl +++ b/src/test/recovery/t/035_standby_logical_decoding.pl @@ -575,7 +575,7 @@ $node_primary->safe_psql('testdb', qq[INSERT INTO decoding_test(x,y) SELECT 100,'100';]); $node_standby->poll_query_until('testdb', - qq[SELECT total_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot'] + qq[SELECT total_wal_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot'] ) or die "replication slot stats of vacuum_full_activeslot not updated"; # This should trigger the conflict @@ -603,7 +603,7 @@ ok( $stderr =~ # Ensure that replication slot stats are not removed after invalidation. is( $node_standby->safe_psql( 'testdb', - qq[SELECT total_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot'] + qq[SELECT total_wal_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot'] ), 't', 'replication slot stats not removed after invalidation'); diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl index ca55d8df50d..a7bee7fe5e4 100644 --- a/src/test/subscription/t/001_rep_changes.pl +++ b/src/test/subscription/t/001_rep_changes.pl @@ -124,6 +124,9 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_ins"); is($result, qq(1002), 'check initial data was copied to subscriber'); +my $initial_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT coalesce(plugin_filtered_bytes, 0) FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'"); + $node_publisher->safe_psql('postgres', "INSERT INTO tab_ins SELECT generate_series(1,50)"); $node_publisher->safe_psql('postgres', "DELETE FROM tab_ins WHERE a > 20"); @@ -157,6 +160,14 @@ 
$node_publisher->safe_psql('postgres', $node_publisher->wait_for_catchup('tap_sub'); +# Verify that plugin_filtered_bytes increases due to filtered update and delete +# operations on tab_ins. We cannot test the exact value since it may include +# changes from other concurrent transactions. +my $final_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT plugin_filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'"); +cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes, + 'plugin_filtered_bytes increased after DML filtering'); + $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab_ins"); is($result, qq(1052|1|1002), 'check replicated inserts on subscriber'); diff --git a/src/test/subscription/t/010_truncate.pl b/src/test/subscription/t/010_truncate.pl index 3d16c2a800d..c41ad317221 100644 --- a/src/test/subscription/t/010_truncate.pl +++ b/src/test/subscription/t/010_truncate.pl @@ -69,6 +69,9 @@ $node_subscriber->safe_psql('postgres', # Wait for initial sync of all subscriptions $node_subscriber->wait_for_subscription_sync; +my $initial_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT coalesce(plugin_filtered_bytes, 0) FROM pg_stat_replication_slots WHERE slot_name = 'sub2'"); + # insert data to truncate $node_subscriber->safe_psql('postgres', @@ -98,6 +101,16 @@ $node_publisher->wait_for_catchup('sub1'); $result = $node_subscriber->safe_psql('postgres', "SELECT nextval('seq1')"); is($result, qq(101), 'truncate restarted identities'); +# All the DMLs above happen on tables that are subscribed to by sub1 and not +# sub2. plugin_filtered_bytes should get incremented for the replication slot +# corresponding to the subscription sub2. We cannot test the exact value of +# plugin_filtered_bytes because the counter is affected by background activity. 
+my $final_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT plugin_filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'sub2'"); +cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes, + 'plugin_filtered_bytes increased after publication level filtering'); +$initial_filtered_bytes = $final_filtered_bytes; + # test publication that does not replicate truncate $node_subscriber->safe_psql('postgres', @@ -107,6 +120,13 @@ $node_publisher->safe_psql('postgres', "TRUNCATE tab2"); $node_publisher->wait_for_catchup('sub2'); +# Truncate changes are filtered out at publication level itself. Make sure that +# the plugin_filtered_bytes is incremented. +$final_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT plugin_filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'sub2'"); +cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes, + 'plugin_filtered_bytes increased after truncate filtering'); + $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM tab2"); is($result, qq(3|1|3), 'truncate not replicated'); diff --git a/src/test/subscription/t/028_row_filter.pl b/src/test/subscription/t/028_row_filter.pl index e2c83670053..039bf5ff5a0 100644 --- a/src/test/subscription/t/028_row_filter.pl +++ b/src/test/subscription/t/028_row_filter.pl @@ -579,6 +579,9 @@ is($result, qq(3|6), # commands are for testing normal logical replication behavior. # # test row filter (INSERT, UPDATE, DELETE) +my $initial_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT coalesce(plugin_filtered_bytes, 0) FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'"); + $node_publisher->safe_psql('postgres', "INSERT INTO tab_rowfilter_1 (a, b) VALUES (800, 'test 800')"); $node_publisher->safe_psql('postgres', @@ -612,6 +615,14 @@ $node_publisher->safe_psql('postgres', $node_publisher->wait_for_catchup($appname); +# The changes which do not pass the row filter will be filtered. 
Make sure that +# plugin_filtered_bytes reflects that. We cannot test the exact value of +# plugin_filtered_bytes since it is affected by background activity. +my $final_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT plugin_filtered_bytes FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'"); +cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes, + 'plugin_filtered_bytes increased after row filtering'); + # Check expected replicated rows for tab_rowfilter_2 # tap_pub_1 filter is: (c % 2 = 0) # tap_pub_2 filter is: (c % 3 = 0) -- 2.34.1