From 279ebab5dc0ad2ce0569bb84a80995257a746b80 Mon Sep 17 00:00:00 2001 From: Masahiko Sawada Date: Fri, 25 Oct 2019 21:56:24 +0900 Subject: [PATCH v32 4/4] PoC: shared vacuum cost balance --- src/backend/access/heap/vacuumlazy.c | 26 +++++++++++ src/backend/commands/vacuum.c | 67 ++++++++++++++++++++++------ src/include/access/heapam.h | 1 + 3 files changed, 81 insertions(+), 13 deletions(-) diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c index 0cfa13b81b..a9d9f31887 100644 --- a/src/backend/access/heap/vacuumlazy.c +++ b/src/backend/access/heap/vacuumlazy.c @@ -211,6 +211,13 @@ typedef struct LVShared /* The number of indexes that do NOT support parallel index vacuuming */ int nindexes_nonparallel; + /* + * Shared vacuum cost balance. During parallel index vacuuming + * VacuumSharedCostBalance points to this value and it accumulates the + * balance of each parallel vacuum workers. + */ + pg_atomic_uint32 cost_balance; + /* * Variables to control parallel index vacuuming. Index statistics * returned from ambulkdelete and amvacuumcleanup is nullable variable @@ -230,6 +237,9 @@ typedef struct LVShared #define IndStatsIsNull(s, i) \ (!(((LVShared *)(s))->bitmap[(i) >> 3] & (1 << ((i) & 0x07)))) +/* Global variable for shared cost-based vacuum delay */ +pg_atomic_uint32 *VacuumSharedCostBalance = NULL; + /* * Struct for an index bulk-deletion statistic used for parallel lazy * vacuum. This is allocated in the DSM segment. IndexBulkDeleteResult @@ -1961,6 +1971,14 @@ lazy_parallel_vacuum_or_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel, Assert(ParallelVacuumIsActive(lps)); Assert(nindexes > 0); + /* + * Move the current balance to the shared value and enable shared cost + * balance. + */ + pg_atomic_write_u32(&(lps->lvshared->cost_balance), VacuumCostBalance); + VacuumCostBalance = 0; + VacuumSharedCostBalance = &(lps->lvshared->cost_balance); + LaunchParallelWorkers(lps->pcxt); if (lps->lvshared->for_cleanup) @@ -1987,6 +2005,12 @@ lazy_parallel_vacuum_or_cleanup_indexes(LVRelStats *vacrelstats, Relation *Irel, /* Wait for all vacuum workers to finish */ WaitForParallelWorkersToFinish(lps->pcxt); + /* Disable shared cost balance for vacuum delay */ + VacuumSharedCostBalance = NULL; + + /* Continue to use the shared balance value */ + VacuumCostBalance = pg_atomic_read_u32(&(lps->lvshared->cost_balance)); + /* * We need to reinitialize the parallel context as no more index vacuuming and * index cleanup will be performed after that. @@ -3000,6 +3024,7 @@ begin_parallel_vacuum(LVRelStats *vacrelstats, Oid relid, BlockNumber nblocks, shared->offset = add_size(SizeOfLVShared, BITMAPLEN(nindexes)); prepare_index_statistics(shared, Irel, nindexes); pg_atomic_init_u32(&(shared->nprocessed), 0); + pg_atomic_init_u32(&(shared->cost_balance), 0); shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared); lps->lvshared = shared; @@ -3188,6 +3213,7 @@ heap_parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) VacuumPageHit = 0; VacuumPageMiss = 0; VacuumPageDirty = 0; + VacuumSharedCostBalance = &(lvshared->cost_balance); stats = (IndexBulkDeleteResult **) palloc0(nindexes * sizeof(IndexBulkDeleteResult *)); diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index 9ada501709..1b9ea9b672 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -412,6 +412,7 @@ vacuum(List *relations, VacuumParams *params, VacuumPageHit = 0; VacuumPageMiss = 0; VacuumPageDirty = 0; + VacuumSharedCostBalance = NULL; /* * Loop to process each selected relation. @@ -1990,28 +1991,68 @@ vac_close_indexes(int nindexes, Relation *Irel, LOCKMODE lockmode) void vacuum_delay_point(void) { + double msec = 0; + /* Always check for interrupts */ CHECK_FOR_INTERRUPTS(); - /* Nap if appropriate */ - if (VacuumCostActive && !InterruptPending && - VacuumCostBalance >= VacuumCostLimit) + if (VacuumCostActive && !InterruptPending) { - double msec; + /* + * If the vacuum cost balance is shared among parallel workers we + * decide whether to sleep based on that. + */ + if (VacuumSharedCostBalance != NULL) + { + while (true) + { + uint32 shared_balance; + uint32 new_balance; - msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit; - if (msec > VacuumCostDelay * 4) - msec = VacuumCostDelay * 4; + msec = 0; - pg_usleep((long) (msec * 1000)); + /* compute new balance by adding the local value */ + shared_balance = pg_atomic_read_u32(VacuumSharedCostBalance); + new_balance = shared_balance + VacuumCostBalance; - VacuumCostBalance = 0; + if (new_balance >= VacuumCostLimit) + { + /* compute sleep time based on the shared cost balance */ + msec = VacuumCostDelay * new_balance / VacuumCostLimit; + new_balance %= VacuumCostLimit; + } - /* update balance values for workers */ - AutoVacuumUpdateDelay(); + if (pg_atomic_compare_exchange_u32(VacuumSharedCostBalance, + &shared_balance, + new_balance)) + break; + } - /* Might have gotten an interrupt while sleeping */ - CHECK_FOR_INTERRUPTS(); + /* + * Reset the local balance as we accumulated it into the shared + * value. + */ + VacuumCostBalance = 0; + } + else if (VacuumCostBalance >= VacuumCostLimit) + msec = VacuumCostDelay * VacuumCostBalance / VacuumCostLimit; + + /* Nap if appropriate */ + if (msec > 0) + { + if (msec > VacuumCostDelay * 4) + msec = VacuumCostDelay * 4; + + pg_usleep((long) (msec * 1000)); + + VacuumCostBalance = 0; + + /* update balance values for workers */ + AutoVacuumUpdateDelay(); + + /* Might have gotten an interrupt while sleeping */ + CHECK_FOR_INTERRUPTS(); + } } } diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index 12065cc038..ac883f67d1 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -192,6 +192,7 @@ extern void SyncScanShmemInit(void); extern Size SyncScanShmemSize(void); /* in heap/vacuumlazy.c */ +extern pg_atomic_uint32 *VacuumSharedCostBalance; struct VacuumParams; extern void heap_vacuum_rel(Relation onerel, struct VacuumParams *params, BufferAccessStrategy bstrategy); -- 2.22.0