From ba785f2abca7c9f5199f0fc27c3b71f6d1d8010b Mon Sep 17 00:00:00 2001 From: jcoleman Date: Fri, 21 Jan 2022 22:38:46 -0500 Subject: [PATCH v4 2/4] Parallelize correlated subqueries When the params a correlated subquery depends on are provided at the current query level (i.e., they are generated within a single worker and not shared across workers), we can safely execute that subquery in parallel. This version of the patch takes an alternative approach that relies only on a relids subset check. --- doc/src/sgml/parallel.sgml | 3 +- src/backend/optimizer/path/allpaths.c | 18 ++- src/backend/optimizer/path/joinpath.c | 16 ++- src/backend/optimizer/util/clauses.c | 3 + src/backend/optimizer/util/pathnode.c | 2 + src/backend/utils/misc/guc.c | 1 + src/include/nodes/pathnodes.h | 2 +- .../regress/expected/incremental_sort.out | 28 ++-- src/test/regress/expected/select_parallel.out | 125 ++++++++++++++++++ src/test/regress/sql/select_parallel.sql | 25 ++++ 10 files changed, 197 insertions(+), 26 deletions(-) diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml index 13479d7e5e..2d924dd2ac 100644 --- a/doc/src/sgml/parallel.sgml +++ b/doc/src/sgml/parallel.sgml @@ -517,7 +517,8 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%'; - Plan nodes that reference a correlated SubPlan. + Plan nodes that reference a correlated SubPlan where + the result is shared between workers. diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 6a581e20fa..776f002054 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -556,7 +556,8 @@ set_rel_pathlist(PlannerInfo *root, RelOptInfo *rel, * (see grouping_planner). 
*/ if (rel->reloptkind == RELOPT_BASEREL && - bms_membership(root->all_baserels) != BMS_SINGLETON) + bms_membership(root->all_baserels) != BMS_SINGLETON + && (rel->subplan_params == NIL || rte->rtekind != RTE_SUBQUERY)) generate_useful_gather_paths(root, rel, false); /* Now find the cheapest of the paths for this rel */ @@ -2700,7 +2701,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows) cheapest_partial_path->rows * cheapest_partial_path->parallel_workers; simple_gather_path = (Path *) create_gather_path(root, rel, cheapest_partial_path, rel->reltarget, - NULL, rowsp); + rel->lateral_relids, rowsp); add_path(rel, simple_gather_path); /* @@ -2717,7 +2718,7 @@ generate_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_rows) rows = subpath->rows * subpath->parallel_workers; path = create_gather_merge_path(root, rel, subpath, rel->reltarget, - subpath->pathkeys, NULL, rowsp); + subpath->pathkeys, rel->lateral_relids, rowsp); add_path(rel, &path->path); } } @@ -2819,11 +2820,15 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r double *rowsp = NULL; List *useful_pathkeys_list = NIL; Path *cheapest_partial_path = NULL; + Relids required_outer = rel->lateral_relids; /* If there are no partial paths, there's nothing to do here. */ if (rel->partial_pathlist == NIL) return; + if (!bms_is_subset(required_outer, rel->relids)) + return; + /* Should we override the rel's rowcount estimate? 
*/ if (override_rows) rowsp = &rows; @@ -2895,7 +2900,7 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r tmp, rel->reltarget, tmp->pathkeys, - NULL, + required_outer, rowsp); add_path(rel, &path->path); @@ -2929,7 +2934,7 @@ generate_useful_gather_paths(PlannerInfo *root, RelOptInfo *rel, bool override_r tmp, rel->reltarget, tmp->pathkeys, - NULL, + required_outer, rowsp); add_path(rel, &path->path); @@ -3108,7 +3113,8 @@ standard_join_search(PlannerInfo *root, int levels_needed, List *initial_rels) /* * Except for the topmost scan/join rel, consider gathering * partial paths. We'll do the same for the topmost scan/join rel - * once we know the final targetlist (see grouping_planner). + * once we know the final targetlist (see + * apply_scanjoin_target_to_paths). */ if (lev < levels_needed) generate_useful_gather_paths(root, rel, false); diff --git a/src/backend/optimizer/path/joinpath.c b/src/backend/optimizer/path/joinpath.c index f96fc9fd28..e85b5449ea 100644 --- a/src/backend/optimizer/path/joinpath.c +++ b/src/backend/optimizer/path/joinpath.c @@ -1791,16 +1791,24 @@ match_unsorted_outer(PlannerInfo *root, * partial path and the joinrel is parallel-safe. However, we can't * handle JOIN_UNIQUE_OUTER, because the outer path will be partial, and * therefore we won't be able to properly guarantee uniqueness. Nor can - * we handle joins needing lateral rels, since partial paths must not be - * parameterized. Similarly, we can't handle JOIN_FULL and JOIN_RIGHT, - * because they can produce false null extended rows. + * we handle JOIN_FULL and JOIN_RIGHT, because they can produce false null + * extended rows. 
+ * + * Partial paths may only have parameters in limited cases + * where the parameterization is fully satisfied without sharing state + * between workers, so we only allow lateral rels on inputs to the join + * if the resulting join contains no lateral rels, the inner rel's laterals + * are fully satisfied by the outer rel, and the outer rel doesn't depend + * on the inner rel to produce any laterals. */ if (joinrel->consider_parallel && save_jointype != JOIN_UNIQUE_OUTER && save_jointype != JOIN_FULL && save_jointype != JOIN_RIGHT && outerrel->partial_pathlist != NIL && - bms_is_empty(joinrel->lateral_relids)) + bms_is_empty(joinrel->lateral_relids) && + bms_is_subset(innerrel->lateral_relids, outerrel->relids) && + (bms_is_empty(outerrel->lateral_relids) || !bms_is_subset(outerrel->lateral_relids, innerrel->relids))) { if (nestjoinOK) consider_parallel_nestloop(root, joinrel, outerrel, innerrel, diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index a707dc9f26..f0002f6887 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -822,6 +822,9 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) if (param->paramkind == PARAM_EXTERN) return false; + if (param->paramkind == PARAM_EXEC) + return false; + if (param->paramkind != PARAM_EXEC || !list_member_int(context->safe_param_ids, param->paramid)) { diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 5c32c96b71..144b2c485d 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -2418,6 +2418,8 @@ create_nestloop_path(PlannerInfo *root, NestPath *pathnode = makeNode(NestPath); Relids inner_req_outer = PATH_REQ_OUTER(inner_path); + /* TODO: Assert lateral relids subset safety? */ + /* * If the inner path is parameterized by the outer, we must drop any * restrict_clauses that are due to be moved into the inner path. 
We have diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index effb9d03a0..52cd3512b3 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -58,6 +58,7 @@ #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "miscadmin.h" +#include "optimizer/clauses.h" #include "optimizer/cost.h" #include "optimizer/geqo.h" #include "optimizer/optimizer.h" diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index 1f33fe13c1..75681d6fb9 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -2478,7 +2478,7 @@ typedef struct MinMaxAggInfo * for conflicting purposes. * * In addition, PARAM_EXEC slots are assigned for Params representing outputs - * from subplans (values that are setParam items for those subplans). These + * from subplans (values that are setParam items for those subplans). [TODO: is this true, or only for init plans?] These * IDs need not be tracked via PlannerParamItems, since we do not need any * duplicate-elimination nor later processing of the represented expressions. 
* Instead, we just record the assignment of the slot number by appending to diff --git a/src/test/regress/expected/incremental_sort.out b/src/test/regress/expected/incremental_sort.out index 545e301e48..8f9ca05e60 100644 --- a/src/test/regress/expected/incremental_sort.out +++ b/src/test/regress/expected/incremental_sort.out @@ -1614,16 +1614,16 @@ from tenk1 t, generate_series(1, 1000); QUERY PLAN --------------------------------------------------------------------------------- Unique - -> Sort - Sort Key: t.unique1, ((SubPlan 1)) - -> Gather - Workers Planned: 2 + -> Gather Merge + Workers Planned: 2 + -> Sort + Sort Key: t.unique1, ((SubPlan 1)) -> Nested Loop -> Parallel Index Only Scan using tenk1_unique1 on tenk1 t -> Function Scan on generate_series - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on tenk1 - Index Cond: (unique1 = t.unique1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on tenk1 + Index Cond: (unique1 = t.unique1) (11 rows) explain (costs off) select @@ -1633,16 +1633,16 @@ from tenk1 t, generate_series(1, 1000) order by 1, 2; QUERY PLAN --------------------------------------------------------------------------- - Sort - Sort Key: t.unique1, ((SubPlan 1)) - -> Gather - Workers Planned: 2 + Gather Merge + Workers Planned: 2 + -> Sort + Sort Key: t.unique1, ((SubPlan 1)) -> Nested Loop -> Parallel Index Only Scan using tenk1_unique1 on tenk1 t -> Function Scan on generate_series - SubPlan 1 - -> Index Only Scan using tenk1_unique1 on tenk1 - Index Cond: (unique1 = t.unique1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on tenk1 + Index Cond: (unique1 = t.unique1) (10 rows) -- Parallel sort but with expression not available until the upper rel. 
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index 2303f70d6e..124fe9fec5 100644 --- a/src/test/regress/expected/select_parallel.out +++ b/src/test/regress/expected/select_parallel.out @@ -311,6 +311,131 @@ select count(*) from tenk1 where (two, four) not in 10000 (1 row) +-- test parallel plans for queries containing correlated subplans +-- where the subplan only needs params available from the current +-- worker's scan. +explain (costs off, verbose) select + (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1) + from tenk1 t, generate_series(1, 10); + QUERY PLAN +---------------------------------------------------------------------------- + Gather + Output: ((SubPlan 1)) + Workers Planned: 4 + -> Nested Loop + Output: (SubPlan 1) + -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t + Output: t.unique1 + -> Function Scan on pg_catalog.generate_series + Output: generate_series.generate_series + Function Call: generate_series(1, 10) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) +(14 rows) + +explain (costs off, verbose) select + (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1) + from tenk1 t; + QUERY PLAN +---------------------------------------------------------------------- + Gather + Output: ((SubPlan 1)) + Workers Planned: 4 + -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t + Output: (SubPlan 1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) +(9 rows) + +explain (analyze, costs off, summary off, verbose, timing off) select + (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1) + from tenk1 t + limit 1; + QUERY PLAN +---------------------------------------------------------------------------------------------------- + Limit (actual rows=1 loops=1) + Output: ((SubPlan 1)) + 
-> Gather (actual rows=1 loops=1) + Output: ((SubPlan 1)) + Workers Planned: 4 + Workers Launched: 4 + -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t (actual rows=1 loops=5) + Output: (SubPlan 1) + Heap Fetches: 0 + Worker 0: actual rows=1 loops=1 + Worker 1: actual rows=1 loops=1 + Worker 2: actual rows=1 loops=1 + Worker 3: actual rows=1 loops=1 + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 (actual rows=1 loops=5) + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) + Heap Fetches: 0 + Worker 0: actual rows=1 loops=1 + Worker 1: actual rows=1 loops=1 + Worker 2: actual rows=1 loops=1 + Worker 3: actual rows=1 loops=1 +(22 rows) + +explain (costs off, verbose) select t.unique1 + from tenk1 t + where t.unique1 = (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1); + QUERY PLAN +---------------------------------------------------------------------- + Gather + Output: t.unique1 + Workers Planned: 4 + -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t + Output: t.unique1 + Filter: (t.unique1 = (SubPlan 1)) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) +(10 rows) + +explain (costs off, verbose) select * + from tenk1 t + order by (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1); + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Gather Merge + Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, ((SubPlan 1)) + Workers Planned: 4 + -> Sort + Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, 
((SubPlan 1)) + Sort Key: ((SubPlan 1)) + -> Parallel Seq Scan on public.tenk1 t + Output: t.unique1, t.unique2, t.two, t.four, t.ten, t.twenty, t.hundred, t.thousand, t.twothousand, t.fivethous, t.tenthous, t.odd, t.even, t.stringu1, t.stringu2, t.string4, (SubPlan 1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) +(12 rows) + +-- test subplan in join/lateral join +explain (costs off, verbose, timing off) select t.unique1, l.* + from tenk1 t + join lateral ( + select (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1 offset 0) + ) l on true; + QUERY PLAN +---------------------------------------------------------------------- + Gather + Output: t.unique1, ((SubPlan 1)) + Workers Planned: 4 + -> Parallel Index Only Scan using tenk1_unique1 on public.tenk1 t + Output: t.unique1, (SubPlan 1) + SubPlan 1 + -> Index Only Scan using tenk1_unique1 on public.tenk1 + Output: t.unique1 + Index Cond: (tenk1.unique1 = t.unique1) +(9 rows) + -- this is not parallel-safe due to use of random() within SubLink's testexpr: explain (costs off) select * from tenk1 where (unique1 + random())::integer not in diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql index 019e17e751..c49799b6d4 100644 --- a/src/test/regress/sql/select_parallel.sql +++ b/src/test/regress/sql/select_parallel.sql @@ -111,6 +111,31 @@ explain (costs off) (select hundred, thousand from tenk2 where thousand > 100); select count(*) from tenk1 where (two, four) not in (select hundred, thousand from tenk2 where thousand > 100); +-- test parallel plans for queries containing correlated subplans +-- where the subplan only needs params available from the current +-- worker's scan. 
+explain (costs off, verbose) select + (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1) + from tenk1 t, generate_series(1, 10); +explain (costs off, verbose) select + (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1) + from tenk1 t; +explain (analyze, costs off, summary off, verbose, timing off) select + (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1) + from tenk1 t + limit 1; +explain (costs off, verbose) select t.unique1 + from tenk1 t + where t.unique1 = (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1); +explain (costs off, verbose) select * + from tenk1 t + order by (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1); +-- test subplan in join/lateral join +explain (costs off, verbose, timing off) select t.unique1, l.* + from tenk1 t + join lateral ( + select (select t.unique1 from tenk1 where tenk1.unique1 = t.unique1 offset 0) + ) l on true; -- this is not parallel-safe due to use of random() within SubLink's testexpr: explain (costs off) select * from tenk1 where (unique1 + random())::integer not in -- 2.17.1