From b2d74947ffc762061e2335c802510f57d1af9a82 Mon Sep 17 00:00:00 2001 From: "houzj.fnst" Date: Wed, 21 Apr 2021 17:12:24 +0800 Subject: [PATCH 3/3] check-parallel-safety-in-fmgr --- src/backend/access/transam/xact.c | 26 + src/backend/executor/execExprInterp.c | 26 +- src/backend/executor/execMain.c | 3 + src/backend/optimizer/plan/planner.c | 18 +- src/backend/utils/fmgr/fmgr.c | 28 + src/include/access/xact.h | 1 + src/include/fmgr.h | 5 +- src/test/regress/expected/insert_parallel.out | 530 ++++++++++++++++++ src/test/regress/parallel_schedule | 1 + src/test/regress/serial_schedule | 1 + src/test/regress/sql/insert_parallel.sql | 337 +++++++++++ 11 files changed, 952 insertions(+), 24 deletions(-) create mode 100644 src/test/regress/expected/insert_parallel.out create mode 100644 src/test/regress/sql/insert_parallel.sql diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 441445927e..2d68e4633a 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1014,6 +1014,32 @@ IsInParallelMode(void) return CurrentTransactionState->parallelModeLevel != 0; } +/* + * PrepareParallelModePlanExec + * + * Prepare for entering parallel mode plan execution, based on command-type. + */ +void +PrepareParallelModePlanExec(CmdType commandType) +{ + if (IsModifySupportedInParallelMode(commandType)) + { + Assert(!IsInParallelMode()); + + /* + * Prepare for entering parallel mode by assigning a TransactionId. + * Failure to do this now would result in heap_insert() subsequently + * attempting to assign a TransactionId whilst in parallel-mode, which + * is not allowed. + * + * This approach has a disadvantage in that if the underlying SELECT + * does not return any rows, then the TransactionId is not used; + * however, that should be uncommon in practice. 
+ */ + (void) GetCurrentTransactionId(); + } +} + /* * CommandCounterIncrement */ diff --git a/src/backend/executor/execExprInterp.c b/src/backend/executor/execExprInterp.c index 094e22d392..c9e43ada7a 100644 --- a/src/backend/executor/execExprInterp.c +++ b/src/backend/executor/execExprInterp.c @@ -717,7 +717,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) Datum d; fcinfo->isnull = false; - d = op->d.func.fn_addr(fcinfo); + d = FuncExprCallInvoke(op->d.func.fn_addr, fcinfo); *op->resvalue = d; *op->resnull = fcinfo->isnull; @@ -741,7 +741,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) } } fcinfo->isnull = false; - d = op->d.func.fn_addr(fcinfo); + d = FuncExprCallInvoke(op->d.func.fn_addr, fcinfo); *op->resvalue = d; *op->resnull = fcinfo->isnull; @@ -1223,7 +1223,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) Datum eqresult; fcinfo->isnull = false; - eqresult = op->d.func.fn_addr(fcinfo); + eqresult = FuncExprCallInvoke(op->d.func.fn_addr, fcinfo); /* Must invert result of "="; safe to do even if null */ *op->resvalue = BoolGetDatum(!DatumGetBool(eqresult)); *op->resnull = fcinfo->isnull; @@ -1252,7 +1252,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) Datum eqresult; fcinfo->isnull = false; - eqresult = op->d.func.fn_addr(fcinfo); + eqresult = FuncExprCallInvoke(op->d.func.fn_addr, fcinfo); *op->resvalue = eqresult; *op->resnull = fcinfo->isnull; } @@ -1273,7 +1273,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) Datum result; fcinfo->isnull = false; - result = op->d.func.fn_addr(fcinfo); + result = FuncExprCallInvoke(op->d.func.fn_addr, fcinfo); /* if the arguments are equal return null */ if (!fcinfo->isnull && DatumGetBool(result)) @@ -1361,7 +1361,7 @@ ExecInterpExpr(ExprState *state, ExprContext *econtext, bool *isnull) /* Apply comparison function */ fcinfo->isnull = false; - d = op->d.rowcompare_step.fn_addr(fcinfo); + d 
= FuncExprCallInvoke(op->d.rowcompare_step.fn_addr, fcinfo); *op->resvalue = d; /* force NULL result if NULL function result */ @@ -2152,7 +2152,7 @@ ExecJustApplyFuncToCase(ExprState *state, ExprContext *econtext, bool *isnull) } } fcinfo->isnull = false; - d = op->d.func.fn_addr(fcinfo); + d = FuncExprCallInvoke(op->d.func.fn_addr, fcinfo); *isnull = fcinfo->isnull; return d; } @@ -2347,7 +2347,7 @@ ExecEvalFuncExprFusage(ExprState *state, ExprEvalStep *op, pgstat_init_function_usage(fcinfo, &fcusage); fcinfo->isnull = false; - d = op->d.func.fn_addr(fcinfo); + d = FuncExprCallInvoke(op->d.func.fn_addr, fcinfo); *op->resvalue = d; *op->resnull = fcinfo->isnull; @@ -2381,7 +2381,7 @@ ExecEvalFuncExprStrictFusage(ExprState *state, ExprEvalStep *op, pgstat_init_function_usage(fcinfo, &fcusage); fcinfo->isnull = false; - d = op->d.func.fn_addr(fcinfo); + d = FuncExprCallInvoke(op->d.func.fn_addr, fcinfo); *op->resvalue = d; *op->resnull = fcinfo->isnull; @@ -3379,7 +3379,7 @@ ExecEvalScalarArrayOp(ExprState *state, ExprEvalStep *op) else { fcinfo->isnull = false; - thisresult = op->d.scalararrayop.fn_addr(fcinfo); + thisresult = FuncExprCallInvoke(op->d.scalararrayop.fn_addr, fcinfo); } /* Combine results per OR or AND semantics */ @@ -3436,7 +3436,7 @@ saop_element_hash(struct saophash_hash *tb, Datum key) fcinfo->args[0].value = key; fcinfo->args[0].isnull = false; - hash = elements_tab->op->d.hashedscalararrayop.hash_fn_addr(fcinfo); + hash = FuncExprCallInvoke(elements_tab->op->d.hashedscalararrayop.hash_fn_addr, fcinfo); return DatumGetUInt32(hash); } @@ -3458,7 +3458,7 @@ saop_hash_element_match(struct saophash_hash *tb, Datum key1, Datum key2) fcinfo->args[1].value = key2; fcinfo->args[1].isnull = false; - result = elements_tab->op->d.hashedscalararrayop.fn_addr(fcinfo); + result = FuncExprCallInvoke(elements_tab->op->d.hashedscalararrayop.fn_addr, fcinfo); return DatumGetBool(result); } @@ -3619,7 +3619,7 @@ ExecEvalHashedScalarArrayOp(ExprState *state, 
ExprEvalStep *op, ExprContext *eco fcinfo->args[1].value = (Datum) 0; fcinfo->args[1].isnull = true; - result = op->d.hashedscalararrayop.fn_addr(fcinfo); + result = FuncExprCallInvoke(op->d.hashedscalararrayop.fn_addr, fcinfo); resultnull = fcinfo->isnull; } } diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 2cf6dad768..3b339efbe0 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1535,7 +1535,10 @@ ExecutePlan(EState *estate, estate->es_use_parallel_mode = use_parallel_mode; if (use_parallel_mode) + { + PrepareParallelModePlanExec(estate->es_plannedstmt->commandType); EnterParallelMode(); + } /* * Loop until we've processed the proper number of tuples from the plan. diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index dbc2827d20..7736813230 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -314,16 +314,16 @@ standard_planner(Query *parse, const char *query_string, int cursorOptions, /* * Assess whether it's feasible to use parallel mode for this query. We * can't do this in a standalone backend, or if the command will try to - * modify any data, or if this is a cursor operation, or if GUCs are set - * to values that don't permit parallelism, or if parallel-unsafe - * functions are present in the query tree. + * modify any data (except for Insert), or if this is a cursor operation, + * or if GUCs are set to values that don't permit parallelism, or if + * parallel-unsafe functions are present in the query tree. * - * (Note that we do allow CREATE TABLE AS, SELECT INTO, and CREATE - * MATERIALIZED VIEW to use parallel plans, but as of now, only the leader - * backend writes into a completely new table. In the future, we can - * extend it to allow workers to write into the table. However, to allow - * parallel updates and deletes, we have to solve other problems, - * especially around combo CIDs.) 
+ * (Note that we do allow CREATE TABLE AS, INSERT INTO...SELECT, SELECT + * INTO, and CREATE MATERIALIZED VIEW to use parallel plans. However, as + * of now, only the leader backend writes into a completely new table. In + * the future, we can extend it to allow workers to write into the table. + * However, to allow parallel updates and deletes, we have to solve other + * problems, especially around combo CIDs.) * * For now, we don't try to use parallel mode if we're running inside a * parallel worker. We might eventually be able to relax this diff --git a/src/backend/utils/fmgr/fmgr.c b/src/backend/utils/fmgr/fmgr.c index b6835c2c4c..1f95775011 100644 --- a/src/backend/utils/fmgr/fmgr.c +++ b/src/backend/utils/fmgr/fmgr.c @@ -16,6 +16,8 @@ #include "postgres.h" #include "access/detoast.h" +#include "access/parallel.h" +#include "access/xact.h" #include "catalog/pg_language.h" #include "catalog/pg_proc.h" #include "catalog/pg_type.h" @@ -109,6 +111,30 @@ fmgr_lookupByName(const char *name) return NULL; } +/* + * Invoke a function given a pointer to the function or handler to be called + * and a filled-in FunctionCallInfoBaseData. + * + * Check the function's parallel safety before invoking the function. + * In parallel mode, raise an error if it is parallel-unsafe, or parallel-restricted in a worker. + */ +Datum +FuncExprCallInvoke(PGFunction fn_addr, FunctionCallInfo fcinfo) +{ + char parallel_safety = fcinfo->flinfo->fn_parallel; + + if (IsInParallelMode() && + ((IsParallelWorker() && + parallel_safety == PROPARALLEL_RESTRICTED) || + parallel_safety == PROPARALLEL_UNSAFE)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("parallel-safety execution violation of function \"%s\" (%c)", + get_func_name(fcinfo->flinfo->fn_oid), parallel_safety))); + + return fn_addr(fcinfo); +} + /* * This routine fills a FmgrInfo struct, given the OID * of the function to be called. 
@@ -174,6 +200,7 @@ fmgr_info_cxt_security(Oid functionId, FmgrInfo *finfo, MemoryContext mcxt, finfo->fn_stats = TRACK_FUNC_ALL; /* ie, never track */ finfo->fn_addr = fbp->func; finfo->fn_oid = functionId; + finfo->fn_parallel = PROPARALLEL_SAFE; return; } @@ -186,6 +213,7 @@ fmgr_info_cxt_security(Oid functionId, FmgrInfo *finfo, MemoryContext mcxt, finfo->fn_nargs = procedureStruct->pronargs; finfo->fn_strict = procedureStruct->proisstrict; finfo->fn_retset = procedureStruct->proretset; + finfo->fn_parallel = procedureStruct->proparallel; /* * If it has prosecdef set, non-null proconfig, or if a plugin wants to diff --git a/src/include/access/xact.h b/src/include/access/xact.h index c04e6a98d7..34cfaf542c 100644 --- a/src/include/access/xact.h +++ b/src/include/access/xact.h @@ -466,6 +466,7 @@ extern void ParsePrepareRecord(uint8 info, xl_xact_prepare *xlrec, xl_xact_parse extern void EnterParallelMode(void); extern void ExitParallelMode(void); extern bool IsInParallelMode(void); +extern void PrepareParallelModePlanExec(CmdType commandType); /* * IsModifySupportedInParallelMode diff --git a/src/include/fmgr.h b/src/include/fmgr.h index ab7b85c86e..a747700818 100644 --- a/src/include/fmgr.h +++ b/src/include/fmgr.h @@ -64,6 +64,7 @@ typedef struct FmgrInfo void *fn_extra; /* extra space for use by handler */ MemoryContext fn_mcxt; /* memory context to store fn_extra in */ fmNodePtr fn_expr; /* expression parse tree for call, or NULL */ + char fn_parallel; /* parallel-safety: s/r/u */ } FmgrInfo; /* @@ -169,8 +170,8 @@ extern void fmgr_symbol(Oid functionId, char **mod, char **fn); * the fcinfo->isnull flag before each call, since callees are permitted to * assume that starts out false. 
*/ -#define FunctionCallInvoke(fcinfo) ((* (fcinfo)->flinfo->fn_addr) (fcinfo)) - +#define FunctionCallInvoke(fcinfo) FuncExprCallInvoke((fcinfo)->flinfo->fn_addr, fcinfo) +Datum FuncExprCallInvoke(PGFunction fn_addr, FunctionCallInfo fcinfo); /*------------------------------------------------------------------------- * Support macros to ease writing fmgr-compatible functions diff --git a/src/test/regress/expected/insert_parallel.out b/src/test/regress/expected/insert_parallel.out new file mode 100644 index 0000000000..9b9b397bfe --- /dev/null +++ b/src/test/regress/expected/insert_parallel.out @@ -0,0 +1,530 @@ +-- +-- PARALLEL +-- +-- +-- START: setup some tables and data needed by the tests. +-- +-- Setup - index expressions test +create function pg_class_relname(Oid) +returns name language sql parallel unsafe +as 'select relname from pg_class where $1 = oid'; +-- For testing purposes, we'll mark this function as parallel-unsafe +create or replace function fullname_parallel_unsafe(f text, l text) returns text as $$ + begin + return f || l; + end; +$$ language plpgsql immutable parallel unsafe; +create or replace function fullname_parallel_restricted(f text, l text) returns text as $$ + begin + return f || l; + end; +$$ language plpgsql immutable parallel restricted; +create table names(index int, first_name text, last_name text); +create table names2(index int, first_name text, last_name text); +create index names2_fullname_idx on names2 (fullname_parallel_unsafe(first_name, last_name)); +create table names4(index int, first_name text, last_name text); +create index names4_fullname_idx on names4 (fullname_parallel_restricted(first_name, last_name)); +alter table names2 parallel safe; +alter table names4 parallel safe; +insert into names values + (1, 'albert', 'einstein'), + (2, 'niels', 'bohr'), + (3, 'erwin', 'schrodinger'), + (4, 'leonhard', 'euler'), + (5, 'stephen', 'hawking'), + (6, 'isaac', 'newton'), + (7, 'alan', 'turing'), + (8, 'richard', 'feynman'); 
+-- Setup - column default tests +create or replace function bdefault_unsafe () +returns int language plpgsql parallel unsafe as $$ +begin + RETURN 5; +end $$; +create or replace function cdefault_restricted () +returns int language plpgsql parallel restricted as $$ +begin + RETURN 10; +end $$; +create or replace function ddefault_safe () +returns int language plpgsql parallel safe as $$ +begin + RETURN 20; +end $$; +create table testdef(a int, b int default bdefault_unsafe(), c int default cdefault_restricted(), d int default ddefault_safe()); +create table test_data(a int); +insert into test_data select * from generate_series(1,10); +alter table testdef parallel safe; +-- +-- END: setup some tables and data needed by the tests. +-- +-- encourage use of parallel plans +set parallel_setup_cost=0; +set parallel_tuple_cost=0; +set min_parallel_table_scan_size=0; +set max_parallel_workers_per_gather=4; +create table para_insert_p1 ( + unique1 int4 PRIMARY KEY, + stringu1 name +); +create table para_insert_f1 ( + unique1 int4 REFERENCES para_insert_p1(unique1), + stringu1 name +); +alter table para_insert_p1 parallel safe; +alter table para_insert_f1 parallel safe; +-- Check FK trigger +select pg_class_relname(classid), proparallel from pg_get_parallel_safety('para_insert_f1'); + pg_class_relname | proparallel +------------------+------------- + pg_proc | r + pg_trigger | r + pg_proc | r + pg_trigger | r +(4 rows) + +-- +-- Test INSERT with underlying query. 
+-- (should create plan with parallel SELECT, Gather parent node) +-- +explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1; + QUERY PLAN +---------------------------------------- + Insert on para_insert_p1 + -> Gather + Workers Planned: 4 + -> Parallel Seq Scan on tenk1 +(4 rows) + +insert into para_insert_p1 select unique1, stringu1 from tenk1; +-- select some values to verify that the parallel insert worked +select count(*), sum(unique1) from para_insert_p1; + count | sum +-------+---------- + 10000 | 49995000 +(1 row) + +-- verify that the same transaction has been used by all parallel workers +select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt; + count +------- + 1 +(1 row) + +-- +-- Test INSERT with ordered underlying query. +-- (should create plan with parallel SELECT, GatherMerge parent node) +-- +truncate para_insert_p1 cascade; +NOTICE: truncate cascades to table "para_insert_f1" +explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1; + QUERY PLAN +---------------------------------------------- + Insert on para_insert_p1 + -> Gather Merge + Workers Planned: 4 + -> Sort + Sort Key: tenk1.unique1 + -> Parallel Seq Scan on tenk1 +(6 rows) + +insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1; +-- select some values to verify that the parallel insert worked +select count(*), sum(unique1) from para_insert_p1; + count | sum +-------+---------- + 10000 | 49995000 +(1 row) + +-- verify that the same transaction has been used by all parallel workers +select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt; + count +------- + 1 +(1 row) + +-- +-- Test INSERT with RETURNING clause. 
+-- (should create plan with parallel SELECT, Gather parent node) +-- +create table test_data1(like test_data); +alter table test_data1 parallel safe; +explain (costs off) insert into test_data1 select * from test_data where a = 10 returning a as data; + QUERY PLAN +-------------------------------------------- + Insert on test_data1 + -> Gather + Workers Planned: 3 + -> Parallel Seq Scan on test_data + Filter: (a = 10) +(5 rows) + +insert into test_data1 select * from test_data where a = 10 returning a as data; + data +------ + 10 +(1 row) + +-- +-- Test INSERT into a table with a foreign key. +-- (Insert into a table with a foreign key is parallel-restricted, +-- as doing this in a parallel worker would create a new commandId +-- and within a worker this is not currently supported) +-- +explain (costs off) insert into para_insert_f1 select unique1, stringu1 from tenk1; + QUERY PLAN +---------------------------------------- + Insert on para_insert_f1 + -> Gather + Workers Planned: 4 + -> Parallel Seq Scan on tenk1 +(4 rows) + +insert into para_insert_f1 select unique1, stringu1 from tenk1; +-- select some values to verify that the insert worked +select count(*), sum(unique1) from para_insert_f1; + count | sum +-------+---------- + 10000 | 49995000 +(1 row) + +-- +-- Test INSERT with ON CONFLICT ... DO UPDATE ... 
+-- (should not create a parallel plan) +-- +create table test_conflict_table(id serial primary key, somedata int); +alter table test_conflict_table parallel safe; +explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data; + QUERY PLAN +-------------------------------------------- + Insert on test_conflict_table + -> Gather + Workers Planned: 3 + -> Parallel Seq Scan on test_data +(4 rows) + +insert into test_conflict_table(id, somedata) select a, a from test_data; +explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data ON CONFLICT(id) DO UPDATE SET somedata = EXCLUDED.somedata + 1; + QUERY PLAN +------------------------------------------------------ + Insert on test_conflict_table + Conflict Resolution: UPDATE + Conflict Arbiter Indexes: test_conflict_table_pkey + -> Seq Scan on test_data +(4 rows) + +-- +-- Test INSERT with parallel-unsafe index expression +-- +select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names2'); + pg_class_relname | proparallel +------------------+------------- + pg_proc | u + pg_index | u +(2 rows) + +alter table names2 parallel safe; +insert into names2 select * from names returning *; +ERROR: parallel-safety execution violation of function "fullname_parallel_unsafe" (u) +-- +-- Test INSERT with parallel-restricted index expression +-- +select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names4'); + pg_class_relname | proparallel +------------------+------------- + pg_proc | r + pg_index | r +(2 rows) + +-- +-- Test INSERT with underlying query - and RETURNING (no projection) +-- (should create a parallel plan; parallel SELECT) +-- +create table names5 (like names); +explain (costs off) insert into names5 select * from names returning *; + QUERY PLAN +------------------------- + Insert on names5 + -> Seq Scan on names +(2 rows) + +-- +-- Test INSERT with underlying ordered query - and RETURNING (no projection) +-- 
(should create a parallel plan; parallel SELECT) +-- +create table names6 (like names); +alter table names6 parallel safe; +explain (costs off) insert into names6 select * from names order by last_name returning *; + QUERY PLAN +---------------------------------------------- + Insert on names6 + -> Gather Merge + Workers Planned: 3 + -> Sort + Sort Key: names.last_name + -> Parallel Seq Scan on names +(6 rows) + +insert into names6 select * from names order by last_name returning *; + index | first_name | last_name +-------+------------+------------- + 2 | niels | bohr + 1 | albert | einstein + 4 | leonhard | euler + 8 | richard | feynman + 5 | stephen | hawking + 6 | isaac | newton + 3 | erwin | schrodinger + 7 | alan | turing +(8 rows) + +-- +-- Test INSERT with underlying ordered query - and RETURNING (with projection) +-- (should create a parallel plan; parallel SELECT) +-- +create table names7 (like names); +alter table names7 parallel safe; +explain (costs off) insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name; + QUERY PLAN +---------------------------------------------- + Insert on names7 + -> Gather Merge + Workers Planned: 3 + -> Sort + Sort Key: names.last_name + -> Parallel Seq Scan on names +(6 rows) + +insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name; + last_name_then_first_name +--------------------------- + bohr, niels + einstein, albert + euler, leonhard + feynman, richard + hawking, stephen + newton, isaac + schrodinger, erwin + turing, alan +(8 rows) + +-- +-- Test INSERT into temporary table with underlying query. 
+-- (Insert into a temp table is parallel-restricted; +-- should create a parallel plan; parallel SELECT) +-- +create temporary table temp_names (like names); +alter table temp_names parallel safe; +select pg_class_relname(classid), proparallel from pg_get_parallel_safety('temp_names'); + pg_class_relname | proparallel +------------------+------------- + pg_class | r +(1 row) + +explain (costs off) insert into temp_names select * from names; + QUERY PLAN +---------------------------------------- + Insert on temp_names + -> Gather + Workers Planned: 3 + -> Parallel Seq Scan on names +(4 rows) + +insert into temp_names select * from names; +-- +-- Test INSERT with column defaults +-- +-- +-- +-- Parallel INSERT with unsafe column default, should not use a parallel plan +-- +explain (costs off) insert into testdef(a,c,d) select a,a*4,a*8 from test_data; + QUERY PLAN +----------------------------- + Insert on testdef + -> Seq Scan on test_data +(2 rows) + +-- +-- Parallel INSERT with restricted column default, should use parallel SELECT +-- +explain (costs off) insert into testdef(a,b,d) select a,a*2,a*8 from test_data; + QUERY PLAN +-------------------------------------------- + Insert on testdef + -> Gather + Workers Planned: 3 + -> Parallel Seq Scan on test_data +(4 rows) + +insert into testdef(a,b,d) select a,a*2,a*8 from test_data; +select * from testdef order by a; + a | b | c | d +----+----+----+---- + 1 | 2 | 10 | 8 + 2 | 4 | 10 | 16 + 3 | 6 | 10 | 24 + 4 | 8 | 10 | 32 + 5 | 10 | 10 | 40 + 6 | 12 | 10 | 48 + 7 | 14 | 10 | 56 + 8 | 16 | 10 | 64 + 9 | 18 | 10 | 72 + 10 | 20 | 10 | 80 +(10 rows) + +truncate testdef; +-- +-- Parallel INSERT with restricted and unsafe column defaults, should not use a parallel plan +-- +explain (costs off) insert into testdef(a,d) select a,a*8 from test_data; + QUERY PLAN +----------------------------- + Insert on testdef + -> Seq Scan on test_data +(2 rows) + +-- +-- Test INSERT into partition with underlying query. 
+-- +create table parttable1 (a int, b name) partition by range (a); +create table parttable1_1 partition of parttable1 for values from (0) to (5000); +create table parttable1_2 partition of parttable1 for values from (5000) to (10000); +alter table parttable1 parallel safe; +explain (costs off) insert into parttable1 select unique1,stringu1 from tenk1; + QUERY PLAN +---------------------------------------- + Insert on parttable1 + -> Gather + Workers Planned: 4 + -> Parallel Seq Scan on tenk1 +(4 rows) + +insert into parttable1 select unique1,stringu1 from tenk1; +select count(*) from parttable1_1; + count +------- + 5000 +(1 row) + +select count(*) from parttable1_2; + count +------- + 5000 +(1 row) + +-- +-- Test table with parallel-unsafe check constraint +-- (should not create a parallel plan) +-- +create or replace function check_b_unsafe(b name) returns boolean as $$ + begin + return (b <> 'XXXXXX'); + end; +$$ language plpgsql parallel unsafe; +create table table_check_b(a int4, b name check (check_b_unsafe(b)), c name); +alter table table_check_b parallel safe; +select pg_class_relname(classid), proparallel from pg_get_parallel_safety('table_check_b'); + pg_class_relname | proparallel +------------------+------------- + pg_proc | u + pg_constraint | u +(2 rows) + +insert into table_check_b select * from names; +ERROR: parallel-safety execution violation of function "check_b_unsafe" (u) +-- +-- Test table with parallel-safe after stmt-level triggers +-- (should create a parallel SELECT plan; triggers should fire) +-- +create table names_with_safe_trigger (like names); +alter table names_with_safe_trigger parallel safe; +create or replace function insert_after_trigger_safe() returns trigger as $$ + begin + raise notice 'hello from insert_after_trigger_safe'; + return new; + end; +$$ language plpgsql parallel safe; +create trigger insert_after_trigger_safe before insert on names_with_safe_trigger + for each statement execute procedure 
insert_after_trigger_safe(); +select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names_with_safe_trigger'); + pg_class_relname | proparallel +------------------+------------- +(0 rows) + +insert into names_with_safe_trigger select * from names; +NOTICE: hello from insert_after_trigger_safe +-- +-- Test table with parallel-unsafe after stmt-level triggers +-- (should not create a parallel plan; triggers should fire) +-- +create table names_with_unsafe_trigger (like names); +alter table names_with_unsafe_trigger parallel safe; +create or replace function insert_after_trigger_unsafe() returns trigger as $$ + begin + raise notice 'hello from insert_after_trigger_unsafe'; + return new; + end; +$$ language plpgsql parallel unsafe; +create trigger insert_after_trigger_unsafe before insert on names_with_unsafe_trigger + for each statement execute procedure insert_after_trigger_unsafe(); +select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names_with_unsafe_trigger'); + pg_class_relname | proparallel +------------------+------------- + pg_proc | u + pg_trigger | u +(2 rows) + +insert into names_with_unsafe_trigger select * from names; +ERROR: parallel-safety execution violation of function "insert_after_trigger_unsafe" (u) +-- +-- Test partition with parallel-unsafe trigger +-- (should not create a parallel plan) +-- +create table part_unsafe_trigger (a int4, b name) partition by range (a); +alter table names_with_unsafe_trigger parallel safe; +create table part_unsafe_trigger_1 partition of part_unsafe_trigger for values from (0) to (5000); +create table part_unsafe_trigger_2 partition of part_unsafe_trigger for values from (5000) to (10000); +create trigger part_insert_after_trigger_unsafe before insert on part_unsafe_trigger_1 + for each statement execute procedure insert_after_trigger_unsafe(); +select pg_class_relname(classid), proparallel from pg_get_parallel_safety('part_unsafe_trigger'); + pg_class_relname | proparallel 
+------------------+------------- + pg_proc | u + pg_trigger | u +(2 rows) + +insert into names_with_unsafe_trigger select * from names; +ERROR: parallel-safety execution violation of function "insert_after_trigger_unsafe" (u) +-- +-- Test DOMAIN column with a CHECK constraint +-- +create function sql_is_distinct_from_u(anyelement, anyelement) +returns boolean language sql parallel unsafe +as 'select $1 is distinct from $2 limit 1'; +create domain inotnull_u int + check (sql_is_distinct_from_u(value, null)); +create table dom_table_u (x inotnull_u, y int); +-- Test DOMAIN column with parallel-unsafe CHECK constraint +select pg_class_relname(classid), proparallel from pg_get_parallel_safety('dom_table_u'); + pg_class_relname | proparallel +------------------+------------- + pg_proc | u + pg_constraint | u +(2 rows) + +-- +-- Clean up anything not created in the transaction +-- +drop table names; +drop index names2_fullname_idx; +drop table names2; +drop index names4_fullname_idx; +drop table names4; +drop table testdef; +drop table test_data; +drop function bdefault_unsafe; +drop function cdefault_restricted; +drop function ddefault_safe; +drop function fullname_parallel_unsafe; +drop function fullname_parallel_restricted; diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule index a091300857..c6741a98aa 100644 --- a/src/test/regress/parallel_schedule +++ b/src/test/regress/parallel_schedule @@ -95,6 +95,7 @@ test: rules psql psql_crosstab amutils stats_ext collate.linux.utf8 # run by itself so it can run parallel workers test: select_parallel test: write_parallel +test: insert_parallel # no relation related tests can be put in this group test: publication subscription diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule index 5644847601..638b7a23d0 100644 --- a/src/test/regress/serial_schedule +++ b/src/test/regress/serial_schedule @@ -151,6 +151,7 @@ test: stats_ext test: collate.linux.utf8 test: 
select_parallel test: write_parallel +test: insert_parallel test: publication test: subscription test: select_views diff --git a/src/test/regress/sql/insert_parallel.sql b/src/test/regress/sql/insert_parallel.sql new file mode 100644 index 0000000000..b505a55caa --- /dev/null +++ b/src/test/regress/sql/insert_parallel.sql @@ -0,0 +1,337 @@ +-- +-- PARALLEL +-- + +-- +-- START: setup some tables and data needed by the tests. +-- + +-- Setup - index expressions test + +create function pg_class_relname(Oid) +returns name language sql parallel unsafe +as 'select relname from pg_class where $1 = oid'; + +-- For testing purposes, we'll mark this function as parallel-unsafe +create or replace function fullname_parallel_unsafe(f text, l text) returns text as $$ + begin + return f || l; + end; +$$ language plpgsql immutable parallel unsafe; + +create or replace function fullname_parallel_restricted(f text, l text) returns text as $$ + begin + return f || l; + end; +$$ language plpgsql immutable parallel restricted; + +create table names(index int, first_name text, last_name text); +create table names2(index int, first_name text, last_name text); +create index names2_fullname_idx on names2 (fullname_parallel_unsafe(first_name, last_name)); +create table names4(index int, first_name text, last_name text); +create index names4_fullname_idx on names4 (fullname_parallel_restricted(first_name, last_name)); + +alter table names2 parallel safe; +alter table names4 parallel safe; + + +insert into names values + (1, 'albert', 'einstein'), + (2, 'niels', 'bohr'), + (3, 'erwin', 'schrodinger'), + (4, 'leonhard', 'euler'), + (5, 'stephen', 'hawking'), + (6, 'isaac', 'newton'), + (7, 'alan', 'turing'), + (8, 'richard', 'feynman'); + +-- Setup - column default tests + +create or replace function bdefault_unsafe () +returns int language plpgsql parallel unsafe as $$ +begin + RETURN 5; +end $$; + +create or replace function cdefault_restricted () +returns int language plpgsql parallel 
restricted as $$ +begin + RETURN 10; +end $$; + +create or replace function ddefault_safe () +returns int language plpgsql parallel safe as $$ +begin + RETURN 20; +end $$; + +create table testdef(a int, b int default bdefault_unsafe(), c int default cdefault_restricted(), d int default ddefault_safe()); +create table test_data(a int); +insert into test_data select * from generate_series(1,10); +alter table testdef parallel safe; + + +-- +-- END: setup some tables and data needed by the tests. +-- + +-- encourage use of parallel plans +set parallel_setup_cost=0; +set parallel_tuple_cost=0; +set min_parallel_table_scan_size=0; +set max_parallel_workers_per_gather=4; + +create table para_insert_p1 ( + unique1 int4 PRIMARY KEY, + stringu1 name +); + +create table para_insert_f1 ( + unique1 int4 REFERENCES para_insert_p1(unique1), + stringu1 name +); + +alter table para_insert_p1 parallel safe; +alter table para_insert_f1 parallel safe; + +-- Check FK trigger +select pg_class_relname(classid), proparallel from pg_get_parallel_safety('para_insert_f1'); + +-- +-- Test INSERT with underlying query. +-- (should create plan with parallel SELECT, Gather parent node) +-- +explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1; +insert into para_insert_p1 select unique1, stringu1 from tenk1; +-- select some values to verify that the parallel insert worked +select count(*), sum(unique1) from para_insert_p1; +-- verify that the same transaction has been used by all parallel workers +select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt; + +-- +-- Test INSERT with ordered underlying query. 
+-- (should create plan with parallel SELECT, Gather Merge parent node) +-- +truncate para_insert_p1 cascade; +explain (costs off) insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1; +insert into para_insert_p1 select unique1, stringu1 from tenk1 order by unique1; +-- select some values to verify that the parallel insert worked +select count(*), sum(unique1) from para_insert_p1; +-- count the distinct (cmin,xmin) pairs; one pair means a single transaction and command inserted every row +select count(*) from (select distinct cmin,xmin from para_insert_p1) as dt; + +-- +-- Test INSERT with RETURNING clause. +-- (should create plan with parallel SELECT, Gather parent node) +-- +create table test_data1(like test_data); +alter table test_data1 parallel safe; +explain (costs off) insert into test_data1 select * from test_data where a = 10 returning a as data; +insert into test_data1 select * from test_data where a = 10 returning a as data; + +-- +-- Test INSERT into a table with a foreign key. +-- (Insert into a table with a foreign key is parallel-restricted, +-- as doing this in a parallel worker would create a new commandId +-- and within a worker this is not currently supported) +-- +explain (costs off) insert into para_insert_f1 select unique1, stringu1 from tenk1; +insert into para_insert_f1 select unique1, stringu1 from tenk1; +-- select some values to verify that the insert worked +select count(*), sum(unique1) from para_insert_f1; + +-- +-- Test INSERT with ON CONFLICT ... DO UPDATE ... 
+-- (the ON CONFLICT statement should not create a parallel plan) +-- +create table test_conflict_table(id serial primary key, somedata int); +alter table test_conflict_table parallel safe; +explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data; +insert into test_conflict_table(id, somedata) select a, a from test_data; +explain (costs off) insert into test_conflict_table(id, somedata) select a, a from test_data ON CONFLICT(id) DO UPDATE SET somedata = EXCLUDED.somedata + 1; + + +-- +-- Test INSERT with parallel-unsafe index expression +-- +select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names2'); +alter table names2 parallel safe; +insert into names2 select * from names returning *; + +-- +-- Test INSERT with parallel-restricted index expression +-- +select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names4'); + +-- +-- Test INSERT with underlying query - and RETURNING (no projection) +-- (should create a parallel plan; parallel SELECT; NOTE(review): names5, unlike names6/names7, is never altered to parallel safe - confirm) +-- +create table names5 (like names); +explain (costs off) insert into names5 select * from names returning *; + +-- +-- Test INSERT with underlying ordered query - and RETURNING (no projection) +-- (should create a parallel plan; parallel SELECT) +-- +create table names6 (like names); +alter table names6 parallel safe; +explain (costs off) insert into names6 select * from names order by last_name returning *; +insert into names6 select * from names order by last_name returning *; + +-- +-- Test INSERT with underlying ordered query - and RETURNING (with projection) +-- (should create a parallel plan; parallel SELECT) +-- +create table names7 (like names); +alter table names7 parallel safe; +explain (costs off) insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name; +insert into names7 select * from names order by last_name returning last_name || ', ' || first_name as last_name_then_first_name; + 
+ +-- +-- Test INSERT into temporary table with underlying query. +-- (Insert into a temp table is parallel-restricted; +-- should create a parallel plan; parallel SELECT) +-- +create temporary table temp_names (like names); +alter table temp_names parallel safe; +select pg_class_relname(classid), proparallel from pg_get_parallel_safety('temp_names'); +explain (costs off) insert into temp_names select * from names; +insert into temp_names select * from names; + +-- +-- Test INSERT with column defaults +-- (testdef defaults: b is parallel unsafe, c is parallel restricted, d is parallel safe) +-- + +-- +-- Parallel INSERT with unsafe column default (b is omitted), should not use a parallel plan +-- +explain (costs off) insert into testdef(a,c,d) select a,a*4,a*8 from test_data; + +-- +-- Parallel INSERT with restricted column default (c is omitted), should use parallel SELECT +-- +explain (costs off) insert into testdef(a,b,d) select a,a*2,a*8 from test_data; +insert into testdef(a,b,d) select a,a*2,a*8 from test_data; +select * from testdef order by a; +truncate testdef; + +-- +-- Parallel INSERT with restricted and unsafe column defaults (b and c are omitted), should not use a parallel plan +-- +explain (costs off) insert into testdef(a,d) select a,a*8 from test_data; + +-- +-- Test INSERT into partition with underlying query. 
+-- +create table parttable1 (a int, b name) partition by range (a); +create table parttable1_1 partition of parttable1 for values from (0) to (5000); +create table parttable1_2 partition of parttable1 for values from (5000) to (10000); + +alter table parttable1 parallel safe; + +explain (costs off) insert into parttable1 select unique1,stringu1 from tenk1; +insert into parttable1 select unique1,stringu1 from tenk1; +select count(*) from parttable1_1; +select count(*) from parttable1_2; + +-- +-- Test table with parallel-unsafe check constraint +-- (should not create a parallel plan) +-- +create or replace function check_b_unsafe(b name) returns boolean as $$ + begin + return (b <> 'XXXXXX'); + end; +$$ language plpgsql parallel unsafe; + +create table table_check_b(a int4, b name check (check_b_unsafe(b)), c name); +alter table table_check_b parallel safe; +select pg_class_relname(classid), proparallel from pg_get_parallel_safety('table_check_b'); +insert into table_check_b select * from names; + +-- +-- Test table with parallel-safe before stmt-level trigger (named insert_after_*, but created BEFORE INSERT) +-- (should create a parallel SELECT plan; triggers should fire) +-- +create table names_with_safe_trigger (like names); +alter table names_with_safe_trigger parallel safe; + +create or replace function insert_after_trigger_safe() returns trigger as $$ + begin + raise notice 'hello from insert_after_trigger_safe'; + return new; + end; +$$ language plpgsql parallel safe; +create trigger insert_after_trigger_safe before insert on names_with_safe_trigger + for each statement execute procedure insert_after_trigger_safe(); +select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names_with_safe_trigger'); +insert into names_with_safe_trigger select * from names; + +-- +-- Test table with parallel-unsafe before stmt-level trigger (named insert_after_*, but created BEFORE INSERT) +-- (should not create a parallel plan; triggers should fire) +-- +create table names_with_unsafe_trigger (like names); +alter table names_with_unsafe_trigger parallel 
safe; +create or replace function insert_after_trigger_unsafe() returns trigger as $$ + begin + raise notice 'hello from insert_after_trigger_unsafe'; + return new; + end; +$$ language plpgsql parallel unsafe; +create trigger insert_after_trigger_unsafe before insert on names_with_unsafe_trigger + for each statement execute procedure insert_after_trigger_unsafe(); +select pg_class_relname(classid), proparallel from pg_get_parallel_safety('names_with_unsafe_trigger'); +insert into names_with_unsafe_trigger select * from names; + +-- +-- Test partition with parallel-unsafe trigger (the trigger is on partition part_unsafe_trigger_1) +-- (should not create a parallel plan) +-- + +create table part_unsafe_trigger (a int4, b name) partition by range (a); +alter table part_unsafe_trigger parallel safe; +create table part_unsafe_trigger_1 partition of part_unsafe_trigger for values from (0) to (5000); +create table part_unsafe_trigger_2 partition of part_unsafe_trigger for values from (5000) to (10000); +create trigger part_insert_after_trigger_unsafe before insert on part_unsafe_trigger_1 + for each statement execute procedure insert_after_trigger_unsafe(); + +select pg_class_relname(classid), proparallel from pg_get_parallel_safety('part_unsafe_trigger'); +insert into part_unsafe_trigger select unique1, stringu1 from tenk1; + +-- +-- Test DOMAIN column with a CHECK constraint +-- +create function sql_is_distinct_from_u(anyelement, anyelement) +returns boolean language sql parallel unsafe +as 'select $1 is distinct from $2 limit 1'; + +create domain inotnull_u int + check (sql_is_distinct_from_u(value, null)); + +create table dom_table_u (x inotnull_u, y int); + + +-- Test DOMAIN column with parallel-unsafe CHECK constraint +select pg_class_relname(classid), proparallel from pg_get_parallel_safety('dom_table_u'); + +-- +-- Clean up the setup tables and their helper functions +-- + +drop table names; +drop index names2_fullname_idx; +drop table names2; +drop index names4_fullname_idx; +drop table names4; +drop table testdef; +drop 
table test_data; + +drop function bdefault_unsafe; +drop function cdefault_restricted; +drop function ddefault_safe; +drop function fullname_parallel_unsafe; +drop function fullname_parallel_restricted; -- 2.18.4