From cfa70d2414dd5ad3c447af700ce55ae18a77f774 Mon Sep 17 00:00:00 2001 From: Tomas Vondra Date: Tue, 22 Jul 2025 20:56:27 +0200 Subject: [PATCH v1] PoC: introduce CustomJoin as separate from CustomScan Introduce a new CustomJoin node, intended for out-of-core join nodes. The CustomScan is supposed to allow that, but there's a couple pieces that make it inconvenient. The targetlist uses INDEX_VAR, so it's not clear which relation the Vars come from, etc. So this adds a new node, descended from Join (not Scan), with all the processing specific to joins (in setrefs etc.). Experimental. Does are not updated, some bits may be irrelevant for joins, etc. --- src/backend/commands/explain.c | 77 ++++++++- src/backend/executor/execAmi.c | 4 + src/backend/executor/execParallel.c | 20 +++ src/backend/executor/execProcnode.c | 12 ++ src/backend/executor/nodeCustom.c | 199 +++++++++++++++++++++++- src/backend/nodes/extensible.c | 25 +++ src/backend/nodes/gen_node_support.pl | 26 +++- src/backend/nodes/nodeFuncs.c | 7 + src/backend/optimizer/plan/createplan.c | 73 +++++++++ src/backend/optimizer/plan/setrefs.c | 44 ++++++ src/backend/optimizer/plan/subselect.c | 26 ++++ src/backend/utils/adt/ruleutils.c | 2 + src/include/executor/nodeCustom.h | 20 +++ src/include/nodes/execnodes.h | 30 +++- src/include/nodes/extensible.h | 60 ++++++- src/include/nodes/plannodes.h | 38 +++++ src/tools/pgindent/typedefs.list | 6 +- 17 files changed, 653 insertions(+), 16 deletions(-) diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 7e2792ead71..86bff53692d 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -159,8 +159,10 @@ static void ExplainMemberNodes(PlanState **planstates, int nplans, static void ExplainMissingMembers(int nplans, int nchildren, ExplainState *es); static void ExplainSubPlans(List *plans, List *ancestors, const char *relationship, ExplainState *es); -static void ExplainCustomChildren(CustomScanState *css, - List *ancestors, ExplainState *es); +static void ExplainCustomScanChildren(CustomScanState *css, + List *ancestors, ExplainState *es); +static void ExplainCustomJoinChildren(CustomJoinState *cjs, + List *ancestors, ExplainState *es); static ExplainWorkersState *ExplainCreateWorkersState(int num_workers); static void ExplainOpenWorker(int n, ExplainState *es); static void ExplainCloseWorker(int n, ExplainState *es); @@ -1210,6 +1212,10 @@ ExplainPreScanNode(PlanState *planstate, Bitmapset **rels_used) *rels_used = bms_add_members(*rels_used, ((CustomScan *) plan)->custom_relids); break; + case T_CustomJoin: + *rels_used = bms_add_members(*rels_used, + ((CustomJoin *) plan)->custom_relids); + break; case T_ModifyTable: *rels_used = bms_add_member(*rels_used, ((ModifyTable *) plan)->nominalRelation); @@ -1305,6 +1311,18 @@ plan_is_disabled(Plan *plan) child_disabled_nodes += subplan->disabled_nodes; } } + else if (IsA(plan, CustomJoin)) + { + ListCell *lc; + CustomJoin *cplan = (CustomJoin *) plan; + + foreach(lc, cplan->custom_plans) + { + Plan *subplan = lfirst(lc); + + child_disabled_nodes += subplan->disabled_nodes; + } + } else { /* @@ -1513,6 +1531,15 @@ ExplainNode(PlanState *planstate, List *ancestors, else pname = sname; break; + case T_CustomJoin: + /* XXX jointype switch adds another "Join" (so it's there twice) */ + sname = "Custom Join"; + custom_name = ((CustomJoin *) plan)->methods->CustomName; + if (custom_name) + pname = psprintf("Custom Join (%s)", custom_name); + else + pname = sname; + break; case T_Material: pname = sname = "Materialize"; break; @@ -1712,6 +1739,7 @@ ExplainNode(PlanState *planstate, List *ancestors, case T_NestLoop: case T_MergeJoin: case T_HashJoin: + case T_CustomJoin: { const char *jointype; @@ -2155,6 +2183,25 @@ ExplainNode(PlanState *planstate, List *ancestors, css->methods->ExplainCustomScan(css, ancestors, es); } break; + case T_CustomJoin: + { + CustomJoinState *cjs = (CustomJoinState *) planstate; + + show_upper_qual(((CustomJoin *) plan)->join.joinqual, + "Join Filter", planstate, ancestors, es); + if (((CustomJoin *) plan)->join.joinqual) + show_instrumentation_count("Rows Removed by Join Filter", 1, + planstate, es); + + show_scan_qual(plan->qual, "Filter", planstate, ancestors, es); + if (plan->qual) + show_instrumentation_count("Rows Removed by Filter", 1, + planstate, es); + + if (cjs->methods->ExplainCustomJoin) + cjs->methods->ExplainCustomJoin(cjs, ancestors, es); + } + break; case T_NestLoop: show_upper_qual(((NestLoop *) plan)->join.joinqual, "Join Filter", planstate, ancestors, es); @@ -2354,6 +2401,8 @@ ExplainNode(PlanState *planstate, List *ancestors, IsA(plan, SubqueryScan) || (IsA(planstate, CustomScanState) && ((CustomScanState *) planstate)->custom_ps != NIL) || + (IsA(planstate, CustomJoinState) && + ((CustomJoinState *) planstate)->custom_ps != NIL) || planstate->subPlan; if (haschildren) { @@ -2404,8 +2453,12 @@ ExplainNode(PlanState *planstate, List *ancestors, "Subquery", NULL, es); break; case T_CustomScan: - ExplainCustomChildren((CustomScanState *) planstate, - ancestors, es); + ExplainCustomScanChildren((CustomScanState *) planstate, + ancestors, es); + break; + case T_CustomJoin: + ExplainCustomJoinChildren((CustomJoinState *) planstate, + ancestors, es); break; default: break; @@ -4814,7 +4867,7 @@ ExplainSubPlans(List *plans, List *ancestors, * Explain a list of children of a CustomScan. */ static void -ExplainCustomChildren(CustomScanState *css, List *ancestors, ExplainState *es) +ExplainCustomScanChildren(CustomScanState *css, List *ancestors, ExplainState *es) { ListCell *cell; const char *label = @@ -4824,6 +4877,20 @@ ExplainCustomChildren(CustomScanState *css, List *ancestors, ExplainState *es) ExplainNode((PlanState *) lfirst(cell), ancestors, label, NULL, es); } +/* + * Explain a list of children of a CustomJoin. + */ +static void +ExplainCustomJoinChildren(CustomJoinState *cjs, List *ancestors, ExplainState *es) +{ + ListCell *cell; + const char *label = + (list_length(cjs->custom_ps) != 1 ? "children" : "child"); + + foreach(cell, cjs->custom_ps) + ExplainNode((PlanState *) lfirst(cell), ancestors, label, NULL, es); +} + /* * Create a per-plan-node workspace for collecting per-worker data. * diff --git a/src/backend/executor/execAmi.c b/src/backend/executor/execAmi.c index 1d0e8ad57b4..10da51834f5 100644 --- a/src/backend/executor/execAmi.c +++ b/src/backend/executor/execAmi.c @@ -238,6 +238,10 @@ ExecReScan(PlanState *node) ExecReScanCustomScan((CustomScanState *) node); break; + case T_CustomJoinState: + ExecReScanCustomJoin((CustomJoinState *) node); + break; + case T_NestLoopState: ExecReScanNestLoop((NestLoopState *) node); break; diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index fc76f22fb82..9650310e852 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -276,6 +276,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) ExecCustomScanEstimate((CustomScanState *) planstate, e->pcxt); break; + case T_CustomJoinState: + if (planstate->plan->parallel_aware) + ExecCustomJoinEstimate((CustomJoinState *) planstate, + e->pcxt); + break; case T_BitmapHeapScanState: if (planstate->plan->parallel_aware) ExecBitmapHeapEstimate((BitmapHeapScanState *) planstate, @@ -503,6 +508,11 @@ ExecParallelInitializeDSM(PlanState *planstate, ExecCustomScanInitializeDSM((CustomScanState *) planstate, d->pcxt); break; + case T_CustomJoinState: + if (planstate->plan->parallel_aware) + ExecCustomJoinInitializeDSM((CustomJoinState *) planstate, + d->pcxt); + break; case T_BitmapHeapScanState: if (planstate->plan->parallel_aware) ExecBitmapHeapInitializeDSM((BitmapHeapScanState *) planstate, @@ -1003,6 +1013,11 @@ ExecParallelReInitializeDSM(PlanState *planstate, ExecCustomScanReInitializeDSM((CustomScanState *) planstate, pcxt); break; + case T_CustomJoinState: + if (planstate->plan->parallel_aware) + ExecCustomJoinReInitializeDSM((CustomJoinState *) planstate, + pcxt); + break; case T_BitmapHeapScanState: if (planstate->plan->parallel_aware) ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate, @@ -1371,6 +1386,11 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt) ExecCustomScanInitializeWorker((CustomScanState *) planstate, pwcxt); break; + case T_CustomJoinState: + if (planstate->plan->parallel_aware) + ExecCustomJoinInitializeWorker((CustomJoinState *) planstate, + pwcxt); + break; case T_BitmapHeapScanState: if (planstate->plan->parallel_aware) ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c index f5f9cfbeead..c005974507c 100644 --- a/src/backend/executor/execProcnode.c +++ b/src/backend/executor/execProcnode.c @@ -309,6 +309,11 @@ ExecInitNode(Plan *node, EState *estate, int eflags) estate, eflags); break; + case T_CustomJoin: + result = (PlanState *) ExecInitCustomJoin((CustomJoin *) node, + estate, eflags); + break; + /* * materialization nodes */ @@ -699,6 +704,10 @@ ExecEndNode(PlanState *node) ExecEndHashJoin((HashJoinState *) node); break; + case T_CustomJoinState: + ExecEndCustomJoin((CustomJoinState *) node); + break; + /* * materialization nodes */ @@ -817,6 +826,9 @@ ExecShutdownNode_walker(PlanState *node, void *context) case T_HashJoinState: ExecShutdownHashJoin((HashJoinState *) node); break; + case T_CustomJoinState: + ExecShutdownCustomJoin((CustomJoinState *) node); + break; default: break; } diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c index ac2196b64c7..38d75fc6341 100644 --- a/src/backend/executor/nodeCustom.c +++ b/src/backend/executor/nodeCustom.c @@ -20,6 +20,7 @@ #include "utils/rel.h" static TupleTableSlot *ExecCustomScan(PlanState *pstate); +static TupleTableSlot *ExecCustomJoin(PlanState *pstate); CustomScanState * @@ -160,7 +161,7 @@ ExecCustomRestrPos(CustomScanState *node) void ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt) { - const CustomExecMethods *methods = node->methods; + const CustomScanExecMethods *methods = node->methods; if (methods->EstimateDSMCustomScan) { @@ -173,7 +174,7 @@ ExecCustomScanEstimate(CustomScanState *node, ParallelContext *pcxt) void ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt) { - const CustomExecMethods *methods = node->methods; + const CustomScanExecMethods *methods = node->methods; if (methods->InitializeDSMCustomScan) { @@ -189,7 +190,7 @@ ExecCustomScanInitializeDSM(CustomScanState *node, ParallelContext *pcxt) void ExecCustomScanReInitializeDSM(CustomScanState *node, ParallelContext *pcxt) { - const CustomExecMethods *methods = node->methods; + const CustomScanExecMethods *methods = node->methods; if (methods->ReInitializeDSMCustomScan) { @@ -205,7 +206,7 @@ void ExecCustomScanInitializeWorker(CustomScanState *node, ParallelWorkerContext *pwcxt) { - const CustomExecMethods *methods = node->methods; + const CustomScanExecMethods *methods = node->methods; if (methods->InitializeWorkerCustomScan) { @@ -220,8 +221,196 @@ ExecCustomScanInitializeWorker(CustomScanState *node, void ExecShutdownCustomScan(CustomScanState *node) { - const CustomExecMethods *methods = node->methods; + const CustomScanExecMethods *methods = node->methods; if (methods->ShutdownCustomScan) methods->ShutdownCustomScan(node); } + +CustomJoinState * +ExecInitCustomJoin(CustomJoin *cjoin, EState *estate, int eflags) +{ + CustomJoinState *cjs; + const TupleTableSlotOps *slotOps; + Relation scan_rel = NULL; + // FIXME + Index scanrelid = 0; + // Index scanrelid = cscan->scan.scanrelid; + // int tlistvarno; + + /* + * Allocate the CustomJoinState object. We let the custom scan provider + * do the palloc, in case it wants to make a larger object that embeds + * CustomJoinState as the first field. It must set the node tag and the + * methods field correctly at this time. Other standard fields should be + * set to zero. + */ + cjs = castNode(CustomJoinState, + cjoin->methods->CreateCustomJoinState(cjoin)); + + /* ensure flags is filled correctly */ + cjs->flags = cjoin->flags; + + /* fill up fields of ScanState */ + cjs->js.ps.plan = &cjoin->join.plan; + cjs->js.ps.state = estate; + cjs->js.ps.ExecProcNode = ExecCustomJoin; + + /* create expression context for node */ + ExecAssignExprContext(estate, &cjs->js.ps); + + /* + * open the scan relation, if any + */ + if (scanrelid > 0) + { + scan_rel = ExecOpenScanRelation(estate, scanrelid, eflags); + // FIXME + // cjs->js.ss_currentRelation = scan_rel; + } + + /* + * Use a custom slot if specified in CustomJoinState or use virtual slot + * otherwise. + */ + slotOps = cjs->slotOps; + if (!slotOps) + slotOps = &TTSOpsVirtual; + + /* + * Determine the scan tuple type. If the custom scan provider provided a + * targetlist describing the scan tuples, use that; else use base + * relation's rowtype. + */ + if (cjoin->custom_join_tlist != NIL || scan_rel == NULL) + { + // TupleDesc scan_tupdesc; + + // FIXME + // scan_tupdesc = ExecTypeFromTL(cjoin->custom_scan_tlist); + // ExecInitScanTupleSlot(estate, &cjs->js, scan_tupdesc, slotOps); + /* Node's targetlist will contain Vars with varno = INDEX_VAR */ + // tlistvarno = INDEX_VAR; + } + else + { + // FIXME + // ExecInitScanTupleSlot(estate, &cjs->js, RelationGetDescr(scan_rel), + // slotOps); + /* Node's targetlist will contain Vars with varno = scanrelid */ + // tlistvarno = scanrelid; + } + + /* + * Initialize result slot, type and projection. + */ + ExecInitResultTupleSlotTL(&cjs->js.ps, &TTSOpsVirtual); + // FIXME + // ExecAssignScanProjectionInfoWithVarno(&cjs->js, tlistvarno); + + /* initialize child expressions */ + cjs->js.ps.qual = + ExecInitQual(cjoin->join.plan.qual, (PlanState *) cjs); + + /* + * The callback of custom-scan provider applies the final initialization + * of the custom-scan-state node according to its logic. + */ + cjs->methods->BeginCustomJoin(cjs, estate, eflags); + + return cjs; +} + +static TupleTableSlot * +ExecCustomJoin(PlanState *pstate) +{ + CustomJoinState *node = castNode(CustomJoinState, pstate); + + CHECK_FOR_INTERRUPTS(); + + Assert(node->methods->ExecCustomJoin != NULL); + return node->methods->ExecCustomJoin(node); +} + +void +ExecEndCustomJoin(CustomJoinState *node) +{ + Assert(node->methods->EndCustomJoin != NULL); + node->methods->EndCustomJoin(node); +} + +void +ExecReScanCustomJoin(CustomJoinState *node) +{ + Assert(node->methods->ReScanCustomJoin != NULL); + node->methods->ReScanCustomJoin(node); +} + +void +ExecCustomJoinEstimate(CustomJoinState *node, ParallelContext *pcxt) +{ + const CustomJoinExecMethods *methods = node->methods; + + if (methods->EstimateDSMCustomJoin) + { + node->pscan_len = methods->EstimateDSMCustomJoin(node, pcxt); + shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + } +} + +void +ExecCustomJoinInitializeDSM(CustomJoinState *node, ParallelContext *pcxt) +{ + const CustomJoinExecMethods *methods = node->methods; + + if (methods->InitializeDSMCustomJoin) + { + int plan_node_id = node->js.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len); + methods->InitializeDSMCustomJoin(node, pcxt, coordinate); + shm_toc_insert(pcxt->toc, plan_node_id, coordinate); + } +} + +void +ExecCustomJoinReInitializeDSM(CustomJoinState *node, ParallelContext *pcxt) +{ + const CustomJoinExecMethods *methods = node->methods; + + if (methods->ReInitializeDSMCustomJoin) + { + int plan_node_id = node->js.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false); + methods->ReInitializeDSMCustomJoin(node, pcxt, coordinate); + } +} + +void +ExecCustomJoinInitializeWorker(CustomJoinState *node, + ParallelWorkerContext *pwcxt) +{ + const CustomJoinExecMethods *methods = node->methods; + + if (methods->InitializeWorkerCustomJoin) + { + int plan_node_id = node->js.ps.plan->plan_node_id; + void *coordinate; + + coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false); + methods->InitializeWorkerCustomJoin(node, pwcxt->toc, coordinate); + } +} + +void +ExecShutdownCustomJoin(CustomJoinState *node) +{ + const CustomJoinExecMethods *methods = node->methods; + + if (methods->ShutdownCustomJoin) + methods->ShutdownCustomJoin(node); +} diff --git a/src/backend/nodes/extensible.c b/src/backend/nodes/extensible.c index 3ede1ee0f5d..04d18dd091b 100644 --- a/src/backend/nodes/extensible.c +++ b/src/backend/nodes/extensible.c @@ -25,6 +25,7 @@ static HTAB *extensible_node_methods = NULL; static HTAB *custom_scan_methods = NULL; +static HTAB *custom_join_methods = NULL; typedef struct { @@ -93,6 +94,18 @@ RegisterCustomScanMethods(const CustomScanMethods *methods) methods); } +/* + * Register a new type of custom join node + */ +void +RegisterCustomJoinMethods(const CustomJoinMethods *methods) +{ + RegisterExtensibleNodeEntry(&custom_join_methods, + "Custom Join Methods", + methods->CustomName, + methods); +} + /* * An internal routine to get an ExtensibleNodeEntry by the given identifier */ @@ -141,3 +154,15 @@ GetCustomScanMethods(const char *CustomName, bool missing_ok) CustomName, missing_ok); } + +/* + * Get the methods for a given name of CustomJoinMethods + */ +const CustomJoinMethods * +GetCustomJoinMethods(const char *CustomName, bool missing_ok) +{ + return (const CustomJoinMethods *) + GetExtensibleNodeEntry(custom_join_methods, + CustomName, + missing_ok); +} diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl index 9ecddb14231..681db989d32 100644 --- a/src/backend/nodes/gen_node_support.pl +++ b/src/backend/nodes/gen_node_support.pl @@ -862,7 +862,8 @@ _equal${n}(const $n *a, const $n *b) print $eff "\tCOMPARE_ARRAY_FIELD($f);\n" unless $equal_ignore; } elsif ($t eq 'struct CustomPathMethods*' - || $t eq 'struct CustomScanMethods*') + || $t eq 'struct CustomScanMethods*' + || $t eq 'struct CustomJoinMethods*') { # Fields of these types are required to be a pointer to a # static table of callback functions. So we don't copy @@ -1222,6 +1223,29 @@ _read${n}(void) methods = GetCustomScanMethods(custom_name, false); local_node->methods = methods; } +! unless $no_read; + } + elsif ($t eq 'struct CustomJoinMethods*') + { + # FIXME it's a bit weird the condition above has both + # CustomPathMethods and CustomScanMethods, and this + # only has CustomJoinMethods, possibly incomplete + print $off q{ + /* CustomName is a key to lookup CustomJoinMethods */ + appendStringInfoString(str, " :methods "); + outToken(str, node->methods->CustomName); +}; + print $rff q! + { + /* Lookup CustomJoinMethods by CustomName */ + char *custom_name; + const CustomJoinMethods *methods; + token = pg_strtok(&length); /* skip methods: */ + token = pg_strtok(&length); /* CustomName */ + custom_name = nullable_string(token, length); + methods = GetCustomJoinMethods(custom_name, false); + local_node->methods = methods; + } ! unless $no_read; } else diff --git a/src/backend/nodes/nodeFuncs.c b/src/backend/nodes/nodeFuncs.c index 7bc823507f1..abc0933f38f 100644 --- a/src/backend/nodes/nodeFuncs.c +++ b/src/backend/nodes/nodeFuncs.c @@ -4789,6 +4789,13 @@ planstate_tree_walker_impl(PlanState *planstate, return true; } break; + case T_CustomJoin: + foreach(lc, ((CustomJoinState *) planstate)->custom_ps) + { + if (PSWALK(lfirst(lc))) + return true; + } + break; default: break; } diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index 8a9f1d7a943..49fd9fd5f9d 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -160,6 +160,8 @@ static ForeignScan *create_foreignscan_plan(PlannerInfo *root, ForeignPath *best static CustomScan *create_customscan_plan(PlannerInfo *root, CustomPath *best_path, List *tlist, List *scan_clauses); +static CustomJoin *create_customjoin_plan(PlannerInfo *root, + CustomPath *best_path); static NestLoop *create_nestloop_plan(PlannerInfo *root, NestPath *best_path); static MergeJoin *create_mergejoin_plan(PlannerInfo *root, MergePath *best_path); static HashJoin *create_hashjoin_plan(PlannerInfo *root, HashPath *best_path); @@ -415,6 +417,7 @@ create_plan_recurse(PlannerInfo *root, Path *best_path, int flags) case T_HashJoin: case T_MergeJoin: case T_NestLoop: + case T_CustomJoin: plan = create_join_plan(root, (JoinPath *) best_path); break; @@ -1097,6 +1100,10 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path) plan = (Plan *) create_nestloop_plan(root, (NestPath *) best_path); break; + case T_CustomJoin: + plan = (Plan *) create_customjoin_plan(root, + (CustomPath *) best_path); + break; default: elog(ERROR, "unrecognized node type: %d", (int) best_path->path.pathtype); @@ -4380,6 +4387,72 @@ create_customscan_plan(PlannerInfo *root, CustomPath *best_path, return cplan; } +/* + * create_customjoin_plan + * + * Transform a CustomPath into a Plan. + * + * XXX likely very incomplete, need to look at the other join nodes + */ +static CustomJoin * +create_customjoin_plan(PlannerInfo *root, CustomPath *best_path) +{ + CustomJoin *cplan; + RelOptInfo *rel = best_path->path.parent; + List *custom_plans = NIL; + ListCell *lc; + List *tlist = build_path_tlist(root, &best_path->path); + List *scan_clauses = NIL; + + /* Recursively transform child paths. */ + foreach(lc, best_path->custom_paths) + { + Plan *plan = create_plan_recurse(root, (Path *) lfirst(lc), + CP_EXACT_TLIST); + + custom_plans = lappend(custom_plans, plan); + } + + /* + * Invoke custom plan provider to create the Plan node represented by the + * CustomPath. + */ + cplan = castNode(CustomJoin, + best_path->methods->PlanCustomPath(root, + rel, + best_path, + tlist, + scan_clauses, + custom_plans)); + + /* + * Copy cost data from Path to Plan; no need to make custom-plan providers + * do this + */ + copy_generic_path_info(&cplan->join.plan, &best_path->path); + + /* Likewise, copy the relids that are represented by this custom scan */ + cplan->custom_relids = best_path->path.parent->relids; + + /* + * Replace any outer-relation variables with nestloop params in the qual + * and custom_exprs expressions. We do this last so that the custom-plan + * provider doesn't have to be involved. (Note that parts of custom_exprs + * could have come from join clauses, so doing this beforehand on the + * scan_clauses wouldn't work.) We assume custom_scan_tlist contains no + * such variables. + */ + if (best_path->path.param_info) + { + cplan->join.plan.qual = (List *) + replace_nestloop_params(root, (Node *) cplan->join.plan.qual); + cplan->custom_exprs = (List *) + replace_nestloop_params(root, (Node *) cplan->custom_exprs); + } + + return cplan; +} + /***************************************************************************** * diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 846e44186c3..805cd26107b 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -149,6 +149,9 @@ static void set_foreignscan_references(PlannerInfo *root, static void set_customscan_references(PlannerInfo *root, CustomScan *cscan, int rtoffset); +static void set_customjoin_references(PlannerInfo *root, + CustomJoin *cjoin, + int rtoffset); static Plan *set_append_references(PlannerInfo *root, Append *aplan, int rtoffset); @@ -852,6 +855,7 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) case T_NestLoop: case T_MergeJoin: case T_HashJoin: + case T_CustomJoin: set_join_references(root, (Join *) plan, rtoffset); break; @@ -1738,6 +1742,41 @@ set_customscan_references(PlannerInfo *root, cscan->custom_relids = offset_relid_set(cscan->custom_relids, rtoffset); } +/* + * set_customscan_references + * Do set_plan_references processing on a CustomJoin + * + * FIXME some of this is probably duplicate with set_join_references + */ +static void +set_customjoin_references(PlannerInfo *root, + CustomJoin *cjoin, + int rtoffset) +{ + ListCell *lc; + + if (cjoin->custom_join_tlist != NIL) + { + /* custom_scan_tlist itself just needs fix_scan_list() adjustments */ + cjoin->custom_join_tlist = + fix_scan_list(root, cjoin->custom_join_tlist, + rtoffset, NUM_EXEC_TLIST((Plan *) cjoin)); + } + + /* Adjust custom_exprs in the standard way */ + cjoin->custom_exprs = + fix_scan_list(root, cjoin->custom_exprs, + rtoffset, NUM_EXEC_QUAL((Plan *) cjoin)); + + /* Adjust child plan-nodes recursively, if needed */ + foreach(lc, cjoin->custom_plans) + { + lfirst(lc) = set_plan_refs(root, (Plan *) lfirst(lc), rtoffset); + } + + cjoin->custom_relids = offset_relid_set(cjoin->custom_relids, rtoffset); +} + /* * register_partpruneinfo * Subroutine for set_append_references and set_mergeappend_references @@ -2423,6 +2462,11 @@ set_join_references(PlannerInfo *root, Join *join, int rtoffset) NRM_EQUAL, NUM_EXEC_QUAL((Plan *) join)); } + else if (IsA(join, CustomJoin)) + { + /* FIXME needs to do something more? */ + set_customjoin_references(root, (CustomJoin *) join, rtoffset); + } /* * Now we need to fix up the targetlist and qpqual, which are logically diff --git a/src/backend/optimizer/plan/subselect.c b/src/backend/optimizer/plan/subselect.c index d71ed958e31..7d5f5961a04 100644 --- a/src/backend/optimizer/plan/subselect.c +++ b/src/backend/optimizer/plan/subselect.c @@ -2844,6 +2844,32 @@ finalize_plan(PlannerInfo *root, Plan *plan, &context); break; + case T_CustomJoin: + /* FIXME needs to worry about joinqual etc? */ + { + CustomJoin *cjoin = (CustomJoin *) plan; + ListCell *lc; + + finalize_primnode((Node *) cjoin->custom_exprs, + &context); + /* We assume custom_scan_tlist cannot contain Params */ + context.paramids = + bms_add_members(context.paramids, scan_params); + + /* child nodes if any */ + foreach(lc, cjoin->custom_plans) + { + context.paramids = + bms_add_members(context.paramids, + finalize_plan(root, + (Plan *) lfirst(lc), + gather_param, + valid_params, + scan_params)); + } + } + break; + case T_Hash: finalize_primnode((Node *) ((Hash *) plan)->hashkeys, &context); diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index 3d6e6bdbfd2..0f449d4f35c 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -5220,6 +5220,8 @@ set_deparse_plan(deparse_namespace *dpns, Plan *plan) dpns->index_tlist = ((ForeignScan *) plan)->fdw_scan_tlist; else if (IsA(plan, CustomScan)) dpns->index_tlist = ((CustomScan *) plan)->custom_scan_tlist; + else if (IsA(plan, CustomJoin)) + dpns->index_tlist = ((CustomJoin *) plan)->custom_join_tlist; else dpns->index_tlist = NIL; } diff --git a/src/include/executor/nodeCustom.h b/src/include/executor/nodeCustom.h index bc0ca512e09..55e328c6b6f 100644 --- a/src/include/executor/nodeCustom.h +++ b/src/include/executor/nodeCustom.h @@ -26,6 +26,14 @@ extern void ExecReScanCustomScan(CustomScanState *node); extern void ExecCustomMarkPos(CustomScanState *node); extern void ExecCustomRestrPos(CustomScanState *node); + +extern CustomJoinState *ExecInitCustomJoin(CustomJoin *cscan, + EState *estate, int eflags); +extern void ExecEndCustomJoin(CustomJoinState *node); + +extern void ExecReScanCustomJoin(CustomJoinState *node); + + /* * Parallel execution support */ @@ -39,4 +47,16 @@ extern void ExecCustomScanInitializeWorker(CustomScanState *node, ParallelWorkerContext *pwcxt); extern void ExecShutdownCustomScan(CustomScanState *node); + +extern void ExecCustomJoinEstimate(CustomJoinState *node, + ParallelContext *pcxt); +extern void ExecCustomJoinInitializeDSM(CustomJoinState *node, + ParallelContext *pcxt); +extern void ExecCustomJoinReInitializeDSM(CustomJoinState *node, + ParallelContext *pcxt); +extern void ExecCustomJoinInitializeWorker(CustomJoinState *node, + ParallelWorkerContext *pwcxt); +extern void ExecShutdownCustomJoin(CustomJoinState *node); + + #endif /* NODECUSTOM_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index e107d6e5f81..6c5ab8f5229 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -2129,7 +2129,7 @@ typedef struct ForeignScanState * the BeginCustomScan method. * ---------------- */ -struct CustomExecMethods; +struct CustomScanExecMethods; typedef struct CustomScanState { @@ -2138,7 +2138,7 @@ typedef struct CustomScanState * nodes/extensible.h */ List *custom_ps; /* list of child PlanState nodes, if any */ Size pscan_len; /* size of parallel coordination information */ - const struct CustomExecMethods *methods; + const struct CustomScanExecMethods *methods; const struct TupleTableSlotOps *slotOps; } CustomScanState; @@ -2275,6 +2275,32 @@ typedef struct HashJoinState bool hj_OuterNotEmpty; } HashJoinState; +/* ---------------- + * CustomJoinState information + * + * CustomJoin nodes are used to execute custom code within executor. + * + * Core code must avoid assuming that the CustomJoinState is only as large as + * the structure declared here; providers are allowed to make it the first + * element in a larger structure, and typically would need to do so. The + * struct is actually allocated by the CreateCustomJoinState method associated + * with the plan node. Any additional fields can be initialized there, or in + * the BeginCustomJoin method. + * ---------------- + */ +struct CustomJoinExecMethods; + +typedef struct CustomJoinState +{ + JoinState js; + uint32 flags; /* mask of CUSTOMPATH_* flags, see + * nodes/extensible.h */ + List *custom_ps; /* list of child PlanState nodes, if any */ + Size pscan_len; /* size of parallel coordination information */ + const struct CustomJoinExecMethods *methods; + const struct TupleTableSlotOps *slotOps; +} CustomJoinState; + /* ---------------------------------------------------------------- * Materialization State Information diff --git a/src/include/nodes/extensible.h b/src/include/nodes/extensible.h index 1129c4ba4b1..a4c0a33a3f3 100644 --- a/src/include/nodes/extensible.h +++ b/src/include/nodes/extensible.h @@ -121,7 +121,7 @@ typedef struct CustomScanMethods * Execution-time methods for a CustomScanState. This is more complex than * what we need for a custom path or scan. */ -typedef struct CustomExecMethods +typedef struct CustomScanExecMethods { const char *CustomName; @@ -155,10 +155,66 @@ typedef struct CustomExecMethods void (*ExplainCustomScan) (CustomScanState *node, List *ancestors, ExplainState *es); -} CustomExecMethods; +} CustomScanExecMethods; extern void RegisterCustomScanMethods(const CustomScanMethods *methods); extern const CustomScanMethods *GetCustomScanMethods(const char *CustomName, bool missing_ok); +/* + * Custom join. Here again, there's not much to do: we need to be able to + * generate a JoinState corresponding to the join. + */ +typedef struct CustomJoinMethods +{ + const char *CustomName; + + /* Create execution state (CustomJoinState) from a CustomJoin plan node */ + Node *(*CreateCustomJoinState) (CustomJoin *cscan); +} CustomJoinMethods; + +/* + * Execution-time methods for a CustomJoinState. This is more complex than + * what we need for a custom path or scan. + */ +typedef struct CustomJoinExecMethods +{ + const char *CustomName; + + /* Required executor methods */ + void (*BeginCustomJoin) (CustomJoinState *node, + EState *estate, + int eflags); + TupleTableSlot *(*ExecCustomJoin) (CustomJoinState *node); + void (*EndCustomJoin) (CustomJoinState *node); + void (*ReScanCustomJoin) (CustomJoinState *node); + + /* Optional methods: needed if mark/restore is supported */ + void (*MarkPosCustomJoin) (CustomJoinState *node); + void (*RestrPosCustomJoin) (CustomJoinState *node); + + /* Optional methods: needed if parallel execution is supported */ + Size (*EstimateDSMCustomJoin) (CustomJoinState *node, + ParallelContext *pcxt); + void (*InitializeDSMCustomJoin) (CustomJoinState *node, + ParallelContext *pcxt, + void *coordinate); + void (*ReInitializeDSMCustomJoin) (CustomJoinState *node, + ParallelContext *pcxt, + void *coordinate); + void (*InitializeWorkerCustomJoin) (CustomJoinState *node, + shm_toc *toc, + void *coordinate); + void (*ShutdownCustomJoin) (CustomJoinState *node); + + /* Optional: print additional information in EXPLAIN */ + void (*ExplainCustomJoin) (CustomJoinState *node, + List *ancestors, + ExplainState *es); +} CustomJoinExecMethods; + +extern void RegisterCustomJoinMethods(const CustomJoinMethods *methods); +extern const CustomJoinMethods *GetCustomJoinMethods(const char *CustomName, + bool missing_ok); + #endif /* EXTENSIBLE_H */ diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 46e2e09ea35..0f10514c835 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -1024,6 +1024,44 @@ typedef struct HashJoin List *hashkeys; } HashJoin; +/* ---------------- + * CustomJoin node + * + * The comments for ForeignScan's fdw_exprs, fdw_private, fdw_scan_tlist, + * and fs_relids fields apply equally to CustomJoin's custom_exprs, + * custom_private, custom_join_tlist, and custom_relids fields. + * + * Note that since Plan trees can be copied, custom join providers *must* + * fit all plan data they need into those fields; embedding CustomJoin in + * a larger struct will not work. + * ---------------- + */ +struct CustomJoinMethods; + +typedef struct CustomJoin +{ + Join join; + /* mask of CUSTOMPATH_* flags, see nodes/extensible.h */ + uint32 flags; + /* list of Plan nodes, if any */ + List *custom_plans; + /* expressions that custom code may evaluate */ + List *custom_exprs; + /* private data for custom code */ + List *custom_private; + /* optional tlist describing scan tuple */ + List *custom_join_tlist; + /* RTIs generated by this scan */ + Bitmapset *custom_relids; + + /* + * NOTE: The method field of CustomJoin is required to be a pointer to a + * static table of callback functions. So we don't copy the table itself, + * just reference the original one. + */ + const struct CustomJoinMethods *methods; +} CustomJoin; + /* ---------------- * materialization node * ---------------- diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 4353befab99..8b8d3a6529a 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -579,7 +579,11 @@ CteState CtlCommand CtxtHandle CurrentOfExpr -CustomExecMethods +CustomJoin +CustomJoinMethods +CustomJoinState +CustomJoinExecMethods +CustomScanExecMethods CustomOutPtrType CustomPath CustomScan -- 2.50.1