From f7ffa65330038e1e41b4385f05a8b20c9ada02e2 Mon Sep 17 00:00:00 2001 From: vignesh Date: Mon, 5 Apr 2021 19:01:03 +0530 Subject: [PATCH v4 1/3] Added total txns and total txn bytes to replication statistics. This adds the statistics about total transactions count and total transaction data logically replicated to the decoding output plugin from ReorderBuffer. Users can query the pg_stat_replication_slots view to check these stats. This also includes changing char datatype to NameData datatype for slotname. --- contrib/test_decoding/expected/stats.out | 79 +++++++++++++------ contrib/test_decoding/sql/stats.sql | 48 +++++++---- doc/src/sgml/monitoring.sgml | 23 ++++++ src/backend/catalog/system_views.sql | 2 + src/backend/postmaster/pgstat.c | 38 +++++---- src/backend/replication/logical/logical.c | 30 ++++--- .../replication/logical/reorderbuffer.c | 18 +++++ src/backend/replication/slot.c | 7 +- src/backend/utils/adt/pgstatfuncs.c | 10 ++- src/include/catalog/pg_proc.dat | 6 +- src/include/pgstat.h | 15 ++-- src/include/replication/reorderbuffer.h | 4 + src/test/regress/expected/rules.out | 4 +- 13 files changed, 205 insertions(+), 79 deletions(-) diff --git a/contrib/test_decoding/expected/stats.out b/contrib/test_decoding/expected/stats.out index bca36fa903..bc8e601eab 100644 --- a/contrib/test_decoding/expected/stats.out +++ b/contrib/test_decoding/expected/stats.out @@ -8,7 +8,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d CREATE TABLE stats_test(data text); -- function to wait for counters to advance -CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$ +CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$ DECLARE start_time timestamptz := clock_timestamp(); updated bool; @@ -16,12 +16,25 @@ BEGIN -- we don't want to wait forever; loop will exit after 30 seconds FOR i IN 1 .. 300 LOOP - -- check to see if all updates have been reset/updated - SELECT CASE WHEN check_reset THEN (spill_txns = 0) - ELSE (spill_txns > 0) - END - INTO updated - FROM pg_stat_replication_slots WHERE slot_name='regression_slot'; + IF check_spill_txns THEN + + -- check to see if all updates have been reset/updated + SELECT CASE WHEN check_reset THEN (spill_txns = 0) + ELSE (spill_txns > 0) + END + INTO updated + FROM pg_stat_replication_slots WHERE slot_name='regression_slot'; + + ELSE + + -- check to see if all updates have been reset/updated + SELECT CASE WHEN check_reset THEN (total_txns = 0) + ELSE (total_txns > 0) + END + INTO updated + FROM pg_stat_replication_slots WHERE slot_name='regression_slot'; + + END IF; exit WHEN updated; @@ -51,16 +64,16 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, -- Check stats, wait for the stats collector to update. We can't test the -- exact stats count as that can vary if any background transaction (say by -- autovacuum) happens in parallel to the main transaction. -SELECT wait_for_decode_stats(false); +SELECT wait_for_decode_stats(false, true); wait_for_decode_stats ----------------------- (1 row) -SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots; - slot_name | spill_txns | spill_count ------------------+------------+------------- - regression_slot | t | t +SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots; + slot_name | spill_txns | spill_count | total_txns | total_bytes +-----------------+------------+-------------+------------+------------- + regression_slot | t | t | t | t (1 row) -- reset the slot stats, and wait for stats collector to reset @@ -70,16 +83,16 @@ SELECT pg_stat_reset_replication_slot('regression_slot'); (1 row) -SELECT wait_for_decode_stats(true); +SELECT wait_for_decode_stats(true, true); wait_for_decode_stats ----------------------- (1 row) -SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots; - slot_name | spill_txns | spill_count ------------------+------------+------------- - regression_slot | 0 | 0 +SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots; + slot_name | spill_txns | spill_count | total_txns | total_bytes +-----------------+------------+-------------+------------+------------- + regression_slot | 0 | 0 | 0 | 0 (1 row) -- decode and check stats again. @@ -89,16 +102,36 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 5002 (1 row) -SELECT wait_for_decode_stats(false); +SELECT wait_for_decode_stats(false, true); + wait_for_decode_stats +----------------------- + +(1 row) + +SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots; + slot_name | spill_txns | spill_count | total_txns | total_bytes +-----------------+------------+-------------+------------+------------- + regression_slot | t | t | t | t +(1 row) + +SELECT pg_stat_reset_replication_slot('regression_slot'); + pg_stat_reset_replication_slot +-------------------------------- + +(1 row) + +-- non-spilled xact +INSERT INTO stats_test values(generate_series(1, 10)); +SELECT wait_for_decode_stats(false, false); wait_for_decode_stats ----------------------- (1 row) -SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots; - slot_name | spill_txns | spill_count ------------------+------------+------------- - regression_slot | t | t +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots; + slot_name | spill_txns | spill_count | total_txns | total_bytes +-----------------+------------+-------------+------------+------------- + regression_slot | f | f | t | t (1 row) -- Ensure stats can be repeatedly accessed using the same stats snapshot. See @@ -117,7 +150,7 @@ SELECT slot_name FROM pg_stat_replication_slots; (1 row) COMMIT; -DROP FUNCTION wait_for_decode_stats(bool); +DROP FUNCTION wait_for_decode_stats(bool, bool); DROP TABLE stats_test; SELECT pg_drop_replication_slot('regression_slot'); pg_drop_replication_slot diff --git a/contrib/test_decoding/sql/stats.sql b/contrib/test_decoding/sql/stats.sql index 51294e48e8..8c34aeced1 100644 --- a/contrib/test_decoding/sql/stats.sql +++ b/contrib/test_decoding/sql/stats.sql @@ -6,7 +6,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d CREATE TABLE stats_test(data text); -- function to wait for counters to advance -CREATE FUNCTION wait_for_decode_stats(check_reset bool) RETURNS void AS $$ +CREATE FUNCTION wait_for_decode_stats(check_reset bool, check_spill_txns bool) RETURNS void AS $$ DECLARE start_time timestamptz := clock_timestamp(); updated bool; @@ -14,12 +14,25 @@ BEGIN -- we don't want to wait forever; loop will exit after 30 seconds FOR i IN 1 .. 300 LOOP - -- check to see if all updates have been reset/updated - SELECT CASE WHEN check_reset THEN (spill_txns = 0) - ELSE (spill_txns > 0) - END - INTO updated - FROM pg_stat_replication_slots WHERE slot_name='regression_slot'; + IF check_spill_txns THEN + + -- check to see if all updates have been reset/updated + SELECT CASE WHEN check_reset THEN (spill_txns = 0) + ELSE (spill_txns > 0) + END + INTO updated + FROM pg_stat_replication_slots WHERE slot_name='regression_slot'; + + ELSE + + -- check to see if all updates have been reset/updated + SELECT CASE WHEN check_reset THEN (total_txns = 0) + ELSE (total_txns > 0) + END + INTO updated + FROM pg_stat_replication_slots WHERE slot_name='regression_slot'; + + END IF; exit WHEN updated; @@ -46,18 +59,25 @@ SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, -- Check stats, wait for the stats collector to update. We can't test the -- exact stats count as that can vary if any background transaction (say by -- autovacuum) happens in parallel to the main transaction. -SELECT wait_for_decode_stats(false); -SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots; +SELECT wait_for_decode_stats(false, true); +SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots; -- reset the slot stats, and wait for stats collector to reset SELECT pg_stat_reset_replication_slot('regression_slot'); -SELECT wait_for_decode_stats(true); -SELECT slot_name, spill_txns, spill_count FROM pg_stat_replication_slots; +SELECT wait_for_decode_stats(true, true); +SELECT slot_name, spill_txns, spill_count, total_txns, total_bytes FROM pg_stat_replication_slots; -- decode and check stats again. SELECT count(*) FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'skip-empty-xacts', '1'); -SELECT wait_for_decode_stats(false); -SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count FROM pg_stat_replication_slots; +SELECT wait_for_decode_stats(false, true); +SELECT slot_name, spill_txns > 0 AS spill_txns, spill_count > 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots; + +SELECT pg_stat_reset_replication_slot('regression_slot'); + +-- non-spilled xact +INSERT INTO stats_test values(generate_series(1, 10)); +SELECT wait_for_decode_stats(false, false); +SELECT slot_name, spill_txns = 0 AS spill_txns, spill_count = 0 AS spill_count, total_txns > 0 AS total_txns, total_bytes > 0 AS total_bytes FROM pg_stat_replication_slots; -- Ensure stats can be repeatedly accessed using the same stats snapshot. See -- https://postgr.es/m/20210317230447.c7uc4g3vbs4wi32i%40alap3.anarazel.de @@ -66,6 +86,6 @@ SELECT slot_name FROM pg_stat_replication_slots; SELECT slot_name FROM pg_stat_replication_slots; COMMIT; -DROP FUNCTION wait_for_decode_stats(bool); +DROP FUNCTION wait_for_decode_stats(bool, bool); DROP TABLE stats_test; SELECT pg_drop_replication_slot('regression_slot'); diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 56018745c8..a727f32fb2 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2689,6 +2689,29 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i + + + total_txns bigint + + + Number of decoded transactions sent to the decoding output plugin for + this slot. This counter is used to maintain the top level transactions, + so the counter is not incremented for subtransactions. + + + + + + total_bytesbigint + + + Amount of decoded transactions data sent to the decoding output plugin + while decoding the changes from WAL for this slot. This can be used to + gauge the total amount of data sent during logical decoding. + + + + stats_reset timestamp with time zone diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 5f2541d316..e3991eb6f6 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -874,6 +874,8 @@ CREATE VIEW pg_stat_replication_slots AS s.stream_txns, s.stream_count, s.stream_bytes, + s.total_txns, + s.total_bytes, s.stats_reset FROM pg_stat_get_replication_slots() AS s; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 5ba776e789..89b9315af6 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -62,6 +62,7 @@ #include "storage/pg_shmem.h" #include "storage/proc.h" #include "storage/procsignal.h" +#include "utils/builtins.h" #include "utils/guc.h" #include "utils/memutils.h" #include "utils/ps_status.h" @@ -1525,7 +1526,7 @@ pgstat_reset_replslot_counter(const char *name) if (SlotIsPhysical(slot)) return; - strlcpy(msg.m_slotname, name, NAMEDATALEN); + namestrcpy(&msg.m_slotname, name); msg.clearall = false; } else @@ -1743,10 +1744,7 @@ pgstat_report_tempfile(size_t filesize) * ---------- */ void -pgstat_report_replslot(const char *slotname, PgStat_Counter spilltxns, - PgStat_Counter spillcount, PgStat_Counter spillbytes, - PgStat_Counter streamtxns, PgStat_Counter streamcount, - PgStat_Counter streambytes) +pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat) { PgStat_MsgReplSlot msg; @@ -1754,14 +1752,16 @@ pgstat_report_replslot(const char *slotname, PgStat_Counter spilltxns, * Prepare and send the message */ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); - strlcpy(msg.m_slotname, slotname, NAMEDATALEN); + namestrcpy(&msg.m_slotname, NameStr(repSlotStat->slotname)); msg.m_drop = false; - msg.m_spill_txns = spilltxns; - msg.m_spill_count = spillcount; - msg.m_spill_bytes = spillbytes; - msg.m_stream_txns = streamtxns; - msg.m_stream_count = streamcount; - msg.m_stream_bytes = streambytes; + msg.m_spill_txns = repSlotStat->spill_txns; + msg.m_spill_count = repSlotStat->spill_count;; + msg.m_spill_bytes = repSlotStat->spill_bytes; + msg.m_stream_txns = repSlotStat->stream_txns; + msg.m_stream_count = repSlotStat->stream_count; + msg.m_stream_bytes = repSlotStat->stream_bytes; + msg.m_total_txns = repSlotStat->total_txns; + msg.m_total_bytes = repSlotStat->total_bytes; pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); } @@ -1777,7 +1777,7 @@ pgstat_report_replslot_drop(const char *slotname) PgStat_MsgReplSlot msg; pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT); - strlcpy(msg.m_slotname, slotname, NAMEDATALEN); + namestrcpy(&msg.m_slotname, slotname); msg.m_drop = true; pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); } @@ -5049,7 +5049,7 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, else { /* Get the index of replication slot statistics to reset */ - idx = pgstat_replslot_index(msg->m_slotname, false); + idx = pgstat_replslot_index(NameStr(msg->m_slotname), false); /* * Nothing to do if the given slot entry is not found. This could @@ -5347,7 +5347,7 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len) * Get the index of replication slot statistics. On dropping, we don't * create the new statistics. */ - idx = pgstat_replslot_index(msg->m_slotname, !msg->m_drop); + idx = pgstat_replslot_index(NameStr(msg->m_slotname), !msg->m_drop); /* * The slot entry is not found or there is no space to accommodate the new @@ -5379,6 +5379,8 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len) replSlotStats[idx].stream_txns += msg->m_stream_txns; replSlotStats[idx].stream_count += msg->m_stream_count; replSlotStats[idx].stream_bytes += msg->m_stream_bytes; + replSlotStats[idx].total_txns += msg->m_total_txns; + replSlotStats[idx].total_bytes += msg->m_total_bytes; } } @@ -5572,7 +5574,7 @@ pgstat_replslot_index(const char *name, bool create_it) Assert(nReplSlotStats <= max_replication_slots); for (i = 0; i < nReplSlotStats; i++) { - if (strcmp(replSlotStats[i].slotname, name) == 0) + if (namestrcmp(&replSlotStats[i].slotname, name) == 0) return i; /* found */ } @@ -5585,7 +5587,7 @@ pgstat_replslot_index(const char *name, bool create_it) /* Register new slot */ memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats)); - strlcpy(replSlotStats[nReplSlotStats].slotname, name, NAMEDATALEN); + namestrcpy(&replSlotStats[nReplSlotStats].slotname, name); return nReplSlotStats++; } @@ -5606,6 +5608,8 @@ pgstat_reset_replslot(int i, TimestampTz ts) replSlotStats[i].stream_txns = 0; replSlotStats[i].stream_count = 0; replSlotStats[i].stream_bytes = 0; + replSlotStats[i].total_txns = 0; + replSlotStats[i].total_bytes = 0; replSlotStats[i].stat_reset_timestamp = ts; } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 2f6803637b..d0ad694477 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1763,30 +1763,42 @@ void UpdateDecodingStats(LogicalDecodingContext *ctx) { ReorderBuffer *rb = ctx->reorder; + PgStat_ReplSlotStats repSlotStat; /* - * Nothing to do if we haven't spilled or streamed anything since the last - * time the stats has been sent. + * Nothing to do if we don't have any replication stats to be sent. */ - if (rb->spillBytes <= 0 && rb->streamBytes <= 0) + if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0) return; - elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld", + elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld", rb, (long long) rb->spillTxns, (long long) rb->spillCount, (long long) rb->spillBytes, (long long) rb->streamTxns, (long long) rb->streamCount, - (long long) rb->streamBytes); - - pgstat_report_replslot(NameStr(ctx->slot->data.name), - rb->spillTxns, rb->spillCount, rb->spillBytes, - rb->streamTxns, rb->streamCount, rb->streamBytes); + (long long) rb->streamBytes, + (long long) rb->totalTxns, + (long long) rb->totalBytes); + + namestrcpy(&repSlotStat.slotname, NameStr(ctx->slot->data.name)); + repSlotStat.spill_txns = rb->spillTxns; + repSlotStat.spill_count = rb->spillCount; + repSlotStat.spill_bytes = rb->spillBytes; + repSlotStat.stream_txns = rb->streamTxns; + repSlotStat.stream_count = rb->streamCount; + repSlotStat.stream_bytes = rb->streamBytes; + repSlotStat.total_txns = rb->totalTxns; + repSlotStat.total_bytes = rb->totalBytes; + + pgstat_report_replslot(&repSlotStat); rb->spillTxns = 0; rb->spillCount = 0; rb->spillBytes = 0; rb->streamTxns = 0; rb->streamCount = 0; rb->streamBytes = 0; + rb->totalTxns = 0; + rb->totalBytes = 0; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 52d06285a2..7899060f02 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -350,6 +350,8 @@ ReorderBufferAllocate(void) buffer->streamTxns = 0; buffer->streamCount = 0; buffer->streamBytes = 0; + buffer->totalTxns = 0; + buffer->totalBytes = 0; buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; @@ -659,6 +661,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, dlist_push_tail(&rb->toplevel_by_lsn, &txn->node); AssertTXNLsnOrder(rb); } + } else txn = NULL; /* not found and not asked to create */ @@ -2051,6 +2054,17 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, rb->begin(rb, txn); } + /* + * Update total transaction count and total transaction bytes, if + * transaction is streamed or spilled it will be updated while the + * transaction gets spilled or streamed. + */ + if (!rb->streamBytes && !rb->spillBytes) + { + rb->totalTxns++; + rb->totalBytes += rb->size; + } + ReorderBufferIterTXNInit(rb, txn, &iterstate); while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) { @@ -3524,9 +3538,11 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) { rb->spillCount += 1; rb->spillBytes += size; + rb->totalBytes += size; /* don't consider already serialized transactions */ rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1; + rb->totalTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1; } Assert(spilled == txn->nentries_mem); @@ -3892,9 +3908,11 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) rb->streamCount += 1; rb->streamBytes += stream_bytes; + rb->totalBytes += stream_bytes; /* Don't consider already streamed transaction. */ rb->streamTxns += (txn_is_streamed) ? 0 : 1; + rb->totalTxns += (txn_is_streamed) ? 0 : 1; Assert(dlist_is_empty(&txn->changes)); Assert(txn->nentries == 0); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 75a087c2f9..f61b163f78 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -328,7 +328,12 @@ ReplicationSlotCreate(const char *name, bool db_specific, * ReplicationSlotAllocationLock. */ if (SlotIsLogical(slot)) - pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0); + { + PgStat_ReplSlotStats repSlotStat; + MemSet(&repSlotStat, 0, sizeof(PgStat_ReplSlotStats)); + namestrcpy(&repSlotStat.slotname, NameStr(slot->data.name)); + pgstat_report_replslot(&repSlotStat); + } /* * Now that the slot has been marked as in_use and active, it's safe to diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 9ffbca685c..54eb3d1538 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2279,7 +2279,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) Datum pg_stat_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_REPLICATION_SLOT_COLS 8 +#define PG_STAT_GET_REPLICATION_SLOT_COLS 10 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -2323,18 +2323,20 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS) MemSet(values, 0, sizeof(values)); MemSet(nulls, 0, sizeof(nulls)); - values[0] = PointerGetDatum(cstring_to_text(s->slotname)); + values[0] = CStringGetTextDatum(NameStr(s->slotname)); values[1] = Int64GetDatum(s->spill_txns); values[2] = Int64GetDatum(s->spill_count); values[3] = Int64GetDatum(s->spill_bytes); values[4] = Int64GetDatum(s->stream_txns); values[5] = Int64GetDatum(s->stream_count); values[6] = Int64GetDatum(s->stream_bytes); + values[7] = Int64GetDatum(s->total_txns); + values[8] = Int64GetDatum(s->total_bytes); if (s->stat_reset_timestamp == 0) - nulls[7] = true; + nulls[9] = true; else - values[7] = TimestampTzGetDatum(s->stat_reset_timestamp); + values[9] = TimestampTzGetDatum(s->stat_reset_timestamp); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 4309fa40dd..95be4db2b2 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5311,9 +5311,9 @@ proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => '', - proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}', + proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}', prosrc => 'pg_stat_get_replication_slots' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 7cd137506e..63b740ab40 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -378,7 +378,7 @@ typedef struct PgStat_MsgResetslrucounter typedef struct PgStat_MsgResetreplslotcounter { PgStat_MsgHdr m_hdr; - char m_slotname[NAMEDATALEN]; + NameData m_slotname; bool clearall; } PgStat_MsgResetreplslotcounter; @@ -506,7 +506,7 @@ typedef struct PgStat_MsgSLRU typedef struct PgStat_MsgReplSlot { PgStat_MsgHdr m_hdr; - char m_slotname[NAMEDATALEN]; + NameData m_slotname; bool m_drop; PgStat_Counter m_spill_txns; PgStat_Counter m_spill_count; @@ -514,6 +514,8 @@ typedef struct PgStat_MsgReplSlot PgStat_Counter m_stream_txns; PgStat_Counter m_stream_count; PgStat_Counter m_stream_bytes; + PgStat_Counter m_total_txns; + PgStat_Counter m_total_bytes; } PgStat_MsgReplSlot; @@ -871,13 +873,15 @@ typedef struct PgStat_SLRUStats */ typedef struct PgStat_ReplSlotStats { - char slotname[NAMEDATALEN]; + NameData slotname; PgStat_Counter spill_txns; PgStat_Counter spill_count; PgStat_Counter spill_bytes; PgStat_Counter stream_txns; PgStat_Counter stream_count; PgStat_Counter stream_bytes; + PgStat_Counter total_txns; + PgStat_Counter total_bytes; TimestampTz stat_reset_timestamp; } PgStat_ReplSlotStats; @@ -980,10 +984,7 @@ extern void pgstat_report_recovery_conflict(int reason); extern void pgstat_report_deadlock(void); extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount); extern void pgstat_report_checksum_failure(void); -extern void pgstat_report_replslot(const char *slotname, PgStat_Counter spilltxns, - PgStat_Counter spillcount, PgStat_Counter spillbytes, - PgStat_Counter streamtxns, PgStat_Counter streamcount, - PgStat_Counter streambytes); +extern void pgstat_report_replslot(const PgStat_ReplSlotStats *repSlotStat); extern void pgstat_report_replslot_drop(const char *slotname); extern void pgstat_initialize(void); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 565a961d6a..a372b70b7d 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -618,6 +618,10 @@ struct ReorderBuffer int64 streamTxns; /* number of transactions streamed */ int64 streamCount; /* streaming invocation counter */ int64 streamBytes; /* amount of data streamed */ + + /* Statistics about all the replicated transactions */ + int64 totalTxns; /* total number of transactions replicated */ + int64 totalBytes; /* total amount of data replicated */ }; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 9b59a7b4a5..f0eea8b18f 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2056,8 +2056,10 @@ pg_stat_replication_slots| SELECT s.slot_name, s.stream_txns, s.stream_count, s.stream_bytes, + s.total_txns, + s.total_bytes, s.stats_reset - FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset); + FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset); pg_stat_slru| SELECT s.name, s.blks_zeroed, s.blks_hit, -- 2.25.1