From 65c54baa51a26a0189024b01cef21d495e5a49ea Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Thu, 8 Oct 2020 18:44:22 +0530 Subject: [PATCH v2] Track streaming stats. --- doc/src/sgml/monitoring.sgml | 34 +++++++++++++++++++++++++ src/backend/catalog/system_views.sql | 3 +++ src/backend/postmaster/pgstat.c | 8 +++++- src/backend/replication/logical/logical.c | 19 +++++++++----- src/backend/replication/logical/reorderbuffer.c | 10 ++++++++ src/backend/replication/slot.c | 2 +- src/backend/utils/adt/pgstatfuncs.c | 9 ++++--- src/include/catalog/pg_proc.dat | 6 ++--- src/include/pgstat.h | 8 +++++- src/include/replication/reorderbuffer.h | 3 +++ src/test/regress/expected/rules.out | 5 +++- 11 files changed, 91 insertions(+), 16 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 6656676..b182a9e 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -2631,6 +2631,40 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i + stream_txns bigint + + + Number of in-progress transactions streamed to subscriber after + memory used by logical decoding exceeds logical_decoding_work_mem. + Streaming only works with toplevel transactions (subtransactions can't + be streamed independently), so the counter does not get incremented for + subtransactions. + + + + + + stream_countbigint + + + Number of times in-progress transactions were streamed to subscriber. + Transactions may get streamed repeatedly, and this counter gets incremented + on every such invocation. + + + + + + stream_bytesbigint + + + Amount of decoded in-progress transaction data streamed to subscriber. + + + + + + stats_reset timestamp with time zone diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index c293907..dd5584f 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -802,6 +802,9 @@ CREATE VIEW pg_stat_replication_slots AS s.spill_txns, s.spill_count, s.spill_bytes, + s.stream_txns, + s.stream_count, + s.stream_bytes, s.stats_reset FROM pg_stat_get_replication_slots() AS s; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 822f0eb..f7a9dac 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -1708,7 +1708,7 @@ pgstat_report_tempfile(size_t filesize) */ void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, - int spillbytes) + int spillbytes, int streamtxns, int streamcount, int streambytes) { PgStat_MsgReplSlot msg; @@ -1721,6 +1721,9 @@ pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, msg.m_spill_txns = spilltxns; msg.m_spill_count = spillcount; msg.m_spill_bytes = spillbytes; + msg.m_stream_txns = streamtxns; + msg.m_stream_count = streamcount; + msg.m_stream_bytes = streambytes; pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); } @@ -6892,6 +6895,9 @@ pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len) replSlotStats[idx].spill_txns += msg->m_spill_txns; replSlotStats[idx].spill_count += msg->m_spill_count; replSlotStats[idx].spill_bytes += msg->m_spill_bytes; + replSlotStats[idx].stream_txns += msg->m_stream_txns; + replSlotStats[idx].stream_count += msg->m_stream_count; + replSlotStats[idx].stream_bytes += msg->m_stream_bytes; } } diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 3346df3..0e5ede1 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -1471,21 +1471,28 @@ UpdateDecodingStats(LogicalDecodingContext *ctx) ReorderBuffer *rb = ctx->reorder; /* - * Nothing to do if we haven't spilled anything since the last time the - * stats has been sent. + * Nothing to do if we haven't spilled or streamed anything since the last + * time the stats has been sent. */ - if (rb->spillBytes <= 0) + if (rb->spillBytes <= 0 && rb->streamBytes <= 0) return; - elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld", + elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld %lld %lld %lld", rb, (long long) rb->spillTxns, (long long) rb->spillCount, - (long long) rb->spillBytes); + (long long) rb->spillBytes, + (long long) rb->streamTxns, + (long long) rb->streamCount, + (long long) rb->streamBytes); pgstat_report_replslot(NameStr(ctx->slot->data.name), - rb->spillTxns, rb->spillCount, rb->spillBytes); + rb->spillTxns, rb->spillCount, rb->spillBytes, + rb->streamTxns, rb->streamCount, rb->streamBytes); rb->spillTxns = 0; rb->spillCount = 0; rb->spillBytes = 0; + rb->streamTxns = 0; + rb->streamCount = 0; + rb->streamBytes = 0; } diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 189641b..28d3980 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -346,6 +346,9 @@ ReorderBufferAllocate(void) buffer->spillTxns = 0; buffer->spillCount = 0; buffer->spillBytes = 0; + buffer->streamTxns = 0; + buffer->streamCount = 0; + buffer->streamBytes = 0; buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; @@ -3520,6 +3523,13 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) txn->snapshot_now = NULL; } + + rb->streamCount += 1; + rb->streamBytes += txn->total_size; + + /* Don't consider already streamed transaction. */ + rb->streamTxns += (rbtxn_is_streamed(txn)) ? 0 : 1; + /* Process and send the changes to output plugin. */ ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now, command_id, true); diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 220b4cd..09be1d8 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -320,7 +320,7 @@ ReplicationSlotCreate(const char *name, bool db_specific, * ReplicationSlotAllocationLock. */ if (SlotIsLogical(slot)) - pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0); + pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0, 0, 0, 0); /* * Now that the slot has been marked as in_use and active, it's safe to diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 0d0d2e6..ae87bc1 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2153,7 +2153,7 @@ pg_stat_get_archiver(PG_FUNCTION_ARGS) Datum pg_stat_get_replication_slots(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_REPLICATION_SLOT_CLOS 5 +#define PG_STAT_GET_REPLICATION_SLOT_CLOS 8 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -2201,11 +2201,14 @@ pg_stat_get_replication_slots(PG_FUNCTION_ARGS) values[1] = Int64GetDatum(s->spill_txns); values[2] = Int64GetDatum(s->spill_count); values[3] = Int64GetDatum(s->spill_bytes); + values[4] = Int64GetDatum(s->stream_txns); + values[5] = Int64GetDatum(s->stream_count); + values[6] = Int64GetDatum(s->stream_bytes); if (s->stat_reset_timestamp == 0) - nulls[4] = true; + nulls[7] = true; else - values[4] = TimestampTzGetDatum(s->stat_reset_timestamp); + values[7] = TimestampTzGetDatum(s->stat_reset_timestamp); tuplestore_putvalues(tupstore, tupdesc, values, nulls); } diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 22340ba..1b64aa8 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5261,9 +5261,9 @@ proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => '', - proallargtypes => '{text,int8,int8,int8,timestamptz}', - proargmodes => '{o,o,o,o,o}', - proargnames => '{name,spill_txns,spill_count,spill_bytes,stats_reset}', + proallargtypes => '{text,int8,int8,int8,int8,int8,int8,timestamptz}', + proargmodes => '{o,o,o,o,o,o,o,o}', + proargnames => '{name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,stats_reset}', prosrc => 'pg_stat_get_replication_slots' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index a821ff4..960533b 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -492,6 +492,9 @@ typedef struct PgStat_MsgReplSlot PgStat_Counter m_spill_txns; PgStat_Counter m_spill_count; PgStat_Counter m_spill_bytes; + PgStat_Counter m_stream_txns; + PgStat_Counter m_stream_count; + PgStat_Counter m_stream_bytes; } PgStat_MsgReplSlot; @@ -823,6 +826,9 @@ typedef struct PgStat_ReplSlotStats PgStat_Counter spill_txns; PgStat_Counter spill_count; PgStat_Counter spill_bytes; + PgStat_Counter stream_txns; + PgStat_Counter stream_count; + PgStat_Counter stream_bytes; TimestampTz stat_reset_timestamp; } PgStat_ReplSlotStats; @@ -1387,7 +1393,7 @@ extern void pgstat_report_deadlock(void); extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount); extern void pgstat_report_checksum_failure(void); extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount, - int spillbytes); + int spillbytes, int streamtxns, int streamcount, int streambytes); extern void pgstat_report_replslot_drop(const char *slotname); extern void pgstat_initialize(void); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 0cc3aeb..018ad4c 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -543,6 +543,9 @@ struct ReorderBuffer int64 spillTxns; /* number of transactions spilled to disk */ int64 spillCount; /* spill-to-disk invocation counter */ int64 spillBytes; /* amount of data spilled to disk */ + int64 streamTxns; /* number of transactions spilled to disk */ + int64 streamCount; /* streaming invocation counter */ + int64 streamBytes; /* amount of data streamed to subscriber */ }; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index cf2a9b4..576166d 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2022,8 +2022,11 @@ pg_stat_replication_slots| SELECT s.name, s.spill_txns, s.spill_count, s.spill_bytes, + s.stream_txns, + s.stream_count, + s.stream_bytes, s.stats_reset - FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes, stats_reset); + FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes, stream_txns, stream_count, stream_bytes, stats_reset); pg_stat_slru| SELECT s.name, s.blks_zeroed, s.blks_hit, -- 1.8.3.1