From 58c6baa4b72961399b6e046163a49db05fcfb372 Mon Sep 17 00:00:00 2001 From: sherlockcpp Date: Sat, 17 Dec 2022 20:43:21 +0800 Subject: [PATCH v67 4/6] Stop extra worker if GUC was changed If the max_parallel_apply_workers_per_subscription is changed to a lower value, try to stop free workers in the pool to keep the number of workers lower than half of the max_parallel_apply_workers_per_subscription --- .../replication/logical/applyparallelworker.c | 74 ++++++++++++++++--- src/backend/replication/logical/worker.c | 7 ++ src/include/replication/worker_internal.h | 1 + 3 files changed, 70 insertions(+), 12 deletions(-) diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 25b8f2bcf1..cb2695ec74 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -548,6 +548,25 @@ pa_find_worker(TransactionId xid) return NULL; } +/* + * Stop the given parallel apply worker and free the corresponding info. + */ +static void +pa_stop_worker(ParallelApplyWorkerInfo *winfo) +{ + int slot_no; + uint16 generation; + + SpinLockAcquire(&winfo->shared->mutex); + generation = winfo->shared->logicalrep_worker_generation; + slot_no = winfo->shared->logicalrep_worker_slot_no; + SpinLockRelease(&winfo->shared->mutex); + + logicalrep_pa_worker_stop(slot_no, generation); + + pa_free_worker_info(winfo); +} + /* * Makes the worker available for reuse. * @@ -595,18 +614,7 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo) list_length(ParallelApplyWorkerPool) > (max_parallel_apply_workers_per_subscription / 2)) { - int slot_no; - uint16 generation; - - SpinLockAcquire(&winfo->shared->mutex); - generation = winfo->shared->logicalrep_worker_generation; - slot_no = winfo->shared->logicalrep_worker_slot_no; - SpinLockRelease(&winfo->shared->mutex); - - logicalrep_pa_worker_stop(slot_no, generation); - - pa_free_worker_info(winfo); - + pa_stop_worker(winfo); return true; } @@ -616,6 +624,48 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo) return false; } +/* + * Try to stop parallel apply workers that are not in use to keep the number of + * workers lower than half of the max_parallel_apply_workers_per_subscription. + */ +void +pa_stop_idle_workers(void) +{ + List *active_workers; + ListCell *lc; + int max_applyworkers = max_parallel_apply_workers_per_subscription / 2; + + if (list_length(ParallelApplyWorkerPool) <= max_applyworkers) + return; + + active_workers = list_copy(ParallelApplyWorkerPool); + + foreach(lc, active_workers) + { + ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc); + + /* + * Try to free and stop the worker that is in use. Normally, we free + * the worker after ensuring that the transaction is committed by the + * parallel worker but for rollbacks, we don't wait for the + * transaction to finish so can't free the worker information + * immediately. + */ + if (winfo->in_use) + pa_free_worker(winfo); + + /* Directly stop the worker if not in use. */ + else + pa_stop_worker(winfo); + + /* Recheck the number of workers. */ + if (list_length(ParallelApplyWorkerPool) <= max_applyworkers) + break; + } + + list_free(active_workers); +} + /* * Free the parallel apply worker information and unlink the files with * serialized changes if any. diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 4320bbe811..ef32fa0d70 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -3614,6 +3614,13 @@ LogicalRepApplyLoop(XLogRecPtr last_received) { ConfigReloadPending = false; ProcessConfigFile(PGC_SIGHUP); + + /* + * Try to stop free workers in the pool in case the + * max_parallel_apply_workers_per_subscription is changed to a + * lower value. + */ + pa_stop_idle_workers(); } if (rc & WL_TIMEOUT) diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 9014152e6c..b8a9d52c8e 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -274,6 +274,7 @@ extern void set_apply_error_context_origin(char *originname); extern void pa_allocate_worker(TransactionId xid); extern ParallelApplyWorkerInfo *pa_find_worker(TransactionId xid); extern bool pa_free_worker(ParallelApplyWorkerInfo *winfo); +extern void pa_stop_idle_workers(void); extern void pa_detach_all_error_mq(void); extern bool pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, -- 2.23.0.windows.1