From 81dbaa5dd4d569417ad5f907dc898699de5e6c49 Mon Sep 17 00:00:00 2001 From: Vigneshwaran C Date: Mon, 8 Aug 2022 09:47:26 +0530 Subject: [PATCH v42 1/2] Check and throw a warning if publication tables were also subscribing from other publishers. Checks and throws a warning if 'copy_data = true' and 'origin = none' but the publication tables were also replicated from other publishers. ------------------------------------------------------------------------------- The steps below help to demonstrate how the new warning is useful: The initial copy phase has no way to know the origin of the row data, so if 'copy_data = true' in the step 4 below, then a warning will be thrown to notify user that potentially non-local data might have been copied. e.g. step 1: node1=# CREATE PUBLICATION pub_node1 FOR TABLE t1; CREATE PUBLICATION step 2: node2=# CREATE PUBLICATION pub_node2 FOR TABLE t1; CREATE PUBLICATION step 3: node1=# CREATE SUBSCRIPTION sub_node1_node2 CONNECTION '' node1-# PUBLICATION pub_node2 WITH (copy_data = true, origin = none); CREATE SUBSCRIPTION step 4: CREATE SUBSCRIPTION sub_node2_node1 CONNECTION '' PUBLICATION pub_node1 WITH (copy_data = true, origin = none); WARNING: publisher has subscribed table "public.t1" from some other publisher DETAIL: Publisher might have subscribed one or more tables from some other publisher. HINT: Verify that these publisher tables do not have data that has an origin associated before proceeding to avoid inconsistency. Author: Vignesh C Reviewed-By: Peter Smith, Amit Kapila, Jonathan Katz, Shi yu, Wang wei Discussion: https://www.postgresql.org/message-id/CALDaNm0gwjY_4HFxvvty01BOT01q_fJLKQ3pWP9=9orqubhjcQ@mail.gmail.com --- doc/src/sgml/ref/alter_subscription.sgml | 6 + doc/src/sgml/ref/create_subscription.sgml | 22 ++ src/backend/commands/subscriptioncmds.c | 138 +++++++- src/test/subscription/t/030_origin.pl | 364 +++++++++++++++++++--- 4 files changed, 487 insertions(+), 43 deletions(-) diff --git a/doc/src/sgml/ref/alter_subscription.sgml b/doc/src/sgml/ref/alter_subscription.sgml index 64efc21f53..c1e586bddf 100644 --- a/doc/src/sgml/ref/alter_subscription.sgml +++ b/doc/src/sgml/ref/alter_subscription.sgml @@ -168,6 +168,12 @@ ALTER SUBSCRIPTION name RENAME TO < that are being subscribed to when the replication starts. The default is true. + + Refer to the for the + usage of true for copy_data + parameter and its interaction with the origin + parameter. + Previously subscribed tables are not copied, even if a table's row filter WHERE clause has since been modified. diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 7390c715bc..e805642c44 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -213,6 +213,12 @@ CREATE SUBSCRIPTION subscription_name for details. + + Refer to the for the + usage of true for copy_data + parameter and its interaction with the origin + parameter. + @@ -315,6 +321,12 @@ CREATE SUBSCRIPTION subscription_nameany means that the publisher sends changes regardless of their origin. The default is any. + + Refer to the for the + usage of true for copy_data + parameter and its interaction with the origin + parameter. + @@ -386,6 +398,16 @@ CREATE SUBSCRIPTION subscription_name + + If the subscription is created with origin = none and + copy_data = true, it will check if the publisher has + subscribed to the same table from other publishers and, if so, throw a + warning to notify user to check the publisher tables. The user can ensure + that publisher tables do not have data which has an origin associated before + continuing with any other operations to prevent inconsistent data being + replicated. + + diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 670b219c8d..5e9b65fe72 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -92,6 +92,10 @@ typedef struct SubOpts } SubOpts; static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static void check_pub_table_subscribed(WalReceiverConn *wrconn, + List *publications, bool copydata, + char *origin, Oid *subrel_local_oids, + int subrel_count); static void check_duplicates_in_publist(List *publist, Datum *datums); static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname); static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err); @@ -680,6 +684,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, PG_TRY(); { check_publications(wrconn, publications); + check_pub_table_subscribed(wrconn, publications, opts.copy_data, + opts.origin, NULL, 0); /* * Set sync state based on if we were asked to do data copy or @@ -786,6 +792,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, ListCell *lc; int off; int remove_rel_len; + int subrel_count; Relation rel = NULL; typedef struct SubRemoveRels { @@ -815,13 +822,14 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, /* Get local table list. */ subrel_states = GetSubscriptionRelations(sub->oid, false); + subrel_count = list_length(subrel_states); /* * Build qsorted array of local table oids for faster lookup. This can * potentially contain all tables in the database so speed of lookup * is important. */ - subrel_local_oids = palloc(list_length(subrel_states) * sizeof(Oid)); + subrel_local_oids = palloc(subrel_count * sizeof(Oid)); off = 0; foreach(lc, subrel_states) { @@ -829,14 +837,19 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, subrel_local_oids[off++] = relstate->relid; } - qsort(subrel_local_oids, list_length(subrel_states), + qsort(subrel_local_oids, subrel_count, sizeof(Oid), oid_cmp); + /* Check whether we can allow copy of newly added relations. */ + check_pub_table_subscribed(wrconn, sub->publications, copy_data, + sub->origin, subrel_local_oids, + subrel_count); + /* * Rels that we want to remove from subscription and drop any slots * and origins corresponding to them. */ - sub_remove_rels = palloc(list_length(subrel_states) * sizeof(SubRemoveRels)); + sub_remove_rels = palloc(subrel_count * sizeof(SubRemoveRels)); /* * Walk over the remote tables and try to match them to locally known @@ -862,7 +875,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, pubrel_local_oids[off++] = relid; if (!bsearch(&relid, subrel_local_oids, - list_length(subrel_states), sizeof(Oid), oid_cmp)) + subrel_count, sizeof(Oid), oid_cmp)) { AddSubscriptionRelState(sub->oid, relid, copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY, @@ -881,7 +894,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, sizeof(Oid), oid_cmp); remove_rel_len = 0; - for (off = 0; off < list_length(subrel_states); off++) + for (off = 0; off < subrel_count; off++) { Oid relid = subrel_local_oids[off]; @@ -1781,6 +1794,121 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) table_close(rel, RowExclusiveLock); } +/* + * Check and throw a warning if the publisher has subscribed to the same table + * from some other publisher. This check is required only if "copy_data = true" + * and "origin = none" for CREATE SUBSCRIPTION and + * ALTER SUBSCRIPTION ... REFRESH statements to notify user that data having + * origin might have been copied. + * + * This check need not be performed on the tables that are already added + * because incremental sync for those tables will happen through WAL and the + * origin of the data can be identified from the WAL records. + * + * subrel_local_oids contains the list of relation oids that are already + * present on the subscriber. + */ +static void +check_pub_table_subscribed(WalReceiverConn *wrconn, List *publications, + bool copydata, char *origin, Oid *subrel_local_oids, + int subrel_count) +{ + WalRcvExecResult *res; + StringInfoData cmd; + TupleTableSlot *slot; + Oid tableRow[2] = {TEXTOID, TEXTOID}; + + if (copydata == false || !origin || + (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0)) + return; + + initStringInfo(&cmd); + appendStringInfoString(&cmd, + "SELECT DISTINCT N.nspname AS schemaname,\n" + " C.relname AS tablename\n" + "FROM pg_publication P,\n" + " LATERAL pg_get_publication_tables(P.pubname) GPT\n" + " LEFT JOIN pg_subscription_rel PS ON (GPT.relid = PS.srrelid),\n" + " pg_class C JOIN pg_namespace N ON (N.oid = C.relnamespace)\n" + "WHERE C.oid = GPT.relid AND PS.srrelid IS NOT NULL AND P.pubname IN ("); + get_publications_str(publications, &cmd, true); + appendStringInfoChar(&cmd, ')'); + + res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + pfree(cmd.data); + + if (res->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not receive list of replicated tables from the publisher: %s", + res->err))); + + /* Process tables. */ + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + char *nspname; + char *relname; + bool isnull; + bool isnewtable = true; + + nspname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + relname = TextDatumGetCString(slot_getattr(slot, 2, &isnull)); + Assert(!isnull); + + /* Skip already added tables */ + if (subrel_count) + { + RangeVar *rv; + Oid relid; + + rv = makeRangeVar(nspname, relname, -1); + relid = RangeVarGetRelid(rv, AccessShareLock, false); + + /* Check for supported relkind. */ + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + + if (bsearch(&relid, subrel_local_oids, + subrel_count, sizeof(Oid), oid_cmp)) + isnewtable = false; + } + + ExecClearTuple(slot); + + if (!isnewtable) + { + pfree(nspname); + pfree(relname); + continue; + } + + /* + * Throw a warning if the publisher has subscribed to the same table + * from some other publisher. We cannot know the origin of data during + * the initial sync. Data origins can be found only from the WAL by + * looking at the origin id. + * + * XXX: For simplicity, we don't check whether the table has any data + * or not. If the table doesn't have any data then we don't need to + * distinguish between data having origin and data not having origin so + * we can avoid throwing a warning in that case. + */ + ereport(WARNING, + errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publisher has subscribed table \"%s.%s\" from some other publisher", + nspname, relname), + errdetail("Publisher might have subscribed one or more tables from some other publisher."), + errhint("Verify that these publisher tables do not have data that has an origin associated before proceeding to avoid inconsistency.")); + break; + } + + ExecDropSingleTupleTableSlot(slot); + + walrcv_clear_result(res); +} + /* * Get the list of tables which belong to specified publications on the * publisher connection. diff --git a/src/test/subscription/t/030_origin.pl b/src/test/subscription/t/030_origin.pl index b297a51f7c..0be1c29a2e 100644 --- a/src/test/subscription/t/030_origin.pl +++ b/src/test/subscription/t/030_origin.pl @@ -1,13 +1,115 @@ # Copyright (c) 2021-2022, PostgreSQL Global Development Group -# Test the CREATE SUBSCRIPTION 'origin' parameter. +# Test the CREATE SUBSCRIPTION 'origin' parameter and its interaction with +# 'copy_data' parameter. use strict; use warnings; use PostgreSQL::Test::Cluster; use PostgreSQL::Test::Utils; use Test::More; +my $result; +my $stdout; +my $stderr; + +my $subname_AB = 'tap_sub_A_B'; +my $subname_AC = 'tap_sub_A_C'; +my $subname_BA = 'tap_sub_B_A'; +my $subname_BC = 'tap_sub_B_C'; +my $subname_CA = 'tap_sub_C_A'; +my $subname_CB = 'tap_sub_C_B'; + +# Detach node_C from the node-group of (node_A, node_B, node_C) and clean the +# table contents from all nodes. +sub detach_node_clean_table_data +{ + my ($node_A, $node_B, $node_C) = @_; + $node_A->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_A_C"); + $node_B->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_B_C"); + $node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C_A"); + $node_C->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_C_B"); + + $result = + $node_A->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); + is($result, qq(1), 'check subscription was dropped on subscriber'); + + $result = + $node_B->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); + is($result, qq(1), 'check subscription was dropped on subscriber'); + + $result = + $node_C->safe_psql('postgres', "SELECT count(*) FROM pg_subscription"); + is($result, qq(0), 'check subscription was dropped on subscriber'); + + $result = $node_A->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); + is($result, qq(1), 'check replication slot was dropped on publisher'); + + $result = $node_B->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); + is($result, qq(1), 'check replication slot was dropped on publisher'); + + $result = $node_C->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_slots"); + is($result, qq(0), 'check replication slot was dropped on publisher'); + + $node_A->safe_psql('postgres', "TRUNCATE tab"); + $node_B->safe_psql('postgres', "TRUNCATE tab"); + $node_C->safe_psql('postgres', "TRUNCATE tab"); +} + +# Subroutine to verify the data is replicated successfully. +sub verify_data +{ + my ($node_A, $node_B, $node_C, $expect) = @_; + + $node_A->wait_for_catchup($subname_BA); + $node_A->wait_for_catchup($subname_CA); + $node_B->wait_for_catchup($subname_AB); + $node_B->wait_for_catchup($subname_CB); + $node_C->wait_for_catchup($subname_AC); + $node_C->wait_for_catchup($subname_BC); + + # check that data is replicated to all the nodes + $result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); + is($result, qq($expect), 'Data is replicated as expected'); + + $result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); + is($result, qq($expect), 'Data is replicated as expected'); + + $result = $node_C->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); + is($result, qq($expect), 'Data is replicated as expected'); +} + +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; + +# Subroutine to create subscription and wait until the initial sync is +# completed. Subroutine expects subscriber node, publisher node, subscription +# name, destination connection string, publication name and the subscription +# parameters to be passed as input parameters. +sub create_subscription +{ + my ($node_subscriber, $node_publisher, $sub_name, $node_connstr, + $pub_name, $sub_params) + = @_; + + # Application_name is always assigned the same value as the subscription + # name. + $node_subscriber->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION $sub_name + CONNECTION '$node_connstr application_name=$sub_name' + PUBLICATION $pub_name + WITH ($sub_params)"); + $node_publisher->wait_for_catchup($sub_name); + + # also wait for initial table sync to finish + $node_subscriber->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +} + ############################################################################### # Setup a bidirectional logical replication between node_A & node_B ############################################################################### @@ -32,33 +134,17 @@ $node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)"); # node_A (pub) -> node_B (sub) my $node_A_connstr = $node_A->connstr . ' dbname=postgres'; $node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab"); -my $appname_B1 = 'tap_sub_B1'; -$node_B->safe_psql( - 'postgres', " - CREATE SUBSCRIPTION tap_sub_B1 - CONNECTION '$node_A_connstr application_name=$appname_B1' - PUBLICATION tap_pub_A - WITH (origin = none)"); +create_subscription($node_B, $node_A, $subname_BA, $node_A_connstr, + 'tap_pub_A', 'copy_data = on, origin = none'); # node_B (pub) -> node_A (sub) my $node_B_connstr = $node_B->connstr . ' dbname=postgres'; $node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab"); -my $appname_A = 'tap_sub_A'; -$node_A->safe_psql( - 'postgres', " - CREATE SUBSCRIPTION tap_sub_A - CONNECTION '$node_B_connstr application_name=$appname_A' - PUBLICATION tap_pub_B - WITH (origin = none, copy_data = off)"); - -# Wait for initial table sync to finish -$node_A->wait_for_subscription_sync($node_B, $appname_A); -$node_B->wait_for_subscription_sync($node_A, $appname_B1); +create_subscription($node_A, $node_B, $subname_AB, $node_B_connstr, + 'tap_pub_B', 'copy_data = off, origin = none'); is(1, 1, 'Bidirectional replication setup is complete'); -my $result; - ############################################################################### # Check that bidirectional logical replication setup does not cause infinite # recursive insertion. @@ -68,8 +154,8 @@ my $result; $node_A->safe_psql('postgres', "INSERT INTO tab VALUES (11);"); $node_B->safe_psql('postgres', "INSERT INTO tab VALUES (21);"); -$node_A->wait_for_catchup($appname_B1); -$node_B->wait_for_catchup($appname_A); +$node_A->wait_for_catchup($subname_BA); +$node_B->wait_for_catchup($subname_AB); # check that transaction was committed on subscriber(s) $result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); @@ -85,8 +171,8 @@ is( $result, qq(11 $node_A->safe_psql('postgres', "DELETE FROM tab;"); -$node_A->wait_for_catchup($appname_B1); -$node_B->wait_for_catchup($appname_A); +$node_A->wait_for_catchup($subname_BA); +$node_B->wait_for_catchup($subname_AB); ############################################################################### # Check that remote data of node_B (that originated from node_C) is not @@ -109,23 +195,15 @@ $node_C->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY)"); # node_C (pub) -> node_B (sub) my $node_C_connstr = $node_C->connstr . ' dbname=postgres'; $node_C->safe_psql('postgres', "CREATE PUBLICATION tap_pub_C FOR TABLE tab"); - -my $appname_B2 = 'tap_sub_B2'; -$node_B->safe_psql( - 'postgres', " - CREATE SUBSCRIPTION tap_sub_B2 - CONNECTION '$node_C_connstr application_name=$appname_B2' - PUBLICATION tap_pub_C - WITH (origin = none)"); - -$node_B->wait_for_subscription_sync($node_C, $appname_B2); +create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr, + 'tap_pub_C', 'copy_data = on, origin = none'); # insert a record $node_C->safe_psql('postgres', "INSERT INTO tab VALUES (32);"); -$node_C->wait_for_catchup($appname_B2); -$node_B->wait_for_catchup($appname_A); -$node_A->wait_for_catchup($appname_B1); +$node_C->wait_for_catchup($subname_BC); +$node_B->wait_for_catchup($subname_AB); +$node_A->wait_for_catchup($subname_BA); $result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); is($result, qq(32), 'The node_C data replicated to node_B'); @@ -136,6 +214,216 @@ is($result, qq(), 'Remote data originating from another node (not the publisher) is not replicated when origin parameter is none' ); +# clear the operations done by this test +$node_B->safe_psql( + 'postgres', " + DROP SUBSCRIPTION $subname_BC"); +# no need to wait for catchup of delete operation performed in node_C as +# the subscription for node_C publication has been dropped +$node_C->safe_psql( + 'postgres', " + DELETE FROM tab"); + +# wait for catchup of bidirectional logical replication nodes node_A & node_B +$node_B->safe_psql( + 'postgres', " + DELETE FROM tab where a = 32"); + +$node_A->wait_for_catchup($subname_BA); +$node_B->wait_for_catchup($subname_AB); + +############################################################################### +# Specify origin as 'none' which indicates that the publisher should only +# replicate the changes that are generated locally from node_B, but in +# this case since the node_B is also subscribing data from node_A, node_B can +# have remotely originated data from node_A. We throw a warning, in this case, +# to draw attention to there being possible remote data. +############################################################################### +($result, $stdout, $stderr) = $node_A->psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub_A2 + CONNECTION '$node_B_connstr application_name=$subname_AB' + PUBLICATION tap_pub_B + WITH (origin = none, copy_data = on)"); +like( + $stderr, + qr/WARNING: ( [A-Z0-9]+:)? publisher has subscribed table "public.tab" from some other publisher/, + "Create subscription with origin = none and copy_data when the publisher has subscribed same table" +); + +$node_A->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Alter subscription ... refresh publication should be successful when no new +# table is added +$node_A->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION tap_sub_A2 REFRESH PUBLICATION"); + +# Check Alter subscription ... refresh publication when there is a new +# table that is subscribing data from a different publication +$node_A->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)"); +$node_B->safe_psql('postgres', "CREATE TABLE tab_new (a int PRIMARY KEY)"); + +# add a new table to the publication +$node_A->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_A ADD TABLE tab_new"); +$node_B->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_B ADD TABLE tab_new"); + +$node_B->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION $subname_BA REFRESH PUBLICATION"); + +$node_B->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# Alter subscription ... refresh publication should throw a warning when a new +# table in the publisher is subscribing data from a different publication +($result, $stdout, $stderr) = $node_A->psql( + 'postgres', " + ALTER SUBSCRIPTION tap_sub_A2 REFRESH PUBLICATION"); +like( + $stderr, + qr/WARNING: ( [A-Z0-9]+:)? publisher has subscribed table "public.tab_new" from some other publisher/, + "Refresh publication when the publisher has subscribed for the new table, but the subscriber-side wants origin=none" +); + +$node_A->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# clear the operations done by this test +$node_A->safe_psql('postgres', "DROP TABLE tab_new"); +$node_B->safe_psql('postgres', "DROP TABLE tab_new"); +$node_A->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_A2"); +$node_A->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION $subname_AB REFRESH PUBLICATION"); +$node_A->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +$node_B->safe_psql( + 'postgres', " + ALTER SUBSCRIPTION $subname_BA REFRESH PUBLICATION"); +$node_B->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +############################################################################### +# Join 3rd node (node_C) to the existing 2 nodes(node_A & node_B) bidirectional +# replication setup when the existing nodes (node_A & node_B) has pre-existing +# data and the new node (node_C) does not have any data. +############################################################################### +$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); +is($result, qq(), 'Check existing data'); + +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); +is($result, qq(), 'Check existing data'); + +$result = $node_C->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); +is($result, qq(), 'Check existing data'); + +create_subscription($node_A, $node_C, $subname_AC, $node_C_connstr, + 'tap_pub_C', 'copy_data = off, origin = none'); +create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr, + 'tap_pub_C', 'copy_data = off, origin = none'); +create_subscription($node_C, $node_A, $subname_CA, $node_A_connstr, + 'tap_pub_A', 'copy_data = on, origin = none'); +create_subscription($node_C, $node_B, $subname_CB, $node_B_connstr, + 'tap_pub_B', 'copy_data = off, origin = none'); + +# insert some data in all the nodes +$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (13);"); +$node_B->safe_psql('postgres', "INSERT INTO tab VALUES (23);"); +$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (33);"); + +verify_data( + $node_A, $node_B, $node_C, '13 +23 +33'); + +detach_node_clean_table_data($node_A, $node_B, $node_C); + +############################################################################### +# Join 3rd node (node_C) to the existing 2 nodes(node_A & node_B) bidirectional +# replication setup when the existing nodes (node_A & node_B) and the new node +# (node_C) does not have any data. +############################################################################### +$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); +is($result, qq(), 'Check existing data'); + +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); +is($result, qq(), 'Check existing data'); + +$result = $node_C->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); +is($result, qq(), 'Check existing data'); + +create_subscription($node_A, $node_C, $subname_AC, $node_C_connstr, + 'tap_pub_C', 'copy_data = off, origin = none'); +create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr, + 'tap_pub_C', 'copy_data = off, origin = none'); +create_subscription($node_C, $node_A, $subname_CA, $node_A_connstr, + 'tap_pub_A', 'copy_data = off, origin = none'); +create_subscription($node_C, $node_B, $subname_CB, $node_B_connstr, + 'tap_pub_B', 'copy_data = off, origin = none'); + +# insert some data in all the nodes +$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (14);"); +$node_B->safe_psql('postgres', "INSERT INTO tab VALUES (24);"); +$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (34);"); + +verify_data( + $node_A, $node_B, $node_C, '14 +24 +34'); + +detach_node_clean_table_data($node_A, $node_B, $node_C); + +############################################################################### +# Join 3rd node (node_C) to the existing 2 nodes(node_A & node_B) bidirectional +# replication setup when the existing nodes (node_A & node_B) has no data and +# the new node (node_C) some pre-existing data. +############################################################################### +$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (35);"); + +$result = $node_A->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); +is($result, qq(), 'Check existing data'); + +$result = $node_B->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); +is($result, qq(), 'Check existing data'); + +$result = $node_C->safe_psql('postgres', "SELECT * FROM tab ORDER BY 1;"); +is($result, qq(35), 'Check existing data'); + +create_subscription($node_A, $node_C, $subname_AC, $node_C_connstr, + 'tap_pub_C', 'copy_data = on, origin = none'); +create_subscription($node_B, $node_C, $subname_BC, $node_C_connstr, + 'tap_pub_C', 'copy_data = on, origin = none'); + +$node_C->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_C SET (publish='insert,update,delete');"); + +$node_C->safe_psql('postgres', "TRUNCATE tab"); + +# include truncates now +$node_C->safe_psql('postgres', + "ALTER PUBLICATION tap_pub_C SET (publish='insert,update,delete,truncate');" +); + +create_subscription($node_C, $node_A, $subname_CA, $node_A_connstr, + 'tap_pub_A', 'copy_data = on, origin = none'); +create_subscription($node_C, $node_B, $subname_CB, $node_B_connstr, + 'tap_pub_B', 'copy_data = off, origin = none'); + +# insert some data in all the nodes +$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (16);"); +$node_B->safe_psql('postgres', "INSERT INTO tab VALUES (26);"); +$node_C->safe_psql('postgres', "INSERT INTO tab VALUES (36);"); + +verify_data( + $node_A, $node_B, $node_C, '16 +26 +35 +36'); + # shutdown $node_B->stop('fast'); $node_A->stop('fast'); -- 2.32.0