From 14abdef918a73e465900f758204de19982fc4224 Mon Sep 17 00:00:00 2001 From: Daniil Davidov Date: Wed, 7 Jan 2026 16:03:20 +0700 Subject: [PATCH v18 5/5] Cost-based parameters propagation for parallel autovacuum --- src/backend/commands/vacuum.c | 26 +++- src/backend/commands/vacuumparallel.c | 130 ++++++++++++++++++ src/include/commands/vacuum.h | 2 + src/test/modules/test_autovacuum/Makefile | 2 + .../modules/test_autovacuum/t/001_basic.pl | 83 +++++++++++ .../test_autovacuum/test_autovacuum--1.0.sql | 12 ++ .../modules/test_autovacuum/test_autovacuum.c | 75 ++++++++++ 7 files changed, 328 insertions(+), 2 deletions(-) diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c index aa4fbec143f..4c40a36523a 100644 --- a/src/backend/commands/vacuum.c +++ b/src/backend/commands/vacuum.c @@ -2430,8 +2430,24 @@ vacuum_delay_point(bool is_analyze) /* Always check for interrupts */ CHECK_FOR_INTERRUPTS(); - if (InterruptPending || - (!VacuumCostActive && !ConfigReloadPending)) + if (InterruptPending) + return; + + if (!AmAutoVacuumWorkerProcess()) + { + /* + * If we are autovacuum parallel worker, check whether cost-based + * parameters had changed in leader worker. + * If so, vacuum_cost_delay and vacuum_cost_limit will be set to the + * values which leader worker is operating on. + * + * Do it before checking VacuumCostActive, because its value might be + * changed after leader's parameters consumption. + */ + parallel_vacuum_fix_cost_based_params(); + } + + if (!VacuumCostActive && !ConfigReloadPending) return; /* @@ -2445,6 +2461,12 @@ vacuum_delay_point(bool is_analyze) ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); VacuumUpdateCosts(); + + /* + * If we are parallel autovacuum leader and some of cost-based + * parameters had changed, let other parallel workers know. + */ + parallel_vacuum_propagate_cost_based_params(); } /* diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c index c2f0a37eef2..06ecffeec42 100644 --- a/src/backend/commands/vacuumparallel.c +++ b/src/backend/commands/vacuumparallel.c @@ -54,6 +54,22 @@ #define PARALLEL_VACUUM_KEY_WAL_USAGE 4 #define PARALLEL_VACUUM_KEY_INDEX_STATS 5 +/* + * Only autovacuum leader can reload config file. We use this structure in + * parallel autovacuum for keeping worker's parameters in sync with leader's + * parameters. + */ +typedef struct PVSharedCostParams +{ + slock_t spinlock; /* protects all fields below */ + + /* Copies of corresponding parameters from autovacuum leader process */ + double cost_delay; + int cost_limit; +} PVSharedCostParams; + +static PVSharedCostParams *pv_shared_cost_params = NULL; + /* * Shared information among parallel workers. So this is allocated in the DSM * segment. @@ -123,6 +139,18 @@ typedef struct PVShared /* Statistics of shared dead items */ VacDeadItemsInfo dead_items_info; + + /* + * If 'true' then we are running parallel autovacuum. Otherwise, we are + * running parallel maintenence VACUUM. + */ + bool am_parallel_autovacuum; + + /* + * Struct for syncing parameters between supportive parallel autovacuum + * workers with leader worker. + */ + PVSharedCostParams cost_params; } PVShared; /* Status used during parallel index vacuum or cleanup */ @@ -396,6 +424,17 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes, pg_atomic_init_u32(&(shared->active_nworkers), 0); pg_atomic_init_u32(&(shared->idx), 0); + shared->am_parallel_autovacuum = AmAutoVacuumWorkerProcess(); + + if (shared->am_parallel_autovacuum) + { + shared->cost_params.cost_delay = vacuum_cost_delay; + shared->cost_params.cost_limit = vacuum_cost_limit; + SpinLockInit(&shared->cost_params.spinlock); + + pv_shared_cost_params = &(shared->cost_params); + } + shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared); pvs->shared = shared; @@ -538,6 +577,53 @@ parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, long num_table_tup parallel_vacuum_process_all_indexes(pvs, num_index_scans, false, wusage); } +/* + * Function to be called from parallel autovacuum worker in order to sync + * some cost-based delay parameter with the leader worker. + */ +bool +parallel_vacuum_fix_cost_based_params(void) +{ + /* Check whether we are running parallel autovacuum */ + if (pv_shared_cost_params == NULL) + return false; + + Assert(IsParallelWorker() && !AmAutoVacuumWorkerProcess()); + + SpinLockAcquire(&pv_shared_cost_params->spinlock); + + vacuum_cost_delay = pv_shared_cost_params->cost_delay; + vacuum_cost_limit = pv_shared_cost_params->cost_limit; + + SpinLockRelease(&pv_shared_cost_params->spinlock); + + if (vacuum_cost_delay > 0 && !VacuumFailsafeActive) + VacuumCostActive = true; + + return true; +} + +/* + * Function to be called from parallel autovacuum leader in order to propagate + * some cost-based parameters to the supportive workers. + */ +void +parallel_vacuum_propagate_cost_based_params(void) +{ + /* Check whether we are running parallel autovacuum */ + if (pv_shared_cost_params == NULL) + return; + + Assert(AmAutoVacuumWorkerProcess()); + + SpinLockAcquire(&pv_shared_cost_params->spinlock); + + pv_shared_cost_params->cost_delay = vacuum_cost_delay; + pv_shared_cost_params->cost_limit = vacuum_cost_limit; + + SpinLockRelease(&pv_shared_cost_params->spinlock); +} + /* * Compute the number of parallel worker processes to request. Both index * vacuum and index cleanup can be executed with parallel workers. @@ -763,12 +849,26 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan /* Vacuum the indexes that can be processed by only leader process */ parallel_vacuum_process_unsafe_indexes(pvs); + /* + * To be able to exercise whether leader parallel autovacuum worker can + * propagate cost-based params to parallel workers, wait here until + * configuration is changed... + */ + INJECTION_POINT("av-leader-before-reload-conf", NULL); + /* * Join as a parallel worker. The leader vacuums alone processes all * parallel-safe indexes in the case where no workers are launched. */ parallel_vacuum_process_safe_indexes(pvs); + /* + * ...and then wait until leader guaranteed to propagate new parameters + * values to the workers. I.e. tests are expecting, that during processing + * of parallel safe index we have called vacuum_delay_point, + */ + INJECTION_POINT("av-leader-after-reload-conf", NULL); + /* * Next, accumulate buffer and WAL usage. (This must wait for the workers * to finish, or we might get incomplete data.) @@ -1104,6 +1204,9 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) VacuumSharedCostBalance = &(shared->cost_balance); VacuumActiveNWorkers = &(shared->active_nworkers); + if (shared->am_parallel_autovacuum) + pv_shared_cost_params = &(shared->cost_params); + /* Set parallel vacuum state */ pvs.indrels = indrels; pvs.nindexes = nindexes; @@ -1131,6 +1234,33 @@ parallel_vacuum_main(dsm_segment *seg, shm_toc *toc) /* Prepare to track buffer usage during parallel execution */ InstrStartParallelQuery(); +#ifdef USE_INJECTION_POINTS + if (shared->am_parallel_autovacuum) + { + Assert(VacuumActiveNWorkers != NULL); + + /* + * To be able to exercise whether leader parallel autovacuum worker can + * propagate cost-based params to parallel workers, wait here until + * configuration is changed and leader workers had updated shared state. + */ + INJECTION_POINT("av-worker-before-reload-conf", NULL); + + /* Simulate config reload during normal processing */ + pg_atomic_add_fetch_u32(VacuumActiveNWorkers, 1); + vacuum_delay_point(false); + pg_atomic_sub_fetch_u32(VacuumActiveNWorkers, 1); + + /* + * Wait until worker guaranteed to consume new parameters values from + * the leader and save new value in injection point state. + */ + INJECTION_POINT("autovacuum-set-cost-based-parameter", + &vacuum_cost_delay); + INJECTION_POINT("av-worker-after-reload-conf", NULL); + } +#endif + /* Process indexes to perform vacuum/cleanup */ parallel_vacuum_process_safe_indexes(&pvs); diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h index ec5d70aacdc..73125439bed 100644 --- a/src/include/commands/vacuum.h +++ b/src/include/commands/vacuum.h @@ -411,6 +411,8 @@ extern void parallel_vacuum_cleanup_all_indexes(ParallelVacuumState *pvs, int num_index_scans, bool estimated_count, PVWorkersUsage *wusage); +extern bool parallel_vacuum_fix_cost_based_params(void); +extern void parallel_vacuum_propagate_cost_based_params(void); extern void parallel_vacuum_main(dsm_segment *seg, shm_toc *toc); /* in commands/analyze.c */ diff --git a/src/test/modules/test_autovacuum/Makefile b/src/test/modules/test_autovacuum/Makefile index 4cf7344b2ac..32254c53a5d 100644 --- a/src/test/modules/test_autovacuum/Makefile +++ b/src/test/modules/test_autovacuum/Makefile @@ -12,6 +12,8 @@ DATA = test_autovacuum--1.0.sql TAP_TESTS = 1 +EXTRA_INSTALL = src/test/modules/injection_points + export enable_injection_points ifdef USE_PGXS diff --git a/src/test/modules/test_autovacuum/t/001_basic.pl b/src/test/modules/test_autovacuum/t/001_basic.pl index 8bf153d132c..eec0f41b6a6 100644 --- a/src/test/modules/test_autovacuum/t/001_basic.pl +++ b/src/test/modules/test_autovacuum/t/001_basic.pl @@ -28,6 +28,11 @@ $node->append_conf('postgresql.conf', qq{ }); $node->start; +if (!$node->check_extension('injection_points')) +{ + plan skip_all => 'Extension injection_points not installed'; +} + my $indexes_num = 4; my $initial_rows_num = 10_000; my $autovacuum_parallel_workers = 2; @@ -73,6 +78,9 @@ $node->safe_psql('postgres', qq{ CREATE EXTENSION test_autovacuum; SELECT inj_set_free_workers_attach(); SELECT inj_leader_failure_attach(); + SELECT inj_check_av_param_attach(); + + CREATE EXTENSION injection_points; }); # Test 1 : @@ -166,5 +174,80 @@ $node->safe_psql('postgres', qq{ SELECT inj_leader_failure_detach(); }); +# Test 4: +# Check whether parallel autovacuum leader can propagate cost-based parameters +# to parallel workers. + +# Disable autovacuum on table during preparation for the next test +$node->safe_psql('postgres', qq{ + ALTER TABLE test_autovac SET (autovacuum_enabled = false); +}); + +# Create more dead tuples +$node->safe_psql('postgres', qq{ + UPDATE test_autovac SET col_3 = 0 WHERE (col_4 % 3) = 0; + ANALYZE test_autovac; +}); + +$node->safe_psql('postgres', qq{ + SELECT injection_points_attach('av-leader-before-reload-conf', 'wait'); + SELECT injection_points_attach('av-leader-after-reload-conf', 'wait'); + SELECT injection_points_attach('av-worker-before-reload-conf', 'wait'); + SELECT injection_points_attach('av-worker-after-reload-conf', 'wait'); +}); + +$node->safe_psql('postgres', qq{ + ALTER TABLE test_autovac SET (autovacuum_enabled = true); +}); + +# Wait until leader parallel worker get to the point before vacuum_delay_point +# and change cost-based config parameter. + +$node->wait_for_event('autovacuum worker', 'av-leader-before-reload-conf'); +$node->psql('postgres', qq{ + ALTER SYSTEM SET autovacuum_vacuum_cost_delay = 10; + SELECT pg_reload_conf(); +}); +$node->psql('postgres', qq{ + SELECT injection_points_wakeup('av-leader-before-reload-conf'); +}); + +# Wait until leader worker propagates new patameter's value to the other +# workers and let them to call vacuum_delay_point + +$node->wait_for_event('autovacuum worker', 'av-leader-after-reload-conf'); +$node->safe_psql('postgres', qq{ + SELECT injection_points_wakeup('av-leader-after-reload-conf'); + SELECT injection_points_wakeup('av-worker-before-reload-conf'); +}); + +# Check whether parallel worker has consume new parameter's value from the +# leader. +# Aactually, it can happen before worker gets to the injection point, but we +# want to make everything as deterministic as possible. + +$node->wait_for_event('parallel worker', 'av-worker-after-reload-conf'); +$node->psql('postgres', + "SELECT get_parallel_autovacuum_worker_param_value('vacuum_cost_delay');", + stdout => \$psql_out, +); +is($psql_out, 10.0, 'Leader successfully propagated parameter value'); + +$node->safe_psql('postgres', qq{ + SELECT injection_points_wakeup('av-worker-after-reload-conf'); +}); + +# Cleanup +$node->safe_psql('postgres', qq{ + SELECT injection_points_detach('av-leader-before-reload-conf'); + SELECT injection_points_detach('av-leader-after-reload-conf'); + SELECT injection_points_detach('av-worker-before-reload-conf'); + SELECT injection_points_detach('av-worker-after-reload-conf'); + SELECT inj_check_av_param_detach(); + + DROP EXTENSION test_autovacuum; + DROP EXTENSION injection_points; +}); + $node->stop; done_testing(); diff --git a/src/test/modules/test_autovacuum/test_autovacuum--1.0.sql b/src/test/modules/test_autovacuum/test_autovacuum--1.0.sql index 017d5da85ea..cb0407952d7 100644 --- a/src/test/modules/test_autovacuum/test_autovacuum--1.0.sql +++ b/src/test/modules/test_autovacuum/test_autovacuum--1.0.sql @@ -14,6 +14,10 @@ CREATE FUNCTION trigger_leader_failure(failure_type text) RETURNS VOID STRICT AS 'MODULE_PATHNAME' LANGUAGE C; +CREATE FUNCTION get_parallel_autovacuum_worker_param_value(param_name text) +RETURNS FLOAT8 STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + /* * Injection point related functions */ @@ -32,3 +36,11 @@ AS 'MODULE_PATHNAME' LANGUAGE C; CREATE FUNCTION inj_leader_failure_detach() RETURNS VOID STRICT AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION inj_check_av_param_attach() +RETURNS VOID STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; + +CREATE FUNCTION inj_check_av_param_detach() +RETURNS VOID STRICT +AS 'MODULE_PATHNAME' LANGUAGE C; diff --git a/src/test/modules/test_autovacuum/test_autovacuum.c b/src/test/modules/test_autovacuum/test_autovacuum.c index 7948f4858ae..e96cfda7ae9 100644 --- a/src/test/modules/test_autovacuum/test_autovacuum.c +++ b/src/test/modules/test_autovacuum/test_autovacuum.c @@ -38,6 +38,9 @@ typedef struct InjPointState bool enabled_leader_failure; AVLeaderFaulureType ftype; + + bool enabled_check_av_param; + double vacuum_cost_delay; } InjPointState; static InjPointState * inj_point_state; @@ -92,6 +95,12 @@ test_autovacuum_shmem_startup(void) "inj_trigger_leader_failure", NULL, 0); + + InjectionPointAttach("autovacuum-set-cost-based-parameter", + "test_autovacuum", + "inj_set_av_parameter", + NULL, + 0); } LWLockRelease(AddinShmemInitLock); @@ -109,6 +118,9 @@ _PG_init(void) shmem_startup_hook = test_autovacuum_shmem_startup; } +extern PGDLLEXPORT void inj_set_av_parameter(const char *name, + const void *private_data, + void *arg); extern PGDLLEXPORT void inj_set_free_workers(const char *name, const void *private_data, void *arg); @@ -205,6 +217,45 @@ trigger_leader_failure(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } +/* + * Set current setting of "vacuum_cost_delay" parameter. + * + * Function is called from parallel autovacuum worker. + */ +void +inj_set_av_parameter(const char *name, const void *private_data, void *arg) +{ + ereport(LOG, + errmsg("set autovacuum parameter injection point called"), + errhidestmt(true), errhidecontext(true)); + + if (inj_point_state->enabled_check_av_param) + { + Assert(arg != NULL); + inj_point_state->vacuum_cost_delay = *(double *) arg; + } +} + +PG_FUNCTION_INFO_V1(get_parallel_autovacuum_worker_param_value); +Datum +get_parallel_autovacuum_worker_param_value(PG_FUNCTION_ARGS) +{ + const char *param_name = text_to_cstring(PG_GETARG_TEXT_PP(0)); + double value = 0.0; + +#ifndef USE_INJECTION_POINTS + elog(ERROR, "injection points not supported"); +#endif + + if (strcmp(param_name, "vacuum_cost_delay") == 0) + value = inj_point_state->vacuum_cost_delay; + else + elog(ERROR, + "cannot retrieve parameter %s from injection point", param_name); + + PG_RETURN_FLOAT8((float8) value); +} + PG_FUNCTION_INFO_V1(inj_set_free_workers_attach); Datum inj_set_free_workers_attach(PG_FUNCTION_ARGS) @@ -253,3 +304,27 @@ inj_leader_failure_detach(PG_FUNCTION_ARGS) #endif PG_RETURN_VOID(); } + +PG_FUNCTION_INFO_V1(inj_check_av_param_attach); +Datum +inj_check_av_param_attach(PG_FUNCTION_ARGS) +{ +#ifdef USE_INJECTION_POINTS + inj_point_state->enabled_check_av_param = true; +#else + elog(ERROR, "injection points not supported"); +#endif + PG_RETURN_VOID(); +} + +PG_FUNCTION_INFO_V1(inj_check_av_param_detach); +Datum +inj_check_av_param_detach(PG_FUNCTION_ARGS) +{ +#ifdef USE_INJECTION_POINTS + inj_point_state->enabled_check_av_param = false; +#else + elog(ERROR, "injection points not supported"); +#endif + PG_RETURN_VOID(); +} -- 2.43.0