From df0f24e519a1a8fc94dfc21a321fda7677007982 Mon Sep 17 00:00:00 2001 From: Dilip Kumar Date: Mon, 28 Oct 2019 09:58:18 +0530 Subject: [PATCH 2/2] POC-divide-vacuum-cost-limit --- src/backend/access/heap/vacuumlazy.c | 94 +++++++++++++++++++++++++++++++++++- 1 file changed, 92 insertions(+), 2 deletions(-) diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index d7e99d7..18ae8bb 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -138,6 +138,7 @@ #define PARALLEL_VACUUM_KEY_DEAD_TUPLES 2 #define PARALLEL_VACUUM_KEY_QUERY_TEXT 3 #define PARALLEL_VACUUM_KEY_COST_DELAY 4 +#define PARALLEL_VACUUM_KEY_COST_BALANCE 5 /* * PARALLEL_VACUUM_DISABLE_LEADER_PARTICIPATION disables the leader's @@ -255,6 +256,14 @@ typedef struct LVCostDelay } LVCostDelay; #define SizeOfLVCostDelay offsetof(LVCostDelay, vaccostdelay) + sizeof(double) +typedef struct LVCostBalance +{ + pg_atomic_uint32 nslot; + int nworkers; + int vaccostbalance[FLEXIBLE_ARRAY_MEMBER]; +} LVCostBalance; +#define SizeOfLVCostBalance offsetof(LVCostBalance, vaccostbalance) + sizeof(int) + /* Struct for parallel lazy vacuum */ typedef struct LVParallelState { @@ -265,6 +274,9 @@ typedef struct LVParallelState /* Shared cost delay. */ LVCostDelay *lvcostdelay; + + /* Shared cost balance. */ + LVCostBalance *lvcostbalance; /* * Always true except for a debugging case where * PARALLEL_VACUUM_DISABLE_LEADER_PARTICIPATION are defined. @@ -1959,6 +1971,31 @@ lazy_check_needs_freeze(Buffer buf, bool *hastup) return false; } +static void +compute_cost_balance(LVParallelState *lps) +{ + int i; + + /* + * Share the estimated worker counts so that each worker can compute their + * cost limit. Include the leader if it is participating in the index + * vacuum phase. + * XXX: Actual worker launched might be lesser than the estimated worker so + * in that case each worker might operate with less vacuum cost limit. 
+ */ + lps->lvcostbalance->nworkers = lps->pcxt->nworkers; + if (lps->leaderparticipates) + lps->lvcostbalance->nworkers += 1; + + /* + * Divide the current cost balance among the workers so that we don't lose + * accounting of the I/O balance so far. + */ + for (i = 0; i < lps->pcxt->nworkers; i++) + lps->lvcostbalance->vaccostbalance[i] = + VacuumCostBalance / lps->lvcostbalance->nworkers; +} + /* * Perform index vacuuming or index cleanup with parallel workers. This function * must be used by the parallel vacuum leader process. The caller must set @@ -1976,6 +2013,9 @@ lazy_parallel_vacuum_or_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel, Assert(ParallelVacuumIsActive(lps)); Assert(nindexes > 0); + /* Compute cost balance for the workers. */ + compute_cost_balance(lps); + LaunchParallelWorkers(lps->pcxt); if (lps->lvshared->for_cleanup) @@ -2004,12 +2044,36 @@ lazy_parallel_vacuum_or_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel, * no workers launched. */ if (lps->leaderparticipates || lps->pcxt->nworkers_launched == 0) + { + int base_cost_limit = VacuumCostLimit; + + /* + * If leader is participating and we have launched the parallel workers + * then compute the leader's share of the cost limit and cost balance. + */ + if (lps->pcxt->nworkers_launched > 0) + { + VacuumCostLimit /= lps->lvcostbalance->nworkers; + VacuumCostBalance /= lps->lvcostbalance->nworkers; + } vacuum_or_cleanup_indexes_worker(Irel, nindexes, stats, lps->lvshared, vacrelstats->dead_tuples); + VacuumCostLimit = base_cost_limit; + } /* Wait for all vacuum workers to finish */ WaitForParallelWorkersToFinish(lps->pcxt); + pg_atomic_write_u32(&(lps->lvcostbalance->nslot), 0); + + /* + * Index vacuuming phase is complete, so collect the remaining balance from + * all the workers and add to the current balance of the leader. So that we + * don't lose the accounting for the extra I/O balance of the workers. 
+ */ + for (i = 0; i < lps->pcxt->nworkers_launched; i++) + VacuumCostBalance += lps->lvcostbalance->vaccostbalance[i]; + /* Collect all the delay from workers and add to total delay. */ for (i = 0; i < lps->pcxt->nworkers_launched; i++) { @@ -2984,11 +3048,13 @@ begin_parallel_vacuum(LVRelStats *vacrelstats, Oid relid, BlockNumber nblocks, LVShared *shared; LVDeadTuples *dead_tuples; LVCostDelay *costdelay; + LVCostBalance *costbalance; long maxtuples; char *sharedquery; Size est_shared; Size est_deadtuples; Size est_costdelay; + Size est_costbalance; int querylen; int i; @@ -3067,6 +3133,15 @@ begin_parallel_vacuum(LVRelStats *vacrelstats, Oid relid, BlockNumber nblocks, shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_COST_DELAY, costdelay); lps->lvcostdelay = costdelay; + + /* Vacuum cost balance. */ + est_costbalance = MAXALIGN(add_size(SizeOfLVCostBalance, + mul_size(sizeof(int), nrequested))); + costbalance = (LVCostBalance *) shm_toc_allocate(pcxt->toc, est_costbalance); + pg_atomic_init_u32(&(costbalance->nslot), 0); + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_COST_BALANCE, costbalance); + lps->lvcostbalance = costbalance; + return lps; } @@ -3195,7 +3270,8 @@ heap_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) Relation *indrels; LVShared *lvshared; LVDeadTuples *dead_tuples; - LVCostDelay *costdelay; + LVCostDelay *costdelay; + LVCostBalance *costbalance; int nindexes; char *sharedquery; int slot; @@ -3236,7 +3312,10 @@ heap_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) costdelay = (LVCostDelay *) shm_toc_lookup(toc, PARALLEL_VACUUM_KEY_COST_DELAY, false); - slot = pg_atomic_fetch_add_u32(&(costdelay->nslot), 1); + costbalance = (LVCostBalance *) shm_toc_lookup(toc, + PARALLEL_VACUUM_KEY_COST_BALANCE, + false); + slot = pg_atomic_fetch_add_u32(&(costbalance->nslot), 1); /* Set cost-based vacuum delay */ VacuumCostActive = (VacuumCostDelay > 0); @@ -3245,6 +3324,10 @@ heap_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) VacuumPageMiss = 
0; VacuumPageDirty = 0; + /* Compute the vacuum cost limit for the worker. */ + VacuumCostLimit = VacuumCostLimit / costbalance->nworkers; + VacuumCostBalance = costbalance->vaccostbalance[slot]; + stats = (IndexBulkDeleteResult **) palloc0(nindexes * sizeof(IndexBulkDeleteResult *)); @@ -3257,6 +3340,13 @@ heap_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) /* update the total delay in the shared location. */ costdelay->vaccostdelay[slot] = VacuumCostTotalDelay; + + /* + * Share the remaining balance with the leader so that we don't lose + * accounting for the same. + */ + costbalance->vaccostbalance[slot] = VacuumCostBalance; + vac_close_indexes(nindexes, indrels, RowExclusiveLock); table_close(onerel, ShareUpdateExclusiveLock); } -- 1.8.3.1