diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c index d8d79e972e..fc2b2742fe 100644 --- a/src/backend/executor/execAsync.c +++ b/src/backend/executor/execAsync.c @@ -18,6 +18,7 @@ #include "executor/executor.h" #include "executor/nodeAppend.h" #include "executor/nodeForeignscan.h" +#include "executor/nodeCustom.h" /* * Asynchronously request a tuple from a designed async-capable node. @@ -37,6 +38,9 @@ ExecAsyncRequest(AsyncRequest *areq) case T_ForeignScanState: ExecAsyncForeignScanRequest(areq); break; + case T_CustomScanState: + ExecAsyncCustomScanRequest(areq); + break; default: /* If the node doesn't support async, caller messed up. */ elog(ERROR, "unrecognized node type: %d", @@ -70,6 +74,9 @@ ExecAsyncConfigureWait(AsyncRequest *areq) case T_ForeignScanState: ExecAsyncForeignScanConfigureWait(areq); break; + case T_CustomScanState: + ExecAsyncCustomScanConfigureWait(areq); + break; default: /* If the node doesn't support async, caller messed up. */ elog(ERROR, "unrecognized node type: %d", @@ -96,6 +103,9 @@ ExecAsyncNotify(AsyncRequest *areq) case T_ForeignScanState: ExecAsyncForeignScanNotify(areq); break; + case T_CustomScanState: + ExecAsyncCustomScanNotify(areq); + break; default: /* If the node doesn't support async, caller messed up. */ elog(ERROR, "unrecognized node type: %d", diff --git a/src/backend/executor/nodeCustom.c b/src/backend/executor/nodeCustom.c index 8f56bd8a23..5c52caea95 100644 --- a/src/backend/executor/nodeCustom.c +++ b/src/backend/executor/nodeCustom.c @@ -226,3 +226,36 @@ ExecShutdownCustomScan(CustomScanState *node) if (methods->ShutdownCustomScan) methods->ShutdownCustomScan(node); } + +/* + * Asynchronous execution support + */ +void +ExecAsyncCustomScanRequest(AsyncRequest *areq) +{ + CustomScanState *node = (CustomScanState *) areq->requestee; + const CustomExecMethods *methods = node->methods; + + Assert(methods->CustomScanAsyncRequest != NULL); + methods->CustomScanAsyncRequest(areq); +} + +void +ExecAsyncCustomScanConfigureWait(AsyncRequest *areq) +{ + CustomScanState *node = (CustomScanState *) areq->requestee; + const CustomExecMethods *methods = node->methods; + + Assert(methods->CustomScanAsyncConfigureWait != NULL); + methods->CustomScanAsyncConfigureWait(areq); +} + +void +ExecAsyncCustomScanNotify(AsyncRequest *areq) +{ + CustomScanState *node = (CustomScanState *) areq->requestee; + const CustomExecMethods *methods = node->methods; + + Assert(methods->CustomScanAsyncNotify != NULL); + methods->CustomScanAsyncNotify(areq); +} diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index e37f2933eb..ebd4fbb5b0 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -1175,6 +1175,20 @@ mark_async_capable_plan(Plan *plan, Path *path) ((ProjectionPath *) path)->subpath)) return true; return false; + case T_CustomPath: + { + CustomPath *customPath = castNode(CustomPath, path); + /* + * If the generated plan node includes a Result node for the + * projection, we can't execute it asynchronously. + */ + if (IsA(plan, Result)) + return false; + + if (customPath->flags & CUSTOMPATH_SUPPORT_ASYNC_EXECUTION) + break; + return false; + } default: return false; } diff --git a/src/include/executor/nodeCustom.h b/src/include/executor/nodeCustom.h index 5ef890144f..c2efdd20e6 100644 --- a/src/include/executor/nodeCustom.h +++ b/src/include/executor/nodeCustom.h @@ -39,4 +39,11 @@ extern void ExecCustomScanInitializeWorker(CustomScanState *node, ParallelWorkerContext *pwcxt); extern void ExecShutdownCustomScan(CustomScanState *node); +/* + * Asynchronous execution support + */ +extern void ExecAsyncCustomScanRequest(AsyncRequest *areq); +extern void ExecAsyncCustomScanConfigureWait(AsyncRequest *areq); +extern void ExecAsyncCustomScanNotify(AsyncRequest *areq); + #endif /* NODECUSTOM_H */ diff --git a/src/include/nodes/extensible.h b/src/include/nodes/extensible.h index 34936db894..59f29ed54c 100644 --- a/src/include/nodes/extensible.h +++ b/src/include/nodes/extensible.h @@ -84,6 +84,7 @@ extern const ExtensibleNodeMethods *GetExtensibleNodeMethods(const char *name, #define CUSTOMPATH_SUPPORT_BACKWARD_SCAN 0x0001 #define CUSTOMPATH_SUPPORT_MARK_RESTORE 0x0002 #define CUSTOMPATH_SUPPORT_PROJECTION 0x0004 +#define CUSTOMPATH_SUPPORT_ASYNC_EXECUTION 0x0008 /* * Custom path methods. Mostly, we just need to know how to convert a @@ -155,6 +156,11 @@ typedef struct CustomExecMethods void (*ExplainCustomScan) (CustomScanState *node, List *ancestors, ExplainState *es); + + /* Support functions for asynchronous execution */ + void (*CustomScanAsyncRequest) (AsyncRequest *areq); + void (*CustomScanAsyncConfigureWait) (AsyncRequest *areq); + void (*CustomScanAsyncNotify) (AsyncRequest *areq); } CustomExecMethods; extern void RegisterCustomScanMethods(const CustomScanMethods *methods);