From f7f68fd11211ffbe5770fb565ac80620dd6ccea9 Mon Sep 17 00:00:00 2001 From: "houzj.fnst" Date: Wed, 11 May 2022 14:29:05 +0800 Subject: [PATCH] Disallow combining publication when column list is different for the same table the main purpose of introducing a column list are statically have different shapes on publisher and subscriber or hide sensitive columns data. In both cases, it doesn't seems make sense to combine column lists. So disallow the cases where column list is different for the same table when combining publications. --- src/backend/commands/subscriptioncmds.c | 37 +++++++-- src/backend/replication/logical/tablesync.c | 61 +++++++++----- src/backend/replication/pgoutput/pgoutput.c | 78 ++++++++--------- src/test/subscription/t/031_column_list.pl | 124 +++++++++------------------- 4 files changed, 154 insertions(+), 146 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index b94236f..8650cc5 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1753,7 +1753,8 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) /* * Get the list of tables which belong to specified publications on the - * publisher connection. + * publisher connection. Also get the column list for each table and check if + * column lists are the same in different publications. */ static List * fetch_table_list(WalReceiverConn *wrconn, List *publications) @@ -1761,17 +1762,34 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[2] = {TEXTOID, TEXTOID}; + Oid tableRow[3] = {TEXTOID, TEXTOID, INT2VECTOROID}; List *tablelist = NIL; initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n" - " FROM pg_catalog.pg_publication_tables t\n" + appendStringInfoString(&cmd, + "SELECT DISTINCT t.schemaname,\n" + " t.tablename,\n" + " (CASE WHEN (array_length(pr.prattrs, 1) = t.relnatts)\n" + " THEN NULL ELSE pr.prattrs END)\n" + " FROM (SELECT P.pubname AS pubname,\n" + " N.nspname AS schemaname,\n" + " C.relname AS tablename,\n" + " P.oid AS pubid,\n" + " C.oid AS reloid,\n" + " C.relnatts\n" + " FROM pg_publication P,\n" + " LATERAL pg_get_publication_tables(P.pubname) GPT,\n" + " pg_class C JOIN pg_namespace N\n" + " ON (N.oid = C.relnamespace)\n" + " WHERE C.oid = GPT.relid) t\n" + " LEFT OUTER JOIN pg_publication_rel pr\n" + " ON (t.pubid = pr.prpubid AND\n" + " pr.prrelid = reloid)\n" " WHERE t.pubname IN ("); get_publications_str(publications, &cmd, true); appendStringInfoChar(&cmd, ')'); - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + res = walrcv_exec(wrconn, cmd.data, 3, tableRow); pfree(cmd.data); if (res->status != WALRCV_OK_TUPLES) @@ -1795,7 +1813,14 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) Assert(!isnull); rv = makeRangeVar(nspname, relname, -1); - tablelist = lappend(tablelist, rv); + + if (list_member(tablelist, rv)) + ereport(WARNING, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use different column lists for table \"%s.%s\" in different publications", + nspname, relname)); + else + tablelist = lappend(tablelist, rv); ExecClearTuple(slot); } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 49ceec3..8ac4171 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -771,7 +771,7 @@ fetch_remote_table_info(char *nspname, char *relname, { WalRcvExecResult *pubres; TupleTableSlot *slot; - Oid attrsRow[] = {INT2OID}; + Oid attrsRow[] = {INT2VECTOROID}; StringInfoData pub_names; bool first = true; @@ -786,21 +786,20 @@ fetch_remote_table_info(char *nspname, char *relname, /* * Fetch info about column lists for the relation (from all the - * publications). We unnest the int2vector values, because that - * makes it easier to combine lists by simply adding the attnums - * to a new bitmap (without having to parse the int2vector data). - * This preserves NULL values, so that if one of the publications - * has no column list, we'll know that. + * publications). */ resetStringInfo(&cmd); appendStringInfo(&cmd, - "SELECT DISTINCT unnest" + "SELECT DISTINCT" + " (CASE WHEN (array_length(pr.prattrs, 1) = c.relnatts)" + " THEN NULL ELSE pr.prattrs END)" " FROM pg_publication p" " LEFT OUTER JOIN pg_publication_rel pr" " ON (p.oid = pr.prpubid AND pr.prrelid = %u)" " LEFT OUTER JOIN unnest(pr.prattrs) ON TRUE," - " LATERAL pg_get_publication_tables(p.pubname) gpt" - " WHERE gpt.relid = %u" + " LATERAL pg_get_publication_tables(p.pubname) gpt," + " pg_class c" + " WHERE gpt.relid = %u AND c.oid = gpt.relid" " AND p.pubname IN ( %s )", lrel->remoteid, lrel->remoteid, @@ -815,27 +814,49 @@ fetch_remote_table_info(char *nspname, char *relname, errmsg("could not fetch column list info for table \"%s.%s\" from publisher: %s", nspname, relname, pubres->err))); + first = true; + /* - * Merge the column lists (from different publications) by creating - * a single bitmap with all the attnums. If we find a NULL value, - * that means one of the publications has no column list for the - * table we're syncing. + * Traverse the column lists from different publications and build a + * single bitmap with the attnums. + * + * During the loop, check that if all the column lists are the same and + * report an error if not. + * + * If we find a NULL value, that means one of the publications has no + * column list for the table we're syncing. */ slot = MakeSingleTupleTableSlot(pubres->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(pubres->tuplestore, true, false, slot)) { Datum cfval = slot_getattr(slot, 1, &isnull); + Bitmapset *cols = NULL; - /* NULL means empty column list, so we're done. */ - if (isnull) + if (!isnull) { - bms_free(included_cols); - included_cols = NULL; - break; + ArrayType *arr; + int nelems; + int16 *elems; + + arr = DatumGetArrayTypeP(cfval); + nelems = ARR_DIMS(arr)[0]; + elems = (int16 *) ARR_DATA_PTR(arr); + + for (int i = 0; i < nelems; i++) + cols = bms_add_member(cols, elems[i]); } - included_cols = bms_add_member(included_cols, - DatumGetInt16(cfval)); + /* NULL means empty column list. */ + if (first) + { + included_cols = cols; + first = false; + } + else if (!bms_equal(included_cols, cols)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use different column lists for table \"%s.%s\" in different publications", + nspname, relname)); ExecClearTuple(slot); } diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index b197bfd..7ca09de 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -803,16 +803,13 @@ pgoutput_row_filter_exec_expr(ExprState *state, ExprContext *econtext) * Make sure the per-entry memory context exists. */ static void -pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry) +pgoutput_ensure_entry_cxt(PGOutputData *data, RelationSyncEntry *entry, + Relation relation) { - Relation relation; - /* The context may already exist, in which case bail out. */ if (entry->entry_cxt) return; - relation = RelationIdGetRelation(entry->publish_as_relid); - entry->entry_cxt = AllocSetContextCreate(data->cachectx, "entry private context", ALLOCSET_SMALL_SIZES); @@ -941,7 +938,7 @@ pgoutput_row_filter_init(PGOutputData *data, List *publications, { Relation relation = RelationIdGetRelation(entry->publish_as_relid); - pgoutput_ensure_entry_cxt(data, entry); + pgoutput_ensure_entry_cxt(data, entry, relation); /* * Now all the filters for all pubactions are known. Combine them when @@ -978,14 +975,20 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, RelationSyncEntry *entry) { ListCell *lc; + bool first = true; + Relation relation = RelationIdGetRelation(entry->publish_as_relid); /* * Find if there are any column lists for this relation. If there are, - * build a bitmap merging all the column lists. + * build a bitmap using the column lists. * - * All the given publication-table mappings must be checked. + * Note that we don't support the case where column list is different for + * the same table when combining publications. But we still need to check + * all the given publication-table mappings and report an error if any + * publications have different column list. * - * Multiple publications might have multiple column lists for this relation. + * Multiple publications might have multiple column lists for this + * relation. * * FOR ALL TABLES and FOR ALL TABLES IN SCHEMA implies "don't use column * list" so it takes precedence. @@ -995,12 +998,7 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, Publication *pub = lfirst(lc); HeapTuple cftuple = NULL; Datum cfdatum = 0; - - /* - * Assume there's no column list. Only if we find pg_publication_rel - * entry with a column list we'll switch it to false. - */ - bool pub_no_list = true; + Bitmapset *cols = NULL; /* * If the publication is FOR ALL TABLES then it is treated the same as if @@ -1008,6 +1006,8 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, */ if (!pub->alltables) { + bool pub_no_list = true; + /* * Check for the presence of a column list in this publication. * @@ -1033,39 +1033,43 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, /* * Build the column list bitmap in the per-entry context. - * - * We need to merge column lists from all publications, so we - * update the same bitmapset. If the column list is null, we - * interpret it as replicating all columns. */ if (!pub_no_list) /* when not null */ { - pgoutput_ensure_entry_cxt(data, entry); + pgoutput_ensure_entry_cxt(data, entry, relation); + + cols = pub_collist_to_bitmapset(cols, cfdatum, + entry->entry_cxt); - entry->columns = pub_collist_to_bitmapset(entry->columns, - cfdatum, - entry->entry_cxt); + /* + * If column list includes all the columns of the table, + * set it to NULL. + */ + if (bms_num_members(cols) == RelationGetNumberOfAttributes(relation)) + { + bms_free(cols); + cols = NULL; + } } + + ReleaseSysCache(cftuple); } } - /* - * Found a publication with no column list, so we're done. But first - * discard column list we might have from preceding publications. - */ - if (pub_no_list) + if (first) { - if (cftuple) - ReleaseSysCache(cftuple); - - bms_free(entry->columns); - entry->columns = NULL; - - break; + entry->columns = cols; + first = false; } - - ReleaseSysCache(cftuple); + else if (!bms_equal(entry->columns, cols)) + ereport(ERROR, + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot use different column lists for table \"%s.%s\" in different publications", + get_namespace_name(RelationGetNamespace(relation)), + RelationGetRelationName(relation))); } /* loop all subscribed publications */ + + RelationClose(relation); } /* diff --git a/src/test/subscription/t/031_column_list.pl b/src/test/subscription/t/031_column_list.pl index bdcf3e4..3dfe7d1 100644 --- a/src/test/subscription/t/031_column_list.pl +++ b/src/test/subscription/t/031_column_list.pl @@ -21,6 +21,8 @@ $node_subscriber->start; my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +my $offset = 0; + sub wait_for_subscription_sync { my ($node) = @_; @@ -334,12 +336,12 @@ is($result, qq(1|abc 2|xyz), 'update on column tab2.c is not replicated'); -# TEST: add a table to two publications with different column lists, and +# TEST: add a table to two publications with same column lists, and # create a single subscription replicating both publications $node_publisher->safe_psql('postgres', qq( CREATE TABLE tab5 (a int PRIMARY KEY, b int, c int, d int); CREATE PUBLICATION pub2 FOR TABLE tab5 (a, b); - CREATE PUBLICATION pub3 FOR TABLE tab5 (a, d); + CREATE PUBLICATION pub3 FOR TABLE tab5 (a, b); -- insert a couple initial records INSERT INTO tab5 VALUES (1, 11, 111, 1111); @@ -358,8 +360,7 @@ wait_for_subscription_sync($node_subscriber); $node_publisher->wait_for_catchup('sub1'); -# insert data and make sure all the columns (union of the columns lists) -# get fully replicated +# insert data and make sure all the columns get fully replicated $node_publisher->safe_psql('postgres', qq( INSERT INTO tab5 VALUES (3, 33, 333, 3333); INSERT INTO tab5 VALUES (4, 44, 444, 4444); @@ -368,39 +369,11 @@ $node_publisher->safe_psql('postgres', qq( $node_publisher->wait_for_catchup('sub1'); is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab5 ORDER BY a"), - qq(1|11|1111 -2|22|2222 -3|33|3333 -4|44|4444), - 'overlapping publications with overlapping column lists'); - -# and finally, remove the column list for one of the publications, which -# means replicating all columns (removing the column list), but first add -# the missing column to the table on subscriber -$node_publisher->safe_psql('postgres', qq( - ALTER PUBLICATION pub3 SET TABLE tab5; -)); - -$node_subscriber->safe_psql('postgres', qq( - ALTER SUBSCRIPTION sub1 REFRESH PUBLICATION; - ALTER TABLE tab5 ADD COLUMN c INT; -)); - -wait_for_subscription_sync($node_subscriber); - -$node_publisher->safe_psql('postgres', qq( - INSERT INTO tab5 VALUES (5, 55, 555, 5555); -)); - -$node_publisher->wait_for_catchup('sub1'); - -is($node_subscriber->safe_psql('postgres',"SELECT * FROM tab5 ORDER BY a"), - qq(1|11|1111| -2|22|2222| -3|33|3333| -4|44|4444| -5|55|5555|555), - 'overlapping publications with overlapping column lists'); + qq(1|11| +2|22| +3|33| +4|44|), + 'insert on column tab5.d is not replicated'); # TEST: create a table with a column list, then change the replica # identity by replacing a primary key (but use a different column in @@ -822,51 +795,18 @@ is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_part_d ORDER BY a, 3|), 'partitions with different replica identities not replicated correctly'); -# TEST: With a table included in multiple publications, we should use a -# union of the column lists. So with column lists (a,b) and (a,c) we -# should replicate (a,b,c). - -$node_publisher->safe_psql('postgres', qq( - CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int); - CREATE PUBLICATION pub_mix_1 FOR TABLE test_mix_1 (a, b); - CREATE PUBLICATION pub_mix_2 FOR TABLE test_mix_1 (a, c); - - -- initial data - INSERT INTO test_mix_1 VALUES (1, 2, 3); -)); - -$node_subscriber->safe_psql('postgres', qq( - CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int); - ALTER SUBSCRIPTION sub1 SET PUBLICATION pub_mix_1, pub_mix_2; -)); - -wait_for_subscription_sync($node_subscriber); - -$node_publisher->safe_psql('postgres', qq( - INSERT INTO test_mix_1 VALUES (4, 5, 6); -)); - -$node_publisher->wait_for_catchup('sub1'); - -is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_mix_1 ORDER BY a"), - qq(1|2|3 -4|5|6), - 'a mix of publications should use a union of column list'); - - -# TEST: With a table included in multiple publications, we should use a -# union of the column lists. If any of the publications is FOR ALL -# TABLES, we should replicate all columns. +# TEST: With a table included in the publications is FOR ALL TABLES, it means +# replicate all columns. # drop unnecessary tables, so as not to interfere with the FOR ALL TABLES $node_publisher->safe_psql('postgres', qq( - DROP TABLE tab1, tab2, tab3, tab4, tab5, tab6, tab7, test_mix_1, + DROP TABLE tab1, tab2, tab3, tab4, tab5, tab6, tab7, test_part, test_part_a, test_part_b, test_part_c, test_part_d; )); $node_publisher->safe_psql('postgres', qq( CREATE TABLE test_mix_2 (a int PRIMARY KEY, b int, c int); - CREATE PUBLICATION pub_mix_3 FOR TABLE test_mix_2 (a, b); + CREATE PUBLICATION pub_mix_3 FOR TABLE test_mix_2 (a, b, c); CREATE PUBLICATION pub_mix_4 FOR ALL TABLES; -- initial data @@ -890,12 +830,11 @@ $node_publisher->wait_for_catchup('sub1'); is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_mix_2"), qq(1|2|3 4|5|6), - 'a mix of publications should use a union of column list'); + 'all columns should be replicated'); -# TEST: With a table included in multiple publications, we should use a -# union of the column lists. If any of the publications is FOR ALL -# TABLES IN SCHEMA, we should replicate all columns. +# TEST: With a table included in the publication which is FOR ALL TABLES IN +# SCHEMA, it means replicate all columns. $node_subscriber->safe_psql('postgres', qq( DROP SUBSCRIPTION sub1; @@ -905,7 +844,7 @@ $node_subscriber->safe_psql('postgres', qq( $node_publisher->safe_psql('postgres', qq( DROP TABLE test_mix_2; CREATE TABLE test_mix_3 (a int PRIMARY KEY, b int, c int); - CREATE PUBLICATION pub_mix_5 FOR TABLE test_mix_3 (a, b); + CREATE PUBLICATION pub_mix_5 FOR TABLE test_mix_3 (a, b, c); CREATE PUBLICATION pub_mix_6 FOR ALL TABLES IN SCHEMA public; -- initial data @@ -927,8 +866,7 @@ $node_publisher->wait_for_catchup('sub1'); is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_mix_3"), qq(1|2|3 4|5|6), - 'a mix of publications should use a union of column list'); - + 'all columns should be replicated'); # TEST: Check handling of publish_via_partition_root - if a partition is # published through partition root, we should only apply the column list @@ -979,7 +917,7 @@ is($node_subscriber->safe_psql('postgres',"SELECT * FROM test_root ORDER BY a, b # TEST: Multiple publications which publish schema of parent table and # partition. The partition is published through two publications, once # through a schema (so no column list) containing the parent, and then -# also directly (with a columns list). The expected outcome is there is +# also directly (all columns). The expected outcome is there is # no column list. $node_publisher->safe_psql('postgres', qq( @@ -990,7 +928,7 @@ $node_publisher->safe_psql('postgres', qq( CREATE TABLE t_1 PARTITION OF s1.t FOR VALUES FROM (1) TO (10); CREATE PUBLICATION pub1 FOR ALL TABLES IN SCHEMA s1; - CREATE PUBLICATION pub2 FOR TABLE t_1(b); + CREATE PUBLICATION pub2 FOR TABLE t_1(a, b, c); -- initial data INSERT INTO s1.t VALUES (1, 2, 3); @@ -1124,6 +1062,26 @@ is($node_subscriber->safe_psql('postgres',"SELECT * FROM t ORDER BY a, b, c"), 4||), 'publication containing both parent and child relation'); +# TEST: With a table included in multiple publications with different column +# lists, we should catch the error in the log + +$node_publisher->safe_psql('postgres', qq( + CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int); + CREATE PUBLICATION pub_mix_1 FOR TABLE test_mix_1 (a, b); + CREATE PUBLICATION pub_mix_2 FOR TABLE test_mix_1 (a, c); +)); + +$node_subscriber->safe_psql('postgres', qq( + CREATE TABLE test_mix_1 (a int PRIMARY KEY, b int, c int); + ALTER SUBSCRIPTION sub1 SET PUBLICATION pub_mix_1, pub_mix_2 WITH (copy_data = false); +)); + +$node_publisher->wait_for_catchup('sub1'); + +my $logfile = slurp_file($node_subscriber->logfile, $offset); +ok( $logfile =~ + qr/cannot use different column lists for table "public.test_mix_1" in different publications/, + 'different column lists detected'); $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.7.2.windows.1