From 3db9ce87b46783f20e8c34958f7cc2903bddd7da Mon Sep 17 00:00:00 2001 From: Ashutosh Bapat Date: Tue, 23 Sep 2025 16:43:33 +0530 Subject: [PATCH 2/2] Address second round of comments from Shveta Malik Add a test for plugin_filtered_bytes in logical replication case. We can not test exact number of bytes filtered because of the unavoidable background transaction activity which will be counted in the filtered bytes. --- doc/src/sgml/logicaldecoding.sgml | 13 +++++++------ src/backend/replication/logical/logical.c | 10 +++++----- src/backend/replication/logical/logicalfuncs.c | 1 + src/backend/utils/activity/pgstat_replslot.c | 10 +++++----- src/backend/utils/adt/pgstatfuncs.c | 10 +++++----- src/include/pgstat.h | 10 +++++----- src/test/recovery/t/006_logical_decoding.pl | 6 +++--- .../recovery/t/035_standby_logical_decoding.pl | 4 ++-- src/test/subscription/t/028_row_filter.pl | 14 ++++++++++++++ 9 files changed, 47 insertions(+), 31 deletions(-) diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index 3952f68e806..c02d4a88d57 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -956,12 +956,13 @@ typedef struct OutputPluginStats } OutputPluginStats; sentTxns is the number of transactions sent downstream - by the output plugin. sentBytes is the amount of data, in bytes, - sent downstream by the output plugin. - OutputPluginWrite will update this counter - if ctx->stats is initialized by the output plugin. - filteredBytes is the size of changes, in bytes, that are - filtered out by the output plugin. Function + by the output plugin. sentBytes is the amount of data, + in bytes, sent downstream by the output plugin. + filteredBytes is the size of changes, in bytes, that + are filtered out by the output plugin. + OutputPluginWrite will update + sentBytes if ctx->stats is + initialized by the output plugin. Function ReorderBufferChangeSize may be used to find the size of filtered ReorderBufferChange. diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index b26ac29e32f..1435873101f 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1980,14 +1980,14 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) repSlotStat.stream_txns = rb->streamTxns; repSlotStat.stream_count = rb->streamCount; repSlotStat.stream_bytes = rb->streamBytes; - repSlotStat.total_txns = rb->totalTxns; - repSlotStat.total_bytes = rb->totalBytes; + repSlotStat.total_wal_txns = rb->totalTxns; + repSlotStat.total_wal_bytes = rb->totalBytes; if (stats) { repSlotStat.plugin_has_stats = true; - repSlotStat.sent_txns = stats->sentTxns; - repSlotStat.sent_bytes = stats->sentBytes; - repSlotStat.filtered_bytes = stats->filteredBytes; + repSlotStat.plugin_sent_txns = stats->sentTxns; + repSlotStat.plugin_sent_bytes = stats->sentBytes; + repSlotStat.plugin_filtered_bytes = stats->filteredBytes; } else repSlotStat.plugin_has_stats = false; diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 788967e2ab1..d2ab41de438 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -96,6 +96,7 @@ LogicalOutputWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xi */ if (ctx->stats) ctx->stats->sentBytes += ctx->out->len + sizeof(XLogRecPtr) + sizeof(TransactionId); + p->returned_rows++; } diff --git a/src/backend/utils/activity/pgstat_replslot.c b/src/backend/utils/activity/pgstat_replslot.c index ed055324a99..895940f4eb9 100644 --- a/src/backend/utils/activity/pgstat_replslot.c +++ b/src/backend/utils/activity/pgstat_replslot.c @@ -94,14 +94,14 @@ pgstat_report_replslot(ReplicationSlot *slot, const PgStat_StatReplSlotEntry *re REPLSLOT_ACC(stream_txns); REPLSLOT_ACC(stream_count); REPLSLOT_ACC(stream_bytes); - REPLSLOT_ACC(total_txns); - REPLSLOT_ACC(total_bytes); + REPLSLOT_ACC(total_wal_txns); + REPLSLOT_ACC(total_wal_bytes); statent->plugin_has_stats = repSlotStat->plugin_has_stats; if (repSlotStat->plugin_has_stats) { - REPLSLOT_ACC(sent_txns); - REPLSLOT_ACC(sent_bytes); - REPLSLOT_ACC(filtered_bytes); + REPLSLOT_ACC(plugin_sent_txns); + REPLSLOT_ACC(plugin_sent_bytes); + REPLSLOT_ACC(plugin_filtered_bytes); } #undef REPLSLOT_ACC diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 15bafe63b24..588b49059b2 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2158,13 +2158,13 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) values[4] = Int64GetDatum(slotent->stream_txns); values[5] = Int64GetDatum(slotent->stream_count); values[6] = Int64GetDatum(slotent->stream_bytes); - values[7] = Int64GetDatum(slotent->total_txns); - values[8] = Int64GetDatum(slotent->total_bytes); + values[7] = Int64GetDatum(slotent->total_wal_txns); + values[8] = Int64GetDatum(slotent->total_wal_bytes); if (slotent->plugin_has_stats) { - values[9] = Int64GetDatum(slotent->filtered_bytes); - values[10] = Int64GetDatum(slotent->sent_txns); - values[11] = Int64GetDatum(slotent->sent_bytes); + values[9] = Int64GetDatum(slotent->plugin_filtered_bytes); + values[10] = Int64GetDatum(slotent->plugin_sent_txns); + values[11] = Int64GetDatum(slotent->plugin_sent_bytes); } else { diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 87afeaed8a5..33a031c79b4 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -393,12 +393,12 @@ typedef struct PgStat_StatReplSlotEntry PgStat_Counter stream_txns; PgStat_Counter stream_count; PgStat_Counter stream_bytes; - PgStat_Counter total_txns; - PgStat_Counter total_bytes; + PgStat_Counter total_wal_txns; + PgStat_Counter total_wal_bytes; bool plugin_has_stats; - PgStat_Counter sent_txns; - PgStat_Counter sent_bytes; - PgStat_Counter filtered_bytes; + PgStat_Counter plugin_sent_txns; + PgStat_Counter plugin_sent_bytes; + PgStat_Counter plugin_filtered_bytes; TimestampTz stat_reset_timestamp; } PgStat_StatReplSlotEntry; diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl index b04a0d9f8db..92e42bec6a9 100644 --- a/src/test/recovery/t/006_logical_decoding.pl +++ b/src/test/recovery/t/006_logical_decoding.pl @@ -212,7 +212,7 @@ my $stats_test_slot2 = 'logical_slot'; # Stats exist for stats test slot 1 is( $node_primary->safe_psql( 'postgres', - qq(SELECT total_bytes > 0, plugin_sent_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') + qq(SELECT total_wal_bytes > 0, plugin_sent_bytes > 0, stats_reset IS NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') ), qq(t|t|t), qq(Total bytes and plugin sent bytes are both > 0 and stats_reset is NULL for slot '$stats_test_slot1'.) @@ -233,10 +233,10 @@ $node_primary->safe_psql('postgres', is( $node_primary->safe_psql( 'postgres', - qq(SELECT stats_reset > '$reset1'::timestamptz, total_bytes = 0, plugin_sent_bytes is NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') + qq(SELECT stats_reset > '$reset1'::timestamptz, total_wal_bytes = 0, plugin_sent_bytes is NULL FROM pg_stat_replication_slots WHERE slot_name = '$stats_test_slot1') ), qq(t|t|t), - qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_bytes and plugin_sent_bytes were set to 0 and NULL respectively.) + qq(Check that reset timestamp is later after the second reset of stats for slot '$stats_test_slot1' and confirm total_wal_bytes and plugin_sent_bytes were set to 0 and NULL respectively.) ); # Check that test slot 2 has NULL in reset timestamp diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl index c9c182892cf..c8577794eec 100644 --- a/src/test/recovery/t/035_standby_logical_decoding.pl +++ b/src/test/recovery/t/035_standby_logical_decoding.pl @@ -575,7 +575,7 @@ $node_primary->safe_psql('testdb', qq[INSERT INTO decoding_test(x,y) SELECT 100,'100';]); $node_standby->poll_query_until('testdb', - qq[SELECT total_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot'] + qq[SELECT total_wal_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot'] ) or die "replication slot stats of vacuum_full_activeslot not updated"; # This should trigger the conflict @@ -603,7 +603,7 @@ ok( $stderr =~ # Ensure that replication slot stats are not removed after invalidation. is( $node_standby->safe_psql( 'testdb', - qq[SELECT total_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot'] + qq[SELECT total_wal_txns > 0 FROM pg_stat_replication_slots WHERE slot_name = 'vacuum_full_activeslot'] ), 't', 'replication slot stats not removed after invalidation'); diff --git a/src/test/subscription/t/028_row_filter.pl b/src/test/subscription/t/028_row_filter.pl index e2c83670053..798364c62e6 100644 --- a/src/test/subscription/t/028_row_filter.pl +++ b/src/test/subscription/t/028_row_filter.pl @@ -579,6 +579,11 @@ is($result, qq(3|6), # commands are for testing normal logical replication behavior. # # test row filter (INSERT, UPDATE, DELETE) +# +# Get initial plugin statistics before any filtering occurs +my $initial_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT coalesce(plugin_filtered_bytes, 0) FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'"); + $node_publisher->safe_psql('postgres', "INSERT INTO tab_rowfilter_1 (a, b) VALUES (800, 'test 800')"); $node_publisher->safe_psql('postgres', @@ -612,6 +617,15 @@ $node_publisher->safe_psql('postgres', $node_publisher->wait_for_catchup($appname); +# Check final plugin statistics and verify filtering occurred. +# plugin_filtered_bytes includes the amount of changes from background +# transactions, which may or may not happen. Hence testing exact amount of +# filtered data is not possible. +my $final_filtered_bytes = $node_publisher->safe_psql('postgres', + "SELECT coalesce(plugin_filtered_bytes, 0) FROM pg_stat_replication_slots WHERE slot_name = 'tap_sub'"); +cmp_ok($final_filtered_bytes, '>', $initial_filtered_bytes, + 'plugin_filtered_bytes increased after row filtering'); + # Check expected replicated rows for tab_rowfilter_2 # tap_pub_1 filter is: (c % 2 = 0) # tap_pub_2 filter is: (c % 3 = 0) -- 2.34.1