From fedeaef0aea0122863e5e77ccf1fecf91c9555a3 Mon Sep 17 00:00:00 2001 From: "okbob@github.com" Date: Sun, 13 Nov 2022 17:45:23 +0100 Subject: [PATCH 02/10] session variables Implementation storage and access routines. Session variables are stored in session memory inside dedicated hash table. Two levels of an access are implemented: API level and SQL level. Both levels are implemented in this commit. The most difficult part is cleaning (reset) of session variable. The content of session variable should be cleaned when related session variable is removed from system catalog. But queue of sinval messages can be truncated, or current transaction state can disallow an access to system catalog, so cleaning can be postponed. --- src/backend/access/transam/xact.c | 2 +- src/backend/catalog/dependency.c | 5 + src/backend/catalog/namespace.c | 295 +++++ src/backend/commands/meson.build | 1 + src/backend/commands/session_variable.c | 1233 +++++++++++++++++++-- src/backend/executor/execExpr.c | 76 ++ src/backend/executor/execExprInterp.c | 17 + src/backend/executor/execMain.c | 58 + src/backend/executor/execParallel.c | 147 ++- src/backend/jit/llvm/llvmjit_expr.c | 6 + src/backend/optimizer/plan/planner.c | 8 + src/backend/optimizer/plan/setrefs.c | 118 +- src/backend/optimizer/prep/prepjointree.c | 3 + src/backend/optimizer/util/clauses.c | 74 +- src/backend/parser/analyze.c | 7 + src/backend/parser/parse_expr.c | 273 ++++- src/backend/tcop/pquery.c | 3 + src/backend/utils/adt/ruleutils.c | 46 + src/backend/utils/cache/plancache.c | 29 +- src/backend/utils/fmgr/fmgr.c | 10 +- src/backend/utils/misc/guc_tables.c | 10 + src/include/catalog/namespace.h | 2 + src/include/catalog/pg_proc.dat | 7 + src/include/commands/session_variable.h | 13 +- src/include/executor/execExpr.h | 11 + src/include/executor/execdesc.h | 4 + src/include/nodes/execnodes.h | 19 + src/include/nodes/parsenodes.h | 1 + src/include/nodes/pathnodes.h | 5 + src/include/nodes/plannodes.h | 2 + src/include/nodes/primnodes.h | 11 +- src/include/optimizer/planmain.h | 2 + src/include/parser/parse_expr.h | 1 + src/include/parser/parse_node.h | 1 + src/tools/pgindent/typedefs.list | 3 + 35 files changed, 2403 insertions(+), 100 deletions(-) diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 2303e1e053..1b6859f221 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2214,7 +2214,7 @@ CommitTransaction(void) */ smgrDoPendingSyncs(true, is_parallel_worker); - /* Let ON COMMIT DROP */ + /* Let ON COMMIT DROP or ON TRANSACTION END */ AtPreEOXact_SessionVariable(true); /* close large objects before lower-level cleanup */ diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 731b9805ad..989a100ea8 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -1909,6 +1909,11 @@ find_expr_references_walker(Node *node, { Param *param = (Param *) node; + /* A variable parameter depends on the session variable */ + if (param->paramkind == PARAM_VARIABLE) + add_object_address(OCLASS_VARIABLE, param->paramvarid, 0, + context->addrs); + /* A parameter must depend on the parameter's datatype */ add_object_address(OCLASS_TYPE, param->paramtype, 0, context->addrs); diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c index 0d5c2d1f04..eed7c8f25c 100644 --- a/src/backend/catalog/namespace.c +++ b/src/backend/catalog/namespace.c @@ -2988,6 +2988,301 @@ LookupVariable(const char *nspname, return varoid; } +/* + * The input list contains names with indirection expressions used as the left + * part of LET statement. The following routine returns a new list with only + * initial strings (names) - without indirection expressions. + */ +List * +NamesFromList(List *names) +{ + ListCell *l; + List *result = NIL; + + foreach(l, names) + { + Node *n = lfirst(l); + + if (IsA(n, String)) + { + result = lappend(result, n); + } + else + break; + } + + return result; +} + +/* + * IdentifyVariable - try to find variable identified by list of names. + * + * Before this call we don't know, how these fields should be mapped to + * schema name, variable name and attribute name. In this routine + * we try to apply passed names to all possible combinations of schema name, + * variable name and attribute name, and we count valid combinations. + * + * Returns oid of identified variable. When last field of names list is + * identified as an attribute, then output attrname argument is set to + * an string of this field. + * + * When there is not any valid combination, then we are sure, so the + * list of names cannot to identify any session variable. In this case + * we return InvalidOid. + * + * We can find more valid combination than one. + * Example: users can have session variable x in schema y, and + * session variable y with attribute x inside some schema from + * search path. In this situation the meaning of expression "y"."x" + * is ambiguous. In this case this routine returns oid of variable + * x in schema y, and the output parameter "not_unique" is set to + * true. In this case this variable is locked. + * + * The AccessShareLock is created on related session variable. The lock + * will be kept for the whole transaction. + * + * Note: the out attrname should be used only when the session variable + * is identified. When the session variable is not identified, then this + * output variable can hold reference to some string, but isn't sure + * about its semantics. + */ +Oid +IdentifyVariable(List *names, char **attrname, bool *not_unique) +{ + Node *field1 = NULL; + Node *field2 = NULL; + Node *field3 = NULL; + Node *field4 = NULL; + char *a = NULL; + char *b = NULL; + char *c = NULL; + char *d = NULL; + Oid varid = InvalidOid; + Oid old_varid = InvalidOid; + uint64 inval_count; + bool retry = false; + + /* + * DDL operations can change the results of a name lookup. Since all such + * operations will generate invalidation messages, we keep track of + * whether any such messages show up while we're performing the operation, + * and retry until either (1) no more invalidation messages show up or (2) + * the answer doesn't change. + */ + for (;;) + { + Oid varoid_without_attr = InvalidOid; + Oid varoid_with_attr = InvalidOid; + + *not_unique = false; + *attrname = NULL; + varid = InvalidOid; + + inval_count = SharedInvalidMessageCounter; + + switch (list_length(names)) + { + case 1: + field1 = linitial(names); + + Assert(IsA(field1, String)); + + varid = LookupVariable(NULL, strVal(field1), false, true); + break; + + case 2: + field1 = linitial(names); + field2 = lsecond(names); + + Assert(IsA(field1, String)); + a = strVal(field1); + + if (IsA(field2, String)) + { + b = strVal(field2); + + /* + * a.b can mean "schema"."variable" or "variable"."field", + * Check both variants, and returns InvalidOid with + * not_unique flag, when both interpretations are + * possible. Second node can be star. In this case, the + * only allowed possibility is "variable"."*". + */ + varoid_without_attr = LookupVariable(a, b, false, true); + varoid_with_attr = LookupVariable(NULL, a, true, true); + } + else + { + /* + * Session variables doesn't support unboxing by star + * syntax. But this syntax have to be calculated here, + * because can come from non session variables related + * expressions. + */ + Assert(IsA(field2, A_Star)); + break; + } + + if (OidIsValid(varoid_without_attr) && OidIsValid(varoid_with_attr)) + { + *not_unique = true; + varid = varoid_without_attr; + } + else if (OidIsValid(varoid_without_attr)) + { + varid = varoid_without_attr; + } + else if (OidIsValid(varoid_with_attr)) + { + *attrname = b; + varid = varoid_with_attr; + } + break; + + case 3: + field1 = linitial(names); + field2 = lsecond(names); + field3 = lthird(names); + + Assert(IsA(field1, String)); + Assert(IsA(field2, String)); + + a = strVal(field1); + b = strVal(field2); + + if (IsA(field3, String)) + { + c = strVal(field3); + + /* + * a.b.c can mean "catalog"."schema"."variable" or + * "schema"."variable"."field", Check both variants, and + * returns InvalidOid with not_unique flag, when both + * interpretations are possible. When third node is star, + * the only possible interpretation is + * "schema"."variable"."*". + */ + varoid_without_attr = LookupVariable(b, c, false, true); + varoid_with_attr = LookupVariable(a, b, true, true); + } + else + { + Assert(IsA(field3, A_Star)); + break; + } + + if (OidIsValid(varoid_without_attr) && OidIsValid(varoid_with_attr)) + { + *not_unique = true; + varid = varoid_without_attr; + } + else if (OidIsValid(varoid_without_attr)) + { + + /* + * In this case, "a" is used as catalog name - check it. + */ + if (strcmp(a, get_database_name(MyDatabaseId)) != 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cross-database references are not implemented: %s", + NameListToString(names)))); + + varid = varoid_without_attr; + } + else if (OidIsValid(varoid_with_attr)) + { + *attrname = c; + varid = varoid_with_attr; + } + break; + + case 4: + field1 = linitial(names); + field2 = lsecond(names); + field3 = lthird(names); + field4 = lfourth(names); + + Assert(IsA(field1, String)); + Assert(IsA(field2, String)); + Assert(IsA(field3, String)); + + a = strVal(field1); + b = strVal(field2); + c = strVal(field3); + + /* + * In this case, "a" is used as catalog name - check it. + */ + if (strcmp(a, get_database_name(MyDatabaseId)) != 0) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cross-database references are not implemented: %s", + NameListToString(names)))); + + if (IsA(field4, String)) + { + d = strVal(field4); + } + else + { + Assert(IsA(field4, A_Star)); + return InvalidOid; + } + + *attrname = d; + varid = LookupVariable(b, c, true, true); + break; + + default: + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("improper qualified name (too many dotted names): %s", + NameListToString(names)))); + break; + } + + /* + * If, upon retry, we get back the same OID we did last time, then the + * invalidation messages we processed did not change the final answer. + * So we're done. + * + * If we got a different OID, we've locked the variable that used to + * have this name rather than the one that does now. So release the + * lock. + */ + if (retry) + { + if (old_varid == varid) + break; + + if (OidIsValid(old_varid)) + UnlockDatabaseObject(VariableRelationId, old_varid, 0, AccessShareLock); + } + + /* + * Lock the variable. This will also accept any pending invalidation + * messages. If we got back InvalidOid, indicating not found, then + * there's nothing to lock, but we accept invalidation messages + * anyway, to flush any negative catcache entries that may be + * lingering. + */ + if (!OidIsValid(varid)) + AcceptInvalidationMessages(); + else if (OidIsValid(varid)) + LockDatabaseObject(VariableRelationId, varid, 0, AccessShareLock); + + if (inval_count == SharedInvalidMessageCounter) + break; + + retry = true; + old_varid = varid; + varid = InvalidOid; + } + + return varid; +} + /* * DeconstructQualifiedName * Given a possibly-qualified name expressed as a list of String nodes, diff --git a/src/backend/commands/meson.build b/src/backend/commands/meson.build index 9b350d025f..440a15cf67 100644 --- a/src/backend/commands/meson.build +++ b/src/backend/commands/meson.build @@ -36,6 +36,7 @@ backend_sources += files( 'schemacmds.c', 'seclabel.c', 'sequence.c', + 'session_variable.c', 'statscmds.c', 'subscriptioncmds.c', 'tablecmds.c', diff --git a/src/backend/commands/session_variable.c b/src/backend/commands/session_variable.c index 4570583082..9ea3c89902 100644 --- a/src/backend/commands/session_variable.c +++ b/src/backend/commands/session_variable.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * src/backend/commands/sessionvariable.c + * src/backend/commands/session_variable.c * *------------------------------------------------------------------------- */ @@ -22,87 +22,245 @@ #include "commands/session_variable.h" #include "funcapi.h" #include "miscadmin.h" +#include "optimizer/optimizer.h" +#include "storage/lmgr.h" +#include "storage/proc.h" #include "utils/builtins.h" +#include "utils/datum.h" +#include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" /* - * The life cycle of temporary session variable can be - * limmited by using clause ON COMMIT DROP. + * Values of session variables are stored in the backend local memory, + * in sessionvars hash table in binary format, in a dedicated memory + * context SVariableMemoryContext. A session variable value can stay + * valid for longer than the transaction that assigns its value. To + * make sure that the underlying memory is eventually freed, but not + * before it's guarantee that the value won't be needed anymore, we + * need to handle the two following points: + * + * - We need detect when a variable is dropped, whether in the current + * transaction in the current session or by another session, and mark + * the underlying entries for removal. To protect the content against + * possibly rollbacked DROP VARIABLE commands, the entries (and + * memory) shouldn't be freed immediately but be postponed until the + * end of the transaction. + * + * - The session variable can be dropped explicitly (by DROP VARIABLE + * command) or implicitly (using ON COMMIT DROP clause), and the + * value can be implicitly removed (using the ON TRANSACTION END + * clause). In all those cases the memory should also be freed at + * the transaction end. + * + * To achieve that, we maintain 3 queues of actions to be performed at + * certain time: + * - a List of SVariableXActActionItem, to handle ON COMMIT DROP + * variables, and delayed memory cleanup of variable dropped by the + * current transaction. Those actions are transactional (for instance + * we don't want to cleanup the memory of a rollbacked DROP VARIABLE) + * so the structure is needed to keep track of the final transaction + * state + * - a List of variable Oid for the ON TRANSACTION ON RESET variables + * - a List of variable Oid for the concurrent DROP VARIABLE + * notification we receive via shared invalidations. + * + * Note that although resetting a variable doesn't technically require + * to remove the entry from the sessionvars hash table, we currently + * do it. It's a simple way to implement the reset, and helps to + * reduce memory usage and prevents the hash table from bloating. + * + * There are two different ways to do the final access to session + * variables: buffered (indirect) or direct. Buffered access is used + * in regular DML statements, where we have to ensure the stability of + * the variable values. The session variables have the same behaviour + * as external query parameters, which is consistent with using + * PL/pgSQL's variables in embedded queries in PL/pgSQL. + * + * This is implemented by using an aux buffer (an array) that holds a + * copy of values of used (in query) session variables, which is also + * transmitted to the parallel workers. The values from this array + * are passed as constant (EEOP_CONST). + * + * Direct access is used by simple expression evaluation (PLpgSQL). + * In this case we don't need to ensure the stability of passed + * values, and maintaining the buffer with copies of values of session + * variables would be useless overhead. In this case we just read the + * value of the session variable directly (EEOP_PARAM_VARIABLE). This + * strategy removes the necessity to modify related PL/pgSQL code to + * support session variables (the reading of session variables is + * fully transparent for PL/pgSQL). */ typedef enum SVariableXActAction { SVAR_ON_COMMIT_DROP, /* used for ON COMMIT DROP */ + SVAR_ON_COMMIT_RESET, /* used for DROP VARIABLE */ } SVariableXActAction; typedef struct SVariableXActActionItem { Oid varid; /* varid of session variable */ + SVariableXActAction action; /* - * creating_subid is the ID of the creating subxact. If the action was - * unregistered during the current transaction, deleting_subid is the ID - * of the deleting subxact, otherwise InvalidSubTransactionId. + * creating_subid is the ID of the sub-transaction that registered + * the action. If the action was unregistered during the current + * transaction, deleting_subid is the ID of the deleting + * sub-transaction, otherwise InvalidSubTransactionId. */ SubTransactionId creating_subid; SubTransactionId deleting_subid; } SVariableXActActionItem; -/* List holds fields of SVariableXActActionItem type */ -static List *xact_drop_actions = NIL; - -static void register_session_variable_xact_action(Oid varid, SVariableXActAction action); -static void unregister_session_variable_xact_action(Oid varid, SVariableXActAction action); +/* List of SVariableXActActionItem */ +static List *xact_on_commit_actions = NIL; +/* + * To process ON TRANSACTION END RESET variables, for which we always + * need to clear the saved values. + */ +static List *xact_reset_varids = NIL; /* - * Do the necessary work to setup local memory management of a new - * variable. - * - * Caller should already have created the necessary entry in catalog - * and made them visible. + * When the session variable is dropped we need to free local memory. The + * session variable can be dropped by current session, but it can be + * dropped by other's sessions too, so we have to watch sinval message. + * But because we don't want to free local memory immediately, we need to + * hold list of possibly dropped session variables and at the end of + * transaction, we check session variables from this list against system + * catalog. This check can be postponed into next transaction if + * current transactions is in aborted state, as we wouldn't be able to + * access the system catalog. */ -void -SessionVariableCreatePostprocess(Oid varid, char eoxaction) +static List *xact_recheck_varids = NIL; + +typedef struct SVariableData { + Oid varid; /* pg_variable OID of the variable (hash key) */ + /* - * For temporary variables, we need to create a new end of xact action to - * ensure deletion from catalog. + * The session variable is identified by oid. The oid is unique in + * catalog. Unfortunately, the memory cleanup can be postponed to + * the beginning + * of the session next transaction, and it's possible that this next + * transaction sees a different variable with the same oid. We + * therefore need an extra identifier to distinguish both cases. We + * use the LSN number of session variable at creation time. The + * value of session variable (in memory) is valid, when there is a + * record in pg_variable with same oid and same create_lsn. */ - if (eoxaction == VARIABLE_EOX_DROP) - { - Assert(isTempNamespace(get_session_variable_namespace(varid))); + XLogRecPtr create_lsn; - register_session_variable_xact_action(varid, SVAR_ON_COMMIT_DROP); + bool isnull; + bool freeval; + Datum value; + + Oid typid; + int16 typlen; + bool typbyval; + + bool is_domain; + void *domain_check_extra; + LocalTransactionId domain_check_extra_lxid; + + /* + * Top level local transaction id of the last transaction that dropped the + * variable if any. We need this information to avoid freeing memory for + * variable dropped by the local backend that may be eventually rollbacked. + */ + LocalTransactionId drop_lxid; + + bool is_not_null; /* don't allow null values */ + bool is_immutable; /* true when variable is immutable */ + bool has_defexpr; /* true when variable has a default value */ + + bool is_valid; /* true when variable was successfully + * initialized */ + + uint32 hashvalue; /* used for pairing sinval message */ + + bool eox_reset; /* true, when lifecycle is limitted by + * transaction */ +} SVariableData; + +typedef SVariableData *SVariable; + +static HTAB *sessionvars = NULL; /* hash table for session variables */ + +static MemoryContext SVariableMemoryContext = NULL; + +static void create_sessionvars_hashtables(void); +static void free_session_variable_value(SVariable svar); +static void init_session_variable(SVariable svar, Variable *var); +static bool is_session_variable_valid(SVariable svar); +static void pg_variable_cache_callback(Datum arg, int cacheid, + uint32 hashvalue); +static SVariable prepare_variable_for_reading(Oid varid); +static void register_session_variable_xact_action(Oid varid, + SVariableXActAction action); +static void remove_session_variable(SVariable svar); +static void remove_session_variable_by_id(Oid varid); +static void set_session_variable(SVariable svar, Datum value, bool isnull, + bool init_mode); +static const char *SVariableXActActionName(SVariableXActAction action); +static void sync_sessionvars_all(bool filter_lxid); +static void unregister_session_variable_xact_action(Oid varid, + SVariableXActAction action); + + +/* + * Returns human readable name of SVariableXActAction value. + */ +static const char * +SVariableXActActionName(SVariableXActAction action) +{ + switch (action) + { + case SVAR_ON_COMMIT_DROP: + return "ON COMMIT DROP"; + case SVAR_ON_COMMIT_RESET: + return "ON COMMIT RESET"; + default: + elog(ERROR, "unknown SVariableXActAction action %d", action); } } /* - * Handle the local memory cleanup for a DROP VARIABLE command. - * - * Caller should take care of removing the pg_variable entry first. + * Free all memory allocated for the given session variable, but + * preserve the hash entry in sessionvars. */ -void -SessionVariableDropPostprocess(Oid varid) +static void +free_session_variable_value(SVariable svar) { + /* Clean current value */ + if (!svar->isnull) + { + if (svar->freeval) + { + pfree(DatumGetPointer(svar->value)); + svar->freeval = false; + } + + svar->isnull = true; + } + + svar->value = (Datum) 0; + svar->freeval = false; + /* - * The entry was removed from catalog already, we must not do it - * again at end of xact time. + * We can mark this session variable as valid when it has not default + * expression, and when null is allowed. When it has defexpr, then the + * content will be valid after an assignment or defexp evaluation. */ - unregister_session_variable_xact_action(varid, SVAR_ON_COMMIT_DROP); + svar->is_valid = !svar->has_defexpr && !svar->is_not_null; } /* * Registration of actions to be executed on session variables at transaction * end time. We want to drop temporary session variables with clause ON COMMIT - * DROP, or we want to reset values of session variables with clause ON - * TRANSACTION END RESET or we want to clean (reset) local memory allocated by - * values of dropped session variables. - */ - -/* - * Register a session variable xact action. + * DROP, or we want to clean (reset) local memory allocated by + * values of session variables dropped in other backends. */ static void register_session_variable_xact_action(Oid varid, @@ -111,27 +269,30 @@ register_session_variable_xact_action(Oid varid, SVariableXActActionItem *xact_ai; MemoryContext oldcxt; - oldcxt = MemoryContextSwitchTo(CacheMemoryContext); + elog(DEBUG1, "SVariableXActAction \"%s\" is registered for session variable (oid:%u)", + SVariableXActActionName(action), varid); + + oldcxt = MemoryContextSwitchTo(TopTransactionContext); xact_ai = (SVariableXActActionItem *) palloc(sizeof(SVariableXActActionItem)); xact_ai->varid = varid; + xact_ai->action = action; xact_ai->creating_subid = GetCurrentSubTransactionId(); xact_ai->deleting_subid = InvalidSubTransactionId; - Assert(action == SVAR_ON_COMMIT_DROP); - xact_drop_actions = lcons(xact_ai, xact_drop_actions); + xact_on_commit_actions = lcons(xact_ai, xact_on_commit_actions); MemoryContextSwitchTo(oldcxt); } /* - * Unregister an action on a given session variable from action list. In this - * moment, the action is just marked as deleted by setting deleting_subid. The - * calling even might be rollbacked, in which case we should not lose this - * action. + * Unregister an action on a given session variable from the action list. + * The action is just marked as deleted by setting deleting_subid. + * The calling subtransaction even might be rollbacked, in which case the + * action shouldn't be removed. */ static void unregister_session_variable_xact_action(Oid varid, @@ -139,43 +300,839 @@ unregister_session_variable_xact_action(Oid varid, { ListCell *l; - Assert(action == SVAR_ON_COMMIT_DROP); + elog(DEBUG1, "SVariableXActAction \"%s\" is unregistered for session variable (oid:%u)", + SVariableXActActionName(action), varid); - foreach(l, xact_drop_actions) + foreach(l, xact_on_commit_actions) { SVariableXActActionItem *xact_ai = (SVariableXActActionItem *) lfirst(l); - if (xact_ai->varid == varid) + if (xact_ai->action == action && xact_ai->varid == varid) xact_ai->deleting_subid = GetCurrentSubTransactionId(); } } /* - * Perform ON TRANSACTION END RESET or ON COMMIT DROP - * and COMMIT/ROLLBACK of transaction session variables. + * Release the given session variable from sessionvars hashtab and free + * all underlying allocated memory. + */ +static void +remove_session_variable(SVariable svar) +{ + free_session_variable_value(svar); + + /* + * In this moment, the session variable is not in catalog, so only saved + * oid can be displayed. + */ + elog(DEBUG1, "session variable (oid:%u) is removing from memory", + svar->varid); + + if (hash_search(sessionvars, + (void *) &svar->varid, + HASH_REMOVE, + NULL) == NULL) + elog(DEBUG1, "hash table corrupted"); +} + +/* + * Release the session variable defined by varid from sessionvars + * hashtab and free all underlying allocated memory. + */ +static void +remove_session_variable_by_id(Oid varid) +{ + SVariable svar; + bool found; + + if (!sessionvars) + return; + + svar = (SVariable) hash_search(sessionvars, &varid, + HASH_FIND, &found); + if (found) + remove_session_variable(svar); +} + +/* + * Callback function for session variable invalidation. + * + * It queues a list of variable Oid in xact_recheck_varids. + */ +static void +pg_variable_cache_callback(Datum arg, int cacheid, uint32 hashvalue) +{ + HASH_SEQ_STATUS status; + SVariable svar; + + /* + * There is no guarantee of sessionvars being initialized, even when + * receiving an invalidation callback, as DISCARD [ ALL | VARIABLES ] + * destroys the hash table entirely. + */ + if (!sessionvars) + return; + + elog(DEBUG1, "pg_variable_cache_callback %u %u", cacheid, hashvalue); + + /* + * When the hashvalue is not specified, then we have to recheck all + * currently used session variables. Since we can't guarantee the exact + * session variable from its hashValue, we also have to iterate over + * all items of the sessionvars hash table. + */ + hash_seq_init(&status, sessionvars); + + while ((svar = (SVariable) hash_seq_search(&status)) != NULL) + { + if (hashvalue == 0 || svar->hashvalue == hashvalue) + { + MemoryContext oldcxt; + + /* The list needs to be able to survive the transaction */ + oldcxt = MemoryContextSwitchTo(SVariableMemoryContext); + + xact_recheck_varids = lappend_oid(xact_recheck_varids, + svar->varid); + + MemoryContextSwitchTo(oldcxt); + + elog(DEBUG1, "session variable (oid:%u) should be rechecked (forced by sinval)", + svar->varid); + } + + /* + * although it there is low probability, we have to iterate over all + * locally set session variables, because hashvalue is not a unique + * identifier. + */ + } +} + +/* + * Returns true when the entry in pg_variable is valid for the given session + * variable. + */ +static bool +is_session_variable_valid(SVariable svar) +{ + HeapTuple tp; + bool result = false; + + tp = SearchSysCache1(VARIABLEOID, ObjectIdGetDatum(svar->varid)); + + if (HeapTupleIsValid(tp)) + { + /* + * In this case, the only oid cannot be used as unique identifier, + * because the oid counter can wraparound, and the oid can be used for + * new other session variable. We do a second check against 64bit + * unique identifier. + */ + if (svar->create_lsn == ((Form_pg_variable) GETSTRUCT(tp))->create_lsn) + result = true; + + ReleaseSysCache(tp); + } + + return result; +} + +/* + * Recheck the possibly invalidated variables (in memory) against system + * catalog. This routine is called before any read or any write from/to session + * variables and when processing a committed transaction. + * If filter_lxid is true, this function will ignore the recheck for variables + * that have the same cached local transaction id as the transaction current + * top level local transaction id, ie. the variables dropped in the current top + * level transaction or any underlying subtransaction. + */ +static void +sync_sessionvars_all(bool filter_lxid) +{ + SVariable svar; + ListCell *l; + + if (!xact_recheck_varids) + return; + + /* + * If the sessionvars hashtable is NULL (which can be done by DISCARD + * VARIABLES), we are sure that there aren't any active session variable + * in this session. + */ + if (!sessionvars) + { + list_free(xact_recheck_varids); + xact_recheck_varids = NIL; + return; + } + + elog(DEBUG1, "effective call of sync_sessionvars_all()"); + + /* + * The recheck list can contain many duplicates, so clean it up before + * processing to avoid extraneous work. + */ + list_sort(xact_recheck_varids, list_oid_cmp); + list_deduplicate_oid(xact_recheck_varids); + + /* + * This routine is called before any reading, so the session should be in + * transaction state. This is required to access the system catalog. + */ + Assert(IsTransactionState()); + + foreach(l, xact_recheck_varids) + { + bool found; + Oid varid = lfirst_oid(l); + + svar = (SVariable) hash_search(sessionvars, &varid, + HASH_FIND, &found); + + /* + * Remove invalid variables, but don't touch variables that were + * dropped by the current top level local transaction or any + * subtransaction underneath, as there's no guarantee that the + * transaction will be committed. Such variables will be removed in + * the next transaction if needed. + */ + if (found) + { + /* + * If this is a variable dropped by the current transaction, + * ignore it and keep the oid to recheck in the next transaction. + */ + if (filter_lxid && svar->drop_lxid == MyProc->lxid) + continue; + + if (!is_session_variable_valid(svar)) + remove_session_variable(svar); + } + + /* + * If caller asked to filter the list, we have to clean items as they + * are processed. + */ + if (filter_lxid) + xact_recheck_varids = foreach_delete_current(xact_recheck_varids, + l); + } + + /* + * If caller didn't ask to filter the list, all items have been processed + * so we can simply destroy the list. + */ + if (!filter_lxid) + { + list_free(xact_recheck_varids); + xact_recheck_varids = NIL; + } +} + +/* + * Create the hash table for storing session variables. + */ +static void +create_sessionvars_hashtables(void) +{ + HASHCTL vars_ctl; + + Assert(!sessionvars); + + /* set callbacks */ + if (!SVariableMemoryContext) + { + /* Read sinval messages */ + CacheRegisterSyscacheCallback(VARIABLEOID, + pg_variable_cache_callback, + (Datum) 0); + + /* We need our own long lived memory context */ + SVariableMemoryContext = + AllocSetContextCreate(TopMemoryContext, + "session variables", + ALLOCSET_START_SMALL_SIZES); + } + + Assert(SVariableMemoryContext); + + memset(&vars_ctl, 0, sizeof(vars_ctl)); + vars_ctl.keysize = sizeof(Oid); + vars_ctl.entrysize = sizeof(SVariableData); + vars_ctl.hcxt = SVariableMemoryContext; + + Assert(sessionvars == NULL); + + sessionvars = hash_create("Session variables", 64, &vars_ctl, + HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); +} + +/* + * Assign some content to the session variable. It's copied to + * SVariableMemoryContext if necessary. + * + * init_mode is true when the value of session variable should be initialized + * by the default expression if any. This is the only case where we allow the + * modification of an immutable variables with default expression. + * + * If any error happens, the existing value shouldn't be modified. + */ +static void +set_session_variable(SVariable svar, Datum value, bool isnull, bool init_mode) +{ + Datum newval = value; + + Assert(svar && OidIsValid(svar->typid)); + + /* Don't allow assignment of null to NOT NULL variable */ + if (isnull && svar->is_not_null) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("null value is not allowed for NOT NULL session variable \"%s.%s\"", + get_namespace_name(get_session_variable_namespace(svar->varid)), + get_session_variable_name(svar->varid)))); + + /* + * Don't allow the modification of an immutable session variable that + * already has an assigned value (possibly NULL) or has a default + * expression (in which case the value should always be the result of + * default expression evaluation) unless the variable is being initialized. + */ + if (!init_mode && + (svar->is_immutable && + (svar->is_valid || svar->has_defexpr))) + ereport(ERROR, + (errcode(ERRCODE_ERROR_IN_ASSIGNMENT), + errmsg("session variable \"%s.%s\" is declared IMMUTABLE", + get_namespace_name(get_session_variable_namespace(svar->varid)), + get_session_variable_name(svar->varid)))); + + if (!isnull) + { + MemoryContext oldcxt = MemoryContextSwitchTo(SVariableMemoryContext); + + newval = datumCopy(value, svar->typbyval, svar->typlen); + + MemoryContextSwitchTo(oldcxt); + } + else + { + /* The caller shouldn't have provided any real value. */ + Assert(value == (Datum) 0); + } + + free_session_variable_value(svar); + + svar->value = newval; + + svar->isnull = isnull; + svar->freeval = newval != value; + svar->is_valid = true; + + /* + * XXX While unlikely, an error here is possible. + * It wouldn't leak memory as the allocated chunk has already been + * correctly assigned to the session variable, but would contradict this + * function contract, which is that this function should either succeed or + * leave the current value untouched. + */ + elog(DEBUG1, "session variable \"%s.%s\" (oid:%u) has new value", + get_namespace_name(get_session_variable_namespace(svar->varid)), + get_session_variable_name(svar->varid), + svar->varid); +} + +/* + * Initialize session variable svar from variable var + */ +static void +init_session_variable(SVariable svar, Variable *var) +{ + MemoryContext oldcxt; + + Assert(OidIsValid(var->oid)); + + svar->varid = var->oid; + svar->create_lsn = var->create_lsn; + + svar->isnull = true; + svar->freeval = false; + svar->value = (Datum) 0; + + svar->typid = var->typid; + get_typlenbyval(var->typid, &svar->typlen, &svar->typbyval); + + svar->is_domain = (get_typtype(var->typid) == TYPTYPE_DOMAIN); + svar->domain_check_extra = NULL; + svar->domain_check_extra_lxid = InvalidLocalTransactionId; + + svar->drop_lxid = InvalidLocalTransactionId; + + svar->is_not_null = var->is_not_null; + svar->is_immutable = var->is_immutable; + svar->has_defexpr = var->has_defexpr; + + /* the value of variable is not known yet */ + svar->is_valid = false; + + svar->hashvalue = GetSysCacheHashValue1(VARIABLEOID, + ObjectIdGetDatum(var->oid)); + + svar->eox_reset = var->eoxaction == VARIABLE_EOX_RESET || + var->eoxaction == VARIABLE_EOX_DROP; + + oldcxt = MemoryContextSwitchTo(TopTransactionContext); + + if (svar->eox_reset) + xact_reset_varids = lappend_oid(xact_reset_varids, var->oid); + + MemoryContextSwitchTo(oldcxt); +} + +/* + * Search a seesion variable in the hash table given its oid. If it + * doesn't exist, then insert it (and calculate defexpr if it exists). + * + * Caller is responsible for doing permission checks. + * + * As side effect this function acquires AccessShareLock on + * related session variable until the end of the transaction. + */ +static SVariable +prepare_variable_for_reading(Oid varid) +{ + SVariable svar; + Variable var; + bool found; + + var.oid = InvalidOid; + + if (!sessionvars) + create_sessionvars_hashtables(); + + /* Protect used session variable against drop until transaction end */ + LockDatabaseObject(VariableRelationId, varid, 0, AccessShareLock); + + /* + * Make sure that all entries in sessionvars hash table are valid, but + * keeping variables dropped by the current transaction. + */ + sync_sessionvars_all(true); + + svar = (SVariable) hash_search(sessionvars, &varid, + HASH_ENTER, &found); + + /* Return content if it is available and valid */ + if (!found || !svar->is_valid) + { + /* We need to load defexpr. */ + InitVariable(&var, varid, false); + + if (!found) + { + init_session_variable(svar, &var); + + elog(DEBUG1, "session variable \"%s.%s\" (oid:%u) has new entry in memory (emitted by READ)", + get_namespace_name(get_session_variable_namespace(varid)), + get_session_variable_name(varid), + varid); + } + + /* + * Raise an error if this is a NOT NULL variable without default + * expression. + */ + if (var.is_not_null && !var.defexpr) + ereport(ERROR, + (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("null value is not allowed for NOT NULL session variable \"%s.%s\"", + get_namespace_name(get_session_variable_namespace(varid)), + get_session_variable_name(varid)), + errdetail("The session variable was not initialized yet."))); + + if (svar->has_defexpr) + { + Datum value = (Datum) 0; + bool isnull; + EState *estate = NULL; + Expr *defexpr; + ExprState *defexprs; + MemoryContext oldcxt; + + /* Prepare default expr */ + estate = CreateExecutorState(); + + oldcxt = MemoryContextSwitchTo(estate->es_query_cxt); + + defexpr = expression_planner((Expr *) var.defexpr); + defexprs = ExecInitExpr(defexpr, NULL); + value = ExecEvalExprSwitchContext(defexprs, + GetPerTupleExprContext(estate), + &isnull); + + MemoryContextSwitchTo(oldcxt); + + /* Store result before releasing Executor memory */ + set_session_variable(svar, value, isnull, true); + + FreeExecutorState(estate); + } + else + set_session_variable(svar, (Datum) 0, true, true); + } + + /* + * Although the value of domain type should be valid (it is checked when + * it is assigned to session variable), we have to check related + * constraints each time we access the variable. It can be more expensive + * than in PL/pgSQL, as PL/pgSQL forces domain checks only when the value is assigned + * to the variable or when the value is returned from function. + * However, domain types have a constraint cache so it's not too much + * expensive.. + */ + if (svar->is_domain) + { + /* + * Store domain_check extra in TopTransactionContext. When we are in + * other transaction, the domain_check_extra cache is not valid + * anymore. + */ + if (svar->domain_check_extra_lxid != MyProc->lxid) + svar->domain_check_extra = NULL; + + domain_check(svar->value, svar->isnull, + svar->typid, &svar->domain_check_extra, + TopTransactionContext); + + svar->domain_check_extra_lxid = MyProc->lxid; + } + + return svar; +} + +/* + * Store the given value in an SVariable, and cache it if not already present. + * + * Caller is responsible for doing permission checks. + * + * As side effect this function acquires AccessShareLock on + * related session variable until the end of the transaction. + */ +void +SetSessionVariable(Oid varid, Datum value, bool isNull) +{ + SVariable svar; + bool found; + + if (!sessionvars) + create_sessionvars_hashtables(); + + /* Protect used session variable against drop until transaction end */ + LockDatabaseObject(VariableRelationId, varid, 0, AccessShareLock); + + /* + * Make sure that all entries in sessionvars hash table are valid, but + * keeping variables dropped by the current transaction. + */ + sync_sessionvars_all(true); + + svar = (SVariable) hash_search(sessionvars, &varid, + HASH_ENTER, &found); + + if (!found) + { + Variable var; + + /* We don't need to know defexpr here */ + InitVariable(&var, varid, true); + init_session_variable(svar, &var); + + elog(DEBUG1, "session variable \"%s.%s\" (oid:%u) has new entry in memory (emitted by WRITE)", + get_namespace_name(get_session_variable_namespace(svar->varid)), + get_session_variable_name(svar->varid), + varid); + } + + /* + * This should either succeed or fail without changing the currently stored + * value. + */ + set_session_variable(svar, value, isNull, false); +} + +/* + * Wrapper around SetSessionVariable after checking for correct permission. + */ +void +SetSessionVariableWithSecurityCheck(Oid varid, Datum value, bool isNull) +{ + AclResult aclresult; + + /* + * Is caller allowed to update the session variable? + */ + aclresult = object_aclcheck(VariableRelationId, varid, GetUserId(), ACL_UPDATE); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_VARIABLE, get_session_variable_name(varid)); + + SetSessionVariable(varid, value, isNull); +} + +/* + * Returns a copy of the value of the session variable specified by its oid. + * Caller is responsible for doing permission checks. + */ +Datum +CopySessionVariable(Oid varid, bool *isNull, Oid *typid) +{ + SVariable svar; + Datum result; + + svar = prepare_variable_for_reading(varid); + Assert(svar != NULL && svar->is_valid); + + *typid = svar->typid; + + /* force copy of non NULL value */ + if (!svar->isnull) + { + result = datumCopy(svar->value, svar->typbyval, svar->typlen); + *isNull = false; + } + else + { + result = (Datum) 0; + *isNull = true; + } + + return (Datum) result; +} + +/* + * Returns a copy of ths value of the session variable specified by its oid + * with a check of the expected type. Like previous CopySessionVariable, the + * caller is responsible for doing permission checks. + */ +Datum +CopySessionVariableWithTypeCheck(Oid varid, bool *isNull, Oid expected_typid) +{ + SVariable svar; + Datum result; + + svar = prepare_variable_for_reading(varid); + Assert(svar != NULL && svar->is_valid); + + if (expected_typid != svar->typid) + elog(ERROR, "type of variable \"%s.%s\" is different than expected", + get_namespace_name(get_session_variable_namespace(varid)), + get_session_variable_name(varid)); + + if (!svar->isnull) + { + result = datumCopy(svar->value, svar->typbyval, svar->typlen); + *isNull = false; + } + else + { + result = (Datum) 0; + *isNull = true; + } + + return (Datum) result; +} + +/* + * Returns the value of the session variable specified by its oid with a check + * of the expected type. Like CopySessionVariable, the caller is responsible + * for doing permission checks. + */ +Datum +GetSessionVariableWithTypeCheck(Oid varid, bool *isNull, Oid expected_typid) +{ + SVariable svar; + + svar = prepare_variable_for_reading(varid); + Assert(svar != NULL && svar->is_valid); + + return CopySessionVariableWithTypeCheck(varid, isNull, expected_typid); + + if (expected_typid != svar->typid) + elog(ERROR, "type of variable \"%s.%s\" is different than expected", + get_namespace_name(get_session_variable_namespace(varid)), + get_session_variable_name(varid)); + + *isNull = svar->isnull; + + return svar->value; +} + +/* + * Do the necessary work to setup local memory management of a new + * variable. + * + * Caller should already have created the necessary entry in catalog + * and made them visible. + */ +void +SessionVariableCreatePostprocess(Oid varid, char eoxaction) +{ + /* + * For temporary variables, we need to create a new end of xact action to + * ensure deletion from catalog. + */ + if (eoxaction == VARIABLE_EOX_DROP) + { + Assert(isTempNamespace(get_session_variable_namespace(varid))); + + register_session_variable_xact_action(varid, SVAR_ON_COMMIT_DROP); + } +} + +/* + * Handle the local memory cleanup for a DROP VARIABLE command. + * + * Caller should take care of removing the pg_variable entry first. + */ +void +SessionVariableDropPostprocess(Oid varid) +{ + /* + * The entry was removed from catalog already, we must not do it + * again at end of xact time. + */ + unregister_session_variable_xact_action(varid, SVAR_ON_COMMIT_DROP); + + if (sessionvars) + { + bool found; + SVariable svar = (SVariable) hash_search(sessionvars, &varid, + HASH_FIND, &found); + + if (found) + { + /* + * Save the current top level local transaction id to make sure we + * don't automatically remove the local variable storage in + * sync_sessionvars_all, as the DROP VARIABLE will send an + * invalidation message. + */ + Assert(LocalTransactionIdIsValid(MyProc->lxid)); + svar->drop_lxid = MyProc->lxid; + + /* + * For variables that are not ON TRANSACTION END RESET, we need to + * register an SVAR_ON_COMMIT_RESET action to free the local + * memory for this variable when the top level transaction + * is committed (we don't need to wait for sinval + * message). The cleanup action for one session variable can be + * duplicated in the action list without causing any problem, so we + * don't need to ensure uniqueness. We need a different action + * from RESET, because RESET is executed on any transaction end, + * but we want to execute this cleanup only when the current + * transaction will be committed. This action can be reverted by + * ABORT of DROP VARIABLE command. + */ + if (!svar->eox_reset) + register_session_variable_xact_action(varid, + SVAR_ON_COMMIT_RESET); + } + } +} + +/* + * Fast drop of the complete content of all session variables hash table, and + * cleanup of any list that wouldn't be relevant anymore. + * This is used by DISCARD VARIABLES (and DISCARD ALL) command. + */ +void +ResetSessionVariables(void) +{ + ListCell *lc; + + /* Destroy hash table and reset related memory context */ + if (sessionvars) + { + hash_destroy(sessionvars); + sessionvars = NULL; + } + + /* Release memory allocated by session variables */ + if (SVariableMemoryContext != NULL) + MemoryContextReset(SVariableMemoryContext); + + /* + * There isn't any session variable left, but we still need to retain the + * ON COMMIT DROP actions if any. + */ + foreach(lc, xact_on_commit_actions) + { + SVariableXActActionItem *xact_ai = + (SVariableXActActionItem *) lfirst(lc); + + if (xact_ai->action == SVAR_ON_COMMIT_DROP) + continue; + + pfree(xact_ai); + xact_on_commit_actions = foreach_delete_current(xact_on_commit_actions, + lc); + } + + /* We should clean xact_reset_varids */ + list_free(xact_reset_varids); + xact_reset_varids = NIL; + + /* + * xact_recheck_varids is stored in SVariableMemoryContext, so it has + * already been freed, just reset the list. + */ + xact_recheck_varids = NIL; +} + +/* + * Perform the necessary work for ON TRANSACTION END RESET and ON COMMIT DROP + * session variables. + * If the transaction is committed, also process the delayed memory cleanup of + * local DROP VARIABLE and process all pending rechecks. */ void AtPreEOXact_SessionVariable(bool isCommit) { ListCell *l; - foreach(l, xact_drop_actions) + /* + * Clean memory for all ON TRANSACTION END RESET variables. Do it first, + * as it reduces the overhead of the RECHECK action list. + */ + foreach(l, xact_reset_varids) { - SVariableXActActionItem *xact_ai = - (SVariableXActActionItem *) lfirst(l); + remove_session_variable_by_id(lfirst_oid(l)); + } - /* Iterate only over entries that are still pending */ - if (xact_ai->deleting_subid == InvalidSubTransactionId) + /* We can now clean xact_reset_varids */ + list_free(xact_reset_varids); + xact_reset_varids = NIL; + + if (isCommit && xact_on_commit_actions) + { + foreach(l, xact_on_commit_actions) { + SVariableXActActionItem *xact_ai = + (SVariableXActActionItem *) lfirst(l); + + /* Iterate only over entries that are still pending */ + if (xact_ai->deleting_subid != InvalidSubTransactionId) + continue; /* - * ON COMMIT DROP is allowed only for temp session variables. So - * we should explicitly delete only when current transaction was - * committed. When it's rollback, then session variable is removed - * automatically. + * ON COMMIT DROP is allowed only for temp session variables. + * So we should explicitly delete only when the current + * transaction is committed. When it's rollbacked, the session + * variable is removed automatically. */ - if (isCommit) + if (xact_ai->action == SVAR_ON_COMMIT_DROP) { ObjectAddress object; @@ -191,10 +1148,30 @@ AtPreEOXact_SessionVariable(bool isCommit) elog(DEBUG1, "session variable (oid:%u) will be deleted (forced by SVAR_ON_COMMIT_DROP action)", xact_ai->varid); + /* + * If the variable was locally set, the memory will be + * automatically cleaned up when we process the underlying + * shared invalidation for this drop. There can't be a recheck + * action for this variable, so there's nothing to gain + * explicitly removing it here. + */ performDeletion(&object, DROP_CASCADE, PERFORM_DELETION_INTERNAL | PERFORM_DELETION_QUIETLY); } + else + { + /* + * When we process DROP VARIABLE statement issued by the + * current transaction, we create an SVAR_ON_COMMIT_RESET xact + * action. We want to process this action only when related + * transaction is commited (when DROP VARIABLE statement + * sucessfully processed) as we need to preserve the variable + * content if the transaction that issued the DROP VARAIBLE + * statement is rollbacked. + */ + remove_session_variable_by_id(xact_ai->varid); + } } } @@ -202,12 +1179,26 @@ AtPreEOXact_SessionVariable(bool isCommit) * Any drop action left is an entry that was unregistered and not * rollbacked, so we can simply remove them. */ - list_free_deep(xact_drop_actions); - xact_drop_actions = NIL; + list_free_deep(xact_on_commit_actions); + xact_on_commit_actions = NIL; + + /* + * We process the list of recheck last for performance reason,the previous + * steps might remove entries from the hash table. + * We need catalog access to process the recheck, so this can only be done + * if the transaction is committed. Otherwise, we just keep the recheck + * list as-is and it will be processed at the next (committed) transaction. + */ + if (isCommit && xact_recheck_varids) + { + Assert(sessionvars); + + sync_sessionvars_all(false); + } } /* - * Post-subcommit or post-subabort cleanup of xact action list. + * Post-subcommit or post-subabort cleanup of xact_on_commit_actions list. * * During subabort, we can immediately remove entries created during this * subtransaction. During subcommit, just transfer entries marked during @@ -220,24 +1211,130 @@ AtEOSubXact_SessionVariable(bool isCommit, { ListCell *cur_item; - foreach(cur_item, xact_drop_actions) + foreach(cur_item, xact_on_commit_actions) { SVariableXActActionItem *xact_ai = (SVariableXActActionItem *) lfirst(cur_item); + /* + * The subtransaction that created this entry was rollbacked, we can + * remove it. + */ if (!isCommit && xact_ai->creating_subid == mySubid) { /* cur_item must be removed */ - xact_drop_actions = foreach_delete_current(xact_drop_actions, cur_item); + xact_on_commit_actions = foreach_delete_current(xact_on_commit_actions, cur_item); pfree(xact_ai); } else { - /* cur_item must be preserved */ + /* Otherwise cur_item must be preserved */ if (xact_ai->creating_subid == mySubid) xact_ai->creating_subid = parentSubid; if (xact_ai->deleting_subid == mySubid) - xact_ai->deleting_subid = isCommit ? parentSubid : InvalidSubTransactionId; + xact_ai->deleting_subid = isCommit ? parentSubid + : InvalidSubTransactionId; + } + } +} + +/* + * pg_session_variables - designed for testing + * + * This is a function designed for testing and debugging. It returns the + * content of sessionvars as-is, and can therefore display entries about + * session variables that were dropped but for which this backend didn't + * process the shared invalidations yet. + */ +Datum +pg_session_variables(PG_FUNCTION_ARGS) +{ +#define NUM_PG_SESSION_VARIABLES_ATTS 10 + + elog(DEBUG1, "pg_session_variables start"); + + InitMaterializedSRF(fcinfo, 0); + + if (sessionvars) + { + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + HASH_SEQ_STATUS status; + SVariable svar; + + /* + * Make sure that all entries in sessionvars hash table are valid, but + * keeping variables dropped by the current transaction. + */ + sync_sessionvars_all(true); + + hash_seq_init(&status, sessionvars); + + while ((svar = (SVariable) hash_seq_search(&status)) != NULL) + { + Datum values[NUM_PG_SESSION_VARIABLES_ATTS]; + bool nulls[NUM_PG_SESSION_VARIABLES_ATTS]; + HeapTuple tp; + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + values[0] = ObjectIdGetDatum(svar->varid); + values[3] = ObjectIdGetDatum(svar->typid); + + /* check if session variable is visible in system catalog */ + tp = SearchSysCache1(VARIABLEOID, ObjectIdGetDatum(svar->varid)); + + /* + * Sessionvars can hold data of variables removed from catalog, + * (and not purged) and then namespacename and name cannot be read + * from catalog. + */ + if (HeapTupleIsValid(tp)) + { + Form_pg_variable varform = (Form_pg_variable) GETSTRUCT(tp); + + /* When we see data in catalog */ + values[1] = CStringGetTextDatum( + get_namespace_name(varform->varnamespace)); + + values[2] = CStringGetTextDatum(NameStr(varform->varname)); + + values[4] = CStringGetTextDatum(format_type_be(svar->typid)); + values[5] = BoolGetDatum(false); + values[6] = BoolGetDatum(svar->is_valid); + + values[8] = BoolGetDatum( + object_aclcheck(VariableRelationId, svar->varid, + GetUserId(), ACL_SELECT) == ACLCHECK_OK); + + values[9] = BoolGetDatum( + object_aclcheck(VariableRelationId, svar->varid, + GetUserId(), ACL_UPDATE) == ACLCHECK_OK); + + ReleaseSysCache(tp); + } + else + { + /* + * When session variable was removed from catalog, but we + * haven't processed the invlidation yet. + */ + nulls[1] = true; + values[2] = CStringGetTextDatum( + DatumGetCString(DirectFunctionCall1(oidout, svar->varid))); + values[4] = PointerGetDatum( + cstring_to_text(format_type_be(svar->typid))); + values[5] = BoolGetDatum(true); + values[6] = BoolGetDatum(svar->is_valid); + nulls[7] = true; + nulls[8] = true; + } + + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); } } + + elog(DEBUG1, "pg_session_variables end"); + + return (Datum) 0; } diff --git a/src/backend/executor/execExpr.c b/src/backend/executor/execExpr.c index 81429b9f05..e9b23be595 100644 --- a/src/backend/executor/execExpr.c +++ b/src/backend/executor/execExpr.c @@ -34,6 +34,7 @@ #include "catalog/objectaccess.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" +#include "commands/session_variable.h" #include "executor/execExpr.h" #include "executor/nodeSubplan.h" #include "funcapi.h" @@ -995,6 +996,81 @@ ExecInitExprRec(Expr *node, ExprState *state, scratch.d.param.paramtype = param->paramtype; ExprEvalPushStep(state, &scratch); break; + + case PARAM_VARIABLE: + { + int es_num_session_variables = 0; + SessionVariableValue *es_session_variables = NULL; + + if (state->parent && state->parent->state) + { + es_session_variables = state->parent->state->es_session_variables; + es_num_session_variables = state->parent->state->es_num_session_variables; + } + + if (es_session_variables) + { + SessionVariableValue *var; + + /* + * Use buffered session variables when the + * buffer with copied values is avaiable + * (standard query executor mode) + */ + + /* Parameter sanity checks. */ + if (param->paramid >= es_num_session_variables) + elog(ERROR, "paramid of PARAM_VARIABLE param is out of range"); + + var = &es_session_variables[param->paramid]; + + if (var->typid != param->paramtype) + elog(ERROR, "type of buffered value is different than PARAM_VARIABLE type"); + + /* + * In this case, pass the value like + * a constant. + */ + scratch.opcode = EEOP_CONST; + scratch.d.constval.value = var->value; + scratch.d.constval.isnull = var->isnull; + ExprEvalPushStep(state, &scratch); + } + else + { + AclResult aclresult; + Oid varid = param->paramvarid; + Oid vartype = param->paramtype; + + /* + * When the expression is evaluated directly + * without query executor start (plpgsql simple + * expr evaluation), then the array es_session_variables + * is null. In this case we need to use direct + * access to session variables. The values are + * not protected by using copy, but it is not + * problem (we don't need to emulate stability + * of the value). + * + * In this case we should to do aclcheck, because + * usual aclcheck from standard_ExecutorStart + * is not executed in this case. Fortunately + * it is just once per transaction. + */ + aclresult = object_aclcheck(VariableRelationId, varid, + GetUserId(), ACL_SELECT); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_VARIABLE, + get_session_variable_name(varid)); + + scratch.opcode = EEOP_PARAM_VARIABLE; + scratch.d.vparam.varid = varid; + scratch.d.vparam.vartype = vartype; + ExprEvalPushStep(state, &scratch); + } + } + break; + case PARAM_EXTERN: /* diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c index 1dab2787b7..d3f044962c 100644 --- a/src/backend/executor/execExprInterp.c +++ b/src/backend/executor/execExprInterp.c @@ -59,6 +59,7 @@ #include "access/heaptoast.h" #include "catalog/pg_type.h" #include "commands/sequence.h" +#include "commands/session_variable.h" #include "executor/execExpr.h" #include "executor/nodeSubplan.h" #include "funcapi.h" @@ -446,6 +447,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) &&CASE_EEOP_PARAM_EXEC, &&CASE_EEOP_PARAM_EXTERN, &&CASE_EEOP_PARAM_CALLBACK, + &&CASE_EEOP_PARAM_VARIABLE, &&CASE_EEOP_CASE_TESTVAL, &&CASE_EEOP_MAKE_READONLY, &&CASE_EEOP_IOCOERCE, @@ -1081,6 +1083,21 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) EEO_NEXT(); } + EEO_CASE(EEOP_PARAM_VARIABLE) + { + /* + * direct access to session variable (without buffering). + * Because returned value can be used (without an assignement) + * after the referenced session variables is updated, we have + * to return copy of stored value every time. This is not an + * issue for local (plpgsql) variables. + */ + *op->resvalue = CopySessionVariableWithTypeCheck(op->d.vparam.varid, + op->resnull, + op->d.vparam.vartype); + EEO_NEXT(); + } + EEO_CASE(EEOP_CASE_TESTVAL) { /* diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 12ff4f3de5..ddaece018c 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -48,6 +48,7 @@ #include "catalog/pg_publication.h" #include "commands/matview.h" #include "commands/trigger.h" +#include "commands/session_variable.h" #include "executor/execdebug.h" #include "executor/nodeSubplan.h" #include "foreign/fdwapi.h" @@ -200,6 +201,63 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) Assert(queryDesc->sourceText != NULL); estate->es_sourceText = queryDesc->sourceText; + /* + * The executor doesn't work with session variables directly. Values of + * related session variables are copied to dedicated array, and this array + * is passed to executor. + */ + if (queryDesc->num_session_variables > 0) + { + /* + * When a parallel query needs to access query parameters (including + * related session variables), then related session variables are + * restored (deserialized) in queryDesc already. So just push pointer + * of this array to executor's estate. + */ + Assert(IsParallelWorker()); + estate->es_session_variables = queryDesc->session_variables; + estate->es_num_session_variables = queryDesc->num_session_variables; + } + else if (queryDesc->plannedstmt->sessionVariables) + { + ListCell *lc; + int nSessionVariables; + int i = 0; + + /* + * In this case, the query uses session variables, but we have to + * prepare the array with passed values (of used session variables) + * first. + */ + Assert(!IsParallelWorker()); + nSessionVariables = list_length(queryDesc->plannedstmt->sessionVariables); + + /* Create the array used for passing values of used session variables */ + estate->es_session_variables = (SessionVariableValue *) + palloc(nSessionVariables * sizeof(SessionVariableValue)); + + /* Fill the array */ + foreach(lc, queryDesc->plannedstmt->sessionVariables) + { + AclResult aclresult; + Oid varid = lfirst_oid(lc); + + aclresult = object_aclcheck(VariableRelationId, varid, GetUserId(), ACL_SELECT); + if (aclresult != ACLCHECK_OK) + aclcheck_error(aclresult, OBJECT_VARIABLE, + get_session_variable_name(varid)); + + estate->es_session_variables[i].varid = varid; + estate->es_session_variables[i].value = CopySessionVariable(varid, + &estate->es_session_variables[i].isnull, + &estate->es_session_variables[i].typid); + + i++; + } + + estate->es_num_session_variables = nSessionVariables; + } + /* * Fill in the query environment, if any, from queryDesc. */ diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index aca0c6f323..ac15eafcb0 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -12,8 +12,9 @@ * workers and ensuring that their state generally matches that of the * leader; see src/backend/access/transam/README.parallel for details. * However, we must save and restore relevant executor state, such as - * any ParamListInfo associated with the query, buffer/WAL usage info, and - * the actual plan to be passed down to the worker. + * any ParamListInfo associated with the query, buffer/WAL usage info, + * session variables buffer, and the actual plan to be passed down to + * the worker. * * IDENTIFICATION * src/backend/executor/execParallel.c @@ -66,6 +67,7 @@ #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) +#define PARALLEL_KEY_SESSION_VARIABLES UINT64CONST(0xE00000000000000B) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -140,6 +142,12 @@ static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, /* Helper function that runs in the parallel worker. */ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc); +/* Helper functions that can pass values of session variables */ +static Size EstimateSessionVariables(EState *estate); +static void SerializeSessionVariables(EState *estate, char **start_address); +static SessionVariableValue *RestoreSessionVariables(char **start_address, + int *num_session_variables); + /* * Create a serialized representation of the plan to be sent to each worker. */ @@ -598,6 +606,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, char *pstmt_data; char *pstmt_space; char *paramlistinfo_space; + char *session_variables_space; BufferUsage *bufusage_space; WalUsage *walusage_space; SharedExecutorInstrumentation *instrumentation = NULL; @@ -607,6 +616,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int instrumentation_len = 0; int jit_instrumentation_len = 0; int instrument_offset = 0; + int session_variables_len = 0; Size dsa_minsize = dsa_minimum_size(); char *query_string; int query_len; @@ -662,6 +672,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len); shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for serialized session variables. */ + session_variables_len = EstimateSessionVariables(estate); + shm_toc_estimate_chunk(&pcxt->estimator, session_variables_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* * Estimate space for BufferUsage. * @@ -756,6 +771,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space); SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space); + /* Store serialized session variables. */ + session_variables_space = shm_toc_allocate(pcxt->toc, session_variables_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_VARIABLES, session_variables_space); + SerializeSessionVariables(estate, &session_variables_space); + /* Allocate space for each worker's BufferUsage; no need to initialize. */ bufusage_space = shm_toc_allocate(pcxt->toc, mul_size(sizeof(BufferUsage), pcxt->nworkers)); @@ -1403,6 +1423,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) SharedJitInstrumentation *jit_instrumentation; int instrument_options = 0; void *area_space; + char *sessionvariable_space; dsa_area *area; ParallelWorkerContext pwcxt; @@ -1428,6 +1449,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false); area = dsa_attach_in_place(area_space, seg); + /* Reconstruct session variables. */ + sessionvariable_space = shm_toc_lookup(toc, + PARALLEL_KEY_SESSION_VARIABLES, + false); + queryDesc->session_variables = + RestoreSessionVariables(&sessionvariable_space, + &queryDesc->num_session_variables); + /* Start up the executor */ queryDesc->plannedstmt->jitFlags = fpes->jit_flags; ExecutorStart(queryDesc, fpes->eflags); @@ -1496,3 +1525,117 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) FreeQueryDesc(queryDesc); receiver->rDestroy(receiver); } + +/* + * Estimate the amount of space required to serialize a session variable. + */ +static Size +EstimateSessionVariables(EState *estate) +{ + int i; + Size sz = sizeof(int); + + if (estate->es_session_variables == NULL) + return sz; + + for (i = 0; i < estate->es_num_session_variables; i++) + { + SessionVariableValue *svarval; + Oid typeOid; + int16 typLen; + bool typByVal; + + svarval = &estate->es_session_variables[i]; + + typeOid = svarval->typid; + + sz = add_size(sz, sizeof(Oid)); /* space for type OID */ + + /* space for datum/isnull */ + Assert(OidIsValid(typeOid)); + get_typlenbyval(typeOid, &typLen, &typByVal); + + sz = add_size(sz, + datumEstimateSpace(svarval->value, svarval->isnull, typByVal, typLen)); + } + + return sz; +} + +/* + * Serialize a session variables buffer into caller-provided storage. + * + * We write the number of parameters first, as a 4-byte integer, and then + * write details for each parameter in turn. The details for each parameter + * consist of a 4-byte type OID, and then the datum as serialized by + * datumSerialize(). The caller is responsible for ensuring that there is + * enough storage to store the number of bytes that will be written; use + * EstimateSessionVariables to find out how many will be needed. + * *start_address is updated to point to the byte immediately following those + * written. + * + * RestoreSessionVariables can be used to recreate a session variable buffer + * based on the serialized representation; + */ +static void +SerializeSessionVariables(EState *estate, char **start_address) +{ + int nparams; + int i; + + /* Write number of parameters. */ + nparams = estate->es_num_session_variables; + memcpy(*start_address, &nparams, sizeof(int)); + *start_address += sizeof(int); + + /* Write each parameter in turn. */ + for (i = 0; i < nparams; i++) + { + SessionVariableValue *svarval; + Oid typeOid; + int16 typLen; + bool typByVal; + + svarval = &estate->es_session_variables[i]; + typeOid = svarval->typid; + + /* Write type OID. */ + memcpy(*start_address, &typeOid, sizeof(Oid)); + *start_address += sizeof(Oid); + + Assert(OidIsValid(typeOid)); + get_typlenbyval(typeOid, &typLen, &typByVal); + + datumSerialize(svarval->value, svarval->isnull, typByVal, typLen, + start_address); + } +} + +static SessionVariableValue * +RestoreSessionVariables(char **start_address, int *num_session_variables) +{ + SessionVariableValue *session_variables; + int i; + int nparams; + + memcpy(&nparams, *start_address, sizeof(int)); + *start_address += sizeof(int); + + *num_session_variables = nparams; + session_variables = (SessionVariableValue *) + palloc(nparams * sizeof(SessionVariableValue)); + + for (i = 0; i < nparams; i++) + { + SessionVariableValue *svarval = &session_variables[i]; + + /* Read type OID. */ + memcpy(&svarval->typid, *start_address, sizeof(Oid)); + *start_address += sizeof(Oid); + + /* Read datum/isnull. */ + svarval->value = datumRestore(start_address, &svarval->isnull); + } + + return session_variables; +} diff --git a/src/backend/jit/llvm/llvmjit_expr.c b/src/backend/jit/llvm/llvmjit_expr.c index f114337f8e..6e3c4d0481 100644 --- a/src/backend/jit/llvm/llvmjit_expr.c +++ b/src/backend/jit/llvm/llvmjit_expr.c @@ -1073,6 +1073,12 @@ llvm_compile_expr(ExprState *state) LLVMBuildBr(b, opblocks[opno + 1]); break; + case EEOP_PARAM_VARIABLE: + build_EvalXFunc(b, mod, "ExecEvalParamVariable", + v_state, op, v_econtext); + LLVMBuildBr(b, opblocks[opno + 1]); + break; + case EEOP_PARAM_CALLBACK: { LLVMTypeRef v_functype; diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index 799602f5ea..295b940a4d 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -317,6 +317,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, glob->lastPlanNodeId = 0; glob->transientPlan = false; glob->dependsOnRole = false; + glob->sessionVariables = NIL; /* * Assess whether it's feasible to use parallel mode for this query. We @@ -531,6 +532,7 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, result->paramExecTypes = glob->paramExecTypes; /* utilityStmt should be null, but we might as well copy it */ result->utilityStmt = parse->utilityStmt; + result->sessionVariables = glob->sessionVariables; result->stmt_location = parse->stmt_location; result->stmt_len = parse->stmt_len; @@ -686,6 +688,12 @@ subquery_planner(PlannerGlobal *glob, Query *parse, */ pull_up_subqueries(root); + /* + * Check if some subquery uses session variable. Flag hasSessionVariables + * should be true if query or some subquery uses any session variable. + */ + pull_up_has_session_variables(root); + /* * If this is a simple UNION ALL query, flatten it into an appendrel. We * do this now because it requires applying pull_up_subqueries to the leaf diff --git a/src/backend/optimizer/plan/setrefs.c b/src/backend/optimizer/plan/setrefs.c index e67f0e3509..470b99e006 100644 --- a/src/backend/optimizer/plan/setrefs.c +++ b/src/backend/optimizer/plan/setrefs.c @@ -181,6 +181,8 @@ static List *set_returning_clause_references(PlannerInfo *root, static List *set_windowagg_runcondition_references(PlannerInfo *root, List *runcondition, Plan *plan); +static bool pull_up_has_session_variables_walker(Node *node, + PlannerInfo *root); /***************************************************************************** @@ -1243,6 +1245,50 @@ set_plan_refs(PlannerInfo *root, Plan *plan, int rtoffset) return plan; } +/* + * Search usage of session variables in subqueries + */ +void +pull_up_has_session_variables(PlannerInfo *root) +{ + Query *query = root->parse; + + if (query->hasSessionVariables) + { + root->hasSessionVariables = true; + } + else + { + (void) query_tree_walker(query, + pull_up_has_session_variables_walker, + (void *) root, 0); + } +} + +static bool +pull_up_has_session_variables_walker(Node *node, PlannerInfo *root) +{ + if (node == NULL) + return false; + if (IsA(node, Query)) + { + Query *query = (Query *) node; + + if (query->hasSessionVariables) + { + root->hasSessionVariables = true; + return false; + } + + /* Recurse into subselects */ + return query_tree_walker((Query *) node, + pull_up_has_session_variables_walker, + (void *) root, 0); + } + return expression_tree_walker(node, pull_up_has_session_variables_walker, + (void *) root); +} + /* * set_indexonlyscan_references * Do set_plan_references processing on an IndexOnlyScan @@ -1838,8 +1884,9 @@ copyVar(Var *var) * This is code that is common to all variants of expression-fixing. * We must look up operator opcode info for OpExpr and related nodes, * add OIDs from regclass Const nodes into root->glob->relationOids, and - * add PlanInvalItems for user-defined functions into root->glob->invalItems. - * We also fill in column index lists for GROUPING() expressions. + * add PlanInvalItems for user-defined functions and session variables into + * root->glob->invalItems. We also fill in column index lists for GROUPING() + * expressions. * * We assume it's okay to update opcode info in-place. So this could possibly * scribble on the planner's input data structures, but it's OK. @@ -1929,15 +1976,39 @@ fix_expr_common(PlannerInfo *root, Node *node) g->cols = cols; } } + else if (IsA(node, Param)) + { + Param *p = (Param *) node; + + if (p->paramkind == PARAM_VARIABLE) + { + PlanInvalItem *inval_item = makeNode(PlanInvalItem); + + /* paramid is still session variable id */ + inval_item->cacheId = VARIABLEOID; + inval_item->hashValue = GetSysCacheHashValue1(VARIABLEOID, + ObjectIdGetDatum(p->paramvarid)); + + /* Append this variable to global, register dependency */ + root->glob->invalItems = lappend(root->glob->invalItems, + inval_item); + } + } } /* * fix_param_node * Do set_plan_references processing on a Param + * Collect session variables list and replace variable oid by + * index to collected list. * * If it's a PARAM_MULTIEXPR, replace it with the appropriate Param from * root->multiexpr_params; otherwise no change is needed. * Just for paranoia's sake, we make a copy of the node in either case. + * + * If it's a PARAM_VARIABLE, then we collect used session variables in + * list root->glob->sessionVariable. We should to assign Param paramvarid + * too, and it is position of related session variable in mentioned list. */ static Node * fix_param_node(PlannerInfo *root, Param *p) @@ -1956,6 +2027,41 @@ fix_param_node(PlannerInfo *root, Param *p) elog(ERROR, "unexpected PARAM_MULTIEXPR ID: %d", p->paramid); return copyObject(list_nth(params, colno - 1)); } + + if (p->paramkind == PARAM_VARIABLE) + { + ListCell *lc; + int n = 0; + bool found = false; + + /* We will modify object */ + p = (Param *) copyObject(p); + + /* + * Now, we can actualize list of session variables, and we can + * complete paramid parameter. + */ + foreach(lc, root->glob->sessionVariables) + { + if (lfirst_oid(lc) == p->paramvarid) + { + p->paramid = n; + found = true; + break; + } + n += 1; + } + + if (!found) + { + root->glob->sessionVariables = lappend_oid(root->glob->sessionVariables, + p->paramvarid); + p->paramid = n; + } + + return (Node *) p; + } + return (Node *) copyObject(p); } @@ -2017,7 +2123,10 @@ fix_alternative_subplan(PlannerInfo *root, AlternativeSubPlan *asplan, * replacing Aggref nodes that should be replaced by initplan output Params, * choosing the best implementation for AlternativeSubPlans, * looking up operator opcode info for OpExpr and related nodes, - * and adding OIDs from regclass Const nodes into root->glob->relationOids. + * adding OIDs from regclass Const nodes into root->glob->relationOids, + * and assigning paramvarid to PARAM_VARIABLE params, and collecting + * of OIDs of session variables in root->glob->sessionVariables list + * (paramvarid is an position of related session variable in this list). * * 'node': the expression to be modified * 'rtoffset': how much to increment varnos by @@ -2039,7 +2148,8 @@ fix_scan_expr(PlannerInfo *root, Node *node, int rtoffset, double num_exec) root->multiexpr_params != NIL || root->glob->lastPHId != 0 || root->minmax_aggs != NIL || - root->hasAlternativeSubPlans) + root->hasAlternativeSubPlans || + root->hasSessionVariables) { return fix_scan_expr_mutator(node, &context); } diff --git a/src/backend/optimizer/prep/prepjointree.c b/src/backend/optimizer/prep/prepjointree.c index 2ea3ca734e..118b6e494a 100644 --- a/src/backend/optimizer/prep/prepjointree.c +++ b/src/backend/optimizer/prep/prepjointree.c @@ -1276,6 +1276,9 @@ pull_up_simple_subquery(PlannerInfo *root, Node *jtnode, RangeTblEntry *rte, /* If subquery had any RLS conditions, now main query does too */ parse->hasRowSecurity |= subquery->hasRowSecurity; + /* If subquery had session variables, now main query does too */ + parse->hasSessionVariables |= subquery->hasSessionVariables; + /* * subquery won't be pulled up if it hasAggs, hasWindowFuncs, or * hasTargetSRFs, so no work needed on those flags diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index bffc8112aa..8acd8ddf82 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -26,6 +26,7 @@ #include "catalog/pg_operator.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" +#include "commands/session_variable.h" #include "executor/executor.h" #include "executor/functions.h" #include "funcapi.h" @@ -804,16 +805,17 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) /* * We can't pass Params to workers at the moment either, so they are also - * parallel-restricted, unless they are PARAM_EXTERN Params or are - * PARAM_EXEC Params listed in safe_param_ids, meaning they could be - * either generated within workers or can be computed by the leader and - * then their value can be passed to workers. + * parallel-restricted, unless they are PARAM_EXTERN or PARAM_VARIABLE + * Params or are PARAM_EXEC Params listed in safe_param_ids, meaning they + * could be either generated within workers or can be computed by the + * leader and then their value can be passed to workers. */ else if (IsA(node, Param)) { Param *param = (Param *) node; - if (param->paramkind == PARAM_EXTERN) + if (param->paramkind == PARAM_EXTERN || + param->paramkind == PARAM_VARIABLE) return false; if (param->paramkind != PARAM_EXEC || @@ -2267,6 +2269,7 @@ convert_saop_to_hashed_saop_walker(Node *node, void *context) * value of the Param. * 2. Fold stable, as well as immutable, functions to constants. * 3. Reduce PlaceHolderVar nodes to their contained expressions. + * 4. Current value of session variable can be used for estimation too. *-------------------- */ Node * @@ -2389,6 +2392,29 @@ eval_const_expressions_mutator(Node *node, } } } + else if (param->paramkind == PARAM_VARIABLE && + context->estimate) + { + int16 typLen; + bool typByVal; + Datum pval; + bool isnull; + + get_typlenbyval(param->paramtype, + &typLen, &typByVal); + + pval = CopySessionVariableWithTypeCheck(param->paramvarid, + &isnull, + param->paramtype); + + return (Node *) makeConst(param->paramtype, + param->paramtypmod, + param->paramcollid, + (int) typLen, + pval, + isnull, + typByVal); + } /* * Not replaceable, so just copy the Param (no need to @@ -4756,21 +4782,43 @@ substitute_actual_parameters_mutator(Node *node, { if (node == NULL) return NULL; + + /* + * SQL functions can contain two different kind of params. The nodes with + * paramkind PARAM_EXTERN are related to function's arguments (and should + * be replaced in this step), because this is how we apply the function's + * arguments for an expression. + * + * The nodes with paramkind PARAM_VARIABLE are related to usage of session + * variables. The values of session variables are not passed to expression + * by expression arguments, so it should not be replaced here by + * function's arguments. Although we could substitute params related to + * immutable session variables with default expression by this default + * expression, it is safer to not do it. This way we don't have to run + * security checks here. There can be some performance loss, but an access + * to session variable is fast (and the result of default expression is + * immediately materialized and can be reused). + */ if (IsA(node, Param)) { Param *param = (Param *) node; - if (param->paramkind != PARAM_EXTERN) + if (param->paramkind != PARAM_EXTERN && + param->paramkind != PARAM_VARIABLE) elog(ERROR, "unexpected paramkind: %d", (int) param->paramkind); - if (param->paramid <= 0 || param->paramid > context->nargs) - elog(ERROR, "invalid paramid: %d", param->paramid); - /* Count usage of parameter */ - context->usecounts[param->paramid - 1]++; + if (param->paramkind == PARAM_EXTERN) + { + if (param->paramid <= 0 || param->paramid > context->nargs) + elog(ERROR, "invalid paramid: %d", param->paramid); + + /* Count usage of parameter */ + context->usecounts[param->paramid - 1]++; - /* Select the appropriate actual arg and replace the Param with it */ - /* We don't need to copy at this time (it'll get done later) */ - return list_nth(context->args, param->paramid - 1); + /* Select the appropriate actual arg and replace the Param with it */ + /* We don't need to copy at this time (it'll get done later) */ + return list_nth(context->args, param->paramid - 1); + } } return expression_tree_mutator(node, substitute_actual_parameters_mutator, (void *) context); diff --git a/src/backend/parser/analyze.c b/src/backend/parser/analyze.c index 6688c2a865..d59fca31d2 100644 --- a/src/backend/parser/analyze.c +++ b/src/backend/parser/analyze.c @@ -525,6 +525,8 @@ transformDeleteStmt(ParseState *pstate, DeleteStmt *stmt) qry->hasTargetSRFs = pstate->p_hasTargetSRFs; qry->hasAggs = pstate->p_hasAggs; + qry->hasSessionVariables = pstate->p_hasSessionVariables; + assign_query_collations(pstate, qry); /* this must be done after collations, for reliable comparison of exprs */ @@ -942,6 +944,7 @@ transformInsertStmt(ParseState *pstate, InsertStmt *stmt) qry->hasTargetSRFs = pstate->p_hasTargetSRFs; qry->hasSubLinks = pstate->p_hasSubLinks; + qry->hasSessionVariables = pstate->p_hasSessionVariables; assign_query_collations(pstate, qry); @@ -1397,6 +1400,7 @@ transformSelectStmt(ParseState *pstate, SelectStmt *stmt) qry->hasWindowFuncs = pstate->p_hasWindowFuncs; qry->hasTargetSRFs = pstate->p_hasTargetSRFs; qry->hasAggs = pstate->p_hasAggs; + qry->hasSessionVariables = pstate->p_hasSessionVariables; foreach(l, stmt->lockingClause) { @@ -1622,6 +1626,7 @@ transformValuesClause(ParseState *pstate, SelectStmt *stmt) qry->jointree = makeFromExpr(pstate->p_joinlist, NULL); qry->hasSubLinks = pstate->p_hasSubLinks; + qry->hasSessionVariables = pstate->p_hasSessionVariables; assign_query_collations(pstate, qry); @@ -1871,6 +1876,7 @@ transformSetOperationStmt(ParseState *pstate, SelectStmt *stmt) qry->hasWindowFuncs = pstate->p_hasWindowFuncs; qry->hasTargetSRFs = pstate->p_hasTargetSRFs; qry->hasAggs = pstate->p_hasAggs; + qry->hasSessionVariables = pstate->p_hasSessionVariables; foreach(l, lockingClause) { @@ -2409,6 +2415,7 @@ transformUpdateStmt(ParseState *pstate, UpdateStmt *stmt) qry->hasTargetSRFs = pstate->p_hasTargetSRFs; qry->hasSubLinks = pstate->p_hasSubLinks; + qry->hasSessionVariables = pstate->p_hasSessionVariables; assign_query_collations(pstate, qry); diff --git a/src/backend/parser/parse_expr.c b/src/backend/parser/parse_expr.c index 1bb8c5a850..069c5809bc 100644 --- a/src/backend/parser/parse_expr.c +++ b/src/backend/parser/parse_expr.c @@ -33,15 +33,17 @@ #include "parser/parse_relation.h" #include "parser/parse_target.h" #include "parser/parse_type.h" +#include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/date.h" #include "utils/lsyscache.h" #include "utils/timestamp.h" +#include "utils/typcache.h" #include "utils/xml.h" /* GUC parameters */ bool Transform_null_equals = false; - +bool session_variables_ambiguity_warning = false; static Node *transformExprRecurse(ParseState *pstate, Node *expr); static Node *transformParamRef(ParseState *pstate, ParamRef *pref); @@ -81,6 +83,9 @@ static Expr *make_distinct_op(ParseState *pstate, List *opname, Node *ltree, Node *rtree, int location); static Node *make_nulltest_from_distinct(ParseState *pstate, A_Expr *distincta, Node *arg); +static Node *makeParamSessionVariable(ParseState *pstate, + Oid varid, Oid typid, int32 typmod, Oid collid, + char *attrname, int location); /* @@ -424,6 +429,59 @@ transformIndirection(ParseState *pstate, A_Indirection *ind) return result; } +/* + * Returns true, when expression of kind allows using of + * session variables. + */ +static bool +expr_kind_allows_session_variables(ParseExprKind p_expr_kind) +{ + switch (p_expr_kind) + { + case EXPR_KIND_NONE: + Assert(false); /* can't happen */ + return false; + + case EXPR_KIND_OTHER: + case EXPR_KIND_JOIN_ON: + case EXPR_KIND_FROM_SUBSELECT: + case EXPR_KIND_FROM_FUNCTION: + case EXPR_KIND_WHERE: + case EXPR_KIND_HAVING: + case EXPR_KIND_FILTER: + case EXPR_KIND_WINDOW_PARTITION: + case EXPR_KIND_WINDOW_ORDER: + case EXPR_KIND_WINDOW_FRAME_RANGE: + case EXPR_KIND_WINDOW_FRAME_ROWS: + case EXPR_KIND_WINDOW_FRAME_GROUPS: + case EXPR_KIND_SELECT_TARGET: + case EXPR_KIND_INSERT_TARGET: + case EXPR_KIND_UPDATE_SOURCE: + case EXPR_KIND_UPDATE_TARGET: + case EXPR_KIND_MERGE_WHEN: + case EXPR_KIND_GROUP_BY: + case EXPR_KIND_ORDER_BY: + case EXPR_KIND_DISTINCT_ON: + case EXPR_KIND_LIMIT: + case EXPR_KIND_OFFSET: + case EXPR_KIND_RETURNING: + case EXPR_KIND_VALUES: + case EXPR_KIND_VALUES_SINGLE: + case EXPR_KIND_ALTER_COL_TRANSFORM: + case EXPR_KIND_EXECUTE_PARAMETER: + case EXPR_KIND_POLICY: + case EXPR_KIND_CALL_ARGUMENT: + case EXPR_KIND_COPY_WHERE: + case EXPR_KIND_LET_TARGET: + + /* okay */ + return true; + + default: + return false; + } +} + /* * Transform a ColumnRef. * @@ -772,6 +830,161 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) parser_errposition(pstate, cref->location))); } + /* + * There are contexts where session's variables are not allowed. The + * question is if we want to identify session's variables in these + * contexts? The code can be more simple, when we don't do it, but then we + * cannot to raise maybe useful message like "you cannot to use session + * variables here". On second hand, in this case the warnings about + * session's variable shadowing can be messy. + */ + if (expr_kind_allows_session_variables(pstate->p_expr_kind)) + { + Oid varid = InvalidOid; + char *attrname = NULL; + bool not_unique; + + /* + * Session variables are shadowed by columns, routine's variables or + * routine's arguments ever. We don't want to use session variable + * when it is not exactly shadowed, but RTE is valid like: + * + * CREATE TYPE T AS (c int); CREATE VARIABLE foo AS T; CREATE TABLE + * foo(a int, b int); + * + * SELECT foo.a, foo.b, foo.c FROM foo; + * + * This case can be messy and then we disallow it. When we know, so + * possible variable will be shadowed, we try to identify variable + * only when session_variables_ambiguity_warning is requested. + */ + if (node || + (!node && relname && crerr == CRERR_NO_COLUMN)) + { + /* + * In this path we just try (if it is wanted) detect if session + * variable is shadowed. + */ + if (session_variables_ambiguity_warning) + { + /* + * The AccessShareLock is created on related session variable. The lock + * will be kept for the whole transaction. + */ + varid = IdentifyVariable(cref->fields, &attrname, ¬_unique); + + /* This path will ending by WARNING. Unlock variable first */ + if (OidIsValid(varid)) + UnlockDatabaseObject(VariableRelationId, varid, 0, AccessShareLock); + + /* + * Some cases with ambiguous references can be solved without + * raising a warning. When there is a collision between column + * name (or label) and some session variable name, and when we + * know attribute name, then we can ignore the collision when: + * + * a) variable is of scalar type (then indirection cannot be + * applied on this session variable. + * + * b) when related variable has no field with the given + * attrname, then indirection cannot be applied on this + * session variable. + */ + if (OidIsValid(varid) && attrname && node) + { + Oid typid; + Oid collid; + int32 typmod; + + get_session_variable_type_typmod_collid(varid, + &typid, &typmod, + &collid); + + if (type_is_rowtype(typid)) + { + TupleDesc tupdesc; + bool found = false; + int i; + + /* slow part, I hope it will not be to often */ + tupdesc = lookup_rowtype_tupdesc(typid, typmod); + for (i = 0; i < tupdesc->natts; i++) + { + if (namestrcmp(&(TupleDescAttr(tupdesc, i)->attname), attrname) == 0 && + !TupleDescAttr(tupdesc, i)->attisdropped) + { + found = true; + break; + } + } + + ReleaseTupleDesc(tupdesc); + + /* There is no composite variable with this field. */ + if (!found) + varid = InvalidOid; + } + else + /* There is no composite variable with this name. */ + varid = InvalidOid; + } + + /* + * Raise warning when session variable reference is still + * visible. + */ + if (OidIsValid(varid)) + { + if (node) + ereport(WARNING, + (errcode(ERRCODE_AMBIGUOUS_COLUMN), + errmsg("session variable \"%s\" is shadowed", + NameListToString(cref->fields)), + errdetail("Session variables can be shadowed by columns, routine's variables and routine's arguments with the same name."), + parser_errposition(pstate, cref->location))); + else + /* session variable is shadowed by RTE */ + ereport(WARNING, + (errcode(ERRCODE_AMBIGUOUS_COLUMN), + errmsg("session variable \"%s.%s\" is shadowed", + get_namespace_name(get_session_variable_namespace(varid)), + get_session_variable_name(varid)), + errdetail("Session variables can be shadowed by tables or table's aliases with the same name."), + parser_errposition(pstate, cref->location))); + } + } + } + else + { + /* + * The AccessShareLock is created on related session variable. The lock + * will be kept for the whole transaction. + */ + varid = IdentifyVariable(cref->fields, &attrname, ¬_unique); + + if (OidIsValid(varid)) + { + Oid typid; + int32 typmod; + Oid collid; + + if (not_unique) + ereport(ERROR, + (errcode(ERRCODE_AMBIGUOUS_PARAMETER), + errmsg("session variable reference \"%s\" is ambiguous", + NameListToString(cref->fields)), + parser_errposition(pstate, cref->location))); + + get_session_variable_type_typmod_collid(varid, &typid, &typmod, + &collid); + + node = makeParamSessionVariable(pstate, + varid, typid, typmod, collid, + attrname, cref->location); + } + } + } + /* * Throw error if no translation found. */ @@ -806,6 +1019,64 @@ transformColumnRef(ParseState *pstate, ColumnRef *cref) return node; } +/* + * Generate param variable for reference to session variable + */ +static Node * +makeParamSessionVariable(ParseState *pstate, + Oid varid, Oid typid, int32 typmod, Oid collid, + char *attrname, int location) +{ + Param *param; + + param = makeNode(Param); + + param->paramkind = PARAM_VARIABLE; + param->paramvarid = varid; + param->paramtype = typid; + param->paramtypmod = typmod; + param->paramcollid = collid; + + pstate->p_hasSessionVariables = true; + + if (attrname != NULL) + { + TupleDesc tupdesc; + int i; + + tupdesc = lookup_rowtype_tupdesc(typid, typmod); + + for (i = 0; i < tupdesc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(tupdesc, i); + + if (strcmp(attrname, NameStr(att->attname)) == 0 && + !att->attisdropped) + { + /* Success, so generate a FieldSelect expression */ + FieldSelect *fselect = makeNode(FieldSelect); + + fselect->arg = (Expr *) param; + fselect->fieldnum = i + 1; + fselect->resulttype = att->atttypid; + fselect->resulttypmod = att->atttypmod; + /* save attribute's collation for parse_collate.c */ + fselect->resultcollid = att->attcollation; + + ReleaseTupleDesc(tupdesc); + return (Node *) fselect; + } + } + + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("could not identify column \"%s\" in variable", attrname), + parser_errposition(pstate, location))); + } + + return (Node *) param; +} + static Node * transformParamRef(ParseState *pstate, ParamRef *pref) { diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index 52e2db6452..06a5bfbbfa 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -86,6 +86,9 @@ CreateQueryDesc(PlannedStmt *plannedstmt, qd->queryEnv = queryEnv; qd->instrument_options = instrument_options; /* instrumentation wanted? */ + qd->num_session_variables = 0; + qd->session_variables = NULL; + /* null these fields until set by ExecutorStart */ qd->tupDesc = NULL; qd->estate = NULL; diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c index a20a1b069b..aa6b66c78a 100644 --- a/src/backend/utils/adt/ruleutils.c +++ b/src/backend/utils/adt/ruleutils.c @@ -38,6 +38,7 @@ #include "catalog/pg_statistic_ext.h" #include "catalog/pg_trigger.h" #include "catalog/pg_type.h" +#include "catalog/pg_variable.h" #include "commands/defrem.h" #include "commands/tablespace.h" #include "common/keywords.h" @@ -496,6 +497,7 @@ static char *generate_function_name(Oid funcid, int nargs, static char *generate_operator_name(Oid operid, Oid arg1, Oid arg2); static void add_cast_to(StringInfo buf, Oid typid); static char *generate_qualified_type_name(Oid typid); +static char *generate_session_variable_name(Oid varid); static text *string_to_text(char *str); static char *flatten_reloptions(Oid relid); static void get_reloptions(StringInfo buf, Datum reloptions); @@ -8044,6 +8046,14 @@ get_parameter(Param *param, deparse_context *context) return; } + /* translate paramvarid to session variable name */ + if (param->paramkind == PARAM_VARIABLE) + { + appendStringInfo(context->buf, "%s", + generate_session_variable_name(param->paramvarid)); + return; + } + /* * If it's an external parameter, see if the outermost namespace provides * function argument names. @@ -12064,6 +12074,42 @@ generate_collation_name(Oid collid) return result; } +/* + * generate_session_variable_name + * Compute the name to display for a session variable specified by OID + * + * The result includes all necessary quoting and schema-prefixing. + */ +static char * +generate_session_variable_name(Oid varid) +{ + HeapTuple tup; + Form_pg_variable varform; + char *varname; + char *nspname; + char *result; + + tup = SearchSysCache1(VARIABLEOID, ObjectIdGetDatum(varid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for variable %u", varid); + + varform = (Form_pg_variable) GETSTRUCT(tup); + + varname = NameStr(varform->varname); + + if (!VariableIsVisible(varid)) + nspname = get_namespace_name_or_temp(varform->varnamespace); + else + nspname = NULL; + + result = quote_qualified_identifier(nspname, varname); + + ReleaseSysCache(tup); + + return result; +} + /* * Given a C string, produce a TEXT datum. * diff --git a/src/backend/utils/cache/plancache.c b/src/backend/utils/cache/plancache.c index cc943205d3..b824bef98c 100644 --- a/src/backend/utils/cache/plancache.c +++ b/src/backend/utils/cache/plancache.c @@ -58,6 +58,7 @@ #include "access/transam.h" #include "catalog/namespace.h" +#include "catalog/pg_variable.h" #include "executor/executor.h" #include "miscadmin.h" #include "nodes/nodeFuncs.h" @@ -1865,9 +1866,12 @@ ScanQueryForLocks(Query *parsetree, bool acquire) /* * Recurse into sublink subqueries, too. But we already did the ones in - * the rtable and cteList. + * the rtable and cteList. We need to force recursive call for session + * variables too, to find and lock variables used in query (see + * ScanQueryWalker). */ - if (parsetree->hasSubLinks) + if (parsetree->hasSubLinks || + parsetree->hasSessionVariables) { query_tree_walker(parsetree, ScanQueryWalker, (void *) &acquire, @@ -1876,7 +1880,8 @@ ScanQueryForLocks(Query *parsetree, bool acquire) } /* - * Walker to find sublink subqueries for ScanQueryForLocks + * Walker to find sublink subqueries or referenced session variables + * for ScanQueryForLocks */ static bool ScanQueryWalker(Node *node, bool *acquire) @@ -1891,6 +1896,20 @@ ScanQueryWalker(Node *node, bool *acquire) ScanQueryForLocks(castNode(Query, sub->subselect), *acquire); /* Fall through to process lefthand args of SubLink */ } + else if (IsA(node, Param)) + { + Param *p = (Param *) node; + + if (p->paramkind == PARAM_VARIABLE) + { + if (acquire) + LockDatabaseObject(VariableRelationId, p->paramvarid, + 0, AccessShareLock); + else + UnlockDatabaseObject(VariableRelationId, p->paramvarid, + 0, AccessShareLock); + } + } /* * Do NOT recurse into Query nodes, because ScanQueryForLocks already @@ -2022,7 +2041,9 @@ PlanCacheRelCallback(Datum arg, Oid relid) /* * PlanCacheObjectCallback - * Syscache inval callback function for PROCOID and TYPEOID caches + * Syscache inval callback function for TYPEOID, PROCOID, NAMESPACEOID, + * OPEROID, AMOPOPID, FOREIGNSERVEROID, FOREIGNDATAWRAPPEROID and VARIABLEOID + * caches. * * Invalidate all plans mentioning the object with the specified hash value, * or all plans mentioning any member of this cache if hashvalue == 0. diff --git a/src/backend/utils/fmgr/fmgr.c b/src/backend/utils/fmgr/fmgr.c index 3c210297aa..49b22d20e3 100644 --- a/src/backend/utils/fmgr/fmgr.c +++ b/src/backend/utils/fmgr/fmgr.c @@ -1902,9 +1902,13 @@ get_call_expr_arg_stable(Node *expr, int argnum) */ if (IsA(arg, Const)) return true; - if (IsA(arg, Param) && - ((Param *) arg)->paramkind == PARAM_EXTERN) - return true; + if (IsA(arg, Param)) + { + Param *p = (Param *) arg; + + if (p->paramkind == PARAM_EXTERN || p->paramkind == PARAM_VARIABLE) + return true; + } return false; } diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 1bf14eec66..213d9ebf1a 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -1498,6 +1498,16 @@ struct config_bool ConfigureNamesBool[] = false, NULL, NULL, NULL }, + { + {"session_variables_ambiguity_warning", PGC_USERSET, DEVELOPER_OPTIONS, + gettext_noop("Raise warning when reference to a session variable is ambiguous."), + NULL, + GUC_NOT_IN_SAMPLE + }, + &session_variables_ambiguity_warning, + false, + NULL, NULL, NULL + }, { {"db_user_namespace", PGC_SIGHUP, CONN_AUTH_AUTH, gettext_noop("Enables per-database user names."), diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h index 847c9fa6a0..e6873d369a 100644 --- a/src/include/catalog/namespace.h +++ b/src/include/catalog/namespace.h @@ -166,8 +166,10 @@ extern void SetTempNamespaceState(Oid tempNamespaceId, Oid tempToastNamespaceId); extern void ResetTempTableNamespace(void); +extern List *NamesFromList(List *names); extern Oid LookupVariable(const char *nspname, const char *varname, bool rowtype_only, bool missing_ok); +extern Oid IdentifyVariable(List *names, char **attrname, bool *not_unique); extern OverrideSearchPath *GetOverrideSearchPath(MemoryContext context); extern OverrideSearchPath *CopyOverrideSearchPath(OverrideSearchPath *path); diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat index 77ddc3e779..ecbd56ab2a 100644 --- a/src/include/catalog/pg_proc.dat +++ b/src/include/catalog/pg_proc.dat @@ -11857,4 +11857,11 @@ prorettype => 'bytea', proargtypes => 'pg_brin_minmax_multi_summary', prosrc => 'brin_minmax_multi_summary_send' }, +{ oid => '8488', descr => 'list of used session variables', + proname => 'pg_session_variables', prorows => '1000', proretset => 't', + provolatile => 's', prorettype => 'record', proargtypes => '', + proallargtypes => '{oid,text,text,oid,text,bool,bool,bool,bool}', + proargmodes => '{o,o,o,o,o,o,o,o,o}', + proargnames => '{varid,schema,name,typid,typname,removed,has_value,can_select,can_update}', + prosrc => 'pg_session_variables' }, ] diff --git a/src/include/commands/session_variable.h b/src/include/commands/session_variable.h index add1ae50a6..a4820aa21d 100644 --- a/src/include/commands/session_variable.h +++ b/src/include/commands/session_variable.h @@ -1,7 +1,7 @@ /*------------------------------------------------------------------------- * - * sessionvariable.h - * prototypes for sessionvariable.c. + * session_variable.h + * prototypes for session_variable.c. * * * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group @@ -26,6 +26,15 @@ extern void SessionVariableCreatePostprocess(Oid varid, char eoxaction); extern void SessionVariableDropPostprocess(Oid varid); +extern Datum CopySessionVariable(Oid varid, bool *isNull, Oid *typid); +extern Datum CopySessionVariableWithTypeCheck(Oid varid, bool *isNull, Oid expected_typid); +extern Datum GetSessionVariableWithTypeCheck(Oid varid, bool *isNull, Oid expected_typid); + +extern void SetSessionVariable(Oid varid, Datum value, bool isNull); +extern void SetSessionVariableWithSecurityCheck(Oid varid, Datum value, bool isNull); + +extern void ResetSessionVariables(void); + extern void AtPreEOXact_SessionVariable(bool isCommit); extern void AtEOSubXact_SessionVariable(bool isCommit, SubTransactionId mySubid, diff --git a/src/include/executor/execExpr.h b/src/include/executor/execExpr.h index 0557302b92..1b411fc6d4 100644 --- a/src/include/executor/execExpr.h +++ b/src/include/executor/execExpr.h @@ -158,6 +158,7 @@ typedef enum ExprEvalOp EEOP_PARAM_EXEC, EEOP_PARAM_EXTERN, EEOP_PARAM_CALLBACK, + EEOP_PARAM_VARIABLE, /* return CaseTestExpr value */ EEOP_CASE_TESTVAL, @@ -381,6 +382,13 @@ typedef struct ExprEvalStep Oid paramtype; /* OID of parameter's datatype */ } param; + /* for EEOP_PARAM_VARIABLE */ + struct + { + Oid varid; /* OID of assigned variable */ + Oid vartype; /* OID of parameter's datatype */ + } vparam; + /* for EEOP_PARAM_CALLBACK */ struct { @@ -734,6 +742,9 @@ extern void ExecEvalParamExec(ExprState *state, ExprEvalStep *op, ExprContext *econtext); extern void ExecEvalParamExtern(ExprState *state, ExprEvalStep *op, ExprContext *econtext); +extern void ExecEvalParamVariable(ExprState *state, ExprEvalStep *op, + ExprContext *econtext); +extern void ExecEvalSQLValueFunction(ExprState *state, ExprEvalStep *op); extern void ExecEvalCurrentOfExpr(ExprState *state, ExprEvalStep *op); extern void ExecEvalNextValueExpr(ExprState *state, ExprEvalStep *op); extern void ExecEvalRowNull(ExprState *state, ExprEvalStep *op, diff --git a/src/include/executor/execdesc.h b/src/include/executor/execdesc.h index e79e2c001f..dbf4dc7ea0 100644 --- a/src/include/executor/execdesc.h +++ b/src/include/executor/execdesc.h @@ -48,6 +48,10 @@ typedef struct QueryDesc EState *estate; /* executor's query-wide state */ PlanState *planstate; /* tree of per-plan-node state */ + /* reference to session variables buffer */ + int num_session_variables; + SessionVariableValue *session_variables; + /* This field is set by ExecutorRun */ bool already_executed; /* true if previously executed */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 71248a9466..a69d62145e 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -597,6 +597,18 @@ typedef struct AsyncRequest * tuples) */ } AsyncRequest; +/* ---------------- + * SessionVariableValue + * ---------------- + */ +typedef struct SessionVariableValue +{ + Oid varid; + Oid typid; + bool isnull; + Datum value; +} SessionVariableValue; + /* ---------------- * EState information * @@ -649,6 +661,13 @@ typedef struct EState ParamListInfo es_param_list_info; /* values of external params */ ParamExecData *es_param_exec_vals; /* values of internal params */ + /* Variables info: */ + /* number of used session variables */ + int es_num_session_variables; + + /* array of copied values of session variables */ + SessionVariableValue *es_session_variables; + QueryEnvironment *es_queryEnv; /* query environment */ /* Other working state: */ diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 14bc2fcd07..5290db1231 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -148,6 +148,7 @@ typedef struct Query bool hasModifyingCTE; /* has INSERT/UPDATE/DELETE in WITH */ bool hasForUpdate; /* FOR [KEY] UPDATE/SHARE was specified */ bool hasRowSecurity; /* rewriter has applied some RLS policy */ + bool hasSessionVariables; /* uses session variables */ bool isReturn; /* is a RETURN statement */ diff --git a/src/include/nodes/pathnodes.h b/src/include/nodes/pathnodes.h index dd4eb8679d..b1190dbf5e 100644 --- a/src/include/nodes/pathnodes.h +++ b/src/include/nodes/pathnodes.h @@ -160,6 +160,9 @@ typedef struct PlannerGlobal /* partition descriptors */ PartitionDirectory partition_directory pg_node_attr(read_write_ignore); + + /* list of used session variables */ + List *sessionVariables; } PlannerGlobal; /* macro for fetching the Plan associated with a SubPlan node */ @@ -461,6 +464,8 @@ struct PlannerInfo bool placeholdersFrozen; /* true if planning a recursive WITH item */ bool hasRecursion; + /* true if session variables were used */ + bool hasSessionVariables; /* * Information about aggregates. Filled by preprocess_aggrefs(). diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 2e202892a7..c01a35d46d 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -95,6 +95,8 @@ typedef struct PlannedStmt Node *utilityStmt; /* non-null if this is utility stmt */ + List *sessionVariables; /* OIDs for PARAM_VARIABLE Params */ + /* statement location in source string (copied from Query) */ int stmt_location; /* start location, or -1 if unknown */ int stmt_len; /* length in bytes; 0 means "rest of string" */ diff --git a/src/include/nodes/primnodes.h b/src/include/nodes/primnodes.h index 74f228d959..1697cca7e4 100644 --- a/src/include/nodes/primnodes.h +++ b/src/include/nodes/primnodes.h @@ -43,7 +43,9 @@ typedef struct Alias List *colnames; /* optional list of column aliases */ } Alias; -/* What to do at commit time for temporary relations */ +/* + * What to do at commit time for temporary relations or session variables. + */ typedef enum OnCommitAction { ONCOMMIT_NOOP, /* No ON COMMIT clause (do nothing) */ @@ -296,13 +298,17 @@ typedef struct Const * of the `paramid' field contain the SubLink's subLinkId, and * the low-order 16 bits contain the column number. (This type * of Param is also converted to PARAM_EXEC during planning.) + * + * PARAM_VARIABLE: The parameter is an access to session variable + * paramid holds varid. */ typedef enum ParamKind { PARAM_EXTERN, PARAM_EXEC, PARAM_SUBLINK, - PARAM_MULTIEXPR + PARAM_MULTIEXPR, + PARAM_VARIABLE } ParamKind; typedef struct Param @@ -313,6 +319,7 @@ typedef struct Param Oid paramtype; /* pg_type OID of parameter's datatype */ int32 paramtypmod; /* typmod value, if known */ Oid paramcollid; /* OID of collation, or InvalidOid if none */ + Oid paramvarid; /* OID of session variable if it is used */ int location; /* token location, or -1 if unknown */ } Param; diff --git a/src/include/optimizer/planmain.h b/src/include/optimizer/planmain.h index 9dffdcfd1e..ebb14e250f 100644 --- a/src/include/optimizer/planmain.h +++ b/src/include/optimizer/planmain.h @@ -117,4 +117,6 @@ extern void record_plan_function_dependency(PlannerInfo *root, Oid funcid); extern void record_plan_type_dependency(PlannerInfo *root, Oid typid); extern bool extract_query_dependencies_walker(Node *node, PlannerInfo *context); +extern void pull_up_has_session_variables(PlannerInfo *root); + #endif /* PLANMAIN_H */ diff --git a/src/include/parser/parse_expr.h b/src/include/parser/parse_expr.h index c8e5c57b43..14b0adb948 100644 --- a/src/include/parser/parse_expr.h +++ b/src/include/parser/parse_expr.h @@ -17,6 +17,7 @@ /* GUC parameters */ extern PGDLLIMPORT bool Transform_null_equals; +extern PGDLLIMPORT bool session_variables_ambiguity_warning; extern Node *transformExpr(ParseState *pstate, Node *expr, ParseExprKind exprKind); diff --git a/src/include/parser/parse_node.h b/src/include/parser/parse_node.h index 4d65a9e9e8..d07f5b0584 100644 --- a/src/include/parser/parse_node.h +++ b/src/include/parser/parse_node.h @@ -212,6 +212,7 @@ struct ParseState bool p_hasTargetSRFs; bool p_hasSubLinks; bool p_hasModifyingCTE; + bool p_hasSessionVariables; Node *p_last_srf; /* most recent set-returning func/op found */ diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 8b08f64468..3a94570222 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -2452,6 +2452,7 @@ SerializedTransactionState Session SessionBackupState SessionEndType +SessionVariableValue SetConstraintState SetConstraintStateData SetConstraintTriggerData @@ -2636,6 +2637,8 @@ SupportRequestRows SupportRequestSelectivity SupportRequestSimplify SupportRequestWFuncMonotonic +SVariable +SVariableData SVariableXActAction SVariableXActActionItem Syn -- 2.38.1