From 03ee268446bd31267ff4b373f7d9d59c6fa22d2a Mon Sep 17 00:00:00 2001 From: Vignesh C Date: Tue, 27 Oct 2020 06:25:36 +0530 Subject: [PATCH v9 4/5] Tests for parallel copy. This patch has the tests for parallel copy. --- contrib/postgres_fdw/expected/postgres_fdw.out | 49 ++++ contrib/postgres_fdw/sql/postgres_fdw.sql | 52 ++++ src/test/regress/expected/copy2.out | 326 +++++++++++++++++++++- src/test/regress/input/copy.source | 31 +++ src/test/regress/output/copy.source | 27 ++ src/test/regress/sql/copy2.sql | 368 ++++++++++++++++++++++++- 6 files changed, 845 insertions(+), 8 deletions(-) diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out index 2d88d06..474c5e7 100644 --- a/contrib/postgres_fdw/expected/postgres_fdw.out +++ b/contrib/postgres_fdw/expected/postgres_fdw.out @@ -9033,5 +9033,54 @@ SELECT 1 FROM ft1 LIMIT 1; -- should fail ERROR: 08006 \set VERBOSITY default COMMIT; +-- parallel copy related tests. +CREATE TABLE test_parallel_copy ( + a INT, + b INT, + c TEXT default 'stuff', + d TEXT, + e TEXT +) ; +CREATE FOREIGN TABLE test_parallel_copy_ft ( + a INT, + b INT, + c TEXT default 'stuff', + d TEXT, + e TEXT +) SERVER loopback OPTIONS (table_name 'test_parallel_copy') ; +-- parallel copy into foreign table, parallelism must not be picked up. +COPY test_parallel_copy_ft FROM stdin WITH (PARALLEL 1); +SELECT count(*) FROM test_parallel_copy_ft; + count +------- + 2 +(1 row) + +-- parallel copy into a table with foreign partition. +CREATE TABLE part_test_parallel_copy ( + a INT, + b INT, + c TEXT default 'stuff', + d TEXT, + e TEXT +) PARTITION BY LIST (b); +CREATE FOREIGN TABLE part_test_parallel_copy_a1 (c TEXT, b INT, a INT, e TEXT, d TEXT) SERVER loopback; +CREATE TABLE part_test_parallel_copy_a2 (a INT, c TEXT, b INT, d TEXT, e TEXT); +ALTER TABLE part_test_parallel_copy ATTACH PARTITION part_test_parallel_copy_a1 FOR VALUES IN(1); +ALTER TABLE part_test_parallel_copy ATTACH PARTITION part_test_parallel_copy_a2 FOR VALUES IN(2); +COPY part_test_parallel_copy FROM stdin WITH (PARALLEL 1); +ERROR: cannot perform PARALLEL COPY if partition has BEFORE/INSTEAD OF triggers, or if the partition is foreign partition +HINT: Try COPY without PARALLEL option +CONTEXT: COPY part_test_parallel_copy, line 1: "1 1 test_c1 test_d1 test_e1" +parallel worker +SELECT count(*) FROM part_test_parallel_copy WHERE b = 2; + count +------- + 0 +(1 row) + -- Clean up DROP PROCEDURE terminate_backend_and_wait(text); +DROP FOREIGN TABLE test_parallel_copy_ft; +DROP TABLE test_parallel_copy; +DROP TABLE part_test_parallel_copy; diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql index 7581c54..635fcc2 100644 --- a/contrib/postgres_fdw/sql/postgres_fdw.sql +++ b/contrib/postgres_fdw/sql/postgres_fdw.sql @@ -2695,5 +2695,57 @@ SELECT 1 FROM ft1 LIMIT 1; -- should fail \set VERBOSITY default COMMIT; +-- parallel copy related tests. +CREATE TABLE test_parallel_copy ( + a INT, + b INT, + c TEXT default 'stuff', + d TEXT, + e TEXT +) ; + +CREATE FOREIGN TABLE test_parallel_copy_ft ( + a INT, + b INT, + c TEXT default 'stuff', + d TEXT, + e TEXT +) SERVER loopback OPTIONS (table_name 'test_parallel_copy') ; + +-- parallel copy into foreign table, parallelism must not be picked up. +COPY test_parallel_copy_ft FROM stdin WITH (PARALLEL 1); +1 1 test_c1 test_d1 test_e1 +2 2 test_c2 test_d2 test_e2 +\. + +SELECT count(*) FROM test_parallel_copy_ft; + +-- parallel copy into a table with foreign partition. +CREATE TABLE part_test_parallel_copy ( + a INT, + b INT, + c TEXT default 'stuff', + d TEXT, + e TEXT +) PARTITION BY LIST (b); + +CREATE FOREIGN TABLE part_test_parallel_copy_a1 (c TEXT, b INT, a INT, e TEXT, d TEXT) SERVER loopback; + +CREATE TABLE part_test_parallel_copy_a2 (a INT, c TEXT, b INT, d TEXT, e TEXT); + +ALTER TABLE part_test_parallel_copy ATTACH PARTITION part_test_parallel_copy_a1 FOR VALUES IN(1); + +ALTER TABLE part_test_parallel_copy ATTACH PARTITION part_test_parallel_copy_a2 FOR VALUES IN(2); + +COPY part_test_parallel_copy FROM stdin WITH (PARALLEL 1); +1 1 test_c1 test_d1 test_e1 +2 2 test_c2 test_d2 test_e2 +\. + +SELECT count(*) FROM part_test_parallel_copy WHERE b = 2; + -- Clean up DROP PROCEDURE terminate_backend_and_wait(text); +DROP FOREIGN TABLE test_parallel_copy_ft; +DROP TABLE test_parallel_copy; +DROP TABLE part_test_parallel_copy; diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out index c64f071..6ec4dea 100644 --- a/src/test/regress/expected/copy2.out +++ b/src/test/regress/expected/copy2.out @@ -301,18 +301,32 @@ It is "perfect".| "It is ""perfect""."," " "", --test that we read consecutive LFs properly -CREATE TEMP TABLE testnl (a int, b text, c int); +CREATE TABLE testnl (a int, b text, c int); COPY testnl FROM stdin CSV; +COPY testnl FROM stdin WITH (FORMAT 'csv', PARALLEL 1); +SELECT * FROM testnl; + a | b | c +---+----------------------+--- + 1 | a field with two LFs+| 2 + | +| + | inside | + 1 | a field with two LFs+| 2 + | +| + | inside | +(2 rows) + -- test end of copy marker -CREATE TEMP TABLE testeoc (a text); +CREATE TABLE testeoc (a text); COPY testeoc FROM stdin CSV; +TRUNCATE testeoc; +COPY testeoc FROM stdin WITH (FORMAT 'csv', PARALLEL 1); COPY testeoc TO stdout CSV; a\. \.b c\.d "\." -- test handling of nonstandard null marker that violates escaping rules -CREATE TEMP TABLE testnull(a int, b text); +CREATE TABLE testnull(a int, b text); INSERT INTO testnull VALUES (1, E'\\0'), (NULL, NULL); COPY testnull TO stdout WITH NULL AS E'\\0'; 1 \\0 @@ -327,6 +341,15 @@ SELECT * FROM testnull; | (4 rows) +TRUNCATE testnull; +COPY testnull FROM stdin WITH (NULL E'\\0', PARALLEL 1); +SELECT * FROM testnull; + a | b +----+---- + 42 | \0 + | +(2 rows) + BEGIN; CREATE TABLE vistest (LIKE testeoc); COPY vistest FROM stdin CSV; @@ -396,6 +419,34 @@ SELECT * FROM vistest; BEGIN; TRUNCATE vistest; +COPY vistest FROM stdin WITH (FORMAT 'csv', FREEZE, PARALLEL 1); +SELECT * FROM vistest; + a +---- + a2 + b +(2 rows) + +SAVEPOINT s1; +TRUNCATE vistest; +COPY vistest FROM stdin WITH (FORMAT 'csv', FREEZE, PARALLEL 1); +SELECT * FROM vistest; + a +---- + d2 + e +(2 rows) + +COMMIT; +SELECT * FROM vistest; + a +---- + d2 + e +(2 rows) + +BEGIN; +TRUNCATE vistest; COPY vistest FROM stdin CSV FREEZE; SELECT * FROM vistest; a @@ -456,7 +507,7 @@ SELECT * FROM vistest; (2 rows) -- Test FORCE_NOT_NULL and FORCE_NULL options -CREATE TEMP TABLE forcetest ( +CREATE TABLE forcetest ( a INT NOT NULL, b TEXT NOT NULL, c TEXT, @@ -467,6 +518,8 @@ CREATE TEMP TABLE forcetest ( -- should succeed with no effect ("b" remains an empty string, "c" remains NULL) BEGIN; COPY forcetest (a, b, c) FROM STDIN WITH (FORMAT csv, FORCE_NOT_NULL(b), FORCE_NULL(c)); +TRUNCATE forcetest; +COPY forcetest (a, b, c) FROM STDIN WITH (FORMAT csv, FORCE_NOT_NULL(b), FORCE_NULL(c), PARALLEL 1); COMMIT; SELECT b, c FROM forcetest WHERE a = 1; b | c @@ -477,6 +530,8 @@ SELECT b, c FROM forcetest WHERE a = 1; -- should succeed, FORCE_NULL and FORCE_NOT_NULL can be both specified BEGIN; COPY forcetest (a, b, c, d) FROM STDIN WITH (FORMAT csv, FORCE_NOT_NULL(c,d), FORCE_NULL(c,d)); +TRUNCATE forcetest; +COPY forcetest (a, b, c, d) FROM STDIN WITH (FORMAT csv, FORCE_NOT_NULL(c,d), FORCE_NULL(c,d), PARALLEL 1); COMMIT; SELECT c, d FROM forcetest WHERE a = 2; c | d @@ -533,6 +588,31 @@ select * from check_con_tbl; (2 rows) +\d+ check_con_tbl + Table "public.check_con_tbl" + Column | Type | Collation | Nullable | Default | Storage | Stats target | Description +--------+---------+-----------+----------+---------+---------+--------------+------------- + f1 | integer | | | | plain | | +Check constraints: + "check_con_tbl_check" CHECK (check_con_function(check_con_tbl.*)) + +truncate check_con_tbl; +copy check_con_tbl from stdin with (parallel 1); +NOTICE: input = {"f1":1} +NOTICE: input = {"f1":null} +copy check_con_tbl from stdin with (parallel 1); +NOTICE: input = {"f1":0} +ERROR: new row for relation "check_con_tbl" violates check constraint "check_con_tbl_check" +DETAIL: Failing row contains (0). +CONTEXT: COPY check_con_tbl, line 1: "0" +parallel worker +select * from check_con_tbl; + f1 +---- + 1 + +(2 rows) + -- test with RLS enabled. CREATE ROLE regress_rls_copy_user; CREATE ROLE regress_rls_copy_user_colperms; @@ -647,10 +727,248 @@ SELECT * FROM instead_of_insert_tbl; (2 rows) COMMIT; +-- Parallel copy tests. +CREATE TABLE test_parallel_copy ( + a INT, + b INT, + c TEXT not null default 'stuff', + d TEXT, + e TEXT +) ; +COPY test_parallel_copy (a, b, c, d, e) FROM stdin WITH (PARALLEL 1); +COPY test_parallel_copy (b, d) FROM stdin WITH (PARALLEL 1); +COPY test_parallel_copy (b, d) FROM stdin WITH (PARALLEL 1); +COPY test_parallel_copy (b, d) FROM stdin WITH (PARALLEL 1, FORMAT 'csv', HEADER); +-- zero workers: should perform non-parallel copy +COPY test_parallel_copy (b, d) FROM stdin WITH (PARALLEL 0); +ERROR: value 0 out of bounds for option "parallel" +DETAIL: Valid values are between "1" and "1024". +-- referencing table: should perform non-parallel copy +CREATE TABLE test_copy_pk(c1 INT PRIMARY KEY); +INSERT INTO test_copy_pk VALUES(10); +CREATE TABLE test_copy_ri(c1 INT REFERENCES test_copy_pk(c1)); +COPY test_copy_ri FROM stdin WITH (FORMAT csv, DELIMITER ',', PARALLEL 1); +-- expressions: should perform non-parallel copy +CREATE TABLE test_copy_expr (index INT, height REAL, weight REAL); +COPY test_copy_expr FROM STDIN WITH (FORMAT csv, DELIMITER ',', PARALLEL 1) WHERE height > random() * 65; +-- serial data: should perform non-parallel copy +CREATE TABLE testserial (index SERIAL, height REAL); +COPY testserial(height) FROM STDIN WITH (FORMAT csv, DELIMITER ',', PARALLEL 1); +-- temporary table copy: should perform non-parallel copy +CREATE TEMPORARY TABLE temp_test( + a int +) ; +COPY temp_test (a) FROM stdin WITH (PARALLEL 1); +-- non-existent column in column list: should fail +COPY test_parallel_copy (xyz) FROM stdin WITH (PARALLEL 1); +ERROR: column "xyz" of relation "test_parallel_copy" does not exist +-- too many columns in column list: should fail +COPY test_parallel_copy (a, b, c, d, e, d, c) FROM stdin WITH (PARALLEL 1); +ERROR: column "d" specified more than once +-- missing data: should fail +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +ERROR: invalid input syntax for type integer: "" +CONTEXT: COPY test_parallel_copy, line 0, column a: "" +parallel worker +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +ERROR: missing data for column "e" +CONTEXT: COPY test_parallel_copy, line 1: "2000 230 23 23" +parallel worker +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +ERROR: missing data for column "e" +CONTEXT: COPY test_parallel_copy, line 1: "2001 231 \N \N" +parallel worker +-- extra data: should fail +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +ERROR: extra data after last expected column +CONTEXT: COPY test_parallel_copy, line 1: "2002 232 40 50 60 70 80" +parallel worker +-- various COPY options: delimiters, oids, NULL string, encoding +COPY test_parallel_copy (b, c, d, e) FROM stdin WITH (DELIMITER ',', null 'x', PARALLEL 1) ; +COPY test_parallel_copy FROM stdin WITH (DELIMITER ';', NULL '', PARALLEL 1); +COPY test_parallel_copy FROM stdin WITH (DELIMITER ':', NULL E'\\X', ENCODING 'sql_ascii', PARALLEL 1); +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1) WHERE a = 50004; +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1) WHERE a > 60003; +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1) WHERE f > 60003; +ERROR: column "f" does not exist +LINE 1: ..._parallel_copy FROM stdin WITH (PARALLEL 1) WHERE f > 60003; + ^ +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1) WHERE a = max(x.b); +ERROR: missing FROM-clause entry for table "x" +LINE 1: ...rallel_copy FROM stdin WITH (PARALLEL 1) WHERE a = max(x.b); + ^ +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1) WHERE a IN (SELECT 1 FROM x); +ERROR: cannot use subquery in COPY FROM WHERE condition +LINE 1: ...arallel_copy FROM stdin WITH (PARALLEL 1) WHERE a IN (SELECT... + ^ +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1) WHERE a IN (generate_series(1,5)); +ERROR: set-returning functions are not allowed in COPY FROM WHERE conditions +LINE 1: ...lel_copy FROM stdin WITH (PARALLEL 1) WHERE a IN (generate_s... + ^ +COPY test_parallel_copy FROM stdin WITH(PARALLEL 1) WHERE a = row_number() over(b); +ERROR: window functions are not allowed in COPY FROM WHERE conditions +LINE 1: ...rallel_copy FROM stdin WITH(PARALLEL 1) WHERE a = row_number... + ^ +-- check results of copy in +SELECT * FROM test_parallel_copy ORDER BY 1; + a | b | c | d | e +-------+----+------------+---------+--------- + 1 | 11 | test_c1 | test_d1 | test_e1 + 2 | 12 | test_c2 | test_d2 | test_e2 + 3000 | | c | | + 4000 | | C | | + 4001 | 1 | empty | | + 4002 | 2 | null | | + 4003 | 3 | Backslash | \ | \ + 4004 | 4 | BackslashX | \X | \X + 4005 | 5 | N | N | N + 4006 | 6 | BackslashN | \N | \N + 4007 | 7 | XX | XX | XX + 4008 | 8 | Delimiter | : | : + 50004 | 25 | 35 | 45 | 55 + 60004 | 25 | 35 | 45 | 55 + 60005 | 26 | 36 | 46 | 56 + | 3 | stuff | test_d3 | + | 4 | stuff | test_d4 | + | 5 | stuff | test_d5 | + | | , | \, | \ + | | x | \x | \x + | | 45 | 80 | 90 +(21 rows) + +-- parallel copy test for unlogged tables. should execute in parallel worker +CREATE UNLOGGED TABLE test_parallel_copy_unlogged( + a INT, + b INT, + c TEXT default 'stuff', + d TEXT, + e TEXT +); +COPY test_parallel_copy_unlogged FROM stdin WITH (PARALLEL 1); +SELECT count(*) FROM test_parallel_copy_unlogged; + count +------- + 2 +(1 row) + +-- parallel copy test for various trigger types +TRUNCATE test_parallel_copy; +-- parallel safe trigger function +CREATE OR REPLACE FUNCTION parallel_copy_trig_func() RETURNS TRIGGER +LANGUAGE plpgsql PARALLEL SAFE AS $$ +BEGIN + RETURN NEW; +END; +$$; +-- before insert row trigger +CREATE TRIGGER parallel_copy_trig +BEFORE INSERT ON test_parallel_copy +FOR EACH ROW +EXECUTE PROCEDURE parallel_copy_trig_func(); +-- no parallelism should be picked +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +DROP TRIGGER parallel_copy_trig ON test_parallel_copy; +-- after insert row trigger +CREATE TRIGGER parallel_copy_trig +AFTER INSERT ON test_parallel_copy +FOR EACH ROW +EXECUTE PROCEDURE parallel_copy_trig_func(); +-- no parallelism should be picked +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +DROP TRIGGER parallel_copy_trig ON test_parallel_copy; +-- before insert statement trigger +CREATE TRIGGER parallel_copy_trig +BEFORE INSERT ON test_parallel_copy +FOR EACH STATEMENT +EXECUTE PROCEDURE parallel_copy_trig_func(); +-- parallelism should be picked, since the trigger function is parallel safe +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +DROP TRIGGER parallel_copy_trig ON test_parallel_copy; +-- after insert statement trigger +CREATE TRIGGER parallel_copy_trig +AFTER INSERT ON test_parallel_copy +FOR EACH STATEMENT +EXECUTE PROCEDURE parallel_copy_trig_func(); +-- no parallelism should be picked +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +DROP TRIGGER parallel_copy_trig ON test_parallel_copy; +-- transition table is involved +CREATE TRIGGER parallel_copy_trig +AFTER INSERT ON test_parallel_copy REFERENCING NEW TABLE AS new_table +FOR EACH STATEMENT +EXECUTE PROCEDURE parallel_copy_trig_func(); +-- no parallelism should be picked +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +DROP TRIGGER parallel_copy_trig ON test_parallel_copy; +-- make trigger function parallel unsafe +ALTER FUNCTION parallel_copy_trig_func PARALLEL UNSAFE; +-- before statement trigger has a parallel unsafe function +CREATE TRIGGER parallel_copy_trig +BEFORE INSERT ON test_parallel_copy +FOR EACH STATEMENT +EXECUTE PROCEDURE parallel_copy_trig_func(); +-- no parallelism should be picked +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +SELECT count(*) FROM test_parallel_copy; + count +------- + 12 +(1 row) + +DROP TRIGGER parallel_copy_trig ON test_parallel_copy; +-- instead of insert trigger, no parallelism should be picked +COPY instead_of_insert_tbl_view FROM stdin WITH(PARALLEL 1);; +SELECT count(*) FROM instead_of_insert_tbl_view; + count +------- + 1 +(1 row) + +-- parallel copy test for a partitioned table with a before insert trigger on +-- one of the partition +ALTER FUNCTION parallel_copy_trig_func PARALLEL SAFE; +CREATE TABLE test_parallel_copy_part ( + a INT, + b INT, + c TEXT default 'stuff', + d TEXT, + e TEXT +) PARTITION BY LIST (b); +CREATE TABLE test_parallel_copy_part_a1 (c TEXT, b INT, a INT, e TEXT, d TEXT); +CREATE TABLE test_parallel_copy_part_a2 (a INT, c TEXT, b INT, d TEXT, e TEXT); +ALTER TABLE test_parallel_copy_part ATTACH PARTITION test_parallel_copy_part_a1 FOR VALUES IN(1); +ALTER TABLE test_parallel_copy_part ATTACH PARTITION test_parallel_copy_part_a2 FOR VALUES IN(2); +CREATE TRIGGER parallel_copy_trig +BEFORE INSERT ON test_parallel_copy_part_a2 +FOR EACH ROW +EXECUTE PROCEDURE parallel_copy_trig_func(); +COPY test_parallel_copy_part FROM stdin WITH (PARALLEL 1); +ERROR: cannot perform PARALLEL COPY if partition has BEFORE/INSTEAD OF triggers, or if the partition is foreign partition +HINT: Try COPY without PARALLEL option +CONTEXT: COPY test_parallel_copy_part, line 2: "2 2 test_c2 test_d2 test_e2" +parallel worker +SELECT count(*) FROM test_parallel_copy_part; + count +------- + 0 +(1 row) + +DROP TRIGGER parallel_copy_trig ON test_parallel_copy_part_a2; -- clean up +DROP TABLE test_copy_ri; +DROP TABLE test_copy_pk; +DROP TABLE test_copy_expr; +DROP TABLE testeoc; +DROP TABLE testnl; DROP TABLE forcetest; +DROP TABLE test_parallel_copy; +DROP TABLE test_parallel_copy_unlogged; +DROP TABLE test_parallel_copy_part; +DROP TABLE testserial; +DROP TABLE testnull; DROP TABLE vistest; DROP FUNCTION truncate_in_subxact(); +DROP FUNCTION parallel_copy_trig_func(); DROP TABLE x, y; DROP TABLE rls_t1 CASCADE; DROP ROLE regress_rls_copy_user; diff --git a/src/test/regress/input/copy.source b/src/test/regress/input/copy.source index a1d529a..cb39c66 100644 --- a/src/test/regress/input/copy.source +++ b/src/test/regress/input/copy.source @@ -15,6 +15,13 @@ DELETE FROM onek; COPY onek FROM '@abs_builddir@/results/onek.data'; +-- test parallel copy +COPY tenk1 FROM '@abs_srcdir@/data/tenk.data' WITH (parallel 2); + +SELECT COUNT(*) FROM tenk1; + +TRUNCATE tenk1; + COPY tenk1 FROM '@abs_srcdir@/data/tenk.data'; COPY slow_emp4000 FROM '@abs_srcdir@/data/rect.data'; @@ -159,6 +166,30 @@ truncate parted_copytest; copy parted_copytest from '@abs_builddir@/results/parted_copytest.csv'; +-- Test parallel copy from with a partitioned table. +truncate parted_copytest; + +copy parted_copytest from '@abs_builddir@/results/parted_copytest.csv' with (parallel 2); + +-- Test parallel copy from for toast table with csv and binary format file +create table test_parallel_copy_toast(a1 int, b1 text, c1 text default null); + +insert into test_parallel_copy_toast select i, (repeat(md5(i::text), 100000)) FROM generate_series(1,2) AS i; + +copy test_parallel_copy_toast to '@abs_builddir@/results/parallelcopytoast.csv' with(format csv); + +copy test_parallel_copy_toast to '@abs_builddir@/results/parallelcopytoast.dat' with(format binary); + +truncate test_parallel_copy_toast; + +copy test_parallel_copy_toast from '@abs_builddir@/results/parallelcopytoast.csv' with(format csv, parallel 2); + +copy test_parallel_copy_toast from '@abs_builddir@/results/parallelcopytoast.dat' with(format binary, parallel 2); + +select count(*) from test_parallel_copy_toast; + +drop table test_parallel_copy_toast; + -- Ensure COPY FREEZE errors for partitioned tables. begin; truncate parted_copytest; diff --git a/src/test/regress/output/copy.source b/src/test/regress/output/copy.source index 938d355..a5dca79 100644 --- a/src/test/regress/output/copy.source +++ b/src/test/regress/output/copy.source @@ -9,6 +9,15 @@ COPY onek FROM '@abs_srcdir@/data/onek.data'; COPY onek TO '@abs_builddir@/results/onek.data'; DELETE FROM onek; COPY onek FROM '@abs_builddir@/results/onek.data'; +-- test parallel copy +COPY tenk1 FROM '@abs_srcdir@/data/tenk.data' WITH (parallel 2); +SELECT COUNT(*) FROM tenk1; + count +------- + 10000 +(1 row) + +TRUNCATE tenk1; COPY tenk1 FROM '@abs_srcdir@/data/tenk.data'; COPY slow_emp4000 FROM '@abs_srcdir@/data/rect.data'; COPY person FROM '@abs_srcdir@/data/person.data'; @@ -113,6 +122,24 @@ insert into parted_copytest select x,1,'One' from generate_series(1011,1020) x; copy (select * from parted_copytest order by a) to '@abs_builddir@/results/parted_copytest.csv'; truncate parted_copytest; copy parted_copytest from '@abs_builddir@/results/parted_copytest.csv'; +-- Test parallel copy from with a partitioned table. +truncate parted_copytest; +copy parted_copytest from '@abs_builddir@/results/parted_copytest.csv' with (parallel 2); +-- Test parallel copy from for toast table with csv and binary format file +create table test_parallel_copy_toast(a1 int, b1 text, c1 text default null); +insert into test_parallel_copy_toast select i, (repeat(md5(i::text), 100000)) FROM generate_series(1,2) AS i; +copy test_parallel_copy_toast to '@abs_builddir@/results/parallelcopytoast.csv' with(format csv); +copy test_parallel_copy_toast to '@abs_builddir@/results/parallelcopytoast.dat' with(format binary); +truncate test_parallel_copy_toast; +copy test_parallel_copy_toast from '@abs_builddir@/results/parallelcopytoast.csv' with(format csv, parallel 2); +copy test_parallel_copy_toast from '@abs_builddir@/results/parallelcopytoast.dat' with(format binary, parallel 2); +select count(*) from test_parallel_copy_toast; + count +------- + 4 +(1 row) + +drop table test_parallel_copy_toast; -- Ensure COPY FREEZE errors for partitioned tables. begin; truncate parted_copytest; diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql index b3c16af..f5c6bec 100644 --- a/src/test/regress/sql/copy2.sql +++ b/src/test/regress/sql/copy2.sql @@ -171,7 +171,7 @@ COPY y TO stdout (FORMAT CSV, FORCE_QUOTE *); --test that we read consecutive LFs properly -CREATE TEMP TABLE testnl (a int, b text, c int); +CREATE TABLE testnl (a int, b text, c int); COPY testnl FROM stdin CSV; 1,"a field with two LFs @@ -179,8 +179,16 @@ COPY testnl FROM stdin CSV; inside",2 \. +COPY testnl FROM stdin WITH (FORMAT 'csv', PARALLEL 1); +1,"a field with two LFs + +inside",2 +\. + +SELECT * FROM testnl; + -- test end of copy marker -CREATE TEMP TABLE testeoc (a text); +CREATE TABLE testeoc (a text); COPY testeoc FROM stdin CSV; a\. @@ -189,11 +197,19 @@ c\.d "\." \. +TRUNCATE testeoc; +COPY testeoc FROM stdin WITH (FORMAT 'csv', PARALLEL 1); +a\. +\.b +c\.d +"\." +\. + COPY testeoc TO stdout CSV; -- test handling of nonstandard null marker that violates escaping rules -CREATE TEMP TABLE testnull(a int, b text); +CREATE TABLE testnull(a int, b text); INSERT INTO testnull VALUES (1, E'\\0'), (NULL, NULL); COPY testnull TO stdout WITH NULL AS E'\\0'; @@ -205,6 +221,14 @@ COPY testnull FROM stdin WITH NULL AS E'\\0'; SELECT * FROM testnull; +TRUNCATE testnull; +COPY testnull FROM stdin WITH (NULL E'\\0', PARALLEL 1); +42 \\0 +\0 \0 +\. + +SELECT * FROM testnull; + BEGIN; CREATE TABLE vistest (LIKE testeoc); COPY vistest FROM stdin CSV; @@ -249,6 +273,23 @@ SELECT * FROM vistest; BEGIN; TRUNCATE vistest; +COPY vistest FROM stdin WITH (FORMAT 'csv', FREEZE, PARALLEL 1); +a2 +b +\. +SELECT * FROM vistest; +SAVEPOINT s1; +TRUNCATE vistest; +COPY vistest FROM stdin WITH (FORMAT 'csv', FREEZE, PARALLEL 1); +d2 +e +\. +SELECT * FROM vistest; +COMMIT; +SELECT * FROM vistest; + +BEGIN; +TRUNCATE vistest; COPY vistest FROM stdin CSV FREEZE; x y @@ -298,7 +339,7 @@ SELECT * FROM vistest; COMMIT; SELECT * FROM vistest; -- Test FORCE_NOT_NULL and FORCE_NULL options -CREATE TEMP TABLE forcetest ( +CREATE TABLE forcetest ( a INT NOT NULL, b TEXT NOT NULL, c TEXT, @@ -311,6 +352,10 @@ BEGIN; COPY forcetest (a, b, c) FROM STDIN WITH (FORMAT csv, FORCE_NOT_NULL(b), FORCE_NULL(c)); 1,,"" \. +TRUNCATE forcetest; +COPY forcetest (a, b, c) FROM STDIN WITH (FORMAT csv, FORCE_NOT_NULL(b), FORCE_NULL(c), PARALLEL 1); +1,,"" +\. COMMIT; SELECT b, c FROM forcetest WHERE a = 1; -- should succeed, FORCE_NULL and FORCE_NOT_NULL can be both specified @@ -318,6 +363,10 @@ BEGIN; COPY forcetest (a, b, c, d) FROM STDIN WITH (FORMAT csv, FORCE_NOT_NULL(c,d), FORCE_NULL(c,d)); 2,'a',,"" \. +TRUNCATE forcetest; +COPY forcetest (a, b, c, d) FROM STDIN WITH (FORMAT csv, FORCE_NOT_NULL(c,d), FORCE_NULL(c,d), PARALLEL 1); +2,'a',,"" +\. COMMIT; SELECT c, d FROM forcetest WHERE a = 2; -- should fail with not-null constraint violation @@ -353,6 +402,16 @@ copy check_con_tbl from stdin; 0 \. select * from check_con_tbl; +\d+ check_con_tbl +truncate check_con_tbl; +copy check_con_tbl from stdin with (parallel 1); +1 +\N +\. +copy check_con_tbl from stdin with (parallel 1); +0 +\. +select * from check_con_tbl; -- test with RLS enabled. CREATE ROLE regress_rls_copy_user; @@ -454,10 +513,311 @@ test1 SELECT * FROM instead_of_insert_tbl; COMMIT; +-- Parallel copy tests. +CREATE TABLE test_parallel_copy ( + a INT, + b INT, + c TEXT not null default 'stuff', + d TEXT, + e TEXT +) ; + +COPY test_parallel_copy (a, b, c, d, e) FROM stdin WITH (PARALLEL 1); +1 11 test_c1 test_d1 test_e1 +2 12 test_c2 test_d2 test_e2 +\. + +COPY test_parallel_copy (b, d) FROM stdin WITH (PARALLEL 1); +3 test_d3 +\. + +COPY test_parallel_copy (b, d) FROM stdin WITH (PARALLEL 1); +4 test_d4 +5 test_d5 +\. + +COPY test_parallel_copy (b, d) FROM stdin WITH (PARALLEL 1, FORMAT 'csv', HEADER); +b d +\. + +-- zero workers: should perform non-parallel copy +COPY test_parallel_copy (b, d) FROM stdin WITH (PARALLEL 0); + +-- referencing table: should perform non-parallel copy +CREATE TABLE test_copy_pk(c1 INT PRIMARY KEY); +INSERT INTO test_copy_pk VALUES(10); +CREATE TABLE test_copy_ri(c1 INT REFERENCES test_copy_pk(c1)); +COPY test_copy_ri FROM stdin WITH (FORMAT csv, DELIMITER ',', PARALLEL 1); +10 +\. + +-- expressions: should perform non-parallel copy +CREATE TABLE test_copy_expr (index INT, height REAL, weight REAL); + +COPY test_copy_expr FROM STDIN WITH (FORMAT csv, DELIMITER ',', PARALLEL 1) WHERE height > random() * 65; +60,60,60 +\. + +-- serial data: should perform non-parallel copy +CREATE TABLE testserial (index SERIAL, height REAL); + +COPY testserial(height) FROM STDIN WITH (FORMAT csv, DELIMITER ',', PARALLEL 1); +60 +\. + +-- temporary table copy: should perform non-parallel copy +CREATE TEMPORARY TABLE temp_test( + a int +) ; + +COPY temp_test (a) FROM stdin WITH (PARALLEL 1); +10 +\. + +-- non-existent column in column list: should fail +COPY test_parallel_copy (xyz) FROM stdin WITH (PARALLEL 1); + +-- too many columns in column list: should fail +COPY test_parallel_copy (a, b, c, d, e, d, c) FROM stdin WITH (PARALLEL 1); + +-- missing data: should fail +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); + +\. +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +2000 230 23 23 +\. +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +2001 231 \N \N +\. + +-- extra data: should fail +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +2002 232 40 50 60 70 80 +\. + +-- various COPY options: delimiters, oids, NULL string, encoding +COPY test_parallel_copy (b, c, d, e) FROM stdin WITH (DELIMITER ',', null 'x', PARALLEL 1) ; +x,45,80,90 +x,\x,\\x,\\\x +x,\,,\\\,,\\ +\. + +COPY test_parallel_copy FROM stdin WITH (DELIMITER ';', NULL '', PARALLEL 1); +3000;;c;; +\. + +COPY test_parallel_copy FROM stdin WITH (DELIMITER ':', NULL E'\\X', ENCODING 'sql_ascii', PARALLEL 1); +4000:\X:C:\X:\X +4001:1:empty:: +4002:2:null:\X:\X +4003:3:Backslash:\\:\\ +4004:4:BackslashX:\\X:\\X +4005:5:N:\N:\N +4006:6:BackslashN:\\N:\\N +4007:7:XX:\XX:\XX +4008:8:Delimiter:\::\: +\. + +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1) WHERE a = 50004; +50003 24 34 44 54 +50004 25 35 45 55 +50005 26 36 46 56 +\. + +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1) WHERE a > 60003; +60001 22 32 42 52 +60002 23 33 43 53 +60003 24 34 44 54 +60004 25 35 45 55 +60005 26 36 46 56 +\. + +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1) WHERE f > 60003; + +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1) WHERE a = max(x.b); + +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1) WHERE a IN (SELECT 1 FROM x); + +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1) WHERE a IN (generate_series(1,5)); + +COPY test_parallel_copy FROM stdin WITH(PARALLEL 1) WHERE a = row_number() over(b); + +-- check results of copy in +SELECT * FROM test_parallel_copy ORDER BY 1; + +-- parallel copy test for unlogged tables. should execute in parallel worker +CREATE UNLOGGED TABLE test_parallel_copy_unlogged( + a INT, + b INT, + c TEXT default 'stuff', + d TEXT, + e TEXT +); + +COPY test_parallel_copy_unlogged FROM stdin WITH (PARALLEL 1); +1 1 test_c1 test_d1 test_e1 +2 2 test_c2 test_d2 test_e2 +\. + +SELECT count(*) FROM test_parallel_copy_unlogged; + +-- parallel copy test for various trigger types +TRUNCATE test_parallel_copy; + +-- parallel safe trigger function +CREATE OR REPLACE FUNCTION parallel_copy_trig_func() RETURNS TRIGGER +LANGUAGE plpgsql PARALLEL SAFE AS $$ +BEGIN + RETURN NEW; +END; +$$; + +-- before insert row trigger +CREATE TRIGGER parallel_copy_trig +BEFORE INSERT ON test_parallel_copy +FOR EACH ROW +EXECUTE PROCEDURE parallel_copy_trig_func(); + +-- no parallelism should be picked +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +1 1 test_c1 test_d1 test_e1 +2 2 test_c2 test_d2 test_e2 +\. + +DROP TRIGGER parallel_copy_trig ON test_parallel_copy; + +-- after insert row trigger +CREATE TRIGGER parallel_copy_trig +AFTER INSERT ON test_parallel_copy +FOR EACH ROW +EXECUTE PROCEDURE parallel_copy_trig_func(); + +-- no parallelism should be picked +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +1 1 test_c1 test_d1 test_e1 +2 2 test_c2 test_d2 test_e2 +\. + +DROP TRIGGER parallel_copy_trig ON test_parallel_copy; + +-- before insert statement trigger +CREATE TRIGGER parallel_copy_trig +BEFORE INSERT ON test_parallel_copy +FOR EACH STATEMENT +EXECUTE PROCEDURE parallel_copy_trig_func(); + +-- parallelism should be picked, since the trigger function is parallel safe +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +1 1 test_c1 test_d1 test_e1 +2 2 test_c2 test_d2 test_e2 +\. + +DROP TRIGGER parallel_copy_trig ON test_parallel_copy; + +-- after insert statement trigger +CREATE TRIGGER parallel_copy_trig +AFTER INSERT ON test_parallel_copy +FOR EACH STATEMENT +EXECUTE PROCEDURE parallel_copy_trig_func(); + +-- no parallelism should be picked +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +1 1 test_c1 test_d1 test_e1 +2 2 test_c2 test_d2 test_e2 +\. + +DROP TRIGGER parallel_copy_trig ON test_parallel_copy; + +-- transition table is involved +CREATE TRIGGER parallel_copy_trig +AFTER INSERT ON test_parallel_copy REFERENCING NEW TABLE AS new_table +FOR EACH STATEMENT +EXECUTE PROCEDURE parallel_copy_trig_func(); + +-- no parallelism should be picked +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +1 1 test_c1 test_d1 test_e1 +2 2 test_c2 test_d2 test_e2 +\. + +DROP TRIGGER parallel_copy_trig ON test_parallel_copy; + +-- make trigger function parallel unsafe +ALTER FUNCTION parallel_copy_trig_func PARALLEL UNSAFE; + +-- before statement trigger has a parallel unsafe function +CREATE TRIGGER parallel_copy_trig +BEFORE INSERT ON test_parallel_copy +FOR EACH STATEMENT +EXECUTE PROCEDURE parallel_copy_trig_func(); + +-- no parallelism should be picked +COPY test_parallel_copy FROM stdin WITH (PARALLEL 1); +1 1 test_c1 test_d1 test_e1 +2 2 test_c2 test_d2 test_e2 +\. + +SELECT count(*) FROM test_parallel_copy; + +DROP TRIGGER parallel_copy_trig ON test_parallel_copy; + +-- instead of insert trigger, no parallelism should be picked +COPY instead_of_insert_tbl_view FROM stdin WITH(PARALLEL 1);; +test1 +\. + +SELECT count(*) FROM instead_of_insert_tbl_view; + +-- parallel copy test for a partitioned table with a before insert trigger on +-- one of the partition +ALTER FUNCTION parallel_copy_trig_func PARALLEL SAFE; + +CREATE TABLE test_parallel_copy_part ( + a INT, + b INT, + c TEXT default 'stuff', + d TEXT, + e TEXT +) PARTITION BY LIST (b); + +CREATE TABLE test_parallel_copy_part_a1 (c TEXT, b INT, a INT, e TEXT, d TEXT); + +CREATE TABLE test_parallel_copy_part_a2 (a INT, c TEXT, b INT, d TEXT, e TEXT); + +ALTER TABLE test_parallel_copy_part ATTACH PARTITION test_parallel_copy_part_a1 FOR VALUES IN(1); + +ALTER TABLE test_parallel_copy_part ATTACH PARTITION test_parallel_copy_part_a2 FOR VALUES IN(2); + +CREATE TRIGGER parallel_copy_trig +BEFORE INSERT ON test_parallel_copy_part_a2 +FOR EACH ROW +EXECUTE PROCEDURE parallel_copy_trig_func(); + +COPY test_parallel_copy_part FROM stdin WITH (PARALLEL 1); +1 1 test_c1 test_d1 test_e1 +2 2 test_c2 test_d2 test_e2 +\. + +SELECT count(*) FROM test_parallel_copy_part; + +DROP TRIGGER parallel_copy_trig ON test_parallel_copy_part_a2; + -- clean up +DROP TABLE test_copy_ri; +DROP TABLE test_copy_pk; +DROP TABLE test_copy_expr; +DROP TABLE testeoc; +DROP TABLE testnl; DROP TABLE forcetest; +DROP TABLE test_parallel_copy; +DROP TABLE test_parallel_copy_unlogged; +DROP TABLE test_parallel_copy_part; +DROP TABLE testserial; +DROP TABLE testnull; DROP TABLE vistest; DROP FUNCTION truncate_in_subxact(); +DROP FUNCTION parallel_copy_trig_func(); DROP TABLE x, y; DROP TABLE rls_t1 CASCADE; DROP ROLE regress_rls_copy_user; -- 1.8.3.1