From 4460de90eda020c5e49f0ba264282a2a4bd6565d Mon Sep 17 00:00:00 2001 From: "Sami Imseih (AWS)" Date: Thu, 10 Feb 2022 19:28:27 +0000 Subject: [PATCH 1/1] Expose progress for the "vacuuming indexes" and "cleaning up indexes" phase of a VACUUM operation. Author: Sami Imseih, based on suggestions by Nathan Bossart, Peter Geoghegan and Masahiko Sawada Reviewed by: Nathan Bossart, Justin Pryzby --- doc/src/sgml/monitoring.sgml | 121 +++++++++++++ src/backend/access/gin/ginvacuum.c | 3 + src/backend/access/gist/gistvacuum.c | 3 + src/backend/access/hash/hash.c | 1 + src/backend/access/heap/vacuumlazy.c | 236 ++++++++++++++++++++++++-- src/backend/access/nbtree/nbtree.c | 1 + src/backend/access/spgist/spgvacuum.c | 4 + src/backend/catalog/system_views.sql | 42 ++++- src/backend/commands/vacuumparallel.c | 25 +++ src/backend/storage/ipc/ipci.c | 3 + src/backend/utils/adt/pgstatfuncs.c | 20 +++ src/include/commands/progress.h | 7 + src/include/commands/vacuum.h | 8 + src/include/utils/backend_progress.h | 1 + src/test/regress/expected/rules.out | 38 ++++- 15 files changed, 499 insertions(+), 14 deletions(-) diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml index 62f2a3332b..0e2532effc 100644 --- a/doc/src/sgml/monitoring.sgml +++ b/doc/src/sgml/monitoring.sgml @@ -6269,6 +6269,127 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid, Number of dead tuples collected since the last index vacuum cycle. + + + + indexes_total bigint + + + The number of indexes to be processed in the + vacuuming indexes or cleaning up indexes phase + of the vacuum. + + + + + + indexes_processed bigint + + + The number of indexes processed so far in the + vacuuming indexes or cleaning up indexes phase + of the vacuum. + + + + + + + + pg_stat_progress_vacuum_index + + + + Whenever VACUUM is running, the + pg_stat_progress_vacuum_index view will contain + one row for each backend (including autovacuum worker processes) that is + currently performing the vacuuming indexes or + cleaning up indexes phase of the vacuum. + + + + <structname>pg_stat_progress_vacuum_index</structname> View + + + + + Column Type + + + Description + + + + + + + + pid integer + + + Process ID of backend. + + + + + + datid oid + + + OID of the database to which this backend is connected. + + + + + + datname name + + + Name of the database to which this backend is connected. + + + + + + indexrelid oid + + + OID of the index being processed in the ongoing phase of the vacuum. + + + + + + leader_pid integer + + + Process ID of the parallel vacuum leader, if this process is a + parallel vacuum worker. NULL if this process is a + parallel vacuum leader or does not participate in parallel vacuum. + + + + + + phase text + + + Current processing phase of a vacuum. Only the + vacuuming indexes or cleaning up indexes + phase will be listed in this view. See . + + + + + + tuples_removed oid + + + The number of index tuples removed by the vacuuming indexes phase. + This field is 0 during the cleaning up indexes + phase. + +
diff --git a/src/backend/access/gin/ginvacuum.c b/src/backend/access/gin/ginvacuum.c index b4fa5f6bf8..1d5d003780 100644 --- a/src/backend/access/gin/ginvacuum.c +++ b/src/backend/access/gin/ginvacuum.c @@ -17,8 +17,10 @@ #include "access/gin_private.h" #include "access/ginxlog.h" #include "access/xloginsert.h" +#include "commands/progress.h" #include "commands/vacuum.h" #include "miscadmin.h" +#include "pgstat.h" #include "postmaster/autovacuum.h" #include "storage/indexfsm.h" #include "storage/lmgr.h" @@ -60,6 +62,7 @@ ginVacuumItemPointers(GinVacuumState *gvs, ItemPointerData *items, if (gvs->callback(items + i, gvs->callback_state)) { gvs->result->tuples_removed += 1; + pgstat_progress_update_param(PROGRESS_VACUUM_TUPLES_REMOVED, gvs->result->tuples_removed); if (!tmpitems) { /* diff --git a/src/backend/access/gist/gistvacuum.c b/src/backend/access/gist/gistvacuum.c index aac4afab8f..8a0f23388b 100644 --- a/src/backend/access/gist/gistvacuum.c +++ b/src/backend/access/gist/gistvacuum.c @@ -17,9 +17,11 @@ #include "access/genam.h" #include "access/gist_private.h" #include "access/transam.h" +#include "commands/progress.h" #include "commands/vacuum.h" #include "lib/integerset.h" #include "miscadmin.h" +#include "pgstat.h" #include "storage/indexfsm.h" #include "storage/lmgr.h" #include "utils/memutils.h" @@ -375,6 +377,7 @@ restart: END_CRIT_SECTION(); vstate->stats->tuples_removed += ntodelete; + pgstat_progress_update_param(PROGRESS_VACUUM_TUPLES_REMOVED, vstate->stats->tuples_removed); /* must recompute maxoff */ maxoff = PageGetMaxOffsetNumber(page); } diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c index a259a301fa..23dacee52e 100644 --- a/src/backend/access/hash/hash.c +++ b/src/backend/access/hash/hash.c @@ -632,6 +632,7 @@ loop_top: stats->estimated_count = false; stats->num_index_tuples = num_index_tuples; stats->tuples_removed += tuples_removed; + pgstat_progress_update_param(PROGRESS_VACUUM_TUPLES_REMOVED, stats->tuples_removed); /* hashvacuumcleanup will fill in num_pages */ return stats; diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index f09ad8f271..fb8c9b1030 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -58,6 +58,7 @@ #include "postmaster/autovacuum.h" #include "storage/bufmgr.h" #include "storage/freespace.h" +#include "storage/ipc.h" #include "storage/lmgr.h" #include "tcop/tcopprot.h" #include "utils/lsyscache.h" @@ -239,6 +240,25 @@ typedef struct LVSavedErrInfo VacErrPhase phase; } LVSavedErrInfo; +/* Structs for tracking shared Progress information + * amongst worker ( and leader ) processes of a vacuum. + */ +typedef struct VacOneWorkerProgressInfo +{ + int leader_pid; + int indexes_processed; +} VacOneWorkerProgressInfo; + +typedef struct VacWorkerProgressInfo +{ + int num_vacuums; /* number of active VACUUMS with parallel workers */ + int max_vacuums; /* max number of VACUUMS with parallel workers */ + slock_t mutex; + VacOneWorkerProgressInfo vacuums[FLEXIBLE_ARRAY_MEMBER]; +} VacWorkerProgressInfo; + +static VacWorkerProgressInfo *vacworkerprogress; + /* non-export function prototypes */ static void lazy_scan_heap(LVRelState *vacrel, VacuumParams *params, @@ -341,6 +361,7 @@ heap_vacuum_rel(Relation rel, VacuumParams *params, pgstat_progress_start_command(PROGRESS_COMMAND_VACUUM, RelationGetRelid(rel)); + pgstat_progress_update_param(PROGRESS_VACUUM_LEADER_PID, MyProcPid); vacuum_set_xid_limits(rel, params->freeze_min_age, @@ -404,6 +425,10 @@ heap_vacuum_rel(Relation rel, VacuumParams *params, vacrel->rel = rel; vac_open_indexes(vacrel->rel, RowExclusiveLock, &vacrel->nindexes, &vacrel->indrels); + + /* Advertise the number of indexes we are vacuuming */ + pgstat_progress_update_param(PROGRESS_VACUUM_TOTAL_INDEXES, vacrel->nindexes); + if (instrument && vacrel->nindexes > 0) { /* Copy index names used by instrumentation (not error reporting) */ @@ -2000,21 +2025,34 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) return false; } - /* Report that we are now vacuuming indexes */ - pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, - PROGRESS_VACUUM_PHASE_VACUUM_INDEX); - if (!ParallelVacuumIsActive(vacrel)) { + /* Report that we are now vacuuming indexes */ + pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, + PROGRESS_VACUUM_PHASE_VACUUM_INDEX); + for (int idx = 0; idx < vacrel->nindexes; idx++) { Relation indrel = vacrel->indrels[idx]; IndexBulkDeleteResult *istat = vacrel->indstats[idx]; + /* Advertise the index being vacuumed */ + pgstat_progress_update_param(PROGRESS_VACUUM_INDEXRELID, RelationGetRelid(indrel)); + vacrel->indstats[idx] = lazy_vacuum_one_index(indrel, istat, vacrel->old_live_tuples, vacrel); + /* For the non-parallel variant of a vacuum, the array position + * of the index determines how many indexes are processed so far. + * Add 1 to the posititon as this is 0-based array. + */ + pgstat_progress_update_param(PROGRESS_VACUUM_INDEXES_COMPLETED, idx + 1); + + /* Advertise we are done vacuuming indexes */ + pgstat_progress_update_param(PROGRESS_VACUUM_INDEXRELID, 0); + pgstat_progress_update_param(PROGRESS_VACUUM_TUPLES_REMOVED, 0); + if (lazy_check_wraparound_failsafe(vacrel)) { /* Wraparound emergency -- end current index scan */ @@ -2025,9 +2063,28 @@ lazy_vacuum_all_indexes(LVRelState *vacrel) } else { - /* Outsource everything to parallel variant */ - parallel_vacuum_bulkdel_all_indexes(vacrel->pvs, vacrel->old_live_tuples, + /* Report that we are now vacuuming indexes in parallel + * and Outsource everything to parallel variant. + */ + pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, + PROGRESS_VACUUM_PHASE_VACUUM_INDEX_PARALLEL); + + /* + * For the parallel variant of a vacuum, we will be populating shared memory + * for the index completion progress. This is done with a call to + * vacuum_worker_update inside vacuumparallel.c. + * + * Make sure we are properly cleaning up this shared memory on failure + * or we will end up with a leak in the slots. + */ + PG_ENSURE_ERROR_CLEANUP(vacuum_worker_end_callback, Int32GetDatum(MyProcPid)); + { + + parallel_vacuum_bulkdel_all_indexes(vacrel->pvs, vacrel->old_live_tuples, vacrel->num_index_scans); + } + PG_END_ENSURE_ERROR_CLEANUP(vacuum_worker_end_callback, Int32GetDatum(MyProcPid)); + vacuum_worker_end(MyProcPid); /* * Do a postcheck to consider applying wraparound failsafe now. Note @@ -2405,33 +2462,55 @@ lazy_cleanup_all_indexes(LVRelState *vacrel) { Assert(vacrel->nindexes > 0); - /* Report that we are now cleaning up indexes */ - pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, - PROGRESS_VACUUM_PHASE_INDEX_CLEANUP); - if (!ParallelVacuumIsActive(vacrel)) { double reltuples = vacrel->new_rel_tuples; bool estimated_count = vacrel->tupcount_pages < vacrel->rel_pages; + /* Report that we are now cleaning up indexes */ + pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, + PROGRESS_VACUUM_PHASE_INDEX_CLEANUP); + for (int idx = 0; idx < vacrel->nindexes; idx++) { Relation indrel = vacrel->indrels[idx]; IndexBulkDeleteResult *istat = vacrel->indstats[idx]; + /* Advertise the index being cleaned */ + pgstat_progress_update_param(PROGRESS_VACUUM_INDEXRELID, RelationGetRelid(indrel)); + vacrel->indstats[idx] = lazy_cleanup_one_index(indrel, istat, reltuples, estimated_count, vacrel); + + /* See the lazy_vacuum_all_indexes comments */ + pgstat_progress_update_param(PROGRESS_VACUUM_INDEXES_COMPLETED, idx + 1); + + /* Advertise we are done cleaning indexes */ + pgstat_progress_update_param(PROGRESS_VACUUM_INDEXRELID, 0); + pgstat_progress_update_param(PROGRESS_VACUUM_TUPLES_REMOVED, 0); } } else { - /* Outsource everything to parallel variant */ - parallel_vacuum_cleanup_all_indexes(vacrel->pvs, vacrel->new_rel_tuples, + /* Report that we are now cleaning up indexes in parallel + * and Outsource everything to parallel variant. + */ + pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, + PROGRESS_VACUUM_PHASE_INDEX_CLEANUP_PARALLEL); + + /* See the lazy_vacuum_all_indexes comments */ + PG_ENSURE_ERROR_CLEANUP(vacuum_worker_end_callback, Int32GetDatum(MyProcPid)); + { + parallel_vacuum_cleanup_all_indexes(vacrel->pvs, vacrel->new_rel_tuples, vacrel->num_index_scans, (vacrel->tupcount_pages < vacrel->rel_pages)); + } + PG_END_ENSURE_ERROR_CLEANUP(vacuum_worker_end_callback, Int32GetDatum(MyProcPid)); + vacuum_worker_end(MyProcPid); } + } /* @@ -3205,3 +3284,136 @@ restore_vacuum_error_info(LVRelState *vacrel, vacrel->offnum = saved_vacrel->offnum; vacrel->phase = saved_vacrel->phase; } + +void +vacuum_worker_end(int leader_pid) +{ + SpinLockAcquire(&vacworkerprogress->mutex); + for (int i = 0; i < vacworkerprogress->num_vacuums; i++) + { + VacOneWorkerProgressInfo *vac = &vacworkerprogress->vacuums[i]; + + if (vac->leader_pid == leader_pid) + { + *vac = vacworkerprogress->vacuums[vacworkerprogress->num_vacuums - 1]; + vacworkerprogress->num_vacuums--; + SpinLockRelease(&vacworkerprogress->mutex); + break; + } + } + SpinLockRelease(&vacworkerprogress->mutex); +} + +/* + * vacuum_worker_end wrapped as an on_shmem_exit callback function + */ +void +vacuum_worker_end_callback(int code, Datum arg) +{ + vacuum_worker_end(DatumGetInt32(arg)); +} + +/* + * vacuum_worker_update sets the number of indexes processed so far + * in a parallel vacuum. This routine can be + * expanded to other progress tracking amongst parallel + * workers ( and leader ). + */ +void +vacuum_worker_update(int leader_pid) +{ + VacOneWorkerProgressInfo *vac; + + SpinLockAcquire(&vacworkerprogress->mutex); + + for (int i = 0; i < vacworkerprogress->num_vacuums; i++) + { + int next_leader_pid; + + vac = &vacworkerprogress->vacuums[i]; + + next_leader_pid = vac->leader_pid; + + if (next_leader_pid == leader_pid) + { + vac->indexes_processed++; + SpinLockRelease(&vacworkerprogress->mutex); + return; + } + } + + if (vacworkerprogress->num_vacuums >= vacworkerprogress->max_vacuums) + { + SpinLockRelease(&vacworkerprogress->mutex); + elog(ERROR, "out of vacuum worker progress slots"); + } + + vac = &vacworkerprogress->vacuums[vacworkerprogress->num_vacuums]; + vac->leader_pid = leader_pid; + vac->indexes_processed = 1; + vacworkerprogress->num_vacuums++; + SpinLockRelease(&vacworkerprogress->mutex); +} + +/* + * vacuum_progress_cb updates the number of indexes that have been + * vacuumed or cleaned up in a parallel vacuum. + */ +void +vacuum_progress_cb(Datum *values, int offset) +{ + VacOneWorkerProgressInfo *vac; + int leader_pid = values[0]; + + /* If we are vacuuming in parallel, set the number of indexes vacuumed + * from the shared memory counter. + * */ + for (int i = 0; i < vacworkerprogress->num_vacuums; i++) + { + int next_leader_pid; + + vac = &vacworkerprogress->vacuums[i]; + + next_leader_pid = vac->leader_pid; + + if (next_leader_pid == leader_pid) + values[PROGRESS_VACUUM_INDEXES_COMPLETED + offset] = vac->indexes_processed; + } +} + +/* + * VacuumWorkerProgressShmemSize --- report amount of shared memory space needed + */ +Size +VacuumWorkerProgressShmemSize(void) +{ + Size size; + + size = offsetof(VacWorkerProgressInfo, vacuums); + size = add_size(size, mul_size(GetMaxBackends(), sizeof(VacOneWorkerProgressInfo))); + return size; +} + +/* + * VacuumWorkerProgressShmemInit --- initialize this module's shared memory + */ +void +VacuumWorkerProgressShmemInit(void) +{ + bool found; + + vacworkerprogress = (VacWorkerProgressInfo *) ShmemInitStruct("Vacuum Worker Progress Stats", + VacuumWorkerProgressShmemSize(), + &found); + + if (!IsUnderPostmaster) + { + /* Initialize shared memory area */ + Assert(!found); + + vacworkerprogress->max_vacuums = GetMaxBackends(); + SpinLockInit(&vacworkerprogress->mutex); + } + else + Assert(found); +} diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index c9b4964c1e..09edf49082 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -1273,6 +1273,7 @@ backtrack: nupdatable); stats->tuples_removed += nhtidsdead; + pgstat_progress_update_param(PROGRESS_VACUUM_TUPLES_REMOVED, stats->tuples_removed); /* must recompute maxoff */ maxoff = PageGetMaxOffsetNumber(page); diff --git a/src/backend/access/spgist/spgvacuum.c b/src/backend/access/spgist/spgvacuum.c index 0049630532..db73f8ef59 100644 --- a/src/backend/access/spgist/spgvacuum.c +++ b/src/backend/access/spgist/spgvacuum.c @@ -21,8 +21,10 @@ #include "access/transam.h" #include "access/xloginsert.h" #include "catalog/storage_xlog.h" +#include "commands/progress.h" #include "commands/vacuum.h" #include "miscadmin.h" +#include "pgstat.h" #include "storage/bufmgr.h" #include "storage/indexfsm.h" #include "storage/lmgr.h" @@ -160,6 +162,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer, bds->stats->tuples_removed += 1; deletable[i] = true; nDeletable++; + pgstat_progress_update_param(PROGRESS_VACUUM_TUPLES_REMOVED, bds->stats->tuples_removed); } else { @@ -430,6 +433,7 @@ vacuumLeafRoot(spgBulkDeleteState *bds, Relation index, Buffer buffer) bds->stats->tuples_removed += 1; toDelete[xlrec.nDelete] = i; xlrec.nDelete++; + pgstat_progress_update_param(PROGRESS_VACUUM_TUPLES_REMOVED, bds->stats->tuples_removed); } else { diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 3cb69b1f87..0f91f66c73 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1123,13 +1123,53 @@ CREATE VIEW pg_stat_progress_vacuum AS WHEN 4 THEN 'cleaning up indexes' WHEN 5 THEN 'truncating heap' WHEN 6 THEN 'performing final cleanup' + WHEN 7 THEN 'vacuuming indexes' + WHEN 8 THEN 'cleaning up indexes' END AS phase, S.param2 AS heap_blks_total, S.param3 AS heap_blks_scanned, S.param4 AS heap_blks_vacuumed, S.param5 AS index_vacuum_count, - S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples + S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples, + S.param8 AS indexes_total, S.param9 AS indexes_processed FROM pg_stat_get_progress_info('VACUUM') AS S LEFT JOIN pg_database D ON S.datid = D.oid; +CREATE VIEW pg_stat_progress_vacuum_index AS + SELECT + S.pid, + S.datid, + S.datname, + S.indexrelid, + S.leader_pid, + CASE S.phase WHEN 7 THEN 'vacuuming indexes' + WHEN 8 THEN 'cleaning up indexes' + END AS phase, + S.tuples_removed + FROM ( + SELECT + S.pid AS pid, + S.datid AS datid, + D.datname AS datname, + S.param10 AS indexrelid, + S.param12 AS leader_pid, + S.param1 AS phase, + S.param11 AS tuples_removed + FROM pg_stat_get_progress_info('VACUUM') AS S + LEFT JOIN pg_database D ON S.datid = D.oid + WHERE S.param1 IN (2, 4, 7, 8) AND S.param10 > 0 + UNION ALL + SELECT + S.pid AS pid, + S.datid AS datid, + D.datname AS datname, + S.param10 AS indexrelid, + S.param12 AS leader_pid, + S.param1 AS phase, + S.param11 AS tuples_removed + FROM pg_stat_get_progress_info('VACUUM_PARALLEL') AS S + LEFT JOIN pg_database D ON S.datid = D.oid + ) AS S + WHERE S.phase IN (2, 4, 7, 8) AND S.indexrelid > 0; + CREATE VIEW pg_stat_progress_cluster AS SELECT S.pid AS pid, diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index 974a29e7a9..1332203047 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -29,6 +29,7 @@ #include "access/amapi.h" #include "access/table.h" #include "catalog/index.h" +#include "commands/progress.h" #include "commands/vacuum.h" #include "optimizer/paths.h" #include "pgstat.h" @@ -101,6 +102,9 @@ typedef struct PVShared /* Counter for vacuuming and cleanup */ pg_atomic_uint32 idx; + + /* Leader PID of the vacuum */ + int leader_pid; } PVShared; /* Status used during parallel index vacuum or cleanup */ @@ -357,6 +361,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, (nindexes_mwm > 0) ? maintenance_work_mem / Min(parallel_workers, nindexes_mwm) : maintenance_work_mem; + shared->leader_pid = MyProcPid; pg_atomic_init_u32(&(shared->cost_balance), 0); pg_atomic_init_u32(&(shared->active_nworkers), 0); @@ -716,6 +721,7 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan VacuumSharedCostBalance = NULL; VacuumActiveNWorkers = NULL; } + } /* @@ -840,13 +846,24 @@ parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel, pvs->indname = pstrdup(RelationGetRelationName(indrel)); pvs->status = indstats->status; + /* Advertise the index we are cleaning or vacuuming */ + pgstat_progress_update_param(PROGRESS_VACUUM_INDEXRELID, RelationGetRelid(indrel)); + switch (indstats->status) { case PARALLEL_INDVAC_STATUS_NEED_BULKDELETE: + /* Report that we are now vacuuming indexes in parallel */ + pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, + PROGRESS_VACUUM_PHASE_VACUUM_INDEX_PARALLEL); istat_res = vac_bulkdel_one_index(&ivinfo, istat, pvs->dead_items); + vacuum_worker_update(pvs->shared->leader_pid); break; case PARALLEL_INDVAC_STATUS_NEED_CLEANUP: + /* Report that we are now cleaning indexes in parallel */ + pgstat_progress_update_param(PROGRESS_VACUUM_PHASE, + PROGRESS_VACUUM_PHASE_INDEX_CLEANUP_PARALLEL); istat_res = vac_cleanup_one_index(&ivinfo, istat); + vacuum_worker_update(pvs->shared->leader_pid); break; default: elog(ERROR, "unexpected parallel vacuum index status %d for index \"%s\"", @@ -881,6 +898,10 @@ parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel, */ indstats->status = PARALLEL_INDVAC_STATUS_COMPLETED; + /* Advertise we are no longer vacuuming/cleaning an index */ + pgstat_progress_update_param(PROGRESS_VACUUM_INDEXRELID, 0); + pgstat_progress_update_param(PROGRESS_VACUUM_TUPLES_REMOVED, 0); + /* Reset error traceback information */ pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED; pfree(pvs->indname); @@ -965,6 +986,9 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) * workers. */ rel = table_open(shared->relid, ShareUpdateExclusiveLock); + pgstat_progress_start_command(PROGRESS_COMMAND_VACUUM_PARALLEL, + RelationGetRelid(rel)); + pgstat_progress_update_param(PROGRESS_VACUUM_LEADER_PID, shared->leader_pid); /* * Open all indexes. indrels are sorted in order by OID, which should be @@ -1036,6 +1060,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) vac_close_indexes(nindexes, indrels, RowExclusiveLock); table_close(rel, ShareUpdateExclusiveLock); FreeAccessStrategy(pvs.bstrategy); + pgstat_progress_end_command(); } /* diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 9f26e41c46..8682578ac3 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -23,6 +23,7 @@ #include "access/syncscan.h" #include "access/twophase.h" #include "commands/async.h" +#include "commands/vacuum.h" #include "miscadmin.h" #include "pgstat.h" #include "postmaster/autovacuum.h" @@ -143,6 +144,7 @@ CalculateShmemSize(int *num_semaphores) size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); + size = add_size(size, VacuumWorkerProgressShmemSize()); #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -293,6 +295,7 @@ CreateSharedMemoryAndSemaphores(void) BTreeShmemInit(); SyncScanShmemInit(); AsyncShmemInit(); + VacuumWorkerProgressShmemInit(); #ifdef EXEC_BACKEND diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c index 15cb17ace4..5dece71596 100644 --- a/src/backend/utils/adt/pgstatfuncs.c +++ b/src/backend/utils/adt/pgstatfuncs.c @@ -18,6 +18,7 @@ #include "access/xlog.h" #include "catalog/pg_authid.h" #include "catalog/pg_type.h" +#include "commands/vacuum.h" #include "common/ip.h" #include "funcapi.h" #include "miscadmin.h" @@ -452,6 +453,14 @@ pg_stat_get_backend_idset(PG_FUNCTION_ARGS) /* * Returns command progress information for the named command. + * + * A command type can optionally define a callback function + * which will derive Datum values rather than use values + * directly from the backends progress array. + * + * Derived values are useful to calculate values form multiple backends + * as is the case with parallel operations, in which progress values + * are calculated form multiple workers. */ Datum pg_stat_get_progress_info(PG_FUNCTION_ARGS) @@ -466,6 +475,7 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS) ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; MemoryContext per_query_ctx; MemoryContext oldcontext; + void (*callback)(Datum *, int) = NULL; /* check to see if caller supports us returning a tuplestore */ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) @@ -483,7 +493,14 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS) /* Translate command name into command type code. */ if (pg_strcasecmp(cmd, "VACUUM") == 0) + { cmdtype = PROGRESS_COMMAND_VACUUM; + callback = vacuum_progress_cb; + } + else if (pg_strcasecmp(cmd, "VACUUM_PARALLEL") == 0) + { + cmdtype = PROGRESS_COMMAND_VACUUM_PARALLEL; + } else if (pg_strcasecmp(cmd, "ANALYZE") == 0) cmdtype = PROGRESS_COMMAND_ANALYZE; else if (pg_strcasecmp(cmd, "CLUSTER") == 0) @@ -552,6 +569,9 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS) nulls[i + 3] = true; } + if (callback) + callback(values, 3); + tuplestore_putvalues(tupstore, tupdesc, values, nulls); } diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h index a28938caf4..37659882ee 100644 --- a/src/include/commands/progress.h +++ b/src/include/commands/progress.h @@ -25,6 +25,11 @@ #define PROGRESS_VACUUM_NUM_INDEX_VACUUMS 4 #define PROGRESS_VACUUM_MAX_DEAD_TUPLES 5 #define PROGRESS_VACUUM_NUM_DEAD_TUPLES 6 +#define PROGRESS_VACUUM_TOTAL_INDEXES 7 +#define PROGRESS_VACUUM_INDEXES_COMPLETED 8 +#define PROGRESS_VACUUM_INDEXRELID 9 +#define PROGRESS_VACUUM_TUPLES_REMOVED 10 +#define PROGRESS_VACUUM_LEADER_PID 11 /* Phases of vacuum (as advertised via PROGRESS_VACUUM_PHASE) */ #define PROGRESS_VACUUM_PHASE_SCAN_HEAP 1 @@ -33,6 +38,8 @@ #define PROGRESS_VACUUM_PHASE_INDEX_CLEANUP 4 #define PROGRESS_VACUUM_PHASE_TRUNCATE 5 #define PROGRESS_VACUUM_PHASE_FINAL_CLEANUP 6 +#define PROGRESS_VACUUM_PHASE_VACUUM_INDEX_PARALLEL 7 +#define PROGRESS_VACUUM_PHASE_INDEX_CLEANUP_PARALLEL 8 /* Progress parameters for analyze */ #define PROGRESS_ANALYZE_PHASE 0 diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h index 5d0bdfa427..119924694d 100644 --- a/src/include/commands/vacuum.h +++ b/src/include/commands/vacuum.h @@ -336,4 +336,12 @@ extern double anl_random_fract(void); extern double anl_init_selection_state(int n); extern double anl_get_next_S(double t, int n, double *stateptr); +/* in commands/vacuumparallel.c */ +extern Size VacuumWorkerProgressShmemSize(void); +extern void VacuumWorkerProgressShmemInit(void); +extern void vacuum_worker_end(int leader_pid); +extern void vacuum_worker_update(int leader_pid); +extern void vacuum_progress_cb(Datum *values, int offset); +extern void vacuum_worker_end_callback(int code, Datum arg); + #endif /* VACUUM_H */ diff --git a/src/include/utils/backend_progress.h b/src/include/utils/backend_progress.h index 47bf8029b0..4651e45c40 100644 --- a/src/include/utils/backend_progress.h +++ b/src/include/utils/backend_progress.h @@ -23,6 +23,7 @@ typedef enum ProgressCommandType { PROGRESS_COMMAND_INVALID, PROGRESS_COMMAND_VACUUM, + PROGRESS_COMMAND_VACUUM_PARALLEL, PROGRESS_COMMAND_ANALYZE, PROGRESS_COMMAND_CLUSTER, PROGRESS_COMMAND_CREATE_INDEX, diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index 1420288d67..35193dc6b7 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1995,6 +1995,8 @@ pg_stat_progress_vacuum| SELECT s.pid, WHEN 4 THEN 'cleaning up indexes'::text WHEN 5 THEN 'truncating heap'::text WHEN 6 THEN 'performing final cleanup'::text + WHEN 7 THEN 'vacuuming indexes'::text + WHEN 8 THEN 'cleaning up indexes'::text ELSE NULL::text END AS phase, s.param2 AS heap_blks_total, @@ -2002,9 +2004,43 @@ pg_stat_progress_vacuum| SELECT s.pid, s.param4 AS heap_blks_vacuumed, s.param5 AS index_vacuum_count, s.param6 AS max_dead_tuples, - s.param7 AS num_dead_tuples + s.param7 AS num_dead_tuples, + s.param8 AS indexes_total, + s.param9 AS indexes_processed FROM (pg_stat_get_progress_info('VACUUM'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) LEFT JOIN pg_database d ON ((s.datid = d.oid))); +pg_stat_progress_vacuum_index| SELECT s.pid, + s.datid, + s.datname, + s.indexrelid, + s.leader_pid, + CASE s.phase + WHEN 7 THEN 'vacuuming indexes'::text + WHEN 8 THEN 'cleaning up indexes'::text + ELSE NULL::text + END AS phase, + s.tuples_removed + FROM ( SELECT s_1.pid, + s_1.datid, + d.datname, + s_1.param10 AS indexrelid, + s_1.param12 AS leader_pid, + s_1.param1 AS phase, + s_1.param11 AS tuples_removed + FROM (pg_stat_get_progress_info('VACUUM'::text) s_1(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) + LEFT JOIN pg_database d ON ((s_1.datid = d.oid))) + WHERE ((s_1.param1 = ANY (ARRAY[(2)::bigint, (4)::bigint, (7)::bigint, (8)::bigint])) AND (s_1.param10 > 0)) + UNION ALL + SELECT s_1.pid, + s_1.datid, + d.datname, + s_1.param10 AS indexrelid, + s_1.param12 AS leader_pid, + s_1.param1 AS phase, + s_1.param11 AS tuples_removed + FROM (pg_stat_get_progress_info('VACUUM_PARALLEL'::text) s_1(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) + LEFT JOIN pg_database d ON ((s_1.datid = d.oid)))) s + WHERE ((s.phase = ANY (ARRAY[(2)::bigint, (4)::bigint, (7)::bigint, (8)::bigint])) AND (s.indexrelid > 0)); pg_stat_replication| SELECT s.pid, s.usesysid, u.rolname AS usename, -- 2.32.0