From bfabcb5ec86973562f5cc59a6a56c29da7ed9906 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Fri, 16 Jul 2021 23:10:22 +0900 Subject: [PATCH v21 1/3] Add a subscription worker statistics view "pg_stat_subscription_workers". This commit adds a new system view pg_stat_subscription_workers, that shows information about any errors which occur during application of logical replication changes as well as during performing initial table synchronization. The subscription statistics entries are removed when the corresponding subscription is removed. It also adds an SQL function pg_stat_reset_subscription_worker() to reset single subscription errors. The contents of this view can be used by an upcoming patch that skips the particular transaction that conflicts with the existing data on the subscriber. This view can be extended in the future to track other xact related statistics for subscription workers. --- doc/src/sgml/monitoring.sgml | 167 +++++++++ src/backend/catalog/system_functions.sql | 4 + src/backend/catalog/system_views.sql | 25 ++ src/backend/commands/subscriptioncmds.c | 15 +- src/backend/postmaster/pgstat.c | 379 +++++++++++++++++++- src/backend/replication/logical/worker.c | 54 ++- src/backend/utils/adt/pgstatfuncs.c | 134 ++++++- src/include/catalog/pg_proc.dat | 18 + src/include/pgstat.h | 105 +++++- src/test/regress/expected/rules.out | 20 ++ src/test/subscription/t/026_error_report.pl | 191 ++++++++++ src/tools/pgindent/typedefs.list | 4 + 12 files changed, 1095 insertions(+), 21 deletions(-) create mode 100644 src/test/subscription/t/026_error_report.pl diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 3173ec2566..4ee97dbb2d 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -627,6 +627,15 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser + + pg_stat_subscription_workerspg_stat_subscription_workers + One row per subscription worker, showing statistics about errors + that occurred on that subscription worker. + See + pg_stat_subscription_workers for details. + + + @@ -3034,6 +3043,138 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i + + <structname>pg_stat_subscription_workers</structname> + + + pg_stat_subscription_workers + + + + The pg_stat_subscription_workers view will contain + one row per subscription error reported by workers applying logical + replication changes and workers handling the initial data copy of the + subscribed tables. The statistics entry is removed when the subscription + the worker is running on is removed. + + + + <structname>pg_stat_subscription_workers</structname> View + + + + + Column Type + + + Description + + + + + + + + subid oid + + + OID of the subscription + + + + + + subname name + + + Name of the subscription + + + + + + subrelid oid + + + OID of the relation that the worker is synchronizing; null for the + main apply worker + + + + + + relid oid + + + OID of the relation that the worker was processing when the + error occurred + + + + + + command text + + + Name of command being applied when the error occurred. This field + is always NULL if the error was reported during the initial data + copy. + + + + + + xid xid + + + Transaction ID of the publisher node being applied when the error + occurred. This field is always NULL if the error was reported + during the initial data copy. + + + + + + error_count uint8 + + + Number of consecutive times the error occurred + + + + + + error_message text + + + The error message + + + + + + first_error_time timestamp with time zone + + + Time at which the first error occurred + + + + + + last_error_time timestamp with time zone + + + Time at which the last error occurred + + + + + +
+ +
+ <structname>pg_stat_ssl</structname> @@ -5156,6 +5297,32 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i can be granted EXECUTE to run the function. + + + + + pg_stat_reset_subscription_worker + + pg_stat_reset_subscription_worker ( subid oid, relid oid ) + void + + + Resets the statistics of a single subscription worker running on the + subscription with subid shown in the + pg_stat_subscription_worker view. If the + argument relid is not NULL, + resets statistics of the subscription worker handling the initial data + copy of the relation with relid. Otherwise, + resets the subscription worker statistics of the main apply worker. + If the argument relid is omitted, resets the + statistics of all subscription workers running on the subscription + with subid. + + + This function is restricted to superusers by default, but other users + can be granted EXECUTE to run the function. + + diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index 54c93b16c4..cd1d649f9f 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -639,6 +639,10 @@ REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_function_counters(oid) FROM publ REVOKE EXECUTE ON FUNCTION pg_stat_reset_replication_slot(text) FROM public; +REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_worker(oid) FROM public; + +REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_worker(oid, oid) FROM public; + REVOKE EXECUTE ON FUNCTION lo_import(text) FROM public; REVOKE EXECUTE ON FUNCTION lo_import(text, oid) FROM public; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index eb560955cd..cb2f77cd1e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1261,3 +1261,28 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subslotname, subsynccommit, subpublications) ON pg_subscription TO public; + +CREATE VIEW pg_stat_subscription_workers AS + SELECT + w.subid, + s.subname, + w.subrelid, + w.relid, + w.command, + w.xid, + w.error_count, + w.error_message, + w.first_error_time, + w.last_error_time + FROM (SELECT + oid as subid, + NULL as relid + FROM pg_subscription + UNION ALL + SELECT + srsubid as subid, + srrelid as relid + FROM pg_subscription_rel + WHERE srsubstate <> 'r') sr, + LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w + JOIN pg_subscription s ON (w.subid = s.oid); diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index c47ba26369..18962b91e1 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -32,6 +32,7 @@ #include "executor/executor.h" #include "miscadmin.h" #include "nodes/makefuncs.h" +#include "pgstat.h" #include "replication/logicallauncher.h" #include "replication/origin.h" #include "replication/slot.h" @@ -1204,7 +1205,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * Since dropping a replication slot is not transactional, the replication * slot stays dropped even if the transaction rolls back. So we cannot * run DROP SUBSCRIPTION inside a transaction block if dropping the - * replication slot. + * replication slot. Also, in this case, we report a message for dropping + * the subscription to the stats collector. * * XXX The command name should really be something like "DROP SUBSCRIPTION * of a subscription that is associated with a replication slot", but we @@ -1377,6 +1379,17 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) } PG_END_TRY(); + /* + * Send a message for dropping this subscription to the stats collector. We + * can safely report dropping the subscription statistics here if the + * subscription is associated with a replication slot since we cannot run + * DROP SUBSCRIPTION inside a transaction block. Subscription statistics will + * be removed later by (auto)vacuum either if it's not associated with a + * replication slot or if the message for dropping the subscription gets lost. + */ + if (slotname) + pgstat_report_subscription_drop(subid); + table_close(rel, NoLock); } diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 8c166e5e16..a620379957 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -41,6 +41,7 @@ #include "catalog/catalog.h" #include "catalog/pg_database.h" #include "catalog/pg_proc.h" +#include "catalog/pg_subscription.h" #include "common/ip.h" #include "executor/instrument.h" #include "libpq/libpq.h" @@ -106,6 +107,7 @@ #define PGSTAT_TAB_HASH_SIZE 512 #define PGSTAT_FUNCTION_HASH_SIZE 512 #define PGSTAT_REPLSLOT_HASH_SIZE 32 +#define PGSTAT_SUBWORKER_HASH_SIZE 32 /* ---------- @@ -320,10 +322,14 @@ NON_EXEC_STATIC void PgstatCollectorMain(int argc, char *argv[]) pg_attribute_no static PgStat_StatDBEntry *pgstat_get_db_entry(Oid databaseid, bool create); static PgStat_StatTabEntry *pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create); +static PgStat_StatSubWorkerEntry *pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, + Oid subid, Oid subrelid, + bool create); static void pgstat_write_statsfiles(bool permanent, bool allDbs); static void pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent); static HTAB *pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep); -static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, bool permanent); +static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, + HTAB *subworkerhash, bool permanent); static void backend_read_statsfile(void); static bool pgstat_write_statsfile_needed(void); @@ -335,6 +341,7 @@ static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, Timestamp static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now); static void pgstat_send_funcstats(void); static void pgstat_send_slru(void); +static void pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg); static HTAB *pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid); static bool pgstat_should_report_connstat(void); static void pgstat_report_disconnect(Oid dboid); @@ -373,6 +380,8 @@ static void pgstat_recv_connect(PgStat_MsgConnect *msg, int len); static void pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len); static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len); static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len); +static void pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len); +static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len); /* ------------------------------------------------------------ * Public functions called from postmaster follow @@ -1302,6 +1311,55 @@ pgstat_vacuum_stat(void) hash_destroy(htab); } + + /* + * Repeat for subscription workers. Similarly, we needn't bother + * in the common case where no function stats are being collected. + */ + if (dbentry->subworkers != NULL && + hash_get_num_entries(dbentry->subworkers) > 0) + { + PgStat_StatSubWorkerEntry *subwentry; + PgStat_MsgSubscriptionPurge spmsg; + + /* + * Read pg_subscription and make a list of OIDs of all existing + * subscriptions + */ + htab = pgstat_collect_oids(SubscriptionRelationId, Anum_pg_subscription_oid); + + pgstat_setheader(&spmsg.m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONPURGE); + spmsg.m_databaseid = MyDatabaseId; + spmsg.m_nentries = 0; + + hash_seq_init(&hstat, dbentry->subworkers); + while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL) + { + CHECK_FOR_INTERRUPTS(); + + if (hash_search(htab, (void *) &(subwentry->key.subid), HASH_FIND, NULL) + != NULL) + continue; + + /* This subscription is dead, add the subid to the message */ + spmsg.m_subids[spmsg.m_nentries++] = subwentry->key.subid; + + /* + * If the message is full, send it out and reinitialize to empty + */ + if (spmsg.m_nentries >= PGSTAT_NUM_SUBSCRIPTIONPURGE) + { + pgstat_send_subscription_purge(&spmsg); + spmsg.m_nentries = 0; + } + } + + /* Send the rest of dead subscriptions */ + if (spmsg.m_nentries > 0) + pgstat_send_subscription_purge(&spmsg); + + hash_destroy(htab); + } } @@ -1474,7 +1532,8 @@ pgstat_reset_shared_counters(const char *target) * ---------- */ void -pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type) +pgstat_reset_single_counter(Oid objoid, Oid subobjoid, + PgStat_Single_Reset_Type type) { PgStat_MsgResetsinglecounter msg; @@ -1485,6 +1544,7 @@ pgstat_reset_single_counter(Oid objoid, PgStat_Single_Reset_Type type) msg.m_databaseid = MyDatabaseId; msg.m_resettype = type; msg.m_objectid = objoid; + msg.m_subobjectid = subobjoid; pgstat_send(&msg, sizeof(msg)); } @@ -1869,6 +1929,53 @@ pgstat_report_replslot_drop(const char *slotname) pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); } +/* ---------- + * pgstat_report_subworker_error() - + * + * Tell the collector about the subscription worker error. + * ---------- + */ +void +pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid, + LogicalRepMsgType command, TransactionId xid, + const char *errmsg) +{ + PgStat_MsgSubWorkerError msg; + int len; + + Assert(strlen(errmsg) < PGSTAT_SUBWORKERERROR_MSGLEN); + len = offsetof(PgStat_MsgSubWorkerError, m_message[0]) + strlen(errmsg) + 1; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERERROR); + msg.m_databaseid = MyDatabaseId; + msg.m_subid = subid; + msg.m_subrelid = subrelid; + msg.m_relid = relid; + msg.m_command = command; + msg.m_xid = xid; + msg.m_timestamp = GetCurrentTimestamp(); + strlcpy(msg.m_message, errmsg, PGSTAT_SUBWORKERERROR_MSGLEN); + + pgstat_send(&msg, len); +} + +/* ---------- + * pgstat_report_subscription_drop() - + * + * Tell the collector about dropping the subscription. + * ---------- + */ +void +pgstat_report_subscription_drop(Oid subid) +{ + PgStat_MsgSubscriptionPurge msg; + + msg.m_databaseid = MyDatabaseId; + msg.m_subids[0] = subid; + msg.m_nentries = 1; + pgstat_send_subscription_purge(&msg); +} + /* ---------- * pgstat_ping() - * @@ -2874,6 +2981,33 @@ pgstat_fetch_stat_funcentry(Oid func_id) return funcentry; } +/* + * --------- + * pgstat_fetch_stat_subworker_entry() - + * + * Support function for the SQL-callable pgstat* functions. Returns + * a pointer to the subscription worker struct or NULL. + * --------- + */ +PgStat_StatSubWorkerEntry * +pgstat_fetch_stat_subworker_entry(Oid subid, Oid subrelid) +{ + PgStat_StatDBEntry *dbentry; + PgStat_StatSubWorkerEntry *wentry = NULL; + + /* Load the stats file if needed */ + backend_read_statsfile(); + + /* Look up database, then find the requested subscription worker stats */ + dbentry = pgstat_fetch_stat_dbentry(MyDatabaseId); + if (dbentry != NULL && dbentry->subworkers != NULL) + { + wentry = pgstat_get_subworker_entry(dbentry, subid, subrelid, + false); + } + + return wentry; +} /* * --------- @@ -3312,6 +3446,23 @@ pgstat_send_slru(void) } } +/* -------- + * pgstat_send_subscription_purge() - + * + * Send a subscription purge message to the collector + * -------- + */ +static void +pgstat_send_subscription_purge(PgStat_MsgSubscriptionPurge *msg) +{ + int len; + + len = offsetof(PgStat_MsgSubscriptionPurge, m_subids[0]) + + msg->m_nentries * sizeof(Oid); + + pgstat_setheader(&msg->m_hdr, PGSTAT_MTYPE_SUBSCRIPTIONPURGE); + pgstat_send(msg, len); +} /* ---------- * PgstatCollectorMain() - @@ -3568,6 +3719,14 @@ PgstatCollectorMain(int argc, char *argv[]) pgstat_recv_disconnect(&msg.msg_disconnect, len); break; + case PGSTAT_MTYPE_SUBSCRIPTIONPURGE: + pgstat_recv_subscription_purge(&msg.msg_subscriptionpurge, len); + break; + + case PGSTAT_MTYPE_SUBWORKERERROR: + pgstat_recv_subworker_error(&msg.msg_subworkererror, len); + break; + default: break; } @@ -3613,7 +3772,8 @@ PgstatCollectorMain(int argc, char *argv[]) /* * Subroutine to clear stats in a database entry * - * Tables and functions hashes are initialized to empty. + * Tables, functions, and subscription workers hashes are initialized + * to empty. */ static void reset_dbentry_counters(PgStat_StatDBEntry *dbentry) @@ -3666,6 +3826,13 @@ reset_dbentry_counters(PgStat_StatDBEntry *dbentry) PGSTAT_FUNCTION_HASH_SIZE, &hash_ctl, HASH_ELEM | HASH_BLOBS); + + hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey); + hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry); + dbentry->subworkers = hash_create("Per-database subscription worker", + PGSTAT_SUBWORKER_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS); } /* @@ -3690,7 +3857,7 @@ pgstat_get_db_entry(Oid databaseid, bool create) /* * If not found, initialize the new one. This creates empty hash tables - * for tables and functions, too. + * for tables, functions, and subscription worker, too. */ if (!found) reset_dbentry_counters(result); @@ -3748,6 +3915,45 @@ pgstat_get_tab_entry(PgStat_StatDBEntry *dbentry, Oid tableoid, bool create) return result; } +/* ---------- + * pgstat_get_subworker_entry + * + * Return subscription worker entry with the given subscription OID and + * relation OID. If subrelid is InvalidOid, it returns an entry of the + * apply worker otherwise of the table sync worker associated with subrelid. + * If no subscription entry exists, initialize it, if the create parameter + * is true. Else, return NULL. + * ---------- + */ +static PgStat_StatSubWorkerEntry * +pgstat_get_subworker_entry(PgStat_StatDBEntry *dbentry, Oid subid, Oid subrelid, + bool create) +{ + PgStat_StatSubWorkerEntry *subwentry; + PgStat_StatSubWorkerKey key; + bool found; + HASHACTION action = (create ? HASH_ENTER : HASH_FIND); + + key.subid = subid; + key.subrelid = subrelid; + subwentry = (PgStat_StatSubWorkerEntry *) hash_search(dbentry->subworkers, + (void *) &key, + action, &found); + + /* If not found, initialize the new one */ + if (create && !found) + { + subwentry->relid = InvalidOid; + subwentry->command = 0; + subwentry->xid = InvalidTransactionId; + subwentry->error_count = 0; + subwentry->first_error_time = 0; + subwentry->last_error_time = 0; + subwentry->error_message[0] = '\0'; + } + + return subwentry; +} /* ---------- * pgstat_write_statsfiles() - @@ -3947,8 +4153,10 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent) { HASH_SEQ_STATUS tstat; HASH_SEQ_STATUS fstat; + HASH_SEQ_STATUS sstat; PgStat_StatTabEntry *tabentry; PgStat_StatFuncEntry *funcentry; + PgStat_StatSubWorkerEntry *subwentry; FILE *fpout; int32 format_id; Oid dbid = dbentry->databaseid; @@ -4003,6 +4211,17 @@ pgstat_write_db_statsfile(PgStat_StatDBEntry *dbentry, bool permanent) (void) rc; /* we'll check for error with ferror */ } + /* + * Walk through the database's subscription worker stats table. + */ + hash_seq_init(&sstat, dbentry->subworkers); + while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&sstat)) != NULL) + { + fputc('S', fpout); + rc = fwrite(subwentry, sizeof(PgStat_StatSubWorkerEntry), 1, fpout); + (void) rc; /* we'll check for error with ferror */ + } + /* * No more output to be done. Close the temp file and replace the old * pgstat.stat with it. The ferror() check replaces testing for error @@ -4241,6 +4460,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) memcpy(dbentry, &dbbuf, sizeof(PgStat_StatDBEntry)); dbentry->tables = NULL; dbentry->functions = NULL; + dbentry->subworkers = NULL; /* * In the collector, disregard the timestamp we read from the @@ -4252,8 +4472,8 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) dbentry->stats_timestamp = 0; /* - * Don't create tables/functions hashtables for uninteresting - * databases. + * Don't create tables/functions/subworkers hashtables for + * uninteresting databases. */ if (onlydb != InvalidOid) { @@ -4278,6 +4498,14 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) &hash_ctl, HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey); + hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry); + hash_ctl.hcxt = pgStatLocalContext; + dbentry->subworkers = hash_create("Per-database subscription worker", + PGSTAT_SUBWORKER_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + /* * If requested, read the data from the database-specific * file. Otherwise we just leave the hashtables empty. @@ -4286,6 +4514,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) pgstat_read_db_statsfile(dbentry->databaseid, dbentry->tables, dbentry->functions, + dbentry->subworkers, permanent); break; @@ -4363,19 +4592,21 @@ done: * As in pgstat_read_statsfiles, if the permanent file is requested, it is * removed after reading. * - * Note: this code has the ability to skip storing per-table or per-function - * data, if NULL is passed for the corresponding hashtable. That's not used - * at the moment though. + * Note: this code has the ability to skip storing per-table, per-function or + * per-subscription-worker data, if NULL is passed for the corresponding hashtable. + * That's not used at the moment though. * ---------- */ static void pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, - bool permanent) + HTAB *subworkerhash, bool permanent) { PgStat_StatTabEntry *tabentry; PgStat_StatTabEntry tabbuf; PgStat_StatFuncEntry funcbuf; PgStat_StatFuncEntry *funcentry; + PgStat_StatSubWorkerEntry subwbuf; + PgStat_StatSubWorkerEntry *subwentry; FILE *fpin; int32 format_id; bool found; @@ -4489,6 +4720,41 @@ pgstat_read_db_statsfile(Oid databaseid, HTAB *tabhash, HTAB *funchash, memcpy(funcentry, &funcbuf, sizeof(funcbuf)); break; + /* + * 'S' A PgStat_StatSubWorkerEntry struct describing + * subscription worker statistics. + */ + case 'S': + if (fread(&subwbuf, 1, sizeof(PgStat_StatSubWorkerEntry), + fpin) != sizeof(PgStat_StatSubWorkerEntry)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + /* + * Skip if subscription worker data not wanted. + */ + if (subworkerhash == NULL) + break; + + subwentry = (PgStat_StatSubWorkerEntry *) hash_search(subworkerhash, + (void *) &subwbuf.key, + HASH_ENTER, &found); + + if (found) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + memcpy(subwentry, &subwbuf, sizeof(subwbuf)); + break; + /* * 'E' The EOF marker of a complete stats file. */ @@ -5162,6 +5428,8 @@ pgstat_recv_dropdb(PgStat_MsgDropdb *msg, int len) hash_destroy(dbentry->tables); if (dbentry->functions != NULL) hash_destroy(dbentry->functions); + if (dbentry->subworkers != NULL) + hash_destroy(dbentry->subworkers); if (hash_search(pgStatDBHash, (void *) &dbid, @@ -5199,13 +5467,16 @@ pgstat_recv_resetcounter(PgStat_MsgResetcounter *msg, int len) hash_destroy(dbentry->tables); if (dbentry->functions != NULL) hash_destroy(dbentry->functions); + if (dbentry->subworkers != NULL) + hash_destroy(dbentry->subworkers); dbentry->tables = NULL; dbentry->functions = NULL; + dbentry->subworkers = NULL; /* * Reset database-level stats, too. This creates empty hash tables for - * tables and functions. + * tables, functions, and subscription workers. */ reset_dbentry_counters(dbentry); } @@ -5274,6 +5545,14 @@ pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len) else if (msg->m_resettype == RESET_FUNCTION) (void) hash_search(dbentry->functions, (void *) &(msg->m_objectid), HASH_REMOVE, NULL); + else if (msg->m_resettype == RESET_SUBWORKER) + { + PgStat_StatSubWorkerKey key; + + key.subid = msg->m_objectid; + key.subrelid = msg->m_subobjectid; + (void) hash_search(dbentry->subworkers, (void *) &key, HASH_REMOVE, NULL); + } } /* ---------- @@ -5816,6 +6095,84 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len) } } +/* ---------- + * pgstat_recv_subscription_purge() - + * + * Process a SUBSCRIPTIONPURGE message. + * ---------- + */ +static void +pgstat_recv_subscription_purge(PgStat_MsgSubscriptionPurge *msg, int len) +{ + HASH_SEQ_STATUS hstat; + PgStat_StatDBEntry *dbentry; + PgStat_StatSubWorkerEntry *subwentry; + + dbentry = pgstat_get_db_entry(msg->m_databaseid, false); + + /* No need to purge if we don't have even know the database */ + if (!dbentry || !dbentry->subworkers) + return; + + /* Remove all subscription worker statistics of the given subscriptions */ + hash_seq_init(&hstat, dbentry->subworkers); + while ((subwentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL) + { + for (int i = 0; i < msg->m_nentries; i++) + { + if (subwentry->key.subid == msg->m_subids[i]) + { + (void) hash_search(dbentry->subworkers, (void *) &(subwentry->key), + HASH_REMOVE, NULL); + break; + } + } + } +} + +/* ---------- + * pgstat_recv_subworker_error() - + * + * Process a SUBWORKERERROR message. + * ---------- + */ +static void +pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len) +{ + PgStat_StatDBEntry *dbentry; + PgStat_StatSubWorkerEntry *subwentry; + + dbentry = pgstat_get_db_entry(msg->m_databaseid, true); + + /* Get the subscription worker stats */ + subwentry = pgstat_get_subworker_entry(dbentry, msg->m_subid, + msg->m_subrelid, true); + Assert(subwentry); + + /* + * Update only the counter and last error timestamp if we received + * the same error again + */ + if (subwentry->relid == msg->m_relid && + subwentry->command == msg->m_command && + subwentry->xid == msg->m_xid && + strcmp(subwentry->error_message, msg->m_message) == 0) + { + subwentry->error_count++; + subwentry->last_error_time = msg->m_timestamp; + return; + } + + /* Otherwise, update the error information */ + subwentry->relid = msg->m_relid; + subwentry->command = msg->m_command; + subwentry->xid = msg->m_xid; + subwentry->error_count = 1; + subwentry->first_error_time = msg->m_timestamp; + subwentry->last_error_time = msg->m_timestamp; + strlcpy(subwentry->error_message, msg->m_message, PGSTAT_SUBWORKERERROR_MSGLEN); +} + /* ---------- * pgstat_write_statsfile_needed() - * diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ae1b391bda..2e79302a48 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3332,6 +3332,7 @@ void ApplyWorkerMain(Datum main_arg) { int worker_slot = DatumGetInt32(main_arg); + MemoryContext cctx = CurrentMemoryContext; MemoryContext oldctx; char originname[NAMEDATALEN]; XLogRecPtr origin_startpos; @@ -3432,8 +3433,30 @@ ApplyWorkerMain(Datum main_arg) { char *syncslotname; - /* This is table synchronization worker, call initial sync. */ - syncslotname = LogicalRepSyncTableStart(&origin_startpos); + PG_TRY(); + { + /* This is table synchronization worker, call initial sync. */ + syncslotname = LogicalRepSyncTableStart(&origin_startpos); + } + PG_CATCH(); + { + MemoryContext ecxt = MemoryContextSwitchTo(cctx); + ErrorData *errdata = CopyErrorData(); + + /* + * Report the table sync error. There is no corresponding message + * type for table synchronization. + */ + pgstat_report_subworker_error(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relid, + 0, /* message type */ + InvalidTransactionId, + errdata->message); + MemoryContextSwitchTo(ecxt); + PG_RE_THROW(); + } + PG_END_TRY(); /* allocate slot name in long-lived context */ myslotname = MemoryContextStrdup(ApplyContext, syncslotname); @@ -3551,7 +3574,32 @@ ApplyWorkerMain(Datum main_arg) } /* Run the main loop. */ - LogicalRepApplyLoop(origin_startpos); + PG_TRY(); + { + LogicalRepApplyLoop(origin_startpos); + } + PG_CATCH(); + { + /* report the apply error */ + if (apply_error_callback_arg.command != 0) + { + MemoryContext ecxt = MemoryContextSwitchTo(cctx); + ErrorData *errdata = CopyErrorData(); + + pgstat_report_subworker_error(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + apply_error_callback_arg.rel != NULL + ? apply_error_callback_arg.rel->localreloid + : InvalidOid, + apply_error_callback_arg.command, + apply_error_callback_arg.remote_xid, + errdata->message); + MemoryContextSwitchTo(ecxt); + } + + PG_RE_THROW(); + } + PG_END_TRY(); proc_exit(0); } diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index ff5aedc99c..88bf0a6076 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -2171,7 +2171,7 @@ pg_stat_reset_single_table_counters(PG_FUNCTION_ARGS) { Oid taboid = PG_GETARG_OID(0); - pgstat_reset_single_counter(taboid, RESET_TABLE); + pgstat_reset_single_counter(taboid, InvalidOid, RESET_TABLE); PG_RETURN_VOID(); } @@ -2181,7 +2181,18 @@ pg_stat_reset_single_function_counters(PG_FUNCTION_ARGS) { Oid funcoid = PG_GETARG_OID(0); - pgstat_reset_single_counter(funcoid, RESET_FUNCTION); + pgstat_reset_single_counter(funcoid, InvalidOid, RESET_FUNCTION); + + PG_RETURN_VOID(); +} + +Datum +pg_stat_reset_subscription_worker_subrel(PG_FUNCTION_ARGS) +{ + Oid subid = PG_GETARG_OID(0); + Oid relid = PG_ARGISNULL(1) ? InvalidOid : PG_GETARG_OID(1); + + pgstat_reset_single_counter(subid, relid, RESET_SUBWORKER); PG_RETURN_VOID(); } @@ -2239,6 +2250,21 @@ pg_stat_reset_replication_slot(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +/* Reset all subscription worker stats associated with the given subscription */ +Datum +pg_stat_reset_subscription_worker_sub(PG_FUNCTION_ARGS) +{ + Oid subid = PG_GETARG_OID(0); + + /* + * Use subscription drop message to remove statistics of all subscription + * workers. + */ + pgstat_report_subscription_drop(subid); + + PG_RETURN_VOID(); +} + Datum pg_stat_get_archiver(PG_FUNCTION_ARGS) { @@ -2379,3 +2405,107 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } + +/* + * Get the subscription worker statistics for the given subscription + * (and relation). + */ +Datum +pg_stat_get_subscription_worker(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_SUBSCRIPTION_WORKER_COLS 9 + Oid subid = PG_GETARG_OID(0); + Oid subrelid; + TupleDesc tupdesc; + Datum values[PG_STAT_GET_SUBSCRIPTION_WORKER_COLS]; + bool nulls[PG_STAT_GET_SUBSCRIPTION_WORKER_COLS]; + PgStat_StatSubWorkerEntry *wentry; + int i; + + if (PG_ARGISNULL(1)) + subrelid = InvalidOid; + else + subrelid = PG_GETARG_OID(1); + + /* Get subscription worker stats */ + wentry = pgstat_fetch_stat_subworker_entry(subid, subrelid); + + /* Return NULL if there is no worker statistics */ + if (wentry == NULL) + PG_RETURN_NULL(); + + /* Initialise attributes information in the tuple descriptor */ + tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_SUBSCRIPTION_WORKER_COLS); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "subid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "relid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "command", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "xid", + XIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "error_count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "error_message", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "first_error_time", + TIMESTAMPTZOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "last_error_time", + TIMESTAMPTZOID, -1, 0); + BlessTupleDesc(tupdesc); + + /* Initialise values and NULL flags arrays */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + i = 0; + /* subid */ + values[i++] = ObjectIdGetDatum(subid); + + /* subrelid */ + if (OidIsValid(subrelid)) + values[i++] = ObjectIdGetDatum(subrelid); + else + nulls[i++] = true; + + /* relid */ + if (OidIsValid(wentry->relid)) + values[i++] = ObjectIdGetDatum(wentry->relid); + else + nulls[i++] = true; + + /* command */ + if (wentry->command != 0) + values[i++] = CStringGetTextDatum(logicalrep_message_type(wentry->command)); + else + nulls[i++] = true; + + /* xid */ + if (TransactionIdIsValid(wentry->xid)) + values[i++] = TransactionIdGetDatum(wentry->xid); + else + nulls[i++] = true; + + /* error_count */ + values[i++] = Int64GetDatum(wentry->error_count); + + /* error_message */ + values[i++] = CStringGetTextDatum(wentry->error_message); + + /* first_error_time */ + if (wentry->first_error_time != 0) + values[i++] = TimestampTzGetDatum(wentry->first_error_time); + else + nulls[i++] = true; + + /* last_error_time */ + if (wentry->last_error_time != 0) + values[i++] = TimestampTzGetDatum(wentry->last_error_time); + else + nulls[i++] = true; + + /* Returns the record as Datum */ + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); +} diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index d068d6532e..50e1c7b68d 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5385,6 +5385,14 @@ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}', prosrc => 'pg_stat_get_replication_slot' }, +{ oid => '8523', descr => 'statistics: information about subscription worker', + proname => 'pg_stat_get_subscription_worker', prorows => '1', proisstrict => 'f', + proretset => 't', provolatile => 's', proparallel => 'r', + prorettype => 'record', proargtypes => 'oid oid', + proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz,timestamptz}', + proargmodes => '{i,i,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subrelid,subid,subrelid,relid,command,xid,error_count,error_message,first_error_time,last_error_time}', + prosrc => 'pg_stat_get_subscription_worker' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', @@ -5772,6 +5780,16 @@ proname => 'pg_stat_reset_replication_slot', proisstrict => 'f', provolatile => 'v', prorettype => 'void', proargtypes => 'text', prosrc => 'pg_stat_reset_replication_slot' }, +{ oid => '8524', + descr => 'statistics: reset collected statistics for a single subscription worker', + proname => 'pg_stat_reset_subscription_worker', proisstrict => 'f', + provolatile => 'v', prorettype => 'void', proargtypes => 'oid oid', + prosrc => 'pg_stat_reset_subscription_worker_subrel' }, +{ oid => '8525', + descr => 'statistics: reset all collected statistics for a single subscription', + proname => 'pg_stat_reset_subscription_worker', + provolatile => 'v', prorettype => 'void', proargtypes => 'oid', + prosrc => 'pg_stat_reset_subscription_worker_sub' }, { oid => '3163', descr => 'current trigger depth', proname => 'pg_trigger_depth', provolatile => 's', proparallel => 'r', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index bcd3588ea2..6643938b55 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -14,6 +14,7 @@ #include "datatype/timestamp.h" #include "portability/instr_time.h" #include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */ +#include "replication/logicalproto.h" #include "utils/backend_progress.h" /* for backward compatibility */ #include "utils/backend_status.h" /* for backward compatibility */ #include "utils/hsearch.h" @@ -83,6 +84,8 @@ typedef enum StatMsgType PGSTAT_MTYPE_REPLSLOT, PGSTAT_MTYPE_CONNECT, PGSTAT_MTYPE_DISCONNECT, + PGSTAT_MTYPE_SUBSCRIPTIONPURGE, + PGSTAT_MTYPE_SUBWORKERERROR, } StatMsgType; /* ---------- @@ -145,7 +148,8 @@ typedef enum PgStat_Shared_Reset_Target typedef enum PgStat_Single_Reset_Type { RESET_TABLE, - RESET_FUNCTION + RESET_FUNCTION, + RESET_SUBWORKER } PgStat_Single_Reset_Type; /* ------------------------------------------------------------ @@ -364,6 +368,7 @@ typedef struct PgStat_MsgResetsinglecounter Oid m_databaseid; PgStat_Single_Reset_Type m_resettype; Oid m_objectid; + Oid m_subobjectid; } PgStat_MsgResetsinglecounter; /* ---------- @@ -536,6 +541,53 @@ typedef struct PgStat_MsgReplSlot PgStat_Counter m_total_bytes; } PgStat_MsgReplSlot; +/* ---------- + * PgStat_MsgSubscriptionPurge Sent by the backend and autovacuum to tell the + * collector about the dead subscriptions. + * ---------- + */ +#define PGSTAT_NUM_SUBSCRIPTIONPURGE \ + ((PGSTAT_MSG_PAYLOAD - sizeof(Oid) - sizeof(int)) / sizeof(Oid)) + +typedef struct PgStat_MsgSubscriptionPurge +{ + PgStat_MsgHdr m_hdr; + Oid m_databaseid; + int m_nentries; + Oid m_subids[PGSTAT_NUM_SUBSCRIPTIONPURGE]; +} PgStat_MsgSubscriptionPurge; + +/* ---------- + * PgStat_MsgSubWorkerError Sent by the apply worker or the table sync worker to + * report the error occurred during logical replication. + * ---------- + */ +#define PGSTAT_SUBWORKERERROR_MSGLEN 256 +typedef struct PgStat_MsgSubWorkerError +{ + PgStat_MsgHdr m_hdr; + + /* + * m_subid and m_subrelid are used to determine the subscription and the + * reporter of the error. m_subrelid is InvalidOid if reported by an apply + * worker otherwise reported by a table sync worker. + */ + Oid m_databaseid; + Oid m_subid; + Oid m_subrelid; + + /* + * Oids of the database and the table that the reporter was actually + * processing. m_relid can be InvalidOid if an error occurred during + * worker applying a non-data-modification message such as RELATION. + */ + Oid m_relid; + + LogicalRepMsgType m_command; + TransactionId m_xid; + TimestampTz m_timestamp; + char m_message[PGSTAT_SUBWORKERERROR_MSGLEN]; +} PgStat_MsgSubWorkerError; /* ---------- * PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict @@ -714,6 +766,8 @@ typedef union PgStat_Msg PgStat_MsgReplSlot msg_replslot; PgStat_MsgConnect msg_connect; PgStat_MsgDisconnect msg_disconnect; + PgStat_MsgSubscriptionPurge msg_subscriptionpurge; + PgStat_MsgSubWorkerError msg_subworkererror; } PgStat_Msg; @@ -768,11 +822,16 @@ typedef struct PgStat_StatDBEntry TimestampTz stats_timestamp; /* time of db stats file update */ /* - * tables and functions must be last in the struct, because we don't write - * the pointers out to the stats file. + * tables, functions, and subscription workers must be last in the struct, + * because we don't write the pointers out to the stats file. + * + * subworker is the hash table of PgStat_StatSubWorkerEntry which stores + * statistics of logical replication workers: apply worker + * and table sync worker. */ HTAB *tables; HTAB *functions; + HTAB *subworkers; } PgStat_StatDBEntry; @@ -929,6 +988,36 @@ typedef struct PgStat_StatReplSlotEntry TimestampTz stat_reset_timestamp; } PgStat_StatReplSlotEntry; +/* The lookup key for subscription worker hash table */ +typedef struct PgStat_StatSubWorkerKey +{ + Oid subid; + Oid subrelid; /* InvalidOid for apply worker, otherwise for + * table sync worker */ +} PgStat_StatSubWorkerKey; + +/* + * Logical replication apply worker and table sync worker statistics kept in the + * stats collector. + */ +typedef struct PgStat_StatSubWorkerEntry +{ + PgStat_StatSubWorkerKey key; /* hash key (must be first) */ + + /* + * Subscription worker error statistics representing an error that + * occurred during application of logical replication or the initial table + * synchronization. + */ + Oid dbid; + Oid relid; + LogicalRepMsgType command; + TransactionId xid; + PgStat_Counter error_count; + TimestampTz first_error_time; + TimestampTz last_error_time; + char error_message[PGSTAT_SUBWORKERERROR_MSGLEN]; +} PgStat_StatSubWorkerEntry; /* * Working state needed to accumulate per-function-call timing statistics. @@ -1019,9 +1108,11 @@ extern void pgstat_drop_database(Oid databaseid); extern void pgstat_clear_snapshot(void); extern void pgstat_reset_counters(void); extern void pgstat_reset_shared_counters(const char *); -extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type); +extern void pgstat_reset_single_counter(Oid objectid, Oid subobjectid, + PgStat_Single_Reset_Type type); extern void pgstat_reset_slru_counter(const char *); extern void pgstat_reset_replslot_counter(const char *name); +extern void pgstat_reset_subworker_stats(Oid subid, Oid subrelid, bool allstats); extern void pgstat_report_connect(Oid dboid); extern void pgstat_report_autovac(Oid dboid); @@ -1038,6 +1129,10 @@ extern void pgstat_report_checksum_failure(void); extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat); extern void pgstat_report_replslot_create(const char *slotname); extern void pgstat_report_replslot_drop(const char *slotname); +extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid, + LogicalRepMsgType command, + TransactionId xid, const char *errmsg); +extern void pgstat_report_subscription_drop(Oid subid); extern void pgstat_initialize(void); @@ -1129,6 +1224,8 @@ extern void pgstat_send_wal(bool force); extern PgStat_StatDBEntry *pgstat_fetch_stat_dbentry(Oid dbid); extern PgStat_StatTabEntry *pgstat_fetch_stat_tabentry(Oid relid); extern PgStat_StatFuncEntry *pgstat_fetch_stat_funcentry(Oid funcid); +extern PgStat_StatSubWorkerEntry *pgstat_fetch_stat_subworker_entry(Oid subid, + Oid subrelid); extern PgStat_ArchiverStats *pgstat_fetch_stat_archiver(void); extern PgStat_BgWriterStats *pgstat_fetch_stat_bgwriter(void); extern PgStat_CheckpointerStats *pgstat_fetch_stat_checkpointer(void); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 2fa00a3c29..cb6da2c140 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2094,6 +2094,26 @@ pg_stat_subscription| SELECT su.oid AS subid, st.latest_end_time FROM (pg_subscription su LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid))); +pg_stat_subscription_workers| SELECT w.subid, + s.subname, + w.subrelid, + w.relid, + w.command, + w.xid, + w.error_count, + w.error_message, + w.first_error_time, + w.last_error_time + FROM ( SELECT pg_subscription.oid AS subid, + NULL::oid AS relid + FROM pg_subscription + UNION ALL + SELECT pg_subscription_rel.srsubid AS subid, + pg_subscription_rel.srrelid AS relid + FROM pg_subscription_rel + WHERE (pg_subscription_rel.srsubstate <> 'r'::"char")) sr, + (LATERAL pg_stat_get_subscription_worker(sr.subid, sr.relid) w(subid, subrelid, relid, command, xid, error_count, error_message, first_error_time, last_error_time) + JOIN pg_subscription s ON ((w.subid = s.oid))); pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, diff --git a/src/test/subscription/t/026_error_report.pl b/src/test/subscription/t/026_error_report.pl new file mode 100644 index 0000000000..1227654774 --- /dev/null +++ b/src/test/subscription/t/026_error_report.pl @@ -0,0 +1,191 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Tests for subscription error reporting. +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More tests => 5; + +# Test if the error reported on pg_stat_subscription_workers view is expected. +sub test_subscription_error +{ + my ($node, $relname, $xid, $expected_error, $msg) = @_; + + my $check_sql = qq[ +SELECT count(1) > 0 FROM pg_stat_subscription_workers +WHERE relid = '$relname'::regclass]; + $check_sql .= " AND xid = '$xid'::xid;" if $xid ne ''; + + # Wait for the error statistics to be updated. + $node->poll_query_until( + 'postgres', $check_sql, +) or die "Timed out while waiting for statistics to be updated"; + + my $result = $node->safe_psql( + 'postgres', + qq[ +SELECT subname, command, relid::regclass, error_count > 0 +FROM pg_stat_subscription_workers +WHERE relid = '$relname'::regclass; +]); + is($result, $expected_error, $msg); +} + +# Create publisher node. +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + qq[ +logical_decoding_work_mem = 64kB +]); +$node_publisher->start; + +# Create subscriber node. +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); + +# The subscriber will enter an infinite error loop, so we don't want +# to overflow the server log with error messages. +$node_subscriber->append_conf('postgresql.conf', + qq[ +wal_retrieve_retry_interval = 5s +]); +$node_subscriber->start; + +# Initial table setup on both publisher and subscriber. On subscriber we create +# the same tables but with primary keys. Also, insert some data that will conflict +# with the data replicated from publisher later. +$node_publisher->safe_psql( + 'postgres', + q[ +BEGIN; +CREATE TABLE test_tab1 (a int); +CREATE TABLE test_tab2 (a int); +CREATE TABLE test_tab_streaming (a int, b text); +INSERT INTO test_tab1 VALUES (1); +INSERT INTO test_tab2 VALUES (1); +COMMIT; +]); +$node_subscriber->safe_psql( + 'postgres', + q[ +BEGIN; +CREATE TABLE test_tab1 (a int primary key); +CREATE TABLE test_tab2 (a int primary key); +CREATE TABLE test_tab_streaming (a int primary key, b text); +INSERT INTO test_tab2 VALUES (1); +INSERT INTO test_tab_streaming SELECT 10000, md5(10000::text); +COMMIT; +]); + +# Setup publications. +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + q[ +CREATE PUBLICATION tap_pub FOR TABLE test_tab1, test_tab2; +CREATE PUBLICATION tap_pub_streaming FOR TABLE test_tab_streaming; +]); + +# Check if there is no subscription errors before starting logical replication. +my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM pg_stat_subscription_workers"); +is($result, qq(0), 'check no subscription error'); + +# Create subscriptions. The table sync for test_tab2 on tap_sub will enter into +# infinite error loop due to violating the unique constraint. +my $appname = 'tap_sub'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = off, two_phase = on);"); +my $appname_streaming = 'tap_sub_streaming'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub_streaming CONNECTION '$publisher_connstr application_name=$appname_streaming' PUBLICATION tap_pub_streaming WITH (streaming = on, two_phase = on);"); + +$node_publisher->wait_for_catchup($appname); +$node_publisher->wait_for_catchup($appname_streaming); + +# Wait for initial table sync for test_tab1 and test_tab_streaming to finish. +$node_subscriber->poll_query_until('postgres', + q[ +SELECT count(1) = 2 FROM pg_subscription_rel +WHERE srrelid in ('test_tab1'::regclass, 'test_tab_streaming'::regclass) AND srsubstate in ('r', 's') +]) or die "Timed out while waiting for subscriber to synchronize data"; + +# Check the initial data. +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(a) FROM test_tab1"); +is($result, q(1), 'check initial data are copied to subscriber'); + +# Insert more data to test_tab1, raising an error on the subscriber due to violation +# of the unique constraint on test_tab1. +my $xid = $node_publisher->safe_psql( + 'postgres', + qq[ +BEGIN; +INSERT INTO test_tab1 VALUES (1); +SELECT pg_current_xact_id()::xid; +COMMIT; +]); +test_subscription_error($node_subscriber, 'test_tab1', $xid, + qq(tap_sub|INSERT|test_tab1|t), + 'check the error reported by the apply worker'); + +# Check the table sync worker's error in the view. +test_subscription_error($node_subscriber, 'test_tab2', '', + qq(tap_sub||test_tab2|t), + 'check the error reported by the table sync worker'); + +# Test for resetting subscription worker statistics. +# Truncate test_tab1 and test_tab2 so that logical replication can continue. +$node_subscriber->safe_psql( + 'postgres', + "TRUNCATE test_tab1, test_tab2;"); + +# Wait for the data to be replicated. +$node_subscriber->poll_query_until( + 'postgres', + "SELECT count(1) > 0 FROM test_tab1"); +$node_subscriber->poll_query_until( + 'postgres', + "SELECT count(1) > 0 FROM test_tab2"); + +# Reset stats of all subscription workers running on tap_sub. +$node_subscriber->safe_psql( + 'postgres', + qq[ +SELECT pg_stat_reset_subscription_worker(sw.subid) +FROM pg_stat_subscription_workers sw + JOIN pg_subscription s ON s.oid = sw.subid +WHERE + s.subname = 'tap_sub'; +]); + +# Wait for stats of all subscription workers running on tap_sub to be reset. +$node_subscriber->poll_query_until( + 'postgres', + qq[ +SELECT count(1) = 0 +FROM pg_stat_subscription_workers sw + JOIN pg_subscription s ON s.oid = sw.subid +WHERE + s.subname = 'tap_sub'; +]); + +# Check if the view doesn't show any entries after dropping the subscriptions. +$node_subscriber->safe_psql( + 'postgres', + q[ +DROP SUBSCRIPTION tap_sub; +DROP SUBSCRIPTION tap_sub_streaming; +]); +$result = $node_subscriber->safe_psql( + 'postgres', + "SELECT count(1) FROM pg_stat_subscription_workers"); +is($result, q(0), 'no error after dropping subscription'); + +$node_subscriber->stop('fast'); +$node_publisher->stop('fast'); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index da6ac8ed83..f41ef0d2bc 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1943,6 +1943,8 @@ PgStat_MsgResetsharedcounter PgStat_MsgResetsinglecounter PgStat_MsgResetslrucounter PgStat_MsgSLRU +PgStat_MsgSubscriptionPurge +PgStat_MsgSubWorkerError PgStat_MsgTabpurge PgStat_MsgTabstat PgStat_MsgTempFile @@ -1954,6 +1956,8 @@ PgStat_Single_Reset_Type PgStat_StatDBEntry PgStat_StatFuncEntry PgStat_StatReplSlotEntry +PgStat_StatSubWorkerEntry +PgStat_StatSubWorkerKey PgStat_StatTabEntry PgStat_SubXactStatus PgStat_TableCounts -- 2.24.3 (Apple Git-128)