From 64258696f1e648cf5f7e05803c849c3f85be5c3a Mon Sep 17 00:00:00 2001 From: "Imseih (AWS)" Date: Tue, 7 Feb 2023 19:39:42 -0600 Subject: [PATCH 1/1] Report index vacuum progress. Add 2 new columns to pg_stat_progress_vacuum. The columns are indexes_total as the total number of indexes to be vacuumed or cleaned up and indexes_completed as the number of indexes vacuumed or cleaned up so far. Author: Sami Imseih, based on suggestions by Nathan Bossart, Peter Geoghegan and Masahiko Sawada Reviewed by: Nathan Bossart, Masahiko Sawada Discussion: https://www.postgresql.org/message-id/flat/5478DFCD-2333-401A-B2F0-0D186AB09228@amazon.com --- doc/src/sgml/monitoring.sgml | 31 +++++++++++ src/backend/access/heap/vacuumlazy.c | 80 +++++++++++++++++++++++---- src/backend/access/transam/parallel.c | 16 ++++++ src/backend/catalog/system_views.sql | 3 +- src/backend/commands/vacuumparallel.c | 55 +++++++++++++++++- src/include/access/parallel.h | 5 ++ src/include/commands/progress.h | 2 + src/include/commands/vacuum.h | 1 + src/test/regress/expected/rules.out | 4 +- 9 files changed, 184 insertions(+), 13 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 1756f1a4b6..2796d92f99 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -1779,6 +1779,10 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser ParallelFinish Waiting for parallel workers to finish computing. + + ParallelVacuumFinish + Waiting for parallel vacuum workers to finish index vacuum. + ProcArrayGroupUpdate Waiting for the group leader to clear the transaction ID at @@ -6883,6 +6887,33 @@ FROM pg_stat_get_backend_idset() AS backendid; Number of dead tuples collected since the last index vacuum cycle. + + + + indexes_total bigint + + + Number of indexes that will be vacuumed or cleaned up. 
This value will be + 0 if the phase is not vacuuming indexes + or cleaning up indexes, INDEX_CLEANUP + is set to OFF, index vacuum is skipped due to very + few dead tuples in the table, or vacuum failsafe is triggered. + See + for more on vacuum failsafe. + + + + + + indexes_completed bigint + + + Number of indexes vacuumed in the current vacuum cycle when the + phase is vacuuming indexes, or the number + of indexes cleaned up during the cleaning up indexes + phase. + + diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 8f14cf85f3..28bac92591 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -2316,6 +2316,17 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) { bool allindexes = true; double old_live_tuples = vacrel->rel->rd_rel->reltuples; + const int progress_start_index[] = { + PROGRESS_VACUUM_PHASE, + PROGRESS_VACUUM_INDEX_TOTAL + }; + const int progress_end_index[] = { + PROGRESS_VACUUM_INDEX_TOTAL, + PROGRESS_VACUUM_INDEX_COMPLETED, + PROGRESS_VACUUM_NUM_INDEX_VACUUMS + }; + int64 progress_start_val[2]; + int64 progress_end_val[3]; Assert(vacrel->nindexes > 0); Assert(vacrel->do_index_vacuuming); @@ -2328,9 +2339,13 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) return false; } - /* Report that we are now vacuuming indexes */ - pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, - PROGRESS_VACUUM_PHASE_VACUUM_INDEX); + /* + * Report that we are now vacuuming indexes + * and the number of indexes to vacuum. 
+ */ + progress_start_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_INDEX; + progress_start_val[1] = vacrel->nindexes; + pgstat_progress_update_multi_param(2, progress_start_index, progress_start_val); if (!ParallelVacuumIsActive(vacrel)) { @@ -2343,6 +2358,10 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) old_live_tuples, vacrel); + /* Done vacuuming an index -- increment the indexes completed */ + pgstat_progress_update_param(PROGRESS_VACUUM_INDEX_COMPLETED, + idx + 1); + if (lazy_check_wraparound_failsafe(vacrel)) { /* Wraparound emergency -- end current index scan */ @@ -2377,14 +2396,18 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) Assert(allindexes || vacrel->failsafe_active); /* - * Increase and report the number of index scans. + * Reset and report the total indexes to vacuum and the number of + * indexes vacuumed. + * Also, increase and report the number of index scans completed. * * We deliberately include the case where we started a round of bulk * deletes that we weren't able to finish due to the failsafe triggering. */ vacrel->num_index_scans++; - pgstat_progress_update_param(PROGRESS_VACUUM_NUM_INDEX_VACUUMS, - vacrel->num_index_scans); + progress_end_val[0] = 0; + progress_end_val[1] = 0; + progress_end_val[2] = vacrel->num_index_scans; + pgstat_progress_update_multi_param(3, progress_end_index, progress_end_val); return allindexes; } @@ -2621,12 +2644,23 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel) if (unlikely(vacuum_xid_failsafe_check(&vacrel->cutoffs))) { + const int progress_index[] = { + PROGRESS_VACUUM_INDEX_TOTAL, + PROGRESS_VACUUM_INDEX_COMPLETED + }; + int64 progress_val[2] = {0, 0}; + vacrel->failsafe_active = true; - /* Disable index vacuuming, index cleanup, and heap rel truncation */ + /* + * Disable index vacuuming, index cleanup, and heap rel truncation. + * + * Also, report that we are no longer tracking index vacuum/cleanup. 
+ */ vacrel->do_index_vacuuming = false; vacrel->do_index_cleanup = false; vacrel->do_rel_truncate = false; + pgstat_progress_update_multi_param(2, progress_index, progress_val); ereport(WARNING, (errmsg("bypassing nonessential maintenance of table \"%s.%s.%s\" as a failsafe after %d index scans", @@ -2654,13 +2688,27 @@ lazy_cleanup_all_indexes(LVRelState *vacrel) { double reltuples = vacrel->new_rel_tuples; bool estimated_count = vacrel->scanned_pages < vacrel->rel_pages; + const int progress_start_index[] = { + PROGRESS_VACUUM_PHASE, + PROGRESS_VACUUM_INDEX_TOTAL + }; + const int progress_end_index[] = { + PROGRESS_VACUUM_INDEX_TOTAL, + PROGRESS_VACUUM_INDEX_COMPLETED + }; + int64 progress_start_val[2]; + int64 progress_end_val[2]; Assert(vacrel->do_index_cleanup); Assert(vacrel->nindexes > 0); - /* Report that we are now cleaning up indexes */ - pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, - PROGRESS_VACUUM_PHASE_INDEX_CLEANUP); + /* + * Report that we are now cleaning up indexes + * and the number of indexes to cleanup. + */ + progress_start_val[0] = PROGRESS_VACUUM_PHASE_INDEX_CLEANUP; + progress_start_val[1] = vacrel->nindexes; + pgstat_progress_update_multi_param(2, progress_start_index, progress_start_val); if (!ParallelVacuumIsActive(vacrel)) { @@ -2672,6 +2720,10 @@ lazy_cleanup_all_indexes(LVRelState *vacrel) vacrel->indstats[idx] = lazy_cleanup_one_index(indrel, istat, reltuples, estimated_count, vacrel); + + /* Done cleaning an index -- increment the indexes completed */ + pgstat_progress_update_param(PROGRESS_VACUUM_INDEX_COMPLETED, + idx + 1); } } else @@ -2681,6 +2733,14 @@ lazy_cleanup_all_indexes(LVRelState *vacrel) vacrel->num_index_scans, estimated_count); } + + /* + * Reset and report the total number of indexes to cleanup + * and the number of indexes cleaned. 
+ */ + progress_end_val[0] = 0; + progress_end_val[1] = 0; + pgstat_progress_update_multi_param(2, progress_end_index, progress_end_val); } /* diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 9e3ec0d5d8..a321f76999 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -185,6 +185,8 @@ CreateParallelContext(const char *library_name, const char *function_name, pcxt->library_name = pstrdup(library_name); pcxt->function_name = pstrdup(function_name); pcxt->error_context_stack = error_context_stack; + pcxt->parallel_progress_callback = NULL; + pcxt->parallel_progress_callback_arg = NULL; shm_toc_initialize_estimator(&pcxt->estimator); dlist_push_head(&pcxt_list, &pcxt->node); @@ -1199,6 +1201,20 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) break; } + case 'P': /* Parallel progress reporting */ + { + /* + * A Leader process that receives this message + * must be ready to update progress. 
+ */ + Assert(pcxt->parallel_progress_callback); + + /* Report progress */ + pcxt->parallel_progress_callback(pcxt->parallel_progress_callback_arg); + + break; + } + case 'X': /* Terminate, indicating clean exit */ { shm_mq_detach(pcxt->worker[i].error_mqh); diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 8608e3fa5b..799587dcd7 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1165,7 +1165,8 @@ CREATE VIEW pg_stat_progress_vacuum AS END AS phase, S.param2 AS heap_blks_total, S.param3 AS heap_blks_scanned, S.param4 AS heap_blks_vacuumed, S.param5 AS index_vacuum_count, - S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples + S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples, + S.param8 AS indexes_total, S.param9 AS indexes_completed FROM pg_stat_get_progress_info('VACUUM') AS S LEFT JOIN pg_database D ON S.datid = D.oid; diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index bcd40c80a1..9e5a300ba4 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -30,7 +30,9 @@ #include "access/table.h" #include "access/xact.h" #include "catalog/index.h" +#include "commands/progress.h" #include "commands/vacuum.h" +#include "libpq/libpq.h" #include "optimizer/paths.h" #include "pgstat.h" #include "storage/bufmgr.h" @@ -103,6 +105,17 @@ typedef struct PVShared /* Counter for vacuuming and cleanup */ pg_atomic_uint32 idx; + + /* + * Counter for vacuuming and cleanup progress reporting. + * This value is used to report index vacuum/cleanup progress + * in parallel_vacuum_progress_report. We keep this + * counter to avoid having to loop through + * ParallelVacuumState->indstats to determine the number + * of indexes completed. 
+ */ + pg_atomic_uint32 nindexes_completed; + } PVShared; /* Status used during parallel index vacuum or cleanup */ @@ -272,6 +285,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, parallel_workers); Assert(pcxt->nworkers > 0); pvs->pcxt = pcxt; + pcxt->parallel_progress_callback = parallel_vacuum_update_progress; + pcxt->parallel_progress_callback_arg = pvs; /* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */ est_indstats_len = mul_size(sizeof(PVIndStats), nindexes); @@ -364,6 +379,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, pg_atomic_init_u32(&(shared->cost_balance), 0); pg_atomic_init_u32(&(shared->active_nworkers), 0); pg_atomic_init_u32(&(shared->idx), 0); + pg_atomic_init_u32(&(shared->nindexes_completed), 0); shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared); pvs->shared = shared; @@ -618,8 +634,9 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan vacuum)); } - /* Reset the parallel index processing counter */ + /* Reset the parallel index processing and progress counters */ pg_atomic_write_u32(&(pvs->shared->idx), 0); + pg_atomic_write_u32(&(pvs->shared->nindexes_completed), 0); /* Setup the shared cost-based vacuum delay and launch workers */ if (nworkers > 0) @@ -888,6 +905,22 @@ parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel, pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED; pfree(pvs->indname); pvs->indname = NULL; + + /* + * Update index vacuum progress. + * + * When a parallel worker completes an + * index vacuum, it sends a protocol message + * to notify the leader. The leader then + * updates the progress. See HandleParallelMessage(). + * + * When a leader performs the index vacuum, + * it can update the progress directly. 
+ */ + if (IsParallelWorker()) + pq_putmessage('P', NULL, 0); + else + parallel_vacuum_update_progress(pvs); } /* @@ -1072,3 +1105,23 @@ parallel_vacuum_error_callback(void *arg) return; } } + +/* + * Increment pvs->shared->nindexes_completed and report the number of + * indexes vacuumed or cleaned up so far. + * + * Note: This function should be called by the leader process only, + * and it's up to the caller to ensure this. + */ +void +parallel_vacuum_update_progress(void *arg) +{ + ParallelVacuumState *pvs = (ParallelVacuumState *)arg; + + Assert(!IsParallelWorker()); + Assert(pvs == NULL || pvs->pcxt->parallel_progress_callback_arg); + + if (pvs) + pgstat_progress_update_param(PROGRESS_VACUUM_INDEX_COMPLETED, + pg_atomic_add_fetch_u32(&(pvs->shared->nindexes_completed), 1)); +} \ No newline at end of file diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index 061f8a4c4c..7ddc71dae2 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -20,6 +20,9 @@ #include "storage/shm_mq.h" #include "storage/shm_toc.h" +/* progress callback definition */ +typedef void (*ParallelProgressCallback) (void *parallel_progress_callback_state); + typedef void (*parallel_worker_main_type) (dsm_segment *seg, shm_toc *toc); typedef struct ParallelWorkerInfo @@ -46,6 +49,8 @@ typedef struct ParallelContext ParallelWorkerInfo *worker; int nknown_attached_workers; bool *known_attached_workers; + ParallelProgressCallback parallel_progress_callback; + void *parallel_progress_callback_arg; } ParallelContext; typedef struct ParallelWorkerContext diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h index e5add41352..6b8b609a4f 100644 --- a/src/include/commands/progress.h +++ b/src/include/commands/progress.h @@ -25,6 +25,8 @@ #define PROGRESS_VACUUM_NUM_INDEX_VACUUMS 4 #define PROGRESS_VACUUM_MAX_DEAD_TUPLES 5 #define PROGRESS_VACUUM_NUM_DEAD_TUPLES 6 +#define PROGRESS_VACUUM_INDEX_TOTAL 7 +#define PROGRESS_VACUUM_INDEX_COMPLETED 8 /* Phases of 
vacuum (as advertised via PROGRESS_VACUUM_PHASE) */ #define PROGRESS_VACUUM_PHASE_SCAN_HEAP 1 diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h index 689dbb7702..7b13069d33 100644 --- a/src/include/commands/vacuum.h +++ b/src/include/commands/vacuum.h @@ -370,5 +370,6 @@ extern bool std_typanalyze(VacAttrStats *stats); extern double anl_random_fract(void); extern double anl_init_selection_state(int n); extern double anl_get_next_S(double t, int n, double *stateptr); +extern void parallel_vacuum_update_progress(void *arg); #endif /* VACUUM_H */ diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index e7a2f5856a..a20978d335 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -2021,7 +2021,9 @@ pg_stat_progress_vacuum| SELECT s.pid, s.param4 AS heap_blks_vacuumed, s.param5 AS index_vacuum_count, s.param6 AS max_dead_tuples, - s.param7 AS num_dead_tuples + s.param7 AS num_dead_tuples, + s.param8 AS indexes_total, + s.param9 AS indexes_completed FROM (pg_stat_get_progress_info('VACUUM'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) LEFT JOIN pg_database d ON ((s.datid = d.oid))); pg_stat_recovery_prefetch| SELECT stats_reset, -- 2.37.1 (Apple Git-137.1)