diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 8eb02da..0940354 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -18,6 +18,7 @@ #include "access/xact.h" #include "commands/prepare.h" #include "commands/trigger.h" +#include "executor/executor.h" #include "executor/tstoreReceiver.h" #include "miscadmin.h" #include "pg_trace.h" @@ -38,11 +39,13 @@ Portal ActivePortal = NULL; static void ProcessQuery(PlannedStmt *plan, const char *sourceText, ParamListInfo params, + List *derived, DestReceiver *dest, char *completionTag); static void FillPortalStore(Portal portal, bool isTopLevel); static uint32 RunFromStore(Portal portal, ScanDirection direction, long count, DestReceiver *dest); +static void FillDerivedTables(Portal portal, PlannedStmt *pstmt); static long PortalRunSelect(Portal portal, bool forward, long count, DestReceiver *dest); static void PortalRunUtility(Portal portal, Node *utilityStmt, bool isTopLevel, @@ -67,7 +70,8 @@ CreateQueryDesc(PlannedStmt *plannedstmt, Snapshot crosscheck_snapshot, DestReceiver *dest, ParamListInfo params, - int instrument_options) + int instrument_options, + List *derived) { QueryDesc *qd = (QueryDesc *) palloc(sizeof(QueryDesc)); @@ -82,6 +86,7 @@ CreateQueryDesc(PlannedStmt *plannedstmt, qd->params = params; /* parameter values passed into query */ qd->instrument_options = instrument_options; /* instrumentation * wanted? */ + qd->derived = derived; /* null these fields until set by ExecutorStart */ qd->tupDesc = NULL; @@ -162,6 +167,7 @@ static void ProcessQuery(PlannedStmt *plan, const char *sourceText, ParamListInfo params, + List *derived, DestReceiver *dest, char *completionTag) { @@ -179,7 +185,7 @@ ProcessQuery(PlannedStmt *plan, */ queryDesc = CreateQueryDesc(plan, sourceText, GetActiveSnapshot(), InvalidSnapshot, - dest, params, 0); + dest, params, 0, derived); /* * Set up to collect AFTER triggers @@ -272,6 +278,9 @@ ChoosePortalStrategy(List *stmts) { Query *query = (Query *) stmt; + if (query->hasDerived) + return PORTAL_MULTI_QUERY; + if (query->canSetTag) { if (query->commandType == CMD_SELECT && @@ -292,7 +301,12 @@ ChoosePortalStrategy(List *stmts) { PlannedStmt *pstmt = (PlannedStmt *) stmt; - if (pstmt->canSetTag) + if (pstmt->childStmts) + { + /* run multiple queries to fill DerivedTables */ + return PORTAL_MULTI_QUERY; + } + else if (pstmt->canSetTag) { if (pstmt->commandType == CMD_SELECT && pstmt->utilityStmt == NULL && @@ -517,7 +531,8 @@ PortalStart(Portal portal, ParamListInfo params, Snapshot snapshot) InvalidSnapshot, None_Receiver, params, - 0); + 0, + NIL); /* * We do *not* call AfterTriggerBeginQuery() here. We assume @@ -1148,6 +1163,62 @@ RunFromStore(Portal portal, ScanDirection direction, long count, } /* + * FillDerivedTables + */ +static void +FillDerivedTables(Portal portal, PlannedStmt *pstmt) +{ + ListCell *l; + DestReceiver *ddtreceiver; + + ddtreceiver = CreateDestReceiver(DestTuplestore); + + foreach(l, pstmt->childStmts) + { + DerivedTable *dt; + Tuplestorestate *tuplestore; + PlannedStmt *child_pstmt = (PlannedStmt *) lfirst(l); + List *targetlist; + + /* + * If we got a cancel signal in prior command, quit + */ + CHECK_FOR_INTERRUPTS(); + + tuplestore = tuplestore_begin_heap(true, false, work_mem); + tuplestore_set_eflags(tuplestore, EXEC_FLAG_REWIND); + SetTuplestoreDestReceiverParams(ddtreceiver, + tuplestore, + portal->heap, + true); + ProcessQuery(child_pstmt, + portal->sourceText, + portal->portalParams, + portal->ddts, + ddtreceiver, NULL); + + dt = (DerivedTable *) palloc(sizeof(DerivedTable)); + dt->dtTuplestore = tuplestore; + targetlist = FetchStatementTargetList((Node *) child_pstmt); + dt->dtTupleDesc = ExecTypeFromTL(targetlist, false); + + portal->ddts = lappend(portal->ddts, dt); + + /* always increment; parent stmt will be executed after children */ + CommandCounterIncrement(); + + /* + * Clear subsidiary contexts to recover temporary memory. + */ + Assert(PortalGetHeapMemory(portal) == CurrentMemoryContext); + + MemoryContextDeleteChildren(PortalGetHeapMemory(portal)); + } + + (*ddtreceiver->rDestroy) (ddtreceiver); +} + +/* * PortalRunUtility * Execute a utility statement inside a portal. */ @@ -1262,12 +1333,18 @@ PortalRunMulti(Portal portal, bool isTopLevel, if (log_executor_stats) ResetUsage(); + if (pstmt->childStmts) + { + /* pre-execute derived queries in any case */ + FillDerivedTables(portal, pstmt); + } if (pstmt->canSetTag) { /* statement can set tag string */ ProcessQuery(pstmt, portal->sourceText, portal->portalParams, + portal->ddts, dest, completionTag); } else @@ -1276,6 +1353,7 @@ PortalRunMulti(Portal portal, bool isTopLevel, ProcessQuery(pstmt, portal->sourceText, portal->portalParams, + portal->ddts, altdest, NULL); }