From 3c3ab0bd589aef7ba165f022c7adfe018fa65cec Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Fri, 16 Jul 2021 23:10:22 +0900 Subject: [PATCH v15 1/3] Add a subscription errors statistics view "pg_stat_subscription_errors". This commit adds a new system view pg_stat_logical_replication_errors, that shows information about any errors which occur during application of logical replication changes as well as during performing initial table synchronization. The subscription error entries are removed by autovacuum workers after table synchronization completes in table sync worker cases and after dropping the subscription in apply worker cases. It also adds an SQL function pg_stat_reset_subscription_error() to reset a single subscription error. --- doc/src/sgml/monitoring.sgml | 160 +++++ src/backend/catalog/system_functions.sql | 2 + src/backend/catalog/system_views.sql | 25 + src/backend/postmaster/pgstat.c | 609 ++++++++++++++++++++ src/backend/replication/logical/worker.c | 51 +- src/backend/utils/adt/pgstatfuncs.c | 121 ++++ src/include/catalog/pg_proc.dat | 13 + src/include/pgstat.h | 121 ++++ src/test/regress/expected/rules.out | 20 + src/test/subscription/t/025_error_report.pl | 154 +++++ src/tools/pgindent/typedefs.list | 7 + 11 files changed, 1280 insertions(+), 3 deletions(-) create mode 100644 src/test/subscription/t/025_error_report.pl diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 2cd8920645..6c57cd61d5 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -346,6 +346,15 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser + + pg_stat_subscription_errorspg_stat_subscription_errors + One row per error that occurred on subscription, showing information about + each subscription error. + See + pg_stat_subscription_errors for details. + + + pg_stat_sslpg_stat_ssl One row per connection (regular and replication), showing information about @@ -3050,6 +3059,135 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i + + <structname>pg_stat_subscription_errors</structname> + + + pg_stat_subscription_errors + + + + The pg_stat_subscription_errors view will contain one + row per subscription error reported by workers applying logical replication + changes and workers handling the initial data copy of the subscribed tables. + + + + <structname>pg_stat_subscription_errors</structname> View + + + + + Column Type + + + Description + + + + + + + + subid oid + + + OID of the subscription + + + + + + subname name + + + Name of the subscription + + + + + + subrelid oid + + + OID of the relation that the worker is synchronizing; NULL for the + main apply worker + + + + + + relid oid + + + OID of the relation that the worker was processing when the + error occurred + + + + + + command text + + + Name of command being applied when the error occurred. This field + is always NULL if the error was reported during the initial data + copy. + + + + + + xid xid + + + Transaction ID of the publisher node being applied when the error + occurred. This field is always NULL if the error was reported + during the initial data copy. + + + + + + count uint8 + + + Number of consecutive times the error occurred + + + + + + error_message text + + + Message of the error + + + + + + last_failed_time timestamp with time zone + + + Time at which the last error occurred + + + + + + stats_reset timestamp with time zone + + + Time at which these statistics were last reset + + + + +
+ +
+ <structname>pg_stat_ssl</structname> @@ -5172,6 +5310,28 @@ SELECT pid, wait_event_type, wait_event FROM pg_stat_activity WHERE wait_event i can be granted EXECUTE to run the function.
+ + + + + pg_stat_reset_subscription_error + + pg_stat_reset_subscription_error ( subid oid, relid oid ) + void + + + Resets statistics of a single subscription error. If + the argument relid is not NULL, + resets error statistics of the tablesync worker for + the relation with relid. Otherwise, resets the + error statistics of the apply worker running on the + subscription with subid. + + + This function is restricted to superusers by default, but other users + can be granted EXECUTE to run the function. + + diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql index a416e94d37..c9aa6f04d3 100644 --- a/src/backend/catalog/system_functions.sql +++ b/src/backend/catalog/system_functions.sql @@ -639,6 +639,8 @@ REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_function_counters(oid) FROM publ REVOKE EXECUTE ON FUNCTION pg_stat_reset_replication_slot(text) FROM public; +REVOKE EXECUTE ON FUNCTION pg_stat_reset_subscription_error(oid, oid) FROM public; + REVOKE EXECUTE ON FUNCTION lo_import(text) FROM public; REVOKE EXECUTE ON FUNCTION lo_import(text, oid) FROM public; diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 55f6e3711d..6e891b960e 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1257,3 +1257,28 @@ REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subslotname, subsynccommit, subpublications) ON pg_subscription TO public; + +CREATE VIEW pg_stat_subscription_errors AS + SELECT + e.subid, + s.subname, + e.subrelid, + e.relid, + e.command, + e.xid, + e.count, + e.error_message, + e.last_failed_time, + e.stats_reset + FROM (SELECT + oid as subid, + NULL as relid + FROM pg_subscription + UNION ALL + SELECT + srsubid as subid, + srrelid as relid + FROM pg_subscription_rel + WHERE srsubstate <> 'r') sr, + LATERAL pg_stat_get_subscription_error(sr.subid, sr.relid) e + JOIN pg_subscription s ON (e.subid = s.oid); diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index b7d0fbaefd..7a5615c1df 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -41,6 +41,8 @@ #include "catalog/catalog.h" #include "catalog/pg_database.h" #include "catalog/pg_proc.h" +#include "catalog/pg_subscription.h" +#include "catalog/pg_subscription_rel.h" #include "common/ip.h" #include "executor/instrument.h" #include "libpq/libpq.h" @@ -106,6 +108,7 @@ #define PGSTAT_TAB_HASH_SIZE 512 #define PGSTAT_FUNCTION_HASH_SIZE 512 #define PGSTAT_REPLSLOT_HASH_SIZE 32 +#define PGSTAT_SUBWORKER_HASH_SIZE 32 /* ---------- @@ -282,6 +285,7 @@ static PgStat_GlobalStats globalStats; static PgStat_WalStats walStats; static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS]; static HTAB *replSlotStatHash = NULL; +static HTAB *subWorkerStatHash = NULL; /* * List of OIDs of databases we need to write out. If an entry is InvalidOid, @@ -332,6 +336,13 @@ static bool pgstat_db_requested(Oid databaseid); static PgStat_StatReplSlotEntry *pgstat_get_replslot_entry(NameData name, bool create_it); static void pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotstats, TimestampTz ts); +static PgStat_StatSubWorkerEntry *pgstat_get_subworker_entry(Oid subid, Oid subrelid, + bool create); +static void pgstat_reset_subworker_error(PgStat_StatSubWorkerEntry *wentry, TimestampTz ts); +static void pgstat_report_subworker_purge(PgStat_MsgSubWorkerPurge *msg); +static void pgstat_report_subworker_error_purge(PgStat_MsgSubWorkerErrorPurge *msg); +static void pgstat_vacuum_subworker_stats(void); + static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg, TimestampTz now); static void pgstat_send_funcstats(void); static void pgstat_send_slru(void); @@ -356,6 +367,7 @@ static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, in static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len); static void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len); static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len); +static void pgstat_recv_resetsubworkererror(PgStat_MsgResetsubworkererror *msg, int len); static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len); static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len); static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len); @@ -373,6 +385,10 @@ static void pgstat_recv_connect(PgStat_MsgConnect *msg, int len); static void pgstat_recv_disconnect(PgStat_MsgDisconnect *msg, int len); static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len); static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len); +static void pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len); +static void pgstat_recv_subworker_error_purge(PgStat_MsgSubWorkerErrorPurge *msg, + int len); +static void pgstat_recv_subworker_purge(PgStat_MsgSubWorkerPurge *msg, int len); /* ------------------------------------------------------------ * Public functions called from postmaster follow @@ -1178,6 +1194,10 @@ pgstat_vacuum_stat(void) } } + /* Cleanup the dead subscription workers statistics */ + if (subWorkerStatHash) + pgstat_vacuum_subworker_stats(); + /* * Lookup our own database entry; if not found, nothing more to do. */ @@ -1355,6 +1375,218 @@ pgstat_collect_oids(Oid catalogid, AttrNumber anum_oid) } +/* PgStat_StatSubWorkerEntry comparator, sorting subid and subrelid */ +static int +subworker_stats_comparator(const ListCell *a, const ListCell *b) +{ + PgStat_StatSubWorkerEntry *entry1 = (PgStat_StatSubWorkerEntry *) lfirst(a); + PgStat_StatSubWorkerEntry *entry2 = (PgStat_StatSubWorkerEntry *) lfirst(b); + int ret; + + ret = oid_cmp(&entry1->key.subid, &entry2->key.subid); + if (ret != 0) + return ret; + + return oid_cmp(&entry1->key.subrelid, &entry2->key.subrelid); +} + +/* ---------- + * pgstat_vacuum_subworker_stat() - + * + * This is a subroutine for pgstat_vacuum_stat to tell the collector about + * the all the dead subscription worker statistics. + */ +static void +pgstat_vacuum_subworker_stats(void) +{ + struct subid_dbid_mapping + { + Oid subid; + Oid dbid; + }; + HTAB *subdbmap; + HASHCTL hash_ctl; + HASH_SEQ_STATUS hstat; + Relation rel; + HeapTuple tup; + Snapshot snapshot; + TupleDesc desc; + TableScanDesc scan; + PgStat_MsgSubWorkerPurge wpmsg; + PgStat_MsgSubWorkerErrorPurge epmsg; + PgStat_StatSubWorkerEntry *wentry; + List *subworker_stats = NIL; + List *not_ready_rels = NIL; + ListCell *lc1; + + /* Create a map for mapping subscriptoin OID and database OID */ + hash_ctl.keysize = sizeof(Oid); + hash_ctl.entrysize = sizeof(struct subid_dbid_mapping); + subdbmap = hash_create("Temporary map of subscription and database OIDs", + PGSTAT_SUBWORKER_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS); + + rel = table_open(SubscriptionRelationId, AccessShareLock); + snapshot = RegisterSnapshot(GetLatestSnapshot()); + scan = table_beginscan(rel, snapshot, 0, NULL); + desc = RelationGetDescr(rel); + + /* Register entries into the hash table */ + while (HeapTupleIsValid(tup = heap_getnext(scan, ForwardScanDirection))) + { + struct subid_dbid_mapping buf; + struct subid_dbid_mapping *entry; + bool isnull; + + CHECK_FOR_INTERRUPTS(); + + buf.subid = heap_getattr(tup, Anum_pg_subscription_oid, desc, &isnull); + Assert(!isnull); + + buf.dbid = heap_getattr(tup, Anum_pg_subscription_subdbid, desc, &isnull); + Assert(!isnull); + + entry = hash_search(subdbmap, (void *) &(buf.subid), HASH_ENTER, NULL); + entry->dbid = buf.dbid; + } + table_endscan(scan); + UnregisterSnapshot(snapshot); + table_close(rel, AccessShareLock); + + /* Build the list of worker stats and sort it by subid and relid */ + hash_seq_init(&hstat, subWorkerStatHash); + while ((wentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL) + subworker_stats = lappend(subworker_stats, wentry); + list_sort(subworker_stats, subworker_stats_comparator); + + wpmsg.m_nentries = 0; + epmsg.m_nentries = 0; + epmsg.m_subid = InvalidOid; + + /* + * Search for all the dead subscriptions and unnecessary table sync worker + * entries in stats hashtable and tell the stats collector to drop them. + */ + foreach(lc1, subworker_stats) + { + struct subid_dbid_mapping *hentry; + ListCell *lc2; + bool keep_it = false; + + wentry = (PgStat_StatSubWorkerEntry *) lfirst(lc1); + + CHECK_FOR_INTERRUPTS(); + + /* Skip if we already registered this subscription to purge */ + if (wpmsg.m_nentries > 0 && + wpmsg.m_subids[wpmsg.m_nentries - 1] == wentry->key.subid) + continue; + + /* Check if the subscription is dead */ + if ((hentry = hash_search(subdbmap, (void *) &(wentry->key.subid), + HASH_FIND, NULL)) == NULL) + { + /* This subscription is dead, add the subid to the message */ + wpmsg.m_subids[wpmsg.m_nentries++] = wentry->key.subid; + + /* + * If the message is full, send it out and reinitialize to empty + */ + if (wpmsg.m_nentries >= PGSTAT_NUM_SUBWORKERPURGE) + { + pgstat_report_subworker_purge(&wpmsg); + wpmsg.m_nentries = 0; + } + + continue; + } + + /* + * This subscription is live. The next step is that we search errors + * of the table sync workers who are already in sync state. These + * errors should be removed. + */ + + /* We remove only table sync errors in the current database */ + if (hentry->dbid != MyDatabaseId) + continue; + + /* Skip if it's an apply worker error */ + if (!OidIsValid(wentry->key.subrelid)) + continue; + + if (epmsg.m_subid != wentry->key.subid) + { + /* + * Send the purge message for previously collected table sync + * errors, if there is. + */ + if (epmsg.m_nentries > 0) + { + pgstat_report_subworker_error_purge(&epmsg); + epmsg.m_nentries = 0; + } + + /* Clean up if necessary */ + if (not_ready_rels != NIL) + list_free_deep(not_ready_rels); + + /* Refresh the not-ready-relations of this subscription */ + not_ready_rels = GetSubscriptionNotReadyRelations(wentry->key.subid); + + /* Prepare the error purge message for the subscription */ + epmsg.m_subid = wentry->key.subid; + } + + /* + * Check if the table is still being synchronized or no longer belongs + * to the subscription. + */ + foreach(lc2, not_ready_rels) + { + SubscriptionRelState *relstate = (SubscriptionRelState *) lfirst(lc2); + + if (relstate->relid == wentry->key.subrelid) + { + /* This table is still being synchronized, so keep it */ + keep_it = true; + break; + } + } + + if (keep_it) + continue; + + /* Add the table to the error purge message */ + epmsg.m_relids[epmsg.m_nentries++] = wentry->key.subrelid; + + /* + * If the error purge message is full, send it out and reinitialize to + * empty + */ + if (epmsg.m_nentries >= PGSTAT_NUM_SUBWORKERERRORPURGE) + { + pgstat_report_subworker_error_purge(&epmsg); + epmsg.m_nentries = 0; + } + } + + /* Send the rest of dead subscriptions */ + if (wpmsg.m_nentries > 0) + pgstat_report_subworker_purge(&wpmsg); + + /* Send the rest of dead error entries */ + if (epmsg.m_nentries > 0) + pgstat_report_subworker_error_purge(&epmsg); + + /* Clean up */ + if (not_ready_rels != NIL) + list_free_deep(not_ready_rels); + + hash_destroy(subdbmap); +} + /* ---------- * pgstat_drop_database() - * @@ -1544,6 +1776,24 @@ pgstat_reset_replslot_counter(const char *name) pgstat_send(&msg, sizeof(msg)); } +/* ---------- + * pgstat_reset_subscription_error_stats() - + * + * Tell the collector to reset the subscription worker error. + * ---------- + */ +void +pgstat_reset_subworker_error_stats(Oid subid, Oid subrelid) +{ + PgStat_MsgResetsubworkererror msg; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSUBWORKERERROR); + msg.m_subid = subid; + msg.m_subrelid = subrelid; + + pgstat_send(&msg, sizeof(PgStat_MsgResetsubworkererror)); +} + /* ---------- * pgstat_report_autovac() - * @@ -1804,6 +2054,47 @@ pgstat_should_report_connstat(void) return MyBackendType == B_BACKEND; } +/* -------- + * pgstat_report_subworker_purge() - + * + * Tell the collector about dead subscriptions. + * -------- + */ +static void +pgstat_report_subworker_purge(PgStat_MsgSubWorkerPurge *msg) +{ + int len; + + Assert(msg->m_nentries > 0); + + len = offsetof(PgStat_MsgSubWorkerPurge, m_subids[0]) + + msg->m_nentries * sizeof(Oid); + + pgstat_setheader(&msg->m_hdr, PGSTAT_MTYPE_SUBWORKERPURGE); + pgstat_send(msg, len); +} + +/* -------- + * pgstat_report_subworker_error_purge() - + * + * Tell the collector to remove table sync errors. + * -------- + */ +static void +pgstat_report_subworker_error_purge(PgStat_MsgSubWorkerErrorPurge *msg) +{ + int len; + + Assert(OidIsValid(msg->m_subid)); + Assert(msg->m_nentries > 0); + + len = offsetof(PgStat_MsgSubWorkerErrorPurge, m_relids[0]) + + msg->m_nentries * sizeof(Oid); + + pgstat_setheader(&msg->m_hdr, PGSTAT_MTYPE_SUBWORKERERRORPURGE); + pgstat_send(msg, len); +} + /* ---------- * pgstat_report_replslot() - * @@ -1869,6 +2160,35 @@ pgstat_report_replslot_drop(const char *slotname) pgstat_send(&msg, sizeof(PgStat_MsgReplSlot)); } +/* ---------- + * pgstat_report_subworker_error() - + * + * Tell the collector about the subscription worker error. + * ---------- + */ +void +pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid, + LogicalRepMsgType command, TransactionId xid, + const char *errmsg) +{ + PgStat_MsgSubWorkerError msg; + int len; + + Assert(strlen(errmsg) < PGSTAT_SUBWORKERERROR_MSGLEN); + len = offsetof(PgStat_MsgSubWorkerError, m_message[0]) + strlen(errmsg) + 1; + + pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_SUBWORKERERROR); + msg.m_subid = subid; + msg.m_subrelid = subrelid; + msg.m_relid = relid; + msg.m_command = command; + msg.m_xid = xid; + msg.m_timestamp = GetCurrentTimestamp(); + strlcpy(msg.m_message, errmsg, PGSTAT_SUBWORKERERROR_MSGLEN); + + pgstat_send(&msg, len); +} + /* ---------- * pgstat_ping() - * @@ -2987,6 +3307,22 @@ pgstat_fetch_replslot(NameData slotname) return pgstat_get_replslot_entry(slotname, false); } +/* + * --------- + * pgstat_fetch_subworker() - + * + * Support function for the SQL-callable pgstat* functions. Returns + * a pointer to the subscription worker struct. + * --------- + */ +PgStat_StatSubWorkerEntry * +pgstat_fetch_subworker(Oid subid, Oid subrelid) +{ + backend_read_statsfile(); + + return pgstat_get_subworker_entry(subid, subrelid, false); +} + /* * Shut down a single backend's statistics reporting at process exit. * @@ -3498,6 +3834,11 @@ PgstatCollectorMain(int argc, char *argv[]) len); break; + case PGSTAT_MTYPE_RESETSUBWORKERERROR: + pgstat_recv_resetsubworkererror(&msg.msg_resetsubworkererror, + len); + break; + case PGSTAT_MTYPE_AUTOVAC_START: pgstat_recv_autovac(&msg.msg_autovacuum_start, len); break; @@ -3568,6 +3909,19 @@ PgstatCollectorMain(int argc, char *argv[]) pgstat_recv_disconnect(&msg.msg_disconnect, len); break; + case PGSTAT_MTYPE_SUBWORKERERROR: + pgstat_recv_subworker_error(&msg.msg_subworkererror, len); + break; + + case PGSTAT_MTYPE_SUBWORKERERRORPURGE: + pgstat_recv_subworker_error_purge(&msg.msg_subworkererrorpurge, + len); + break; + + case PGSTAT_MTYPE_SUBWORKERPURGE: + pgstat_recv_subworker_purge(&msg.msg_subworkerpurge, len); + break; + default: break; } @@ -3868,6 +4222,22 @@ pgstat_write_statsfiles(bool permanent, bool allDbs) } } + /* + * Write subscription worker stats struct + */ + if (subWorkerStatHash) + { + PgStat_StatSubWorkerEntry *wentry; + + hash_seq_init(&hstat, subWorkerStatHash); + while ((wentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&hstat)) != NULL) + { + fputc('S', fpout); + rc = fwrite(wentry, sizeof(PgStat_StatSubWorkerEntry), 1, fpout); + (void) rc; /* we'll check for error with ferror */ + } + } + /* * No more output to be done. Close the temp file and replace the old * pgstat.stat with it. The ferror() check replaces testing for error @@ -4329,6 +4699,48 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep) break; } + /* + * 'S' A PgStat_StatSubWorkerEntry struct describing a + * subscription worker statistics. + */ + case 'S': + { + PgStat_StatSubWorkerEntry wbuf; + PgStat_StatSubWorkerEntry *wentry; + + /* Read the subscription entry */ + if (fread(&wbuf, 1, sizeof(PgStat_StatSubWorkerEntry), fpin) + != sizeof(PgStat_StatSubWorkerEntry)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + goto done; + } + + /* Create hash table if we don't have it already. */ + if (subWorkerStatHash == NULL) + { + HASHCTL hash_ctl; + + hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey); + hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry); + hash_ctl.hcxt = pgStatLocalContext; + subWorkerStatHash = hash_create("Subscription worker stat entries", + PGSTAT_SUBWORKER_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); + } + + /* Enter the subscription entry and initialize fields */ + wentry = + (PgStat_StatSubWorkerEntry *) hash_search(subWorkerStatHash, + (void *) &wbuf.key, + HASH_ENTER, NULL); + memcpy(wentry, &wbuf, sizeof(PgStat_StatSubWorkerEntry)); + break; + } + case 'E': goto done; @@ -4541,6 +4953,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, PgStat_WalStats myWalStats; PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS]; PgStat_StatReplSlotEntry myReplSlotStats; + PgStat_StatSubWorkerEntry mySubWorkerStats; FILE *fpin; int32 format_id; const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename; @@ -4671,6 +5084,22 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent, } break; + /* + * 'S' A PgStat_StatSubWorkerEntry struct describing a + * subscription worker statistics. + */ + case 'S': + if (fread(&mySubWorkerStats, 1, sizeof(mySubWorkerStats), fpin) + != sizeof(mySubWorkerStats)) + { + ereport(pgStatRunningInCollector ? LOG : WARNING, + (errmsg("corrupted statistics file \"%s\"", + statfile))); + FreeFile(fpin); + return false; + } + break; + case 'E': goto done; @@ -4876,6 +5305,7 @@ pgstat_clear_snapshot(void) pgStatLocalContext = NULL; pgStatDBHash = NULL; replSlotStatHash = NULL; + subWorkerStatHash = NULL; /* * Historically the backend_status.c facilities lived in this file, and @@ -5344,6 +5774,33 @@ pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, } } +/* ---------- + * pgstat_recv_resetsubworkererror() - + * + * Process a RESETSUBWORKERERROR message. + * ---------- + */ +static void +pgstat_recv_resetsubworkererror(PgStat_MsgResetsubworkererror *msg, int len) +{ + PgStat_StatSubWorkerEntry *wentry; + + Assert(OidIsValid(msg->m_subid)); + + /* Get subscription worker stats */ + wentry = pgstat_get_subworker_entry(msg->m_subid, msg->m_subrelid, false); + + /* + * Nothing to do if the subscription error entry is not found. This could + * happen when the subscription is dropped and the message for dropping + * subscription entry arrived before the message for resetting the error. + */ + if (wentry == NULL) + return; + + /* reset the entry and set reset timestamp */ + pgstat_reset_subworker_error(wentry, GetCurrentTimestamp()); +} /* ---------- * pgstat_recv_autovac() - @@ -5816,6 +6273,93 @@ pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len) } } +/* ---------- + * pgstat_recv_subworker_error() - + * + * Process a SUBWORKERERROR message. + * ---------- + */ +static void +pgstat_recv_subworker_error(PgStat_MsgSubWorkerError *msg, int len) +{ + PgStat_StatSubWorkerEntry *wentry; + + /* Get the subscription worker stats */ + wentry = pgstat_get_subworker_entry(msg->m_subid, msg->m_subrelid, true); + Assert(wentry); + + /* + * Update only the counter and timestamp if we received the same error + * again + */ + if (wentry->relid == msg->m_relid && + wentry->command == msg->m_command && + wentry->xid == msg->m_xid && + strncmp(wentry->message, msg->m_message, strlen(wentry->message)) == 0) + { + wentry->count++; + wentry->timestamp = msg->m_timestamp; + return; + } + + /* Otherwise, update the error information */ + wentry->relid = msg->m_relid; + wentry->command = msg->m_command; + wentry->xid = msg->m_xid; + wentry->count = 1; + wentry->timestamp = msg->m_timestamp; + strlcpy(wentry->message, msg->m_message, PGSTAT_SUBWORKERERROR_MSGLEN); +} + +/* ---------- + * pgstat_recv_subworker_purge() - + * + * Process a SUBWORKERPURGE message. + * ---------- + */ +static void +pgstat_recv_subworker_purge(PgStat_MsgSubWorkerPurge *msg, int len) +{ + if (subWorkerStatHash == NULL) + return; + + for (int i = 0; i < msg->m_nentries; i++) + { + HASH_SEQ_STATUS sstat; + PgStat_StatSubWorkerEntry *wentry; + + /* Remove all worker statistics of the subscription */ + hash_seq_init(&sstat, subWorkerStatHash); + while ((wentry = (PgStat_StatSubWorkerEntry *) hash_seq_search(&sstat)) != NULL) + { + if (wentry->key.subid == msg->m_subids[i]) + (void) hash_search(subWorkerStatHash, (void *) &(wentry->key), + HASH_REMOVE, NULL); + } + } +} + +/* ---------- + * pgstat_recv_subworker_error_purge() - + * + * Process a SUBWORKERERRORPURGE message. + * ---------- + */ +static void +pgstat_recv_subworker_error_purge(PgStat_MsgSubWorkerErrorPurge *msg, int len) +{ + PgStat_StatSubWorkerKey key; + + key.subid = msg->m_subid; + for (int i = 0; i < msg->m_nentries; i++) + { + Assert(OidIsValid(msg->m_relids[i])); + + key.subrelid = msg->m_relids[i]; + (void) hash_search(subWorkerStatHash, (void *) &key, HASH_REMOVE, NULL); + } +} + /* ---------- * pgstat_write_statsfile_needed() - * @@ -5934,6 +6478,71 @@ pgstat_reset_replslot(PgStat_StatReplSlotEntry *slotent, TimestampTz ts) slotent->stat_reset_timestamp = ts; } +/* ---------- + * pgstat_get_subworker_entry + * + * Return the entry of subscription worker entry with the subscription + * OID and relation OID. If subrelid is InvalidOid, it returns an entry + * of the apply worker otherwise of the table sync worker associated with + * subrelid. If no subscription entry exists, initialize it, if the + * create parameter is true. Else, return NULL. + * ---------- + */ +static PgStat_StatSubWorkerEntry * +pgstat_get_subworker_entry(Oid subid, Oid subrelid, bool create) +{ + PgStat_StatSubWorkerEntry *wentry; + PgStat_StatSubWorkerKey key; + HASHACTION action; + bool found; + + if (subWorkerStatHash == NULL) + { + HASHCTL hash_ctl; + + if (!create) + return NULL; + + hash_ctl.keysize = sizeof(PgStat_StatSubWorkerKey); + hash_ctl.entrysize = sizeof(PgStat_StatSubWorkerEntry); + subWorkerStatHash = hash_create("Subscription worker stat entries", + PGSTAT_SUBWORKER_HASH_SIZE, + &hash_ctl, + HASH_ELEM | HASH_BLOBS); + } + + key.subid = subid; + key.subrelid = subrelid; + action = (create ? HASH_ENTER : HASH_FIND); + wentry = (PgStat_StatSubWorkerEntry *) hash_search(subWorkerStatHash, + (void *) &key, + action, &found); + + /* initialize fields */ + if (create && !found) + pgstat_reset_subworker_error(wentry, 0); + + return wentry; +} + +/* ---------- + * pgstat_reset_subworker_error + * + * Reset the given subscription worker error stats. + * ---------- + */ +static void +pgstat_reset_subworker_error(PgStat_StatSubWorkerEntry *wentry, TimestampTz ts) +{ + wentry->relid = InvalidOid; + wentry->command = 0; + wentry->xid = InvalidTransactionId; + wentry->count = 0; + wentry->timestamp = 0; + wentry->message[0] = '\0'; + wentry->stat_reset_timestamp = ts; +} + /* * pgstat_slru_index * diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 8d96c926b4..ac3236a573 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3329,6 +3329,7 @@ void ApplyWorkerMain(Datum main_arg) { int worker_slot = DatumGetInt32(main_arg); + MemoryContext cctx = CurrentMemoryContext; MemoryContext oldctx; char originname[NAMEDATALEN]; XLogRecPtr origin_startpos; @@ -3429,8 +3430,27 @@ ApplyWorkerMain(Datum main_arg) { char *syncslotname; - /* This is table synchronization worker, call initial sync. */ - syncslotname = LogicalRepSyncTableStart(&origin_startpos); + PG_TRY(); + { + /* This is table synchronization worker, call initial sync. */ + syncslotname = LogicalRepSyncTableStart(&origin_startpos); + } + PG_CATCH(); + { + MemoryContext ecxt = MemoryContextSwitchTo(cctx); + ErrorData *errdata = CopyErrorData(); + + /* report the table sync error */ + pgstat_report_subworker_error(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + MyLogicalRepWorker->relid, + 0, + InvalidTransactionId, + errdata->message); + MemoryContextSwitchTo(ecxt); + PG_RE_THROW(); + } + PG_END_TRY(); /* allocate slot name in long-lived context */ myslotname = MemoryContextStrdup(ApplyContext, syncslotname); @@ -3548,7 +3568,32 @@ ApplyWorkerMain(Datum main_arg) } /* Run the main loop. */ - LogicalRepApplyLoop(origin_startpos); + PG_TRY(); + { + LogicalRepApplyLoop(origin_startpos); + } + PG_CATCH(); + { + /* report the apply error */ + if (apply_error_callback_arg.command != 0) + { + MemoryContext ecxt = MemoryContextSwitchTo(cctx); + ErrorData *errdata = CopyErrorData(); + + pgstat_report_subworker_error(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + apply_error_callback_arg.rel != NULL + ? apply_error_callback_arg.rel->localreloid + : InvalidOid, + apply_error_callback_arg.command, + apply_error_callback_arg.remote_xid, + errdata->message); + MemoryContextSwitchTo(ecxt); + } + + PG_RE_THROW(); + } + PG_END_TRY(); proc_exit(0); } diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index ff5aedc99c..b2e324036c 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -24,6 +24,7 @@ #include "pgstat.h" #include "postmaster/bgworker_internals.h" #include "postmaster/postmaster.h" +#include "replication/logicalproto.h" #include "replication/slot.h" #include "storage/proc.h" #include "storage/procarray.h" @@ -2239,6 +2240,23 @@ pg_stat_reset_replication_slot(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +/* Reset a subscription error stats */ +Datum +pg_stat_reset_subscription_error(PG_FUNCTION_ARGS) +{ + Oid subid = PG_GETARG_OID(0); + Oid relid; + + if (PG_ARGISNULL(1)) + relid = InvalidOid; /* reset apply worker error stats */ + else + relid = PG_GETARG_OID(1); /* reset table sync worker error stats */ + + pgstat_reset_subworker_error_stats(subid, relid); + + PG_RETURN_VOID(); +} + Datum pg_stat_get_archiver(PG_FUNCTION_ARGS) { @@ -2379,3 +2397,106 @@ pg_stat_get_replication_slot(PG_FUNCTION_ARGS) /* Returns the record as Datum */ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); } + +/* + * Get the subscription error for the given subscription (and relation). + */ +Datum +pg_stat_get_subscription_error(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_SUBSCRIPTION_ERROR_COLS 9 + Oid subid = PG_GETARG_OID(0); + Oid subrelid; + TupleDesc tupdesc; + Datum values[PG_STAT_GET_SUBSCRIPTION_ERROR_COLS]; + bool nulls[PG_STAT_GET_SUBSCRIPTION_ERROR_COLS]; + PgStat_StatSubWorkerEntry *wentry; + int i; + + if (PG_ARGISNULL(1)) + subrelid = InvalidOid; + else + subrelid = PG_GETARG_OID(1); + + /* Initialise attributes information in the tuple descriptor */ + tupdesc = CreateTemplateTupleDesc(PG_STAT_GET_SUBSCRIPTION_ERROR_COLS); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "subid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 2, "subrelid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 3, "relid", + OIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 4, "command", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 5, "xid", + XIDOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 6, "count", + INT8OID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 7, "error_message", + TEXTOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 8, "last_failed_time", + TIMESTAMPTZOID, -1, 0); + TupleDescInitEntry(tupdesc, (AttrNumber) 9, "stats_reset", + TIMESTAMPTZOID, -1, 0); + BlessTupleDesc(tupdesc); + + /* Get subscription worker stats */ + wentry = pgstat_fetch_subworker(subid, subrelid); + + /* Return NULL if the subscription doesn't have any errors */ + if (wentry == NULL) + PG_RETURN_NULL(); + + /* Initialise values and NULL flags arrays */ + MemSet(values, 0, sizeof(values)); + MemSet(nulls, 0, sizeof(nulls)); + + i = 0; + /* subid */ + values[i++] = ObjectIdGetDatum(subid); + + /* subrelid */ + if (OidIsValid(subrelid)) + values[i++] = ObjectIdGetDatum(subrelid); + else + nulls[i++] = true; + + /* relid */ + if (OidIsValid(wentry->relid)) + values[i++] = ObjectIdGetDatum(wentry->relid); + else + nulls[i++] = true; + + /* command */ + if (wentry->command != 0) + values[i++] = CStringGetTextDatum(logicalrep_message_type(wentry->command)); + else + nulls[i++] = true; + + /* xid */ + if (TransactionIdIsValid(wentry->xid)) + values[i++] = TransactionIdGetDatum(wentry->xid); + else + nulls[i++] = true; + + /* count */ + values[i++] = Int64GetDatum(wentry->count); + + /* error_message */ + values[i++] = CStringGetTextDatum(wentry->message); + + /* last_failed_time */ + if (wentry->timestamp != 0) + values[i++] = TimestampTzGetDatum(wentry->timestamp); + else + nulls[i++] = true; + + /* stats_reset */ + if (wentry->stat_reset_timestamp != 0) + values[i++] = TimestampTzGetDatum(wentry->stat_reset_timestamp); + else + nulls[i++] = true; + + /* Returns the record as Datum */ + PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls))); +} diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index d068d6532e..a901fe9a55 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -5385,6 +5385,14 @@ proargmodes => '{i,o,o,o,o,o,o,o,o,o,o}', proargnames => '{slot_name,slot_name,spill_txns,spill_count,spill_bytes,stream_txns,stream_count,stream_bytes,total_txns,total_bytes,stats_reset}', prosrc => 'pg_stat_get_replication_slot' }, +{ oid => '8523', descr => 'statistics: information about subscription error', + proname => 'pg_stat_get_subscription_error', prorows => '1', proisstrict => 'f', + proretset => 't', provolatile => 's', proparallel => 'r', + prorettype => 'record', proargtypes => 'oid oid', + proallargtypes => '{oid,oid,oid,oid,oid,text,xid,int8,text,timestamptz,timestamptz}', + proargmodes => '{i,i,o,o,o,o,o,o,o,o,o}', + proargnames => '{subid,subrelid,subid,subrelid,relid,command,xid,count,error_message,last_failed_time,stats_reset}', + prosrc => 'pg_stat_get_subscription_error' }, { oid => '6118', descr => 'statistics: information about subscription', proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f', proretset => 't', provolatile => 's', proparallel => 'r', @@ -5772,6 +5780,11 @@ proname => 'pg_stat_reset_replication_slot', proisstrict => 'f', provolatile => 'v', prorettype => 'void', proargtypes => 'text', prosrc => 'pg_stat_reset_replication_slot' }, +{ oid => '8524', + descr => 'statistics: reset collected statistics for a single subscription error', + proname => 'pg_stat_reset_subscription_error', proisstrict => 'f', + provolatile => 'v', prorettype => 'void', proargtypes => 'oid oid', + prosrc => 'pg_stat_reset_subscription_error' }, { oid => '3163', descr => 'current trigger depth', proname => 'pg_trigger_depth', provolatile => 's', proparallel => 'r', diff --git a/src/include/pgstat.h b/src/include/pgstat.h index bcd3588ea2..fdcfea3ec4 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -14,6 +14,7 @@ #include "datatype/timestamp.h" #include "portability/instr_time.h" #include "postmaster/pgarch.h" /* for MAX_XFN_CHARS */ +#include "replication/logicalproto.h" #include "utils/backend_progress.h" /* for backward compatibility */ #include "utils/backend_status.h" /* for backward compatibility */ #include "utils/hsearch.h" @@ -66,6 +67,7 @@ typedef enum StatMsgType PGSTAT_MTYPE_RESETSINGLECOUNTER, PGSTAT_MTYPE_RESETSLRUCOUNTER, PGSTAT_MTYPE_RESETREPLSLOTCOUNTER, + PGSTAT_MTYPE_RESETSUBWORKERERROR, PGSTAT_MTYPE_AUTOVAC_START, PGSTAT_MTYPE_VACUUM, PGSTAT_MTYPE_ANALYZE, @@ -83,6 +85,9 @@ typedef enum StatMsgType PGSTAT_MTYPE_REPLSLOT, PGSTAT_MTYPE_CONNECT, PGSTAT_MTYPE_DISCONNECT, + PGSTAT_MTYPE_SUBWORKERERROR, + PGSTAT_MTYPE_SUBWORKERERRORPURGE, + PGSTAT_MTYPE_SUBWORKERPURGE, } StatMsgType; /* ---------- @@ -389,6 +394,24 @@ typedef struct PgStat_MsgResetreplslotcounter bool clearall; } PgStat_MsgResetreplslotcounter; +/* ---------- + * PgStat_MsgRestsubworkererror Sent by the backend to reset the subscription + * worker error information. + * ---------- + */ +typedef struct PgStat_MsgResetsubworkererror +{ + PgStat_MsgHdr m_hdr; + + /* + * Same as PgStat_MsgSubWorkerError, m_subid and m_subrelid are used to + * determine the subscription and the reporter of the error: the apply + * worker or the table sync worker. + */ + Oid m_subid; + Oid m_subrelid; +} PgStat_MsgResetsubworkererror; + /* ---------- * PgStat_MsgAutovacStart Sent by the autovacuum daemon to signal * that a database is going to be processed @@ -536,6 +559,67 @@ typedef struct PgStat_MsgReplSlot PgStat_Counter m_total_bytes; } PgStat_MsgReplSlot; +/* ---------- + * PgStat_MsgSubWorkerError Sent by the apply worker or the table sync worker to + * report the error occurred during logical replication. + * ---------- + */ +#define PGSTAT_SUBWORKERERROR_MSGLEN 256 +typedef struct PgStat_MsgSubWorkerError +{ + PgStat_MsgHdr m_hdr; + + /* + * m_subid and m_subrelid are used to determine the subscription and the + * reporter of the error. m_subrelid is InvalidOid if reported by an apply + * worker otherwise reported by a table sync worker. + */ + Oid m_subid; + Oid m_subrelid; + + /* + * Oid of the table that the reporter was actually processing. This can be + * InvalidOid if the worker was applying a non-data-modification change + * such as STREAM_STOP. + */ + Oid m_relid; + + LogicalRepMsgType m_command; + TransactionId m_xid; + TimestampTz m_timestamp; + char m_message[PGSTAT_SUBWORKERERROR_MSGLEN]; +} PgStat_MsgSubWorkerError; + +/* ---------- + * PgStat_MsgSubWorkerPurge Sent by the backend and autovacuum to tell the + * collector about the dead subscriptions. + * ---------- + */ +#define PGSTAT_NUM_SUBWORKERPURGE \ + ((PGSTAT_MSG_PAYLOAD - sizeof(int)) / sizeof(Oid)) + +typedef struct PgStat_MsgSubWorkerPurge +{ + PgStat_MsgHdr m_hdr; + int m_nentries; + Oid m_subids[PGSTAT_NUM_SUBWORKERPURGE]; +} PgStat_MsgSubWorkerPurge; + +/* ---------- + * PgStat_MsgSubWorkerErrorPurge Sent by the backend and autovacuum to purge + * the subscription errors. + * ---------- + */ +#define PGSTAT_NUM_SUBWORKERERRORPURGE \ + ((PGSTAT_MSG_PAYLOAD - sizeof(Oid) - sizeof(int)) / sizeof(Oid)) + +typedef struct PgStat_MsgSubWorkerErrorPurge +{ + PgStat_MsgHdr m_hdr; + Oid m_subid; + int m_nentries; + Oid m_relids[PGSTAT_NUM_SUBWORKERERRORPURGE]; +} PgStat_MsgSubWorkerErrorPurge; /* ---------- * PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict @@ -697,6 +781,7 @@ typedef union PgStat_Msg PgStat_MsgResetsinglecounter msg_resetsinglecounter; PgStat_MsgResetslrucounter msg_resetslrucounter; PgStat_MsgResetreplslotcounter msg_resetreplslotcounter; + PgStat_MsgResetsubworkererror msg_resetsubworkererror; PgStat_MsgAutovacStart msg_autovacuum_start; PgStat_MsgVacuum msg_vacuum; PgStat_MsgAnalyze msg_analyze; @@ -714,6 +799,9 @@ typedef union PgStat_Msg PgStat_MsgReplSlot msg_replslot; PgStat_MsgConnect msg_connect; PgStat_MsgDisconnect msg_disconnect; + PgStat_MsgSubWorkerError msg_subworkererror; + PgStat_MsgSubWorkerErrorPurge msg_subworkererrorpurge; + PgStat_MsgSubWorkerPurge msg_subworkerpurge; } PgStat_Msg; @@ -929,6 +1017,34 @@ typedef struct PgStat_StatReplSlotEntry TimestampTz stat_reset_timestamp; } PgStat_StatReplSlotEntry; +/* The lookup key for subscription worker hash table */ +typedef struct PgStat_StatSubWorkerKey +{ + Oid subid; + Oid subrelid; /* InvalidOid for apply worker, otherwise for + * table sync worker */ +} PgStat_StatSubWorkerKey; + +/* + * Logical replication apply worker and table sync worker statistics kept in the + * stats collector. + */ +typedef struct PgStat_StatSubWorkerEntry +{ + PgStat_StatSubWorkerKey key; /* hash key (must be first) */ + + /* + * Subscription worker error statistics representing an error that occurred + * during application of logical replication or the initial table synchronization. + */ + Oid relid; + LogicalRepMsgType command; + TransactionId xid; + PgStat_Counter count; + TimestampTz timestamp; + char message[PGSTAT_SUBWORKERERROR_MSGLEN]; + TimestampTz stat_reset_timestamp; +} PgStat_StatSubWorkerEntry; /* * Working state needed to accumulate per-function-call timing statistics. @@ -1022,6 +1138,7 @@ extern void pgstat_reset_shared_counters(const char *); extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type); extern void pgstat_reset_slru_counter(const char *); extern void pgstat_reset_replslot_counter(const char *name); +extern void pgstat_reset_subworker_error_stats(Oid subid, Oid subrelid); extern void pgstat_report_connect(Oid dboid); extern void pgstat_report_autovac(Oid dboid); @@ -1038,6 +1155,9 @@ extern void pgstat_report_checksum_failure(void); extern void pgstat_report_replslot(const PgStat_StatReplSlotEntry *repSlotStat); extern void pgstat_report_replslot_create(const char *slotname); extern void pgstat_report_replslot_drop(const char *slotname); +extern void pgstat_report_subworker_error(Oid subid, Oid subrelid, Oid relid, + LogicalRepMsgType command, + TransactionId xid, const char *errmsg); extern void pgstat_initialize(void); @@ -1136,6 +1256,7 @@ extern PgStat_GlobalStats *pgstat_fetch_global(void); extern PgStat_WalStats *pgstat_fetch_stat_wal(void); extern PgStat_SLRUStats *pgstat_fetch_slru(void); extern PgStat_StatReplSlotEntry *pgstat_fetch_replslot(NameData slotname); +extern PgStat_StatSubWorkerEntry *pgstat_fetch_subworker(Oid subid, Oid subrelid); extern void pgstat_count_slru_page_zeroed(int slru_idx); extern void pgstat_count_slru_page_hit(int slru_idx); diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 2fa00a3c29..7ecd4f167a 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2094,6 +2094,26 @@ pg_stat_subscription| SELECT su.oid AS subid, st.latest_end_time FROM (pg_subscription su LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid))); +pg_stat_subscription_errors| SELECT e.subid, + s.subname, + e.subrelid, + e.relid, + e.command, + e.xid, + e.count, + e.error_message, + e.last_failed_time, + e.stats_reset + FROM ( SELECT pg_subscription.oid AS subid, + NULL::oid AS relid + FROM pg_subscription + UNION ALL + SELECT pg_subscription_rel.srsubid AS subid, + pg_subscription_rel.srrelid AS relid + FROM pg_subscription_rel + WHERE (pg_subscription_rel.srsubstate <> 'r'::"char")) sr, + (LATERAL pg_stat_get_subscription_error(sr.subid, sr.relid) e(subid, subrelid, relid, command, xid, count, error_message, last_failed_time, stats_reset) + JOIN pg_subscription s ON ((e.subid = s.oid))); pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid, pg_stat_all_indexes.indexrelid, pg_stat_all_indexes.schemaname, diff --git a/src/test/subscription/t/025_error_report.pl b/src/test/subscription/t/025_error_report.pl new file mode 100644 index 0000000000..c6fea0d046 --- /dev/null +++ b/src/test/subscription/t/025_error_report.pl @@ -0,0 +1,154 @@ + +# Copyright (c) 2021, PostgreSQL Global Development Group + +# Tests for subscription error reporting. +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 5; + +# Test if the error reported on pg_subscription_errors view is expected. +sub test_subscription_error +{ + my ($node, $relname, $xid, $expected_error, $msg) = @_; + + my $check_sql = qq[ +SELECT count(1) > 0 FROM pg_stat_subscription_errors +WHERE relid = '$relname'::regclass]; + $check_sql .= " AND xid = '$xid'::xid;" if $xid ne ''; + + # Wait for the error statistics to be updated. + $node->poll_query_until( + 'postgres', $check_sql, +) or die "Timed out while waiting for statistics to be updated"; + + my $result = $node->safe_psql( + 'postgres', + qq[ +SELECT subname, command, relid::regclass, count > 0 +FROM pg_stat_subscription_errors +WHERE relid = '$relname'::regclass; +]); + is($result, $expected_error, $msg); +} + +# Create publisher node. +my $node_publisher = PostgresNode->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + qq[ +max_prepared_transactions = 10 +logical_decoding_work_mem = 64kB +]); +$node_publisher->start; + +# Create subscriber node. +my $node_subscriber = PostgresNode->new('subscriber'); +$node_subscriber->init(allows_streaming => 'logical'); + +# The subscriber will enter an infinite error loop, so we don't want +# to overflow the server log with error messages. +$node_subscriber->append_conf('postgresql.conf', + qq[ +max_prepared_transactions = 10 +wal_retrieve_retry_interval = 5s +]); +$node_subscriber->start; + +# Initial table setup on both publisher and subscriber. On subscriber we create +# the same tables but with primary keys. Also, insert some data that will conflict +# with the data replicated from publisher later. +$node_publisher->safe_psql( + 'postgres', + q[ +BEGIN; +CREATE TABLE test_tab1 (a int); +CREATE TABLE test_tab2 (a int); +CREATE TABLE test_tab_streaming (a int, b text); +INSERT INTO test_tab1 VALUES (1); +INSERT INTO test_tab2 VALUES (1); +COMMIT; +]); +$node_subscriber->safe_psql( + 'postgres', + q[ +BEGIN; +CREATE TABLE test_tab1 (a int primary key); +CREATE TABLE test_tab2 (a int primary key); +CREATE TABLE test_tab_streaming (a int primary key, b text); +INSERT INTO test_tab2 VALUES (1); +INSERT INTO test_tab_streaming SELECT 10000, md5(10000::text); +COMMIT; +]); + +# Setup publications. +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + q[ +CREATE PUBLICATION tap_pub FOR TABLE test_tab1, test_tab2; +CREATE PUBLICATION tap_pub_streaming FOR TABLE test_tab_streaming; +]); + +# Check if there is no subscription errors before starting logical replication. +my $result = + $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM pg_stat_subscription_errors"); +is($result, qq(0), 'check no subscription error'); + +# Create subscriptions. The table sync for test_tab2 on tap_sub will enter to +# infinite error due to violating the unique constraint. +my $appname = 'tap_sub'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = off, two_phase = on);"); +my $appname_streaming = 'tap_sub_streaming'; +$node_subscriber->safe_psql( + 'postgres', + "CREATE SUBSCRIPTION tap_sub_streaming CONNECTION '$publisher_connstr application_name=$appname_streaming' PUBLICATION tap_pub_streaming WITH (streaming = on, two_phase = on);"); + +$node_publisher->wait_for_catchup($appname); +$node_publisher->wait_for_catchup($appname_streaming); + +# Wait for initial table sync for test_tab1 and test_tab_streaming to finish. +$node_subscriber->poll_query_until('postgres', + q[ +SELECT count(1) = 2 FROM pg_subscription_rel +WHERE srrelid in ('test_tab1'::regclass, 'test_tab_streaming'::regclass) AND srsubstate in ('r', 's') +]) or die "Timed out while waiting for subscriber to synchronize data"; + +# Check the initial data. +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(a) FROM test_tab1"); +is($result, q(1), 'check initial data are copied to subscriber'); + +# Insert more data to test_tab1, raising an error on the subscriber due to violation +# of the unique constraint on test_tab1. +my $xid = $node_publisher->safe_psql( + 'postgres', + qq[ +BEGIN; +INSERT INTO test_tab1 VALUES (1); +SELECT pg_current_xact_id()::xid; +COMMIT; +]); +test_subscription_error($node_subscriber, 'test_tab1', $xid, + qq(tap_sub|INSERT|test_tab1|t), + 'check the error reported by the apply worker'); + +# Check the table sync worker's error in the view. +test_subscription_error($node_subscriber, 'test_tab2', '', + qq(tap_sub||test_tab2|t), + 'check the error reported by the table sync worker'); + +# Check if the view doesn't show any entries after dropping the subscriptions. +$node_subscriber->safe_psql( + 'postgres', + q[ +DROP SUBSCRIPTION tap_sub; +DROP SUBSCRIPTION tap_sub_streaming; +]); +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(1) FROM pg_stat_subscription_errors"); +is($result, q(0), 'no error after dropping subscription'); + diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index cb5b5ec74c..8ff6294267 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1939,7 +1939,11 @@ PgStat_MsgResetreplslotcounter PgStat_MsgResetsharedcounter PgStat_MsgResetsinglecounter PgStat_MsgResetslrucounter +PgStat_MsgResetsubworkererror PgStat_MsgSLRU +PgStat_MsgSubWorkerError +PgStat_MsgSubWorkerErrorPurge +PgStat_MsgSubWorkerPurge PgStat_MsgTabpurge PgStat_MsgTabstat PgStat_MsgTempFile @@ -1951,6 +1955,9 @@ PgStat_Single_Reset_Type PgStat_StatDBEntry PgStat_StatFuncEntry PgStat_StatReplSlotEntry +PgStat_StatSubWorkerEntry +PgStat_StatSubWorkerKey +PgStat_SubWorkerError PgStat_StatTabEntry PgStat_SubXactStatus PgStat_TableCounts -- 2.24.3 (Apple Git-128)