From 3086779a6a89ba732853e35c43514659fb60f0c2 Mon Sep 17 00:00:00 2001 From: Takamichi Osumi Date: Wed, 2 Mar 2022 00:48:53 +0000 Subject: [PATCH v26] Extend pg_stat_subscription_stats to include general transaction statistics Introduce 2 new subscription statistics columns (apply_commit_count and apply_rollback_count) to the pg_stat_subscription_stats view for counting cumulative transaction commits/rollbacks. The timing when the data of transaction statistics is sent to the stats collector is adjusted with PGSTAT_STAT_INTERVAL to avoid overload. Author: Takamichi Osumi Reviewed-by: Amit Kapila, Masahiko Sawada, Hou Zhijie, Greg Nancarrow, Vignesh C, Ajin Cherian, Kyotaro Horiguchi, Tang Haiying, Peter Smith Tested-by: Wang wei Discussion: https://www.postgresql.org/message-id/OSBPR01MB48887CA8F40C8D984A6DC00CED199%40OSBPR01MB4888.jpnprd01.prod.outlook.com --- doc/src/sgml/monitoring.sgml | 22 ++++++++ src/backend/catalog/system_views.sql | 2 + src/backend/postmaster/pgstat.c | 81 ++++++++++++++++++++++++++++++ src/backend/replication/logical/launcher.c | 2 + src/backend/replication/logical/worker.c | 45 +++++++++++++++++ src/backend/utils/adt/pgstatfuncs.c | 18 +++++-- src/include/catalog/pg_proc.dat | 6 +-- src/include/pgstat.h | 23 +++++++++ src/include/replication/worker_internal.h | 9 ++++ src/test/regress/expected/rules.out | 4 +- src/tools/pgindent/typedefs.list | 2 + 11 files changed, 206 insertions(+), 8 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 9fb62fe..d1eff70 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -3128,6 +3128,28 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i + apply_commit_count bigint + + + Number of transactions successfully applied in this subscription. + Both COMMIT and COMMIT PREPARED + increment this counter. + + + + + + apply_rollback_count bigint + + + Number of transactions rollbacks in this subscription. Both + ROLLBACK and ROLLBACK PREPARED + increment this counter. + + + + + stats_reset timestamp with time zone diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 40b7bca..eae957f 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1270,6 +1270,8 @@ CREATE VIEW pg_stat_subscription_stats AS s.subname, ss.apply_error_count, ss.sync_error_count, + ss.apply_commit_count, + ss.apply_rollback_count, ss.stats_reset FROM pg_subscription as s, pg_stat_get_subscription_stats(s.oid) as ss; diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 53ddd93..e6d365d 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -55,6 +55,7 @@ #include "postmaster/postmaster.h" #include "replication/slot.h" #include "replication/walsender.h" +#include "replication/worker_internal.h" #include "storage/backendid.h" #include "storage/dsm.h" #include "storage/fd.h" @@ -286,6 +287,8 @@ static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS]; static HTAB *replSlotStatHash = NULL; static HTAB *subscriptionStatHash = NULL; +extern LogicalRepSubscriptionStats subStats; + /* * List of OIDs of databases we need to write out. If an entry is InvalidOid, * it means to write only the shared-catalog stats ("DB 0"); otherwise, we @@ -382,6 +385,7 @@ static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len); static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len); static void pgstat_recv_subscription_drop(PgStat_MsgSubscriptionDrop *msg, int len); static void pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int len); +static void pgstat_recv_subscription_xact(PgStat_MsgSubscriptionXact *msg, int len); /* ------------------------------------------------------------ * Public functions called from postmaster follow @@ -1962,6 +1966,58 @@ pgstat_report_subscription_drop(Oid subid) } /* ---------- + * pgstat_report_subscription_xact() - + * + * Tell the collector about subscriptions transaction stats. + * The statistics are cleared upon sending. + * + * Setting 'force' to true makes sure that no stats data + * related to subscription commit/rollback is lost before the + * logical worker exit. + * ---------- + */ +void +pgstat_report_subscription_xact(bool force) +{ + static TimestampTz last_report = 0; + PgStat_MsgSubscriptionXact msg; + + /* Bailout early if nothing to do */ + if (!OidIsValid(subStats.subid) || + (subStats.apply_commit_count == 0 && subStats.apply_rollback_count == 0)) + return; + + if (!force) + { + TimestampTz now = GetCurrentTimestamp(); + + /* + * Don't send a message unless it's been at least PGSTAT_STAT_INTERVAL + * msec since we last sent one. This is to avoid overloading the stats + * collector. + */ + if (!TimestampDifferenceExceeds(last_report, now, PGSTAT_STAT_INTERVAL)) + return; + last_report = now; + } + + /* + * Prepare and send the message. + */ + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONXACT); + msg.m_subid = subStats.subid; + msg.apply_commit_count = subStats.apply_commit_count; + msg.apply_rollback_count = subStats.apply_rollback_count; + pgstat_send(&msg, sizeof(PgStat_MsgSubscriptionXact)); + + /* + * Clear out the statistics. + */ + subStats.apply_commit_count = 0; + subStats.apply_rollback_count = 0; +} + +/* ---------- * pgstat_ping() - * * Send some junk data to the collector to increase traffic. @@ -3687,6 +3743,10 @@ PgstatCollectorMain(int argc, char *argv[]) pgstat_recv_subscription_error(&msg.msg_subscriptionerror, len); break; + case PGSTAT_MTYPE_SUBSCRIPTIONXACT: + pgstat_recv_subscription_xact(&msg.msg_subscriptionxact, len); + break; + default: break; } @@ -6092,6 +6152,25 @@ pgstat_recv_subscription_error(PgStat_MsgSubscriptionError *msg, int len) } /* ---------- + * pgstat_recv_subscription_xact() - + * + * Process a SUBSCRIPTIONXACT message. + * ---------- + */ +static void +pgstat_recv_subscription_xact(PgStat_MsgSubscriptionXact *msg, int len) +{ + PgStat_StatSubEntry *subentry; + + /* Get the subscription stats */ + subentry = pgstat_get_subscription_entry(msg->m_subid, true); + Assert(subentry); + + subentry->apply_commit_count += msg->apply_commit_count; + subentry->apply_rollback_count += msg->apply_rollback_count; +} + +/* ---------- * pgstat_write_statsfile_needed() - * * Do we need to write out any stats files? @@ -6268,6 +6347,8 @@ pgstat_reset_subscription(PgStat_StatSubEntry *subentry, TimestampTz ts) { subentry->apply_error_count = 0; subentry->sync_error_count = 0; + subentry->apply_commit_count = 0; + subentry->apply_rollback_count = 0; subentry->stat_reset_timestamp = ts; } diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 5a68d6d..4dfcac8 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -647,6 +647,8 @@ logicalrep_worker_onexit(int code, Datum arg) if (LogRepWorkerWalRcvConn) walrcv_disconnect(LogRepWorkerWalRcvConn); + pgstat_report_subscription_xact(true); + logicalrep_worker_detach(); /* Cleanup fileset used for streaming transactions. */ diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 7e267f7..95ec2eb 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -238,6 +238,13 @@ static ApplyErrorCallbackArg apply_error_callback_arg = .ts = 0, }; +LogicalRepSubscriptionStats subStats = +{ + .subid = InvalidOid, + .apply_commit_count = 0, + .apply_rollback_count = 0, +}; + static MemoryContext ApplyMessageContext = NULL; MemoryContext ApplyContext = NULL; @@ -329,6 +336,9 @@ static void apply_handle_tuple_routing(ApplyExecutionData *edata, /* Compute GID for two_phase transactions */ static void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid); +static void subscription_stats_incr_commit(void); +static void subscription_stats_incr_rollback(void); + /* Common streaming function to apply all the spooled messages */ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); @@ -959,6 +969,8 @@ apply_handle_commit_prepared(StringInfo s) CommitTransactionCommand(); pgstat_report_stat(false); + subscription_stats_incr_commit(); + store_flush_position(prepare_data.end_lsn); in_remote_transaction = false; @@ -1006,6 +1018,8 @@ apply_handle_rollback_prepared(StringInfo s) FinishPreparedTransaction(gid, false); end_replication_step(); CommitTransactionCommand(); + + subscription_stats_incr_rollback(); } pgstat_report_stat(false); @@ -1217,6 +1231,8 @@ apply_handle_stream_abort(StringInfo s) { set_apply_error_context_xact(xid, 0); stream_cleanup_files(MyLogicalRepWorker->subid, xid); + + subscription_stats_incr_rollback(); } else { @@ -1463,6 +1479,8 @@ apply_handle_commit_internal(LogicalRepCommitData *commit_data) CommitTransactionCommand(); pgstat_report_stat(false); + subscription_stats_incr_commit(); + store_flush_position(commit_data->end_lsn); } else @@ -2717,6 +2735,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received) if (endofstream) break; + pgstat_report_subscription_xact(false); + /* * Wait for more data or latch. If we have unflushed transactions, * wake up after WalWriterDelay to see if they've been flushed yet (in @@ -3372,6 +3392,28 @@ TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid, int szgid) snprintf(gid, szgid, "pg_gid_%u_%u", subid, xid); } +/* + * Increment the counter of commit for subscription statistics. + */ +static void +subscription_stats_incr_commit(void) +{ + Assert(OidIsValid(subStats.subid)); + + subStats.apply_commit_count++; +} + +/* + * Increment the counter of rollback for subscription statistics. + */ +static void +subscription_stats_incr_rollback(void) +{ + Assert(OidIsValid(subStats.subid)); + + subStats.apply_rollback_count++; +} + /* Logical Replication Apply worker entry point */ void ApplyWorkerMain(Datum main_arg) @@ -3469,6 +3511,9 @@ ApplyWorkerMain(Datum main_arg) CommitTransactionCommand(); + /* Set the subid for subscription statistics */ + subStats.subid = MyLogicalRepWorker->subid; + /* Connect to the origin and start the replication. */ elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index fd993d0..c3d4be7 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2405,7 +2405,7 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) Datum pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 4 +#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS 6 Oid subid = PG_GETARG_OID(0); TupleDesc tupdesc; Datum values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS]; @@ -2424,7 +2424,11 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) INT8OID, -1, 0); TupleDescInitEntry(tupdesc, (AttrNumber) 3, "sync_error_count", INT8OID, -1, 0); - TupleDescInitEntry(tupdesc, (AttrNumber) 4, "stats_reset", + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "apply_commit_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "apply_rollback_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "stats_reset", TIMESTAMPTZOID, -1, 0); BlessTupleDesc(tupdesc); @@ -2448,11 +2452,17 @@ pg_stat_get_subscription_stats(PG_FUNCTION_ARGS) /* sync_error_count */ values[2] = Int64GetDatum(subentry->sync_error_count); + /* apply_commit_count */ + values[3] = Int64GetDatum(subentry->apply_commit_count); + + /* apply_rollback_count */ + values[4] = Int64GetDatum(subentry->apply_rollback_count); + /* stats_reset */ if (subentry->stat_reset_timestamp == 0) - nulls[3] = true; + nulls[5] = true; else - values[3] = TimestampTzGetDatum(subentry->stat_reset_timestamp); + values[5] = TimestampTzGetDatum(subentry->stat_reset_timestamp); /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index bf88858..5a18b9a 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5380,9 +5380,9 @@ proname => 'pg_stat_get_subscription_stats', proisstrict => 'f', provolatile => 's', proparallel => 'r', prorettype => 'record', proargtypes => 'oid', - proallargtypes => '{oid,oid,int8,int8,timestamptz}', - proargmodes => '{i,o,o,o,o}', - proargnames => '{subid,subid,apply_error_count,sync_error_count,stats_reset}', + proallargtypes => '{oid,oid,int8,int8,int8,int8,timestamptz}', + proargmodes => '{i,o,o,o,o,o,o}', + proargnames => '{subid,subid,apply_error_count,sync_error_count,apply_commit_count,apply_rollback_count,stats_reset}', prosrc => 'pg_stat_get_subscription_stats' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index be2f7e2..7588d6a 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -87,6 +87,7 @@ typedef enum StatMsgType PGSTAT_MTYPE_DISCONNECT, PGSTAT_MTYPE_SUBSCRIPTIONDROP, PGSTAT_MTYPE_SUBSCRIPTIONERROR, + PGSTAT_MTYPE_SUBSCRIPTIONXACT } StatMsgType; /* ---------- @@ -577,6 +578,23 @@ typedef struct PgStat_MsgSubscriptionError bool m_is_apply_error; } PgStat_MsgSubscriptionError; + +/* ---------- + * PgStat_MsgSubscriptionXact Sent by the subscription worker to report transaction + * ends. + * ---------- + */ +typedef struct PgStat_MsgSubscriptionXact +{ + PgStat_MsgHdr m_hdr; + + /* determine the subscription entry */ + Oid m_subid; + + PgStat_Counter apply_commit_count; + PgStat_Counter apply_rollback_count; +} PgStat_MsgSubscriptionXact; + /* ---------- * PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict * ---------- @@ -757,6 +775,7 @@ typedef union PgStat_Msg PgStat_MsgDisconnect msg_disconnect; PgStat_MsgSubscriptionError msg_subscriptionerror; PgStat_MsgSubscriptionDrop msg_subscriptiondrop; + PgStat_MsgSubscriptionXact msg_subscriptionxact; } PgStat_Msg; @@ -981,6 +1000,9 @@ typedef struct PgStat_StatSubEntry PgStat_Counter apply_error_count; PgStat_Counter sync_error_count; + PgStat_Counter apply_commit_count; + PgStat_Counter apply_rollback_count; + TimestampTz stat_reset_timestamp; } PgStat_StatSubEntry; @@ -1177,6 +1199,7 @@ extern void pgstat_send_archiver(const char *xlog, bool failed); extern void pgstat_send_bgwriter(void); extern void pgstat_send_checkpointer(void); extern void pgstat_send_wal(bool force); +extern void pgstat_report_subscription_xact(bool force); /* ---------- * Support functions for the SQL-callable functions to diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 3c3f5f6..c5d01fa 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -68,6 +68,15 @@ typedef struct LogicalRepWorker TimestampTz reply_time; } LogicalRepWorker; + +typedef struct LogicalRepSubscriptionStats +{ + Oid subid; + + int64 apply_commit_count; + int64 apply_rollback_count; +} LogicalRepSubscriptionStats; + /* Main memory context for apply worker. Permanent during worker lifetime. */ extern MemoryContext ApplyContext; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index ac46856..a7ef303 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2076,9 +2076,11 @@ pg_stat_subscription_stats| SELECT ss.subid, s.subname, ss.apply_error_count, ss.sync_error_count, + ss.apply_commit_count, + ss.apply_rollback_count, ss.stats_reset FROM pg_subscription s, - LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, stats_reset); + LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, apply_commit_count, apply_rollback_count, stats_reset); pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index d9b83f7..c5f7aec 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1401,6 +1401,7 @@ LogicalRepRelId LogicalRepRelMapEntry LogicalRepRelation LogicalRepRollbackPreparedTxnData +LogicalRepSubscriptionStats LogicalRepTupleData LogicalRepTyp LogicalRepWorker @@ -1947,6 +1948,7 @@ PgStat_MsgResetsubcounter PgStat_MsgSLRU PgStat_MsgSubscriptionDrop PgStat_MsgSubscriptionError +PgStat_MsgSubscriptionXact PgStat_MsgTabpurge PgStat_MsgTabstat PgStat_MsgTempFile -- 1.8.3.1