From 3e146938fca29e2ace9bb86f60ac346f4312534e Mon Sep 17 00:00:00 2001 From: "Imseih (AWS)" Date: Fri, 17 Feb 2023 19:34:02 -0600 Subject: [PATCH 1/1] Report index vacuum progress. Add 2 new columns to pg_stat_progress_vacuum. The columns are indexes_total as the total indexes to be vacuumed or cleaned and indexes_processed as the number of indexes vacuumed or cleaned up so far. Authors: Sami Imseih Reviewed by: Masahiko Sawada, Nathan Bossart, Andres Freund Discussion: https://www.postgresql.org/message-id/flat/5478DFCD-2333-401A-B2F0-0D186AB09228@amazon.com --- doc/src/sgml/monitoring.sgml | 21 ++++++++ src/backend/access/heap/vacuumlazy.c | 70 +++++++++++++++++++++++---- src/backend/access/transam/parallel.c | 9 ++++ src/backend/catalog/system_views.sql | 3 +- src/backend/commands/vacuumparallel.c | 45 ++++++++++++++++- src/include/access/parallel.h | 3 ++ src/include/commands/progress.h | 2 + src/include/commands/vacuum.h | 1 + src/test/regress/expected/rules.out | 4 +- 9 files changed, 146 insertions(+), 12 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index b0b997f092..02ebb718d7 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -7183,6 +7183,27 @@ FROM pg_stat_get_backend_idset() AS backendid; Number of dead tuples collected since the last index vacuum cycle. + + + + indexes_total bigint + + + Total number of indexes that will be vacuumed or cleaned up. This number is + reported as of the beginning of the vacuuming indexes phase + or the cleaning up indexes phase. + + + + + + indexes_processed bigint + + + Number of indexes processed. This counter only advances when the phase is + vacuuming indexes or cleaning up indexes. 
+ + diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 8f14cf85f3..443a44d6c9 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -2316,6 +2316,17 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) { bool allindexes = true; double old_live_tuples = vacrel->rel->rd_rel->reltuples; + const int progress_start_index[] = { + PROGRESS_VACUUM_PHASE, + PROGRESS_VACUUM_INDEX_TOTAL + }; + const int progress_end_index[] = { + PROGRESS_VACUUM_INDEX_TOTAL, + PROGRESS_VACUUM_INDEX_PROCESSED, + PROGRESS_VACUUM_NUM_INDEX_VACUUMS + }; + int64 progress_start_val[2]; + int64 progress_end_val[3]; Assert(vacrel->nindexes > 0); Assert(vacrel->do_index_vacuuming); @@ -2328,9 +2339,13 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) return false; } - /* Report that we are now vacuuming indexes */ - pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, - PROGRESS_VACUUM_PHASE_VACUUM_INDEX); + /* + * Report that we are now vacuuming indexes and the number of indexes + * to vacuum. + */ + progress_start_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_INDEX; + progress_start_val[1] = vacrel->nindexes; + pgstat_progress_update_multi_param(2, progress_start_index, progress_start_val); if (!ParallelVacuumIsActive(vacrel)) { @@ -2343,6 +2358,10 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) old_live_tuples, vacrel); + /* Report the number of indexes vacuumed */ + pgstat_progress_update_param(PROGRESS_VACUUM_INDEX_PROCESSED, + idx + 1); + if (lazy_check_wraparound_failsafe(vacrel)) { /* Wraparound emergency -- end current index scan */ @@ -2377,14 +2396,17 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) Assert(allindexes || vacrel->failsafe_active); /* - * Increase and report the number of index scans. + * Increase and report the number of index scans. Also, we reset the progress + * counters. 
* * We deliberately include the case where we started a round of bulk * deletes that we weren't able to finish due to the failsafe triggering. */ vacrel->num_index_scans++; - pgstat_progress_update_param(PROGRESS_VACUUM_NUM_INDEX_VACUUMS, - vacrel->num_index_scans); + progress_end_val[0] = 0; + progress_end_val[1] = 0; + progress_end_val[2] = vacrel->num_index_scans; + pgstat_progress_update_multi_param(3, progress_end_index, progress_end_val); return allindexes; } @@ -2621,6 +2643,12 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel) if (unlikely(vacuum_xid_failsafe_check(&vacrel->cutoffs))) { + const int progress_index[] = { + PROGRESS_VACUUM_INDEX_TOTAL, + PROGRESS_VACUUM_INDEX_PROCESSED + }; + int64 progress_val[2] = {0, 0}; + vacrel->failsafe_active = true; /* Disable index vacuuming, index cleanup, and heap rel truncation */ @@ -2628,6 +2656,9 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel) vacrel->do_index_cleanup = false; vacrel->do_rel_truncate = false; + /* Reset the progress counters */ + pgstat_progress_update_multi_param(2, progress_index, progress_val); + ereport(WARNING, (errmsg("bypassing nonessential maintenance of table \"%s.%s.%s\" as a failsafe after %d index scans", vacrel->dbname, vacrel->relnamespace, vacrel->relname, @@ -2654,13 +2685,27 @@ lazy_cleanup_all_indexes(LVRelState *vacrel) { double reltuples = vacrel->new_rel_tuples; bool estimated_count = vacrel->scanned_pages < vacrel->rel_pages; + const int progress_start_index[] = { + PROGRESS_VACUUM_PHASE, + PROGRESS_VACUUM_INDEX_TOTAL + }; + const int progress_end_index[] = { + PROGRESS_VACUUM_INDEX_TOTAL, + PROGRESS_VACUUM_INDEX_PROCESSED + }; + int64 progress_start_val[2]; + int64 progress_end_val[2] = {0, 0}; Assert(vacrel->do_index_cleanup); Assert(vacrel->nindexes > 0); - /* Report that we are now cleaning up indexes */ - pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, - PROGRESS_VACUUM_PHASE_INDEX_CLEANUP); + /* + * Report that we are now cleaning up indexes and the 
number of indexes + * to cleanup. + */ + progress_start_val[0] = PROGRESS_VACUUM_PHASE_INDEX_CLEANUP; + progress_start_val[1] = vacrel->nindexes; + pgstat_progress_update_multi_param(2, progress_start_index, progress_start_val); if (!ParallelVacuumIsActive(vacrel)) { @@ -2672,6 +2717,10 @@ lazy_cleanup_all_indexes(LVRelState *vacrel) vacrel->indstats[idx] = lazy_cleanup_one_index(indrel, istat, reltuples, estimated_count, vacrel); + + /* Report the number of indexes cleaned up */ + pgstat_progress_update_param(PROGRESS_VACUUM_INDEX_PROCESSED, + idx + 1); } } else @@ -2681,6 +2730,9 @@ lazy_cleanup_all_indexes(LVRelState *vacrel) vacrel->num_index_scans, estimated_count); } + + /* Reset the progress counters */ + pgstat_progress_update_multi_param(2, progress_end_index, progress_end_val); } /* diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index b26f2a64fb..b0c406fe7a 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -1199,6 +1199,15 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) break; } + case 'P': /* Parallel progress reporting */ + { + /* Call the progress reporting callback */ + Assert(pcxt->parallel_progress_callback); + pcxt->parallel_progress_callback(pcxt->parallel_progress_callback_arg); + + break; + } + case 'X': /* Terminate, indicating clean exit */ { shm_mq_detach(pcxt->worker[i].error_mqh); diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 34ca0e739f..5886ee8b7c 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1180,7 +1180,8 @@ CREATE VIEW pg_stat_progress_vacuum AS END AS phase, S.param2 AS heap_blks_total, S.param3 AS heap_blks_scanned, S.param4 AS heap_blks_vacuumed, S.param5 AS index_vacuum_count, - S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples + S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples, + S.param8 AS indexes_total, 
S.param9 AS indexes_processed FROM pg_stat_get_progress_info('VACUUM') AS S LEFT JOIN pg_database D ON S.datid = D.oid; diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index bcd40c80a1..57dc5fd8f0 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -30,7 +30,9 @@ #include "access/table.h" #include "access/xact.h" #include "catalog/index.h" +#include "commands/progress.h" #include "commands/vacuum.h" +#include "libpq/libpq.h" #include "optimizer/paths.h" #include "pgstat.h" #include "storage/bufmgr.h" @@ -103,6 +105,14 @@ typedef struct PVShared /* Counter for vacuuming and cleanup */ pg_atomic_uint32 idx; + + /* + * Number of indexes processed in a parallel index bulk-deletion or a + * parallel index cleanup. This counter is used to report the progress + * information. + */ + pg_atomic_uint32 nindexes_processed; + } PVShared; /* Status used during parallel index vacuum or cleanup */ @@ -271,6 +281,11 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, pcxt = CreateParallelContext("postgres", "parallel_vacuum_main", parallel_workers); Assert(pcxt->nworkers > 0); + + /* Setup callback for updating the progress information */ + pcxt->parallel_progress_callback = parallel_vacuum_update_progress; + pcxt->parallel_progress_callback_arg = pvs; + pvs->pcxt = pcxt; /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */ @@ -364,6 +379,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, pg_atomic_init_u32(&(shared->cost_balance), 0); pg_atomic_init_u32(&(shared->active_nworkers), 0); pg_atomic_init_u32(&(shared->idx), 0); + pg_atomic_init_u32(&(shared->nindexes_processed), 0); shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared); pvs->shared = shared; @@ -618,8 +634,9 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan vacuum)); } - /* Reset the parallel index processing counter */ + /* 
Reset the parallel index processing and progress counters */ pg_atomic_write_u32(&(pvs->shared->idx), 0); + pg_atomic_write_u32(&(pvs->shared->nindexes_processed), 0); /* Setup the shared cost-based vacuum delay and launch workers */ if (nworkers > 0) @@ -888,6 +905,16 @@ parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel, pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED; pfree(pvs->indname); pvs->indname = NULL; + + /* + * Update the index vacuum progress information. Since the progress is + * updated only by the leader, the worker notifies the leader of it. + */ + pg_atomic_add_fetch_u32(&(pvs->shared->nindexes_processed), 1); + if (IsParallelWorker()) + pq_putmessage('P', NULL, 0); + else + parallel_vacuum_update_progress(pvs); } /* @@ -1072,3 +1099,19 @@ parallel_vacuum_error_callback(void *arg) return; } } + +/* + * Update the number of indexes processed so far in the current index bulk-deletion + * or index cleanup. + */ +void +parallel_vacuum_update_progress(void *arg) +{ + ParallelVacuumState *pvs = (ParallelVacuumState *) arg; + + Assert(!IsParallelWorker()); + Assert(pvs); + + pgstat_progress_update_param(PROGRESS_VACUUM_INDEX_PROCESSED, + pg_atomic_read_u32(&(pvs->shared->nindexes_processed))); +} diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index 061f8a4c4c..f621d51c0d 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -21,6 +21,7 @@ #include "storage/shm_toc.h" typedef void (*parallel_worker_main_type) (dsm_segment *seg, shm_toc *toc); +typedef void (*parallel_progress_callback_type) (void *arg); typedef struct ParallelWorkerInfo { @@ -46,6 +47,8 @@ typedef struct ParallelContext ParallelWorkerInfo *worker; int nknown_attached_workers; bool *known_attached_workers; + parallel_progress_callback_type parallel_progress_callback; + void *parallel_progress_callback_arg; } ParallelContext; typedef struct ParallelWorkerContext diff --git a/src/include/commands/progress.h 
b/src/include/commands/progress.h index e5add41352..23c38f2d0e 100644 --- a/src/include/commands/progress.h +++ b/src/include/commands/progress.h @@ -25,6 +25,8 @@ #define PROGRESS_VACUUM_NUM_INDEX_VACUUMS 4 #define PROGRESS_VACUUM_MAX_DEAD_TUPLES 5 #define PROGRESS_VACUUM_NUM_DEAD_TUPLES 6 +#define PROGRESS_VACUUM_INDEX_TOTAL 7 +#define PROGRESS_VACUUM_INDEX_PROCESSED 8 /* Phases of vacuum (as advertised via PROGRESS_VACUUM_PHASE) */ #define PROGRESS_VACUUM_PHASE_SCAN_HEAP 1 diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h index 689dbb7702..7b13069d33 100644 --- a/src/include/commands/vacuum.h +++ b/src/include/commands/vacuum.h @@ -370,5 +370,6 @@ extern bool std_typanalyze(VacAttrStats *stats); extern double anl_random_fract(void); extern double anl_init_selection_state(int n); extern double anl_get_next_S(double t, int n, double *stateptr); +extern void parallel_vacuum_update_progress(void *arg); #endif /* VACUUM_H */ diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index a3a5a62329..b1badc485d 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2033,7 +2033,9 @@ pg_stat_progress_vacuum| SELECT s.pid, s.param4 AS heap_blks_vacuumed, s.param5 AS index_vacuum_count, s.param6 AS max_dead_tuples, - s.param7 AS num_dead_tuples + s.param7 AS num_dead_tuples, + s.param8 AS indexes_total, + s.param9 AS indexes_processed FROM (pg_stat_get_progress_info('VACUUM'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) LEFT JOIN pg_database d ON ((s.datid = d.oid))); pg_stat_recovery_prefetch| SELECT stats_reset, -- 2.37.1 (Apple Git-137.1)