From 104f888141948236e7546f901f6a4fcb16e1ce60 Mon Sep 17 00:00:00 2001 From: sherlockcpp Date: Sat, 17 Dec 2022 20:43:21 +0800 Subject: [PATCH v69 3/5] Stop extra worker if GUC was changed If the max_parallel_apply_workers_per_subscription is changed to a lower value, try to stop free workers in the pool to keep the number of workers lower than half of the max_parallel_apply_workers_per_subscription --- .../replication/logical/applyparallelworker.c | 72 ++++++++++++++++--- src/backend/replication/logical/worker.c | 7 ++ src/include/replication/worker_internal.h | 1 + 3 files changed, 69 insertions(+), 11 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 1fe3cb81f2..9848832287 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -541,6 +541,25 @@ pa_find_worker(TransactionId xid) return NULL; } +/* + * Stop the given parallel apply worker and free the corresponding info. + */ +static void +pa_stop_worker(ParallelApplyWorkerInfo *winfo) +{ + int slot_no; + uint16 generation; + + SpinLockAcquire(&winfo->shared->mutex); + generation = winfo->shared->logicalrep_worker_generation; + slot_no = winfo->shared->logicalrep_worker_slot_no; + SpinLockRelease(&winfo->shared->mutex); + + logicalrep_pa_worker_stop(slot_no, generation); + + pa_free_worker_info(winfo); +} + /* * Makes the worker available for reuse. * @@ -576,23 +595,54 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo) list_length(ParallelApplyWorkerPool) > (max_parallel_apply_workers_per_subscription / 2)) { - int slot_no; - uint16 generation; - - SpinLockAcquire(&winfo->shared->mutex); - generation = winfo->shared->logicalrep_worker_generation; - slot_no = winfo->shared->logicalrep_worker_slot_no; - SpinLockRelease(&winfo->shared->mutex); + pa_stop_worker(winfo); + return; + } - logicalrep_pa_worker_stop(slot_no, generation); + winfo->in_use = false; + winfo->serialize_changes = false; +} - pa_free_worker_info(winfo); +/* + * Try to stop parallel apply workers that are not in use to keep the number of + * workers lower than half of the max_parallel_apply_workers_per_subscription. + */ +void +pa_stop_idle_workers(void) +{ + List *active_workers; + ListCell *lc; + int max_applyworkers = max_parallel_apply_workers_per_subscription / 2; + if (list_length(ParallelApplyWorkerPool) <= max_applyworkers) return; + + active_workers = list_copy(ParallelApplyWorkerPool); + + foreach(lc, active_workers) + { + ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc); + + /* + * Try to free and stop the worker that is in use. Normally, we free + * the worker after ensuring that the transaction is committed by the + * parallel worker but for rollbacks, we don't wait for the + * transaction to finish so can't free the worker information + * immediately. + */ + if (winfo->in_use) + pa_free_worker(winfo); + + /* Directly stop the worker if not in use. */ + else + pa_stop_worker(winfo); + + /* Recheck the number of workers. */ + if (list_length(ParallelApplyWorkerPool) <= max_applyworkers) + break; } - winfo->in_use = false; - winfo->serialize_changes = false; + list_free(active_workers); } /* diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 1429f401c3..ff97b1e9de 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3605,6 +3605,13 @@ LogicalRepApplyLoop(XLogRecPtr last_received) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); + + /* + * Try to stop free workers in the pool in case the + * max_parallel_apply_workers_per_subscription is changed to a + * lower value. + */ + pa_stop_idle_workers(); } if (rc & WL_TIMEOUT) diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 74b9d4073b..60e21cd1e7 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -273,6 +273,7 @@ extern void set_apply_error_context_origin(char *originname); /* Parallel apply worker setup and interactions */ extern void pa_allocate_worker(TransactionId xid); extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid); +extern void pa_stop_idle_workers(void); extern void pa_detach_all_error_mq(void); extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, -- 2.23.0.windows.1