From 7706545576f486a92adc9568ae3042c396a52b68 Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Thu, 7 Jan 2021 11:24:17 +0530 Subject: [PATCH v21 2/4] Tuple Cost Adjustment for Parallel Inserts in CTAS Let the planner know that the SELECT is from CTAS in createas.c so that it can ignore parallel tuple cost in case the workers can insert the tuples in parallel. This is okay because the Gather node will not actually receive any tuples. With the planner ignoring the parallel tuple cost, there are chances that the planner may choose the parallel plan which otherwise would have been costly compared with the non-parallel plans and ignored. --- src/backend/commands/createas.c | 13 +++++- src/backend/commands/explain.c | 22 +++++++-- src/backend/commands/prepare.c | 3 +- src/backend/executor/execParallel.c | 66 ++++++++++++++++++++------- src/backend/optimizer/path/costsize.c | 20 +++++++- src/backend/optimizer/plan/planner.c | 40 ++++++++++++++++ src/include/commands/explain.h | 3 +- src/include/executor/execParallel.h | 22 ++++++++- src/include/nodes/parsenodes.h | 2 + src/include/optimizer/planner.h | 10 ++++ 10 files changed, 177 insertions(+), 24 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index a8050a2767..53ca3010c6 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -310,6 +310,16 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, query = linitial_node(Query, rewritten); Assert(query->commandType == CMD_SELECT); + /* + * Turn on a flag to indicate planner so that it can ignore parallel + * tuple cost while generating Gather path. + */ + if (IsParallelInsertionAllowed(PARALLEL_INSERT_CMD_CREATE_TABLE_AS, + ¶llel_ins_info)) + query->parallelInsCmdTupleCostOpt |= PARALLEL_INSERT_SELECT_QUERY; + else + query->parallelInsCmdTupleCostOpt = 0; + /* plan the query */ plan = pg_plan_query(query, pstate->p_sourcetext, CURSOR_OPT_PARALLEL_OK, params); @@ -342,7 +352,8 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, * the executor to decide whether to allow parallel inserts or not. */ SetParallelInsertState(PARALLEL_INSERT_CMD_CREATE_TABLE_AS, - queryDesc); + queryDesc, + &query->parallelInsCmdTupleCostOpt); } /* run the plan to completion */ diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index e985ea6db3..d7da07d4f6 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -383,11 +383,25 @@ ExplainOneQuery(Query *query, int cursorOptions, planduration; BufferUsage bufusage_start, bufusage; + ParallelInsertCTASInfo parallel_ins_info; + + parallel_ins_info.intoclause = into; + parallel_ins_info.objectid = InvalidOid; if (es->buffers) bufusage_start = pgBufferUsage; INSTR_TIME_SET_CURRENT(planstart); + /* + * Turn on a flag to indicate planner so that it can ignore parallel + * tuple cost while generating Gather path. + */ + if (IsParallelInsertionAllowed(PARALLEL_INSERT_CMD_CREATE_TABLE_AS, + ¶llel_ins_info)) + query->parallelInsCmdTupleCostOpt |= PARALLEL_INSERT_SELECT_QUERY; + else + query->parallelInsCmdTupleCostOpt = 0; + /* plan the query */ plan = pg_plan_query(query, queryString, cursorOptions, params); @@ -403,7 +417,8 @@ ExplainOneQuery(Query *query, int cursorOptions, /* run it (if needed) and produce output */ ExplainOnePlan(plan, into, es, queryString, params, queryEnv, - &planduration, (es->buffers ? &bufusage : NULL)); + &planduration, (es->buffers ? &bufusage : NULL), + &query->parallelInsCmdTupleCostOpt); } } @@ -513,7 +528,8 @@ void ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, const instr_time *planduration, - const BufferUsage *bufusage) + const BufferUsage *bufusage, + uint8 *parallel_ins_tuple_cost_opts) { DestReceiver *dest; QueryDesc *queryDesc; @@ -590,7 +606,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, * the executor to decide whether to allow parallel inserts or not. */ SetParallelInsertState(PARALLEL_INSERT_CMD_CREATE_TABLE_AS, - queryDesc); + queryDesc, parallel_ins_tuple_cost_opts); } } diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index 653ef8e41a..696d3343d4 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -672,7 +672,8 @@ ExplainExecuteQuery(ExecuteStmt *execstmt, IntoClause *into, ExplainState *es, if (pstmt->commandType != CMD_UTILITY) ExplainOnePlan(pstmt, into, es, query_string, paramLI, queryEnv, - &planduration, (es->buffers ? &bufusage : NULL)); + &planduration, (es->buffers ? &bufusage : NULL), + NULL); else ExplainOneUtility(pstmt->utilityStmt, into, es, query_string, paramLI, queryEnv); diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index ba4508c409..a26c9cdac8 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -1755,7 +1755,8 @@ IsParallelInsertionAllowed(ParallelInsertCmdKind ins_cmd, void *ins_info) * node so that the required information will be picked and sent to workers. */ void -SetParallelInsertState(ParallelInsertCmdKind ins_cmd, QueryDesc *queryDesc) +SetParallelInsertState(ParallelInsertCmdKind ins_cmd, QueryDesc *queryDesc, + uint8 *tuple_cost_opts) { GatherState *gstate; DestReceiver *dest; @@ -1766,24 +1767,57 @@ SetParallelInsertState(ParallelInsertCmdKind ins_cmd, QueryDesc *queryDesc) dest = queryDesc->dest; /* - * Parallel insertions are not possible either if the upper node is not - * Gather or it's a Gather but it have some projections to perform. + * Parallel insertions are possible only if the upper node is Gather. */ - if (!IsA(gstate, GatherState) || gstate->ps.ps_ProjInfo) + if (!IsA(gstate, GatherState)) return; - /* Okay to parallelize inserts, so mark it. */ - if (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) - ((DR_intorel *) dest)->is_parallel = true; - /* - * For parallelizing inserts, we must send some information so that the - * workers can build their own dest receivers. For CTAS, this info is into - * clause, object id (to open the created table). - * - * Since the required information is available in the dest receiver, store - * a reference to it in the Gather state so that it will be used in - * ExecInitParallelPlan to pick the information. + * Parallelize inserts only when the upper Gather node has no projections. */ - gstate->dest = dest; + if (!gstate->ps.ps_ProjInfo) + { + /* Okay to parallelize inserts, so mark it. */ + if (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) + ((DR_intorel *) dest)->is_parallel = true; + + /* + * For parallelizing inserts, we must send some information so that the + * workers can build their own dest receivers. For CTAS, this info is + * into clause, object id (to open the created table). + * + * Since the required information is available in the dest receiver, + * store a reference to it in the Gather state so that it will be used + * in ExecInitParallelPlan to pick the information. + */ + gstate->dest = dest; + } + else + { + /* + * Upper Gather node has projections, so parallel insertions are not + * allowed. + */ + if (ins_cmd == PARALLEL_INSERT_CMD_CREATE_TABLE_AS) + ((DR_intorel *) dest)->is_parallel = false; + + gstate->dest = NULL; + + /* + * Before returning, ensure that we have not done wrong parallel tuple + * cost enforcement in the planner. Main reason for this assertion is + * to check if we enforced the planner to ignore the parallel tuple + * cost (with the intention of choosing parallel inserts) due to which + * the parallel plan may have been chosen, but we do not allow the + * parallel inserts now. + * + * If we have correctly ignored parallel tuple cost in the planner + * while creating Gather path, then this assertion failure should not + * occur. In case it occurs, that means the planner may have chosen + * this parallel plan because of our wrong enforcement. So let's try to + * catch that here. + */ + Assert(tuple_cost_opts && !(*tuple_cost_opts & + PARALLEL_INSERT_TUP_COST_IGNORED)); + } } diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 380336518f..d79842dbf3 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -76,6 +76,7 @@ #include "access/amapi.h" #include "access/htup_details.h" #include "access/tsmapi.h" +#include "executor/execParallel.h" #include "executor/executor.h" #include "executor/nodeAgg.h" #include "executor/nodeHash.h" @@ -393,7 +394,24 @@ cost_gather(GatherPath *path, PlannerInfo *root, /* Parallel setup and communication cost. */ startup_cost += parallel_setup_cost; - run_cost += parallel_tuple_cost * path->path.rows; + + /* + * Do not consider tuple cost in case of we intend to perform parallel + * inserts by workers. We would have turned on the ignore flag in + * apply_scanjoin_target_to_paths before generating Gather path for the + * upper level SELECT part of the query. + */ + if ((root->parse->parallelInsCmdTupleCostOpt & + PARALLEL_INSERT_SELECT_QUERY) && + (root->parse->parallelInsCmdTupleCostOpt & + PARALLEL_INSERT_CAN_IGN_TUP_COST)) + { + /* We are ignoring the parallel tuple cost, so mark it. */ + root->parse->parallelInsCmdTupleCostOpt |= + PARALLEL_INSERT_TUP_COST_IGNORED; + } + else + run_cost += parallel_tuple_cost * path->path.rows; path->path.startup_cost = startup_cost; path->path.total_cost = (startup_cost + run_cost); diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 4e6497ff32..d1b7347de2 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -28,6 +28,7 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" +#include "executor/execParallel.h" #include "executor/executor.h" #include "executor/nodeAgg.h" #include "foreign/fdwapi.h" @@ -7338,6 +7339,36 @@ can_partial_agg(PlannerInfo *root) return true; } +/* + * ignore_parallel_tuple_cost + * + * Gather node will not receive any tuples from the workers in case each worker + * inserts them in parallel. So, we turn on a flag to ignore parallel tuple + * cost by the Gather path in cost_gather if the SELECT is for commands in + * which parallel insertion is possible and we are generating an upper level + * Gather path. + */ +static void +ignore_parallel_tuple_cost(PlannerInfo *root) +{ + if (root->query_level == 1 && + (root->parse->parallelInsCmdTupleCostOpt & + PARALLEL_INSERT_SELECT_QUERY)) + { + /* + * In each of the HAS_PARENT_PATH_GENERATING_CLAUSE cases, a parent + * path will be generated for the upper Gather path(in + * grouping_planner), in which case we can not let parallel inserts + * happen. So we do not turn on ignore tuple cost flag. + */ + if (HAS_PARENT_PATH_GENERATING_CLAUSE(root)) + return; + + root->parse->parallelInsCmdTupleCostOpt |= + PARALLEL_INSERT_CAN_IGN_TUP_COST; + } +} + /* * apply_scanjoin_target_to_paths * @@ -7557,7 +7588,16 @@ apply_scanjoin_target_to_paths(PlannerInfo *root, * one of the generated paths may turn out to be the cheapest one. */ if (rel->consider_parallel && !IS_OTHER_REL(rel)) + { + /* + * Turn on a flag to ignore parallel tuple cost by the Gather path in + * cost_gather if the SELECT is for commands in which parallel + * insertion is possible and we are generating an upper level Gather + * path. + */ + ignore_parallel_tuple_cost(root); generate_useful_gather_paths(root, rel, false); + } /* * Reassess which paths are the cheapest, now that we've potentially added diff --git a/src/include/commands/explain.h b/src/include/commands/explain.h index e94d9e49cf..1a75c3ced3 100644 --- a/src/include/commands/explain.h +++ b/src/include/commands/explain.h @@ -91,7 +91,8 @@ extern void ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, const instr_time *planduration, - const BufferUsage *bufusage); + const BufferUsage *bufusage, + uint8 *parallel_ins_tuple_cost_opts); extern void ExplainPrintPlan(ExplainState *es, QueryDesc *queryDesc); extern void ExplainPrintTriggers(ExplainState *es, QueryDesc *queryDesc); diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index 689f577c08..f76b5c2ffd 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -49,6 +49,25 @@ typedef enum ParallelInsertCmdKind PARALLEL_INSERT_CMD_CREATE_TABLE_AS } ParallelInsertCmdKind; +/* + * Information sent to planner to account for tuple cost calculations in + * cost_gather for parallel insertions in commands such as CTAS. + * + * We need to let the planner know that there will be no tuples received by + * Gather node if workers insert the tuples in parallel. + */ +typedef enum ParallelInsertCmdTupleCostOpt +{ + PARALLEL_INSERT_SELECT_QUERY = 1 << 0, /* turn on this before planning */ + /* + * Turn on this while planning for upper Gather path to ignore parallel + * tuple cost in cost_gather. + */ + PARALLEL_INSERT_CAN_IGN_TUP_COST = 1 << 1, + /* Turn on this after the cost is ignored. */ + PARALLEL_INSERT_TUP_COST_IGNORED = 1 << 2 +} ParallelInsertCmdTupleCostOpt; + /* * For each of the command added to ParallelInsertCmdKind, add a corresponding * structure encompassing the information that's required to be shared across @@ -85,5 +104,6 @@ extern void *GetParallelInsertCmdInfo(DestReceiver *dest, extern bool IsParallelInsertionAllowed(ParallelInsertCmdKind ins_cmd, void *ins_info); extern void SetParallelInsertState(ParallelInsertCmdKind ins_cmd, - QueryDesc *queryDesc); + QueryDesc *queryDesc, + uint8 *tuple_cost_opts); #endif /* EXECPARALLEL_H */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index dc2bb40926..70a78b169b 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -180,6 +180,8 @@ typedef struct Query */ int stmt_location; /* start location, or -1 if unknown */ int stmt_len; /* length in bytes; 0 means "rest of string" */ + /* Parallel insertion tuple cost options. */ + uint8 parallelInsCmdTupleCostOpt; } Query; diff --git a/src/include/optimizer/planner.h b/src/include/optimizer/planner.h index 9a15de5025..b71d21d334 100644 --- a/src/include/optimizer/planner.h +++ b/src/include/optimizer/planner.h @@ -21,6 +21,16 @@ #include "nodes/pathnodes.h" #include "nodes/plannodes.h" +#define HAS_PARENT_PATH_GENERATING_CLAUSE(root) \ + (root->parse->rowMarks || \ + limit_needed(root->parse) || \ + root->parse->sortClause || \ + root->parse->distinctClause || \ + root->parse->hasWindowFuncs || \ + root->parse->groupClause || \ + root->parse->groupingSets || \ + root->parse->hasAggs || \ + root->hasHavingQual) /* Hook for plugins to get control in planner() */ typedef PlannedStmt *(*planner_hook_type) (Query *parse, -- 2.25.1