From 23a6b88d2d42913d559ca61b0050588cdd0db0d4 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 14 Dec 2020 06:31:12 -0500 Subject: [PATCH] patch for pinsert in append --- src/backend/commands/createas.c | 35 +++++++++++++++++++++++++++++++---- src/backend/commands/explain.c | 3 +-- src/backend/optimizer/path/allpaths.c | 34 ++++++++++++++++++++++++++++++++++ src/backend/optimizer/plan/planner.c | 2 +- src/include/commands/createas.h | 3 ++- 5 files changed, 69 insertions(+), 8 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 3ffea41..6e6c467 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -351,9 +351,7 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, * into the target table. We need plan state to be initialized by the * executor to decide whether to allow parallel inserts or not. */ - if (IsParallelInsertInCTASAllowed(into, queryDesc, - &query->CTASParallelInsInfo)) - SetCTASParallelInsertState(queryDesc); + IsParallelInsertInCTASAllowed(into, queryDesc, &query->CTASParallelInsInfo); /* run the plan to completion */ ExecutorRun(queryDesc, ForwardScanDirection, 0L, true); @@ -663,6 +661,35 @@ intorel_destroy(DestReceiver *self) pfree(self); } +static bool +PushDownCTASParallelInsertState(DestReceiver *dest, PlanState *ps) +{ + bool parallel = false; + + if(ps == NULL) + return parallel; + + if(IsA(ps, AppendState)) + { + AppendState *aps = (AppendState *) ps; + for(int i = 0; i < aps->as_nplans; i++) + { + parallel |= PushDownCTASParallelInsertState(dest, aps->appendplans[i]); + } + } + else if(IsA(ps, GatherState) && !ps->ps_ProjInfo) + { + GatherState *gstate = (GatherState *) ps; + parallel = true; + + ((DR_intorel *) dest)->is_parallel = true; + gstate->dest = dest; + ps->plan->plan_rows = 0; + } + + return parallel; +} + /* * IsParallelInsertInCTASAllowed --- determine whether or not parallel * insertion is possible. @@ -698,7 +725,7 @@ bool IsParallelInsertInCTASAllowed(IntoClause *into, QueryDesc *queryDesc, * final phase i.e. merge the results by workers, so we do not allow * parallel inserts. */ - allow = ps && IsA(ps, GatherState) && !ps->ps_ProjInfo; + allow = PushDownCTASParallelInsertState(queryDesc->dest, ps); /* * It should not happen that in cost_gather we have ignored the diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index d0152de..136f6f4 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -570,8 +570,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, * target table. We need plan state to be initialized by the executor to * decide whether to allow parallel inserts or not. */ - if (IsParallelInsertInCTASAllowed(into, queryDesc, ctas_tuple_cost_flags)) - SetCTASParallelInsertState(queryDesc); + IsParallelInsertInCTASAllowed(into, queryDesc, ctas_tuple_cost_flags); /* Execute the plan for statistics if asked for */ if (es->analyze) diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 84a69b0..fe3332e 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -23,6 +23,7 @@ #include "catalog/pg_class.h" #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" +#include "commands/createas.h" #include "foreign/fdwapi.h" #include "miscadmin.h" #include "nodes/makefuncs.h" @@ -1103,11 +1104,44 @@ set_append_rel_size(PlannerInfo *root, RelOptInfo *rel, if (root->glob->parallelModeOK && rel->consider_parallel) set_rel_consider_parallel(root, childrel, childRTE); + if(childrel->rtekind == RTE_SUBQUERY) + { + if(root->query_level != 1) + { + if (root->parent_root && + (root->parent_root->parse->CTASParallelInsInfo & CTAS_PARALLEL_INS_IGN_TUP_COST_APPEND) && + !(root->parse->rowMarks || + limit_needed(root->parse) || + root->parse->sortClause || + root->parse->distinctClause || + root->parse->hasWindowFuncs || + root->parse->groupClause || + root->parse->groupingSets || + root->parse->hasAggs || + root->hasHavingQual)) + root->parse->CTASParallelInsInfo |= CTAS_PARALLEL_INS_IGN_TUP_COST_APPEND; + } + else + { + if (!(root->parse->rowMarks || + limit_needed(root->parse) || + root->parse->sortClause || + root->parse->distinctClause || + root->parse->hasWindowFuncs || + root->parse->groupClause || + root->parse->groupingSets || + root->parse->hasAggs || + root->hasHavingQual)) + root->parse->CTASParallelInsInfo |= CTAS_PARALLEL_INS_IGN_TUP_COST_APPEND; + } + } /* * Compute the child's size. */ set_rel_size(root, childrel, childRTindex, childRTE); + if(root->parse->CTASParallelInsInfo & CTAS_PARALLEL_INS_IGN_TUP_COST_APPEND) + root->parse->CTASParallelInsInfo &= ~CTAS_PARALLEL_INS_IGN_TUP_COST_APPEND; /* * It is possible that constraint exclusion detected a contradiction * within a child subquery, even though we didn't prove one above. If diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index d287b6b..da5ce1f 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -7350,7 +7350,7 @@ can_partial_agg(PlannerInfo *root) static bool ignore_parallel_tuple_cost(PlannerInfo *root) { - if (root->query_level == 1 && + if ((root->query_level == 1 || (root->parse->CTASParallelInsInfo & CTAS_PARALLEL_INS_IGN_TUP_COST_APPEND)) && (root->parse->CTASParallelInsInfo & CTAS_PARALLEL_INS_SELECT)) { /* diff --git a/src/include/commands/createas.h b/src/include/commands/createas.h index e01a615..7a50b18 100644 --- a/src/include/commands/createas.h +++ b/src/include/commands/createas.h @@ -54,7 +54,8 @@ typedef enum CTASParallelInsertOpt */ CTAS_PARALLEL_INS_TUP_COST_CAN_IGN = 1 << 1, - CTAS_PARALLEL_INS_TUP_COST_IGNORED = 1 << 2 + CTAS_PARALLEL_INS_TUP_COST_IGNORED = 1 << 2, + CTAS_PARALLEL_INS_IGN_TUP_COST_APPEND = 1 << 3 } CTASParallelInsertOpt; #define IS_CTAS(intoclause) (intoclause && IsA(intoclause, IntoClause)) -- 1.8.3.1