From e5cb1c34706ef21c8942087511726d9c06e1fd65 Mon Sep 17 00:00:00 2001 From: root Date: Tue, 15 Dec 2020 07:14:21 -0500 Subject: [PATCH 1/2] support pctas in append parallel inserts --- src/backend/commands/createas.c | 35 +++++++++++++++++++++++++++++++---- src/backend/commands/explain.c | 3 +-- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index 3ffea41..6e6c467 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -351,9 +351,7 @@ ExecCreateTableAs(ParseState *pstate, CreateTableAsStmt *stmt, * into the target table. We need plan state to be initialized by the * executor to decide whether to allow parallel inserts or not. */ - if (IsParallelInsertInCTASAllowed(into, queryDesc, - &query->CTASParallelInsInfo)) - SetCTASParallelInsertState(queryDesc); + IsParallelInsertInCTASAllowed(into, queryDesc, &query->CTASParallelInsInfo); /* run the plan to completion */ ExecutorRun(queryDesc, ForwardScanDirection, 0L, true); @@ -663,6 +661,35 @@ intorel_destroy(DestReceiver *self) pfree(self); } +static bool +PushDownCTASParallelInsertState(DestReceiver *dest, PlanState *ps) +{ + bool parallel = false; + + if(ps == NULL) + return parallel; + + if(IsA(ps, AppendState)) + { + AppendState *aps = (AppendState *) ps; + for(int i = 0; i < aps->as_nplans; i++) + { + parallel |= PushDownCTASParallelInsertState(dest, aps->appendplans[i]); + } + } + else if(IsA(ps, GatherState) && !ps->ps_ProjInfo) + { + GatherState *gstate = (GatherState *) ps; + parallel = true; + + ((DR_intorel *) dest)->is_parallel = true; + gstate->dest = dest; + ps->plan->plan_rows = 0; + } + + return parallel; +} + /* * IsParallelInsertInCTASAllowed --- determine whether or not parallel * insertion is possible. @@ -698,7 +725,7 @@ bool IsParallelInsertInCTASAllowed(IntoClause *into, QueryDesc *queryDesc, * final phase i.e. merge the results by workers, so we do not allow * parallel inserts. */ - allow = ps && IsA(ps, GatherState) && !ps->ps_ProjInfo; + allow = PushDownCTASParallelInsertState(queryDesc->dest, ps); /* * It should not happen that in cost_gather we have ignored the diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index d0152de..136f6f4 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -570,8 +570,7 @@ ExplainOnePlan(PlannedStmt *plannedstmt, IntoClause *into, ExplainState *es, * target table. We need plan state to be initialized by the executor to * decide whether to allow parallel inserts or not. */ - if (IsParallelInsertInCTASAllowed(into, queryDesc, ctas_tuple_cost_flags)) - SetCTASParallelInsertState(queryDesc); + IsParallelInsertInCTASAllowed(into, queryDesc, ctas_tuple_cost_flags); /* Execute the plan for statistics if asked for */ if (es->analyze) -- 1.8.3.1