From 79e818f8041aeb57ae154e916b1844cb9fe427b9 Mon Sep 17 00:00:00 2001
From: Greg Nancarrow
Date: Thu, 22 Oct 2020 14:58:56 +1100
Subject: [PATCH v3] Enable parallel SELECT for "INSERT INTO ... SELECT ...", where it is safe to do so.

Parallel SELECT can't be utilized in the following cases:
- INSERT statement uses ON CONFLICT ... DO UPDATE ...
- Target table is a foreign or temporary table
- Target table has a:
  - Parallel-unsafe trigger
  - Foreign key trigger (RI_TRIGGER_FK)
  - Parallel-unsafe index expression
  - Parallel-unsafe column default expression
  - Parallel-unsafe check constraint
- Partitioned table or partition with any of the above parallel-unsafe features
- Partitioned table with parallel-unsafe partition key expressions or support
  functions

Discussion: https://postgr.es/m/CAJcOf-cXnB5cnMKqWEp2E2z7Mvcd04iLVmV=qpFJrR3AcrTS3g@mail.gmail.com
---
 src/backend/access/transam/xact.c    |  13 ++
 src/backend/executor/execMain.c      |  10 +
 src/backend/optimizer/plan/planner.c |  23 +-
 src/backend/optimizer/util/clauses.c | 416 +++++++++++++++++++++++++++++++++++
 src/include/access/xact.h            |  15 ++
 src/include/optimizer/clauses.h      |   1 +
 6 files changed, 473 insertions(+), 5 deletions(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index af6afce..7c37be8 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1015,6 +1015,19 @@ IsInParallelMode(void)
 }
 
 /*
+ * PrepareParallelModeForModify
+ *
+ * Prepare for entering parallel mode by assigning a FullTransactionId, to be
+ * included in the transaction state that is serialized in the parallel DSM.
+ */
+void PrepareParallelModeForModify(CmdType commandType)
+{
+	Assert(!IsInParallelMode());
+
+	(void)GetCurrentFullTransactionId();
+}
+
+/*
  * CommandCounterIncrement
  */
 void
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index aea0479..1a2a675 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1526,7 +1526,17 @@ ExecutePlan(EState *estate,
 
 	estate->es_use_parallel_mode = use_parallel_mode;
 	if (use_parallel_mode)
+	{
+		/*
+		 * Supported table-modification commands may require additional steps
+		 * prior to entering parallel mode, such as assigning a FullTransactionId.
+		 */
+		if (IsModifySupportedInParallelMode(estate->es_plannedstmt->commandType))
+		{
+			PrepareParallelModeForModify(estate->es_plannedstmt->commandType);
+		}
 		EnterParallelMode();
+	}
 
 	/*
 	 * Loop until we've processed the proper number of tuples from the plan.
diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c
index 986d7a5..7c8c3db 100644
--- a/src/backend/optimizer/plan/planner.c
+++ b/src/backend/optimizer/plan/planner.c
@@ -318,11 +318,11 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	/*
 	 * Assess whether it's feasible to use parallel mode for this query.  We
 	 * can't do this in a standalone backend, or if the command will try to
-	 * modify any data, or if this is a cursor operation, or if GUCs are set
-	 * to values that don't permit parallelism, or if parallel-unsafe
-	 * functions are present in the query tree.
+	 * modify any data using a CTE, or if this is a cursor operation, or if
+	 * GUCs are set to values that don't permit parallelism, or if
+	 * parallel-unsafe functions are present in the query tree.
 	 *
-	 * (Note that we do allow CREATE TABLE AS, SELECT INTO, and CREATE
+	 * (Note that we do allow CREATE TABLE AS, INSERT, SELECT INTO, and CREATE
 	 * MATERIALIZED VIEW to use parallel plans, but as of now, only the leader
 	 * backend writes into a completely new table.  In the future, we can
 	 * extend it to allow workers to write into the table.  However, to allow
@@ -336,7 +336,8 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 	 */
 	if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
 		IsUnderPostmaster &&
-		parse->commandType == CMD_SELECT &&
+		(parse->commandType == CMD_SELECT ||
+		IsModifySupportedInParallelMode(parse->commandType)) &&
 		!parse->hasModifyingCTE &&
 		max_parallel_workers_per_gather > 0 &&
 		!IsParallelWorker())
@@ -344,6 +345,18 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions,
 		/* all the cheap tests pass, so scan the query tree */
 		glob->maxParallelHazard = max_parallel_hazard(parse);
 		glob->parallelModeOK = (glob->maxParallelHazard != PROPARALLEL_UNSAFE);
+
+		/*
+		 * Additional parallel-mode safety checks are required in order to
+		 * allow an underlying parallel query to be used for a
+		 * supported table-modification command.
+		 */
+		if (glob->parallelModeOK &&
+			IsModifySupportedInParallelMode(parse->commandType))
+		{
+			glob->maxParallelHazard = MaxParallelHazardForModify(parse, &glob->maxParallelHazard);
+			glob->parallelModeOK = (glob->maxParallelHazard != PROPARALLEL_UNSAFE);
+		}
 	}
 	else
 	{
diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c
index e7d8146..7a2b7dc 100644
--- a/src/backend/optimizer/util/clauses.c
+++ b/src/backend/optimizer/util/clauses.c
@@ -19,13 +19,19 @@
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/htup_details.h"
+#include "access/table.h"
+#include "catalog/index.h"
+#include "catalog/indexing.h"
 #include "catalog/pg_aggregate.h"
 #include "catalog/pg_class.h"
+#include "catalog/pg_constraint.h"
 #include "catalog/pg_language.h"
 #include "catalog/pg_operator.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
+#include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/functions.h"
 #include "funcapi.h"
@@ -42,7 +48,11 @@
 #include "parser/parse_agg.h"
 #include "parser/parse_coerce.h"
 #include "parser/parse_func.h"
+#include "parser/parsetree.h"
+#include "partitioning/partdesc.h"
+#include "rewrite/rewriteHandler.h"
 #include "rewrite/rewriteManip.h"
+#include "storage/lmgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
@@ -50,6 +60,8 @@
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/partcache.h"
+#include "utils/rel.h"
 #include "utils/syscache.h"
 #include "utils/typcache.h"
 
@@ -157,6 +169,13 @@ static Query *substitute_actual_srf_parameters(Query *expr,
 static Node *substitute_actual_srf_parameters_mutator(Node *node,
 													  substitute_actual_srf_parameters_context *context);
 
+static char MaxTriggerDataParallelHazardForModify(TriggerDesc *trigdesc,
+												  max_parallel_hazard_context *context);
+static char MaxIndexExprsParallelHazardForModify(Relation rel,
+												 max_parallel_hazard_context *context);
+static char MaxDomainParallelHazardForModify(Oid typid, max_parallel_hazard_context *context);
+static char MaxRelParallelHazardForModify(Oid relid, CmdType commandType,
+										  max_parallel_hazard_context *context);
 
 /*****************************************************************************
  *		Aggregate-function clause manipulation
@@ -1073,6 +1092,403 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context)
 								  context);
 }
 
+/*
+ * IsModifySupportedInParallelMode
+ *
+ * Indicates whether execution of the specified table-modification command
+ * (INSERT/UPDATE/DELETE) in parallel-mode is supported, subject to certain
+ * conditions.  NOTE(review): access/xact.h gains an identical inline; dedupe?
+ */
+static pg_attribute_always_inline bool
+IsModifySupportedInParallelMode(CmdType commandType)
+{
+	/* Currently only INSERT is supported */
+	return (commandType == CMD_INSERT);
+}
+
+/*
+ * MaxTriggerDataParallelHazardForModify
+ *
+ * Finds the maximum parallel-mode hazard level for the specified trigger data.
+ */
+static char
+MaxTriggerDataParallelHazardForModify(TriggerDesc *trigdesc,
+									  max_parallel_hazard_context *context)
+{
+	int			i;
+
+	for (i = 0; i < trigdesc->numtriggers; i++)
+	{
+		Trigger    *trigger = &trigdesc->triggers[i];
+		int			trigtype;
+
+		if (max_parallel_hazard_test(func_parallel(trigger->tgfoid), context))
+			break;
+
+		/*
+		 * If the trigger type is RI_TRIGGER_FK, this indicates a FK exists in
+		 * the relation, and this would result in creation of new CommandIds
+		 * on insert/update/delete and this isn't supported during
+		 * parallel-mode.
+		 */
+		trigtype = RI_FKey_trigger_type(trigger->tgfoid);
+		if (trigtype == RI_TRIGGER_FK)
+		{
+			context->max_hazard = PROPARALLEL_UNSAFE;
+			break;
+		}
+	}
+
+	return context->max_hazard;
+}
+
+/*
+ * MaxIndexExprsParallelHazardForModify
+ *
+ * Finds the maximum parallel-mode hazard level for any existing index
+ * expressions of a specified relation.
+ */
+static char
+MaxIndexExprsParallelHazardForModify(Relation rel,
+									 max_parallel_hazard_context *context)
+{
+	List	   *indexOidList;
+	ListCell   *lc;
+	LOCKMODE	lockmode = AccessShareLock;
+
+	indexOidList = RelationGetIndexList(rel);
+	foreach(lc, indexOidList)
+	{
+		Oid			indexOid = lfirst_oid(lc);
+		Relation	indexRel;
+		IndexInfo  *indexInfo;
+
+		if (ConditionalLockRelationOid(indexOid, lockmode))
+		{
+			indexRel = index_open(indexOid, NoLock);
+		}
+		else
+		{
+			context->max_hazard = PROPARALLEL_UNSAFE;
+			return context->max_hazard;
+		}
+
+		indexInfo = BuildIndexInfo(indexRel);
+
+		if (indexInfo->ii_Expressions != NIL)
+		{
+			int			i;
+			ListCell   *indexExprItem = list_head(indexInfo->ii_Expressions);
+
+			for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++)
+			{
+				int			keycol = indexInfo->ii_IndexAttrNumbers[i];
+				if (keycol == 0)
+				{
+					/* Found an index expression */
+
+					Node	   *indexExpr;
+
+					if (indexExprItem == NULL)	/* shouldn't happen */
+						elog(ERROR, "too few entries in indexprs list");
+
+					indexExpr = (Node *)lfirst(indexExprItem);
+					indexExpr = (Node *)expression_planner((Expr *)indexExpr);
+					/* The walker returns a stop flag; the hazard is in context. */
+					if (max_parallel_hazard_walker(indexExpr, context))
+					{
+						index_close(indexRel, lockmode);
+						return context->max_hazard;
+					}
+
+					indexExprItem = lnext(indexInfo->ii_Expressions, indexExprItem);
+				}
+			}
+		}
+		index_close(indexRel, lockmode);
+	}
+
+	return context->max_hazard;
+}
+
+/*
+ * MaxDomainParallelHazardForModify
+ *
+ * Finds the maximum parallel-mode hazard level for the specified DOMAIN type.
+ * Only CHECK expressions are examined for parallel safety.
+ * DEFAULT values of DOMAIN-type columns in the target-list are already
+ * being checked for parallel-safety in the max_parallel_hazard() scan of the
+ * query tree in standard_planner().
+ * Returns the resulting context->max_hazard.
+ */
+static char
+MaxDomainParallelHazardForModify(Oid typid, max_parallel_hazard_context *context)
+{
+	Relation	conRel;
+	ScanKeyData key[1];
+	SysScanDesc scan;
+	HeapTuple	tup;
+
+	LOCKMODE	lockmode = AccessShareLock;
+
+	conRel = table_open(ConstraintRelationId, lockmode);
+
+	ScanKeyInit(&key[0],
+				Anum_pg_constraint_contypid, BTEqualStrategyNumber,
+				F_OIDEQ, ObjectIdGetDatum(typid));
+	scan = systable_beginscan(conRel, ConstraintTypidIndexId, true,
+							  NULL, 1, key);
+
+	while (HeapTupleIsValid((tup = systable_getnext(scan))))
+	{
+		Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup);
+
+		if (con->contype == CONSTRAINT_CHECK)
+		{
+			char	   *conbin;
+			Datum		val;
+			bool		isnull;
+			Expr	   *checkExpr;
+
+			val = SysCacheGetAttr(CONSTROID, tup,
+								  Anum_pg_constraint_conbin, &isnull);
+			if (isnull)
+				elog(ERROR, "null conbin for constraint %u", con->oid);
+			conbin = TextDatumGetCString(val);
+			checkExpr = stringToNode(conbin);
+			if (max_parallel_hazard_walker((Node *)checkExpr, context))
+			{
+				break;
+			}
+		}
+	}
+
+	systable_endscan(scan);
+	table_close(conRel, lockmode);
+	return context->max_hazard;
+}
+
+/*
+ * MaxRelParallelHazardForModify
+ *
+ * Determines the maximum parallel-mode hazard level for modification
+ * of a specified relation.
+ */
+static char
+MaxRelParallelHazardForModify(Oid relid,
+							  CmdType commandType,
+							  max_parallel_hazard_context *context)
+{
+	Relation	rel;
+	TupleDesc	tupdesc;
+	int			attnum;
+
+	LOCKMODE	lockmode = AccessShareLock;
+
+	/*
+	 * It's possible that this relation is locked for exclusive access
+	 * in another concurrent transaction (e.g. as a result of a
+	 * ALTER TABLE ... operation) until that transaction completes.
+	 * If a share-lock can't be acquired on it now, we have to assume this
+	 * could be the worst-case, so to avoid blocking here until that
+	 * transaction completes, conditionally try to acquire the lock and
+	 * assume and return UNSAFE on failure.
+	 */
+	if (ConditionalLockRelationOid(relid, lockmode))
+	{
+		rel = table_open(relid, NoLock);
+	}
+	else
+	{
+		context->max_hazard = PROPARALLEL_UNSAFE;
+		return context->max_hazard;
+	}
+
+	/*
+	 * We can't support table modification in parallel-mode if it's a
+	 * foreign table/partition (no FDW API for supporting parallel access)
+	 * or a temporary table.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE ||
+		RelationUsesLocalBuffers(rel))
+	{
+		table_close(rel, lockmode);
+		context->max_hazard = PROPARALLEL_UNSAFE;
+		return context->max_hazard;
+	}
+
+	/*
+	 * If a partitioned table, check that each partition is safe for
+	 * modification in parallel-mode.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		int			i;
+		PartitionDesc pdesc;
+		PartitionKey pkey;
+		ListCell   *partexprs_item;
+		int			partnatts;
+		List	   *partexprs;
+
+		pkey = RelationGetPartitionKey(rel);
+
+		partnatts = get_partition_natts(pkey);
+		partexprs = get_partition_exprs(pkey);
+
+		partexprs_item = list_head(partexprs);
+		for (i = 0; i < partnatts; i++)
+		{
+			/* Check parallel-safety of partition key support functions */
+			if (OidIsValid(pkey->partsupfunc[i].fn_oid))
+			{
+				if (max_parallel_hazard_test(func_parallel(pkey->partsupfunc[i].fn_oid), context))
+				{
+					table_close(rel, lockmode);
+					return context->max_hazard;
+				}
+			}
+
+			/* Check parallel-safety of any expressions in the partition key */
+			if (get_partition_col_attnum(pkey, i) == 0)
+			{
+				Node	   *checkExpr = (Node *) lfirst(partexprs_item);
+				if (max_parallel_hazard_walker(checkExpr, context))
+				{
+					table_close(rel, lockmode);
+					return context->max_hazard;
+				}
+
+				partexprs_item = lnext(partexprs, partexprs_item);
+			}
+		}
+
+		/* Recursively check each partition ... */
+		pdesc = RelationGetPartitionDesc(rel);
+		for (i = 0; i < pdesc->nparts; i++)
+		{
+			if (MaxRelParallelHazardForModify(pdesc->oids[i], commandType, context) == PROPARALLEL_UNSAFE)
+			{
+				table_close(rel, lockmode);
+				return context->max_hazard;
+			}
+		}
+	}
+
+	/*
+	 * If there are any index expressions, check that they are parallel-mode
+	 * safe.
+	 */
+	if (MaxIndexExprsParallelHazardForModify(rel, context) == PROPARALLEL_UNSAFE)
+	{
+		table_close(rel, lockmode);
+		return context->max_hazard;
+	}
+
+	/*
+	 * If any triggers exist, check that they are parallel safe.
+	 */
+	if (rel->trigdesc != NULL &&
+		MaxTriggerDataParallelHazardForModify(rel->trigdesc, context) == PROPARALLEL_UNSAFE)
+	{
+		table_close(rel, lockmode);
+		return context->max_hazard;
+	}
+
+	if (commandType == CMD_INSERT || commandType == CMD_UPDATE)
+	{
+		/*
+		 * Column default expressions for columns in the target-list are already
+		 * being checked for parallel-safety in the max_parallel_hazard() scan of the
+		 * query tree in standard_planner().
+		 */
+
+		tupdesc = RelationGetDescr(rel);
+		for (attnum = 0; attnum < tupdesc->natts; attnum++)
+		{
+			Form_pg_attribute att = TupleDescAttr(tupdesc, attnum);
+
+			/* We don't need info for dropped or generated attributes */
+			if (att->attisdropped || att->attgenerated)
+				continue;
+
+			/*
+			 * If the column is of a DOMAIN type, determine whether that domain
+			 * has any CHECK expressions that are not parallel-mode safe.
+			 */
+			if (get_typtype(att->atttypid) == TYPTYPE_DOMAIN)
+			{
+				if (MaxDomainParallelHazardForModify(att->atttypid, context) == PROPARALLEL_UNSAFE)
+				{
+					table_close(rel, lockmode);
+					return context->max_hazard;
+				}
+			}
+		}
+	}
+
+	/*
+	 * Check if there are any CHECK constraints which are not parallel-safe.
+	 */
+	if ((commandType == CMD_INSERT || commandType == CMD_UPDATE) &&
+		tupdesc->constr != NULL &&
+		tupdesc->constr->num_check > 0)
+	{
+		int			i;
+
+		ConstrCheck *check = tupdesc->constr->check;
+		/* Examine every CHECK constraint, indexing the array by i. */
+		for (i = 0; i < tupdesc->constr->num_check; i++)
+		{
+			Expr	   *checkExpr = stringToNode(check[i].ccbin);
+			if (max_parallel_hazard_walker((Node *)checkExpr, context))
+			{
+				table_close(rel, lockmode);
+				return context->max_hazard;
+			}
+		}
+	}
+
+	table_close(rel, lockmode);
+	return context->max_hazard;
+}
+
+/*
+ * MaxParallelHazardForModify
+ *
+ * Determines the worst parallel-mode hazard level for the specified
+ * table-modification statement, based on the statement attributes and
+ * target table. An initial max parallel hazard level may optionally be
+ * supplied. The search returns the earliest in the following list:
+ * PROPARALLEL_UNSAFE, PROPARALLEL_RESTRICTED, PROPARALLEL_SAFE
+ */
+char
+MaxParallelHazardForModify(Query *parse, const char *initialMaxParallelHazard)
+{
+	RangeTblEntry *rte;
+	max_parallel_hazard_context context;
+
+
+	/*
+	 * UPDATE is not currently supported in parallel-mode, so prohibit
+	 * INSERT...ON CONFLICT...DO UPDATE...
+	 */
+	if (parse->onConflict != NULL && parse->onConflict->action == ONCONFLICT_UPDATE)
+		return PROPARALLEL_UNSAFE;
+
+	/*
+	 * Set up the context used in finding the max parallel-mode hazard.
+	 */
+	Assert(initialMaxParallelHazard == NULL ||
+		   *initialMaxParallelHazard == PROPARALLEL_SAFE ||
+		   *initialMaxParallelHazard == PROPARALLEL_RESTRICTED);
+	context.max_hazard = initialMaxParallelHazard == NULL ?
+		PROPARALLEL_SAFE : *initialMaxParallelHazard;
+	context.max_interesting = PROPARALLEL_UNSAFE;
+	context.safe_param_ids = NIL;
+
+	rte = rt_fetch(parse->resultRelation, parse->rtable);
+	return MaxRelParallelHazardForModify(rte->relid, parse->commandType, &context);
+}
 
 /*****************************************************************************
  *		Check clauses for nonstrict functions
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 7320de3..a926fff 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -466,5 +466,20 @@ extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parse
 extern void EnterParallelMode(void);
 extern void ExitParallelMode(void);
 extern bool IsInParallelMode(void);
+extern void PrepareParallelModeForModify(CmdType commandType);
+
+/*
+ * IsModifySupportedInParallelMode
+ *
+ * Indicates whether execution of the specified table-modification command
+ * (INSERT/UPDATE/DELETE) in parallel-mode is supported, subject to certain
+ * parallel-safety conditions.
+ */
+static inline bool
+IsModifySupportedInParallelMode(CmdType commandType)
+{
+	/* Currently only INSERT is supported */
+	return (commandType == CMD_INSERT);
+}
 
 #endif							/* XACT_H */
diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h
index 7ef8cce..12662eb 100644
--- a/src/include/optimizer/clauses.h
+++ b/src/include/optimizer/clauses.h
@@ -55,5 +55,6 @@ extern void CommuteOpExpr(OpExpr *clause);
 
 extern Query *inline_set_returning_function(PlannerInfo *root,
 											RangeTblEntry *rte);
+extern char MaxParallelHazardForModify(Query *parse, const char *initialMaxParallelHazard);
 
 #endif							/* CLAUSES_H */
-- 
1.8.3.1