From 2484cfb3e57c79788448b04bfc1760b59fd20c4d Mon Sep 17 00:00:00 2001 From: Bharath Rupireddy Date: Mon, 21 Dec 2020 15:19:23 +0530 Subject: [PATCH v15 2/4] Tuple Cost Adjustment for Parallel Inserts in CTAS Let the planner know that the SELECT is from CTAS in createas.c so that it can set the number of tuples transferred from the workers to Gather node to 0. With this change, there are chances that the planner may choose the parallel plan. --- src/backend/commands/createas.c | 35 +++++++++++++++++- src/backend/commands/explain.c | 7 ++-- src/backend/commands/prepare.c | 3 +- src/backend/optimizer/path/costsize.c | 22 ++++++++++- src/backend/optimizer/plan/planner.c | 53 +++++++++++++++++++++++++++ src/include/commands/createas.h | 21 ++++++++++- src/include/commands/explain.h | 3 +- src/include/nodes/parsenodes.h | 1 + src/include/optimizer/planner.h | 10 +++++ 9 files changed, 146 insertions(+), 9 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 10f4f2b4d7..210927d4f4 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -344,7 +344,8 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, * into the target table. We need plan state to be initialized by the * executor to decide whether to allow parallel inserts or not. */ - ChooseParallelInsertsInCTAS(into, queryDesc); + ChooseParallelInsertsInCTAS(into, queryDesc, + &query->CTASParallelInsInfo); /* run the plan to completion */ ExecutorRun(queryDesc, ForwardScanDirection, 0L, true); @@ -659,8 +660,11 @@ intorel_destroy(DestReceiver *self) * insertion is possible, if yes set the parallel insert state i.e. push down * the dest receiver to the Gather nodes. */ -void ChooseParallelInsertsInCTAS(IntoClause *into, QueryDesc *queryDesc) +void ChooseParallelInsertsInCTAS(IntoClause *into, QueryDesc *queryDesc, + uint8 *tuple_cost_flags) { + bool allow = false; + if (!IS_CTAS(into)) return; @@ -695,5 +699,32 @@ void ChooseParallelInsertsInCTAS(IntoClause *into, QueryDesc *queryDesc) * explain plans. */ queryDesc->planstate->plan->plan_rows = 0; + + allow = true; } + + /* + * It should not happen that in cost_gather we have ignored the parallel + * tuple cost and now we are not allowing the parallel inserts. And also we + * might need assertion only if the top node is Gather. The main intention + * of assertion is to check if we enforced planner to ignore the parallel + * tuple cost (with the intention of choosing parallel inserts) due to + * which the parallel plan may have been chosen, but we do not allow the + * parallel inserts now. + */ + if (!allow && tuple_cost_flags && queryDesc && + IsA(queryDesc->planstate, GatherState)) + { + /* + * If we have correctly ignored parallel tuple cost in planner while + * creating Gather path, then this assertion failure should not occur. + * If it occurs, that means the planner may have chosen this parallel + * plan because of our enforcement to ignore the parallel tuple cost. + */ + Assert(!(*tuple_cost_flags & CTAS_PARALLEL_INS_TUP_COST_IGNORED)); + } + + if (tuple_cost_flags) + *tuple_cost_flags = CTAS_PARALLEL_INS_UNDEF; + } diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index fbd0bc5a81..efdb34d1f0 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -402,7 +402,8 @@ ExplainOneQuery(Query *query, int cursorOptions, /* run it (if needed) and produce output */ ExplainOnePlan(plan, into, es, queryString, params, queryEnv, - &planduration, (es->buffers ? &bufusage : NULL)); + &planduration, (es->buffers ? &bufusage : NULL), + &query->CTASParallelInsInfo); } } @@ -496,7 +497,7 @@ void ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, const instr_time *planduration, - const BufferUsage *bufusage) + const BufferUsage *bufusage, uint8 *ctas_tuple_cost_flags) { DestReceiver *dest; QueryDesc *queryDesc; @@ -562,7 +563,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, * target table. We need plan state to be initialized by the executor to * decide whether to allow parallel inserts or not. */ - ChooseParallelInsertsInCTAS(into, queryDesc); + ChooseParallelInsertsInCTAS(into, queryDesc, ctas_tuple_cost_flags); /* Execute the plan for statistics if asked for */ if (es->analyze) diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index 89087a7be3..07166479e7 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -672,7 +672,8 @@ ExplainExecuteQuery(ExecuteStmt *execstmt, IntoClause *into, ExplainState *es, if (pstmt->commandType != CMD_UTILITY) ExplainOnePlan(pstmt, into, es, query_string, paramLI, queryEnv, - &planduration, (es->buffers ? &bufusage : NULL)); + &planduration, (es->buffers ? &bufusage : NULL), + NULL); else ExplainOneUtility(pstmt->utilityStmt, into, es, query_string, paramLI, queryEnv); diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 22d6935824..800f25903d 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -76,6 +76,7 @@ #include "access/amapi.h" #include "access/htup_details.h" #include "access/tsmapi.h" +#include "commands/createas.h" #include "executor/executor.h" #include "executor/nodeAgg.h" #include "executor/nodeHash.h" @@ -378,6 +379,7 @@ cost_gather(GatherPath *path, PlannerInfo *root, { Cost startup_cost = 0; Cost run_cost = 0; + bool ignore_tuple_cost = false; /* Mark the path with the correct row estimate */ if (rows) @@ -393,7 +395,25 @@ cost_gather(GatherPath *path, PlannerInfo *root, /* Parallel setup and communication cost. */ startup_cost += parallel_setup_cost; - run_cost += parallel_tuple_cost * path->path.rows; + + /* + * Do not consider tuple cost in case of we intend to perform parallel + * inserts by workers. We would have set ignore flag in + * apply_scanjoin_target_to_paths before generating Gather path for the + * upper level SELECT part of the CTAS. + */ + if ((root->parse->CTASParallelInsInfo & CTAS_PARALLEL_INS_SELECT) && + (root->parse->CTASParallelInsInfo & + CTAS_PARALLEL_INS_TUP_COST_CAN_IGN)) + { + ignore_tuple_cost = true; + root->parse->CTASParallelInsInfo &= + ~CTAS_PARALLEL_INS_TUP_COST_CAN_IGN; + root->parse->CTASParallelInsInfo |= CTAS_PARALLEL_INS_TUP_COST_IGNORED; + } + + if (!ignore_tuple_cost) + run_cost += parallel_tuple_cost * path->path.rows; path->path.startup_cost = startup_cost; path->path.total_cost = (startup_cost + run_cost); diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 1a94b58f8b..f1134711b0 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -28,6 +28,7 @@ #include "catalog/pg_inherits.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" +#include "commands/createas.h" #include "executor/executor.h" #include "executor/nodeAgg.h" #include "foreign/fdwapi.h" @@ -7338,6 +7339,37 @@ can_partial_agg(PlannerInfo *root) return true; } +/* + * ignore_parallel_tuple_cost + * + * Gather node will not receive any tuples from the workers in case each worker + * inserts them in parallel. So, we set a flag to ignore parallel tuple cost by + * the Gather path in cost_gather if the SELECT is for CTAS and we are + * generating an upper level Gather path. +*/ +static bool +ignore_parallel_tuple_cost(PlannerInfo *root) +{ + if (root->query_level == 1 && + (root->parse->CTASParallelInsInfo & CTAS_PARALLEL_INS_SELECT)) + { + /* + * In each of the HAS_PARENT_PATH_GENERATING_CLAUSE cases, a parent + * path will be generated for the upper Gather path(in + * grouping_planner), in which case we can not let parallel inserts + * happen. So we do not set ignore tuple cost flag. + */ + if (HAS_PARENT_PATH_GENERATING_CLAUSE(root)) + return false; + + root->parse->CTASParallelInsInfo |= CTAS_PARALLEL_INS_TUP_COST_CAN_IGN; + + return true; + } + + return false; +} + /* * apply_scanjoin_target_to_paths * @@ -7557,8 +7589,29 @@ apply_scanjoin_target_to_paths(PlannerInfo *root, * one of the generated paths may turn out to be the cheapest one. */ if (rel->consider_parallel && !IS_OTHER_REL(rel)) + { + /* + * Set a flag to ignore parallel tuple cost by the Gather path in + * cost_gather if the SELECT is for CTAS and we are generating an upper + * level Gather path. + */ + bool ignore = ignore_parallel_tuple_cost(root); + generate_useful_gather_paths(root, rel, false); + /* + * Reset the ignore flag, in case we set it but + * generate_useful_gather_paths returned without reaching cost_gather. + */ + if (ignore && + (root->parse->CTASParallelInsInfo & + CTAS_PARALLEL_INS_TUP_COST_CAN_IGN)) + { + root->parse->CTASParallelInsInfo &= + ~CTAS_PARALLEL_INS_TUP_COST_CAN_IGN; + } + } + /* * Reassess which paths are the cheapest, now that we've potentially added * new Gather (or Gather Merge) and/or Append (or MergeAppend) paths to diff --git a/src/include/commands/createas.h b/src/include/commands/createas.h index ed4690305b..0ae2f49e0c 100644 --- a/src/include/commands/createas.h +++ b/src/include/commands/createas.h @@ -39,6 +39,24 @@ typedef struct Oid object_id; } DR_intorel; +/* + * Information sent to the planner from CTAS to account for the cost + * calculations in cost_gather. We need to do this because, no tuples will be + * received by the Gather node if the workers insert the tuples in parallel. + */ +typedef enum CTASParallelInsertOpt +{ + CTAS_PARALLEL_INS_UNDEF = 0, /* undefined */ + CTAS_PARALLEL_INS_SELECT = 1 << 0, /* set to this before planning */ + /* + * Set to this while planning for upper Gather path to ignore parallel + * tuple cost in cost_gather. + */ + CTAS_PARALLEL_INS_TUP_COST_CAN_IGN = 1 << 1, + /* Set to this after the cost is ignored. */ + CTAS_PARALLEL_INS_TUP_COST_IGNORED = 1 << 2 +} CTASParallelInsertOpt; + #define IS_CTAS(intoclause) (intoclause && IsA(intoclause, IntoClause)) #define IS_PARALLEL_CTAS_DEST(dest) (dest && dest->mydest == DestIntoRel && \ IS_CTAS(((DR_intorel *) dest)->into) && \ @@ -53,5 +71,6 @@ extern int GetIntoRelEFlags(IntoClause *intoClause); extern DestReceiver *CreateIntoRelDestReceiver(IntoClause *intoClause); extern void ChooseParallelInsertsInCTAS(IntoClause *into, - QueryDesc *queryDesc); + QueryDesc *queryDesc, + uint8 *tuple_cost_flags); #endif /* CREATEAS_H */ diff --git a/src/include/commands/explain.h b/src/include/commands/explain.h index ba661d32a6..1a1806dbf1 100644 --- a/src/include/commands/explain.h +++ b/src/include/commands/explain.h @@ -91,7 +91,8 @@ extern void ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, const char *queryString, ParamListInfo params, QueryEnvironment *queryEnv, const instr_time *planduration, - const BufferUsage *bufusage); + const BufferUsage *bufusage, + uint8 *ctas_tuple_cost_flags); extern void ExplainPrintPlan(ExplainState *es, QueryDesc *queryDesc); extern void ExplainPrintTriggers(ExplainState *es, QueryDesc *queryDesc); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 48a79a7657..81b148c383 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -180,6 +180,7 @@ typedef struct Query */ int stmt_location; /* start location, or -1 if unknown */ int stmt_len; /* length in bytes; 0 means "rest of string" */ + uint8 CTASParallelInsInfo; /* parallel insert in CTAS info */ } Query; diff --git a/src/include/optimizer/planner.h b/src/include/optimizer/planner.h index beb7dbbcbe..74b2563828 100644 --- a/src/include/optimizer/planner.h +++ b/src/include/optimizer/planner.h @@ -21,6 +21,16 @@ #include "nodes/pathnodes.h" #include "nodes/plannodes.h" +#define HAS_PARENT_PATH_GENERATING_CLAUSE(root) \ + (root->parse->rowMarks || \ + limit_needed(root->parse) || \ + root->parse->sortClause || \ + root->parse->distinctClause || \ + root->parse->hasWindowFuncs || \ + root->parse->groupClause || \ + root->parse->groupingSets || \ + root->parse->hasAggs || \ + root->hasHavingQual) /* Hook for plugins to get control in planner() */ typedef PlannedStmt *(*planner_hook_type) (Query *parse, -- 2.25.1