From 395bd8eb74de4b9a96044f56e77174fea0afd071 Mon Sep 17 00:00:00 2001 From: Laurenz Albe Date: Wed, 13 Nov 2024 15:08:17 +0100 Subject: [PATCH 16/21] allow parallel execution queries with session variables --- doc/src/sgml/parallel.sgml | 6 - src/backend/commands/session_variable.c | 23 +++ src/backend/executor/execMain.c | 20 ++- src/backend/executor/execParallel.c | 147 +++++++++++++++++- src/backend/optimizer/util/clauses.c | 18 +-- src/backend/tcop/pquery.c | 3 + src/include/commands/session_variable.h | 1 + src/include/executor/execdesc.h | 4 + src/include/nodes/execnodes.h | 1 + .../regress/expected/session_variables.out | 12 +- 10 files changed, 206 insertions(+), 29 deletions(-) diff --git a/doc/src/sgml/parallel.sgml b/doc/src/sgml/parallel.sgml index 683dede6adc..1ce9abf86f5 100644 --- a/doc/src/sgml/parallel.sgml +++ b/doc/src/sgml/parallel.sgml @@ -515,12 +515,6 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%'; Plan nodes that reference a correlated SubPlan. - - - - Plan nodes that use a session variable. - - diff --git a/src/backend/commands/session_variable.c b/src/backend/commands/session_variable.c index 95dd7445c15..bb6abb792be 100644 --- a/src/backend/commands/session_variable.c +++ b/src/backend/commands/session_variable.c @@ -991,6 +991,29 @@ GetSessionVariable(Oid varid, bool *isNull) return copy_session_variable_value(svar, isNull); } +/* + * Returns a copy of the value of the session variable (in the current memory + * context) plus typid of the session variable. The caller is responsible for + * permission checks. + */ +Datum +GetSessionVariableWithTypeid(Oid varid, bool *isNull, Oid *typid) +{ + SVariable svar; + + svar = get_session_variable(varid); + + *typid = svar->typid; + + /* + * Although "svar" is freshly validated in this point, svar->is_valid can + * be false, if an invalidation message was processed during the domain check. + * But the variable and all its dependencies are locked now, so we don't need + * to repeat the validation. + */ + return copy_session_variable_value(svar, isNull); +} + /* * Assign the result of the evaluated expression to the session variable */ diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 67e0b9ba2ac..1feba422bfb 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -220,7 +220,19 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) * be changed inside query execution time, and then a reference to * previously returned value can be corrupted). */ - if (queryDesc->plannedstmt->sessionVariables) + if (queryDesc->num_session_variables > 0) + { + /* + * When a parallel query needs to access query parameters (including + * related session variables), then related session variables are + * restored (deserialized) in queryDesc already. So just push pointer + * of this array to executor's estate. + */ + Assert(IsParallelWorker()); + estate->es_session_variables = queryDesc->session_variables; + estate->es_num_session_variables = queryDesc->num_session_variables; + } + else if (queryDesc->plannedstmt->sessionVariables) { int nSessionVariables; int i = 0; @@ -259,9 +271,9 @@ standard_ExecutorStart(QueryDesc *queryDesc, int eflags) get_session_variable_name(varid)); } - estate->es_session_variables[i].value = - GetSessionVariable(varid, - &estate->es_session_variables[i].isnull); + estate->es_session_variables[i].value = GetSessionVariableWithTypeid(varid, + &estate->es_session_variables[i].isnull, + &estate->es_session_variables[i].typid); i++; } diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 1bedb808368..aee66beac91 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -12,8 +12,9 @@ * workers and ensuring that their state generally matches that of the * leader; see src/backend/access/transam/README.parallel for details. * However, we must save and restore relevant executor state, such as - * any ParamListInfo associated with the query, buffer/WAL usage info, and - * the actual plan to be passed down to the worker. + * any ParamListInfo associated with the query, buffer/WAL usage info, + * session variables buffer, and the actual plan to be passed down to + * the worker. * * IDENTIFICATION * src/backend/executor/execParallel.c @@ -64,6 +65,7 @@ #define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008) #define PARALLEL_KEY_JIT_INSTRUMENTATION UINT64CONST(0xE000000000000009) #define PARALLEL_KEY_WAL_USAGE UINT64CONST(0xE00000000000000A) +#define PARALLEL_KEY_SESSION_VARIABLES UINT64CONST(0xE00000000000000B) #define PARALLEL_TUPLE_QUEUE_SIZE 65536 @@ -138,6 +140,12 @@ static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, /* Helper function that runs in the parallel worker. */ static DestReceiver *ExecParallelGetReceiver(dsm_segment *seg, shm_toc *toc); +/* Helper functions that can pass values of session variables */ +static Size EstimateSessionVariables(EState *estate); +static void SerializeSessionVariables(EState *estate, char **start_address); +static SessionVariableValue *RestoreSessionVariables(char **start_address, + int *num_session_variables); + /* * Create a serialized representation of the plan to be sent to each worker. */ @@ -598,6 +606,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, char *pstmt_data; char *pstmt_space; char *paramlistinfo_space; + char *session_variables_space; BufferUsage *bufusage_space; WalUsage *walusage_space; SharedExecutorInstrumentation *instrumentation = NULL; @@ -607,6 +616,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int instrumentation_len = 0; int jit_instrumentation_len = 0; int instrument_offset = 0; + int session_variables_len = 0; Size dsa_minsize = dsa_minimum_size(); char *query_string; int query_len; @@ -662,6 +672,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_estimate_chunk(&pcxt->estimator, paramlistinfo_len); shm_toc_estimate_keys(&pcxt->estimator, 1); + /* Estimate space for serialized session variables. */ + session_variables_len = EstimateSessionVariables(estate); + shm_toc_estimate_chunk(&pcxt->estimator, session_variables_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); + /* * Estimate space for BufferUsage. * @@ -763,6 +778,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, shm_toc_insert(pcxt->toc, PARALLEL_KEY_PARAMLISTINFO, paramlistinfo_space); SerializeParamList(estate->es_param_list_info, ¶mlistinfo_space); + /* Store serialized session variables. */ + session_variables_space = shm_toc_allocate(pcxt->toc, session_variables_len); + shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_VARIABLES, session_variables_space); + SerializeSessionVariables(estate, &session_variables_space); + /* Allocate space for each worker's BufferUsage; no need to initialize. */ bufusage_space = shm_toc_allocate(pcxt->toc, mul_size(sizeof(BufferUsage), pcxt->nworkers)); @@ -1420,6 +1440,7 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) SharedJitInstrumentation *jit_instrumentation; int instrument_options = 0; void *area_space; + char *sessionvariable_space; dsa_area *area; ParallelWorkerContext pwcxt; @@ -1445,6 +1466,14 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) area_space = shm_toc_lookup(toc, PARALLEL_KEY_DSA, false); area = dsa_attach_in_place(area_space, seg); + /* Reconstruct session variables. */ + sessionvariable_space = shm_toc_lookup(toc, + PARALLEL_KEY_SESSION_VARIABLES, + false); + queryDesc->session_variables = + RestoreSessionVariables(&sessionvariable_space, + &queryDesc->num_session_variables); + /* Start up the executor */ queryDesc->plannedstmt->jitFlags = fpes->jit_flags; if (!ExecutorStart(queryDesc, fpes->eflags)) @@ -1513,3 +1542,117 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc) FreeQueryDesc(queryDesc); receiver->rDestroy(receiver); } + +/* + * Estimate the amount of space required to serialize a session variable. + */ +static Size +EstimateSessionVariables(EState *estate) +{ + int i; + Size sz = sizeof(int); + + if (estate->es_session_variables == NULL) + return sz; + + for (i = 0; i < estate->es_num_session_variables; i++) + { + SessionVariableValue *svarval; + Oid typeOid; + int16 typLen; + bool typByVal; + + svarval = &estate->es_session_variables[i]; + + typeOid = svarval->typid; + + sz = add_size(sz, sizeof(Oid)); /* space for type OID */ + + /* space for datum/isnull */ + Assert(OidIsValid(typeOid)); + get_typlenbyval(typeOid, &typLen, &typByVal); + + sz = add_size(sz, + datumEstimateSpace(svarval->value, svarval->isnull, typByVal, typLen)); + } + + return sz; +} + +/* + * Serialize a session variables buffer into caller-provided storage. + * + * We write the number of parameters first, as a 4-byte integer, and then + * write details for each parameter in turn. The details for each parameter + * consist of a 4-byte type OID, and then the datum as serialized by + * datumSerialize(). The caller is responsible for ensuring that there is + * enough storage to store the number of bytes that will be written; use + * EstimateSessionVariables to find out how many will be needed. + * *start_address is updated to point to the byte immediately following those + * written. + * + * RestoreSessionVariables can be used to recreate a session variable buffer + * based on the serialized representation; + */ +static void +SerializeSessionVariables(EState *estate, char **start_address) +{ + int nparams; + int i; + + /* Write number of parameters. */ + nparams = estate->es_num_session_variables; + memcpy(*start_address, &nparams, sizeof(int)); + *start_address += sizeof(int); + + /* Write each parameter in turn. */ + for (i = 0; i < nparams; i++) + { + SessionVariableValue *svarval; + Oid typeOid; + int16 typLen; + bool typByVal; + + svarval = &estate->es_session_variables[i]; + typeOid = svarval->typid; + + /* Write type OID. */ + memcpy(*start_address, &typeOid, sizeof(Oid)); + *start_address += sizeof(Oid); + + Assert(OidIsValid(typeOid)); + get_typlenbyval(typeOid, &typLen, &typByVal); + + datumSerialize(svarval->value, svarval->isnull, typByVal, typLen, + start_address); + } +} + +static SessionVariableValue * +RestoreSessionVariables(char **start_address, int *num_session_variables) +{ + SessionVariableValue *session_variables; + int i; + int nparams; + + memcpy(&nparams, *start_address, sizeof(int)); + *start_address += sizeof(int); + + *num_session_variables = nparams; + session_variables = (SessionVariableValue *) + palloc(nparams * sizeof(SessionVariableValue)); + + for (i = 0; i < nparams; i++) + { + SessionVariableValue *svarval = &session_variables[i]; + + /* Read type OID. */ + memcpy(&svarval->typid, *start_address, sizeof(Oid)); + *start_address += sizeof(Oid); + + /* Read datum/isnull. */ + svarval->value = datumRestore(start_address, &svarval->isnull); + } + + return session_variables; +} diff --git a/src/backend/optimizer/util/clauses.c b/src/backend/optimizer/util/clauses.c index f60828f862e..175390c6c28 100644 --- a/src/backend/optimizer/util/clauses.c +++ b/src/backend/optimizer/util/clauses.c @@ -923,25 +923,19 @@ max_parallel_hazard_walker(Node *node, max_parallel_hazard_context *context) /* * We can't pass Params to workers at the moment either, so they are also - * parallel-restricted, unless they are PARAM_EXTERN Params or are - * PARAM_EXEC Params listed in safe_param_ids, meaning they could be - * either generated within workers or can be computed by the leader and - * then their value can be passed to workers. + * parallel-restricted, unless they are PARAM_EXTERN or PARAM_VARIABLE + * Params or are PARAM_EXEC Params listed in safe_param_ids, meaning they + * could be either generated within workers or can be computed by the + * leader and then their value can be passed to workers. */ else if (IsA(node, Param)) { Param *param = (Param *) node; - if (param->paramkind == PARAM_EXTERN) + if (param->paramkind == PARAM_EXTERN || + param->paramkind == PARAM_VARIABLE) return false; - /* we don't support passing session variables to workers */ - if (param->paramkind == PARAM_VARIABLE) - { - if (max_parallel_hazard_test(PROPARALLEL_RESTRICTED, context)) - return true; - } - if (param->paramkind != PARAM_EXEC || !list_member_int(context->safe_param_ids, param->paramid)) { diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c index dea24453a6c..dce885190c2 100644 --- a/src/backend/tcop/pquery.c +++ b/src/backend/tcop/pquery.c @@ -92,6 +92,9 @@ CreateQueryDesc(PlannedStmt *plannedstmt, qd->queryEnv = queryEnv; qd->instrument_options = instrument_options; /* instrumentation wanted? */ + qd->num_session_variables = 0; + qd->session_variables = NULL; + /* null these fields until set by ExecutorStart */ qd->tupDesc = NULL; qd->estate = NULL; diff --git a/src/include/commands/session_variable.h b/src/include/commands/session_variable.h index 4492bee6d69..f66b6c0b864 100644 --- a/src/include/commands/session_variable.h +++ b/src/include/commands/session_variable.h @@ -29,6 +29,7 @@ extern void AtEOSubXact_SessionVariables(bool isCommit, SubTransactionId mySubid extern void SetSessionVariable(Oid varid, Datum value, bool isNull); extern Datum GetSessionVariable(Oid varid, bool *isNull); +extern Datum GetSessionVariableWithTypeid(Oid varid, bool *isNull, Oid *typid); extern void ExecuteLetStmt(ParseState *pstate, LetStmt *stmt, ParamListInfo params, QueryEnvironment *queryEnv, QueryCompletion *qc); diff --git a/src/include/executor/execdesc.h b/src/include/executor/execdesc.h index ba53305ad42..41eeb1c83c9 100644 --- a/src/include/executor/execdesc.h +++ b/src/include/executor/execdesc.h @@ -52,6 +52,10 @@ typedef struct QueryDesc /* This field is set by ExecutePlan */ bool already_executed; /* true if previously executed */ + /* reference to session variables buffer */ + int num_session_variables; + SessionVariableValue *session_variables; + /* This is always set NULL by the core system, but plugins can change it */ struct Instrumentation *totaltime; /* total time spent in ExecutorRun */ } QueryDesc; diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 1b8ad640922..5632f77aee7 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -642,6 +642,7 @@ typedef struct AsyncRequest */ typedef struct SessionVariableValue { + Oid typid; bool isnull; Datum value; } SessionVariableValue; diff --git a/src/test/regress/expected/session_variables.out b/src/test/regress/expected/session_variables.out index 0ed2d5fbb01..d0658116683 100644 --- a/src/test/regress/expected/session_variables.out +++ b/src/test/regress/expected/session_variables.out @@ -1284,12 +1284,14 @@ SELECT count(*) FROM svar_test WHERE a%10 = zero; -- parallel execution is not supported yet EXPLAIN (COSTS OFF) SELECT count(*) FROM svar_test WHERE a%10 = zero; - QUERY PLAN ------------------------------------ + QUERY PLAN +-------------------------------------------- Aggregate - -> Seq Scan on svar_test - Filter: ((a % 10) = zero) -(3 rows) + -> Gather + Workers Planned: 2 + -> Parallel Seq Scan on svar_test + Filter: ((a % 10) = zero) +(5 rows) LET zero = (SELECT count(*) FROM svar_test); -- result should be 1000 -- 2.48.1