From f9af1af05ffdecea8d01aa0581c1180d69aecc31 Mon Sep 17 00:00:00 2001 From: houzj Date: Tue, 6 Jul 2021 11:23:45 +0800 Subject: [PATCH v14 3/4] get-parallel-safety-functions Provide a utility function "pg_get_table_parallel_dml_safety(regclass)" that returns records of (objid, classid, parallel_safety) for all parallel unsafe/restricted table-related objects from which the table's parallel DML safety is determined. The user can use this information during development in order to accurately declare a table's parallel DML safety. Or to identify any problematic objects if a parallel DML fails or behaves unexpectedly. When the use of an index-related parallel unsafe/restricted function is detected, both the function oid and the index oid are returned. Provide a utility function "pg_get_table_max_parallel_dml_hazard(regclass)" that returns the worst parallel DML safety hazard that can be found in the given relation. Users can use this function to do a quick check without caring about specific parallel-related objects. --- src/backend/optimizer/util/clauses.c | 655 ++++++++++++++++++++++++++- src/backend/utils/adt/misc.c | 94 ++++ src/backend/utils/cache/typcache.c | 17 + src/include/catalog/pg_proc.dat | 22 +- src/include/optimizer/clauses.h | 10 + src/include/utils/typcache.h | 2 + 6 files changed, 795 insertions(+), 5 deletions(-) diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index ac0f243bf1..960670bf82 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -19,15 +19,20 @@ #include "postgres.h" +#include "access/amapi.h" +#include "access/genam.h" #include "access/htup_details.h" #include "access/table.h" #include "access/xact.h" #include "catalog/pg_aggregate.h" #include "catalog/pg_class.h" +#include "catalog/pg_constraint.h" #include "catalog/pg_language.h" #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" +#include "catalog/pg_trigger.h" #include "catalog/pg_type.h" +#include "commands/trigger.h" #include "executor/executor.h" #include "executor/functions.h" #include "funcapi.h" @@ -46,6 +51,8 @@ #include "parser/parse_coerce.h" #include "parser/parse_func.h" #include "parser/parsetree.h" +#include "partitioning/partdesc.h" +#include "rewrite/rewriteHandler.h" #include "rewrite/rewriteManip.h" #include "tcop/tcopprot.h" #include "utils/acl.h" @@ -54,6 +61,7 @@ #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/partcache.h" #include "utils/rel.h" #include "utils/syscache.h" #include "utils/typcache.h" @@ -92,6 +100,9 @@ typedef struct char max_hazard; /* worst proparallel hazard found so far */ char max_interesting; /* worst proparallel hazard of interest */ List *safe_param_ids; /* PARAM_EXEC Param IDs to treat as safe */ + bool check_all; /* whether collect all the unsafe/restricted objects */ + List *objects; /* parallel unsafe/restricted objects */ + PartitionDirectory partition_directory; /* partition descriptors */ } max_parallel_hazard_context; static bool contain_agg_clause_walker(Node *node, void *context); @@ -102,6 +113,24 @@ static bool contain_volatile_functions_walker(Node *node, void *context); static bool contain_volatile_functions_not_nextval_walker(Node *node, void *context); static bool max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context); +static bool target_rel_parallel_hazard_recurse(Relation relation, + max_parallel_hazard_context *context, + bool is_partition); +static bool target_rel_trigger_parallel_hazard(Relation rel, + max_parallel_hazard_context *context); +static bool index_expr_parallel_hazard(Relation index_rel, + List *ii_Expressions, + List *ii_Predicate, + max_parallel_hazard_context *context); +static bool target_rel_index_parallel_hazard(Relation rel, + max_parallel_hazard_context *context); +static bool target_rel_domain_parallel_hazard(Oid typid, + max_parallel_hazard_context *context); +static bool target_rel_partitions_parallel_hazard(Relation rel, + max_parallel_hazard_context *context, + bool is_partition); +static bool target_rel_chk_constr_parallel_hazard(Relation rel, + max_parallel_hazard_context *context); static bool contain_nonstrict_functions_walker(Node *node, void *context); static bool contain_exec_param_walker(Node *node, List *param_ids); static bool contain_context_dependent_node(Node *clause); @@ -156,6 +185,7 @@ static Query *substitute_actual_srf_parameters(Query *expr, static Node *substitute_actual_srf_parameters_mutator(Node *node, substitute_actual_srf_parameters_context *context); static bool max_parallel_hazard_test(char proparallel, max_parallel_hazard_context *context); +static safety_object *make_safety_object(Oid objid, Oid classid, char proparallel); /***************************************************************************** @@ -629,6 +659,9 @@ max_parallel_hazard(Query *parse) context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_UNSAFE; context.safe_param_ids = NIL; + context.check_all = false; + context.objects = NIL; + context.partition_directory = NULL; max_hazard_found = max_parallel_hazard_walker((Node *) parse, &context); @@ -681,6 +714,9 @@ is_parallel_safe(PlannerInfo *root, Node *node) context.max_hazard = PROPARALLEL_SAFE; context.max_interesting = PROPARALLEL_RESTRICTED; context.safe_param_ids = NIL; + context.check_all = false; + context.objects = NIL; + context.partition_directory = NULL; /* * The params that refer to the same or parent query level are considered @@ -712,7 +748,7 @@ max_parallel_hazard_test(char proparallel, max_parallel_hazard_context *context) break; case PROPARALLEL_RESTRICTED: /* increase max_hazard to RESTRICTED */ - Assert(context->max_hazard != PROPARALLEL_UNSAFE); + Assert(context->check_all || context->max_hazard != PROPARALLEL_UNSAFE); context->max_hazard = proparallel; /* done if we are not expecting any unsafe functions */ if (context->max_interesting == proparallel) @@ -729,6 +765,82 @@ max_parallel_hazard_test(char proparallel, max_parallel_hazard_context *context) return false; } +/* + * make_safety_object + * + * Creates an safety_object given object id, class id and parallel safety. + */ +static safety_object * +make_safety_object(Oid objid, Oid classid, char proparallel) +{ + safety_object *object = (safety_object *) palloc(sizeof(safety_object)); + + object->objid = objid; + object->classid = classid; + object->proparallel = proparallel; + + return object; +} + +/* check_functions_in_node callback */ +static bool +parallel_hazard_checker(Oid func_id, void *context) +{ + char proparallel; + max_parallel_hazard_context *cont = (max_parallel_hazard_context *) context; + + proparallel = func_parallel(func_id); + + if (max_parallel_hazard_test(proparallel, cont) && !cont->check_all) + return true; + else if (proparallel != PROPARALLEL_SAFE) + { + safety_object *object = make_safety_object(func_id, + ProcedureRelationId, + proparallel); + cont->objects = lappend(cont->objects, object); + } + + return false; +} + +/* + * parallel_hazard_walker + * + * Recursively search an expression tree which is defined as partition key or + * index or constraint or column default expression for PARALLEL + * UNSAFE/RESTRICTED table-related objects. + * + * If context->find_all is true, then detect all PARALLEL UNSAFE/RESTRICTED + * table-related objects. + * + * If context->find_all is false, then find the worst parallel-hazard level. + */ +static bool +parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) +{ + if (node == NULL) + return false; + + /* Check for hazardous functions in node itself */ + if (check_functions_in_node(node, parallel_hazard_checker, + context)) + return true; + + if (IsA(node, CoerceToDomain)) + { + CoerceToDomain *domain = (CoerceToDomain *) node; + + if (target_rel_domain_parallel_hazard(domain->resulttype, context)) + return true; + } + + /* Recurse to check arguments */ + return expression_tree_walker(node, + parallel_hazard_walker, + context); +} + /* check_functions_in_node callback */ static bool max_parallel_hazard_checker(Oid func_id, void *context) @@ -884,6 +996,547 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) context); } +/* + * target_rel_parallel_hazard + * + * If context->find_all is true, then detect all PARALLEL UNSAFE/RESTRICTED + * table-related objects. + * + * If context->find_all is false, then find the worst parallel-hazard level. + */ +List* +target_rel_parallel_hazard(Oid relOid, bool findall, + char max_interesting, char *max_hazard) +{ + max_parallel_hazard_context context; + Relation targetRel; + + context.check_all = findall; + context.objects = NIL; + context.max_hazard = PROPARALLEL_SAFE; + context.max_interesting = max_interesting; + context.safe_param_ids = NIL; + context.partition_directory = NULL; + + targetRel = table_open(relOid, AccessShareLock); + + (void) target_rel_parallel_hazard_recurse(targetRel, &context, false); + if (context.partition_directory) + DestroyPartitionDirectory(context.partition_directory); + + table_close(targetRel, AccessShareLock); + + *max_hazard = context.max_hazard; + + return context.objects; +} + +/* + * target_rel_parallel_hazard_recurse + * + * Recursively search all table-related objects for PARALLEL UNSAFE/RESTRICTED + * objects. + * + * If context->find_all is true, then detect all PARALLEL UNSAFE/RESTRICTED + * table-related objects. + * + * If context->find_all is false, then find the worst parallel-hazard level. + */ +static bool +target_rel_parallel_hazard_recurse(Relation rel, + max_parallel_hazard_context *context, + bool is_partition) +{ + TupleDesc tupdesc; + int attnum; + + /* + * We can't support table modification in a parallel worker if it's a + * foreign table/partition (no FDW API for supporting parallel access) or + * a temporary table. + */ + if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE || + RelationUsesLocalBuffers(rel)) + { + if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context) && + !context->check_all) + return true; + else + { + safety_object *object = make_safety_object(rel->rd_rel->oid, + RelationRelationId, + PROPARALLEL_RESTRICTED); + context->objects = lappend(context->objects, object); + } + } + + /* + * If a partitioned table, check that each partition is safe for + * modification in parallel-mode. + */ + if (target_rel_partitions_parallel_hazard(rel, context, is_partition)) + return true; + + /* + * If there are any index expressions or index predicate, check that they + * are parallel-mode safe. + */ + if (target_rel_index_parallel_hazard(rel, context)) + return true; + + /* + * If any triggers exist, check that they are parallel-safe. + */ + if (target_rel_trigger_parallel_hazard(rel, context)) + return true; + + /* + * Column default expressions are only applicable to INSERT and UPDATE. + * Note that even though column defaults may be specified separately for + * each partition in a partitioned table, a partition's default value is + * not applied when inserting a tuple through a partitioned table. + */ + + tupdesc = RelationGetDescr(rel); + for (attnum = 0; attnum < tupdesc->natts; attnum++) + { + Form_pg_attribute att = TupleDescAttr(tupdesc, attnum); + + /* We don't need info for dropped or generated attributes */ + if (att->attisdropped || att->attgenerated) + continue; + + if (att->atthasdef && !is_partition) + { + Node *defaultexpr; + + defaultexpr = build_column_default(rel, attnum + 1); + if (parallel_hazard_walker((Node *) defaultexpr, context)) + return true; + } + + /* + * If the column is of a DOMAIN type, determine whether that + * domain has any CHECK expressions that are not parallel-mode + * safe. + */ + if (get_typtype(att->atttypid) == TYPTYPE_DOMAIN) + { + if (target_rel_domain_parallel_hazard(att->atttypid, context)) + return true; + } + } + + /* + * CHECK constraints are only applicable to INSERT and UPDATE. If any + * CHECK constraints exist, determine if they are parallel-safe. + */ + if (target_rel_chk_constr_parallel_hazard(rel, context)) + return true; + + return false; +} + +/* + * target_rel_trigger_parallel_hazard + * + * If context->find_all is true, then find all the PARALLEL UNSAFE/RESTRICTED + * objects for the specified relation's trigger data. + * + * If context->find_all is false, then find the worst parallel-hazard level. + */ +static bool +target_rel_trigger_parallel_hazard(Relation rel, + max_parallel_hazard_context *context) +{ + int i; + char proparallel; + + if (rel->trigdesc == NULL) + return false; + + /* + * Care is needed here to avoid using the same relcache TriggerDesc field + * across other cache accesses, because relcache doesn't guarantee that it + * won't move. + */ + for (i = 0; i < rel->trigdesc->numtriggers; i++) + { + Oid tgfoid = rel->trigdesc->triggers[i].tgfoid; + Oid tgoid = rel->trigdesc->triggers[i].tgoid; + + proparallel = func_parallel(tgfoid); + + if (max_parallel_hazard_test(proparallel, context) && + !context->check_all) + return true; + else if (proparallel != PROPARALLEL_SAFE) + { + safety_object *object, + *parent_object; + + object = make_safety_object(tgfoid, ProcedureRelationId, + proparallel); + parent_object = make_safety_object(tgoid, TriggerRelationId, + proparallel); + + context->objects = lappend(context->objects, object); + context->objects = lappend(context->objects, parent_object); + } + } + + return false; +} + +/* + * index_expr_parallel_hazard + * + * If context->find_all is true, then find all the PARALLEL UNSAFE/RESTRICTED + * objects for the input index expression and index predicate. + * + * If context->find_all is false, then find the worst parallel-hazard level. + */ +static bool +index_expr_parallel_hazard(Relation index_rel, + List *ii_Expressions, + List *ii_Predicate, + max_parallel_hazard_context *context) +{ + int i; + Form_pg_index indexStruct; + ListCell *index_expr_item; + + indexStruct = index_rel->rd_index; + index_expr_item = list_head(ii_Expressions); + + /* Check parallel-safety of index expression */ + for (i = 0; i < indexStruct->indnatts; i++) + { + int keycol = indexStruct->indkey.values[i]; + + if (keycol == 0) + { + /* Found an index expression */ + Node *index_expr; + + Assert(index_expr_item != NULL); + if (index_expr_item == NULL) /* shouldn't happen */ + elog(ERROR, "too few entries in indexprs list"); + + index_expr = (Node *) lfirst(index_expr_item); + + if (parallel_hazard_walker(index_expr, context)) + return true; + + index_expr_item = lnext(ii_Expressions, index_expr_item); + } + } + + /* Check parallel-safety of index predicate */ + if (parallel_hazard_walker((Node *) ii_Predicate, context)) + return true; + + return false; +} + +/* + * target_rel_index_parallel_hazard + * + * If context->find_all is true, then find all the PARALLEL UNSAFE/RESTRICTED + * objects for any existing index expressions or index predicate of a specified + * relation. + * + * If context->find_all is false, then find the worst parallel-hazard level. + */ +static bool +target_rel_index_parallel_hazard(Relation rel, + max_parallel_hazard_context *context) +{ + List *index_oid_list; + ListCell *lc; + LOCKMODE lockmode = AccessShareLock; + bool max_hazard_found; + + index_oid_list = RelationGetIndexList(rel); + foreach(lc, index_oid_list) + { + Relation index_rel; + List *ii_Expressions; + List *ii_Predicate; + List *temp_objects; + char temp_hazard; + Oid index_oid = lfirst_oid(lc); + + temp_objects = context->objects; + context->objects = NIL; + temp_hazard = context->max_hazard; + context->max_hazard = PROPARALLEL_SAFE; + + index_rel = index_open(index_oid, lockmode); + + /* Check index expression */ + ii_Expressions = RelationGetIndexExpressions(index_rel); + ii_Predicate = RelationGetIndexPredicate(index_rel); + + max_hazard_found = index_expr_parallel_hazard(index_rel, + ii_Expressions, + ii_Predicate, + context); + + index_close(index_rel, lockmode); + + if (max_hazard_found) + return true; + + /* Add the index itself to the objects list */ + else if (context->objects != NIL) + { + safety_object *object; + + object = make_safety_object(index_oid, IndexRelationId, + context->max_hazard); + context->objects = lappend(context->objects, object); + } + + (void) max_parallel_hazard_test(temp_hazard, context); + + context->objects = list_concat(context->objects, temp_objects); + list_free(temp_objects); + } + + list_free(index_oid_list); + + return false; +} + +/* + * target_rel_domain_parallel_hazard + * + * If context->find_all is true, then find all the PARALLEL UNSAFE/RESTRICTED + * objects for the specified DOMAIN type. Only any CHECK expressions are + * examined for parallel-safety. + * + * If context->find_all is false, then find the worst parallel-hazard level. + */ +static bool +target_rel_domain_parallel_hazard(Oid typid, + max_parallel_hazard_context *context) +{ + ListCell *lc; + List *domain_list; + List *temp_objects; + char temp_hazard; + + domain_list = GetDomainConstraints(typid); + + foreach(lc, domain_list) + { + DomainConstraintState *r = (DomainConstraintState *) lfirst(lc); + + temp_objects = context->objects; + context->objects = NIL; + temp_hazard = context->max_hazard; + context->max_hazard = PROPARALLEL_SAFE; + + if (parallel_hazard_walker((Node *) r->check_expr, context)) + return true; + + /* Add the constraint itself to the objects list */ + else if (context->objects != NIL) + { + safety_object *object; + Oid constr_oid = get_domain_constraint_oid(typid, + r->name, + false); + + object = make_safety_object(constr_oid, + ConstraintRelationId, + context->max_hazard); + context->objects = lappend(context->objects, object); + } + + (void) max_parallel_hazard_test(temp_hazard, context); + + context->objects = list_concat(context->objects, temp_objects); + list_free(temp_objects); + } + + return false; + +} + +/* + * target_rel_partitions_parallel_hazard + * + * If context->find_all is true, then find all the PARALLEL UNSAFE/RESTRICTED + * objects for any partitions of a specified relation. + * + * If context->find_all is false, then find the worst parallel-hazard level. + */ +static bool +target_rel_partitions_parallel_hazard(Relation rel, + max_parallel_hazard_context *context, + bool is_partition) +{ + int i; + PartitionDesc pdesc; + PartitionKey pkey; + ListCell *partexprs_item; + int partnatts; + List *partexprs, + *qual; + + /* + * The partition check expression is composed of its parent table's + * partition key expression, we do not need to check it again for a + * partition because we already checked the parallel safety of its parent + * table's partition key expression. + */ + if (!is_partition) + { + qual = RelationGetPartitionQual(rel); + if (parallel_hazard_walker((Node *) qual, context)) + return true; + } + + if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) + return false; + + pkey = RelationGetPartitionKey(rel); + + partnatts = get_partition_natts(pkey); + partexprs = get_partition_exprs(pkey); + + partexprs_item = list_head(partexprs); + for (i = 0; i < partnatts; i++) + { + Oid funcOid = pkey->partsupfunc[i].fn_oid; + + if (OidIsValid(funcOid)) + { + char proparallel = func_parallel(funcOid); + + if (max_parallel_hazard_test(proparallel, context) && + !context->check_all) + return true; + + else if (proparallel != PROPARALLEL_SAFE) + { + safety_object *object; + + object = make_safety_object(funcOid, ProcedureRelationId, + proparallel); + context->objects = lappend(context->objects, object); + } + } + + /* Check parallel-safety of any expressions in the partition key */ + if (get_partition_col_attnum(pkey, i) == 0) + { + Node *check_expr = (Node *) lfirst(partexprs_item); + + if (parallel_hazard_walker(check_expr, context)) + return true; + + partexprs_item = lnext(partexprs, partexprs_item); + } + } + + /* Recursively check each partition ... */ + + /* Create the PartitionDirectory infrastructure if we didn't already */ + if (context->partition_directory == NULL) + context->partition_directory = + CreatePartitionDirectory(CurrentMemoryContext, false); + + pdesc = PartitionDirectoryLookup(context->partition_directory, rel); + + for (i = 0; i < pdesc->nparts; i++) + { + Relation part_rel; + bool max_hazard_found; + + part_rel = table_open(pdesc->oids[i], AccessShareLock); + max_hazard_found = target_rel_parallel_hazard_recurse(part_rel, + context, + true); + table_close(part_rel, AccessShareLock); + + if (max_hazard_found) + return true; + } + + return false; +} + +/* + * target_rel_chk_constr_parallel_hazard + * + * If context->find_all is true, then find all the PARALLEL UNSAFE/RESTRICTED + * objects for any CHECK expressions or CHECK constraints related to the + * specified relation. + * + * If context->find_all is false, then find the worst parallel-hazard level. + */ +static bool +target_rel_chk_constr_parallel_hazard(Relation rel, + max_parallel_hazard_context *context) +{ + char temp_hazard; + int i; + TupleDesc tupdesc; + List *temp_objects; + ConstrCheck *check; + + tupdesc = RelationGetDescr(rel); + + if (tupdesc->constr == NULL) + return false; + + check = tupdesc->constr->check; + + /* + * Determine if there are any CHECK constraints which are not + * parallel-safe. + */ + for (i = 0; i < tupdesc->constr->num_check; i++) + { + Expr *check_expr = stringToNode(check[i].ccbin); + + temp_objects = context->objects; + context->objects = NIL; + temp_hazard = context->max_hazard; + context->max_hazard = PROPARALLEL_SAFE; + + if (parallel_hazard_walker((Node *) check_expr, context)) + return true; + + /* Add the constraint itself to the objects list */ + if (context->objects != NIL) + { + Oid constr_oid; + safety_object *object; + + constr_oid = get_relation_constraint_oid(rel->rd_rel->oid, + check->ccname, + true); + + object = make_safety_object(constr_oid, + ConstraintRelationId, + context->max_hazard); + + context->objects = lappend(context->objects, object); + } + + (void) max_parallel_hazard_test(temp_hazard, context); + + context->objects = list_concat(context->objects, temp_objects); + list_free(temp_objects); + } + + return false; +} + /* * is_parallel_allowed_for_modify * diff --git a/src/backend/utils/adt/misc.c b/src/backend/utils/adt/misc.c index 88faf4dfd7..06d859c966 100644 --- a/src/backend/utils/adt/misc.c +++ b/src/backend/utils/adt/misc.c @@ -23,6 +23,8 @@ #include "access/sysattr.h" #include "access/table.h" #include "catalog/catalog.h" +#include "catalog/namespace.h" +#include "catalog/pg_proc.h" #include "catalog/pg_tablespace.h" #include "catalog/pg_type.h" #include "catalog/system_fk_info.h" @@ -31,6 +33,7 @@ #include "common/keywords.h" #include "funcapi.h" #include "miscadmin.h" +#include "optimizer/clauses.h" #include "parser/scansup.h" #include "pgstat.h" #include "postmaster/syslogger.h" @@ -43,6 +46,7 @@ #include "utils/lsyscache.h" #include "utils/ruleutils.h" #include "utils/timestamp.h" +#include "utils/varlena.h" /* * Common subroutine for num_nulls() and num_nonnulls(). @@ -605,6 +609,96 @@ pg_collation_for(PG_FUNCTION_ARGS) PG_RETURN_TEXT_P(cstring_to_text(generate_collation_name(collid))); } +/* + * Find the worst parallel-hazard level in the given relation + * + * Returns the worst parallel hazard level (the earliest in this list: + * PROPARALLEL_UNSAFE, PROPARALLEL_RESTRICTED, PROPARALLEL_SAFE) that can + * be found in the given relation. + */ +Datum +pg_get_table_max_parallel_dml_hazard(PG_FUNCTION_ARGS) +{ + char max_parallel_hazard; + Oid relOid = PG_GETARG_OID(0); + + (void) target_rel_parallel_hazard(relOid, false, + PROPARALLEL_UNSAFE, + &max_parallel_hazard); + + PG_RETURN_CHAR(max_parallel_hazard); +} + +/* + * Determine whether the target relation is safe to execute parallel modification. + * + * Return all the PARALLEL RESTRICTED/UNSAFE objects. + */ +Datum +pg_get_table_parallel_dml_safety(PG_FUNCTION_ARGS) +{ +#define PG_GET_PARALLEL_SAFETY_COLS 3 + List *objects; + ListCell *object; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + ReturnSetInfo *rsinfo; + char max_parallel_hazard; + Oid relOid = PG_GETARG_OID(0); + + rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + objects = target_rel_parallel_hazard(relOid, true, + PROPARALLEL_UNSAFE, + &max_parallel_hazard); + foreach(object, objects) + { + Datum values[PG_GET_PARALLEL_SAFETY_COLS]; + bool nulls[PG_GET_PARALLEL_SAFETY_COLS]; + safety_object *sobject = (safety_object *) lfirst(object); + + memset(nulls, 0, sizeof(nulls)); + + values[0] = sobject->objid; + values[1] = sobject->classid; + values[2] = sobject->proparallel; + + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + } + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} + /* * pg_relation_is_updatable - determine which update events the specified diff --git a/src/backend/utils/cache/typcache.c b/src/backend/utils/cache/typcache.c index 326fae62e2..02a8f70b4f 100644 --- a/src/backend/utils/cache/typcache.c +++ b/src/backend/utils/cache/typcache.c @@ -2534,6 +2534,23 @@ compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2) return 0; } +/* + * GetDomainConstraints --- get DomainConstraintState list of specified domain type + */ +List * +GetDomainConstraints(Oid type_id) +{ + TypeCacheEntry *typentry; + List *constraints = NIL; + + typentry = lookup_type_cache(type_id, TYPECACHE_DOMAIN_CONSTR_INFO); + + if(typentry->domainData != NULL) + constraints = typentry->domainData->constraints; + + return constraints; +} + /* * Load (or re-load) the enumData member of the typcache entry. */ diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 8cd0252082..4483cd1fc3 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -3770,6 +3770,20 @@ provolatile => 's', prorettype => 'regclass', proargtypes => 'regclass', prosrc => 'pg_get_replica_identity_index' }, +{ oid => '6122', + descr => 'parallel unsafe/restricted objects in the target relation', + proname => 'pg_get_table_parallel_dml_safety', prorows => '100', + proretset => 't', provolatile => 'v', proparallel => 'u', + prorettype => 'record', proargtypes => 'regclass', + proallargtypes => '{regclass,oid,oid,char}', + proargmodes => '{i,o,o,o}', + proargnames => '{table_name, objid, classid, proparallel}', + prosrc => 'pg_get_table_parallel_dml_safety' }, + +{ oid => '6123', descr => 'worst parallel-hazard level in the given relation for DML', + proname => 'pg_get_table_max_parallel_dml_hazard', prorettype => 'char', proargtypes => 'regclass', + prosrc => 'pg_get_table_max_parallel_dml_hazard', provolatile => 'v', proparallel => 'u' }, + # Deferrable unique constraint trigger { oid => '1250', descr => 'deferred UNIQUE constraint check', proname => 'unique_key_recheck', provolatile => 'v', prorettype => 'trigger', @@ -3777,11 +3791,11 @@ # Generic referential integrity constraint triggers { oid => '1644', descr => 'referential integrity FOREIGN KEY ... REFERENCES', - proname => 'RI_FKey_check_ins', provolatile => 'v', prorettype => 'trigger', - proargtypes => '', prosrc => 'RI_FKey_check_ins' }, + proname => 'RI_FKey_check_ins', provolatile => 'v', proparallel => 'r', + prorettype => 'trigger', proargtypes => '', prosrc => 'RI_FKey_check_ins' }, { oid => '1645', descr => 'referential integrity FOREIGN KEY ... REFERENCES', - proname => 'RI_FKey_check_upd', provolatile => 'v', prorettype => 'trigger', - proargtypes => '', prosrc => 'RI_FKey_check_upd' }, + proname => 'RI_FKey_check_upd', provolatile => 'v', proparallel => 'r', + prorettype => 'trigger', proargtypes => '', prosrc => 'RI_FKey_check_upd' }, { oid => '1646', descr => 'referential integrity ON DELETE CASCADE', proname => 'RI_FKey_cascade_del', provolatile => 'v', prorettype => 'trigger', proargtypes => '', prosrc => 'RI_FKey_cascade_del' }, diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h index 32b56565e5..67e8f5026a 100644 --- a/src/include/optimizer/clauses.h +++ b/src/include/optimizer/clauses.h @@ -23,6 +23,13 @@ typedef struct List **windowFuncs; /* lists of WindowFuncs for each winref */ } WindowFuncLists; +typedef struct safety_object +{ + Oid objid; + Oid classid; + char proparallel; +} safety_object; + extern bool contain_agg_clause(Node *clause); extern bool contain_window_function(Node *clause); @@ -54,5 +61,8 @@ extern Query *inline_set_returning_function(PlannerInfo *root, RangeTblEntry *rte); extern bool is_parallel_allowed_for_modify(Query *parse); +extern List *target_rel_parallel_hazard(Oid relOid, bool findall, + char max_interesting, + char *max_hazard); #endif /* CLAUSES_H */ diff --git a/src/include/utils/typcache.h b/src/include/utils/typcache.h index 1d68a9a4b7..28ca7d8a6e 100644 --- a/src/include/utils/typcache.h +++ b/src/include/utils/typcache.h @@ -199,6 +199,8 @@ extern uint64 assign_record_type_identifier(Oid type_id, int32 typmod); extern int compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2); +extern List *GetDomainConstraints(Oid type_id); + extern size_t SharedRecordTypmodRegistryEstimate(void); extern void SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *, -- 2.27.0