From 9b201036cfb18e1b729755ddc20e55b8b0f2686f Mon Sep 17 00:00:00 2001 From: Greg Nancarrow Date: Tue, 27 Oct 2020 14:07:13 +1100 Subject: [PATCH v5] Enable parallel INSERT and/or SELECT for "INSERT INTO...SELECT...", where it is safe to do so. Parallel INSERT can't be utilized in the following cases: - INSERT statement uses ON CONFLICT ... DO UPDATE ... - Target table is a foreign or temporary table - Target table has a: - Parallel-unsafe trigger - Foreign key trigger (RI_TRIGGER_FK) - Parallel-unsafe index expression - Parallel-unsafe column default expression - Parallel-unsafe check constraint - Partitioned table or partition with any of the above parallel-unsafe features - Partitioned table with parallel-unsafe partition key expressions or support functions Where the above-mentioned target table features are parallel-restricted, rather than parallel-unsafe, at least parallel SELECT may be utilized. Discussion: https://postgr.es/m/CAJcOf-cXnB5cnMKqWEp2E2z7Mvcd04iLVmV=qpFJrR3AcrTS3g@mail.gmail.com --- src/backend/access/heap/heapam.c | 2 + src/backend/access/transam/xact.c | 44 +++- src/backend/executor/execMain.c | 15 +- src/backend/executor/execParallel.c | 59 ++++- src/backend/executor/nodeGather.c | 63 ++++- src/backend/executor/nodeGatherMerge.c | 11 +- src/backend/executor/nodeModifyTable.c | 44 +++- src/backend/optimizer/path/costsize.c | 46 ++++ src/backend/optimizer/plan/createplan.c | 2 +- src/backend/optimizer/plan/planner.c | 144 +++++++++++- src/backend/optimizer/plan/setrefs.c | 12 +- src/backend/optimizer/util/clauses.c | 405 ++++++++++++++++++++++++++++++++ src/backend/optimizer/util/pathnode.c | 55 ++--- src/include/access/xact.h | 16 ++ src/include/executor/execParallel.h | 1 + src/include/executor/nodeModifyTable.h | 3 +- src/include/nodes/execnodes.h | 4 +- src/include/optimizer/clauses.h | 1 + src/include/optimizer/cost.h | 1 + src/include/optimizer/pathnode.h | 3 +- 20 files changed, 857 insertions(+), 74 deletions(-) diff --git 
a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 1585861..929e2a1 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2049,10 +2049,12 @@ heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, * inserts in general except for the cases where inserts generate a new * CommandId (eg. inserts into a table having a foreign key column). */ +/* if (IsParallelWorker()) ereport(ERROR, (errcode(ERRCODE_INVALID_TRANSACTION_STATE), errmsg("cannot insert tuples in a parallel worker"))); +*/ tup->t_data->t_infomask &= ~(HEAP_XACT_MASK); tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index af6afce..7a6426a 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -517,6 +517,20 @@ GetCurrentFullTransactionIdIfAny(void) } /* + * SetCurrentCommandIdUsedForWorker + * + * For a parallel worker, record that the currentCommandId has been used. + * This must only be called at the start of a parallel operation. + */ +void +SetCurrentCommandIdUsedForWorker(void) +{ + Assert(IsParallelWorker() && !currentCommandIdUsed && currentCommandId != InvalidCommandId); + + currentCommandIdUsed = true; +} + +/* * MarkCurrentTransactionIdLoggedIfAny * * Remember that the current xid - if it is assigned - now has been wal logged. @@ -764,12 +778,16 @@ GetCurrentCommandId(bool used) if (used) { /* - * Forbid setting currentCommandIdUsed in a parallel worker, because - * we have no provision for communicating this back to the leader. We - * could relax this restriction when currentCommandIdUsed was already - * true at the start of the parallel operation. 
+ * If in a parallel worker, only allow setting currentCommandIdUsed + * if currentCommandIdUsed was already true at the start of the + * parallel operation (by way of SetCurrentCommandIdUsed()), otherwise + * forbid setting currentCommandIdUsed because we have no provision + * for communicating this back to the leader. Once currentCommandIdUsed + * is set, the commandId used by leader and workers can't be changed, + * because CommandCounterIncrement() then prevents any attempted + * increment of the current commandId. */ - Assert(!IsParallelWorker()); + Assert(!(IsParallelWorker() && !currentCommandIdUsed)); currentCommandIdUsed = true; } return currentCommandId; @@ -1015,6 +1033,22 @@ IsInParallelMode(void) } /* + * PrepareParallelModeForModify + * + * Prepare for entering parallel mode by assigning a FullTransactionId, to be + * included in the transaction state that is serialized in the parallel DSM. + */ +void PrepareParallelModeForModify(CmdType commandType, bool isParallelModifyLeader) +{ + Assert(!IsInParallelMode()); + + if (isParallelModifyLeader) + (void)GetCurrentCommandId(true); + + (void)GetCurrentFullTransactionId(); +} + +/* * CommandCounterIncrement */ void diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 7179f58..0969a66 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -790,7 +790,8 @@ ExecCheckXactReadOnly(PlannedStmt *plannedstmt) PreventCommandIfReadOnly(CreateCommandName((Node *) plannedstmt)); } - if (plannedstmt->commandType != CMD_SELECT || plannedstmt->hasModifyingCTE) + if ((plannedstmt->commandType != CMD_SELECT && + !IsModifySupportedInParallelMode(plannedstmt->commandType)) || plannedstmt->hasModifyingCTE) PreventCommandIfParallelMode(CreateCommandName((Node *) plannedstmt)); } @@ -1526,7 +1527,19 @@ ExecutePlan(EState *estate, estate->es_use_parallel_mode = use_parallel_mode; if (use_parallel_mode) + { + bool isParallelModifyLeader = IsA(planstate, GatherState) 
&& IsA(outerPlanState(planstate), ModifyTableState); + + /* + * Supported table-modification commands may require additional steps + * prior to entering parallel mode, such as assigning a FullTransactionId. + */ + if (IsModifySupportedInParallelMode(estate->es_plannedstmt->commandType)) + { + PrepareParallelModeForModify(estate->es_plannedstmt->commandType, isParallelModifyLeader); + } EnterParallelMode(); + } /* * Loop until we've processed the proper number of tuples from the plan. diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index befde52..39f60af 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -23,6 +23,7 @@ #include "postgres.h" +#include "access/xact.h" #include "executor/execParallel.h" #include "executor/executor.h" #include "executor/nodeAgg.h" @@ -65,6 +66,7 @@ #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) +#define PARALLEL_KEY_PROCESSED_COUNT UINT64CONST(0xE00000000000000B) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -173,9 +175,11 @@ ExecSerializePlan(Plan *plan, EState *estate) * PlannedStmt to start the executor. */ pstmt = makeNode(PlannedStmt); - pstmt->commandType = CMD_SELECT; + Assert(estate->es_plannedstmt->commandType == CMD_SELECT || + IsModifySupportedInParallelMode(estate->es_plannedstmt->commandType)); + pstmt->commandType = IsA(plan, ModifyTable) ? 
castNode(ModifyTable, plan)->operation : CMD_SELECT; pstmt->queryId = UINT64CONST(0); - pstmt->hasReturning = false; + pstmt->hasReturning = estate->es_plannedstmt->hasReturning; pstmt->hasModifyingCTE = false; pstmt->canSetTag = true; pstmt->transientPlan = false; @@ -183,7 +187,7 @@ ExecSerializePlan(Plan *plan, EState *estate) pstmt->parallelModeNeeded = false; pstmt->planTree = plan; pstmt->rtable = estate->es_range_table; - pstmt->resultRelations = NIL; + pstmt->resultRelations = estate->es_plannedstmt->resultRelations; pstmt->appendRelations = NIL; /* @@ -590,6 +594,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, char *paramlistinfo_space; BufferUsage *bufusage_space; WalUsage *walusage_space; + uint64 *processed_count_space; SharedExecutorInstrumentation *instrumentation = NULL; SharedJitInstrumentation *jit_instrumentation = NULL; int pstmt_len; @@ -675,6 +680,14 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, mul_size(PARALLEL_TUPLE_QUEUE_SIZE, pcxt->nworkers)); shm_toc_estimate_keys(&pcxt->estimator, 1); + if (IsA(planstate->plan, ModifyTable)) + { + /* Estimate space for returned "# of tuples processed" count. */ + shm_toc_estimate_chunk(&pcxt->estimator, + mul_size(sizeof(uint64), pcxt->nworkers)); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } + /* * Give parallel-aware nodes a chance to add to the estimates, and get a * count of how many PlanState nodes there are. @@ -764,6 +777,19 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, /* We don't need the TupleQueueReaders yet, though. */ pei->reader = NULL; + if (IsA(planstate->plan, ModifyTable)) + { + /* Allocate space for each worker's returned "# of tuples processed" count. 
*/ + processed_count_space = shm_toc_allocate(pcxt->toc, + mul_size(sizeof(uint64), pcxt->nworkers)); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_PROCESSED_COUNT, processed_count_space); + pei->processed_count = processed_count_space; + } + else + { + pei->processed_count = NULL; + } + /* * If instrumentation options were supplied, allocate space for the data. * It only gets partially initialized here; the rest happens during @@ -1152,6 +1178,15 @@ ExecParallelFinish(ParallelExecutorInfo *pei) for (i = 0; i < nworkers; i++) InstrAccumParallelQuery(&pei->buffer_usage[i], &pei->wal_usage[i]); + /* + * Update total # of tuples processed, using counts from each worker. + */ + if (pei->processed_count != NULL) + { + for (i = 0; i < nworkers; i++) + pei->planstate->state->es_processed += pei->processed_count[i]; + } + pei->finished = true; } @@ -1379,6 +1414,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) FixedParallelExecutorState *fpes; BufferUsage *buffer_usage; WalUsage *wal_usage; + uint64 *processed_count; DestReceiver *receiver; QueryDesc *queryDesc; SharedExecutorInstrumentation *instrumentation; @@ -1400,6 +1436,16 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) true); queryDesc = ExecParallelGetQueryDesc(toc, receiver, instrument_options); + Assert(queryDesc->operation == CMD_SELECT || IsModifySupportedInParallelMode(queryDesc->operation)); + if (IsModifySupportedInParallelMode(queryDesc->operation)) + { + /* + * Record that the CurrentCommandId is used, at the start of + * the parallel operation. + */ + SetCurrentCommandIdUsedForWorker(); + } + /* Setting debug_query_string for individual workers */ debug_query_string = queryDesc->sourceText; @@ -1458,6 +1504,13 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) InstrEndParallelQuery(&buffer_usage[ParallelWorkerNumber], &wal_usage[ParallelWorkerNumber]); + if (IsModifySupportedInParallelMode(queryDesc->operation)) + { + /* Report the # of tuples processed during parallel INSERT execution. 
*/ + processed_count = shm_toc_lookup(toc, PARALLEL_KEY_PROCESSED_COUNT, false); + processed_count[ParallelWorkerNumber] = queryDesc->estate->es_processed; + } + /* Report instrumentation data if any instrumentation options are set. */ if (instrumentation != NULL) ExecParallelReportInstrumentation(queryDesc->planstate, diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c index a01b46a..dfe5442 100644 --- a/src/backend/executor/nodeGather.c +++ b/src/backend/executor/nodeGather.c @@ -35,6 +35,7 @@ #include "executor/execdebug.h" #include "executor/execParallel.h" #include "executor/nodeGather.h" +#include "executor/nodeModifyTable.h" #include "executor/nodeSubplan.h" #include "executor/tqueue.h" #include "miscadmin.h" @@ -60,6 +61,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags) GatherState *gatherstate; Plan *outerNode; TupleDesc tupDesc; + Index varno; /* Gather node doesn't have innerPlan node. */ Assert(innerPlan(node) == NULL); @@ -104,7 +106,9 @@ ExecInitGather(Gather *node, EState *estate, int eflags) * Initialize result type and projection. */ ExecInitResultTypeTL(&gatherstate->ps); - ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR); + varno = (IsA(outerNode, ModifyTable) && castNode(ModifyTable, outerNode)->returningLists != NULL) ? 
+ castNode(ModifyTableState, outerPlanState(gatherstate))->resultRelInfo->ri_RangeTableIndex : OUTER_VAR; + ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, varno); /* * Without projections result slot type is not trivially known, see @@ -144,9 +148,19 @@ ExecGather(PlanState *pstate) GatherState *node = castNode(GatherState, pstate); TupleTableSlot *slot; ExprContext *econtext; + ModifyTableState *nodeModifyTableState = NULL; + bool isParallelModifyLeader = false; + bool isParallelModifyWithReturning = false; CHECK_FOR_INTERRUPTS(); + if (IsA(outerPlanState(pstate), ModifyTableState)) + { + nodeModifyTableState = castNode(ModifyTableState, outerPlanState(pstate)); + isParallelModifyLeader = IsModifySupportedInParallelMode(nodeModifyTableState->operation); + isParallelModifyWithReturning = isParallelModifyLeader && nodeModifyTableState->ps.plan->targetlist != NIL; + } + /* * Initialize the parallel context and workers on first execution. We do * this on first execution rather than during node initialization, as it @@ -178,6 +192,16 @@ ExecGather(PlanState *pstate) node->pei, gather->initParam); + if (isParallelModifyLeader) + { + /* + * For a supported parallel table-modification command, if there + * are BEFORE STATEMENT triggers, these must be fired by the leader, + * not by the parallel workers. + */ + fireBSTriggersInLeader(nodeModifyTableState); + } + /* * Register backend workers. We might not get as many as we * requested, or indeed any at all. @@ -188,7 +212,7 @@ ExecGather(PlanState *pstate) node->nworkers_launched = pcxt->nworkers_launched; /* Set up tuple queue readers to read the results. */ - if (pcxt->nworkers_launched > 0) + if (pcxt->nworkers_launched > 0 && !(isParallelModifyLeader && !isParallelModifyWithReturning)) { ExecParallelCreateReaders(node->pei); /* Make a working array showing the active readers */ @@ -200,7 +224,11 @@ ExecGather(PlanState *pstate) } else { - /* No workers? Then never mind. 
*/ + /* + * No workers were launched, or this is a supported parallel + * table-modification command without a RETURNING clause - + * no readers are required. + */ node->nreaders = 0; node->reader = NULL; } @@ -208,7 +236,7 @@ ExecGather(PlanState *pstate) } /* Run plan locally if no workers or enabled and not single-copy. */ - node->need_to_scan_locally = (node->nreaders == 0) + node->need_to_scan_locally = (node->nworkers_launched <= 0) || (!gather->single_copy && parallel_leader_participation); node->initialized = true; } @@ -418,14 +446,31 @@ ExecShutdownGatherWorkers(GatherState *node) void ExecShutdownGather(GatherState *node) { - ExecShutdownGatherWorkers(node); + /* + * If the parallel context has already been destroyed, this + * function must have been previously called, so just + * return. + */ + if (node->pei == NULL) + return; - /* Now destroy the parallel context. */ - if (node->pei != NULL) + bool isParallelModifyLeader = IsA(outerPlanState(node), ModifyTableState) && + IsModifySupportedInParallelMode(castNode(ModifyTableState, outerPlanState(node))->operation); + if (isParallelModifyLeader) { - ExecParallelCleanup(node->pei); - node->pei = NULL; + /* For a supported parallel table-modification command, if there are + * AFTER STATEMENT triggers, these must be fired by the leader, not + * by the parallel workers. + */ + ModifyTableState *nodeModifyTableState = castNode(ModifyTableState, outerPlanState(node)); + fireASTriggersInLeader(nodeModifyTableState); } + + ExecShutdownGatherWorkers(node); + + /* Now destroy the parallel context. 
*/ + ExecParallelCleanup(node->pei); + node->pei = NULL; } /* ---------------------------------------------------------------- diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index 4712934..9f4a700 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -1,4 +1,4 @@ -/*------------------------------------------------------------------------- +/*------------------------------------------------------------------------ * * nodeGatherMerge.c * Scan a plan in multiple workers, and do order-preserving merge. @@ -210,6 +210,15 @@ ExecGatherMerge(PlanState *pstate) { ParallelContext *pcxt; + if (IsModifySupportedInParallelMode(estate->es_plannedstmt->commandType)) + { + /* + * Assign FullTransactionId, to be included in the + * transaction state that is serialized in the DSM. + */ + GetCurrentFullTransactionId(); + } + /* Initialize, or re-initialize, shared state needed by workers. */ if (!node->pei) node->pei = ExecInitParallelPlan(node->ps.lefttree, diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index 29e07b7..46402d7 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -39,6 +39,7 @@ #include "access/heapam.h" #include "access/htup_details.h" +#include "access/parallel.h" #include "access/tableam.h" #include "access/xact.h" #include "catalog/catalog.h" @@ -1833,6 +1834,39 @@ fireASTriggers(ModifyTableState *node) } /* + * Process BEFORE EACH STATEMENT triggers, in the leader + */ +void +fireBSTriggersInLeader(ModifyTableState *node) +{ + Assert(IsInParallelMode() && !IsParallelWorker()); + + if (node->fireBSTriggers) + { + fireBSTriggers(node); + node->fireBSTriggers = false; + + /* + * Disable firing of AFTER STATEMENT triggers by local + * plan execution (ModifyTable processing). These will be + * fired at end of Gather processing. 
+ */ + node->fireASTriggers = false; + } +} + +/* + * Process AFTER EACH STATEMENT triggers, in the leader + */ +void +fireASTriggersInLeader(ModifyTableState *node) +{ + Assert(IsInParallelMode() && !IsParallelWorker()); + + fireASTriggers(node); +} + +/* * Set up the state needed for collecting transition tuples for AFTER * triggers. */ @@ -2158,7 +2192,11 @@ ExecModifyTable(PlanState *pstate) /* * We're done, but fire AFTER STATEMENT triggers before exiting. */ - fireASTriggers(node); + if (node->fireASTriggers) + { + fireASTriggers(node); + node->fireASTriggers = false; + } node->mt_done = true; @@ -2235,7 +2273,9 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) /* set up epqstate with dummy subplan data for the moment */ EvalPlanQualInit(&mtstate->mt_epqstate, estate, NULL, NIL, node->epqParam); - mtstate->fireBSTriggers = true; + /* Statement-level triggers must not be fired by parallel workers */ + mtstate->fireBSTriggers = !IsParallelWorker(); + mtstate->fireASTriggers = !IsParallelWorker(); /* * Build state for collecting transition tuples. This requires having a diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index 733f7ea..7aa10b9 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -213,6 +213,52 @@ clamp_row_est(double nrows) /* + * cost_modifytable + * Determines and returns the cost of a ModifyTable node. + */ +void +cost_modifytable(ModifyTablePath *path) +{ + double total_size; + ListCell *lc; + + /* + * Compute cost & rowcount as sum of subpath costs & rowcounts. + * + * Currently, we don't charge anything extra for the actual table + * modification work, nor for the WITH CHECK OPTIONS or RETURNING + * expressions if any. 
+ */ + path->path.startup_cost = 0; + path->path.total_cost = 0; + path->path.rows = 0; + total_size = 0; + foreach(lc, path->subpaths) + { + Path *subpath = (Path *) lfirst(lc); + + if (lc == list_head(path->subpaths)) /* first node? */ + path->path.startup_cost = subpath->startup_cost; + path->path.total_cost += subpath->total_cost; + if (path->returningLists != NIL) + { + path->path.rows += subpath->rows; + total_size += subpath->pathtarget->width * subpath->rows; + } + } + + /* + * Set width to the average width of the subpath outputs. XXX this is + * totally wrong: we should return an average of the RETURNING tlist + * widths. But it's what happened historically, and improving it is a task + * for another day. + */ + if (path->path.rows > 0) + total_size /= path->path.rows; + path->path.pathtarget->width = rint(total_size); +} + +/* * cost_seqscan * Determines and returns the cost of scanning a relation sequentially. * diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 94280a7..5893051 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -339,7 +339,7 @@ create_plan(PlannerInfo *root, Path *best_path) * top-level tlist seen at execution time. However, ModifyTable plan * nodes don't have a tlist matching the querytree targetlist. */ - if (!IsA(plan, ModifyTable)) + if (!IsA(plan, ModifyTable) && !(IsA(plan, Gather) && IsA(outerPlan(plan), ModifyTable))) apply_tlist_labeling(plan->targetlist, root->processed_tlist); /* diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 986d7a5..b0e9c34 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -318,11 +318,11 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, /* * Assess whether it's feasible to use parallel mode for this query. 
We * can't do this in a standalone backend, or if the command will try to - * modify any data, or if this is a cursor operation, or if GUCs are set - * to values that don't permit parallelism, or if parallel-unsafe - * functions are present in the query tree. + * modify any data using a CTE, or if this is a cursor operation, or if + * GUCs are set to values that don't permit parallelism, or if + * parallel-unsafe functions are present in the query tree. * - * (Note that we do allow CREATE TABLE AS, SELECT INTO, and CREATE + * (Note that we do allow CREATE TABLE AS, INSERT, SELECT INTO, and CREATE * MATERIALIZED VIEW to use parallel plans, but as of now, only the leader * backend writes into a completely new table. In the future, we can * extend it to allow workers to write into the table. However, to allow @@ -336,7 +336,8 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, */ if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 && IsUnderPostmaster && - parse->commandType == CMD_SELECT && + (parse->commandType == CMD_SELECT || + IsModifySupportedInParallelMode(parse->commandType)) && !parse->hasModifyingCTE && max_parallel_workers_per_gather > 0 && !IsParallelWorker()) @@ -344,6 +345,18 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, /* all the cheap tests pass, so scan the query tree */ glob->maxParallelHazard = max_parallel_hazard(parse); glob->parallelModeOK = (glob->maxParallelHazard != PROPARALLEL_UNSAFE); + + /* + * Additional parallel-mode safety checks are required in order to + * allowing an underlying parallel query to be used for a + * supported table-modification command. 
+ */ + if (glob->parallelModeOK && + IsModifySupportedInParallelMode(parse->commandType)) + { + glob->maxParallelHazard = MaxParallelHazardForModify(parse, &glob->maxParallelHazard); + glob->parallelModeOK = (glob->maxParallelHazard != PROPARALLEL_UNSAFE); + } } else { @@ -1794,7 +1807,8 @@ inheritance_planner(PlannerInfo *root) returningLists, rowMarks, NULL, - assign_special_exec_param(root))); + assign_special_exec_param(root), + 0)); } /*-------------------- @@ -1842,6 +1856,7 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, RelOptInfo *final_rel; FinalPathExtraData extra; ListCell *lc; + int parallel_modify_partial_path_count = 0; /* Tweak caller-supplied tuple_fraction if have LIMIT/OFFSET */ if (parse->limitCount || parse->limitOffset) @@ -2378,13 +2393,102 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, returningLists, rowMarks, parse->onConflict, - assign_special_exec_param(root)); + assign_special_exec_param(root), + 0); } /* And shove it into final_rel */ add_path(final_rel, path); } + /* Consider a supported parallel table-modification command */ + if (IsModifySupportedInParallelMode(parse->commandType) && + !inheritance_update && + final_rel->consider_parallel && + parse->rowMarks == NIL) + { + Index rootRelation; + List *withCheckOptionLists; + List *returningLists; + int parallelModifyWorkers; + + /* + * Generate partial paths for the final_rel. Insert all surviving paths, with + * Limit, and/or ModifyTable steps added if needed. + */ + foreach(lc, current_rel->partial_pathlist) + { + Path *path = (Path *) lfirst(lc); + + /* + * If there is a LIMIT/OFFSET clause, add the LIMIT node. + */ + if (limit_needed(parse)) + { + path = (Path *) create_limit_path(root, final_rel, path, + parse->limitOffset, + parse->limitCount, + parse->limitOption, + offset_est, count_est); + } + + /* + * Add the ModifyTable node. 
+ */ + + /* + * If target is a partition root table, we need to mark the + * ModifyTable node appropriately for that. + */ + if (rt_fetch(parse->resultRelation, parse->rtable)->relkind == + RELKIND_PARTITIONED_TABLE) + rootRelation = parse->resultRelation; + else + rootRelation = 0; + + /* + * Set up the WITH CHECK OPTION and RETURNING lists-of-lists, if + * needed. + */ + if (parse->withCheckOptions) + withCheckOptionLists = list_make1(parse->withCheckOptions); + else + withCheckOptionLists = NIL; + + if (parse->returningList) + returningLists = list_make1(parse->returningList); + else + returningLists = NIL; + + /* + * For the number of workers to use for a parallel INSERT/UPDATE/DELETE, + * it seems resonable to use the same number of workers as estimated + * for the underlying query. + */ + parallelModifyWorkers = path->parallel_workers; + + path = (Path *) + create_modifytable_path(root, final_rel, + parse->commandType, + parse->canSetTag, + parse->resultRelation, + rootRelation, + false, + list_make1_int(parse->resultRelation), + list_make1(path), + list_make1(root), + withCheckOptionLists, + returningLists, + root->rowMarks, + parse->onConflict, + assign_special_exec_param(root), + parallelModifyWorkers); + + add_partial_path(final_rel, path); + parallel_modify_partial_path_count++; + } + } + /* * Generate partial paths for final_rel, too, if outer query levels might * be able to make use of them. @@ -2401,6 +2505,12 @@ grouping_planner(PlannerInfo *root, bool inheritance_update, } } + if (parallel_modify_partial_path_count > 0) + { + final_rel->rows = current_rel->rows; /* ??? why hasn't this been set above somewhere ???? */ + generate_useful_gather_paths(root, final_rel, false); + } + extra.limit_needed = limit_needed(parse); extra.limit_tuples = limit_tuples; extra.count_est = count_est; @@ -7570,7 +7680,25 @@ apply_scanjoin_target_to_paths(PlannerInfo *root, * one of the generated paths may turn out to be the cheapest one. 
*/ if (rel->consider_parallel && !IS_OTHER_REL(rel)) - generate_useful_gather_paths(root, rel, false); + { + if (IsModifySupportedInParallelMode(root->parse->commandType)) + { + if (root->glob->maxParallelHazard == PROPARALLEL_RESTRICTED) + { + /* + * Don't allow a supported parallel table-modification command, + * because it's not safe. However, do allow any underlying query + * to be run by parallel workers. + */ + generate_useful_gather_paths(root, rel, false); + rel->consider_parallel = false; + } + } + else + { + generate_useful_gather_paths(root, rel, false); + } + } /* * Reassess which paths are the cheapest, now that we've potentially added diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 8b43371..921c4bc 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -252,6 +252,7 @@ set_plan_references(PlannerInfo *root, Plan *plan) PlannerGlobal *glob = root->glob; int rtoffset = list_length(glob->finalrtable); ListCell *lc; + Plan *finalPlan; /* * Add all the query's RTEs to the flattened rangetable. 
The live ones @@ -302,7 +303,16 @@ set_plan_references(PlannerInfo *root, Plan *plan) } /* Now fix the Plan tree */ - return set_plan_refs(root, plan, rtoffset); + finalPlan = set_plan_refs(root, plan, rtoffset); + if (finalPlan != NULL && IsA(finalPlan, Gather)) + { + Plan *subplan = outerPlan(finalPlan); + if (IsA(subplan, ModifyTable) && castNode(ModifyTable, subplan)->returningLists != NULL) + { + finalPlan->targetlist = outerPlan(finalPlan)->targetlist; + } + } + return finalPlan; } /* diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index e7d8146..fad55e4 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -19,13 +19,19 @@ #include "postgres.h" +#include "access/genam.h" #include "access/htup_details.h" +#include "access/table.h" +#include "catalog/index.h" +#include "catalog/indexing.h" #include "catalog/pg_aggregate.h" #include "catalog/pg_class.h" +#include "catalog/pg_constraint.h" #include "catalog/pg_language.h" #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" +#include "commands/trigger.h" #include "executor/executor.h" #include "executor/functions.h" #include "funcapi.h" @@ -42,7 +48,11 @@ #include "parser/parse_agg.h" #include "parser/parse_coerce.h" #include "parser/parse_func.h" +#include "parser/parsetree.h" +#include "partitioning/partdesc.h" +#include "rewrite/rewriteHandler.h" #include "rewrite/rewriteManip.h" +#include "storage/lmgr.h" #include "tcop/tcopprot.h" #include "utils/acl.h" #include "utils/builtins.h" @@ -50,6 +60,8 @@ #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/partcache.h" +#include "utils/rel.h" #include "utils/syscache.h" #include "utils/typcache.h" @@ -157,6 +169,13 @@ static Query *substitute_actual_srf_parameters(Query *expr, static Node *substitute_actual_srf_parameters_mutator(Node *node, substitute_actual_srf_parameters_context *context); 
+static char MaxTriggerDataParallelHazardForModify(TriggerDesc *trigdesc, + max_parallel_hazard_context *context); +static char MaxIndexExprsParallelHazardForModify(Relation rel, + max_parallel_hazard_context *context); +static char MaxDomainParallelHazardForModify(Oid typid, max_parallel_hazard_context *context); +static char MaxRelParallelHazardForModify(Oid relid, CmdType commandType, + max_parallel_hazard_context *context); /***************************************************************************** * Aggregate-function clause manipulation @@ -1073,6 +1092,392 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) context); } +/* + * MaxTriggerDataParallelHazardForModify + * + * Finds the maximum parallel-mode hazard level for the specified trigger data. + */ +static char +MaxTriggerDataParallelHazardForModify(TriggerDesc *trigdesc, + max_parallel_hazard_context *context) +{ + int i; + + for (i = 0; i < trigdesc->numtriggers; i++) + { + Trigger *trigger = &trigdesc->triggers[i]; + int trigtype; + + if (max_parallel_hazard_test(func_parallel(trigger->tgfoid), context)) + break; + + /* + * If the trigger type is RI_TRIGGER_FK, this indicates a FK exists in + * the relation, and this would result in creation of new CommandIds + * on insert/update/delete and this isn't supported in a parallel + * worker (but is safe in the parallel leader). + */ + trigtype = RI_FKey_trigger_type(trigger->tgfoid); + if (trigtype == RI_TRIGGER_FK) + { + context->max_hazard = PROPARALLEL_RESTRICTED; + /* + * As we're looking for the max parallel hazard, we don't break + * here; examine any further triggers ... + */ + } + } + + return context->max_hazard; +} + +/* + * MaxIndexExprsParallelHazardForModify + * + * Finds the maximum parallel-mode hazard level for any existing index + * expressions of a specified relation. 
+ */ +static char +MaxIndexExprsParallelHazardForModify(Relation rel, + max_parallel_hazard_context *context) +{ + List *indexOidList; + ListCell *lc; + LOCKMODE lockmode = AccessShareLock; + + indexOidList = RelationGetIndexList(rel); + foreach(lc, indexOidList) + { + Oid indexOid = lfirst_oid(lc); + Relation indexRel; + IndexInfo *indexInfo; + + if (ConditionalLockRelationOid(indexOid, lockmode)) + { + indexRel = index_open(indexOid, NoLock); + } + else + { + context->max_hazard = PROPARALLEL_UNSAFE; + return context->max_hazard; + } + + indexInfo = BuildIndexInfo(indexRel); + + if (indexInfo->ii_Expressions != NIL) + { + int i; + ListCell *indexExprItem = list_head(indexInfo->ii_Expressions); + + for (i = 0; i < indexInfo->ii_NumIndexAttrs; i++) + { + int keycol = indexInfo->ii_IndexAttrNumbers[i]; + if (keycol == 0) + { + /* Found an index expression */ + + Node *indexExpr; + + if (indexExprItem == NULL) /* shouldn't happen */ + elog(ERROR, "too few entries in indexprs list"); + + indexExpr = (Node *)lfirst(indexExprItem); + indexExpr = (Node *)expression_planner((Expr *)indexExpr); + + if (max_parallel_hazard_walker(indexExpr, context) == PROPARALLEL_UNSAFE) + { + index_close(indexRel, lockmode); + return context->max_hazard; + } + + indexExprItem = lnext(indexInfo->ii_Expressions, indexExprItem); + } + } + } + index_close(indexRel, lockmode); + } + + return context->max_hazard; +} + +/* + * MaxDomainParallelHazardForModify + * + * Finds the maximum parallel-mode hazard level for the specified DOMAIN type. + * Only any CHECK expressions are examined for parallel safety. + * DEFAULT values of DOMAIN-type columns in the target-list are already + * being checked for parallel-safety in the max_parallel_hazard() scan of the + * query tree in standard_planner(). 
+ * + */ +static char +MaxDomainParallelHazardForModify(Oid typid, max_parallel_hazard_context *context) +{ + Relation conRel; + ScanKeyData key[1]; + SysScanDesc scan; + HeapTuple tup; + + LOCKMODE lockmode = AccessShareLock; + + conRel = table_open(ConstraintRelationId, lockmode); + + ScanKeyInit(&key[0], + Anum_pg_constraint_contypid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(typid)); + scan = systable_beginscan(conRel, ConstraintTypidIndexId, true, + NULL, 1, key); + + while (HeapTupleIsValid((tup = systable_getnext(scan)))) + { + Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(tup); + + if (con->contype == CONSTRAINT_CHECK) + { + char *conbin; + Datum val; + bool isnull; + Expr *checkExpr; + + val = SysCacheGetAttr(CONSTROID, tup, + Anum_pg_constraint_conbin, &isnull); + if (isnull) + elog(ERROR, "null conbin for constraint %u", con->oid); + conbin = TextDatumGetCString(val); + checkExpr = stringToNode(conbin); + if (max_parallel_hazard_walker((Node *)checkExpr, context)) + { + break; + } + } + } + + systable_endscan(scan); + table_close(conRel, lockmode); + return context->max_hazard; +} + +/* + * MaxRelParallelHazardForModify + * + * Determines the maximum parallel-mode hazard level for modification + * of a specified relation. + */ +static char +MaxRelParallelHazardForModify(Oid relid, + CmdType commandType, + max_parallel_hazard_context *context) +{ + Relation rel; + TupleDesc tupdesc; + int attnum; + + LOCKMODE lockmode = AccessShareLock; + + /* + * It's possible that this relation is locked for exclusive access + * in another concurrent transaction (e.g. as a result of a + * ALTER TABLE ... operation) until that transaction completes. + * If a share-lock can't be acquired on it now, we have to assume this + * could be the worst-case, so to avoid blocking here until that + * transaction completes, conditionally try to acquire the lock and + * assume and return UNSAFE on failure. 
+ */ + if (ConditionalLockRelationOid(relid, lockmode)) + { + rel = table_open(relid, NoLock); + } + else + { + context->max_hazard = PROPARALLEL_UNSAFE; + return context->max_hazard; + } + + /* + * We can't support table modification in parallel-mode if it's a + * foreign table/partition (no FDW API for supporting parallel access) + * or a temporary table. + */ + if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE || + RelationUsesLocalBuffers(rel)) + { + table_close(rel, lockmode); + context->max_hazard = PROPARALLEL_UNSAFE; + return context->max_hazard; + } + + /* + * If a partitioned table, check that each partition is safe for + * modification in parallel-mode. + */ + if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + int i; + PartitionDesc pdesc; + PartitionKey pkey; + ListCell *partexprs_item; + int partnatts; + List *partexprs; + + pkey = RelationGetPartitionKey(rel); + + partnatts = get_partition_natts(pkey); + partexprs = get_partition_exprs(pkey); + + partexprs_item = list_head(partexprs); + for (i = 0; i < partnatts; i++) + { + /* Check parallel-safety of partition key support functions */ + if (OidIsValid(pkey->partsupfunc[i].fn_oid)) + { + if (max_parallel_hazard_test(func_parallel(pkey->partsupfunc[i].fn_oid), context)) + { + table_close(rel, lockmode); + return context->max_hazard; + } + } + + /* Check parallel-safety of any expressions in the partition key */ + if (get_partition_col_attnum(pkey, i) == 0) + { + Node *checkExpr = (Node *) lfirst(partexprs_item); + if (max_parallel_hazard_walker(checkExpr, context)) + { + table_close(rel, lockmode); + return context->max_hazard; + } + + partexprs_item = lnext(partexprs, partexprs_item); + } + } + + /* Recursively check each partition ... 
*/ + pdesc = RelationGetPartitionDesc(rel); + for (i = 0; i < pdesc->nparts; i++) + { + if (MaxRelParallelHazardForModify(pdesc->oids[i], commandType, context) == PROPARALLEL_UNSAFE) + { + table_close(rel, lockmode); + return context->max_hazard; + } + } + } + + /* + * If there are any index expressions, check that they are parallel-mode + * safe. + */ + if (MaxIndexExprsParallelHazardForModify(rel, context) == PROPARALLEL_UNSAFE) + { + table_close(rel, lockmode); + return context->max_hazard; + } + + /* + * If any triggers exist, check that they are parallel safe. + */ + if (rel->trigdesc != NULL && + MaxTriggerDataParallelHazardForModify(rel->trigdesc, context) == PROPARALLEL_UNSAFE) + { + table_close(rel, lockmode); + return context->max_hazard; + } + + if (commandType == CMD_INSERT || commandType == CMD_UPDATE) + { + /* + * Column default expressions for columns in the target-list are already + * being checked for parallel-safety in the max_parallel_hazard() scan of the + * query tree in standard_planner(). + */ + + tupdesc = RelationGetDescr(rel); + for (attnum = 0; attnum < tupdesc->natts; attnum++) + { + Form_pg_attribute att = TupleDescAttr(tupdesc, attnum); + + /* We don't need info for dropped or generated attributes */ + if (att->attisdropped || att->attgenerated) + continue; + + /* + * If the column is of a DOMAIN type, determine whether that domain + * has any CHECK expressions that are not parallel-mode safe. + */ + if (get_typtype(att->atttypid) == TYPTYPE_DOMAIN) + { + if (MaxDomainParallelHazardForModify(att->atttypid, context) == PROPARALLEL_UNSAFE) + { + table_close(rel, lockmode); + return context->max_hazard; + } + } + } + } + + /* + * Check if there are any CHECK constraints which are not parallel-safe. 
+ */ + if ((commandType == CMD_INSERT || commandType == CMD_UPDATE) && + tupdesc->constr != NULL && + tupdesc->constr->num_check > 0) + { + int i; + + ConstrCheck *check = tupdesc->constr->check; + + for (i = 0; i < tupdesc->constr->num_check; i++) + { + Expr *checkExpr = stringToNode(check[i].ccbin); + if (max_parallel_hazard_walker((Node *)checkExpr, context)) + { + table_close(rel, lockmode); + return context->max_hazard; + } + } + } + + table_close(rel, lockmode); + return context->max_hazard; +} + +/* + * MaxParallelHazardForModify + * + * Determines the worst parallel-mode hazard level for the specified + * table-modification statement, based on the statement attributes and + * target table. An initial max parallel hazard level may optionally be + * supplied. The search returns the earliest in the following list: + * PROPARALLEL_UNSAFE, PROPARALLEL_RESTRICTED, PROPARALLEL_SAFE + */ +char +MaxParallelHazardForModify(Query *parse, const char *initialMaxParallelHazard) +{ + RangeTblEntry *rte; + max_parallel_hazard_context context; + + + /* + * UPDATE is not currently supported in parallel-mode, so prohibit + * INSERT...ON CONFLICT...DO UPDATE... + */ + if (parse->onConflict != NULL && parse->onConflict->action == ONCONFLICT_UPDATE) + return PROPARALLEL_UNSAFE; + + /* + * Set up the context used in finding the max parallel-mode hazard. + */ + Assert(initialMaxParallelHazard == NULL || + *initialMaxParallelHazard == PROPARALLEL_SAFE || + *initialMaxParallelHazard == PROPARALLEL_RESTRICTED); + context.max_hazard = initialMaxParallelHazard == NULL ?
+ PROPARALLEL_SAFE : *initialMaxParallelHazard; + context.max_interesting = PROPARALLEL_UNSAFE; + context.safe_param_ids = NIL; + + rte = rt_fetch(parse->resultRelation, parse->rtable); + return (MaxRelParallelHazardForModify(rte->relid, parse->commandType, &context)); +} /***************************************************************************** * Check clauses for nonstrict functions diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index 5281a2f..30dc022 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -3528,6 +3528,7 @@ create_lockrows_path(PlannerInfo *root, RelOptInfo *rel, * 'rowMarks' is a list of PlanRowMarks (non-locking only) * 'onconflict' is the ON CONFLICT clause, or NULL * 'epqParam' is the ID of Param for EvalPlanQual re-eval + * 'parallelWorkers' is the no. of parallel workers to use */ ModifyTablePath * create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, @@ -3538,10 +3539,10 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, List *subroots, List *withCheckOptionLists, List *returningLists, List *rowMarks, OnConflictExpr *onconflict, - int epqParam) + int epqParam, + int parallelWorkers) { ModifyTablePath *pathnode = makeNode(ModifyTablePath); - double total_size; ListCell *lc; Assert(list_length(resultRelations) == list_length(subpaths)); @@ -3558,47 +3559,21 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, /* For now, assume we are above any joins, so no parameterization */ pathnode->path.param_info = NULL; pathnode->path.parallel_aware = false; - pathnode->path.parallel_safe = false; - pathnode->path.parallel_workers = 0; - pathnode->path.pathkeys = NIL; - - /* - * Compute cost & rowcount as sum of subpath costs & rowcounts. - * - * Currently, we don't charge anything extra for the actual table - * modification work, nor for the WITH CHECK OPTIONS or RETURNING - * expressions if any. 
It would only be window dressing, since - * ModifyTable is always a top-level node and there is no way for the - * costs to change any higher-level planning choices. But we might want - * to make it look better sometime. - */ - pathnode->path.startup_cost = 0; - pathnode->path.total_cost = 0; - pathnode->path.rows = 0; - total_size = 0; - foreach(lc, subpaths) + pathnode->path.parallel_safe = rel->consider_parallel && parallelWorkers > 0; + if (pathnode->path.parallel_safe) { - Path *subpath = (Path *) lfirst(lc); - - if (lc == list_head(subpaths)) /* first node? */ - pathnode->path.startup_cost = subpath->startup_cost; - pathnode->path.total_cost += subpath->total_cost; - if (returningLists != NIL) + foreach (lc, subpaths) { - pathnode->path.rows += subpath->rows; - total_size += subpath->pathtarget->width * subpath->rows; + Path *sp = (Path *)lfirst(lc); + if (!sp->parallel_safe) + { + pathnode->path.parallel_safe = false; + break; + } } } - - /* - * Set width to the average width of the subpath outputs. XXX this is - * totally wrong: we should return an average of the RETURNING tlist - * widths. But it's what happened historically, and improving it is a task - * for another day. 
- */ - if (pathnode->path.rows > 0) - total_size /= pathnode->path.rows; - pathnode->path.pathtarget->width = rint(total_size); + pathnode->path.parallel_workers = parallelWorkers; + pathnode->path.pathkeys = NIL; pathnode->operation = operation; pathnode->canSetTag = canSetTag; @@ -3614,6 +3589,8 @@ create_modifytable_path(PlannerInfo *root, RelOptInfo *rel, pathnode->onconflict = onconflict; pathnode->epqParam = epqParam; + cost_modifytable(pathnode); + return pathnode; } diff --git a/src/include/access/xact.h b/src/include/access/xact.h index 7320de3..f43a844 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -386,6 +386,7 @@ extern FullTransactionId GetTopFullTransactionId(void); extern FullTransactionId GetTopFullTransactionIdIfAny(void); extern FullTransactionId GetCurrentFullTransactionId(void); extern FullTransactionId GetCurrentFullTransactionIdIfAny(void); +extern void SetCurrentCommandIdUsedForWorker(void); extern void MarkCurrentTransactionIdLoggedIfAny(void); extern bool SubTransactionIsActive(SubTransactionId subxid); extern CommandId GetCurrentCommandId(bool used); @@ -466,5 +467,20 @@ extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parse extern void EnterParallelMode(void); extern void ExitParallelMode(void); extern bool IsInParallelMode(void); +extern void PrepareParallelModeForModify(CmdType commandType, bool isParallelModifyLeader); + +/* + * IsModifySupportedInParallelMode + * + * Indicates whether execution of the specified table-modification command + * (INSERT/UPDATE/DELETE) in parallel-mode is supported, subject to certain + * parallel-safety conditions. 
+ */ +static inline bool +IsModifySupportedInParallelMode(CmdType commandType) +{ + /* Currently only INSERT is supported */ + return (commandType == CMD_INSERT); +} #endif /* XACT_H */ diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h index 5a39a5b..afb8a57 100644 --- a/src/include/executor/execParallel.h +++ b/src/include/executor/execParallel.h @@ -27,6 +27,7 @@ typedef struct ParallelExecutorInfo ParallelContext *pcxt; /* parallel context we're using */ BufferUsage *buffer_usage; /* points to bufusage area in DSM */ WalUsage *wal_usage; /* walusage area in DSM */ + uint64 *processed_count; /* processed tuple count area in DSM */ SharedExecutorInstrumentation *instrumentation; /* optional */ struct SharedJitInstrumentation *jit_instrumentation; /* optional */ dsa_area *area; /* points to DSA area in DSM */ diff --git a/src/include/executor/nodeModifyTable.h b/src/include/executor/nodeModifyTable.h index 46a2dc9..e332482 100644 --- a/src/include/executor/nodeModifyTable.h +++ b/src/include/executor/nodeModifyTable.h @@ -22,5 +22,6 @@ extern void ExecComputeStoredGenerated(ResultRelInfo *resultRelInfo, extern ModifyTableState *ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags); extern void ExecEndModifyTable(ModifyTableState *node); extern void ExecReScanModifyTable(ModifyTableState *node); - +extern void fireBSTriggersInLeader(ModifyTableState *node); +extern void fireASTriggersInLeader(ModifyTableState *node); #endif /* NODEMODIFYTABLE_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 6c0a7d6..c558eef 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -1175,8 +1175,8 @@ typedef struct ModifyTableState List **mt_arowmarks; /* per-subplan ExecAuxRowMark lists */ EPQState mt_epqstate; /* for evaluating EvalPlanQual rechecks */ - bool fireBSTriggers; /* do we need to fire stmt triggers? 
*/ - + bool fireBSTriggers; /* do we need to fire before stmt triggers? */ + bool fireASTriggers; /* do we need to fire after stmt triggers? */ /* * Slot for storing tuples in the root partitioned table's rowtype during * an UPDATE of a partitioned table. diff --git a/src/include/optimizer/clauses.h b/src/include/optimizer/clauses.h index 7ef8cce..12662eb 100644 --- a/src/include/optimizer/clauses.h +++ b/src/include/optimizer/clauses.h @@ -55,5 +55,6 @@ extern void CommuteOpExpr(OpExpr *clause); extern Query *inline_set_returning_function(PlannerInfo *root, RangeTblEntry *rte); +extern char MaxParallelHazardForModify(Query *parse, const char *initialMaxParallelHazard); #endif /* CLAUSES_H */ diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 6141654..fafa087 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -69,6 +69,7 @@ extern PGDLLIMPORT int constraint_exclusion; extern double index_pages_fetched(double tuples_fetched, BlockNumber pages, double index_pages, PlannerInfo *root); +extern void cost_modifytable(ModifyTablePath *path); extern void cost_seqscan(Path *path, PlannerInfo *root, RelOptInfo *baserel, ParamPathInfo *param_info); extern void cost_samplescan(Path *path, PlannerInfo *root, RelOptInfo *baserel, diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index 715a24a..2d08f0c 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -264,7 +264,8 @@ extern ModifyTablePath *create_modifytable_path(PlannerInfo *root, List *subroots, List *withCheckOptionLists, List *returningLists, List *rowMarks, OnConflictExpr *onconflict, - int epqParam); + int epqParam, + int parallel_workers); extern LimitPath *create_limit_path(PlannerInfo *root, RelOptInfo *rel, Path *subpath, Node *limitOffset, Node *limitCount, -- 1.8.3.1