From a28775fc5900cd740556a864e1826ceda268b794 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Wed, 23 Aug 2023 09:25:35 +0200 Subject: [PATCH 1/3] Don't use LogicalRepWorker until ->in_use is verified --- src/backend/replication/logical/launcher.c | 4 ++-- src/include/replication/worker_internal.h | 8 ++++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 72e44d5a02..ec5156aa41 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -862,7 +862,7 @@ logicalrep_sync_worker_count(Oid subid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (w->subid == subid && isTablesyncWorker(w)) + if (isTablesyncWorker(w) && w->subid == subid) res++; } @@ -889,7 +889,7 @@ logicalrep_pa_worker_count(Oid subid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (w->subid == subid && isParallelApplyWorker(w)) + if (isParallelApplyWorker(w) && w->subid == subid) res++; } diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index a428663859..8f4bed0958 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -327,8 +327,10 @@ extern void pa_decr_and_wait_stream_block(void); extern void pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn); -#define isParallelApplyWorker(worker) ((worker)->type == WORKERTYPE_PARALLEL_APPLY) -#define isTablesyncWorker(worker) ((worker)->type == WORKERTYPE_TABLESYNC) +#define isParallelApplyWorker(worker) ((worker)->in_use && \ + (worker)->type == WORKERTYPE_PARALLEL_APPLY) +#define isTablesyncWorker(worker) ((worker)->in_use && \ + (worker)->type == WORKERTYPE_TABLESYNC) static inline bool am_tablesync_worker(void) @@ -339,12 +341,14 @@ am_tablesync_worker(void) static inline bool am_leader_apply_worker(void) { + Assert(MyLogicalRepWorker->in_use); return (MyLogicalRepWorker->type == WORKERTYPE_APPLY); } static inline bool am_parallel_apply_worker(void) { + Assert(MyLogicalRepWorker->in_use); return isParallelApplyWorker(MyLogicalRepWorker); } -- 2.30.2