commit 0896fe0115bd591bde415c8c6e18d711cc988808 Author: Robert Haas Date: Sat Feb 7 09:06:54 2015 -0500 Assess parallel safety and necessity and have executor switch on parallel mode when required. diff --git a/doc/src/sgml/spi.sgml b/doc/src/sgml/spi.sgml index c099fcf..f368f00 100644 --- a/doc/src/sgml/spi.sgml +++ b/doc/src/sgml/spi.sgml @@ -1111,7 +1111,8 @@ SPIPlanPtr SPI_prepare_cursor(const char * command, int < CURSOR_OPT_NO_SCROLL, CURSOR_OPT_FAST_PLAN, CURSOR_OPT_GENERIC_PLAN, and - CURSOR_OPT_CUSTOM_PLAN. Note in particular that + CURSOR_OPT_CUSTOM_PLAN, and + CURSOR_OPT_NO_PARALLEL. Note in particular that CURSOR_OPT_HOLD is ignored. diff --git a/src/backend/catalog/pg_aggregate.c b/src/backend/catalog/pg_aggregate.c index b56cf28..5257e46 100644 --- a/src/backend/catalog/pg_aggregate.c +++ b/src/backend/catalog/pg_aggregate.c @@ -674,6 +674,7 @@ lookup_agg_function(List *fnName, { Oid fnOid; bool retset; + bool parallelsafe; int nvargs; Oid vatype; Oid *true_oid_array; @@ -690,7 +691,7 @@ lookup_agg_function(List *fnName, */ fdresult = func_get_detail(fnName, NIL, NIL, nargs, input_types, false, false, - &fnOid, rettype, &retset, + &fnOid, rettype, &retset, ¶llelsafe, &nvargs, &vatype, &true_oid_array, NULL); diff --git a/src/backend/commands/createas.c b/src/backend/commands/createas.c index c961429..48e3852 100644 --- a/src/backend/commands/createas.c +++ b/src/backend/commands/createas.c @@ -161,8 +161,8 @@ ExecCreateTableAs(CreateTableAsStmt *stmt, const char *queryString, query = (Query *) linitial(rewritten); Assert(query->commandType == CMD_SELECT); - /* plan the query */ - plan = pg_plan_query(query, 0, params); + /* plan the query - suppress parallelism, since we are writing data */ + plan = pg_plan_query(query, CURSOR_OPT_NO_PARALLEL, params); /* * Use a snapshot with an updated command ID to ensure this query sees diff --git a/src/backend/commands/prepare.c b/src/backend/commands/prepare.c index 71b08f0..4f1e550 100644 --- a/src/backend/commands/prepare.c +++ b/src/backend/commands/prepare.c @@ -230,8 +230,14 @@ ExecuteQuery(ExecuteStmt *stmt, IntoClause *intoClause, query_string = MemoryContextStrdup(PortalGetHeapMemory(portal), entry->plansource->query_string); - /* Replan if needed, and increment plan refcount for portal */ - cplan = GetCachedPlan(entry->plansource, paramLI, false); + /* + * Replan if needed, and increment plan refcount for portal. + * + * Because parallel queries can't write data, force a non-parallel plan + * if we've got CREATE TABLE .. AS EXECUTE. + */ + cplan = GetCachedPlan(entry->plansource, paramLI, false, + intoClause == NULL); plan_list = cplan->stmt_list; /* @@ -655,7 +661,7 @@ ExplainExecuteQuery(ExecuteStmt *execstmt, IntoClause *into, ExplainState *es, } /* Replan if needed, and acquire a transient refcount */ - cplan = GetCachedPlan(entry->plansource, paramLI, true); + cplan = GetCachedPlan(entry->plansource, paramLI, true, true); plan_list = cplan->stmt_list; diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 07526e8..e50f642 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -145,6 +145,19 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) Assert(queryDesc->estate == NULL); /* + * If the query requires parallel mode, and we're not already in parallel + * mode, then enter parallel mode now, and set a flag so we remember to + * exit parallel mode. + */ + if (queryDesc->plannedstmt->parallelModeNeeded && + !(eflags & EXEC_FLAG_EXPLAIN_ONLY) && + !IsInParallelMode()) + { + queryDesc->activateParallelMode = true; + EnterParallelMode(); + } + + /* * If the transaction is read-only, we need to check if any writes are * planned to non-temporary tables. EXPLAIN is considered read-only. * @@ -408,6 +421,9 @@ standard_ExecutorFinish(QueryDesc *queryDesc) MemoryContextSwitchTo(oldcontext); + if (queryDesc->activateParallelMode) + ExitParallelMode(); + estate->es_finished = true; } diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index 3a93a04..f8a2352 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -1259,7 +1259,7 @@ SPI_cursor_open_internal(const char *name, SPIPlanPtr plan, */ /* Replan if needed, and increment plan refcount for portal */ - cplan = GetCachedPlan(plansource, paramLI, false); + cplan = GetCachedPlan(plansource, paramLI, false, true); stmt_list = cplan->stmt_list; /* Pop the error context stack */ @@ -1681,7 +1681,7 @@ SPI_plan_get_cached_plan(SPIPlanPtr plan) error_context_stack = &spierrcontext; /* Get the generic plan for the query */ - cplan = GetCachedPlan(plansource, NULL, plan->saved); + cplan = GetCachedPlan(plansource, NULL, plan->saved, true); Assert(cplan == plansource->gplan); /* Pop the error context stack */ @@ -2078,7 +2078,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, * Replan if needed, and increment plan refcount. If it's a saved * plan, the refcount must be backed by the CurrentResourceOwner. */ - cplan = GetCachedPlan(plansource, paramLI, plan->saved); + cplan = GetCachedPlan(plansource, paramLI, plan->saved, true); stmt_list = cplan->stmt_list; /* diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index f1a24f5..05462df 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -93,6 +93,7 @@ _copyPlannedStmt(const PlannedStmt *from) COPY_NODE_FIELD(relationOids); COPY_NODE_FIELD(invalItems); COPY_SCALAR_FIELD(nParamExec); + COPY_SCALAR_FIELD(parallelModeNeeded); return newnode; } @@ -2513,6 +2514,7 @@ _copyQuery(const Query *from) COPY_SCALAR_FIELD(resultRelation); COPY_SCALAR_FIELD(hasAggs); COPY_SCALAR_FIELD(hasWindowFuncs); + COPY_SCALAR_FIELD(hasParallelUnsafe); COPY_SCALAR_FIELD(hasSubLinks); COPY_SCALAR_FIELD(hasDistinctOn); COPY_SCALAR_FIELD(hasRecursive); diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 6e8b308..abe22b5 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -852,6 +852,7 @@ _equalQuery(const Query *a, const Query *b) COMPARE_SCALAR_FIELD(resultRelation); COMPARE_SCALAR_FIELD(hasAggs); COMPARE_SCALAR_FIELD(hasWindowFuncs); + COMPARE_SCALAR_FIELD(hasParallelUnsafe); COMPARE_SCALAR_FIELD(hasSubLinks); COMPARE_SCALAR_FIELD(hasDistinctOn); COMPARE_SCALAR_FIELD(hasRecursive); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index dd1278b..9cf51d8 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -255,6 +255,7 @@ _outPlannedStmt(StringInfo str, const PlannedStmt *node) WRITE_NODE_FIELD(relationOids); WRITE_NODE_FIELD(invalItems); WRITE_INT_FIELD(nParamExec); + WRITE_BOOL_FIELD(parallelModeNeeded); } /* @@ -1716,6 +1717,8 @@ _outPlannerGlobal(StringInfo str, const PlannerGlobal *node) WRITE_UINT_FIELD(lastPHId); WRITE_UINT_FIELD(lastRowMarkId); WRITE_BOOL_FIELD(transientPlan); + WRITE_BOOL_FIELD(parallelModeOK); + WRITE_BOOL_FIELD(parallelModeNeeded); } static void @@ -2290,6 +2293,7 @@ _outQuery(StringInfo str, const Query *node) WRITE_INT_FIELD(resultRelation); WRITE_BOOL_FIELD(hasAggs); WRITE_BOOL_FIELD(hasWindowFuncs); + WRITE_BOOL_FIELD(hasParallelUnsafe); WRITE_BOOL_FIELD(hasSubLinks); WRITE_BOOL_FIELD(hasDistinctOn); WRITE_BOOL_FIELD(hasRecursive); diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index ae24d05..8f4b30e 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -203,6 +203,7 @@ _readQuery(void) READ_INT_FIELD(resultRelation); READ_BOOL_FIELD(hasAggs); READ_BOOL_FIELD(hasWindowFuncs); + READ_BOOL_FIELD(hasParallelUnsafe); READ_BOOL_FIELD(hasSubLinks); READ_BOOL_FIELD(hasDistinctOn); READ_BOOL_FIELD(hasRecursive); diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 9cbbcfb..3a8f931 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -178,6 +178,12 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) glob->lastRowMarkId = 0; glob->transientPlan = false; glob->hasRowSecurity = false; + glob->parallelModeOK = IsUnderPostmaster && !parse->hasParallelUnsafe && + !(cursorOptions & CURSOR_OPT_NO_PARALLEL); + glob->parallelModeNeeded = false; +#ifdef FORCE_PARALLEL_MODE + glob->parallelModeNeeded = glob->parallelModeOK; +#endif /* Determine what fraction of the plan is likely to be scanned */ if (cursorOptions & CURSOR_OPT_FAST_PLAN) @@ -256,6 +262,7 @@ standard_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) result->invalItems = glob->invalItems; result->nParamExec = glob->nParamExec; result->hasRowSecurity = glob->hasRowSecurity; + result->parallelModeNeeded = glob->parallelModeNeeded; return result; } diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index a68f2e8..82f8c03 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -157,6 +157,9 @@ parse_sub_analyze(Node *parseTree, ParseState *parentParseState, query = transformStmt(pstate, parseTree); + if (pstate->p_hasParallelUnsafe) + parentParseState->p_hasParallelUnsafe = true; + free_parsestate(pstate); return query; @@ -343,6 +346,8 @@ transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt) qry->commandType = CMD_DELETE; + pstate->p_hasParallelUnsafe = true; /* writes are not parallel-safe */ + /* process the WITH clause independently of all else */ if (stmt->withClause) { @@ -391,6 +396,7 @@ transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt) qry->hasSubLinks = pstate->p_hasSubLinks; qry->hasWindowFuncs = pstate->p_hasWindowFuncs; qry->hasAggs = pstate->p_hasAggs; + qry->hasParallelUnsafe = pstate->p_hasParallelUnsafe; if (pstate->p_hasAggs) parseCheckAggregates(pstate, qry); @@ -426,6 +432,8 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) qry->commandType = CMD_INSERT; pstate->p_is_insert = true; + pstate->p_hasParallelUnsafe = true; /* writes are not parallel-safe */ + /* process the WITH clause independently of all else */ if (stmt->withClause) { @@ -523,6 +531,14 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) selectQuery = transformStmt(sub_pstate, stmt->selectStmt); + /* + * Right now this is technically unnecessary because we prohibit + * INSERT in parallel mode categorically. But if that restriction + * is ever lifted then we'll need this. + */ + if (sub_pstate->p_hasParallelUnsafe) + pstate->p_hasParallelUnsafe = true; + free_parsestate(sub_pstate); /* The grammar should have produced a SELECT */ @@ -760,6 +776,7 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) qry->jointree = makeFromExpr(pstate->p_joinlist, NULL); qry->hasSubLinks = pstate->p_hasSubLinks; + qry->hasParallelUnsafe = pstate->p_hasParallelUnsafe; assign_query_collations(pstate, qry); @@ -1019,6 +1036,8 @@ transformSelectStmt(ParseState *pstate, SelectStmt *stmt) (LockingClause *) lfirst(l), false); } + qry->hasParallelUnsafe = pstate->p_hasParallelUnsafe; + assign_query_collations(pstate, qry); return qry; @@ -1479,6 +1498,8 @@ transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt) (LockingClause *) lfirst(l), false); } + qry->hasParallelUnsafe = pstate->p_hasParallelUnsafe; + assign_query_collations(pstate, qry); return qry; @@ -1907,6 +1928,8 @@ transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt) qry->commandType = CMD_UPDATE; pstate->p_is_update = true; + pstate->p_hasParallelUnsafe = true; /* writes are not parallel-safe */ + /* process the WITH clause independently of all else */ if (stmt->withClause) { @@ -2012,6 +2035,8 @@ transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt) assign_query_collations(pstate, qry); + qry->hasParallelUnsafe = pstate->p_hasParallelUnsafe; + return qry; } @@ -2087,6 +2112,13 @@ transformDeclareCursorStmt(ParseState *pstate, DeclareCursorStmt *stmt) (errcode(ERRCODE_INVALID_CURSOR_DEFINITION), errmsg("cannot specify both SCROLL and NO SCROLL"))); + /* + * We don't allow parallelism for prepared queries, because we can't + * support FETCH BACKWARD and similar constructs. Possibly we could + * weaken this to allow some parallel constructs but not others. + */ + pstate->p_hasParallelUnsafe = true; + result = transformStmt(pstate, stmt->query); /* Grammar should not have allowed anything but SELECT */ @@ -2187,6 +2219,8 @@ transformCreateTableAsStmt(ParseState *pstate, CreateTableAsStmt *stmt) Query *result; Query *query; + pstate->p_hasParallelUnsafe = true; /* writes are not parallel-safe */ + /* transform contained query */ query = transformStmt(pstate, stmt->query); stmt->query = (Node *) query; @@ -2348,6 +2382,8 @@ transformLockingClause(ParseState *pstate, Query *qry, LockingClause *lc, Index i; LockingClause *allrels; + pstate->p_hasParallelUnsafe = true; /* tuple locks are not parallel-safe */ + CheckSelectLocking(qry, lc->strength); /* make a clause we can pass down to subqueries to select all rels */ diff --git a/src/backend/parser/parse_func.c b/src/backend/parser/parse_func.c index a200804..00318bb 100644 --- a/src/backend/parser/parse_func.c +++ b/src/backend/parser/parse_func.c @@ -89,6 +89,7 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs, List *argdefaults; Node *retval; bool retset; + bool parallelsafe; int nvargs; Oid vatype; FuncDetailCode fdresult; @@ -238,9 +239,11 @@ ParseFuncOrColumn(ParseState *pstate, List *funcname, List *fargs, fdresult = func_get_detail(funcname, fargs, argnames, nargs, actual_arg_types, !func_variadic, true, - &funcid, &rettype, &retset, + &funcid, &rettype, &retset, ¶llelsafe, &nvargs, &vatype, &declared_arg_types, &argdefaults); + if (!parallelsafe) + pstate->p_hasParallelUnsafe = true; if (fdresult == FUNCDETAIL_COERCION) { /* @@ -1256,6 +1259,7 @@ func_get_detail(List *funcname, Oid *funcid, /* return value */ Oid *rettype, /* return value */ bool *retset, /* return value */ + bool *parallelsafe, /* return value */ int *nvargs, /* return value */ Oid *vatype, /* return value */ Oid **true_typeids, /* return value */ @@ -1268,6 +1272,7 @@ func_get_detail(List *funcname, *funcid = InvalidOid; *rettype = InvalidOid; *retset = false; + *parallelsafe = true; *nvargs = 0; *vatype = InvalidOid; *true_typeids = NULL; @@ -1480,6 +1485,14 @@ func_get_detail(List *funcname, *rettype = pform->prorettype; *retset = pform->proretset; *vatype = pform->provariadic; + + /* + * for now, we judge that anything immutable is probably safe for + * paralellism. possibly stable functions should be included also, + * or possibly we should have a dedicated flag just for this purpose. + */ + *parallelsafe = (pform->provolatile == PROVOLATILE_IMMUTABLE); + /* fetch default args if caller wants 'em */ if (argdefaults && best_candidate->ndargs > 0) { diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 8899448..f43f8d7 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -1772,7 +1772,7 @@ exec_bind_message(StringInfo input_message) * will be generated in MessageContext. The plan refcount will be * assigned to the Portal, so it will be released at portal destruction. */ - cplan = GetCachedPlan(psrc, params, false); + cplan = GetCachedPlan(psrc, params, false, true); /* * Now we can define the portal. diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 9c14e8a..8df200b 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -86,6 +86,7 @@ CreateQueryDesc(PlannedStmt *plannedstmt, qd->estate = NULL; qd->planstate = NULL; qd->totaltime = NULL; + qd->activateParallelMode = false; return qd; } diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index c1d860c..73a9a93 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -9128,6 +9128,7 @@ generate_function_name(Oid funcid, int nargs, List *argnames, Oid *argtypes, Oid p_funcid; Oid p_rettype; bool p_retset; + bool p_parallelsafe; int p_nvargs; Oid p_vatype; Oid *p_true_typeids; @@ -9175,7 +9176,8 @@ generate_function_name(Oid funcid, int nargs, List *argnames, Oid *argtypes, NIL, argnames, nargs, argtypes, !use_variadic, true, &p_funcid, &p_rettype, - &p_retset, &p_nvargs, &p_vatype, + &p_retset, &p_parallelsafe, + &p_nvargs, &p_vatype, &p_true_typeids, NULL); if ((p_result == FUNCDETAIL_NORMAL || p_result == FUNCDETAIL_AGGREGATE || diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c index 9a26a4e..40a1b4d 100644 --- a/src/backend/utils/cache/plancache.c +++ b/src/backend/utils/cache/plancache.c @@ -91,7 +91,7 @@ static void ReleaseGenericPlan(CachedPlanSource *plansource); static List *RevalidateCachedQuery(CachedPlanSource *plansource); static bool CheckCachedPlan(CachedPlanSource *plansource); static CachedPlan *BuildCachedPlan(CachedPlanSource *plansource, List *qlist, - ParamListInfo boundParams); + ParamListInfo boundParams, bool parallel_ok); static bool choose_custom_plan(CachedPlanSource *plansource, ParamListInfo boundParams); static double cached_plan_cost(CachedPlan *plan, bool include_planner); @@ -886,12 +886,13 @@ CheckCachedPlan(CachedPlanSource *plansource) */ static CachedPlan * BuildCachedPlan(CachedPlanSource *plansource, List *qlist, - ParamListInfo boundParams) + ParamListInfo boundParams, bool parallel_ok) { CachedPlan *plan; List *plist; bool snapshot_set; bool spi_pushed; + int cursor_options; MemoryContext plan_context; MemoryContext oldcxt = CurrentMemoryContext; @@ -945,10 +946,15 @@ BuildCachedPlan(CachedPlanSource *plansource, List *qlist, */ spi_pushed = SPI_push_conditional(); + /* Adjust cursor options if parallelism is not OK here. */ + cursor_options = plansource->cursor_options; + if (!parallel_ok) + cursor_options |= CURSOR_OPT_NO_PARALLEL; + /* * Generate the plan. */ - plist = pg_plan_queries(qlist, plansource->cursor_options, boundParams); + plist = pg_plan_queries(qlist, cursor_options, boundParams); /* Clean up SPI state */ SPI_pop_conditional(spi_pushed); @@ -1132,7 +1138,7 @@ cached_plan_cost(CachedPlan *plan, bool include_planner) */ CachedPlan * GetCachedPlan(CachedPlanSource *plansource, ParamListInfo boundParams, - bool useResOwner) + bool useResOwner, bool parallel_ok) { CachedPlan *plan; List *qlist; @@ -1148,8 +1154,14 @@ GetCachedPlan(CachedPlanSource *plansource, ParamListInfo boundParams, /* Make sure the querytree list is valid and we have parse-time locks */ qlist = RevalidateCachedQuery(plansource); - /* Decide whether to use a custom plan */ - customplan = choose_custom_plan(plansource, boundParams); + /* + * Decide whether to use a custom plan. + * + * We expect the !parallel_ok case to be uncommon, so to make things + * simple we just always use a custop plan in that case. Otherwise, + * we make a policy decision. + */ + customplan = !parallel_ok || choose_custom_plan(plansource, boundParams); if (!customplan) { @@ -1162,7 +1174,7 @@ GetCachedPlan(CachedPlanSource *plansource, ParamListInfo boundParams, else { /* Build a new generic plan */ - plan = BuildCachedPlan(plansource, qlist, NULL); + plan = BuildCachedPlan(plansource, qlist, NULL, true); /* Just make real sure plansource->gplan is clear */ ReleaseGenericPlan(plansource); /* Link the new generic plan into the plansource */ @@ -1207,7 +1219,7 @@ GetCachedPlan(CachedPlanSource *plansource, ParamListInfo boundParams, if (customplan) { /* Build a custom plan */ - plan = BuildCachedPlan(plansource, qlist, boundParams); + plan = BuildCachedPlan(plansource, qlist, boundParams, parallel_ok); /* Accumulate total costs of custom plans, but 'ware overflow */ if (plansource->num_custom_plans < INT_MAX) { diff --git a/src/include/executor/execdesc.h b/src/include/executor/execdesc.h index a2381cd..f0a0505 100644 --- a/src/include/executor/execdesc.h +++ b/src/include/executor/execdesc.h @@ -47,6 +47,7 @@ typedef struct QueryDesc TupleDesc tupDesc; /* descriptor for result tuples */ EState *estate; /* executor's query-wide state */ PlanState *planstate; /* tree of per-plan-node state */ + bool activateParallelMode; /* should we activate parallel mode? */ /* This is always set NULL by the core system, but plugins can change it */ struct Instrumentation *totaltime; /* total time spent in ExecutorRun */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index b1dfa85..ab176b5 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -116,6 +116,7 @@ typedef struct Query bool hasAggs; /* has aggregates in tlist or havingQual */ bool hasWindowFuncs; /* has window functions in tlist */ + bool hasParallelUnsafe; /* has parallel-unsafe stuff */ bool hasSubLinks; /* has subquery SubLink */ bool hasDistinctOn; /* distinctClause is from DISTINCT ON */ bool hasRecursive; /* WITH RECURSIVE was specified */ @@ -2182,6 +2183,7 @@ typedef struct SecLabelStmt #define CURSOR_OPT_FAST_PLAN 0x0020 /* prefer fast-start plan */ #define CURSOR_OPT_GENERIC_PLAN 0x0040 /* force use of generic plan */ #define CURSOR_OPT_CUSTOM_PLAN 0x0080 /* force use of custom plan */ +#define CURSOR_OPT_NO_PARALLEL 0x0100 /* suppress use of parallel plan */ typedef struct DeclareCursorStmt { diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 316c9ce..1d6ebab 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -72,6 +72,8 @@ typedef struct PlannedStmt bool hasRowSecurity; /* row security applied? */ + bool parallelModeNeeded; /* run in parallel mode? */ + } PlannedStmt; /* macro for fetching the Plan associated with a SubPlan node */ diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 6845a40..57920cd 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -103,6 +103,10 @@ typedef struct PlannerGlobal bool hasRowSecurity; /* row security applied? */ + bool parallelModeOK; /* are we allowed to use parallel mode? */ + + bool parallelModeNeeded; /* do we actually need parallel mode? */ + } PlannerGlobal; /* macro for fetching the Plan associated with a SubPlan node */ diff --git a/src/include/parser/parse_func.h b/src/include/parser/parse_func.h index 3264691..db8384f 100644 --- a/src/include/parser/parse_func.h +++ b/src/include/parser/parse_func.h @@ -37,7 +37,7 @@ extern FuncDetailCode func_get_detail(List *funcname, List *fargs, List *fargnames, int nargs, Oid *argtypes, bool expand_variadic, bool expand_defaults, - Oid *funcid, Oid *rettype, + Oid *funcid, Oid *rettype, bool *parallelsafe, bool *retset, int *nvargs, Oid *vatype, Oid **true_typeids, List **argdefaults); diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h index 3103b71..f86640f 100644 --- a/src/include/parser/parse_node.h +++ b/src/include/parser/parse_node.h @@ -149,6 +149,7 @@ struct ParseState Node *p_value_substitute; /* what to replace VALUE with, if any */ bool p_hasAggs; bool p_hasWindowFuncs; + bool p_hasParallelUnsafe; bool p_hasSubLinks; bool p_hasModifyingCTE; bool p_is_insert; diff --git a/src/include/utils/plancache.h b/src/include/utils/plancache.h index ef206c4..65c7976 100644 --- a/src/include/utils/plancache.h +++ b/src/include/utils/plancache.h @@ -173,7 +173,7 @@ extern List *CachedPlanGetTargetList(CachedPlanSource *plansource); extern CachedPlan *GetCachedPlan(CachedPlanSource *plansource, ParamListInfo boundParams, - bool useResOwner); + bool useResOwner, bool parallel_ok); extern void ReleaseCachedPlan(CachedPlan *plan, bool useResOwner); #endif /* PLANCACHE_H */ diff --git a/src/pl/plperl/plperl.c b/src/pl/plperl/plperl.c index 492c1ef..1b1d8eb 100644 --- a/src/pl/plperl/plperl.c +++ b/src/pl/plperl/plperl.c @@ -3191,7 +3191,7 @@ plperl_spi_query(char *query) pg_verifymbstr(query, strlen(query), false); /* Create a cursor for the query */ - plan = SPI_prepare(query, 0, NULL); + plan = SPI_prepare_cursor(query, 0, NULL, CURSOR_OPT_NO_PARALLEL); if (plan == NULL) elog(ERROR, "SPI_prepare() failed:%s", SPI_result_code_string(SPI_result)); @@ -3429,7 +3429,8 @@ plperl_spi_prepare(char *query, int argc, SV **argv) /************************************************************ * Prepare the plan and check for errors ************************************************************/ - plan = SPI_prepare(query, argc, qdesc->argtypes); + plan = SPI_prepare_cursor(query, argc, qdesc->argtypes, + CURSOR_OPT_NO_PARALLEL); if (plan == NULL) elog(ERROR, "SPI_prepare() failed:%s", diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c index ae5421f..91287da 100644 --- a/src/pl/plpgsql/src/pl_exec.c +++ b/src/pl/plpgsql/src/pl_exec.c @@ -185,8 +185,8 @@ static Datum exec_eval_expr(PLpgSQL_execstate *estate, PLpgSQL_expr *expr, bool *isNull, Oid *rettype); -static int exec_run_select(PLpgSQL_execstate *estate, - PLpgSQL_expr *expr, long maxtuples, Portal *portalP); +static int exec_run_select(PLpgSQL_execstate *estate, PLpgSQL_expr *expr, + long maxtuples, int cursorOptions, Portal *portalP); static int exec_for_query(PLpgSQL_execstate *estate, PLpgSQL_stmt_forq *stmt, Portal portal, bool prefetch_ok); static ParamListInfo setup_param_list(PLpgSQL_execstate *estate, @@ -1530,7 +1530,7 @@ exec_stmt_perform(PLpgSQL_execstate *estate, PLpgSQL_stmt_perform *stmt) { PLpgSQL_expr *expr = stmt->expr; - (void) exec_run_select(estate, expr, 0, NULL); + (void) exec_run_select(estate, expr, 0, 0, NULL); exec_set_found(estate, (estate->eval_processed != 0)); exec_eval_cleanup(estate); @@ -2076,7 +2076,7 @@ exec_stmt_fors(PLpgSQL_execstate *estate, PLpgSQL_stmt_fors *stmt) /* * Open the implicit cursor for the statement using exec_run_select */ - exec_run_select(estate, stmt->query, 0, &portal); + exec_run_select(estate, stmt->query, 0, CURSOR_OPT_NO_PARALLEL, &portal); /* * Execute the loop @@ -2167,7 +2167,8 @@ exec_stmt_forc(PLpgSQL_execstate *estate, PLpgSQL_stmt_forc *stmt) Assert(query); if (query->plan == NULL) - exec_prepare_plan(estate, query, curvar->cursor_options); + exec_prepare_plan(estate, query, + curvar->cursor_options | CURSOR_OPT_NO_PARALLEL); /* * Set up ParamListInfo (hook function and possibly data values) @@ -2776,7 +2777,7 @@ exec_stmt_return_query(PLpgSQL_execstate *estate, if (stmt->query != NULL) { /* static query */ - exec_run_select(estate, stmt->query, 0, &portal); + exec_run_select(estate, stmt->query, 0, 0, &portal); } else { @@ -3680,7 +3681,7 @@ exec_stmt_dynfors(PLpgSQL_execstate *estate, PLpgSQL_stmt_dynfors *stmt) int rc; portal = exec_dynquery_with_params(estate, stmt->query, stmt->params, - NULL, 0); + NULL, CURSOR_OPT_NO_PARALLEL); /* * Execute the loop @@ -3739,7 +3740,8 @@ exec_stmt_open(PLpgSQL_execstate *estate, PLpgSQL_stmt_open *stmt) */ query = stmt->query; if (query->plan == NULL) - exec_prepare_plan(estate, query, stmt->cursor_options); + exec_prepare_plan(estate, query, + stmt->cursor_options | CURSOR_OPT_NO_PARALLEL); } else if (stmt->dynquery != NULL) { @@ -3751,7 +3753,8 @@ exec_stmt_open(PLpgSQL_execstate *estate, PLpgSQL_stmt_open *stmt) stmt->dynquery, stmt->params, curname, - stmt->cursor_options); + stmt->cursor_options | + CURSOR_OPT_NO_PARALLEL); /* * If cursor variable was NULL, store the generated portal name in it @@ -3807,7 +3810,8 @@ exec_stmt_open(PLpgSQL_execstate *estate, PLpgSQL_stmt_open *stmt) query = curvar->cursor_explicit_expr; if (query->plan == NULL) - exec_prepare_plan(estate, query, curvar->cursor_options); + exec_prepare_plan(estate, query, curvar->cursor_options + | CURSOR_OPT_NO_PARALLEL); } /* @@ -4793,7 +4797,7 @@ exec_eval_expr(PLpgSQL_execstate *estate, /* * Else do it the hard way via exec_run_select */ - rc = exec_run_select(estate, expr, 2, NULL); + rc = exec_run_select(estate, expr, 2, 0, NULL); if (rc != SPI_OK_SELECT) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), @@ -4847,8 +4851,8 @@ exec_eval_expr(PLpgSQL_execstate *estate, * ---------- */ static int -exec_run_select(PLpgSQL_execstate *estate, - PLpgSQL_expr *expr, long maxtuples, Portal *portalP) +exec_run_select(PLpgSQL_execstate *estate, PLpgSQL_expr *expr, + long maxtuples, int cursorOptions, Portal *portalP) { ParamListInfo paramLI; int rc; @@ -4857,7 +4861,7 @@ exec_run_select(PLpgSQL_execstate *estate, * On the first call for this expression generate the plan */ if (expr->plan == NULL) - exec_prepare_plan(estate, expr, 0); + exec_prepare_plan(estate, expr, cursorOptions); /* * Set up ParamListInfo (hook function and possibly data values) diff --git a/src/pl/plpython/plpy_cursorobject.c b/src/pl/plpython/plpy_cursorobject.c index 2c458d3..6a20da0 100644 --- a/src/pl/plpython/plpy_cursorobject.c +++ b/src/pl/plpython/plpy_cursorobject.c @@ -126,7 +126,7 @@ PLy_cursor_query(const char *query) pg_verifymbstr(query, strlen(query), false); - plan = SPI_prepare(query, 0, NULL); + plan = SPI_prepare_cursor(query, 0, NULL, CURSOR_OPT_NO_PARALLEL); if (plan == NULL) elog(ERROR, "SPI_prepare failed: %s", SPI_result_code_string(SPI_result)); diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c index 465b316..837edd7 100644 --- a/src/pl/plpython/plpy_spi.c +++ b/src/pl/plpython/plpy_spi.c @@ -133,7 +133,8 @@ PLy_spi_prepare(PyObject *self, PyObject *args) } pg_verifymbstr(query, strlen(query), false); - plan->plan = SPI_prepare(query, plan->nargs, plan->types); + plan->plan = SPI_prepare_cursor(query, plan->nargs, plan->types, + CURSOR_OPT_NO_PARALLEL); if (plan->plan == NULL) elog(ERROR, "SPI_prepare failed: %s", SPI_result_code_string(SPI_result)); diff --git a/src/pl/tcl/pltcl.c b/src/pl/tcl/pltcl.c index d6b72f7..739878e 100644 --- a/src/pl/tcl/pltcl.c +++ b/src/pl/tcl/pltcl.c @@ -2174,7 +2174,8 @@ pltcl_SPI_prepare(ClientData cdata, Tcl_Interp *interp, * Prepare the plan and check for errors ************************************************************/ UTF_BEGIN; - qdesc->plan = SPI_prepare(UTF_U2E(argv[1]), nargs, qdesc->argtypes); + qdesc->plan = SPI_prepare_cursor(UTF_U2E(argv[1]), nargs, + qdesc->argtypes, CURSOR_OPT_NO_PARALLEL); UTF_END; if (qdesc->plan == NULL)