From a025cc7eda6598eb676663d96cd32dab0d20e92e Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Tue, 18 Feb 2025 17:45:36 -0800 Subject: [PATCH v16 2/4] vacuumparallel.c: Support parallel vacuuming for tables to collect dead items. Previously, parallel vacuum was available only for index vacuuming and index cleanup; ParallelVacuumState was initialized only when the table had at least two indexes that are eligible for parallel index vacuuming and cleanup. This commit extends vacuumparallel.c to support parallel table vacuuming. parallel_vacuum_init() now initializes ParallelVacuumState to perform parallel heap scan to collect dead items, or parallel index vacuuming/cleanup, or both. During the initialization, it asks the table AM for the number of parallel workers required for parallel table vacuuming. If >0, it enables parallel table vacuuming and calls further table AM APIs such as parallel_vacuum_estimate. For parallel table vacuuming, this commit introduces the parallel_vacuum_collect_dead_items_begin() function, which can be used to collect dead items in the table (for example, the first pass over the heap table in lazy vacuum for heap tables). Heap table AM disables the parallel heap vacuuming for now, but an upcoming patch uses it. 
Reviewed-by: Amit Kapila Reviewed-by: Hayato Kuroda Reviewed-by: Peter Smith Reviewed-by: Tomas Vondra Reviewed-by: Dilip Kumar Reviewed-by: Melanie Plageman Discussion: https://postgr.es/m/CAD21AoAEfCNv-GgaDheDJ+s-p_Lv1H24AiJeNoPGCmZNSwL1YA@mail.gmail.com --- src/backend/access/heap/vacuumlazy.c | 2 +- src/backend/commands/vacuumparallel.c | 392 +++++++++++++++++++------- src/include/commands/vacuum.h | 5 +- src/tools/pgindent/typedefs.list | 1 + 4 files changed, 292 insertions(+), 108 deletions(-) diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 8fd44ccf5dc..3b948970437 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -3514,7 +3514,7 @@ dead_items_alloc(LVRelState *vacrel, int nworkers) vacrel->nindexes, nworkers, vac_work_mem, vacrel->verbose ? INFO : DEBUG2, - vacrel->bstrategy); + vacrel->bstrategy, (void *) vacrel); /* * If parallel mode started, dead_items and dead_items_info spaces are diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index 2b9d548cdeb..28997918f1c 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -4,17 +4,18 @@ * Support routines for parallel vacuum execution. * * This file contains routines that are intended to support setting up, using, - * and tearing down a ParallelVacuumState. + * and tearing down a ParallelVacuumState. ParallelVacuumState contains shared + * information as well as the memory space for storing dead items allocated in + * the DSA area. * - * In a parallel vacuum, we perform both index bulk deletion and index cleanup - * with parallel worker processes. Individual indexes are processed by one - * vacuum process. ParallelVacuumState contains shared information as well as - * the memory space for storing dead items allocated in the DSA area. 
We - * launch parallel worker processes at the start of parallel index - * bulk-deletion and index cleanup and once all indexes are processed, the - * parallel worker processes exit. Each time we process indexes in parallel, - * the parallel context is re-initialized so that the same DSM can be used for - * multiple passes of index bulk-deletion and index cleanup. + * In a parallel vacuum, we perform table scan, index bulk-deletion, index + * cleanup, or all of them with parallel worker processes depending on the + * number of parallel workers required for each phase. So different numbers of + * workers might be required for the table scanning and index processing. + * We launch parallel worker processes at the start of a phase, and once we + * complete all work in the phase, parallel workers exit. Each time we process + * table or indexes in parallel, the parallel context is re-initialized so that + * the same DSM can be used for multiple passes of each phase. * * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -26,8 +27,10 @@ */ #include "postgres.h" +#include "access/parallel.h" #include "access/amapi.h" #include "access/table.h" +#include "access/tableam.h" #include "access/xact.h" #include "commands/progress.h" #include "commands/vacuum.h" @@ -50,6 +53,13 @@ #define PARALLEL_VACUUM_KEY_WAL_USAGE 4 #define PARALLEL_VACUUM_KEY_INDEX_STATS 5 +/* The kind of parallel vacuum phases */ +typedef enum +{ + PV_WORK_PHASE_PROCESS_INDEXES, /* index vacuuming or cleanup */ + PV_WORK_PHASE_COLLECT_DEAD_ITEMS, /* collect dead tuples */ +} PVWorkPhase; + /* * Shared information among parallel workers. So this is allocated in the DSM * segment. @@ -65,6 +75,12 @@ typedef struct PVShared int elevel; uint64 queryid; + /* + * Tell parallel workers what phase to perform: processing indexes or + * collecting dead tuples from the table. 
+ */ + PVWorkPhase work_phase; + /* * Fields for both index vacuum and cleanup. * @@ -164,6 +180,9 @@ struct ParallelVacuumState /* NULL for worker processes */ ParallelContext *pcxt; + /* Do we need to reinitialize parallel DSM? */ + bool need_reinitialize_dsm; + /* Parent Heap Relation */ Relation heaprel; @@ -178,7 +197,7 @@ struct ParallelVacuumState * Shared index statistics among parallel vacuum workers. The array * element is allocated for every index, even those indexes where parallel * index vacuuming is unsafe or not worthwhile (e.g., - * will_parallel_vacuum[] is false). During parallel vacuum, + * idx_will_parallel_vacuum[] is false). During parallel vacuum, * IndexBulkDeleteResult of each index is kept in DSM and is copied into * local memory at the end of parallel vacuum. */ @@ -193,12 +212,18 @@ struct ParallelVacuumState /* Points to WAL usage area in DSM */ WalUsage *wal_usage; + /* + * The number of workers for parallel table vacuuming. If 0, the parallel + * table vacuum is disabled. + */ + int nworkers_for_table; + /* * False if the index is totally unsuitable target for all parallel * processing. For example, the index could be < * min_parallel_index_scan_size cutoff. 
*/ - bool *will_parallel_vacuum; + bool *idx_will_parallel_vacuum; /* * The number of indexes that support parallel index bulk-deletion and @@ -221,8 +246,10 @@ struct ParallelVacuumState PVIndVacStatus status; }; -static int parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, - bool *will_parallel_vacuum); +static int parallel_vacuum_compute_workers(Relation rel, Relation *indrels, int nindexes, + int nrequested, int *nworkers_for_table, + bool *idx_will_parallel_vacuum, + void *state); static void parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scans, bool vacuum); static void parallel_vacuum_process_safe_indexes(ParallelVacuumState *pvs); @@ -231,18 +258,25 @@ static void parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation PVIndStats *indstats); static bool parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, bool vacuum); +static void parallel_vacuum_begin_work_phase(ParallelVacuumState *pvs, int nworkers, + PVWorkPhase work_phase); +static void parallel_vacuum_end_worke_phase(ParallelVacuumState *pvs); static void parallel_vacuum_error_callback(void *arg); /* * Try to enter parallel mode and create a parallel context. Then initialize * shared memory state. * + * nrequested_workers is the requested parallel degree. 0 means that the parallel + * degrees for table and indexes vacuum are decided differently. See the comments + * of parallel_vacuum_compute_workers() for details. + * * On success, return parallel vacuum state. Otherwise return NULL. 
*/ ParallelVacuumState * parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, int nrequested_workers, int vac_work_mem, - int elevel, BufferAccessStrategy bstrategy) + int elevel, BufferAccessStrategy bstrategy, void *state) { ParallelVacuumState *pvs; ParallelContext *pcxt; @@ -251,38 +285,38 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, PVIndStats *indstats; BufferUsage *buffer_usage; WalUsage *wal_usage; - bool *will_parallel_vacuum; + bool *idx_will_parallel_vacuum; Size est_indstats_len; Size est_shared_len; int nindexes_mwm = 0; int parallel_workers = 0; + int nworkers_for_table; int querylen; - /* - * A parallel vacuum must be requested and there must be indexes on the - * relation - */ + /* A parallel vacuum must be requested */ Assert(nrequested_workers >= 0); - Assert(nindexes > 0); /* * Compute the number of parallel vacuum workers to launch */ - will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes); - parallel_workers = parallel_vacuum_compute_workers(indrels, nindexes, + idx_will_parallel_vacuum = (bool *) palloc0(sizeof(bool) * nindexes); + parallel_workers = parallel_vacuum_compute_workers(rel, indrels, nindexes, nrequested_workers, - will_parallel_vacuum); + &nworkers_for_table, + idx_will_parallel_vacuum, + state); + if (parallel_workers <= 0) { /* Can't perform vacuum in parallel -- return NULL */ - pfree(will_parallel_vacuum); + pfree(idx_will_parallel_vacuum); return NULL; } pvs = (ParallelVacuumState *) palloc0(sizeof(ParallelVacuumState)); pvs->indrels = indrels; pvs->nindexes = nindexes; - pvs->will_parallel_vacuum = will_parallel_vacuum; + pvs->idx_will_parallel_vacuum = idx_will_parallel_vacuum; pvs->bstrategy = bstrategy; pvs->heaprel = rel; @@ -291,6 +325,8 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, parallel_workers); Assert(pcxt->nworkers > 0); pvs->pcxt = pcxt; + pvs->need_reinitialize_dsm = false; + pvs->nworkers_for_table = nworkers_for_table; /* Estimate 
size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */ est_indstats_len = mul_size(sizeof(PVIndStats), nindexes); @@ -327,6 +363,10 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, else querylen = 0; /* keep compiler quiet */ + /* Estimate AM-specific space for parallel table vacuum */ + if (pvs->nworkers_for_table > 0) + table_parallel_vacuum_estimate(rel, pcxt, pvs->nworkers_for_table, state); + InitializeParallelDSM(pcxt); /* Prepare index vacuum stats */ @@ -345,7 +385,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, ((vacoptions & VACUUM_OPTION_PARALLEL_COND_CLEANUP) == 0)); Assert(vacoptions <= VACUUM_OPTION_MAX_VALID_VALUE); - if (!will_parallel_vacuum[i]) + if (!idx_will_parallel_vacuum[i]) continue; if (indrel->rd_indam->amusemaintenanceworkmem) @@ -419,6 +459,10 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, PARALLEL_VACUUM_KEY_QUERY_TEXT, sharedquery); } + /* Initialize AM-specific DSM space for parallel table vacuum */ + if (pvs->nworkers_for_table > 0) + table_parallel_vacuum_initialize(rel, pcxt, pvs->nworkers_for_table, state); + /* Success -- return parallel vacuum state */ return pvs; } @@ -456,7 +500,7 @@ parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats) DestroyParallelContext(pvs->pcxt); ExitParallelMode(); - pfree(pvs->will_parallel_vacuum); + pfree(pvs->idx_will_parallel_vacuum); pfree(pvs); } @@ -533,26 +577,35 @@ parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tup } /* - * Compute the number of parallel worker processes to request. Both index - * vacuum and index cleanup can be executed with parallel workers. - * The index is eligible for parallel vacuum iff its size is greater than - * min_parallel_index_scan_size as invoking workers for very small indexes - * can hurt performance. + * Compute the number of parallel worker processes to request for table + * vacuum and index vacuum/cleanup. 
Return the maximum number of parallel + * workers for table vacuuming and index vacuuming. + * + * nrequested is the number of parallel workers that user requested, which + * applies to both the number of workers for table vacuum and index vacuum. + * If nrequested is 0, we compute the parallel degree for them differently + * as described below. * - * nrequested is the number of parallel workers that user requested. If - * nrequested is 0, we compute the parallel degree based on nindexes, that is - * the number of indexes that support parallel vacuum. This function also - * sets will_parallel_vacuum to remember indexes that participate in parallel - * vacuum. + * For parallel table vacuum, we ask AM-specific routine to compute the + * number of parallel worker processes. The result is set to nworkers_table_p. + * + * For parallel index vacuum, the index is eligible for parallel vacuum iff + * its size is greater than min_parallel_index_scan_size as invoking workers + * for very small indexes can hurt performance. This function sets + * idx_will_parallel_vacuum to remember indexes that participate in parallel vacuum. 
*/ static int -parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, - bool *will_parallel_vacuum) +parallel_vacuum_compute_workers(Relation rel, Relation *indrels, int nindexes, + int nrequested, int *nworkers_table_p, + bool *idx_will_parallel_vacuum, void *state) { int nindexes_parallel = 0; int nindexes_parallel_bulkdel = 0; int nindexes_parallel_cleanup = 0; - int parallel_workers; + int nworkers_table = 0; + int nworkers_index = 0; + + *nworkers_table_p = 0; /* * We don't allow performing parallel operation in standalone backend or @@ -561,6 +614,13 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, if (!IsUnderPostmaster || max_parallel_maintenance_workers == 0) return 0; + /* Compute the number of workers for parallel table scan */ + nworkers_table = table_parallel_vacuum_compute_workers(rel, nrequested, + state); + + /* Cap by max_parallel_maintenance_workers */ + nworkers_table = Min(nworkers_table, max_parallel_maintenance_workers); + /* * Compute the number of indexes that can participate in parallel vacuum. */ @@ -574,7 +634,7 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, RelationGetNumberOfBlocks(indrel) < min_parallel_index_scan_size) continue; - will_parallel_vacuum[i] = true; + idx_will_parallel_vacuum[i] = true; if ((vacoptions & VACUUM_OPTION_PARALLEL_BULKDEL) != 0) nindexes_parallel_bulkdel++; @@ -589,18 +649,18 @@ parallel_vacuum_compute_workers(Relation *indrels, int nindexes, int nrequested, /* The leader process takes one index */ nindexes_parallel--; - /* No index supports parallel vacuum */ - if (nindexes_parallel <= 0) - return 0; - - /* Compute the parallel degree */ - parallel_workers = (nrequested > 0) ? - Min(nrequested, nindexes_parallel) : nindexes_parallel; + if (nindexes_parallel > 0) + { + /* Take into account the requested number of workers */ + nworkers_index = (nrequested > 0) ? 
+ Min(nrequested, nindexes_parallel) : nindexes_parallel; - /* Cap by max_parallel_maintenance_workers */ - parallel_workers = Min(parallel_workers, max_parallel_maintenance_workers); + /* Cap by max_parallel_maintenance_workers */ + nworkers_index = Min(nworkers_index, max_parallel_maintenance_workers); + } - return parallel_workers; + *nworkers_table_p = nworkers_table; + return Max(nworkers_table, nworkers_index); } /* @@ -657,7 +717,7 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan Assert(indstats->status == PARALLEL_INDVAC_STATUS_INITIAL); indstats->status = new_status; indstats->parallel_workers_can_process = - (pvs->will_parallel_vacuum[i] && + (pvs->idx_will_parallel_vacuum[i] && parallel_vacuum_index_is_parallel_safe(pvs->indrels[i], num_index_scans, vacuum)); @@ -669,40 +729,9 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan /* Setup the shared cost-based vacuum delay and launch workers */ if (nworkers > 0) { - /* Reinitialize parallel context to relaunch parallel workers */ - if (num_index_scans > 0) - ReinitializeParallelDSM(pvs->pcxt); - - /* - * Set up shared cost balance and the number of active workers for - * vacuum delay. We need to do this before launching workers as - * otherwise, they might not see the updated values for these - * parameters. - */ - pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance); - pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0); - - /* - * The number of workers can vary between bulkdelete and cleanup - * phase. - */ - ReinitializeParallelWorkers(pvs->pcxt, nworkers); - - LaunchParallelWorkers(pvs->pcxt); - - if (pvs->pcxt->nworkers_launched > 0) - { - /* - * Reset the local cost values for leader backend as we have - * already accumulated the remaining balance of heap. 
- */ - VacuumCostBalance = 0; - VacuumCostBalanceLocal = 0; - - /* Enable shared cost balance for leader backend */ - VacuumSharedCostBalance = &(pvs->shared->cost_balance); - VacuumActiveNWorkers = &(pvs->shared->active_nworkers); - } + /* Start parallel vacuum workers for processing indexes */ + parallel_vacuum_begin_work_phase(pvs, nworkers, + PV_WORK_PHASE_PROCESS_INDEXES); if (vacuum) ereport(pvs->shared->elevel, @@ -732,13 +761,7 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan * to finish, or we might get incomplete data.) */ if (nworkers > 0) - { - /* Wait for all vacuum workers to finish */ - WaitForParallelWorkersToFinish(pvs->pcxt); - - for (int i = 0; i < pvs->pcxt->nworkers_launched; i++) - InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]); - } + parallel_vacuum_end_worke_phase(pvs); /* * Reset all index status back to initial (while checking that we have @@ -755,15 +778,8 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan indstats->status = PARALLEL_INDVAC_STATUS_INITIAL; } - /* - * Carry the shared balance value to heap scan and disable shared costing - */ - if (VacuumSharedCostBalance) - { - VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance); - VacuumSharedCostBalance = NULL; - VacuumActiveNWorkers = NULL; - } + /* Parallel DSM will need to be reinitialized for the next execution */ + pvs->need_reinitialize_dsm = true; } /* @@ -979,6 +995,77 @@ parallel_vacuum_index_is_parallel_safe(Relation indrel, int num_index_scans, return true; } +/* + * Begin the parallel scan to collect dead items. Return the number of + * launched parallel workers. + * + * The caller must call parallel_vacuum_collect_dead_items_end() to finish + * the parallel scan. 
+ */ +int +parallel_vacuum_collect_dead_items_begin(ParallelVacuumState *pvs) +{ + Assert(!IsParallelWorker()); + + if (pvs->nworkers_for_table == 0) + return 0; + + /* Start parallel vacuum workers for collecting dead items */ + Assert(pvs->nworkers_for_table <= pvs->pcxt->nworkers); + parallel_vacuum_begin_work_phase(pvs, pvs->nworkers_for_table, + PV_WORK_PHASE_COLLECT_DEAD_ITEMS); + + /* Include the worker count for the leader itself */ + if (pvs->pcxt->nworkers_launched > 0) + pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); + + return pvs->pcxt->nworkers_launched; +} + +/* + * Wait for all parallel vacuum workers launched by + * parallel_vacuum_collect_dead_items_begin(), and gather workers' statistics. + */ +void +parallel_vacuum_collect_dead_items_end(ParallelVacuumState *pvs) +{ + Assert(!IsParallelWorker()); + Assert(pvs->shared->work_phase == PV_WORK_PHASE_COLLECT_DEAD_ITEMS); + + if (pvs->nworkers_for_table == 0) + return; + + /* Wait for parallel workers to finish */ + parallel_vacuum_end_worke_phase(pvs); + + /* Decrement the worker count for the leader itself */ + if (VacuumActiveNWorkers) + pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); +} + +/* + * This function is for parallel workers to execute the parallel scan to + * collect dead tuples. + */ +static void +parallel_vacuum_process_table(ParallelVacuumState *pvs, void *state) +{ + Assert(VacuumActiveNWorkers); + Assert(pvs->shared->work_phase == PV_WORK_PHASE_COLLECT_DEAD_ITEMS); + + /* Increment the active worker count before starting the table vacuum */ + pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); + + /* Do the parallel scan to collect dead tuples */ + table_parallel_vacuum_collect_dead_items(pvs->heaprel, pvs, state); + + /* + * We have completed the table vacuum so decrement the active worker + * count. + */ + pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); +} + /* * Perform work within a launched parallel process. 
* @@ -998,6 +1085,7 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) WalUsage *wal_usage; int nindexes; char *sharedquery; + void *state; ErrorContextCallback errcallback; /* @@ -1030,7 +1118,6 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) * matched to the leader's one. */ vac_open_indexes(rel, RowExclusiveLock, &nindexes, &indrels); - Assert(nindexes > 0); /* * Apply the desired value of maintenance_work_mem within this process. @@ -1076,6 +1163,17 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) pvs.bstrategy = GetAccessStrategyWithSize(BAS_VACUUM, shared->ring_nbuffers * (BLCKSZ / 1024)); + /* Initialize AM-specific vacuum state for parallel table vacuuming */ + if (shared->work_phase == PV_WORK_PHASE_COLLECT_DEAD_ITEMS) + { + ParallelWorkerContext pwcxt; + + pwcxt.toc = toc; + pwcxt.seg = seg; + table_parallel_vacuum_initialize_worker(rel, &pvs, &pwcxt, + &state); + } + /* Setup error traceback support for ereport() */ errcallback.callback = parallel_vacuum_error_callback; errcallback.arg = &pvs; @@ -1085,8 +1183,19 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) /* Prepare to track buffer usage during parallel execution */ InstrStartParallelQuery(); - /* Process indexes to perform vacuum/cleanup */ - parallel_vacuum_process_safe_indexes(&pvs); + switch (pvs.shared->work_phase) + { + case PV_WORK_PHASE_COLLECT_DEAD_ITEMS: + /* Scan the table to collect dead items */ + parallel_vacuum_process_table(&pvs, state); + break; + case PV_WORK_PHASE_PROCESS_INDEXES: + /* Process indexes to perform vacuum/cleanup */ + parallel_vacuum_process_safe_indexes(&pvs); + break; + default: + elog(ERROR, "unrecognized parallel vacuum phase %d", pvs.shared->work_phase); + } /* Report buffer/WAL usage during parallel execution */ buffer_usage = shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_BUFFER_USAGE, false); @@ -1109,6 +1218,77 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) FreeAccessStrategy(pvs.bstrategy); } +/* + * Launch parallel vacuum 
workers for the given phase. If at least one + * worker launched, enable the shared vacuum delay costing. + */ +static void +parallel_vacuum_begin_work_phase(ParallelVacuumState *pvs, int nworkers, + PVWorkPhase work_phase) +{ + /* Set the work phase */ + pvs->shared->work_phase = work_phase; + + /* Reinitialize parallel context to relaunch parallel workers */ + if (pvs->need_reinitialize_dsm) + ReinitializeParallelDSM(pvs->pcxt); + + /* + * Set up shared cost balance and the number of active workers for vacuum + * delay. We need to do this before launching workers as otherwise, they + * might not see the updated values for these parameters. + */ + pg_atomic_write_u32(&(pvs->shared->cost_balance), VacuumCostBalance); + pg_atomic_write_u32(&(pvs->shared->active_nworkers), 0); + + /* + * The number of workers can vary between bulkdelete and cleanup phase. + */ + ReinitializeParallelWorkers(pvs->pcxt, nworkers); + + LaunchParallelWorkers(pvs->pcxt); + + /* Enable shared vacuum costing if we are able to launch any worker */ + if (pvs->pcxt->nworkers_launched > 0) + { + /* + * Reset the local cost values for leader backend as we have already + * accumulated the remaining balance of heap. + */ + VacuumCostBalance = 0; + VacuumCostBalanceLocal = 0; + + /* Enable shared cost balance for leader backend */ + VacuumSharedCostBalance = &(pvs->shared->cost_balance); + VacuumActiveNWorkers = &(pvs->shared->active_nworkers); + } +} + +/* + * Wait for parallel vacuum workers to finish, accumulate the statistics, + * and disable shared vacuum delay costing if enabled. 
+ */ +static void +parallel_vacuum_end_worke_phase(ParallelVacuumState *pvs) +{ + /* Wait for all vacuum workers to finish */ + WaitForParallelWorkersToFinish(pvs->pcxt); + + for (int i = 0; i < pvs->pcxt->nworkers_launched; i++) + InstrAccumParallelQuery(&pvs->buffer_usage[i], &pvs->wal_usage[i]); + + /* Carry the shared balance value and disable shared costing */ + if (VacuumSharedCostBalance) + { + VacuumCostBalance = pg_atomic_read_u32(VacuumSharedCostBalance); + VacuumSharedCostBalance = NULL; + VacuumActiveNWorkers = NULL; + } + + /* Parallel DSM will need to be reinitialized for the next execution */ + pvs->need_reinitialize_dsm = true; +} + /* * Error context callback for errors occurring during parallel index vacuum. * The error context messages should match the messages set in the lazy vacuum diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h index bc37a80dc74..e785a4a583f 100644 --- a/src/include/commands/vacuum.h +++ b/src/include/commands/vacuum.h @@ -382,7 +382,8 @@ extern void VacuumUpdateCosts(void); extern ParallelVacuumState *parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, int nrequested_workers, int vac_work_mem, int elevel, - BufferAccessStrategy bstrategy); + BufferAccessStrategy bstrategy, + void *state); extern void parallel_vacuum_end(ParallelVacuumState *pvs, IndexBulkDeleteResult **istats); extern TidStore *parallel_vacuum_get_dead_items(ParallelVacuumState *pvs, VacDeadItemsInfo **dead_items_info_p); @@ -394,6 +395,8 @@ extern void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tuples, int num_index_scans, bool estimated_count); +extern int parallel_vacuum_collect_dead_items_begin(ParallelVacuumState *pvs); +extern void parallel_vacuum_collect_dead_items_end(ParallelVacuumState *pvs); extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc); /* in commands/analyze.c */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 
b66cecd8799..9f52ceba1c6 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2012,6 +2012,7 @@ PVIndStats PVIndVacStatus PVOID PVShared +PVWorkPhase PX_Alias PX_Cipher PX_Combo -- 2.43.5