From 0bba087f29019e0ca0cc6f8996db4f21533b0767 Mon Sep 17 00:00:00 2001 From: Melih Mutlu Date: Mon, 8 Aug 2022 14:14:44 +0300 Subject: [PATCH] Allow logical replication to copy table in binary If binary option is enabled in a subscription, copy tables in binary format during table synchronization. This commit modifies tests 014_binary.pl and 002_types.pl to test initial table synchronization step in binary format --- doc/src/sgml/ref/create_subscription.sgml | 8 +- src/backend/replication/logical/tablesync.c | 11 +- src/test/subscription/t/002_types.pl | 263 ++++++++++++++++---- src/test/subscription/t/014_binary.pl | 28 ++- 4 files changed, 248 insertions(+), 62 deletions(-) diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 7390c715bc..9a576a2b95 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -183,14 +183,18 @@ CREATE SUBSCRIPTION subscription_namebinary (boolean) - Specifies whether the subscription will request the publisher to - send the data in binary format (as opposed to text). + Specifies whether the subscription will copy the initial data to + synchronize relations in binary format and also request the publisher + to send the data in binary format (as opposed to text). The default is false. Even when this option is enabled, only data types having binary send and receive functions will be transferred in binary. + Since logical replication uses COPY command, + it inherits restrictions of COPY command for + binary format.(See .) When doing cross-version replication, it could be that the publisher has a binary send function for some data type, but the subscriber lacks a binary receive function for that type. In diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 6a01ffd273..a3b128f6fb 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -101,6 +101,7 @@ #include "catalog/pg_type.h" #include "commands/copy.h" #include "miscadmin.h" +#include "nodes/makefuncs.h" #include "parser/parse_relation.h" #include "pgstat.h" #include "replication/logicallauncher.h" @@ -1032,6 +1033,7 @@ copy_table(Relation rel) CopyFromState cstate; List *attnamelist; ParseState *pstate; + List *options = NIL; /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), @@ -1110,6 +1112,13 @@ copy_table(Relation rel) appendStringInfoString(&cmd, ") TO STDOUT"); } + + if (MySubscription->binary) + { + appendStringInfoString(&cmd, " WITH (FORMAT binary)"); + options = lappend(options, makeDefElem("format", (Node *) makeString("binary"), -1)); + } + res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL); pfree(cmd.data); if (res->status != WALRCV_OK_COPY_OUT) @@ -1126,7 +1135,7 @@ copy_table(Relation rel) NULL, false, false); attnamelist = make_copy_attnamelist(relmapentry); - cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, NIL); + cstate = BeginCopyFrom(pstate, rel, NULL, NULL, false, copy_read_data, attnamelist, options); /* Do the copy */ (void) CopyFrom(cstate); diff --git a/src/test/subscription/t/002_types.pl b/src/test/subscription/t/002_types.pl index d6c6f49327..4386033962 100644 --- a/src/test/subscription/t/002_types.pl +++ b/src/test/subscription/t/002_types.pl @@ -19,6 +19,11 @@ my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init(allows_streaming => 'logical'); $node_subscriber->start; +# Create binary subscriber node +my $node_subscriber_binary = PostgreSQL::Test::Cluster->new('subscriber_binary'); +$node_subscriber_binary->init(allows_streaming => 'logical'); +$node_subscriber_binary->start; + # Create some preexisting content on publisher my $ddl = qq( CREATE EXTENSION hstore WITH SCHEMA public; @@ -104,6 +109,100 @@ my $ddl = qq( # Setup structure on both nodes $node_publisher->safe_psql('postgres', $ddl); $node_subscriber->safe_psql('postgres', $ddl); +$node_subscriber_binary->safe_psql('postgres', $ddl); + +# Insert initial test data +$node_publisher->safe_psql( + 'postgres', qq( + -- test_tbl_one_array_col + INSERT INTO tst_one_array (a, b) VALUES + (1, '{1, 2, 3}'), + (2, '{2, 3, 1}'); + + -- test_tbl_arrays + INSERT INTO tst_arrays (a, b, c, d) VALUES + ('{1, 2, 3}', '{"a", "b", "c"}', '{1.1, 2.2, 3.3}', '{"1 day", "2 days", "3 days"}'), + ('{2, 3, 1}', '{"b", "c", "a"}', '{2.2, 3.3, 1.1}', '{"2 minutes", "3 minutes", "1 minute"}'); + + -- test_tbl_single_enum + INSERT INTO tst_one_enum (a, b) VALUES + (1, 'a'), + (2, 'b'); + + -- test_tbl_enums + INSERT INTO tst_enums (a, b) VALUES + ('a', '{b, c}'), + ('b', '{c, a}'); + + -- test_tbl_single_composites + INSERT INTO tst_one_comp (a, b) VALUES + (1, ROW(1.0, 'a', 1)), + (2, ROW(2.0, 'b', 2)); + + -- test_tbl_composites + INSERT INTO tst_comps (a, b) VALUES + (ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_basic_t]), + (ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_basic_t]); + + -- test_tbl_composite_with_enums + INSERT INTO tst_comp_enum (a, b) VALUES + (1, ROW(1.0, 'a', 1)), + (2, ROW(2.0, 'b', 2)); + + -- test_tbl_composite_with_enums_array + INSERT INTO tst_comp_enum_array (a, b) VALUES + (ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t]), + (ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_enum_t]); + + -- test_tbl_composite_with_single_enums_array_in_composite + INSERT INTO tst_comp_one_enum_array (a, b) VALUES + (1, ROW(1.0, '{a, b, c}', 1)), + (2, ROW(2.0, '{a, b, c}', 2)); + + -- test_tbl_composite_with_enums_array_in_composite + INSERT INTO tst_comp_enum_what (a, b) VALUES + (ROW(1.0, '{a, b, c}', 1), ARRAY[ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t]), + (ROW(2.0, '{b, c, a}', 2), ARRAY[ROW(2, '{b, c, a}', 1)::tst_comp_enum_array_t]); + + -- test_tbl_mixed_composites + INSERT INTO tst_comp_mix_array (a, b) VALUES + (ROW( + ROW(1,'a',1), + ARRAY[ROW(1,'a',1)::tst_comp_basic_t, ROW(2,'b',2)::tst_comp_basic_t], + 'a', + '{a,b,NULL,c}'), + ARRAY[ + ROW( + ROW(1,'a',1), + ARRAY[ + ROW(1,'a',1)::tst_comp_basic_t, + ROW(2,'b',2)::tst_comp_basic_t, + NULL + ], + 'a', + '{a,b,c}' + )::tst_comp_mix_t + ] + ); + + -- test_tbl_range + INSERT INTO tst_range (a, b) VALUES + (1, '[1, 10]'), + (2, '[2, 20]'); + + -- test_tbl_range_array + INSERT INTO tst_range_array (a, b, c) VALUES + (1, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), '{"[1,2]", "[10,20]"}'), + (2, tstzrange('Sat Aug 02 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[2,3]", "[20,30]"}'); + + -- tst_hstore + INSERT INTO tst_hstore (a, b) VALUES + (1, '"a"=>"1"'), + (2, '"zzz"=>"foo"'); + + -- tst_dom_constr + INSERT INTO tst_dom_constr VALUES (10); +)); # Setup logical replication my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; @@ -113,89 +212,126 @@ $node_publisher->safe_psql('postgres', $node_subscriber->safe_psql('postgres', "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_slot)" ); +$node_subscriber_binary->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_binary CONNECTION '$publisher_connstr' PUBLICATION tap_pub WITH (slot_name = tap_sub_binary_slot, binary = true)" +); # Wait for initial sync to finish as well $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub'); +$node_subscriber_binary->wait_for_subscription_sync($node_publisher, 'tap_sub_binary'); + +my $sync_check = qq( + SET timezone = '+2'; + SELECT a, b FROM tst_one_array ORDER BY a; + SELECT a, b, c, d FROM tst_arrays ORDER BY a; + SELECT a, b FROM tst_one_enum ORDER BY a; + SELECT a, b FROM tst_enums ORDER BY a; + SELECT a, b FROM tst_one_comp ORDER BY a; + SELECT a, b FROM tst_comps ORDER BY a; + SELECT a, b FROM tst_comp_enum ORDER BY a; + SELECT a, b FROM tst_comp_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_one_enum_array ORDER BY a; + SELECT a, b FROM tst_comp_enum_what ORDER BY a; + SELECT a, b FROM tst_comp_mix_array ORDER BY a; + SELECT a, b FROM tst_range ORDER BY a; + SELECT a, b, c FROM tst_range_array ORDER BY a; + SELECT a, b FROM tst_hstore ORDER BY a; +); + +# Check the synced data on subscribers +my $result = $node_subscriber->safe_psql('postgres', $sync_check); +my $result_binary = $node_subscriber_binary->safe_psql('postgres', $sync_check); + +my $sync_result = '1|{1,2,3} +2|{2,3,1} +{1,2,3}|{a,b,c}|{1.1,2.2,3.3}|{"1 day","2 days","3 days"} +{2,3,1}|{b,c,a}|{2.2,3.3,1.1}|{00:02:00,00:03:00,00:01:00} +1|a +2|b +a|{b,c} +b|{c,a} +1|(1,a,1) +2|(2,b,2) +(1,a,1)|{"(1,a,1)"} +(2,b,2)|{"(2,b,2)"} +1|(1,a,1) +2|(2,b,2) +(1,a,1)|{"(1,a,1)"} +(2,b,2)|{"(2,b,2)"} +1|(1,"{a,b,c}",1) +2|(2,"{a,b,c}",2) +(1,"{a,b,c}",1)|{"(1,\"{a,b,c}\",1)"} +(2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"} +("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"} +1|[1,11) +2|[2,21) +1|["2014-08-04 00:00:00+02",infinity)|{"[1,3)","[10,21)"} +2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"} +1|"a"=>"1" +2|"zzz"=>"foo"'; + +is( $result, $sync_result, 'check initial sync on subscriber'); +is( $result_binary, $sync_result, 'check initial sync on subscriber in binary'); # Insert initial test data $node_publisher->safe_psql( 'postgres', qq( -- test_tbl_one_array_col INSERT INTO tst_one_array (a, b) VALUES - (1, '{1, 2, 3}'), - (2, '{2, 3, 1}'), (3, '{3, 2, 1}'), (4, '{4, 3, 2}'), (5, '{5, NULL, 3}'); -- test_tbl_arrays INSERT INTO tst_arrays (a, b, c, d) VALUES - ('{1, 2, 3}', '{"a", "b", "c"}', '{1.1, 2.2, 3.3}', '{"1 day", "2 days", "3 days"}'), - ('{2, 3, 1}', '{"b", "c", "a"}', '{2.2, 3.3, 1.1}', '{"2 minutes", "3 minutes", "1 minute"}'), ('{3, 1, 2}', '{"c", "a", "b"}', '{3.3, 1.1, 2.2}', '{"3 years", "1 year", "2 years"}'), ('{4, 1, 2}', '{"d", "a", "b"}', '{4.4, 1.1, 2.2}', '{"4 years", "1 year", "2 years"}'), ('{5, NULL, NULL}', '{"e", NULL, "b"}', '{5.5, 1.1, NULL}', '{"5 years", NULL, NULL}'); -- test_tbl_single_enum INSERT INTO tst_one_enum (a, b) VALUES - (1, 'a'), - (2, 'b'), (3, 'c'), (4, 'd'), (5, NULL); -- test_tbl_enums INSERT INTO tst_enums (a, b) VALUES - ('a', '{b, c}'), - ('b', '{c, a}'), ('c', '{b, a}'), ('d', '{c, b}'), ('e', '{d, NULL}'); -- test_tbl_single_composites INSERT INTO tst_one_comp (a, b) VALUES - (1, ROW(1.0, 'a', 1)), - (2, ROW(2.0, 'b', 2)), (3, ROW(3.0, 'c', 3)), (4, ROW(4.0, 'd', 4)), (5, ROW(NULL, NULL, 5)); -- test_tbl_composites INSERT INTO tst_comps (a, b) VALUES - (ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_basic_t]), - (ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_basic_t]), (ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_basic_t]), (ROW(4.0, 'd', 4), ARRAY[ROW(4, 'd', 3)::tst_comp_basic_t]), (ROW(5.0, 'e', NULL), ARRAY[NULL, ROW(5, NULL, 5)::tst_comp_basic_t]); -- test_tbl_composite_with_enums INSERT INTO tst_comp_enum (a, b) VALUES - (1, ROW(1.0, 'a', 1)), - (2, ROW(2.0, 'b', 2)), (3, ROW(3.0, 'c', 3)), (4, ROW(4.0, 'd', 4)), (5, ROW(NULL, 'e', NULL)); -- test_tbl_composite_with_enums_array INSERT INTO tst_comp_enum_array (a, b) VALUES - (ROW(1.0, 'a', 1), ARRAY[ROW(1, 'a', 1)::tst_comp_enum_t]), - (ROW(2.0, 'b', 2), ARRAY[ROW(2, 'b', 2)::tst_comp_enum_t]), (ROW(3.0, 'c', 3), ARRAY[ROW(3, 'c', 3)::tst_comp_enum_t]), (ROW(4.0, 'd', 3), ARRAY[ROW(3, 'd', 3)::tst_comp_enum_t]), (ROW(5.0, 'e', 3), ARRAY[ROW(3, 'e', 3)::tst_comp_enum_t, NULL]); -- test_tbl_composite_with_single_enums_array_in_composite INSERT INTO tst_comp_one_enum_array (a, b) VALUES - (1, ROW(1.0, '{a, b, c}', 1)), - (2, ROW(2.0, '{a, b, c}', 2)), (3, ROW(3.0, '{a, b, c}', 3)), (4, ROW(4.0, '{c, b, d}', 4)), (5, ROW(5.0, '{NULL, e, NULL}', 5)); -- test_tbl_composite_with_enums_array_in_composite INSERT INTO tst_comp_enum_what (a, b) VALUES - (ROW(1.0, '{a, b, c}', 1), ARRAY[ROW(1, '{a, b, c}', 1)::tst_comp_enum_array_t]), - (ROW(2.0, '{b, c, a}', 2), ARRAY[ROW(2, '{b, c, a}', 1)::tst_comp_enum_array_t]), (ROW(3.0, '{c, a, b}', 1), ARRAY[ROW(3, '{c, a, b}', 1)::tst_comp_enum_array_t]), (ROW(4.0, '{c, b, d}', 4), ARRAY[ROW(4, '{c, b, d}', 4)::tst_comp_enum_array_t]), (ROW(5.0, '{c, NULL, b}', NULL), ARRAY[ROW(5, '{c, e, b}', 1)::tst_comp_enum_array_t]); @@ -203,10 +339,10 @@ $node_publisher->safe_psql( -- test_tbl_mixed_composites INSERT INTO tst_comp_mix_array (a, b) VALUES (ROW( - ROW(1,'a',1), - ARRAY[ROW(1,'a',1)::tst_comp_basic_t, ROW(2,'b',2)::tst_comp_basic_t], - 'a', - '{a,b,NULL,c}'), + ROW(2,'b',2), + ARRAY[ROW(2,'b',2)::tst_comp_basic_t, ROW(3,'c',3)::tst_comp_basic_t], + 'b', + '{b,c,NULL,d}'), ARRAY[ ROW( ROW(1,'a',1), @@ -223,36 +359,29 @@ $node_publisher->safe_psql( -- test_tbl_range INSERT INTO tst_range (a, b) VALUES - (1, '[1, 10]'), - (2, '[2, 20]'), (3, '[3, 30]'), (4, '[4, 40]'), (5, '[5, 50]'); -- test_tbl_range_array INSERT INTO tst_range_array (a, b, c) VALUES - (1, tstzrange('Mon Aug 04 00:00:00 2014 CEST'::timestamptz, 'infinity'), '{"[1,2]", "[10,20]"}'), - (2, tstzrange('Sat Aug 02 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[2,3]", "[20,30]"}'), (3, tstzrange('Fri Aug 01 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[3,4]"}'), (4, tstzrange('Thu Jul 31 00:00:00 2014 CEST'::timestamptz, 'Mon Aug 04 00:00:00 2014 CEST'::timestamptz), '{"[4,5]", NULL, "[40,50]"}'), (5, NULL, NULL); -- tst_hstore INSERT INTO tst_hstore (a, b) VALUES - (1, '"a"=>"1"'), - (2, '"zzz"=>"foo"'), (3, '"123"=>"321"'), (4, '"yellow horse"=>"moaned"'); -- tst_dom_constr - INSERT INTO tst_dom_constr VALUES (10); + INSERT INTO tst_dom_constr VALUES (11); )); $node_publisher->wait_for_catchup('tap_sub'); +$node_publisher->wait_for_catchup('tap_sub_binary'); -# Check the data on subscriber -my $result = $node_subscriber->safe_psql( - 'postgres', qq( +my $initial_check = qq( SET timezone = '+2'; SELECT a, b FROM tst_one_array ORDER BY a; SELECT a, b, c, d FROM tst_arrays ORDER BY a; @@ -268,9 +397,13 @@ my $result = $node_subscriber->safe_psql( SELECT a, b FROM tst_range ORDER BY a; SELECT a, b, c FROM tst_range_array ORDER BY a; SELECT a, b FROM tst_hstore ORDER BY a; -)); +); + +# Check the data on subscribers +$result = $node_subscriber->safe_psql('postgres', $initial_check); +$result_binary = $node_subscriber_binary->safe_psql('postgres', $initial_check); -is( $result, '1|{1,2,3} +my $initial_result = '1|{1,2,3} 2|{2,3,1} 3|{3,2,1} 4|{4,3,2} @@ -321,6 +454,7 @@ e|{d,NULL} (4,"{c,b,d}",4)|{"(4,\"{c,b,d}\",4)"} (5,"{c,NULL,b}",)|{"(5,\"{c,e,b}\",1)"} ("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"} +("(2,b,2)","{""(2,b,2)"",""(3,c,3)""}",b,"{b,c,NULL,d}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"} 1|[1,11) 2|[2,21) 3|[3,31) @@ -334,8 +468,10 @@ e|{d,NULL} 1|"a"=>"1" 2|"zzz"=>"foo" 3|"123"=>"321" -4|"yellow horse"=>"moaned"', - 'check replicated inserts on subscriber'); +4|"yellow horse"=>"moaned"'; + +is( $result, $initial_result, 'check replicated inserts on subscriber'); +is( $result_binary, $initial_result, 'check replicated inserts on subscriber in binary'); # Run batch of updates $node_publisher->safe_psql( @@ -370,10 +506,9 @@ $node_publisher->safe_psql( )); $node_publisher->wait_for_catchup('tap_sub'); +$node_publisher->wait_for_catchup('tap_sub_binary'); -# Check the data on subscriber -$result = $node_subscriber->safe_psql( - 'postgres', qq( +my $update_check = qq( SET timezone = '+2'; SELECT a, b FROM tst_one_array ORDER BY a; SELECT a, b, c, d FROM tst_arrays ORDER BY a; @@ -389,9 +524,13 @@ $result = $node_subscriber->safe_psql( SELECT a, b FROM tst_range ORDER BY a; SELECT a, b, c FROM tst_range_array ORDER BY a; SELECT a, b FROM tst_hstore ORDER BY a; -)); +); -is( $result, '1|{4,5,6} +# Check the data on subscribers +$result = $node_subscriber->safe_psql('postgres', $update_check); +$result_binary = $node_subscriber_binary->safe_psql('postgres', $update_check); + +my $update_result = '1|{4,5,6} 2|{2,3,1} 3|{3,2,1} 4|{4,5,6,1} @@ -442,6 +581,7 @@ e|{e,d} (4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"} (5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"} ("(1,a,1)","{""(1,a,1)"",""(2,b,2)""}",a,"{a,b,NULL,c}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")",NULL} +("(2,b,2)","{""(2,b,2)"",""(3,c,3)""}",b,"{b,c,NULL,d}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"} 1|[100,1001) 2|[2,21) 3|[3,31) @@ -455,8 +595,10 @@ e|{e,d} 1|"updated"=>"value" 2|"updated"=>"value" 3|"also"=>"updated" -4|"yellow horse"=>"moaned"', - 'check replicated updates on subscriber'); +4|"yellow horse"=>"moaned"'; + +is( $result, $update_result, 'check replicated updates on subscriber'); +is( $result_binary, $update_result, 'check replicated updates on subscriber in binary'); # Run batch of deletes $node_publisher->safe_psql( @@ -490,10 +632,9 @@ $node_publisher->safe_psql( )); $node_publisher->wait_for_catchup('tap_sub'); +$node_publisher->wait_for_catchup('tap_sub_binary'); -# Check the data on subscriber -$result = $node_subscriber->safe_psql( - 'postgres', qq( +my $delete_check = qq( SET timezone = '+2'; SELECT a, b FROM tst_one_array ORDER BY a; SELECT a, b, c, d FROM tst_arrays ORDER BY a; @@ -509,9 +650,13 @@ $result = $node_subscriber->safe_psql( SELECT a, b FROM tst_range ORDER BY a; SELECT a, b, c FROM tst_range_array ORDER BY a; SELECT a, b FROM tst_hstore ORDER BY a; -)); +); -is( $result, '3|{3,2,1} +# Check the data on subscribers +$result = $node_subscriber->safe_psql('postgres', $delete_check); +$result_binary = $node_subscriber_binary->safe_psql('postgres', $delete_check); + +my $delete_result = '3|{3,2,1} 4|{4,5,6,1} 5|{4,5,6,1} {3,1,2}|{c,a,b}|{3.3,1.1,2.2}|{"3 years","1 year","2 years"} @@ -540,26 +685,36 @@ e|{e,d} (2,"{b,c,a}",2)|{"(2,\"{b,c,a}\",1)"} (4,"{c,b,d}",4)|{"(5,\"{a,b,c}\",5)"} (5,"{c,NULL,b}",)|{"(5,\"{a,b,c}\",5)"} +("(2,b,2)","{""(2,b,2)"",""(3,c,3)""}",b,"{b,c,NULL,d}")|{"(\"(1,a,1)\",\"{\"\"(1,a,1)\"\",\"\"(2,b,2)\"\",NULL}\",a,\"{a,b,c}\")"} 2|["2014-08-02 00:00:00+02","2014-08-04 00:00:00+02")|{"[2,4)","[20,31)"} 3|["2014-08-01 00:00:00+02","2014-08-04 00:00:00+02")|{"[3,5)"} 2|"updated"=>"value" 3|"also"=>"updated" -4|"yellow horse"=>"moaned"', - 'check replicated deletes on subscriber'); +4|"yellow horse"=>"moaned"'; + +is( $result, $delete_result, 'check replicated deletes on subscriber'); +is( $result_binary, $delete_result, 'check replicated deletes on subscriber in binary'); # Test a domain with a constraint backed by a SQL-language function, # which needs an active snapshot in order to operate. $node_publisher->safe_psql('postgres', - "INSERT INTO tst_dom_constr VALUES (11)"); + "INSERT INTO tst_dom_constr VALUES (12)"); $node_publisher->wait_for_catchup('tap_sub'); +$node_publisher->wait_for_catchup('tap_sub_binary'); $result = $node_subscriber->safe_psql('postgres', "SELECT sum(a) FROM tst_dom_constr"); -is($result, '21', 'sql-function constraint on domain'); +is($result, '33', 'sql-function constraint on domain'); + +$result_binary = + $node_subscriber->safe_psql('postgres', + "SELECT sum(a) FROM tst_dom_constr"); +is($result_binary, '33', 'sql-function constraint on domain'); $node_subscriber->stop('fast'); +$node_subscriber_binary->stop('fast'); $node_publisher->stop('fast'); done_testing(); diff --git a/src/test/subscription/t/014_binary.pl b/src/test/subscription/t/014_binary.pl index 8d8b35721f..395efee1c7 100644 --- a/src/test/subscription/t/014_binary.pl +++ b/src/test/subscription/t/014_binary.pl @@ -36,6 +36,16 @@ my $ddl = qq( $node_publisher->safe_psql('postgres', $ddl); $node_subscriber->safe_psql('postgres', $ddl); +# Insert some content and make sure it's synced to subscriber +$node_publisher->safe_psql( + 'postgres', qq( + INSERT INTO public.test_arrays (a, b, c) VALUES + ('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'); + + INSERT INTO public.test_numerical (a, b, c, d) VALUES + (1, 1.2, 1.3, 10); + )); + # Configure logical replication $node_publisher->safe_psql('postgres', "CREATE PUBLICATION tpub FOR ALL TABLES"); @@ -48,27 +58,35 @@ $node_subscriber->safe_psql('postgres', # Ensure nodes are in sync with each other $node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub'); +my $result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c, d FROM test_numerical ORDER BY a; + SELECT a, b, c FROM test_arrays ORDER BY a;"); + +is( $result, '1|1.2|1.3|10 +{1,2,3}|{1.1,1.2,1.3}|{one,two,three}', 'check syned data on subscriber'); + # Insert some content and make sure it's replicated across $node_publisher->safe_psql( 'postgres', qq( INSERT INTO public.test_arrays (a, b, c) VALUES - ('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'), ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}'); INSERT INTO public.test_numerical (a, b, c, d) VALUES - (1, 1.2, 1.3, 10), (2, 2.2, 2.3, 20), (3, 3.2, 3.3, 30); )); $node_publisher->wait_for_catchup('tsub'); -my $result = $node_subscriber->safe_psql('postgres', - "SELECT a, b, c, d FROM test_numerical ORDER BY a"); +$result = $node_subscriber->safe_psql('postgres', + "SELECT a, b, c, d FROM test_numerical ORDER BY a; + SELECT a, b, c FROM test_arrays ORDER BY a;"); is( $result, '1|1.2|1.3|10 2|2.2|2.3|20 -3|3.2|3.3|30', 'check replicated data on subscriber'); +3|3.2|3.3|30 +{1,2,3}|{1.1,1.2,1.3}|{one,two,three} +{3,1,2}|{1.3,1.1,1.2}|{three,one,two}', 'check replicated data on subscriber'); # Test updates as well $node_publisher->safe_psql( -- 2.25.1