From 83b8b55c979a83440b4135e6a755331343585870 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Mon, 4 Jan 2021 13:24:24 +0530 Subject: [PATCH v20 4/4] Enable CTAS Parallel Inserts For Append This patch allows pushing down the CTAS dest receiver even when Gather nodes exist under the top Append node. It also adds code that makes the planner treat the parallel tuple cost as zero, plus assertions that catch wrong enforcement if parallel insertion later turns out not to be possible. Test cases are also included in this patch. --- src/backend/executor/execParallel.c | 152 ++-- src/backend/optimizer/path/allpaths.c | 31 + src/backend/optimizer/plan/planner.c | 12 +- src/include/executor/execParallel.h | 4 +- src/test/regress/expected/write_parallel.out | 722 +++++++++++++++++++ src/test/regress/sql/write_parallel.sql | 222 ++++++ 6 files changed, 1086 insertions(+), 57 deletions(-) diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 2846df66e6..5f298c4328 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -151,6 +151,9 @@ static void SaveParallelInsCmdFixedInfo(ParallelExecutorInfo *pei, static void SaveParallelInsCmdInfo(ParallelContext *pcxt, ParallelInsertCmdKind ins_cmd, void *ins_info); +static bool PushDownParallelInsertState(DestReceiver *dest, PlanState *ps, + ParallelInsertCmdKind ins_cmd, + bool *gather_exists); /* Helper functions that run in the parallel worker. */ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc); @@ -1748,6 +1751,84 @@ IsParallelInsertionAllowed(ParallelInsertCmdKind ins_cmd, void *ins_info) return false; } +/* + * Push the dest receiver to Gather node when it is either at the top of the + * plan or under top Append node and if it does not have any projections to do. + * Required information from the pushed dest receiver is sent to workers so + * that they can perform parallel insertions into the target table. 
+ * + * If the top node is Append, then this function recursively checks the sub + * plans for Gather nodes, when found one(and if it does not have projections), + * then sets the dest receiver information. + * + * In this function we only care about Append and Gather nodes. This function + * returns true if at least one Gather node can allow parallel insertions by + * the workers. Otherwise returns false. It also sets gather_exists to true if + * at least one Gather node exists. + */ +static bool +PushDownParallelInsertState(DestReceiver *dest, PlanState *ps, + ParallelInsertCmdKind ins_cmd, bool *gather_exists) +{ + bool parallel = false; + + if (ps == NULL) + return parallel; + + if (IsA(ps, AppendState)) + { + AppendState *aps = (AppendState *) ps; + + for (int i = 0; i < aps->as_nplans; i++) + { + parallel |= PushDownParallelInsertState(dest, aps->appendplans[i], + ins_cmd, gather_exists); + } + } + else if (IsA(ps, GatherState)) + { + GatherState *gstate = (GatherState *) ps; + + /* + * Set to true if there exists at least one Gather node either at the + * top of the plan or as a direct sub node under Append node. + */ + *gather_exists |= true; + + if (!gstate->ps.ps_ProjInfo) + { + parallel = true; + + /* Okay to parallelize inserts, so mark it. */ + if (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) + ((DR_intorel *) dest)->is_parallel = true; + + /* + * For parallelizing inserts in CTAS we must send information such + * as into clause (to build separate dest receiver), object id (to + * open the created table) to each workers. Since this information + * is available in the CTAS dest receiver, store a reference to it + * in the Gather state so that it will be used in + * ExecInitParallelPlan to pick the required information. + */ + gstate->dest = dest; + } + else + { + /* + * Gather node has projections, so parallel insertions are not + * allowed. 
+ */ + if (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) + ((DR_intorel *) dest)->is_parallel = false; + + gstate->dest = NULL; + } + } + + return parallel; +} + /* * Set the parallel insert state, if the upper node is Gather and it doesn't * have any projections. The parallel insert state includes information such as @@ -1758,67 +1839,32 @@ void SetParallelInsertState(ParallelInsertCmdKind ins_cmd, QueryDesc *queryDesc, uint8 *tuple_cost_opts) { - GatherState *gstate; - DestReceiver *dest; + bool allow = false; + bool gather_exists = false; Assert(queryDesc && (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS)); - gstate = (GatherState *) queryDesc->planstate; - dest = queryDesc->dest; - - /* - * Parallel insertions are possible only if the upper node is Gather. - */ - if (!IsA(gstate, GatherState)) - return; - - if (tuple_cost_opts && gstate->ps.ps_ProjInfo) - Assert(!(*tuple_cost_opts & PARALLEL_INSERT_TUP_COST_IGNORED)); + allow = PushDownParallelInsertState(queryDesc->dest, queryDesc->planstate, + ins_cmd, &gather_exists); /* - * Parallelize inserts only when the upper Gather node has no projections. + * When parallel insertion is not allowed, before returning ensure that + * we have not done wrong parallel tuple cost enforcement in the planner. + * Main reason for this assertion is to check if we enforced the planner to + * ignore the parallel tuple cost (with the intention of choosing parallel + * inserts) due to which the parallel plan may have been chosen, but we do + * not allow the parallel inserts now. + * + * If we have correctly ignored parallel tuple cost in the planner while + * creating Gather path, then this assertion failure should not occur. In + * case it occurs, that means the planner may have chosen this parallel + * plan because of our wrong enforcement. So let's try to catch that here. */ - if (!gstate->ps.ps_ProjInfo) + if (!allow && gather_exists) { - /* Okay to parallelize inserts, so mark it. 
*/ - if (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) - ((DR_intorel *) dest)->is_parallel = true; - - /* - * For parallelizing inserts, we must send some information so that the - * workers can build their own dest receivers. For CTAS, this info is - * into clause, object id (to open the created table). - * - * Since the required information is available in the dest receiver, - * store a reference to it in the Gather state so that it will be used - * in ExecInitParallelPlan to pick the information. - */ - gstate->dest = dest; - } - else - { - /* - * Upper Gather node has projections, so parallel insertions are not - * allowed. - */ - if (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) - ((DR_intorel *) dest)->is_parallel = false; - - gstate->dest = NULL; - /* - * Before returning, ensure that we have not done wrong parallel tuple - * cost enforcement in the planner. Main reason for this assertion is - * to check if we enforced the planner to ignore the parallel tuple - * cost (with the intention of choosing parallel inserts) due to which - * the parallel plan may have been chosen, but we do not allow the - * parallel inserts now. - * - * If we have correctly ignored parallel tuple cost in the planner - * while creating Gather path, then this assertion failure should not - * occur. In case it occurs, that means the planner may have chosen - * this parallel plan because of our wrong enforcement. So let's try to - * catch that here. + * Parallel insertion is not allowed, but gather node exists, check if + * we have done wrong tuple cost enforcement. 
*/ Assert(tuple_cost_opts && !(*tuple_cost_opts & PARALLEL_INSERT_TUP_COST_IGNORED)); diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 026a4b0848..96b5ce81c9 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -23,6 +23,7 @@ #include "catalog/pg_class.h" #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" +#include "executor/execParallel.h" #include "foreign/fdwapi.h" #include "miscadmin.h" #include "nodes/makefuncs.h" @@ -1103,6 +1104,36 @@ set_append_rel_size(PlannerInfo *root, RelOptInfo *rel, if (root->glob->parallelModeOK && rel->consider_parallel) set_rel_consider_parallel(root, childrel, childRTE); + /* + * When subplan is subquery, It's possible to do parallel insert if top + * node of subquery is Gather, so we turn on a flag to ignore parallel + * tuple cost in cost_gather if the SELECT is for CTAS. + */ + if (childrel->rtekind == RTE_SUBQUERY) + { + /* + * When there is no parent path generating clause(such as limit, + * sort, distinct...), we can turn on the flag for two cases: + * i) query_level is 1 + * ii) query_level > 1 then turn on the flag in the parent_root. + * The case ii) is to check append under append: + * Append + * ->Append + * ->Gather + * ->Other plan + */ + if (root->parse->parallelInsCmdTupleCostOpt & + PARALLEL_INSERT_SELECT_QUERY && + (root->query_level == 1 || + root->parent_root->parse->parallelInsCmdTupleCostOpt & + PARALLEL_INSERT_CAN_IGN_TUP_COST_APPEND) && + !(HAS_PARENT_PATH_GENERATING_CLAUSE(root))) + { + root->parse->parallelInsCmdTupleCostOpt |= + PARALLEL_INSERT_CAN_IGN_TUP_COST_APPEND; + } + } + /* * Compute the child's size. 
*/ diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index d1b7347de2..423619735b 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -7351,9 +7351,15 @@ can_partial_agg(PlannerInfo *root) static void ignore_parallel_tuple_cost(PlannerInfo *root) { - if (root->query_level == 1 && - (root->parse->parallelInsCmdTupleCostOpt & - PARALLEL_INSERT_SELECT_QUERY)) + if (root->query_level != 1 && + (root->parent_root->parse->parallelInsCmdTupleCostOpt & + PARALLEL_INSERT_CAN_IGN_TUP_COST_APPEND)) + { + root->parse->parallelInsCmdTupleCostOpt |= + PARALLEL_INSERT_SELECT_QUERY; + } + + if (root->parse->parallelInsCmdTupleCostOpt & PARALLEL_INSERT_SELECT_QUERY) { /* * In each of the HAS_PARENT_PATH_GENERATING_CLAUSE cases, a parent diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index f76b5c2ffd..41f116bbf5 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -65,7 +65,9 @@ typedef enum ParallelInsertCmdTupleCostOpt */ PARALLEL_INSERT_CAN_IGN_TUP_COST = 1 << 1, /* Turn on this after the cost is ignored. */ - PARALLEL_INSERT_TUP_COST_IGNORED = 1 << 2 + PARALLEL_INSERT_TUP_COST_IGNORED = 1 << 2, + /* Turn on this in case tuple cost needs to be ignored for Append cases. */ + PARALLEL_INSERT_CAN_IGN_TUP_COST_APPEND = 1 << 3 } ParallelInsertCmdTupleCostOpt; /* diff --git a/src/test/regress/expected/write_parallel.out b/src/test/regress/expected/write_parallel.out index 38a18c5a9b..356a2d0002 100644 --- a/src/test/regress/expected/write_parallel.out +++ b/src/test/regress/expected/write_parallel.out @@ -631,6 +631,728 @@ drop table parallel_write; reset enable_nestloop; reset enable_mergejoin; reset enable_hashjoin; +-- test cases for performing parallel inserts when Append node is at the top +-- and Gather node is in one of its direct sub plans. 
+-- case 1: parallel inserts must occur at each Gather node as we can push the +-- CTAS dest receiver. +-- Append +-- ->Gather +-- ->Parallel Seq Scan +-- ->Gather +-- ->Parallel Seq Scan +-- ->Gather +-- ->Parallel Seq Scan +select explain_pictas( +'create table parallel_write as + select * from temp1 where col1 = 5 union all + select * from temp2 where col2 = 5 union all + select * from temp2 where col2 = 5;'); + explain_pictas +---------------------------------------------------------------------- + Append (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Create parallel_write + -> Parallel Seq Scan on temp1 (actual rows=N loops=N) + Filter: (col1 = 5) + Rows Removed by Filter: N + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Create parallel_write + -> Parallel Seq Scan on temp2 (actual rows=N loops=N) + Filter: (col2 = 5) + Rows Removed by Filter: N + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Create parallel_write + -> Parallel Seq Scan on temp2 temp2_1 (actual rows=N loops=N) + Filter: (col2 = 5) + Rows Removed by Filter: N +(22 rows) + +select count(*) from parallel_write; + count +------- + 3 +(1 row) + +drop table parallel_write; +-- case 2: parallel inserts must occur at the top Gather node as we can push +-- the CTAS dest receiver to it. 
+-- Gather +-- ->Parallel Append +-- ->Parallel Seq Scan +-- ->Parallel Seq Scan +-- ->Parallel Seq Scan +select explain_pictas( +'create table parallel_write as + select * from temp1 union all + select * from temp2 union all + select * from temp2;'); + explain_pictas +------------------------------------------------------------------------ + Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Create parallel_write + -> Parallel Append (actual rows=N loops=N) + -> Parallel Seq Scan on temp1 (actual rows=N loops=N) + -> Parallel Seq Scan on temp2 (actual rows=N loops=N) + -> Parallel Seq Scan on temp2 temp2_1 (actual rows=N loops=N) +(8 rows) + +select count(*) from parallel_write; + count +------- + 15 +(1 row) + +drop table parallel_write; +select explain_pictas( +'create table parallel_write as + select (select col2 from temp2 limit 1) col2 from temp1 union all + select (select col2 from temp2 limit 1) col2 from temp1 union all + select * from temp2;'); + explain_pictas +-------------------------------------------------------------------------------- + Gather (actual rows=N loops=N) + Workers Planned: 3 + Params Evaluated: $1, $3 + Workers Launched: N + -> Create parallel_write + InitPlan 1 (returns $1) + -> Limit (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Parallel Seq Scan on temp2 temp2_1 (actual rows=N loops=N) + InitPlan 2 (returns $3) + -> Limit (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Parallel Seq Scan on temp2 temp2_2 (actual rows=N loops=N) + -> Parallel Append (actual rows=N loops=N) + -> Parallel Seq Scan on temp1 (actual rows=N loops=N) + -> Parallel Seq Scan on temp1 temp1_1 (actual rows=N loops=N) + -> Parallel Seq Scan on temp2 (actual rows=N loops=N) +(21 rows) + +select count(*) from parallel_write; + count +------- + 15 +(1 row) + +drop table parallel_write; +-- case 3: parallel inserts 
must occur at each Gather node as we can push the +-- CTAS dest receiver. Non-Gather nodes will do inserts by sending tuples to +-- Append and from there to CTAS dest receiver. +-- Append +-- ->Gather +-- ->Parallel Seq Scan +-- ->Seq Scan / Join / any other non-Gather node +-- ->Gather +-- ->Parallel Seq Scan +select explain_pictas( +'create table parallel_write as + select * from temp1 where col1 = 5 union all + select (select temp1.col1 from temp2 limit 1) col2 from temp1 union all + select * from temp1 where col1 = 5;'); + explain_pictas +------------------------------------------------------------------------------ + Append (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Create parallel_write + -> Parallel Seq Scan on temp1 temp1_1 (actual rows=N loops=N) + Filter: (col1 = 5) + Rows Removed by Filter: N + -> Seq Scan on temp1 (actual rows=N loops=N) + SubPlan 1 + -> Limit (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Parallel Seq Scan on temp2 (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Create parallel_write + -> Parallel Seq Scan on temp1 temp1_2 (actual rows=N loops=N) + Filter: (col1 = 5) + Rows Removed by Filter: N +(22 rows) + +select count(*) from parallel_write; + count +------- + 7 +(1 row) + +drop table parallel_write; +alter table temp2 set (parallel_workers = 0); +select explain_pictas( +'create table parallel_write as select * from temp1 where col1 = (select 1) union all + select * from temp1 where col1 = (select 1) union all + select * from temp2 where col2 = (select 2);'); + explain_pictas +------------------------------------------------------------------------ + Append (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Params Evaluated: $0 + Workers Launched: N + -> Create parallel_write + InitPlan 1 (returns $0) + -> Result 
(actual rows=N loops=N) + -> Parallel Seq Scan on temp1 (actual rows=N loops=N) + Filter: (col1 = $0) + Rows Removed by Filter: N + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Params Evaluated: $1 + Workers Launched: N + -> Create parallel_write + InitPlan 2 (returns $1) + -> Result (actual rows=N loops=N) + -> Parallel Seq Scan on temp1 temp1_1 (actual rows=N loops=N) + Filter: (col1 = $1) + Rows Removed by Filter: N + -> Seq Scan on temp2 (actual rows=N loops=N) + Filter: (col2 = $2) + Rows Removed by Filter: N + InitPlan 3 (returns $2) + -> Result (actual rows=N loops=N) +(26 rows) + +select count(*) from parallel_write; + count +------- + 3 +(1 row) + +alter table temp2 reset (parallel_workers); +drop table parallel_write; +-- case 4: parallel inserts must not occur as there will be no direct Gather +-- node under Append node. Non-Gather nodes will do inserts by sending tuples +-- to Append and from there to CTAS dest receiver. +-- Append +-- ->Seq Scan / Join / any other non-Gather node +-- ->Seq Scan / Join / any other non-Gather node +-- ->Seq Scan / Join / any other non-Gather node +select explain_pictas( +'create table parallel_write as + select * from temp1 union all + select * from temp2 union all + select (select temp1.col1 from temp2 limit 1) col2 from temp1;'); + explain_pictas +-------------------------------------------------------------------------------------- + Append (actual rows=N loops=N) + -> Seq Scan on temp1 (actual rows=N loops=N) + -> Seq Scan on temp2 (actual rows=N loops=N) + -> Seq Scan on temp1 temp1_1 (actual rows=N loops=N) + SubPlan 1 + -> Limit (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Parallel Seq Scan on temp2 temp2_1 (actual rows=N loops=N) +(10 rows) + +select count(*) from parallel_write; + count +------- + 15 +(1 row) + +drop table parallel_write; +-- case 5: parallel inserts must occur at the top Gather node as we can push +-- the CTAS dest 
receiver to it. +-- Gather +-- ->Parallel Append +-- ->Seq Scan / Join / any other non-Gather node +-- ->Parallel Seq Scan +-- ->Parallel Seq Scan +alter table temp2 set (parallel_workers = 0); +select explain_pictas( +'create table parallel_write as + select * from temp1 union all + select * from temp2 union all + select * from temp1;'); + explain_pictas +------------------------------------------------------------------------ + Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Create parallel_write + -> Parallel Append (actual rows=N loops=N) + -> Seq Scan on temp2 (actual rows=N loops=N) + -> Parallel Seq Scan on temp1 (actual rows=N loops=N) + -> Parallel Seq Scan on temp1 temp1_1 (actual rows=N loops=N) +(8 rows) + +select count(*) from parallel_write; + count +------- + 15 +(1 row) + +drop table parallel_write; +alter table temp2 reset (parallel_workers); +-- case 6: parallel inserts must occur at each Gather node as we can push the +-- CTAS dest receiver. 
+-- Append +-- ->Append +-- ->Gather +-- ->Parallel Seq Scan +-- ->Append +-- ->Gather +-- ->Parallel Seq Scan +-- ->Gather +-- ->Gather +select explain_pictas( +'create table parallel_write as + select * from (select * from temp1 where col1 = (select 1) union all + select * from temp2 where col2 = (select 2)) as tt + where col1 = (select 1) union all + select * from temp2 where col2 = (select 2) union all + select * from temp2 where col2 = (select 2);'); + explain_pictas +---------------------------------------------------------------------------- + Append (actual rows=N loops=N) + -> Append (actual rows=N loops=N) + InitPlan 1 (returns $0) + -> Result (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Params Evaluated: $0, $1 + Workers Launched: N + -> Create parallel_write + InitPlan 2 (returns $1) + -> Result (actual rows=N loops=N) + -> Result (actual rows=N loops=N) + One-Time Filter: ($1 = $0) + -> Parallel Seq Scan on temp1 (actual rows=N loops=N) + Filter: (col1 = $0) + Rows Removed by Filter: N + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Params Evaluated: $0, $2 + Workers Launched: N + -> Create parallel_write + InitPlan 3 (returns $2) + -> Result (actual rows=N loops=N) + -> Result (actual rows=N loops=N) + One-Time Filter: ($2 = $0) + -> Parallel Seq Scan on temp2 (never executed) + Filter: (col2 = $0) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Params Evaluated: $3 + Workers Launched: N + -> Create parallel_write + InitPlan 4 (returns $3) + -> Result (actual rows=N loops=N) + -> Parallel Seq Scan on temp2 temp2_1 (actual rows=N loops=N) + Filter: (col2 = $3) + Rows Removed by Filter: N + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Params Evaluated: $4 + Workers Launched: N + -> Create parallel_write + InitPlan 5 (returns $4) + -> Result (actual rows=N loops=N) + -> Parallel Seq Scan on temp2 temp2_2 (actual rows=N loops=N) + Filter: (col2 = $4) + Rows Removed by Filter: N +(47 
rows) + +select count(*) from parallel_write; + count +------- + 3 +(1 row) + +drop table parallel_write; +select explain_pictas( +'create table parallel_write as + select * from (select * from temp1 where col1 = (select 1) union all + select * from temp2 where col2 = (select 2)) as tt + where col1 = (select 1) union all + select * from (select * from temp1 where col1 = (select 1) union all + select * from temp2 where col2 = (select 2)) as tt + where col1 = (select 1) union all + select * from temp1 where col1 = 5 union all + select * from temp2 where col2 = 5;'); + explain_pictas +------------------------------------------------------------------------------------ + Append (actual rows=N loops=N) + -> Append (actual rows=N loops=N) + InitPlan 1 (returns $0) + -> Result (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Params Evaluated: $0, $1 + Workers Launched: N + -> Create parallel_write + InitPlan 2 (returns $1) + -> Result (actual rows=N loops=N) + -> Result (actual rows=N loops=N) + One-Time Filter: ($1 = $0) + -> Parallel Seq Scan on temp1 (actual rows=N loops=N) + Filter: (col1 = $0) + Rows Removed by Filter: N + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Params Evaluated: $0, $2 + Workers Launched: N + -> Create parallel_write + InitPlan 3 (returns $2) + -> Result (actual rows=N loops=N) + -> Result (actual rows=N loops=N) + One-Time Filter: ($2 = $0) + -> Parallel Seq Scan on temp2 (never executed) + Filter: (col2 = $0) + -> Append (actual rows=N loops=N) + InitPlan 4 (returns $3) + -> Result (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Params Evaluated: $3, $4 + Workers Launched: N + -> Create parallel_write + InitPlan 5 (returns $4) + -> Result (actual rows=N loops=N) + -> Result (actual rows=N loops=N) + One-Time Filter: ($4 = $3) + -> Parallel Seq Scan on temp1 temp1_1 (actual rows=N loops=N) + Filter: (col1 = $3) + Rows Removed by Filter: N + -> Gather (actual 
rows=N loops=N) + Workers Planned: 3 + Params Evaluated: $3, $5 + Workers Launched: N + -> Create parallel_write + InitPlan 6 (returns $5) + -> Result (actual rows=N loops=N) + -> Result (actual rows=N loops=N) + One-Time Filter: ($5 = $3) + -> Parallel Seq Scan on temp2 temp2_1 (never executed) + Filter: (col2 = $3) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Create parallel_write + -> Parallel Seq Scan on temp1 temp1_2 (actual rows=N loops=N) + Filter: (col1 = 5) + Rows Removed by Filter: N + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Create parallel_write + -> Parallel Seq Scan on temp2 temp2_2 (actual rows=N loops=N) + Filter: (col2 = 5) + Rows Removed by Filter: N +(67 rows) + +select count(*) from parallel_write; + count +------- + 4 +(1 row) + +drop table parallel_write; +-- case 7: parallel inserts must occur at each Gather node as we can push the +-- CTAS dest receiver. Non-Gather nodes will do inserts by sending tuples +-- to Append and from there to CTAS dest receiver. 
+-- Append +-- ->Append +-- ->Gather +-- ->Parallel Seq Scan +-- ->Append +-- ->Gather +-- ->Parallel Seq Scan +-- ->Seq Scan / Join / any other non-Gather node +-- ->Gather +alter table temp2 set (parallel_workers = 0); +select explain_pictas( +'create table parallel_write as + select * from (select * from temp1 where col1 = (select 1) union all + select * from temp2 where col2 = (select 2)) as tt + where col1 = (select 1) union all + select * from temp2 where col2 = (select 2) union all + select * from temp1 where col1 = (select 2);'); + explain_pictas +---------------------------------------------------------------------------- + Append (actual rows=N loops=N) + -> Append (actual rows=N loops=N) + InitPlan 1 (returns $0) + -> Result (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Params Evaluated: $0, $1 + Workers Launched: N + -> Create parallel_write + InitPlan 2 (returns $1) + -> Result (actual rows=N loops=N) + -> Result (actual rows=N loops=N) + One-Time Filter: ($1 = $0) + -> Parallel Seq Scan on temp1 (actual rows=N loops=N) + Filter: (col1 = $0) + Rows Removed by Filter: N + -> Result (actual rows=N loops=N) + One-Time Filter: ($2 = $0) + InitPlan 3 (returns $2) + -> Result (actual rows=N loops=N) + -> Seq Scan on temp2 (never executed) + Filter: (col2 = $0) + -> Seq Scan on temp2 temp2_1 (actual rows=N loops=N) + Filter: (col2 = $3) + Rows Removed by Filter: N + InitPlan 4 (returns $3) + -> Result (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Params Evaluated: $4 + Workers Launched: N + -> Create parallel_write + InitPlan 5 (returns $4) + -> Result (actual rows=N loops=N) + -> Parallel Seq Scan on temp1 temp1_1 (actual rows=N loops=N) + Filter: (col1 = $4) + Rows Removed by Filter: N +(37 rows) + +select count(*) from parallel_write; + count +------- + 3 +(1 row) + +drop table parallel_write; +select explain_pictas( +'create table parallel_write as + select * from (select * from 
temp1 where col1 = (select 1) union all + select * from temp2 where col2 = (select 2)) as tt + where col1 = (select 1) union all + select * from (select * from temp1 where col1 = (select 1) union all + select * from temp2 where col2 = (select 2)) as tt + where col1 = (select 1) union all + select * from temp1 where col1 = 5 union all + select * from temp2 where col2 = 5;'); + explain_pictas +------------------------------------------------------------------------------------ + Append (actual rows=N loops=N) + -> Append (actual rows=N loops=N) + InitPlan 1 (returns $0) + -> Result (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Params Evaluated: $0, $1 + Workers Launched: N + -> Create parallel_write + InitPlan 2 (returns $1) + -> Result (actual rows=N loops=N) + -> Result (actual rows=N loops=N) + One-Time Filter: ($1 = $0) + -> Parallel Seq Scan on temp1 (actual rows=N loops=N) + Filter: (col1 = $0) + Rows Removed by Filter: N + -> Result (actual rows=N loops=N) + One-Time Filter: ($2 = $0) + InitPlan 3 (returns $2) + -> Result (actual rows=N loops=N) + -> Seq Scan on temp2 (never executed) + Filter: (col2 = $0) + -> Append (actual rows=N loops=N) + InitPlan 4 (returns $3) + -> Result (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Params Evaluated: $3, $4 + Workers Launched: N + -> Create parallel_write + InitPlan 5 (returns $4) + -> Result (actual rows=N loops=N) + -> Result (actual rows=N loops=N) + One-Time Filter: ($4 = $3) + -> Parallel Seq Scan on temp1 temp1_1 (actual rows=N loops=N) + Filter: (col1 = $3) + Rows Removed by Filter: N + -> Result (actual rows=N loops=N) + One-Time Filter: ($5 = $3) + InitPlan 6 (returns $5) + -> Result (actual rows=N loops=N) + -> Seq Scan on temp2 temp2_1 (never executed) + Filter: (col2 = $3) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Create parallel_write + -> Parallel Seq Scan on temp1 temp1_2 (actual rows=N 
loops=N) + Filter: (col1 = 5) + Rows Removed by Filter: N + -> Seq Scan on temp2 temp2_2 (actual rows=N loops=N) + Filter: (col2 = 5) + Rows Removed by Filter: N +(53 rows) + +select count(*) from parallel_write; + count +------- + 4 +(1 row) + +drop table parallel_write; +alter table temp2 reset (parallel_workers); +-- case 8: parallel inserts must not occur because there is no Gather or Append +-- node at the top for union, except/except all, intersect/intersect all +-- cases. +select explain_pictas( +'create table parallel_write as + select * from temp1 union + select * from temp2;'); + explain_pictas +---------------------------------------------------------------------- + HashAggregate (actual rows=N loops=N) + Group Key: temp1.col1 + Batches: 1 Memory Usage: 217kB + -> Append (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Parallel Seq Scan on temp1 (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Parallel Seq Scan on temp2 (actual rows=N loops=N) +(12 rows) + +select count(*) from parallel_write; + count +------- + 5 +(1 row) + +drop table parallel_write; +select explain_pictas( +'create table parallel_write as + select * from temp1 except + select * from temp2 where col2 < 3;'); + explain_pictas +---------------------------------------------------------------------------- + HashSetOp Except (actual rows=N loops=N) + -> Append (actual rows=N loops=N) + -> Subquery Scan on "*SELECT* 1" (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Parallel Seq Scan on temp1 (actual rows=N loops=N) + -> Subquery Scan on "*SELECT* 2" (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Parallel Seq Scan on temp2 (actual rows=N loops=N) + Filter: (col2 < 3) + Rows Removed by Filter: N +(14 rows) + +select count(*) from parallel_write; + count 
+------- + 3 +(1 row) + +drop table parallel_write; +select explain_pictas( +'create table parallel_write as + select * from temp1 except all + select * from temp2 where col2 < 3;'); + explain_pictas +---------------------------------------------------------------------------- + HashSetOp Except All (actual rows=N loops=N) + -> Append (actual rows=N loops=N) + -> Subquery Scan on "*SELECT* 1" (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Parallel Seq Scan on temp1 (actual rows=N loops=N) + -> Subquery Scan on "*SELECT* 2" (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Parallel Seq Scan on temp2 (actual rows=N loops=N) + Filter: (col2 < 3) + Rows Removed by Filter: N +(14 rows) + +select count(*) from parallel_write; + count +------- + 3 +(1 row) + +drop table parallel_write; +select explain_pictas( +'create table parallel_write as + select * from temp1 intersect + select * from temp2;'); + explain_pictas +---------------------------------------------------------------------------- + HashSetOp Intersect (actual rows=N loops=N) + -> Append (actual rows=N loops=N) + -> Subquery Scan on "*SELECT* 1" (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Parallel Seq Scan on temp1 (actual rows=N loops=N) + -> Subquery Scan on "*SELECT* 2" (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Parallel Seq Scan on temp2 (actual rows=N loops=N) +(12 rows) + +select count(*) from parallel_write; + count +------- + 5 +(1 row) + +drop table parallel_write; +select explain_pictas( +'create table parallel_write as + select * from temp1 intersect all + select * from temp2;'); + explain_pictas +---------------------------------------------------------------------------- + HashSetOp Intersect All (actual rows=N loops=N) + -> Append (actual rows=N loops=N) 
+ -> Subquery Scan on "*SELECT* 1" (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Parallel Seq Scan on temp1 (actual rows=N loops=N) + -> Subquery Scan on "*SELECT* 2" (actual rows=N loops=N) + -> Gather (actual rows=N loops=N) + Workers Planned: 3 + Workers Launched: N + -> Parallel Seq Scan on temp2 (actual rows=N loops=N) +(12 rows) + +select count(*) from parallel_write; + count +------- + 5 +(1 row) + +drop table parallel_write; drop table temp1; drop table temp2; drop table temp3; diff --git a/src/test/regress/sql/write_parallel.sql b/src/test/regress/sql/write_parallel.sql index 40aadafc2a..32e6ad8636 100644 --- a/src/test/regress/sql/write_parallel.sql +++ b/src/test/regress/sql/write_parallel.sql @@ -246,6 +246,228 @@ reset enable_nestloop; reset enable_mergejoin; reset enable_hashjoin; +-- test cases for performing parallel inserts when Append node is at the top +-- and Gather node is in one of its direct sub plans. + +-- case 1: parallel inserts must occur at each Gather node as we can push the +-- CTAS dest receiver. +-- Append +-- ->Gather +-- ->Parallel Seq Scan +-- ->Gather +-- ->Parallel Seq Scan +-- ->Gather +-- ->Parallel Seq Scan + +select explain_pictas( +'create table parallel_write as + select * from temp1 where col1 = 5 union all + select * from temp2 where col2 = 5 union all + select * from temp2 where col2 = 5;'); +select count(*) from parallel_write; +drop table parallel_write; + +-- case 2: parallel inserts must occur at the top Gather node as we can push +-- the CTAS dest receiver to it. 
+-- Gather +-- ->Parallel Append +-- ->Parallel Seq Scan +-- ->Parallel Seq Scan +-- ->Parallel Seq Scan + +select explain_pictas( +'create table parallel_write as + select * from temp1 union all + select * from temp2 union all + select * from temp2;'); +select count(*) from parallel_write; +drop table parallel_write; + +select explain_pictas( +'create table parallel_write as + select (select col2 from temp2 limit 1) col2 from temp1 union all + select (select col2 from temp2 limit 1) col2 from temp1 union all + select * from temp2;'); +select count(*) from parallel_write; +drop table parallel_write; + +-- case 3: parallel inserts must occur at each Gather node as we can push the +-- CTAS dest receiver. Non-Gather nodes will do inserts by sending tuples to +-- Append and from there to CTAS dest receiver. +-- Append +-- ->Gather +-- ->Parallel Seq Scan +-- ->Seq Scan / Join / any other non-Gather node +-- ->Gather +-- ->Parallel Seq Scan + +select explain_pictas( +'create table parallel_write as + select * from temp1 where col1 = 5 union all + select (select temp1.col1 from temp2 limit 1) col2 from temp1 union all + select * from temp1 where col1 = 5;'); +select count(*) from parallel_write; +drop table parallel_write; + +alter table temp2 set (parallel_workers = 0); +select explain_pictas( +'create table parallel_write as select * from temp1 where col1 = (select 1) union all + select * from temp1 where col1 = (select 1) union all + select * from temp2 where col2 = (select 2);'); +select count(*) from parallel_write; +alter table temp2 reset (parallel_workers); +drop table parallel_write; + +-- case 4: parallel inserts must not occur as there will be no direct Gather +-- node under Append node. Non-Gather nodes will do inserts by sending tuples +-- to Append and from there to CTAS dest receiver. 
+-- Append +-- ->Seq Scan / Join / any other non-Gather node +-- ->Seq Scan / Join / any other non-Gather node +-- ->Seq Scan / Join / any other non-Gather node + +select explain_pictas( +'create table parallel_write as + select * from temp1 union all + select * from temp2 union all + select (select temp1.col1 from temp2 limit 1) col2 from temp1;'); +select count(*) from parallel_write; +drop table parallel_write; + +-- case 5: parallel inserts must occur at the top Gather node as we can push +-- the CTAS dest receiver to it. +-- Gather +-- ->Parallel Append +-- ->Seq Scan / Join / any other non-Gather node +-- ->Parallel Seq Scan +-- ->Parallel Seq Scan + +alter table temp2 set (parallel_workers = 0); + +select explain_pictas( +'create table parallel_write as + select * from temp1 union all + select * from temp2 union all + select * from temp1;'); +select count(*) from parallel_write; +drop table parallel_write; + +alter table temp2 reset (parallel_workers); + +-- case 6: parallel inserts must occur at each Gather node as we can push the +-- CTAS dest receiver. 
+-- Append +-- ->Append +-- ->Gather +-- ->Parallel Seq Scan +-- ->Append +-- ->Gather +-- ->Parallel Seq Scan +-- ->Gather +-- ->Gather + +select explain_pictas( +'create table parallel_write as + select * from (select * from temp1 where col1 = (select 1) union all + select * from temp2 where col2 = (select 2)) as tt + where col1 = (select 1) union all + select * from temp2 where col2 = (select 2) union all + select * from temp2 where col2 = (select 2);'); +select count(*) from parallel_write; +drop table parallel_write; + +select explain_pictas( +'create table parallel_write as + select * from (select * from temp1 where col1 = (select 1) union all + select * from temp2 where col2 = (select 2)) as tt + where col1 = (select 1) union all + select * from (select * from temp1 where col1 = (select 1) union all + select * from temp2 where col2 = (select 2)) as tt + where col1 = (select 1) union all + select * from temp1 where col1 = 5 union all + select * from temp2 where col2 = 5;'); +select count(*) from parallel_write; +drop table parallel_write; + +-- case 7: parallel inserts must occur at each Gather node as we can push the +-- CTAS dest receiver. Non-Gather nodes will do inserts by sending tuples +-- to Append and from there to CTAS dest receiver. 
+-- Append +-- ->Append +-- ->Gather +-- ->Parallel Seq Scan +-- ->Append +-- ->Gather +-- ->Parallel Seq Scan +-- ->Seq Scan / Join / any other non-Gather node +-- ->Gather + +alter table temp2 set (parallel_workers = 0); + +select explain_pictas( +'create table parallel_write as + select * from (select * from temp1 where col1 = (select 1) union all + select * from temp2 where col2 = (select 2)) as tt + where col1 = (select 1) union all + select * from temp2 where col2 = (select 2) union all + select * from temp1 where col1 = (select 2);'); +select count(*) from parallel_write; +drop table parallel_write; + +select explain_pictas( +'create table parallel_write as + select * from (select * from temp1 where col1 = (select 1) union all + select * from temp2 where col2 = (select 2)) as tt + where col1 = (select 1) union all + select * from (select * from temp1 where col1 = (select 1) union all + select * from temp2 where col2 = (select 2)) as tt + where col1 = (select 1) union all + select * from temp1 where col1 = 5 union all + select * from temp2 where col2 = 5;'); +select count(*) from parallel_write; +drop table parallel_write; + +alter table temp2 reset (parallel_workers); + +-- case 8: parallel inserts must not occur because there is no Gather or Append +-- node at the top for union, except/except all, intersect/intersect all +-- cases. 
+ +select explain_pictas( +'create table parallel_write as + select * from temp1 union + select * from temp2;'); +select count(*) from parallel_write; +drop table parallel_write; + +select explain_pictas( +'create table parallel_write as + select * from temp1 except + select * from temp2 where col2 < 3;'); +select count(*) from parallel_write; +drop table parallel_write; + +select explain_pictas( +'create table parallel_write as + select * from temp1 except all + select * from temp2 where col2 < 3;'); +select count(*) from parallel_write; +drop table parallel_write; + +select explain_pictas( +'create table parallel_write as + select * from temp1 intersect + select * from temp2;'); +select count(*) from parallel_write; +drop table parallel_write; + +select explain_pictas( +'create table parallel_write as + select * from temp1 intersect all + select * from temp2;'); +select count(*) from parallel_write; +drop table parallel_write; + drop table temp1; drop table temp2; drop table temp3; -- 2.25.1