From 93c65c7e942e60c1b4f8363cb2fc521c49e47ee7 Mon Sep 17 00:00:00 2001 From: vignesh Date: Thu, 1 Apr 2021 11:02:54 +0530 Subject: [PATCH v1 3/4] Add total txns and total txn bytes for replication statistics. This adds the statistics about total transactions count and total transaction data logically replicated to the decoding output plugin from ReorderBuffer. Users can query the pg_stat_replication_slots view to check these stats. --- doc/src/sgml/monitoring.sgml | 24 +++++++++++++++++++ src/backend/catalog/system_views.sql | 2 ++ src/backend/postmaster/pgstat.c | 9 ++++++- src/backend/replication/logical/logical.c | 16 ++++++++----- .../replication/logical/reorderbuffer.c | 5 ++++ src/backend/replication/slot.c | 2 +- src/backend/utils/adt/pgstatfuncs.c | 8 ++++--- src/include/catalog/pg_proc.dat | 6 ++--- src/include/pgstat.h | 7 +++++- src/include/replication/reorderbuffer.h | 4 ++++ src/test/regress/expected/rules.out | 4 +++- 11 files changed, 71 insertions(+), 16 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index af540fb02f..d816c0525a 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2689,6 +2689,30 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i + + + total_txns bigint + + + Number of in-progress transactions replicated to the decoding output + plugin. This counter is used to maintain the top level transactions, + so the counter is not incremented for subtransactions. + + + + + + total_bytesbigint + + + Amount of decoded in-progress transaction data replicated to the decoding + output plugin while decoding changes from WAL for this slot. This and other + counters for this slot can be used to gauge the network I/O which occurred + during logical decoding and allow tuning logical_decoding_work_mem. + + + + stats_reset timestamp with time zone diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 5f2541d316..e3991eb6f6 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -874,6 +874,8 @@ CREATE VIEW pg_stat_replication_slots AS s.stream_txns, s.stream_count, s.stream_bytes, + s.total_txns, + s.total_bytes, s.stats_reset FROM pg_stat_get_replication_slots() AS s; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 132fdef78c..b497fbed37 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -1775,7 +1775,8 @@ pgstat_report_tempfile(size_t filesize) */ void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, - int spillbytes, int streamtxns, int streamcount, int streambytes) + int spillbytes, int streamtxns, int streamcount, + int streambytes, int totaltxns, int totalbytes) { PgStat_MsgReplSlot msg; @@ -1791,6 +1792,8 @@ pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, msg.m_stream_txns = streamtxns; msg.m_stream_count = streamcount; msg.m_stream_bytes = streambytes; + msg.m_total_txns = totaltxns; + msg.m_total_bytes = totalbytes; pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); } @@ -7089,6 +7092,8 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len) replSlotStats[idx].stream_txns += msg->m_stream_txns; replSlotStats[idx].stream_count += msg->m_stream_count; replSlotStats[idx].stream_bytes += msg->m_stream_bytes; + replSlotStats[idx].total_txns += msg->m_total_txns; + replSlotStats[idx].total_bytes += msg->m_total_bytes; } } @@ -7360,6 +7365,8 @@ pgstat_reset_replslot(int i, TimestampTz ts) replSlotStats[i].stream_txns = 0; replSlotStats[i].stream_count = 0; replSlotStats[i].stream_bytes = 0; + replSlotStats[i].total_txns = 0; + replSlotStats[i].total_bytes = 0; replSlotStats[i].stat_reset_timestamp = ts; } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 2f6803637b..3a178bfc3d 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1765,28 +1765,32 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) ReorderBuffer *rb = ctx->reorder; /* - * Nothing to do if we haven't spilled or streamed anything since the last - * time the stats has been sent. + * Nothing to do if we don't have any replication stats to be sent. */ - if (rb->spillBytes <= 0 && rb->streamBytes <= 0) + if (rb->spillBytes <= 0 && rb->streamBytes <= 0 && rb->totalBytes <= 0) return; - elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld", + elog(DEBUG2, "UpdateDecodingStats: updating stats %p %lld %lld %lld %lld %lld %lld %lld %lld", rb, (long long) rb->spillTxns, (long long) rb->spillCount, (long long) rb->spillBytes, (long long) rb->streamTxns, (long long) rb->streamCount, - (long long) rb->streamBytes); + (long long) rb->streamBytes, + (long long) rb->totalTxns, + (long long) rb->totalBytes); pgstat_report_replslot(NameStr(ctx->slot->data.name), rb->spillTxns, rb->spillCount, rb->spillBytes, - rb->streamTxns, rb->streamCount, rb->streamBytes); + rb->streamTxns, rb->streamCount, rb->streamBytes, + rb->totalTxns, rb->totalBytes); rb->spillTxns = 0; rb->spillCount = 0; rb->spillBytes = 0; rb->streamTxns = 0; rb->streamCount = 0; rb->streamBytes = 0; + rb->totalTxns = 0; + rb->totalBytes = 0; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 52d06285a2..5bc0b05a0e 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -350,6 +350,8 @@ ReorderBufferAllocate(void) buffer->streamTxns = 0; buffer->streamCount = 0; buffer->streamBytes = 0; + buffer->totalTxns = 0; + buffer->totalBytes = 0; buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; @@ -659,6 +661,8 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, dlist_push_tail(&rb->toplevel_by_lsn, &txn->node); AssertTXNLsnOrder(rb); } + + rb->totalTxns++; } else txn = NULL; /* not found and not asked to create */ @@ -3078,6 +3082,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, { txn->size += sz; rb->size += sz; + rb->totalBytes += sz; /* Update the total size in the top transaction. */ if (toptxn) diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 75a087c2f9..7dba2298b4 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -328,7 +328,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, * ReplicationSlotAllocationLock. */ if (SlotIsLogical(slot)) - pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0); + pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0, 0, 0); /* * Now that the slot has been marked as in_use and active, it's safe to diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 2ce8e1fd2d..c229b37abd 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2279,7 +2279,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) Datum pg_stat_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_REPLICATION_SLOT_COLS 8 +#define PG_STAT_GET_REPLICATION_SLOT_COLS 10 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -2330,11 +2330,13 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS) values[4] = Int64GetDatum(s->stream_txns); values[5] = Int64GetDatum(s->stream_count); values[6] = Int64GetDatum(s->stream_bytes); + values[7] = Int64GetDatum(s->total_txns); + values[8] = Int64GetDatum(s->total_bytes); if (s->stat_reset_timestamp == 0) - nulls[7] = true; + nulls[9] = true; else - values[7] = TimestampTzGetDatum(s->stat_reset_timestamp); + values[9] = TimestampTzGetDatum(s->stat_reset_timestamp); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 69ffd0c3f4..8b2071c77b 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5296,9 +5296,9 @@ proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => '', - proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}', - proargmodes => '{o,o,o,o,o,o,o,o}', - proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}', + proallargtypes => '{text,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{o,o,o,o,o,o,o,o,o,o}', + proargnames => '{slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}', prosrc => 'pg_stat_get_replication_slots' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 10784f947e..46445c3cee 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -515,6 +515,8 @@ typedef struct PgStat_MsgReplSlot PgStat_Counter m_stream_txns; PgStat_Counter m_stream_count; PgStat_Counter m_stream_bytes; + PgStat_Counter m_total_txns; + PgStat_Counter m_total_bytes; } PgStat_MsgReplSlot; @@ -879,6 +881,8 @@ typedef struct PgStat_ReplSlotStats PgStat_Counter stream_txns; PgStat_Counter stream_count; PgStat_Counter stream_bytes; + PgStat_Counter total_txns; + PgStat_Counter total_bytes; TimestampTz stat_reset_timestamp; } PgStat_ReplSlotStats; @@ -1451,7 +1455,8 @@ extern void pgstat_report_deadlock(void); extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount); extern void pgstat_report_checksum_failure(void); extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, - int spillbytes, int streamtxns, int streamcount, int streambytes); + int spillbytes, int streamtxns, int streamcount, + int streambytes, int totaltxns, int totalbytes); extern void pgstat_report_replslot_drop(const char *slotname); extern void pgstat_initialize(void); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 565a961d6a..a372b70b7d 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -618,6 +618,10 @@ struct ReorderBuffer int64 streamTxns; /* number of transactions streamed */ int64 streamCount; /* streaming invocation counter */ int64 streamBytes; /* amount of data streamed */ + + /* Statistics about all the replicated transactions */ + int64 totalTxns; /* total number of transactions replicated */ + int64 totalBytes; /* total amount of data replicated */ }; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 9b59a7b4a5..f0eea8b18f 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2056,8 +2056,10 @@ pg_stat_replication_slots| SELECT s.slot_name, s.stream_txns, s.stream_count, s.stream_bytes, + s.total_txns, + s.total_bytes, s.stats_reset - FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset); + FROM pg_stat_get_replication_slots() s(slot_name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, total_txns, total_bytes, stats_reset); pg_stat_slru| SELECT s.name, s.blks_zeroed, s.blks_hit, -- 2.25.1