From e3019009a359939873a05637ad030485e132d551 Mon Sep 17 00:00:00 2001
From: David Geier
Date: Wed, 19 Nov 2025 11:23:08 +0100
Subject: [PATCH v3] Parallel workers stop earlier in presence of LIMIT

In the presence of a LIMIT N clause, the executor stops as soon as it
has produced enough rows and shuts down the plan. In the serial case
the query ends immediately. If the query runs in parallel, however,
each worker only exits once it has itself produced N rows, regardless
of how many rows the other participants have already produced.

Worst-case example: a query has a LIMIT 1 clause and scans a large
table in which only a single row qualifies. The first worker that finds
the matching row returns it and terminates. All other workers keep
running until the table has been scanned to completion.

This change informs all workers to stop execution once the leader has
received enough rows. The stop request is passed to the workers via
shared memory. Using signals, e.g. via the SendProcSignal() API, does
not work reliably because a signal can get lost if a worker only starts
up after the signal has already been sent.
---
 src/backend/access/heap/heapam.c    |  8 ++++++
 src/backend/executor/execParallel.c | 42 ++++++++++++++++++++++++++---
 src/backend/executor/nodeGather.c   |  3 +++
 src/backend/executor/nodeSeqscan.c  |  2 ++
 src/include/access/relscan.h        |  6 +++++
 src/include/executor/execParallel.h |  2 ++
 src/include/nodes/execnodes.h       |  1 +
 7 files changed, 60 insertions(+), 4 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index ad9d6338ec2..5e115472e61 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -39,6 +39,7 @@
 #include "access/syncscan.h"
 #include "access/valid.h"
 #include "access/visibilitymap.h"
+#include "executor/execParallel.h"
 #include "access/xloginsert.h"
 #include "catalog/pg_database.h"
 #include "catalog/pg_database_d.h"
@@ -935,6 +936,9 @@ heapgettup(HeapScanDesc scan,
 	 */
 	while (true)
 	{
+		if (scan->rs_base.rs_stop_flag != NULL && !pg_atomic_unlocked_test_flag(scan->rs_base.rs_stop_flag))
+			break;
+
 		heap_fetch_next_buffer(scan, dir);
 
 		/* did we run out of blocks to scan? */
@@ -1052,6 +1056,9 @@ heapgettup_pagemode(HeapScanDesc scan,
 	 */
 	while (true)
 	{
+		if (scan->rs_base.rs_stop_flag != NULL && !pg_atomic_unlocked_test_flag(scan->rs_base.rs_stop_flag))
+			break;
+
 		heap_fetch_next_buffer(scan, dir);
 
 		/* did we run out of blocks to scan? */
@@ -1153,6 +1160,7 @@ heap_beginscan(Relation relation, Snapshot snapshot,
 	scan->rs_base.rs_flags = flags;
 	scan->rs_base.rs_parallel = parallel_scan;
 	scan->rs_strategy = NULL;	/* set in initscan */
+	scan->rs_base.rs_stop_flag = NULL;
 	scan->rs_cbuf = InvalidBuffer;
 
 	/*
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 772e81f3154..2bbb3cc5c89 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -74,10 +74,11 @@
  */
 typedef struct FixedParallelExecutorState
 {
-	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
-	dsa_pointer param_exec;
-	int			eflags;
-	int			jit_flags;
+	int64		tuples_needed;	/* tuple bound, see ExecSetTupleBound */
+	dsa_pointer param_exec;
+	int			eflags;
+	int			jit_flags;
+	pg_atomic_flag stop_flag;
 } FixedParallelExecutorState;
 
 /*
@@ -603,6 +604,20 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
 	return responseq;
 }
 
+/*
+ * Propagate the terminate flag pointer to all nodes in the plan tree.
+ * This ensures that all nodes can check for termination requests from the leader.
+ */
+static bool
+ExecParallelPropagateTerminateFlag(PlanState *planstate, pg_atomic_flag *stop_flag)
+{
+	if (planstate == NULL)
+		return false;
+
+	planstate->parallel_stop_flag = stop_flag;
+	return planstate_tree_walker(planstate, ExecParallelPropagateTerminateFlag, stop_flag);
+}
+
 /*
  * Sets up the required infrastructure for backend workers to perform
  * execution and return results to the main backend.
@@ -768,6 +783,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	fpes->param_exec = InvalidDsaPointer;
 	fpes->eflags = estate->es_top_eflags;
 	fpes->jit_flags = estate->es_jit_flags;
+	pg_atomic_init_flag(&fpes->stop_flag);
 	shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
 
 	/* Store query string */
@@ -803,6 +819,8 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	/* We don't need the TupleQueueReaders yet, though. */
 	pei->reader = NULL;
 
+	pei->stop_flag = &fpes->stop_flag;
+
 	/*
 	 * If instrumentation options were supplied, allocate space for the data.
 	 * It only gets partially initialized here; the rest happens during
@@ -888,6 +906,9 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate,
 	if (e.nnodes != d.nnodes)
 		elog(ERROR, "inconsistent count of PlanState nodes");
 
+	/* Set up pointer to the shared terminate flag */
+	pei->stop_flag = &fpes->stop_flag;
+
 	/* OK, we're ready to rock and roll. */
 	return pei;
 }
@@ -1165,6 +1186,16 @@ ExecParallelRetrieveJitInstrumentation(PlanState *planstate,
 	memcpy(planstate->worker_jit_instrument, shared_jit, ibytes);
 }
 
+/*
+ * Signal all parallel workers to terminate execution by setting the shared
+ * terminate flag. This causes ExecProcNode() to return NULL in all workers.
+ */
+void
+ExecParallelTerminate(ParallelExecutorInfo *pei)
+{
+	pg_atomic_test_set_flag(pei->stop_flag);
+}
+
 /*
  * Finish parallel execution. We wait for parallel workers to finish, and
  * accumulate their buffer/WAL usage.
@@ -1488,6 +1519,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
 	queryDesc->plannedstmt->jitFlags = fpes->jit_flags;
 	ExecutorStart(queryDesc, fpes->eflags);
 
+	/* Propagate the terminate flag pointer to all nodes in the worker's plan tree */
+	ExecParallelPropagateTerminateFlag(queryDesc->planstate, &fpes->stop_flag);
+
 	/* Special executor initialization steps for parallel workers */
 	queryDesc->planstate->state->es_query_dsa = area;
 	if (DsaPointerIsValid(fpes->param_exec))
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index 4105f1d1968..b7cbb3d9ab8 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -417,6 +417,9 @@ ExecShutdownGatherWorkers(GatherState *node)
 void
 ExecShutdownGather(GatherState *node)
 {
+	if (node->pei != NULL)
+		ExecParallelTerminate(node->pei);
+
 	ExecShutdownGatherWorkers(node);
 
 	/* Now destroy the parallel context. */
diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c
index b8119face43..a5a99a4dd84 100644
--- a/src/backend/executor/nodeSeqscan.c
+++ b/src/backend/executor/nodeSeqscan.c
@@ -375,6 +375,7 @@ ExecSeqScanInitializeDSM(SeqScanState *node,
 	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
 	node->ss.ss_currentScanDesc =
 		table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+	node->ss.ss_currentScanDesc->rs_stop_flag = node->ss.ps.parallel_stop_flag;
 }
 
 /* ----------------------------------------------------------------
@@ -408,4 +409,5 @@ ExecSeqScanInitializeWorker(SeqScanState *node,
 	pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
 	node->ss.ss_currentScanDesc =
 		table_beginscan_parallel(node->ss.ss_currentRelation, pscan);
+	node->ss.ss_currentScanDesc->rs_stop_flag = node->ss.ps.parallel_stop_flag;
 }
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 9b342d5bd80..8b39ff7200c 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -63,6 +63,12 @@ typedef struct TableScanDescData
 	 */
 	uint32		rs_flags;
 
+	/*
+	 * Flag used to indicate if the parallel workers participating in the scan
+	 * should bail out and stop scanning.
+	 */
+	pg_atomic_flag *rs_stop_flag;
+
 	struct ParallelTableScanDescData *rs_parallel;	/* parallel scan
 													 * information */
 } TableScanDescData;
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index 5a2034811d5..4e7858e533c 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -32,6 +32,7 @@ typedef struct ParallelExecutorInfo
 	dsa_area   *area;			/* points to DSA area in DSM */
 	dsa_pointer param_exec;		/* serialized PARAM_EXEC parameters */
 	bool		finished;		/* set true by ExecParallelFinish */
+	pg_atomic_flag *stop_flag;	/* pointer to shared terminate flag */
 	/* These two arrays have pcxt->nworkers_launched entries: */
 	shm_mq_handle **tqueue;		/* tuple queues for worker output */
 	struct TupleQueueReader **reader;	/* tuple reader/writer support */
@@ -41,6 +42,7 @@ extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
 												  EState *estate, Bitmapset *sendParams, int nworkers,
 												  int64 tuples_needed);
 extern void ExecParallelCreateReaders(ParallelExecutorInfo *pei);
+extern void ExecParallelTerminate(ParallelExecutorInfo *pei);
 extern void ExecParallelFinish(ParallelExecutorInfo *pei);
 extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
 extern void ExecParallelReinitialize(PlanState *planstate,
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 02265456978..2e59f9809cf 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1248,6 +1248,7 @@ typedef struct PlanState
 	bool		outeropsset;
 	bool		inneropsset;
 	bool		resultopsset;
+	pg_atomic_flag *parallel_stop_flag;
 } PlanState;
 
 /* ----------------
-- 
2.51.0
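
A minimal reproduction sketch of the worst-case scenario described in the
commit message. The table name big_t, the row count, and the session settings
are illustrative assumptions; whether the planner actually picks a parallel
sequential scan still depends on costs and configuration.

    CREATE TABLE big_t AS SELECT g AS val FROM generate_series(1, 100000000) g;
    ANALYZE big_t;
    SET max_parallel_workers_per_gather = 4;
    SET parallel_setup_cost = 0;
    SET parallel_tuple_cost = 0;
    -- Only one row matches. Without the patch, workers that do not find the
    -- row keep scanning their share of the table even after the leader has
    -- received its single row; with the patch, ExecShutdownGather sets the
    -- shared stop flag and the remaining workers exit early.
    EXPLAIN (ANALYZE) SELECT * FROM big_t WHERE val = 1 LIMIT 1;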