From a4bce0f6d662e4e42d98d6a9ffe70728e254f64a Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Fri, 25 Oct 2019 21:56:24 +0900 Subject: [PATCH v31 6/6] PoC: shared vacuum cost balance --- src/backend/access/heap/vacuumlazy.c | 23 ++++++++- src/backend/commands/vacuum.c | 72 +++++++++++++++++++++++----- src/include/access/heapam.h | 1 + 3 files changed, 81 insertions(+), 15 deletions(-) diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 02040c837e..cf0ccee037 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -211,6 +211,13 @@ typedef struct LVShared /* The number of indexes that do NOT support parallel index vacuuming */ int nindexes_nonparallel; + /* + * Shared vacuum cost balance. During parallel index vacuuming + * VacuumSharedCostBalance points to this value and it accumulates the + * balance of each parallel vacuum workers. + */ + pg_atomic_uint32 cost_balance; + /* * Variables to control parallel index vacuuming. Index statistics * returned from ambulkdelete and amvacuumcleanup is nullable variable @@ -230,6 +237,9 @@ typedef struct LVShared #define IndStatsIsNull(s, i) \ (!(((LVShared *)(s))->bitmap[(i) >> 3] & (1 << ((i) & 0x07)))) +/* Global variable for shared cost-based vacuum delay */ +pg_atomic_uint32 *VacuumSharedCostBalance = NULL; + /* * Struct for an index bulk-deletion statistic used for parallel lazy * vacuum. This is allocated in the DSM segment. IndexBulkDeleteResult @@ -1960,6 +1970,10 @@ lazy_parallel_vacuum_or_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel, Assert(ParallelVacuumIsActive(lps)); Assert(nindexes > 0); + /* Move the current balance to the shared value */ + pg_atomic_write_u32(&(lps->lvshared->cost_balance), VacuumCostBalance); + VacuumCostBalance = 0; + LaunchParallelWorkers(lps->pcxt); if (lps->lvshared->for_cleanup) @@ -1987,11 +2001,15 @@ lazy_parallel_vacuum_or_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel, WaitForParallelWorkersToFinish(lps->pcxt); /* - * We need to reinitialize the parallel context as no more index vacuuming and - * index cleanup will be performed after that. + * We need neither to reinitialize the parallel context nor to reset vacuum cost + * balance after index cleanup as no more index vacuuming and index cleanup will + * be performed after that. */ if (!lps->lvshared->for_cleanup) { + /* Continue to use the shared balance value */ + VacuumCostBalance = pg_atomic_read_u32(&(lps->lvshared->cost_balance)); + /* Reset the processing count */ pg_atomic_write_u32(&(lps->lvshared->nprocessed), 0); @@ -2999,6 +3017,7 @@ begin_parallel_vacuum(LVRelStats *vacrelstats, Oid relid, BlockNumber nblocks, shared->offset = add_size(SizeOfLVShared, BITMAPLEN(nindexes)); prepare_index_statistics(shared, Irel, nindexes); pg_atomic_init_u32(&(shared->nprocessed), 0); + pg_atomic_init_u32(&(shared->cost_balance), 0); shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared); lps->lvshared = shared; diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 9ada501709..7ace51e099 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -412,6 +412,7 @@ vacuum(List *relations, VacuumParams *params, VacuumPageHit = 0; VacuumPageMiss = 0; VacuumPageDirty = 0; + VacuumSharedCostBalance = NULL; /* * Loop to process each selected relation. @@ -1990,28 +1991,73 @@ vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode) void vacuum_delay_point(void) { + bool require_sleep = false; + /* Always check for interrupts */ CHECK_FOR_INTERRUPTS(); - /* Nap if appropriate */ - if (VacuumCostActive && !InterruptPending && - VacuumCostBalance >= VacuumCostLimit) + if (VacuumCostActive && !InterruptPending) { - double msec; + /* + * If the vacuum cost balance is shared among parallel workers we + * decide whether to sleep based on that. + */ + if (VacuumSharedCostBalance != NULL) + { + while (true) + { + uint32 shared_balance; + uint32 new_balance; - msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit; - if (msec > VacuumCostDelay * 4) - msec = VacuumCostDelay * 4; + require_sleep = false; - pg_usleep((long) (msec * 1000)); + /* compute new balance by adding the local value */ + shared_balance = pg_atomic_read_u32(VacuumSharedCostBalance); + new_balance = shared_balance + VacuumCostBalance; - VacuumCostBalance = 0; + if (new_balance >= VacuumCostLimit) + { + require_sleep = true; + new_balance -= VacuumCostLimit; + } + + if (pg_atomic_compare_exchange_u32(VacuumSharedCostBalance, + &shared_balance, + new_balance)) + break; + } - /* update balance values for workers */ - AutoVacuumUpdateDelay(); + /* + * Reset the local balance as we accumulated it into the shared + * value. + */ + VacuumCostBalance = 0; + } + else if (VacuumCostBalance >= VacuumCostLimit) + { + /* In single process vacuum check only the local balance */ + require_sleep = true; + } + + /* Nap if appropriate */ + if (require_sleep) + { + double msec; - /* Might have gotten an interrupt while sleeping */ - CHECK_FOR_INTERRUPTS(); + msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit; + if (msec > VacuumCostDelay * 4) + msec = VacuumCostDelay * 4; + + pg_usleep((long) (msec * 1000)); + + VacuumCostBalance = 0; + + /* update balance values for workers */ + AutoVacuumUpdateDelay(); + + /* Might have gotten an interrupt while sleeping */ + CHECK_FOR_INTERRUPTS(); + } } } diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 12065cc038..ac883f67d1 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -192,6 +192,7 @@ extern void SyncScanShmemInit(void); extern Size SyncScanShmemSize(void); /* in heap/vacuumlazy.c */ +extern pg_atomic_uint32 *VacuumSharedCostBalance; struct VacuumParams; extern void heap_vacuum_rel(Relation onerel, struct VacuumParams *params, BufferAccessStrategy bstrategy); -- 2.22.0