From 297de9f38c1b4b1dbdb7812081b6c8058aff6b33 Mon Sep 17 00:00:00 2001 From: Alvaro Herrera Date: Wed, 20 Oct 2010 16:27:36 -0300 Subject: [PATCH 2/2] The remainder of Marko's patch --- src/backend/executor/functions.c | 261 ++++++++++++++++++++++++++------------ src/backend/executor/spi.c | 18 +++- src/backend/tcop/pquery.c | 36 ++++- 3 files changed, 223 insertions(+), 92 deletions(-) diff --git a/src/backend/executor/functions.c b/src/backend/executor/functions.c index 46433d0..6310fa1 100644 --- a/src/backend/executor/functions.c +++ b/src/backend/executor/functions.c @@ -90,18 +90,18 @@ typedef struct ParamListInfo paramLI; /* Param list representing current args */ Tuplestorestate *tstore; /* where we accumulate result tuples */ + Snapshot snapshot; JunkFilter *junkFilter; /* will be NULL if function returns VOID */ - /* head of linked list of execution_state records */ - execution_state *func_state; + List *func_state; } SQLFunctionCache; typedef SQLFunctionCache *SQLFunctionCachePtr; /* non-export function prototypes */ -static execution_state *init_execution_state(List *queryTree_list, +static List *init_execution_state(List *queryTree_list, SQLFunctionCachePtr fcache, bool lazyEvalOK); static void init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK); @@ -123,61 +123,78 @@ static void sqlfunction_destroy(DestReceiver *self); /* Set up the list of per-query execution_state records for a SQL function */ -static execution_state * +static List * init_execution_state(List *queryTree_list, SQLFunctionCachePtr fcache, bool lazyEvalOK) { - execution_state *firstes = NULL; - execution_state *preves = NULL; + execution_state *firstes; + execution_state *preves; execution_state *lasttages = NULL; - ListCell *qtl_item; + List *eslist; + ListCell *lc1; + ListCell *lc2; + List *qtlist; + Query *queryTree; + + + eslist = NIL; - foreach(qtl_item, queryTree_list) + foreach(lc1, queryTree_list) { - Query *queryTree = (Query *) lfirst(qtl_item); - Node *stmt; - execution_state *newes; + qtlist = (List *) lfirst(lc1); + firstes = NULL; + preves = NULL; - Assert(IsA(queryTree, Query)); + foreach(lc2, qtlist) + { + Node *stmt; + execution_state *newes; - if (queryTree->commandType == CMD_UTILITY) - stmt = queryTree->utilityStmt; - else - stmt = (Node *) pg_plan_query(queryTree, 0, NULL); + queryTree = (Query *) lfirst(lc2); - /* Precheck all commands for validity in a function */ - if (IsA(stmt, TransactionStmt)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - /* translator: %s is a SQL statement name */ - errmsg("%s is not allowed in a SQL function", - CreateCommandTag(stmt)))); + Assert(IsA(queryTree, Query)); - if (fcache->readonly_func && !CommandIsReadOnly(stmt)) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - /* translator: %s is a SQL statement name */ - errmsg("%s is not allowed in a non-volatile function", - CreateCommandTag(stmt)))); + if (queryTree->commandType == CMD_UTILITY) + stmt = queryTree->utilityStmt; + else + stmt = (Node *) pg_plan_query(queryTree, 0, NULL); - newes = (execution_state *) palloc(sizeof(execution_state)); - if (preves) - preves->next = newes; - else - firstes = newes; + /* Precheck all commands for validity in a function */ + if (IsA(stmt, TransactionStmt)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + /* translator: %s is a SQL statement name */ + errmsg("%s is not allowed in a SQL function", + CreateCommandTag(stmt)))); - newes->next = NULL; - newes->status = F_EXEC_START; - newes->setsResult = false; /* might change below */ - newes->lazyEval = false; /* might change below */ - newes->stmt = stmt; - newes->qd = NULL; + if (fcache->readonly_func && !CommandIsReadOnly(stmt)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + /* translator: %s is a SQL statement name */ + errmsg("%s is not allowed in a non-volatile function", + CreateCommandTag(stmt)))); + + newes = (execution_state *) palloc(sizeof(execution_state)); + if (preves) + preves->next = newes; + else + firstes = newes; - if (queryTree->canSetTag) - lasttages = newes; + newes->next = NULL; + newes->status = F_EXEC_START; + newes->setsResult = false; /* might change below */ + newes->lazyEval = false; /* might change below */ + newes->stmt = stmt; + newes->qd = NULL; - preves = newes; + if (queryTree->canSetTag) + lasttages = newes; + + preves = newes; + } + + eslist = lappend(eslist, firstes); } /* @@ -210,7 +227,7 @@ init_execution_state(List *queryTree_list, } } - return firstes; + return eslist; } /* Initialize the SQLFunctionCache for a SQL function */ @@ -388,24 +405,11 @@ init_sql_fcache(FmgrInfo *finfo, bool lazyEvalOK) static void postquel_start(execution_state *es, SQLFunctionCachePtr fcache) { - Snapshot snapshot; DestReceiver *dest; Assert(es->qd == NULL); - /* - * In a read-only function, use the surrounding query's snapshot; - * otherwise take a new snapshot for each query. The snapshot should - * include a fresh command ID so that all work to date in this transaction - * is visible. - */ - if (fcache->readonly_func) - snapshot = GetActiveSnapshot(); - else - { - CommandCounterIncrement(); - snapshot = GetTransactionSnapshot(); - } + Assert(ActiveSnapshotSet()); /* * If this query produces the function result, send its output to the @@ -429,13 +433,14 @@ postquel_start(execution_state *es, SQLFunctionCachePtr fcache) if (IsA(es->stmt, PlannedStmt)) es->qd = CreateQueryDesc((PlannedStmt *) es->stmt, fcache->src, - snapshot, InvalidSnapshot, + GetActiveSnapshot(), + InvalidSnapshot, dest, fcache->paramLI, 0); else es->qd = CreateUtilityQueryDesc(es->stmt, fcache->src, - snapshot, + GetActiveSnapshot(), dest, fcache->paramLI); @@ -631,6 +636,8 @@ fmgr_sql(PG_FUNCTION_ARGS) execution_state *es; TupleTableSlot *slot; Datum result; + List *eslist; + ListCell *eslc; /* * Switch to context in which the fcache lives. This ensures that @@ -682,13 +689,13 @@ fmgr_sql(PG_FUNCTION_ARGS) init_sql_fcache(fcinfo->flinfo, lazyEvalOK); fcache = (SQLFunctionCachePtr) fcinfo->flinfo->fn_extra; } - es = fcache->func_state; + eslist = fcache->func_state; /* * Convert params to appropriate format if starting a fresh execution. (If * continuing execution, we can re-use prior params.) */ - if (es && es->status == F_EXEC_START) + if (linitial(eslist) && ((execution_state *) linitial(eslist))->status == F_EXEC_START) postquel_sub_params(fcache, fcinfo); /* @@ -701,8 +708,18 @@ fmgr_sql(PG_FUNCTION_ARGS) /* * Find first unfinished query in function. */ - while (es && es->status == F_EXEC_DONE) - es = es->next; + + es = NULL; /* keep compiler quiet */ + foreach(eslc, eslist) + { + es = (execution_state *) lfirst(eslc); + + while (es && es->status == F_EXEC_DONE) + es = es->next; + + if (es) + break; + } /* * Execute each command in the function one after another until we either @@ -713,8 +730,31 @@ fmgr_sql(PG_FUNCTION_ARGS) bool completed; if (es->status == F_EXEC_START) + { + if (!fcache->readonly_func) + { + /* + * In a read-only function, use the surrounding query's snapshot; + * otherwise take a new snapshot if we don't have one yet. The + * snapshot should include a fresh command ID so that all work to + * date in this transaction is visible. + */ + if (!fcache->snapshot) + { + CommandCounterIncrement(); + fcache->snapshot = RegisterSnapshot(GetTransactionSnapshot()); + PushActiveSnapshot(fcache->snapshot); + } + else + PushUpdatedSnapshot(fcache->snapshot); + } + postquel_start(es, fcache); + if (!fcache->readonly_func) + PopActiveSnapshot(); + } + completed = postquel_getnext(es, fcache); /* @@ -740,6 +780,25 @@ fmgr_sql(PG_FUNCTION_ARGS) if (es->status != F_EXEC_DONE) break; es = es->next; + + if (!es) + { + eslc = lnext(eslc); + if (!eslc) + break; + + es = (execution_state *) lfirst(eslc); + + /* make sure we take a new snapshot for this query list */ + if (!fcache->readonly_func) + { + Assert(fcache->snapshot != InvalidSnapshot); + UnregisterSnapshot(fcache->snapshot); + fcache->snapshot = InvalidSnapshot; + } + else + Assert(fcache->snapshot == InvalidSnapshot); + } } /* @@ -808,6 +867,11 @@ fmgr_sql(PG_FUNCTION_ARGS) PointerGetDatum(fcache)); fcache->shutdown_reg = false; } + + /* Unregister snapshot if we have one */ + if (fcache->snapshot != InvalidSnapshot) + UnregisterSnapshot(fcache->snapshot); + fcache->snapshot = InvalidSnapshot; } else { @@ -834,6 +898,11 @@ fmgr_sql(PG_FUNCTION_ARGS) PointerGetDatum(fcache)); fcache->shutdown_reg = false; } + + /* Unregister snapshot if we have one */ + if (fcache->snapshot != InvalidSnapshot) + UnregisterSnapshot(fcache->snapshot); + fcache->snapshot = InvalidSnapshot; } } else @@ -864,6 +933,11 @@ fmgr_sql(PG_FUNCTION_ARGS) /* Clear the tuplestore, but keep it for next time */ tuplestore_clear(fcache->tstore); + + /* Unregister snapshot if we have one */ + if (fcache->snapshot != InvalidSnapshot) + UnregisterSnapshot(fcache->snapshot); + fcache->snapshot = InvalidSnapshot; } /* @@ -872,11 +946,14 @@ fmgr_sql(PG_FUNCTION_ARGS) */ if (es == NULL) { - es = fcache->func_state; - while (es) + foreach(eslc, fcache->func_state) { - es->status = F_EXEC_START; - es = es->next; + es = (execution_state *) lfirst(eslc); + while (es) + { + es->status = F_EXEC_START; + es = es->next; + } } } @@ -927,19 +1004,24 @@ sql_exec_error_callback(void *arg) { execution_state *es; int query_num; + ListCell *lc; - es = fcache->func_state; + es = NULL; /* keep compiler quiet */ query_num = 1; - while (es) + foreach(lc, fcache->func_state) { - if (es->qd) + es = (execution_state *) lfirst(lc); + while (es) { - errcontext("SQL function \"%s\" statement %d", - fcache->fname, query_num); - break; + if (es->qd) + { + errcontext("SQL function \"%s\" statement %d", + fcache->fname, query_num); + break; + } + es = es->next; + query_num++; } - es = es->next; - query_num++; } if (es == NULL) { @@ -970,18 +1052,29 @@ static void ShutdownSQLFunction(Datum arg) { SQLFunctionCachePtr fcache = (SQLFunctionCachePtr) DatumGetPointer(arg); - execution_state *es = fcache->func_state; + execution_state *es; + ListCell *lc; - while (es != NULL) + foreach(lc, fcache->func_state) { - /* Shut down anything still running */ - if (es->status == F_EXEC_RUN) - postquel_end(es); - /* Reset states to START in case we're called again */ - es->status = F_EXEC_START; - es = es->next; + es = (execution_state *) lfirst(lc); + + while (es) + { + /* Shut down anything still running */ + if (es->status == F_EXEC_RUN) + postquel_end(es); + /* Reset states to START in case we're called again */ + es->status = F_EXEC_START; + es = es->next; + } } + /* Unregister snapshot if we have one */ + if (fcache->snapshot != InvalidSnapshot) + UnregisterSnapshot(fcache->snapshot); + fcache->snapshot = InvalidSnapshot; + /* Release tuplestore if we have one */ if (fcache->tstore) tuplestore_end(fcache->tstore); diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c index c579017..2998b3a 100644 --- a/src/backend/executor/spi.c +++ b/src/backend/executor/spi.c @@ -1769,6 +1769,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, SPITupleTable *my_tuptable = NULL; int res = 0; bool have_active_snap = ActiveSnapshotSet(); + bool registered_snap = false; ErrorContextCallback spierrcontext; CachedPlan *cplan = NULL; ListCell *lc1; @@ -1872,8 +1873,10 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, } else { - PushActiveSnapshot(GetTransactionSnapshot()); + snapshot = RegisterSnapshot(GetTransactionSnapshot()); + PushActiveSnapshot(snapshot); pushed_active_snap = true; + registered_snap = true; } } else @@ -1966,10 +1969,23 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI, */ if (!read_only) CommandCounterIncrement(); + + /* + * If we took a new snapshot for this query list, unregister it and + * make sure we take a new one for the next list. + */ + if (registered_snap) + { + UnregisterSnapshot(snapshot); + snapshot = InvalidSnapshot; + } } fail: + if (registered_snap) + UnregisterSnapshot(snapshot); + /* We no longer need the cached plan refcount, if any */ if (cplan) ReleaseCachedPlan(cplan, true); diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 8eb02da..b475e11 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -170,11 +170,6 @@ ProcessQuery(PlannedStmt *plan, elog(DEBUG3, "ProcessQuery"); /* - * Must always set a snapshot for plannable queries. - */ - PushActiveSnapshot(GetTransactionSnapshot()); - - /* * Create the QueryDesc object */ queryDesc = CreateQueryDesc(plan, sourceText, @@ -234,8 +229,6 @@ ProcessQuery(PlannedStmt *plan, /* Now take care of any queued AFTER triggers */ AfterTriggerEndQuery(queryDesc->estate); - PopActiveSnapshot(); - /* * Now, we close down all the scans and free allocated resources. */ @@ -1220,6 +1213,7 @@ PortalRunMulti(Portal portal, bool isTopLevel, char *completionTag) { ListCell *stmtlist_item; + Snapshot snapshot = InvalidSnapshot; /* * If the destination is DestRemoteExecute, change to DestNone. The @@ -1262,6 +1256,15 @@ PortalRunMulti(Portal portal, bool isTopLevel, if (log_executor_stats) ResetUsage(); + /* if no snapshot is set, grab a new one and register it */ + if (snapshot == InvalidSnapshot) + { + snapshot = RegisterSnapshot(GetTransactionSnapshot()); + PushActiveSnapshot(snapshot); + } + else + PushUpdatedSnapshot(snapshot); + if (pstmt->canSetTag) { /* statement can set tag string */ @@ -1279,6 +1282,8 @@ PortalRunMulti(Portal portal, bool isTopLevel, altdest, NULL); } + PopActiveSnapshot(); + if (log_executor_stats) ShowUsage("EXECUTOR STATISTICS"); @@ -1291,11 +1296,25 @@ PortalRunMulti(Portal portal, bool isTopLevel, * * These are assumed canSetTag if they're the only stmt in the * portal. + * + * NotifyStmt is the only utility statement allowed in a list of + * rewritten queries, and it doesn't need a snapshot so we don't + * need to worry about it. However, if the list has only one + * statement and it's a utility statement, we are not allowed to + * take a snapshot. See the first comment in PortalRunUtility(). */ if (list_length(portal->stmts) == 1) + { + Assert(snapshot == InvalidSnapshot); + PortalRunUtility(portal, stmt, isTopLevel, dest, completionTag); + } else + { + Assert(IsA(stmt, NotifyStmt)); + PortalRunUtility(portal, stmt, isTopLevel, altdest, NULL); + } } /* @@ -1313,6 +1332,9 @@ PortalRunMulti(Portal portal, bool isTopLevel, MemoryContextDeleteChildren(PortalGetHeapMemory(portal)); } + if (snapshot != InvalidSnapshot) + UnregisterSnapshot(snapshot); + /* * If a command completion tag was supplied, use it. Otherwise use the * portal's commandTag as the default completion tag. -- 1.7.1