From c4ba103232ddee1c86ec381e00b24e19c49b311e Mon Sep 17 00:00:00 2001 From: "okbob@github.com" Date: Mon, 11 Apr 2022 20:32:05 +0200 Subject: [PATCH 02/11] session variables Implementation storage and access routines. Session variables are stored in session memory inside dedicated hash table. Two levels of an access are implemented: API level and SQL level. Both levels are implemented in this commit. The most difficult part is cleaning (reset) of session variable. The content of session variable should be cleaned when related session variable is removed from system catalog. But queue of sinval messages can be truncated, or current transaction state can disallow an access to system catalog, so cleaning can be postponed. --- src/backend/access/transam/xact.c | 2 +- src/backend/catalog/dependency.c | 5 + src/backend/commands/sessionvariable.c | 864 +++++++++++++++++++++++- src/backend/executor/execExpr.c | 75 ++ src/backend/executor/execExprInterp.c | 11 + src/backend/executor/execMain.c | 56 ++ src/backend/executor/execParallel.c | 148 +++- src/backend/jit/llvm/llvmjit_expr.c | 6 + src/backend/nodes/copyfuncs.c | 4 + src/backend/nodes/equalfuncs.c | 3 + src/backend/nodes/outfuncs.c | 6 + src/backend/nodes/readfuncs.c | 4 + src/backend/optimizer/plan/planner.c | 8 + src/backend/optimizer/plan/setrefs.c | 118 +++- src/backend/optimizer/util/clauses.c | 75 +- src/backend/parser/analyze.c | 13 +- src/backend/parser/parse_expr.c | 186 ++++- src/backend/tcop/pquery.c | 3 + src/backend/utils/adt/ruleutils.c | 46 ++ src/backend/utils/cache/plancache.c | 19 +- src/backend/utils/fmgr/fmgr.c | 10 +- src/backend/utils/misc/guc.c | 12 + src/include/commands/session_variable.h | 7 + src/include/executor/execExpr.h | 10 + src/include/executor/execdesc.h | 4 + src/include/nodes/execnodes.h | 19 + src/include/nodes/parsenodes.h | 2 + src/include/nodes/pathnodes.h | 3 + src/include/nodes/plannodes.h | 2 + src/include/nodes/primnodes.h | 12 +- src/include/optimizer/planmain.h | 2 + src/include/parser/parse_expr.h | 1 + src/include/parser/parse_node.h | 1 + 33 files changed, 1704 insertions(+), 33 deletions(-) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 21cc02a199..c22c706ca3 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2214,7 +2214,7 @@ CommitTransaction(void) */ smgrDoPendingSyncs(true, is_parallel_worker); - /* Let ON COMMIT DROP */ + /* Let ON COMMIT DROP or ON TRANSACTION END */ AtPreEOXact_SessionVariable_on_xact_actions(true); /* close large objects before lower-level cleanup */ diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index d5d2a294c3..8ffcaabc46 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -1889,6 +1889,11 @@ find_expr_references_walker(Node *node, { Param *param = (Param *) node; + /* A variable parameter depends on the session variable */ + if (param->paramkind == PARAM_VARIABLE) + add_object_address(OCLASS_VARIABLE, param->paramvarid, 0, + context->addrs); + /* A parameter must depend on the parameter's datatype */ add_object_address(OCLASS_TYPE, param->paramtype, 0, context->addrs); diff --git a/src/backend/commands/sessionvariable.c b/src/backend/commands/sessionvariable.c index 213a2b9c4c..e26903d135 100644 --- a/src/backend/commands/sessionvariable.c +++ b/src/backend/commands/sessionvariable.c @@ -28,17 +28,100 @@ #include "parser/parse_collate.h" #include "parser/parse_expr.h" #include "parser/parse_type.h" +#include "rewrite/rewriteHandler.h" +#include "storage/lmgr.h" +#include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/datum.h" +#include "utils/inval.h" #include "utils/lsyscache.h" +#include "utils/snapmgr.h" #include "utils/syscache.h" /* - * The life cycle of temporary session variable can be + * Values of session variables are stored in local memory, in + * sessionvars hash table. This local memory has to be cleaned, + * when: + * - a session variable is dropped by the current or another + * session + * - a user enforce it by using the ON TRANSACTION END RESET + * clause. The life cycle of temporary session variable can be * limmited by using clause ON COMMIT DROP. + * + * Although session variables are not transactional, we don't want + * (and cannot) clean the entries in sessionvars hash table + * immediately, when we get the sinval message. Session variables + * usage is protected by heavyweight locks, so there is no risk of + * unwanted invalidation due to a drop variable done in a + * different session. But it's still possible to drop the session + * variable in the current session. Without delayed cleanup we + * would lose the value if the drop command is done in a sub + * transaction that is then rollbacked. The check of session + * variable validity requires access to system catalog, so it can + * only be done in transaction state). + * + * This is why memory cleanup (session variable reset) is + * postponed to the end of transaction, and why we need to hold + * some actions lists. We have to hold two separate action lists: + * one for dropping the session variable from system catalog, and + * another one for resetting its value. Using two lists simplifies + * implementation. The list with dropped variables should be thrown on + * the end of any transaction. Second list with variables that should + * be resetted or rechecked (and possibly resetted) should be + * preserved to end of some next committed transaction. More, the + * reset actions should be stored in second list, bacause processing + * of the drop action makes new reset action. But when we iterate + * over any list, we cannot to append to this list new fields. So + * second list is required. + * + * We want to support the possibility of resetting a session + * variable at the end of transaction, with the ON TRANSACTION END + * RESET option. This ensures the initial state of session + * variables at the begin of each transaction. The reset is + * implemented as a removal of the session variable from + * sessionvars hash table. This enforce full initialization in + * the next usage. Careful though, this is not same as dropping + * the session variable. + * + * Another functionality is dropping temporary session variable + * with the option ON COMMIT DROP. + * + * There are two different ways how to do final access to session + * variables: buffered (indirect) or direct. Buffered access is used + * in queries, where we have to ensure an stability of passed values + * (then the session variable has same behaviour like external query + * parametes, what is consistent with using PL/pgSQL's variables in + * embdded queries in PL/pgSQL). + * + * This is implemented by using aux buffer (an array) that holds an + * copy of values of used (in query) session variables. In the final + * end, the values from this array is passed as constant (EEOP_CONST). + * + * Direct access is used by simple expression evaluation (PLpgSQL). + * In this case we don't need to ensure the stability of passed + * values, and maintaining the buffer with copy of values of session + * variables can be useless overhead. In this case we just read the + * value of session variable directly (EEOP_PARAM_VARIABLE). This + * strategy remove an necessity to modify related PL/pgSQL code to + * supports session variables (the reading of session variables is + * fully transparent for PL/pgSQL). + * + * The session variable hold a value across transactions. There + * can are two possible problems that should be solved. Sinval + * check can be delayed after oid overflow. Second issue is possible + * change of format of stored value. So before returning of any + * value of session variable, we have to check if returned value + * is safe. If not, we will mark the session variable as broken, + * and any read of this variable until end of session or rewriting + * raises an exception. */ typedef enum SVariableXActAction { SVAR_ON_COMMIT_DROP, /* used for ON COMMIT DROP */ + SVAR_ON_COMMIT_RESET, /* used for DROP VARIABLE */ + SVAR_RESET, /* used for ON TRANSACTION END RESET */ + SVAR_RECHECK /* do appropriate cleanup if the session * + * variable doesn't exist */ } SVariableXActAction; typedef struct SVariableXActActionItem @@ -55,12 +138,618 @@ typedef struct SVariableXActActionItem SubTransactionId deleting_subid; } SVariableXActActionItem; -/* List holds fields of SVariableXActActionItem type */ +/* Both lists hold fields of SVariableXActActionItem type */ static List *xact_drop_actions = NIL; +static List *xact_reset_recheck_actions = NIL; + +typedef struct SVariableData +{ + Oid varid; /* pg_variable OID of this sequence (hash key) */ + Oid typid; /* OID of the data type */ + int16 typlen; + bool typbyval; + bool isnull; + bool freeval; + Datum value; + + bool is_rowtype; /* true when variable is composite */ + bool is_not_null; /* don't allow null values */ + bool is_immutable; /* true when variable is immutable */ + bool has_defexpr; /* true when variable has a default value */ + + bool is_valid; /* true when variable was successfully + * initialized */ + + bool is_broken; /* true when we cannot to return a value + safely */ + + uint32 hashvalue; +} SVariableData; + +typedef SVariableData * SVariable; + +static HTAB *sessionvars = NULL; /* hash table for session variables */ +static MemoryContext SVariableMemoryContext = NULL; + +static bool first_time = true; + +static bool sync_sessionvars_all_is_required = false; +static bool recheck_sessionvars_is_required = false; static void register_session_variable_xact_action(Oid varid, SVariableXActAction action); static void unregister_session_variable_xact_action(Oid varid, SVariableXActAction action); +/* + * Releases stored data from session variable, but preserve the hash entry + * in sessionvars. + */ +static void +free_session_variable_value(SVariable svar) +{ + if (svar->freeval) + pfree(DatumGetPointer(svar->value)); + + /* Clean current value */ + svar->value = (Datum) 0; + svar->isnull = true; + svar->freeval = false; + + /* + * We can mark this session variable as valid when + * it has not default expression, and when null is + * allowed. When it has defexpr, then the content + * will be valid after an assignment or defexp evaluation. + */ + svar->is_valid = !svar->has_defexpr && !svar->is_not_null; +} + +/* + * Release the variable defined by varid from sessionvars + * hashtab. + */ +static void +remove_session_variable(SVariable svar) +{ + free_session_variable_value(svar); + + if (hash_search(sessionvars, + (void *) &svar->varid, + HASH_REMOVE, + NULL) == NULL) + elog(DEBUG1, "hash table corrupted"); +} + +/* + * Release the variable defined by varid from sessionvars + * hashtab. + */ +static void +remove_session_variable_by_id(Oid varid) +{ + SVariable svar; + bool found; + + if (!sessionvars) + return; + + svar = (SVariable) hash_search(sessionvars, &varid, + HASH_FIND, &found); + if (found) + remove_session_variable(svar); +} + +/* + * Callback function for session variable invalidation. + * + * Register SVAR_RECHECK actions on appropriate currently cached + * session variables. Those will be processed later, rechecking against the + * catalog to detect dropped session variable. + */ +static void +pg_variable_cache_callback(Datum arg, int cacheid, uint32 hashvalue) +{ + /* + * There is no guarantee of sessionvars being initialized, even when + * receiving an invalidation callback, as DISCARD [ ALL | VARIABLES ] + * destroys the hash table entirely. + */ + if (!sessionvars) + return; + + /* + * When we know so all session variables should be synchronized, + * then is useless to continue; + */ + if (sync_sessionvars_all_is_required) + return; + + /* + * Since we can't guarantee the exact session variable from its hashValue, + * we have to iterate over all currently known session variables to find + * the ones with the same hashValue. On second hand, this can save us + * some CPU later, because we don't need to check any used + * session variable (by current session) against system catalog. + */ + if (hashvalue != 0) + { + HASH_SEQ_STATUS status; + SVariable svar; + + hash_seq_init(&status, sessionvars); + + while ((svar = (SVariable) hash_seq_search(&status)) != NULL) + { + if (svar->hashvalue == hashvalue) + { + register_session_variable_xact_action(svar->varid, SVAR_RECHECK); + recheck_sessionvars_is_required = true; + } + + /* + * although it there is low probability, we have to iterate + * over all actively used session variables, because hashvalue + * is not unique identifier. + */ + } + } + else + sync_sessionvars_all_is_required = true; +} + +/* + * When we need to recheck all session variables, then + * the most effective method is seq scan over hash tab. + * We need to check synchronization request before any + * read to be sure so returned data are valid. + */ +static void +sync_sessionvars_all() +{ + HASH_SEQ_STATUS status; + SVariable svar; + ListCell *l; + + if (!sync_sessionvars_all_is_required && + !recheck_sessionvars_is_required) + return; + + /* + * sessionvars is null after DISCARD VARIABLES. + * When we are sure, so there are not any + * active session variable in this session, we + * can reset sync_sessionvars_all flag. + */ + if (!sessionvars) + { + sync_sessionvars_all_is_required = false; + recheck_sessionvars_is_required = false; + return; + } + + /* + * This routine is called before any reading. So the + * session should be in transaction state. This is required + * for access to system catalog. + */ + Assert(IsTransactionState()); + + if (sync_sessionvars_all_is_required) + { + hash_seq_init(&status, sessionvars); + + while ((svar = (SVariable) hash_seq_search(&status)) != NULL) + { + HeapTuple tp; + + tp = SearchSysCache1(VARIABLEOID, ObjectIdGetDatum(svar->varid)); + + if (HeapTupleIsValid(tp)) + ReleaseSysCache(tp); + else + remove_session_variable(svar); + } + + sync_sessionvars_all_is_required = false; + + /* clean list of items that should be rechecked */ + if (recheck_sessionvars_is_required) + { + foreach(l, xact_reset_recheck_actions) + { + SVariableXActActionItem *xact_ai = + (SVariableXActActionItem *) lfirst(l); + + if (xact_ai->action == SVAR_RECHECK) + { + xact_reset_recheck_actions = foreach_delete_current(xact_reset_recheck_actions, l); + pfree(xact_ai); + } + } + + recheck_sessionvars_is_required = false; + } + } + else if (recheck_sessionvars_is_required) + { + /* recheck_sessionvars_is_required */ + foreach(l, xact_reset_recheck_actions) + { + SVariableXActActionItem *xact_ai = + (SVariableXActActionItem *) lfirst(l); + + if (xact_ai->action == SVAR_RECHECK) + { + bool found; + + svar = (SVariable) hash_search(sessionvars, &xact_ai->varid, + HASH_FIND, &found); + + if (found) + { + HeapTuple tp; + + tp = SearchSysCache1(VARIABLEOID, ObjectIdGetDatum(svar->varid)); + + if (HeapTupleIsValid(tp)) + ReleaseSysCache(tp); + else + remove_session_variable(svar); + } + + xact_reset_recheck_actions = foreach_delete_current(xact_reset_recheck_actions, l); + pfree(xact_ai); + } + } + + recheck_sessionvars_is_required = false; + } +} + +/* + * Create the hash table for storing session variables + */ +static void +create_sessionvars_hashtable(void) +{ + HASHCTL ctl; + + /* set callbacks */ + if (first_time) + { + /* Read sinval messages */ + CacheRegisterSyscacheCallback(VARIABLEOID, + pg_variable_cache_callback, + (Datum) 0); + /* needs its own long lived memory context */ + SVariableMemoryContext = + AllocSetContextCreate(TopMemoryContext, + "session variables", + ALLOCSET_START_SMALL_SIZES); + + first_time = false; + } + + Assert(SVariableMemoryContext); + + memset(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(Oid); + ctl.entrysize = sizeof(SVariableData); + ctl.hcxt = SVariableMemoryContext; + + Assert(sessionvars == NULL); + + sessionvars = hash_create("Session variables", 64, &ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); +} + +/* + * Assign some content to the session variable. It's copied to + * SVariableMemoryContext if necessary. + * + * init_mode is true, when the value of session variable is being initialized + * by default expression or by null. Only in this moment we can allow to + * modify immutable variables with default expression. + */ +static void +set_session_variable(SVariable svar, Datum value, + bool isnull, Oid typid, + bool init_mode) +{ + MemoryContext oldcxt; + Datum newval = value; + + /* Don't allow assignment of null to NOT NULL variable */ + if (isnull && svar->is_not_null) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("null value is not allowed for NOT NULL session variable \"%s.%s\"", + get_namespace_name(get_session_variable_namespace(svar->varid)), + get_session_variable_name(svar->varid)))); + + if (svar->typid != typid) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("type \"%s\" of assigned value is different than type \"%s\" of session variable \"%s.%s\"", + format_type_be(typid), + format_type_be(svar->typid), + get_namespace_name(get_session_variable_namespace(svar->varid)), + get_session_variable_name(svar->varid)))); + + /* + * Don't allow updating of immutable session variable that has assigned + * not null value or has default expression (and then the value should be + * result of default expression always). Don't do this check, when variable + * is being initialized. + */ + if (!init_mode && + (svar->is_immutable && (svar->is_valid || svar->has_defexpr))) + ereport(ERROR, + (errcode(ERRCODE_ERROR_IN_ASSIGNMENT), + errmsg("session variable \"%s.%s\" is declared IMMUTABLE", + get_namespace_name(get_session_variable_namespace(svar->varid)), + get_session_variable_name(svar->varid)))); + + /* copy value to session persistent context */ + oldcxt = MemoryContextSwitchTo(SVariableMemoryContext); + if (!isnull) + newval = datumCopy(value, svar->typbyval, svar->typlen); + MemoryContextSwitchTo(oldcxt); + + free_session_variable_value(svar); + + svar->value = newval; + svar->isnull = isnull; + svar->freeval = newval != value; + svar->is_valid = true; +} + +/* + * Initialize svar from var + * svar - SVariable - holds value + * var - Variable - holds metadata + */ +static void +init_session_variable(SVariable svar, Variable *var) +{ + Assert(OidIsValid(var->oid)); + + svar->varid = var->oid; + svar->typid = var->typid; + + get_typlenbyval(var->typid, + &svar->typlen, + &svar->typbyval); + + svar->isnull = true; + svar->freeval = false; + svar->value = (Datum) 0; + + svar->is_rowtype = type_is_rowtype(var->typid); + svar->is_not_null = var->is_not_null; + svar->is_immutable = var->is_immutable; + svar->has_defexpr = var->has_defexpr; + + svar->hashvalue = GetSysCacheHashValue1(VARIABLEOID, + ObjectIdGetDatum(var->oid)); + + /* the value of variable is not known yet */ + svar->is_valid = false; + + if (var->eoxaction == VARIABLE_EOX_RESET || + var->eoxaction == VARIABLE_EOX_DROP) + register_session_variable_xact_action(var->oid, SVAR_RESET); +} + +/* + * Search the given session variable in the hash table. If it doesn't + * exist, then insert it (and calculate defexpr if it exists). + * + * Caller is responsible for doing permission checks. + * + * As side effect this function acquires AccessShareLock on + * related session variable until the end of the transaction. + */ +static SVariable +prepare_variable_for_reading(Oid varid) +{ + SVariable svar; + Variable var; + bool found; + + var.oid = InvalidOid; + + if (!sessionvars) + create_sessionvars_hashtable(); + + /* Protect used session variable against drop until transaction end */ + LockDatabaseObject(VariableRelationId, varid, 0, AccessShareLock); + + /* Ensure so all entries in sessionvars hash table are valid */ + sync_sessionvars_all(); + + svar = (SVariable) hash_search(sessionvars, &varid, + HASH_ENTER, &found); + + /* Return content if it is available and valid */ + if (found && svar->is_valid) + return svar; + + /* We need to load defexpr. */ + initVariable(&var, varid, false); + + if (!found) + init_session_variable(svar, &var); + + /* Raise an error when we cannot initialize variable correctly */ + if (var.is_not_null && !var.defexpr) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("null value is not allowed for NOT NULL session variable \"%s.%s\"", + get_namespace_name(get_session_variable_namespace(varid)), + get_session_variable_name(varid)), + errdetail("The session variable was not initialized yet."))); + + if (svar->has_defexpr) + { + Datum value = (Datum) 0; + bool isnull; + EState *estate = NULL; + Expr *defexpr; + ExprState *defexprs; + MemoryContext oldcxt; + + /* Prepare default expr */ + estate = CreateExecutorState(); + + oldcxt = MemoryContextSwitchTo(estate->es_query_cxt); + + defexpr = expression_planner((Expr *) var.defexpr); + defexprs = ExecInitExpr(defexpr, NULL); + value = ExecEvalExprSwitchContext(defexprs, + GetPerTupleExprContext(estate), + &isnull); + + + /* Store result before releasing Executor memory */ + set_session_variable(svar, value, isnull, svar->typid, true); + + MemoryContextSwitchTo(oldcxt); + + FreeExecutorState(estate); + } + else + set_session_variable(svar, (Datum) 0, true, svar->typid, true); + + return svar; +} + +/* + * Store the given value in an SVariable, and cache it if not already present. + * + * Caller is responsible for doing permission checks. + * We try not to break the previous value, if something is wrong. + * + * As side effect this function acquires AccessShareLock on + * related session variable until the end of the transaction. + */ +void +SetSessionVariable(Oid varid, Datum value, bool isNull, Oid typid) +{ + SVariable svar; + bool found; + + /* Protect used session variable against drop until transaction end */ + LockDatabaseObject(VariableRelationId, varid, 0, AccessShareLock); + + if (!sessionvars) + create_sessionvars_hashtable(); + + svar = (SVariable) hash_search(sessionvars, &varid, + HASH_ENTER, &found); + + if (!found) + { + Variable var; + + /* We don't need to know defexpr here */ + initVariable(&var, varid, true); + init_session_variable(svar, &var); + } + + set_session_variable(svar, value, isNull, typid, false); +} + +/* + * Wrapper around SetSessionVariable after checking for correct permission. + */ +void +SetSessionVariableWithSecurityCheck(Oid varid, Datum value, bool isNull, Oid typid) +{ + AclResult aclresult; + + /* + * Is possible to write to session variable? + */ + aclresult = pg_variable_aclcheck(varid, GetUserId(), ACL_WRITE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_VARIABLE, get_session_variable_name(varid)); + + SetSessionVariable(varid, value, isNull, typid); +} + +/* + * Returns a copy of value of the session variable specified by varid + * Caller is responsible for doing permission checks. + */ +Datum +CopySessionVariable(Oid varid, bool *isNull, Oid *typid) +{ + SVariable svar; + + svar = prepare_variable_for_reading(varid); + Assert(svar != NULL && svar->is_valid); + + *isNull = svar->isnull; + *typid = svar->typid; + + if (!svar->isnull) + return datumCopy(svar->value, svar->typbyval, svar->typlen); + + return (Datum) 0; +} + +/* + * Returns a copy of value of the session variable specified by varid + * with check of expected type. Like previous function, the caller + * is responsible for doing permission checks. + */ +Datum +CopySessionVariableWithTypeCheck(Oid varid, bool *isNull, Oid expected_typid) +{ + SVariable svar; + + svar = prepare_variable_for_reading(varid); + Assert(svar != NULL && svar->is_valid); + + if (expected_typid != svar->typid) + elog(ERROR, "type of variable \"%s.%s\" is different than expected", + get_namespace_name(get_session_variable_namespace(varid)), + get_session_variable_name(varid)); + + *isNull = svar->isnull; + + if (!svar->isnull) + return datumCopy(svar->value, svar->typbyval, svar->typlen); + + return (Datum) 0; +} + +/* + * Returns a value of session variable identified by varid with + * check of expected type. Like previous function, the called + * is reposible for doing permission check. + */ +Datum +GetSessionVariableWithTypeCheck(Oid varid, bool *isNull, Oid expected_typid) +{ + SVariable svar; + + svar = prepare_variable_for_reading(varid); + Assert(svar != NULL && svar->is_valid); + + if (expected_typid != svar->typid) + elog(ERROR, "type of variable \"%s.%s\" is different than expected", + get_namespace_name(get_session_variable_namespace(varid)), + get_session_variable_name(varid)); + + *isNull = svar->isnull; + + if (svar->isnull) + return (Datum) 0; + + return svar->value; +} + /* * Routines used for manipulation with session variables from * SQL level @@ -197,6 +886,53 @@ RemoveSessionVariable(Oid varid) * again at end of xact time. */ unregister_session_variable_xact_action(varid, SVAR_ON_COMMIT_DROP); + + /* + * And we want to enforce variable clearning when this transaction or + * subtransaction will be committed (we don't need to wait for + * sinval message). The cleaning action for one session variable + * can be repeated in the action list without causing any problem, + * so we don't need to ensure uniqueness. We need a different action + * from RESET, because RESET is executed on any transaction end, + * but we want to execute cleaning only when the current transaction + * will be committed. + */ + register_session_variable_xact_action(varid, SVAR_ON_COMMIT_RESET); +} + +/* + * Fast drop of the complete content of all session variables hash table. + * This is code for DISCARD VARIABLES command. This command + * cannot be run inside transaction, so we don't need to handle + * end of transaction actions. + */ +void +ResetSessionVariables(void) +{ + /* Destroy hash table and reset related memory context */ + if (sessionvars) + { + hash_destroy(sessionvars); + sessionvars = NULL; + } + + /* Release memory allocated by session variables */ + if (SVariableMemoryContext != NULL) + MemoryContextReset(SVariableMemoryContext); + + /* + * There are not any session variables left, so simply trim + * both xact action lists. + */ + list_free_deep(xact_drop_actions); + xact_drop_actions = NIL; + + list_free_deep(xact_reset_recheck_actions); + xact_reset_recheck_actions = NIL; + + /* There are not variables to synchronize */ + sync_sessionvars_all_is_required = false; + recheck_sessionvars_is_required = false; } /* @@ -228,8 +964,10 @@ register_session_variable_xact_action(Oid varid, xact_ai->creating_subid = GetCurrentSubTransactionId(); xact_ai->deleting_subid = InvalidSubTransactionId; - Assert(action == SVAR_ON_COMMIT_DROP); - xact_drop_actions = lcons(xact_ai, xact_drop_actions); + if (action == SVAR_ON_COMMIT_DROP) + xact_drop_actions = lcons(xact_ai, xact_drop_actions); + else + xact_reset_recheck_actions = lcons(xact_ai, xact_reset_recheck_actions); MemoryContextSwitchTo(oldcxt); } @@ -310,6 +1048,95 @@ AtPreEOXact_SessionVariable_on_xact_actions(bool isCommit) */ list_free_deep(xact_drop_actions); xact_drop_actions = NIL; + + foreach(l, xact_reset_recheck_actions) + { + SVariableXActActionItem *xact_ai = + (SVariableXActActionItem *) lfirst(l); + + if (xact_ai->action == SVAR_RECHECK) + { + /* + * We can do the recheck only when the transaction is committed as + * we need to access system catalog. When transaction is + * ROLLBACKed, then we have to postpone the check to next + * transaction. We therefore don't check for a matching + * creating_subid here. + */ + if (isCommit) + { + SVariable svar; + bool found; + + svar = (SVariable) hash_search(sessionvars, &xact_ai->varid, + HASH_FIND, &found); + + if (found) + { + HeapTuple tp; + + tp = SearchSysCache1(VARIABLEOID, ObjectIdGetDatum(svar->varid)); + + if (HeapTupleIsValid(tp)) + ReleaseSysCache(tp); + else + remove_session_variable(svar); + } + + xact_reset_recheck_actions = foreach_delete_current(xact_reset_recheck_actions, l); + pfree(xact_ai); + } + } + else + { + /* + * SVAR_RESET is used for uncoditional reset of session variable on + * on every transaction end. It is used for two cases: + * First case is reset of content of session variable created + * by command with ON TRANSACTION END RESET clause. For this + * case we don't need to clean hashtab entry, but we do it, because + * the session variable initialization is not expensive and it + * is executed only once time per transaction. When we remove + * variable from hashtab, we don't need to maintain special flag for + * forcing xact action reregistration (it is done just inside + * init_session_variable function. + * + * SVAR_RESET is used by variable created with ON COMMIT DROP clause. + * We can clean session memory immediately. We can do it by + * SVAR_RECHECK action too, but recheck requires isCommit=true. + * Cleaning by SVAR_RESET has not this requirment. + */ + if (xact_ai->action == SVAR_RESET) + remove_session_variable_by_id(xact_ai->varid); + + /* + * When we process DROP VARIABLE statement, we create + * SVAR_ON_COMMIT_RESET xact action. We want to process + * this action only when related transaction is commited + * (when DROP VARIABLE statement sucessfully processed). + * We want to preserve variable content, when the transaction + * with DROP VARAIBLE statement was reverted. + */ + else if (xact_ai->action == SVAR_ON_COMMIT_RESET && + xact_ai->deleting_subid == InvalidSubTransactionId && + isCommit) + remove_session_variable_by_id(xact_ai->varid); + + /* + * Any non SVAR_RECHECK action can now be removed. It was either + * explicitly processed, or implicitly due to some ROLLBACK action. + */ + xact_reset_recheck_actions = foreach_delete_current(xact_reset_recheck_actions, l); + pfree(xact_ai); + } + } + + /* + * Now when isCommit, then we can be sure, so all SVAR_RECHECK items are + * processed. + */ + if (isCommit) + recheck_sessionvars_is_required = false; } /* @@ -345,4 +1172,33 @@ AtEOSubXact_SessionVariable_on_xact_actions(bool isCommit, SubTransactionId mySu xact_ai->deleting_subid = isCommit ? parentSubid : InvalidSubTransactionId; } } + + /* + * Reset and recheck actions - cleaning memory should be done every time + * (when the variable with short life cycle was used) and then + * cannot be removed from xact action list. + */ + foreach(cur_item, xact_reset_recheck_actions) + { + SVariableXActActionItem *xact_ai = + (SVariableXActActionItem *) lfirst(cur_item); + + if (!isCommit && + xact_ai->creating_subid == mySubid && + xact_ai->action == SVAR_ON_COMMIT_DROP) + { + /* cur_item must be removed */ + xact_reset_recheck_actions = + foreach_delete_current(xact_reset_recheck_actions, cur_item); + pfree(xact_ai); + } + else + { + /* cur_item must be preserved */ + if (xact_ai->creating_subid == mySubid) + xact_ai->creating_subid = parentSubid; + if (xact_ai->deleting_subid == mySubid) + xact_ai->deleting_subid = isCommit ? parentSubid : InvalidSubTransactionId; + } + } } diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c index 2831e7978b..13554c1b20 100644 --- a/src/backend/executor/execExpr.c +++ b/src/backend/executor/execExpr.c @@ -33,6 +33,7 @@ #include "access/nbtree.h" #include "catalog/objectaccess.h" #include "catalog/pg_type.h" +#include "commands/session_variable.h" #include "executor/execExpr.h" #include "executor/nodeSubplan.h" #include "funcapi.h" @@ -994,6 +995,80 @@ ExecInitExprRec(Expr *node, ExprState *state, scratch.d.param.paramtype = param->paramtype; ExprEvalPushStep(state, &scratch); break; + + case PARAM_VARIABLE: + { + int es_num_session_variables = 0; + SessionVariableValue *es_session_variables = NULL; + + if (state->parent && state->parent->state) + { + es_session_variables = state->parent->state->es_session_variables; + es_num_session_variables = state->parent->state->es_num_session_variables; + } + + if (es_session_variables) + { + SessionVariableValue *var; + + /* + * Use buffered session variables when the + * buffer with copied values is avaiable + * (standard query executor mode) + */ + + /* Parameter sanity checks. */ + if (param->paramid >= es_num_session_variables) + elog(ERROR, "paramid of PARAM_VARIABLE param is out of range"); + + var = &es_session_variables[param->paramid]; + + if (var->typid != param->paramtype) + elog(ERROR, "type of buffered value is different than PARAM_VARIABLE type"); + + /* + * In this case, pass the value like + * a constant. + */ + scratch.opcode = EEOP_CONST; + scratch.d.constval.value = var->value; + scratch.d.constval.isnull = var->isnull; + ExprEvalPushStep(state, &scratch); + } + else + { + AclResult aclresult; + Oid varid = param->paramvarid; + Oid vartype = param->paramtype; + + /* + * When the expression is evaluated directly + * without query executor start (plpgsql simple + * expr evaluation), then the array es_session_variables + * is null. In this case we need to use direct + * access to session variables. The values are + * not protected by using copy, but it is not + * problem (we don't need to emulate stability + * of the value). + * + * In this case we should to do aclcheck, because + * usual aclcheck from standard_ExecutorStart + * is not executed in this case. Fortunately + * it is just once per transaction. + */ + aclresult = pg_variable_aclcheck(varid, GetUserId(), ACL_READ); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_VARIABLE, + get_session_variable_name(varid)); + + scratch.opcode = EEOP_PARAM_VARIABLE; + scratch.d.vparam.varid = varid; + scratch.d.vparam.vartype = vartype; + ExprEvalPushStep(state, &scratch); + } + } + break; + case PARAM_EXTERN: /* diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c index eaec697bb3..34ddef2b21 100644 --- a/src/backend/executor/execExprInterp.c +++ b/src/backend/executor/execExprInterp.c @@ -61,6 +61,7 @@ #include "catalog/pg_proc.h" #include "catalog/pg_type.h" #include "commands/sequence.h" +#include "commands/session_variable.h" #include "executor/execExpr.h" #include "executor/nodeSubplan.h" #include "funcapi.h" @@ -453,6 +454,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) &&CASE_EEOP_PARAM_EXEC, &&CASE_EEOP_PARAM_EXTERN, &&CASE_EEOP_PARAM_CALLBACK, + &&CASE_EEOP_PARAM_VARIABLE, &&CASE_EEOP_CASE_TESTVAL, &&CASE_EEOP_MAKE_READONLY, &&CASE_EEOP_IOCOERCE, @@ -1090,6 +1092,15 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) EEO_NEXT(); } + EEO_CASE(EEOP_PARAM_VARIABLE) + { + /* direct access to session variable (without buffering) */ + *op->resvalue = GetSessionVariableWithTypeCheck(op->d.vparam.varid, + op->resnull, + op->d.vparam.vartype); + EEO_NEXT(); + } + EEO_CASE(EEOP_CASE_TESTVAL) { /* diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index ef2fd46092..5e915cb1e7 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -48,6 +48,7 @@ #include "catalog/pg_publication.h" #include "commands/matview.h" #include "commands/trigger.h" +#include "commands/session_variable.h" #include "executor/execdebug.h" #include "executor/nodeSubplan.h" #include "foreign/fdwapi.h" @@ -200,6 +201,61 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) Assert(queryDesc->sourceText != NULL); estate->es_sourceText = queryDesc->sourceText; + /* + * The executor doesn't work with session variables directly. Values + * of related session variables are copied to dedicated array, and + * this array is passed to executor. + */ + if (queryDesc->num_session_variables > 0) + { + /* + * When paralel access to query parameters (including related + * session variables) is required, then related session variables + * are restored (deserilized) in queryDesc already. So just push + * pointer of this array to executor's estate. + */ + estate->es_session_variables = queryDesc->session_variables; + estate->es_num_session_variables = queryDesc->num_session_variables; + } + else if (queryDesc->plannedstmt->sessionVariables) + { + ListCell *lc; + int nSessionVariables; + int i = 0; + + /* + * In this case, the query uses session variables, but we have + * to prepare the array with passed values (of used session + * variables) first. + */ + nSessionVariables = list_length(queryDesc->plannedstmt->sessionVariables); + + /* Create the array used for passing values of used session variables */ + estate->es_session_variables = (SessionVariableValue *) + palloc(nSessionVariables * sizeof(SessionVariableValue)); + + /* Fill the array */ + foreach(lc, queryDesc->plannedstmt->sessionVariables) + { + AclResult aclresult; + Oid varid = lfirst_oid(lc); + + aclresult = pg_variable_aclcheck(varid, GetUserId(), ACL_READ); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_VARIABLE, + get_session_variable_name(varid)); + + estate->es_session_variables[i].varid = varid; + estate->es_session_variables[i].value = CopySessionVariable(varid, + &estate->es_session_variables[i].isnull, + &estate->es_session_variables[i].typid); + + i++; + } + + estate->es_num_session_variables = nSessionVariables; + } + /* * Fill in the query environment, if any, from queryDesc. */ diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index f1fd7f7e8b..1e04923788 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -12,8 +12,9 @@ * workers and ensuring that their state generally matches that of the * leader; see src/backend/access/transam/README.parallel for details. * However, we must save and restore relevant executor state, such as - * any ParamListInfo associated with the query, buffer/WAL usage info, and - * the actual plan to be passed down to the worker. + * any ParamListInfo associated with the query, buffer/WAL usage info, + * session variables buffer, and the actual plan to be passed down to + * the worker. * * IDENTIFICATION * src/backend/executor/execParallel.c @@ -66,6 +67,7 @@ #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) +#define PARALLEL_KEY_SESSION_VARIABLES UINT64CONST(0xE00000000000000B) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -140,6 +142,12 @@ static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, /* Helper function that runs in the parallel worker. */ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc); +/* Helper functions that can pass values of session variables */ +static Size EstimateSessionVariables(EState *estate); +static void SerializeSessionVariables(EState *estate, char **start_address); +static SessionVariableValue * RestoreSessionVariables(char **start_address, + int *num_session_variables); + /* * Create a serialized representation of the plan to be sent to each worker. */ @@ -597,6 +605,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, char *pstmt_data; char *pstmt_space; char *paramlistinfo_space; + char *session_variables_space; BufferUsage *bufusage_space; WalUsage *walusage_space; SharedExecutorInstrumentation *instrumentation = NULL; @@ -606,6 +615,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int instrumentation_len = 0; int jit_instrumentation_len = 0; int instrument_offset = 0; + int session_variables_len = 0; Size dsa_minsize = dsa_minimum_size(); char *query_string; int query_len; @@ -661,6 +671,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len); shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for serialized session variables. */ + session_variables_len = EstimateSessionVariables(estate); + shm_toc_estimate_chunk(&pcxt->estimator, session_variables_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* * Estimate space for BufferUsage. * @@ -755,6 +770,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space); SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space); + /* Store serialized session variables. */ + session_variables_space = shm_toc_allocate(pcxt->toc, session_variables_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_VARIABLES, session_variables_space); + SerializeSessionVariables(estate, &session_variables_space); + /* Allocate space for each worker's BufferUsage; no need to initialize. */ bufusage_space = shm_toc_allocate(pcxt->toc, mul_size(sizeof(BufferUsage), pcxt->nworkers)); @@ -1402,6 +1422,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) SharedJitInstrumentation *jit_instrumentation; int instrument_options = 0; void *area_space; + char *sessionvariable_space; dsa_area *area; ParallelWorkerContext pwcxt; @@ -1427,6 +1448,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false); area = dsa_attach_in_place(area_space, seg); + /* Reconstruct session variables. */ + sessionvariable_space = shm_toc_lookup(toc, + PARALLEL_KEY_SESSION_VARIABLES, + false); + queryDesc->session_variables = + RestoreSessionVariables(&sessionvariable_space, + &queryDesc->num_session_variables); + /* Start up the executor */ queryDesc->plannedstmt->jitFlags = fpes->jit_flags; ExecutorStart(queryDesc, fpes->eflags); @@ -1495,3 +1524,118 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) FreeQueryDesc(queryDesc); receiver->rDestroy(receiver); } + +/* + * Estimate the amount of space required to serialize a + * session variable. + */ +static Size +EstimateSessionVariables(EState *estate) +{ + int i; + Size sz = sizeof(int); + + if (estate->es_session_variables == NULL) + return sz; + + for (i = 0; i < estate->es_num_session_variables; i++) + { + SessionVariableValue *svarval; + Oid typeOid; + int16 typLen; + bool typByVal; + + svarval = &estate->es_session_variables[i]; + + typeOid = svarval->typid; + + sz = add_size(sz, sizeof(Oid)); /* space for type OID */ + + /* space for datum/isnull */ + Assert(OidIsValid(typeOid)); + get_typlenbyval(typeOid, &typLen, &typByVal); + + sz = add_size(sz, + datumEstimateSpace(svarval->value, svarval->isnull, typByVal, typLen)); + } + + return sz; +} + +/* + * Serialize a session variables buffer into caller-provided storage. + * + * We write the number of parameters first, as a 4-byte integer, and then + * write details for each parameter in turn. The details for each parameter + * consist of a 4-byte type OID, and then the datum as serialized by + * datumSerialize(). The caller is responsible for ensuring that there is + * enough storage to store the number of bytes that will be written; use + * EstimateSessionVariables to find out how many will be needed. + * *start_address is updated to point to the byte immediately following those + * written. + * + * RestoreSessionVariables can be used to recreate a session variable buffer + * based on the serialized representation; + */ +static void +SerializeSessionVariables(EState *estate, char **start_address) +{ + int nparams; + int i; + + /* Write number of parameters. */ + nparams = estate->es_num_session_variables; + memcpy(*start_address, &nparams, sizeof(int)); + *start_address += sizeof(int); + + /* Write each parameter in turn. */ + for (i = 0; i < nparams; i++) + { + SessionVariableValue *svarval; + Oid typeOid; + int16 typLen; + bool typByVal; + + svarval = &estate->es_session_variables[i]; + typeOid = svarval->typid; + + /* Write type OID. */ + memcpy(*start_address, &typeOid, sizeof(Oid)); + *start_address += sizeof(Oid); + + Assert(OidIsValid(typeOid)); + get_typlenbyval(typeOid, &typLen, &typByVal); + + datumSerialize(svarval->value, svarval->isnull, typByVal, typLen, + start_address); + } +} + +static SessionVariableValue * +RestoreSessionVariables(char **start_address, int *num_session_variables) +{ + SessionVariableValue *session_variables; + int i; + int nparams; + + memcpy(&nparams, *start_address, sizeof(int)); + *start_address += sizeof(int); + + *num_session_variables = nparams; + session_variables = (SessionVariableValue *) + palloc(nparams * sizeof(SessionVariableValue)); + + for (i = 0; i < nparams; i++) + { + SessionVariableValue *svarval = &session_variables[i]; + + /* Read type OID. */ + memcpy(&svarval->typid, *start_address, sizeof(Oid)); + *start_address += sizeof(Oid); + + /* Read datum/isnull. */ + svarval->value = datumRestore(start_address, &svarval->isnull); + } + + return session_variables; +} diff --git a/src/backend/jit/llvm/llvmjit_expr.c b/src/backend/jit/llvm/llvmjit_expr.c index b6b6512ef1..3c2ece8d4e 100644 --- a/src/backend/jit/llvm/llvmjit_expr.c +++ b/src/backend/jit/llvm/llvmjit_expr.c @@ -1073,6 +1073,12 @@ llvm_compile_expr(ExprState *state) LLVMBuildBr(b, opblocks[opno + 1]); break; + case EEOP_PARAM_VARIABLE: + build_EvalXFunc(b, mod, "ExecEvalParamVariable", + v_state, op, v_econtext); + LLVMBuildBr(b, opblocks[opno + 1]); + break; + case EEOP_PARAM_CALLBACK: { LLVMTypeRef v_functype; diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 176d4dd866..8885e04417 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -106,6 +106,7 @@ _copyPlannedStmt(const PlannedStmt *from) COPY_NODE_FIELD(invalItems); COPY_NODE_FIELD(paramExecTypes); COPY_NODE_FIELD(utilityStmt); + COPY_NODE_FIELD(sessionVariables); COPY_LOCATION_FIELD(stmt_location); COPY_SCALAR_FIELD(stmt_len); @@ -1517,6 +1518,7 @@ _copyParam(const Param *from) COPY_SCALAR_FIELD(paramtype); COPY_SCALAR_FIELD(paramtypmod); COPY_SCALAR_FIELD(paramcollid); + COPY_SCALAR_FIELD(paramvarid); COPY_LOCATION_FIELD(location); return newnode; @@ -3687,6 +3689,7 @@ _copyQuery(const Query *from) COPY_SCALAR_FIELD(canSetTag); COPY_NODE_FIELD(utilityStmt); COPY_SCALAR_FIELD(resultRelation); + COPY_SCALAR_FIELD(resultVariable); COPY_SCALAR_FIELD(hasAggs); COPY_SCALAR_FIELD(hasWindowFuncs); COPY_SCALAR_FIELD(hasTargetSRFs); @@ -3697,6 +3700,7 @@ _copyQuery(const Query *from) COPY_SCALAR_FIELD(hasForUpdate); COPY_SCALAR_FIELD(hasRowSecurity); COPY_SCALAR_FIELD(isReturn); + COPY_SCALAR_FIELD(hasSessionVariables); COPY_NODE_FIELD(cteList); COPY_NODE_FIELD(rtable); COPY_NODE_FIELD(jointree); diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index b9ef00e328..94e3f42750 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -272,6 +272,7 @@ _equalParam(const Param *a, const Param *b) COMPARE_SCALAR_FIELD(paramtype); COMPARE_SCALAR_FIELD(paramtypmod); COMPARE_SCALAR_FIELD(paramcollid); + COMPARE_SCALAR_FIELD(paramvarid); COMPARE_LOCATION_FIELD(location); return true; @@ -1322,6 +1323,7 @@ _equalQuery(const Query *a, const Query *b) COMPARE_SCALAR_FIELD(canSetTag); COMPARE_NODE_FIELD(utilityStmt); COMPARE_SCALAR_FIELD(resultRelation); + COMPARE_SCALAR_FIELD(resultVariable); COMPARE_SCALAR_FIELD(hasAggs); COMPARE_SCALAR_FIELD(hasWindowFuncs); COMPARE_SCALAR_FIELD(hasTargetSRFs); @@ -1332,6 +1334,7 @@ _equalQuery(const Query *a, const Query *b) COMPARE_SCALAR_FIELD(hasForUpdate); COMPARE_SCALAR_FIELD(hasRowSecurity); COMPARE_SCALAR_FIELD(isReturn); + COMPARE_SCALAR_FIELD(hasSessionVariables); COMPARE_NODE_FIELD(cteList); COMPARE_NODE_FIELD(rtable); COMPARE_NODE_FIELD(jointree); diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index ce12915592..040aacbba6 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -331,6 +331,7 @@ _outPlannedStmt(StringInfo str, const PlannedStmt *node) WRITE_NODE_FIELD(invalItems); WRITE_NODE_FIELD(paramExecTypes); WRITE_NODE_FIELD(utilityStmt); + WRITE_NODE_FIELD(sessionVariables); WRITE_LOCATION_FIELD(stmt_location); WRITE_INT_FIELD(stmt_len); } @@ -1182,6 +1183,7 @@ _outParam(StringInfo str, const Param *node) WRITE_OID_FIELD(paramtype); WRITE_INT_FIELD(paramtypmod); WRITE_OID_FIELD(paramcollid); + WRITE_OID_FIELD(paramvarid); WRITE_LOCATION_FIELD(location); } @@ -2437,6 +2439,7 @@ _outPlannerGlobal(StringInfo str, const PlannerGlobal *node) WRITE_NODE_FIELD(relationOids); WRITE_NODE_FIELD(invalItems); WRITE_NODE_FIELD(paramExecTypes); + WRITE_NODE_FIELD(sessionVariables); WRITE_UINT_FIELD(lastPHId); WRITE_UINT_FIELD(lastRowMarkId); WRITE_INT_FIELD(lastPlanNodeId); @@ -2497,6 +2500,7 @@ _outPlannerInfo(StringInfo str, const PlannerInfo *node) WRITE_BOOL_FIELD(hasPseudoConstantQuals); WRITE_BOOL_FIELD(hasAlternativeSubPlans); WRITE_BOOL_FIELD(hasRecursion); + WRITE_BOOL_FIELD(hasSessionVariables); WRITE_INT_FIELD(wt_param_id); WRITE_BITMAPSET_FIELD(curOuterRels); WRITE_NODE_FIELD(curOuterParams); @@ -3224,6 +3228,7 @@ _outQuery(StringInfo str, const Query *node) appendStringInfoString(str, " :utilityStmt <>"); WRITE_INT_FIELD(resultRelation); + WRITE_OID_FIELD(resultVariable); WRITE_BOOL_FIELD(hasAggs); WRITE_BOOL_FIELD(hasWindowFuncs); WRITE_BOOL_FIELD(hasTargetSRFs); @@ -3234,6 +3239,7 @@ _outQuery(StringInfo str, const Query *node) WRITE_BOOL_FIELD(hasForUpdate); WRITE_BOOL_FIELD(hasRowSecurity); WRITE_BOOL_FIELD(isReturn); + WRITE_BOOL_FIELD(hasSessionVariables); WRITE_NODE_FIELD(cteList); WRITE_NODE_FIELD(rtable); WRITE_NODE_FIELD(jointree); diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 6a05b69415..50dcb8345e 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -252,6 +252,7 @@ _readQuery(void) READ_BOOL_FIELD(canSetTag); READ_NODE_FIELD(utilityStmt); READ_INT_FIELD(resultRelation); + READ_OID_FIELD(resultVariable); READ_BOOL_FIELD(hasAggs); READ_BOOL_FIELD(hasWindowFuncs); READ_BOOL_FIELD(hasTargetSRFs); @@ -262,6 +263,7 @@ _readQuery(void) READ_BOOL_FIELD(hasForUpdate); READ_BOOL_FIELD(hasRowSecurity); READ_BOOL_FIELD(isReturn); + READ_BOOL_FIELD(hasSessionVariables); READ_NODE_FIELD(cteList); READ_NODE_FIELD(rtable); READ_NODE_FIELD(jointree); @@ -668,6 +670,7 @@ _readParam(void) READ_OID_FIELD(paramtype); READ_INT_FIELD(paramtypmod); READ_OID_FIELD(paramcollid); + READ_OID_FIELD(paramvarid); READ_LOCATION_FIELD(location); READ_DONE(); @@ -1827,6 +1830,7 @@ _readPlannedStmt(void) READ_NODE_FIELD(invalItems); READ_NODE_FIELD(paramExecTypes); READ_NODE_FIELD(utilityStmt); + READ_NODE_FIELD(sessionVariables); READ_LOCATION_FIELD(stmt_location); READ_INT_FIELD(stmt_len); diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index a0f2390334..3edb521e44 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -316,6 +316,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, glob->lastPlanNodeId = 0; glob->transientPlan = false; glob->dependsOnRole = false; + glob->sessionVariables = NIL; /* * Assess whether it's feasible to use parallel mode for this query. We @@ -529,6 +530,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, result->paramExecTypes = glob->paramExecTypes; /* utilityStmt should be null, but we might as well copy it */ result->utilityStmt = parse->utilityStmt; + result->sessionVariables = glob->sessionVariables; result->stmt_location = parse->stmt_location; result->stmt_len = parse->stmt_len; @@ -683,6 +685,12 @@ subquery_planner(PlannerGlobal *glob, Query *parse, */ pull_up_subqueries(root); + /* + * Check if some subquery uses session variable. Flag hasSessionVariables + * should be true if query or some subquery uses any session variable. + */ + pull_up_has_session_variables(root); + /* * If this is a simple UNION ALL query, flatten it into an appendrel. We * do this now because it requires applying pull_up_subqueries to the leaf diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index 9cef92cab2..0a9def182c 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -181,6 +181,8 @@ static List *set_returning_clause_references(PlannerInfo *root, static List *set_windowagg_runcondition_references(PlannerInfo *root, List *runcondition, Plan *plan); +static bool pull_up_has_session_variables_walker(Node *node, + PlannerInfo *root); /***************************************************************************** @@ -1218,6 +1220,50 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) return plan; } +/* + * Search usage of session variables in subqueries + */ +void +pull_up_has_session_variables(PlannerInfo *root) +{ + Query *query = root->parse; + + if (query->hasSessionVariables) + { + root->hasSessionVariables = true; + } + else + { + (void) query_tree_walker(query, + pull_up_has_session_variables_walker, + (void *) root, 0); + } +} + +static bool +pull_up_has_session_variables_walker(Node *node, PlannerInfo *root) +{ + if (node == NULL) + return false; + if (IsA(node, Query)) + { + Query *query = (Query *) node; + + if (query->hasSessionVariables) + { + root->hasSessionVariables = true; + return false; + } + + /* Recurse into subselects */ + return query_tree_walker((Query *) node, + pull_up_has_session_variables_walker, + (void *) root, 0); + } + return expression_tree_walker(node, pull_up_has_session_variables_walker, + (void *) root); +} + /* * set_indexonlyscan_references * Do set_plan_references processing on an IndexOnlyScan @@ -1824,8 +1870,9 @@ copyVar(Var *var) * This is code that is common to all variants of expression-fixing. * We must look up operator opcode info for OpExpr and related nodes, * add OIDs from regclass Const nodes into root->glob->relationOids, and - * add PlanInvalItems for user-defined functions into root->glob->invalItems. - * We also fill in column index lists for GROUPING() expressions. + * add PlanInvalItems for user-defined functions and session variables into + * root->glob->invalItems. We also fill in column index lists for GROUPING() + * expressions. * * We assume it's okay to update opcode info in-place. So this could possibly * scribble on the planner's input data structures, but it's OK. @@ -1915,15 +1962,39 @@ fix_expr_common(PlannerInfo *root, Node *node) g->cols = cols; } } + else if (IsA(node, Param)) + { + Param *p = (Param *) node; + + if (p->paramkind == PARAM_VARIABLE) + { + PlanInvalItem *inval_item = makeNode(PlanInvalItem); + + /* paramid is still session variable id */ + inval_item->cacheId = VARIABLEOID; + inval_item->hashValue = GetSysCacheHashValue1(VARIABLEOID, + ObjectIdGetDatum(p->paramvarid)); + + /* Append this variable to global, register dependency */ + root->glob->invalItems = lappend(root->glob->invalItems, + inval_item); + } + } } /* * fix_param_node * Do set_plan_references processing on a Param + * Collect session variables list and replace variable oid by + * index to collected list. * * If it's a PARAM_MULTIEXPR, replace it with the appropriate Param from * root->multiexpr_params; otherwise no change is needed. * Just for paranoia's sake, we make a copy of the node in either case. + * + * If it's a PARAM_VARIABLE, then we collect used session variables in + * list root->glob->sessionVariable. We should to assign Param paramvarid + * too, and it is position of related session variable in mentioned list. */ static Node * fix_param_node(PlannerInfo *root, Param *p) @@ -1942,6 +2013,41 @@ fix_param_node(PlannerInfo *root, Param *p) elog(ERROR, "unexpected PARAM_MULTIEXPR ID: %d", p->paramid); return copyObject(list_nth(params, colno - 1)); } + + if (p->paramkind == PARAM_VARIABLE) + { + ListCell *lc; + int n = 0; + bool found = false; + + /* We will modify object */ + p = (Param *) copyObject(p); + + /* + * Now, we can actualize list of session variables, and we can complete + * paramid parameter. + */ + foreach(lc, root->glob->sessionVariables) + { + if (lfirst_oid(lc) == p->paramvarid) + { + p->paramid = n; + found = true; + break; + } + n += 1; + } + + if (!found) + { + root->glob->sessionVariables = lappend_oid(root->glob->sessionVariables, + p->paramvarid); + p->paramid = n; + } + + return (Node *) p; + } + return (Node *) copyObject(p); } @@ -2003,7 +2109,10 @@ fix_alternative_subplan(PlannerInfo *root, AlternativeSubPlan *asplan, * replacing Aggref nodes that should be replaced by initplan output Params, * choosing the best implementation for AlternativeSubPlans, * looking up operator opcode info for OpExpr and related nodes, - * and adding OIDs from regclass Const nodes into root->glob->relationOids. + * adding OIDs from regclass Const nodes into root->glob->relationOids, + * and assigning paramvarid to PARAM_VARIABLE params, and collecting + * of OIDs of session variables in root->glob->sessionVariables list + * (paramvarid is an position of related session variable in this list). * * 'node': the expression to be modified * 'rtoffset': how much to increment varnos by @@ -2025,7 +2134,8 @@ fix_scan_expr(PlannerInfo *root, Node *node, int rtoffset, double num_exec) root->multiexpr_params != NIL || root->glob->lastPHId != 0 || root->minmax_aggs != NIL || - root->hasAlternativeSubPlans) + root->hasAlternativeSubPlans || + root->hasSessionVariables) { return fix_scan_expr_mutator(node, &context); } diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index 533df86ff7..ca48b3b74c 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -26,6 +26,7 @@ #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" +#include "commands/session_variable.h" #include "executor/executor.h" #include "executor/functions.h" #include "executor/execExpr.h" @@ -853,16 +854,17 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) /* * We can't pass Params to workers at the moment either, so they are also - * parallel-restricted, unless they are PARAM_EXTERN Params or are - * PARAM_EXEC Params listed in safe_param_ids, meaning they could be - * either generated within workers or can be computed by the leader and - * then their value can be passed to workers. + * parallel-restricted, unless they are PARAM_EXTERN or PARAM_VARIABLE + * Params or are PARAM_EXEC Params listed in safe_param_ids, meaning they + * could be either generated within workers or can be computed by the + * leader and then their value can be passed to workers. */ else if (IsA(node, Param)) { Param *param = (Param *) node; - if (param->paramkind == PARAM_EXTERN) + if (param->paramkind == PARAM_EXTERN || + param->paramkind == PARAM_VARIABLE) return false; if (param->paramkind != PARAM_EXEC || @@ -2284,6 +2286,7 @@ convert_saop_to_hashed_saop_walker(Node *node, void *context) * value of the Param. * 2. Fold stable, as well as immutable, functions to constants. * 3. Reduce PlaceHolderVar nodes to their contained expressions. + * 4. Current value of session variable can be used for estimation too. *-------------------- */ Node * @@ -2406,6 +2409,29 @@ eval_const_expressions_mutator(Node *node, } } } + else if (param->paramkind == PARAM_VARIABLE && + context->estimate) + { + int16 typLen; + bool typByVal; + Datum pval; + bool isnull; + + get_typlenbyval(param->paramtype, + &typLen, &typByVal); + + pval = CopySessionVariableWithTypeCheck(param->paramvarid, + &isnull, + param->paramtype); + + return (Node *) makeConst(param->paramtype, + param->paramtypmod, + param->paramcollid, + (int) typLen, + pval, + isnull, + typByVal); + } /* * Not replaceable, so just copy the Param (no need to @@ -4813,22 +4839,45 @@ substitute_actual_parameters_mutator(Node *node, { if (node == NULL) return NULL; + + /* + * SQL functions can contain two different kind of params. The nodes with + * paramkind PARAM_EXTERN are related to function's arguments (and should + * be replaced in this step), because this is how we apply the function's + * arguments for an expression. + * + * The nodes with paramkind PARAM_VARIABLE are related to usage of + * session variables. The values of session variables are not passed + * to expression by expression arguments, so it should not be replaced + * here by function's arguments. Although we could substitute params + * related to immutable session variables with default expression by + * this default expression, it is safer to not do it. This way we don't + * have to run security checks here. There can be some performance loss, + * but an access to session variable is fast (and the result of default + * expression is immediately materialized and can be reused). + */ if (IsA(node, Param)) { Param *param = (Param *) node; - if (param->paramkind != PARAM_EXTERN) + if (param->paramkind != PARAM_EXTERN && + param->paramkind != PARAM_VARIABLE) elog(ERROR, "unexpected paramkind: %d", (int) param->paramkind); - if (param->paramid <= 0 || param->paramid > context->nargs) - elog(ERROR, "invalid paramid: %d", param->paramid); - /* Count usage of parameter */ - context->usecounts[param->paramid - 1]++; + if (param->paramkind == PARAM_EXTERN) + { + if (param->paramid <= 0 || param->paramid > context->nargs) + elog(ERROR, "invalid paramid: %d", param->paramid); - /* Select the appropriate actual arg and replace the Param with it */ - /* We don't need to copy at this time (it'll get done later) */ - return list_nth(context->args, param->paramid - 1); + /* Count usage of parameter */ + context->usecounts[param->paramid - 1]++; + + /* Select the appropriate actual arg and replace the Param with it */ + /* We don't need to copy at this time (it'll get done later) */ + return list_nth(context->args, param->paramid - 1); + } } + return expression_tree_mutator(node, substitute_actual_parameters_mutator, (void *) context); } diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index 1bcb875507..d690d102b0 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -525,6 +525,8 @@ transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt) qry->hasTargetSRFs = pstate->p_hasTargetSRFs; qry->hasAggs = pstate->p_hasAggs; + qry->hasSessionVariables = pstate->p_hasSessionVariables; + assign_query_collations(pstate, qry); /* this must be done after collations, for reliable comparison of exprs */ @@ -942,6 +944,7 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) qry->hasTargetSRFs = pstate->p_hasTargetSRFs; qry->hasSubLinks = pstate->p_hasSubLinks; + qry->hasSessionVariables = pstate->p_hasSessionVariables; assign_query_collations(pstate, qry); @@ -1312,8 +1315,9 @@ transformSelectStmt(ParseState *pstate, SelectStmt *stmt) /* process the FROM clause */ transformFromClause(pstate, stmt->fromClause); - /* transform targetlist */ - qry->targetList = transformTargetList(pstate, stmt->targetList, + /* Transform targetlist. */ + qry->targetList = transformTargetList(pstate, + stmt->targetList, EXPR_KIND_SELECT_TARGET); /* mark column origins */ @@ -1404,6 +1408,8 @@ transformSelectStmt(ParseState *pstate, SelectStmt *stmt) (LockingClause *) lfirst(l), false); } + qry->hasSessionVariables = pstate->p_hasSessionVariables; + assign_query_collations(pstate, qry); /* this must be done after collations, for reliable comparison of exprs */ @@ -1878,6 +1884,8 @@ transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt) (LockingClause *) lfirst(l), false); } + qry->hasSessionVariables = pstate->p_hasSessionVariables; + assign_query_collations(pstate, qry); /* this must be done after collations, for reliable comparison of exprs */ @@ -2409,6 +2417,7 @@ transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt) qry->hasTargetSRFs = pstate->p_hasTargetSRFs; qry->hasSubLinks = pstate->p_hasSubLinks; + qry->hasSessionVariables = pstate->p_hasSessionVariables; assign_query_collations(pstate, qry); diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c index 69e6318b31..046884fe77 100644 --- a/src/backend/parser/parse_expr.c +++ b/src/backend/parser/parse_expr.c @@ -40,11 +40,12 @@ #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/timestamp.h" +#include "utils/typcache.h" #include "utils/xml.h" /* GUC parameters */ bool Transform_null_equals = false; - +bool session_variables_ambiguity_warning = false; static Node *transformExprRecurse(ParseState *pstate, Node *expr); static Node *transformParamRef(ParseState *pstate, ParamRef *pref); @@ -101,6 +102,9 @@ static Expr *make_distinct_op(ParseState *pstate, List *opname, Node *ltree, Node *rtree, int location); static Node *make_nulltest_from_distinct(ParseState *pstate, A_Expr *distincta, Node *arg); +static Node *makeParamSessionVariable(ParseState *pstate, + Oid varid, Oid typid, int32 typmod, Oid collid, + char *attrname, int location); /* @@ -506,6 +510,10 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) char *relname = NULL; char *colname = NULL; ParseNamespaceItem *nsitem; + Oid varid = InvalidOid; + char *attrname = NULL; + bool not_unique; + bool lockit; int levels_up; enum { @@ -851,6 +859,124 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) parser_errposition(pstate, cref->location))); } + /* Protect session variable by AccessShareLock when it is not shadowed. */ + lockit = (node == NULL); + + /* The col's reference can be a reference to session variable too. */ + varid = IdentifyVariable(cref->fields, &attrname, lockit, ¬_unique); + + /* + * Raise error when reference to session variable is ambiguous and + * and this reference is not shadowed. + */ + if (!node && not_unique) + ereport(ERROR, + (errcode(ERRCODE_AMBIGUOUS_PARAMETER), + errmsg("session variable reference \"%s\" is ambiguous", + NameListToString(cref->fields)), + parser_errposition(pstate, cref->location))); + + if (OidIsValid(varid)) + { + /* + * When Session variables is shadowed by columns or by routine's + * variables or routine's arguments. + */ + if (node != NULL) + { + /* + * In this case we can raise a warning, but only when required. + * We should not raise warnings for contexts where usage of session + * variables has not sense, or when we can detect that variable + * doesn't have the wanted field (and then there is no possible + * identifier's collision). + */ + if (session_variables_ambiguity_warning && + pstate->p_expr_kind == EXPR_KIND_SELECT_TARGET) + { + Oid typid; + int32 typmod; + Oid collid; + + get_session_variable_type_typmod_collid(varid, + &typid, &typmod, + &collid); + + /* + * Some cases with ambiguous references can be solved without + * raising a warning. When there is a collision between column + * name (or label) and some session variable name, and when we + * know attribute name, then we can ignore the collision when: + * + * a) variable is of scalar type (then indirection cannot be + * applied on this session variable. + * + * b) when related variable has no field with the given + * attrname, then indirection cannot be applied on this + * session variable. + */ + if (attrname) + { + if (type_is_rowtype(typid)) + { + TupleDesc tupdesc; + bool found = false; + int i; + + /* slow part, I hope it will not be to often */ + tupdesc = lookup_rowtype_tupdesc(typid, typmod); + for (i = 0; i < tupdesc->natts; i++) + { + if (namestrcmp(&(TupleDescAttr(tupdesc, i)->attname), attrname) == 0 && + !TupleDescAttr(tupdesc, i)->attisdropped) + { + found = true; + break; + } + } + + ReleaseTupleDesc(tupdesc); + + /* There is no composite variable with this field. */ + if (!found) + varid = InvalidOid; + } + else + /* There is no composite variable with this name. */ + varid = InvalidOid; + } + + /* + * Raise warning when session variable reference is still + * visible. + */ + if (OidIsValid(varid)) + ereport(WARNING, + (errcode(ERRCODE_AMBIGUOUS_COLUMN), + errmsg("session variable \"%s\" is shadowed", + NameListToString(cref->fields)), + errdetail("Session variables can be shadowed by columns, routine's variables and routine's arguments with same name."), + parser_errposition(pstate, cref->location))); + } + + /* Session variables are always shadowed by any other node. */ + varid = InvalidOid; + } + else + { + Oid typid; + int32 typmod; + Oid collid; + + get_session_variable_type_typmod_collid(varid, &typid, &typmod, + &collid); + + node = makeParamSessionVariable(pstate, + varid, typid, typmod, collid, + attrname, cref->location); + } + } + /* * Throw error if no translation found. */ @@ -885,6 +1011,64 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) return node; } +/* + * Generate param variable for reference to session variable + */ +static Node * +makeParamSessionVariable(ParseState *pstate, + Oid varid, Oid typid, int32 typmod, Oid collid, + char *attrname, int location) +{ + Param *param; + + param = makeNode(Param); + + param->paramkind = PARAM_VARIABLE; + param->paramvarid = varid; + param->paramtype = typid; + param->paramtypmod = typmod; + param->paramcollid = collid; + + pstate->p_hasSessionVariables = true; + + if (attrname != NULL) + { + TupleDesc tupdesc; + int i; + + tupdesc = lookup_rowtype_tupdesc(typid, typmod); + + for (i = 0; i < tupdesc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(tupdesc, i); + + if (strcmp(attrname, NameStr(att->attname)) == 0 && + !att->attisdropped) + { + /* Success, so generate a FieldSelect expression */ + FieldSelect *fselect = makeNode(FieldSelect); + + fselect->arg = (Expr *) param; + fselect->fieldnum = i + 1; + fselect->resulttype = att->atttypid; + fselect->resulttypmod = att->atttypmod; + /* save attribute's collation for parse_collate.c */ + fselect->resultcollid = att->attcollation; + + ReleaseTupleDesc(tupdesc); + return (Node *) fselect; + } + } + + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("could not identify column \"%s\" in variable", attrname), + parser_errposition(pstate, location))); + } + + return (Node *) param; +} + static Node * transformParamRef(ParseState *pstate, ParamRef *pref) { diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 5aa5a350f3..c5cf50687c 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -86,6 +86,9 @@ CreateQueryDesc(PlannedStmt *plannedstmt, qd->queryEnv = queryEnv; qd->instrument_options = instrument_options; /* instrumentation wanted? */ + qd->num_session_variables = 0; + qd->session_variables = NULL; + /* null these fields until set by ExecutorStart */ qd->tupDesc = NULL; qd->estate = NULL; diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index c3937a60fd..eab0af8809 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -38,6 +38,7 @@ #include "catalog/pg_statistic_ext.h" #include "catalog/pg_trigger.h" #include "catalog/pg_type.h" +#include "catalog/pg_variable.h" #include "commands/defrem.h" #include "commands/tablespace.h" #include "common/keywords.h" @@ -501,6 +502,7 @@ static char *generate_function_name(Oid funcid, int nargs, static char *generate_operator_name(Oid operid, Oid arg1, Oid arg2); static void add_cast_to(StringInfo buf, Oid typid); static char *generate_qualified_type_name(Oid typid); +static char *generate_session_variable_name(Oid varid); static text *string_to_text(char *str); static char *flatten_reloptions(Oid relid); static void get_reloptions(StringInfo buf, Datum reloptions); @@ -8057,6 +8059,14 @@ get_parameter(Param *param, deparse_context *context) return; } + /* translate paramvarid to session variable name */ + if (param->paramkind == PARAM_VARIABLE) + { + appendStringInfo(context->buf, "%s", + generate_session_variable_name(param->paramvarid)); + return; + } + /* * If it's an external parameter, see if the outermost namespace provides * function argument names. @@ -12727,6 +12737,42 @@ generate_collation_name(Oid collid) return result; } +/* + * generate_session_variable_name + * Compute the name to display for a session variable specified by OID + * + * The result includes all necessary quoting and schema-prefixing. + */ +static char * +generate_session_variable_name(Oid varid) +{ + HeapTuple tup; + Form_pg_variable varform; + char *varname; + char *nspname; + char *result; + + tup = SearchSysCache1(VARIABLEOID, ObjectIdGetDatum(varid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for variable %u", varid); + + varform = (Form_pg_variable) GETSTRUCT(tup); + + varname = NameStr(varform->varname); + + if (!VariableIsVisible(varid)) + nspname = get_namespace_name_or_temp(varform->varnamespace); + else + nspname = NULL; + + result = quote_qualified_identifier(nspname, varname); + + ReleaseSysCache(tup); + + return result; +} + /* * Given a C string, produce a TEXT datum. * diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c index 0d6a295674..74daf13a3f 100644 --- a/src/backend/utils/cache/plancache.c +++ b/src/backend/utils/cache/plancache.c @@ -58,6 +58,7 @@ #include "access/transam.h" #include "catalog/namespace.h" +#include "catalog/pg_variable.h" #include "executor/executor.h" #include "miscadmin.h" #include "nodes/nodeFuncs.h" @@ -1891,6 +1892,20 @@ ScanQueryWalker(Node *node, bool *acquire) ScanQueryForLocks(castNode(Query, sub->subselect), *acquire); /* Fall through to process lefthand args of SubLink */ } + else if (IsA(node, Param)) + { + Param *p = (Param *) node; + + if (p->paramkind == PARAM_VARIABLE) + { + if (acquire) + LockDatabaseObject(VariableRelationId, p->paramvarid, + 0, AccessShareLock); + else + UnlockDatabaseObject(VariableRelationId, p->paramvarid, + 0, AccessShareLock); + } + } /* * Do NOT recurse into Query nodes, because ScanQueryForLocks already @@ -2022,7 +2037,9 @@ PlanCacheRelCallback(Datum arg, Oid relid) /* * PlanCacheObjectCallback - * Syscache inval callback function for PROCOID and TYPEOID caches + * Syscache inval callback function for TYPEOID, PROCOID, NAMESPACEOID, + * OPEROID, AMOPOPID, FOREIGNSERVEROID, FOREIGNDATAWRAPPEROID and VARIABLEOID + * caches. * * Invalidate all plans mentioning the object with the specified hash value, * or all plans mentioning any member of this cache if hashvalue == 0. diff --git a/src/backend/utils/fmgr/fmgr.c b/src/backend/utils/fmgr/fmgr.c index af74fe345e..8d61fc082c 100644 --- a/src/backend/utils/fmgr/fmgr.c +++ b/src/backend/utils/fmgr/fmgr.c @@ -1902,9 +1902,13 @@ get_call_expr_arg_stable(Node *expr, int argnum) */ if (IsA(arg, Const)) return true; - if (IsA(arg, Param) && - ((Param *) arg)->paramkind == PARAM_EXTERN) - return true; + if (IsA(arg, Param)) + { + Param *p = (Param *) arg; + + if (p->paramkind == PARAM_EXTERN || p->paramkind == PARAM_VARIABLE) + return true; + } return false; } diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index a7cc49898b..ba3724e29e 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -1729,6 +1729,18 @@ static struct config_bool ConfigureNamesBool[] = false, NULL, NULL, NULL }, + + { + {"session_variables_ambiguity_warning", PGC_USERSET, DEVELOPER_OPTIONS, + gettext_noop("Raise warning when reference to session variable is ambiguous."), + NULL, + GUC_NOT_IN_SAMPLE + }, + &session_variables_ambiguity_warning, + false, + NULL, NULL, NULL + }, + { {"db_user_namespace", PGC_SIGHUP, CONN_AUTH_AUTH, gettext_noop("Enables per-database user names."), diff --git a/src/include/commands/session_variable.h b/src/include/commands/session_variable.h index 5434e79f54..7614a5dee2 100644 --- a/src/include/commands/session_variable.h +++ b/src/include/commands/session_variable.h @@ -27,6 +27,13 @@ extern void ResetSessionVariables(void); extern void RemoveSessionVariable(Oid varid); extern ObjectAddress DefineSessionVariable(ParseState *pstate, CreateSessionVarStmt * stmt); +extern Datum CopySessionVariable(Oid varid, bool *isNull, Oid *typid); +extern Datum CopySessionVariableWithTypeCheck(Oid varid, bool *isNull, Oid expected_typid); +extern Datum GetSessionVariableWithTypeCheck(Oid varid, bool *isNull, Oid expected_typid); + +extern void SetSessionVariable(Oid varid, Datum value, bool isNull, Oid typid); +extern void SetSessionVariableWithSecurityCheck(Oid varid, Datum value, bool isNull, Oid typid); + extern void RegisterOnCommitDropSessionVariable(Oid varid); extern void AtPreEOXact_SessionVariable_on_xact_actions(bool isCommit); diff --git a/src/include/executor/execExpr.h b/src/include/executor/execExpr.h index e34db8c93c..1111af6920 100644 --- a/src/include/executor/execExpr.h +++ b/src/include/executor/execExpr.h @@ -159,6 +159,7 @@ typedef enum ExprEvalOp EEOP_PARAM_EXEC, EEOP_PARAM_EXTERN, EEOP_PARAM_CALLBACK, + EEOP_PARAM_VARIABLE, /* return CaseTestExpr value */ EEOP_CASE_TESTVAL, @@ -384,6 +385,13 @@ typedef struct ExprEvalStep Oid paramtype; /* OID of parameter's datatype */ } param; + /* for EEOP_PARAM_VARIABLE */ + struct + { + Oid varid; /* OID of assigned variable */ + Oid vartype; /* OID of parameter's datatype */ + } vparam; + /* for EEOP_PARAM_CALLBACK */ struct { @@ -806,6 +814,8 @@ extern void ExecEvalParamExec(ExprState *state, ExprEvalStep *op, ExprContext *econtext); extern void ExecEvalParamExtern(ExprState *state, ExprEvalStep *op, ExprContext *econtext); +extern void ExecEvalParamVariable(ExprState *state, ExprEvalStep *op, + ExprContext *econtext); extern void ExecEvalSQLValueFunction(ExprState *state, ExprEvalStep *op); extern void ExecEvalCurrentOfExpr(ExprState *state, ExprEvalStep *op); extern void ExecEvalNextValueExpr(ExprState *state, ExprEvalStep *op); diff --git a/src/include/executor/execdesc.h b/src/include/executor/execdesc.h index e79e2c001f..dbf4dc7ea0 100644 --- a/src/include/executor/execdesc.h +++ b/src/include/executor/execdesc.h @@ -48,6 +48,10 @@ typedef struct QueryDesc EState *estate; /* executor's query-wide state */ PlanState *planstate; /* tree of per-plan-node state */ + /* reference to session variables buffer */ + int num_session_variables; + SessionVariableValue *session_variables; + /* This field is set by ExecutorRun */ bool already_executed; /* true if previously executed */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 5728801379..26148a9381 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -575,6 +575,18 @@ typedef struct AsyncRequest * tuples) */ } AsyncRequest; +/* ---------------- + * SessionVariableValue + * ---------------- + */ +typedef struct SessionVariableValue +{ + Oid varid; + Oid typid; + bool isnull; + Datum value; +} SessionVariableValue; + /* ---------------- * EState information * @@ -626,6 +638,13 @@ typedef struct EState ParamListInfo es_param_list_info; /* values of external params */ ParamExecData *es_param_exec_vals; /* values of internal params */ + /* Variables info: */ + /* number of used session variables */ + int es_num_session_variables; + + /* array of copied values of session variables */ + SessionVariableValue *es_session_variables; + QueryEnvironment *es_queryEnv; /* query environment */ /* Other working state: */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 6b825785a7..74dbc5a204 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -134,6 +134,7 @@ typedef struct Query int resultRelation; /* rtable index of target relation for * INSERT/UPDATE/DELETE/MERGE; 0 for SELECT */ + Oid resultVariable; /* target variable of LET statement */ bool hasAggs; /* has aggregates in tlist or havingQual */ bool hasWindowFuncs; /* has window functions in tlist */ bool hasTargetSRFs; /* has set-returning functions in tlist */ @@ -143,6 +144,7 @@ typedef struct Query bool hasModifyingCTE; /* has INSERT/UPDATE/DELETE in WITH */ bool hasForUpdate; /* FOR [KEY] UPDATE/SHARE was specified */ bool hasRowSecurity; /* rewriter has applied some RLS policy */ + bool hasSessionVariables; /* uses session variables */ bool isReturn; /* is a RETURN statement */ diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index a6e5db4eec..0915f346df 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -130,6 +130,8 @@ typedef struct PlannerGlobal char maxParallelHazard; /* worst PROPARALLEL hazard level */ PartitionDirectory partition_directory; /* partition descriptors */ + + List *sessionVariables; /* list of used session variables */ } PlannerGlobal; /* macro for fetching the Plan associated with a SubPlan node */ @@ -351,6 +353,7 @@ struct PlannerInfo * pseudoconstant = true */ bool hasAlternativeSubPlans; /* true if we've made any of those */ bool hasRecursion; /* true if planning a recursive WITH item */ + bool hasSessionVariables; /* true if session variables were used */ /* * Information about aggregates. Filled by preprocess_aggrefs(). diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 0ea9a22dfb..2c536aab9a 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -86,6 +86,8 @@ typedef struct PlannedStmt Node *utilityStmt; /* non-null if this is utility stmt */ + List *sessionVariables; /* list of OIDs for PARAM_VARIABLE Params */ + /* statement location in source string (copied from Query) */ int stmt_location; /* start location, or -1 if unknown */ int stmt_len; /* length in bytes; 0 means "rest of string" */ diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index 51505eee85..edb3fb06ca 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -43,7 +43,10 @@ typedef struct Alias List *colnames; /* optional list of column aliases */ } Alias; -/* What to do at commit time for temporary relations */ +/* + * What to do at commit time for temporary relations or + * persistent/temporary variable. + */ typedef enum OnCommitAction { ONCOMMIT_NOOP, /* No ON COMMIT clause (do nothing) */ @@ -261,13 +264,17 @@ typedef struct Const * of the `paramid' field contain the SubLink's subLinkId, and * the low-order 16 bits contain the column number. (This type * of Param is also converted to PARAM_EXEC during planning.) + * + * PARAM_VARIABLE: The parameter is an access to session variable + * paramid holds varid. */ typedef enum ParamKind { PARAM_EXTERN, PARAM_EXEC, PARAM_SUBLINK, - PARAM_MULTIEXPR + PARAM_MULTIEXPR, + PARAM_VARIABLE } ParamKind; typedef struct Param @@ -278,6 +285,7 @@ typedef struct Param Oid paramtype; /* pg_type OID of parameter's datatype */ int32 paramtypmod; /* typmod value, if known */ Oid paramcollid; /* OID of collation, or InvalidOid if none */ + Oid paramvarid; /* OID of session variable if it is used */ int location; /* token location, or -1 if unknown */ } Param; diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h index c4f61c1a09..efa1d62f1e 100644 --- a/src/include/optimizer/planmain.h +++ b/src/include/optimizer/planmain.h @@ -117,4 +117,6 @@ extern void record_plan_function_dependency(PlannerInfo *root, Oid funcid); extern void record_plan_type_dependency(PlannerInfo *root, Oid typid); extern bool extract_query_dependencies_walker(Node *node, PlannerInfo *root); +extern void pull_up_has_session_variables(PlannerInfo *root); + #endif /* PLANMAIN_H */ diff --git a/src/include/parser/parse_expr.h b/src/include/parser/parse_expr.h index c8e5c57b43..14b0adb948 100644 --- a/src/include/parser/parse_expr.h +++ b/src/include/parser/parse_expr.h @@ -17,6 +17,7 @@ /* GUC parameters */ extern PGDLLIMPORT bool Transform_null_equals; +extern PGDLLIMPORT bool session_variables_ambiguity_warning; extern Node *transformExpr(ParseState *pstate, Node *expr, ParseExprKind exprKind); diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h index 9b042962fc..59da340265 100644 --- a/src/include/parser/parse_node.h +++ b/src/include/parser/parse_node.h @@ -212,6 +212,7 @@ struct ParseState bool p_hasTargetSRFs; bool p_hasSubLinks; bool p_hasModifyingCTE; + bool p_hasSessionVariables; Node *p_last_srf; /* most recent set-returning func/op found */ -- 2.36.1