From 6b7c23e4194958c581d4476090442d44c37d702f Mon Sep 17 00:00:00 2001 From: Shlok Kyal Date: Tue, 17 Jun 2025 12:12:24 +0530 Subject: [PATCH v13 3/3] Skip publishing the columns specified in FOR TABLE EXCEPT A new "TABLE table_name EXCEPT (column_list)" clause for CREATE/ALTER PUBLICATION allows one or more columns to be excluded. THe publisher will not send the data of excluded columns to the subscriber. The new syntax allows specifying excluded column list when creating or altering a publication. For example: CREATE PUBLICATION pubname FOR TABLE tabname EXCEPT (exclude_column_list) or ALTER PUBLICATION pubname ADD TABLE tabname EXCEPT (exclude_column_list) A new column "prexcludeattrs" is added to table "pg_publication_rel", to maintain the column list that user wants to exclude from the publication. pg_dump is updated to identify and dump the excluded column list of the publication. The psql \d family of command can now display excluded column list. e.g. psql \dRp+ variant will now display associated "EXCEPT (column_list)" if ans. --- doc/src/sgml/catalogs.sgml | 13 ++ doc/src/sgml/logical-replication.sgml | 145 ++++++++++++------ doc/src/sgml/ref/alter_publication.sgml | 10 +- doc/src/sgml/ref/create_publication.sgml | 17 +- src/backend/catalog/pg_publication.c | 135 +++++++++++++++- src/backend/commands/publicationcmds.c | 73 ++++++++- src/backend/parser/gram.y | 60 ++++++++ src/backend/replication/pgoutput/pgoutput.c | 47 +++++- src/bin/pg_dump/pg_dump.c | 39 ++++- src/bin/pg_dump/pg_dump.h | 1 + src/bin/psql/describe.c | 104 +++++++++---- src/include/catalog/pg_publication.h | 9 +- src/include/catalog/pg_publication_rel.h | 1 + src/include/nodes/parsenodes.h | 1 + src/test/regress/expected/publication.out | 65 ++++++++ src/test/regress/sql/publication.sql | 45 ++++++ .../t/036_rep_changes_except_table.pl | 60 +++++++- 17 files changed, 735 insertions(+), 90 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 4e37c928b44..b9e13b33064 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -6589,6 +6589,19 @@ SCRAM-SHA-256$<iteration count>:&l A null value indicates that all columns are published. + + + + prexcludeattrs int2vector + (references pg_attribute.attnum) + + + This is an array of values that indicates which table columns are + excluded from the publication. For example, a value of + 1 3 would mean that the columns except the first and + the third columns are published. + + diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index 3d0d29cf8b1..318b4c43cfc 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -1340,11 +1340,14 @@ Publications: Column Lists - Each publication can optionally specify which columns of each table are - replicated to subscribers. The table on the subscriber side must have at - least all the columns that are published. If no column list is specified, - then all columns on the publisher are replicated. - See for details on the syntax. + Each publication can optionally either specify which columns of each table + are replicated to subscribers or specify which columns of each table are + excluded from replication to subscriber. The table on the subscriber side + must have at least all the columns that are published. If no column list and + no exclude column list are specified, then all columns on the publisher are + replicated. If a exclude column list is specified all the columns except the + specified columns are replicated. See + for details on the syntax. @@ -1359,56 +1362,65 @@ Publications: If no column list is specified, any columns added to the table later are automatically replicated. This means that having a column list which names all columns is not the same as having no column list at all. + If a exclude column list is specified, any columns added to the table later + are automatically replicated. - A column list can contain only simple column references. The order - of columns in the list is not preserved. + A column list or exclude column list can contain only simple column + references. The order of columns in the list is not preserved. Generated columns can also be specified in a column list. This allows generated columns to be published, regardless of the publication parameter - publish_generated_columns. See + publish_generated_columns. Generated columns cannot + be specified in a exclude column list. See for details. - Specifying a column list when the publication also publishes - FOR TABLES IN SCHEMA - is not supported. + Specifying a column list or a exclude column list when the publication also + publishes + FOR TABLES IN SCHEMA is not supported. For partitioned tables, the publication parameter - publish_via_partition_root - determines which column list is used. If publish_via_partition_root - is true, the root partitioned table's column list is - used. Otherwise, if publish_via_partition_root is - false (the default), each partition's column list is used. + + publish_via_partition_root determines which column + list or exclude column list is used. If + publish_via_partition_root is true, the + root partitioned table's column list or exclude column list is used. + Otherwise, if publish_via_partition_root is + false (the default), each partition's column list or + exclude column list is used. If a publication publishes UPDATE or DELETE operations, any column list must include the - table's replica identity columns (see - ). - If a publication publishes only INSERT operations, then - the column list may omit replica identity columns. + table's replica identity columns or any exclude column list must not include + the table's replica identity columns (see + ). If a publication + publishes only INSERT operations, then the column list may + omit replica identity columns or exlude column list may contain replica + identity columns. - Column lists have no effect for the TRUNCATE command. + Column lists or exclude column lists have no effect for the + TRUNCATE command. During initial data synchronization, only the published columns are copied. However, if the subscriber is from a release prior to 15, then all the columns in the table are copied during initial data synchronization, - ignoring any column lists. If the subscriber is from a release prior to 18, - then initial table synchronization won't copy generated columns even if they - are defined in the publisher. + ignoring any column lists or exclude column list. If the subscriber is from a + release prior to 18, then initial table synchronization won't copy generated + columns even if they are defined in the publisher. @@ -1416,21 +1428,23 @@ Publications: There's currently no support for subscriptions comprising several publications where the same table has been published with different - column lists. disallows + column lists or exclude column list. + disallows creating such subscriptions, but it is still possible to get into - that situation by adding or altering column lists on the publication - side after a subscription has been created. + that situation by adding or altering column lists or exclude column lists + on the publication side after a subscription has been created. - This means changing the column lists of tables on publications that are - already subscribed could lead to errors being thrown on the subscriber - side. + This means changing the column lists or exclude column list of tables on + publications that are already subscribed could lead to errors being thrown + on the subscriber side. If a subscription is affected by this problem, the only way to resume - replication is to adjust one of the column lists on the publication - side so that they all match; and then either recreate the subscription, - or use + replication is to adjust one of the column lists or exclude column lists on + the publication side so that they all match; and then either recreate the + subscription, or use + ALTER SUBSCRIPTION ... DROP PUBLICATION to remove one of the offending publications and add it again. @@ -1440,18 +1454,21 @@ Publications: Examples - Create a table t1 to be used in the following example. + Create tables t1, t2 to be used in the + following example. /* pub # */ CREATE TABLE t1(id int, a text, b text, c text, d text, e text, PRIMARY KEY(id)); +/* pub # */ CREATE TABLE t2(id int, a text, b text, c text, d text, e text, PRIMARY KEY(id)); Create a publication p1. A column list is defined for - table t1 to reduce the number of columns that will be - replicated. Notice that the order of column names in the column list does - not matter. + table t1 and a exclude column list is defined for table + t2 to reduce the number of columns that will be + replicated. Notice that the order of column names in the column list or + exclude column list does not matter. -/* pub # */ CREATE PUBLICATION p1 FOR TABLE t1 (id, b, a, d); +/* pub # */ CREATE PUBLICATION p1 FOR TABLE t1 (id, b, a, d), t2 EXCEPT (d, a); @@ -1459,12 +1476,13 @@ Publications: for each publication. /* pub # */ \dRp+ - Publication p1 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root -----------+------------+---------+---------+---------+-----------+---------- - postgres | f | t | t | t | t | f + Publication p1 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Generated columns | Via root +--------+------------+---------+---------+---------+-----------+-------------------+---------- + ubuntu | f | t | t | t | t | none | f Tables: "public.t1" (id, a, b, d) + "public.t2" EXCEPT (a, d) @@ -1485,23 +1503,41 @@ Indexes: "t1_pkey" PRIMARY KEY, btree (id) Publications: "p1" (id, a, b, d) + +/* pub # */ \d t2 + Table "public.t2" + Column | Type | Collation | Nullable | Default +--------+---------+-----------+----------+--------- + id | integer | | not null | + a | text | | | + b | text | | | + c | text | | | + d | text | | | + e | text | | | +Indexes: + "t2_pkey" PRIMARY KEY, btree (id) +Publications: + "p1" EXCEPT (a, d) - On the subscriber node, create a table t1 which now - only needs a subset of the columns that were on the publisher table - t1, and also create the subscription + On the subscriber node, create tables t1 and + t2 which now only needs a subset of the columns that + were on the publisher tables t1 and + t2, and also create the subscription s1 that subscribes to the publication p1. /* sub # */ CREATE TABLE t1(id int, b text, a text, d text, PRIMARY KEY(id)); +/* sub # */ CREATE TABLE t2(id int, b text, c text, e text, PRIMARY KEY(id)); /* sub # */ CREATE SUBSCRIPTION s1 /* sub - */ CONNECTION 'host=localhost dbname=test_pub application_name=s1' /* sub - */ PUBLICATION p1; - On the publisher node, insert some rows to table t1. + On the publisher node, insert some rows to tables t1 + and t2 /* pub # */ INSERT INTO t1 VALUES(1, 'a-1', 'b-1', 'c-1', 'd-1', 'e-1'); /* pub # */ INSERT INTO t1 VALUES(2, 'a-2', 'b-2', 'c-2', 'd-2', 'e-2'); @@ -1513,6 +1549,16 @@ Publications: 2 | a-2 | b-2 | c-2 | d-2 | e-2 3 | a-3 | b-3 | c-3 | d-3 | e-3 (3 rows) +/* pub # */ INSERT INTO t2 VALUES(1, 'a-1', 'b-1', 'c-1', 'd-1', 'e-1'); +/* pub # */ INSERT INTO t2 VALUES(2, 'a-2', 'b-2', 'c-2', 'd-2', 'e-2'); +/* pub # */ INSERT INTO t2 VALUES(3, 'a-3', 'b-3', 'c-3', 'd-3', 'e-3'); +/* pub # */ SELECT * FROM t2 ORDER BY id; + id | a | b | c | d | e +----+-----+-----+-----+-----+----- + 1 | a-1 | b-1 | c-1 | d-1 | e-1 + 2 | a-2 | b-2 | c-2 | d-2 | e-2 + 3 | a-3 | b-3 | c-3 | d-3 | e-3 +(3 rows) @@ -1526,6 +1572,13 @@ Publications: 2 | b-2 | a-2 | d-2 3 | b-3 | a-3 | d-3 (3 rows) +/* sub # */ SELECT * FROM t2 ORDER BY id; + id | b | c | e +----+-----+-----+----- + 1 | b-1 | c-1 | e-1 + 2 | b-2 | c-2 | e-2 + 3 | b-3 | c-3 | e-3 +(3 rows) diff --git a/doc/src/sgml/ref/alter_publication.sgml b/doc/src/sgml/ref/alter_publication.sgml index 37e2c84bc10..5700bf83100 100644 --- a/doc/src/sgml/ref/alter_publication.sgml +++ b/doc/src/sgml/ref/alter_publication.sgml @@ -32,7 +32,7 @@ ALTER PUBLICATION name RESET where publication_object is one of: - TABLE [ ONLY ] table_name [ * ] [ ( column_name [, ... ] ) ] [ WHERE ( expression ) ] [, ... ] + TABLE [ ONLY ] table_name [ * ] [ [ EXCEPT ] ( column_name [, ... ] ) ] [ WHERE ( expression ) ] [, ... ] TABLES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ] where exception_object is: @@ -260,6 +260,14 @@ ALTER PUBLICATION production_publication ADD ALL TABLES EXCEPT users, department production_publication: ALTER PUBLICATION production_publication ADD TABLE users, departments, TABLES IN SCHEMA production; + + + + Alter publication mypublication to add table + users except column + security_pin: + +ALTER PUBLICATION production_publication ADD TABLE users EXCEPT (security_pin); diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index 7fd8872db5f..af46d6a7919 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -28,7 +28,7 @@ CREATE PUBLICATION name where publication_object is one of: - TABLE [ ONLY ] table_name [ * ] [ ( column_name [, ... ] ) ] [ WHERE ( expression ) ] [, ... ] + TABLE [ ONLY ] table_name [ * ] [ [ EXCEPT ] ( column_name [, ... ] ) ] [ WHERE ( expression ) ] [, ... ] TABLES IN SCHEMA { schema_name | CURRENT_SCHEMA } [, ... ] where exception_object is: @@ -103,6 +103,13 @@ CREATE PUBLICATION name lists. + + When a column list is specified with EXCEPT, the named columns are not + replicated. The excluded column list cannot contain generated columns. + Specifying a column list has no effect on TRUNCATE + commands. + + Only persistent base tables and partitioned tables can be part of a publication. Temporary tables, unlogged tables, foreign tables, @@ -474,6 +481,14 @@ CREATE PUBLICATION mypublication FOR ALL TABLES EXCEPT users, departments; CREATE PUBLICATION users_filtered FOR TABLE users (user_id, firstname); + + + Create a publication that publishes all changes for table users + except changes for column security_pin: + +CREATE PUBLICATION users_safe FOR TABLE users EXCEPT (security_pin); + + diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index ec580e3b050..8fd9ac84451 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -302,6 +302,53 @@ check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, return found; } +/* + * Returns true if the relation has exluded column list associated with the + * publication, false otherwise. + * + * If a exclude column list is found, the corresponding bitmap is returned + * through the cols parameter, if provided. The bitmap is constructed within the + * given memory context (mcxt). + */ + +bool +check_and_fetch_exclude_column_list(Publication *pub, Oid relid, MemoryContext mcxt, + Bitmapset **cols) +{ + HeapTuple cftuple; + bool found = false; + + if (pub->alltables) + return false; + + cftuple = SearchSysCache2(PUBLICATIONRELMAP, + ObjectIdGetDatum(relid), + ObjectIdGetDatum(pub->oid)); + if (HeapTupleIsValid(cftuple)) + { + Datum cfdatum; + bool isnull; + + /* Lookup the column list attribute. */ + cfdatum = SysCacheGetAttr(PUBLICATIONRELMAP, cftuple, + Anum_pg_publication_rel_prexcludeattrs, &isnull); + + /* Was a column list found? */ + if (!isnull) + { + /* Build the column list bitmap in the given memory context. */ + if (cols) + *cols = pub_collist_to_bitmapset(*cols, cfdatum, mcxt); + + found = true; + } + + ReleaseSysCache(cftuple); + } + + return found; +} + /* * Gets the relations based on the publication partition option for a specified * relation. @@ -449,6 +496,7 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, Oid relid = RelationGetRelid(targetrel); Oid pubreloid; Bitmapset *attnums; + Bitmapset *excludeattnums; Publication *pub = GetPublication(pubid); ObjectAddress myself, referenced; @@ -481,6 +529,13 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, /* Validate and translate column names into a Bitmapset of attnums. */ attnums = pub_collist_validate(pri->relation, pri->columns); + /* + * Validate and translate excluded column names into a Bitmapset of + * attnums. + */ + excludeattnums = pub_exclude_collist_validate(pri->relation, + pri->exclude_columns); + /* Form a tuple. */ memset(values, 0, sizeof(values)); memset(nulls, false, sizeof(nulls)); @@ -507,6 +562,11 @@ publication_add_relation(Oid pubid, PublicationRelInfo *pri, else nulls[Anum_pg_publication_rel_prattrs - 1] = true; + if (pri->exclude_columns) + values[Anum_pg_publication_rel_prexcludeattrs - 1] = PointerGetDatum(attnumstoint2vector(excludeattnums)); + else + nulls[Anum_pg_publication_rel_prexcludeattrs - 1] = true; + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ @@ -609,6 +669,58 @@ pub_collist_validate(Relation targetrel, List *columns) return set; } +/* + * pub_exclude_collist_validate + * Process and validate the 'excluded columns' list and ensure the columns + * are all valid to exclude from publication. Checks for and raises an + * ERROR for any unknown columns, system columns, duplicate columns, or + * generated columns. + * + * Looks up each column's attnum and returns a 0-based Bitmapset of the + * corresponding attnums. + */ +Bitmapset * +pub_exclude_collist_validate(Relation targetrel, List *exclude_columns) +{ + Bitmapset *set = NULL; + ListCell *lc; + TupleDesc tupdesc = RelationGetDescr(targetrel); + + foreach(lc, exclude_columns) + { + char *colname = strVal(lfirst(lc)); + AttrNumber attnum = get_attnum(RelationGetRelid(targetrel), colname); + + if (attnum == InvalidAttrNumber) + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + colname, RelationGetRelationName(targetrel))); + + if (!AttrNumberIsForUserDefinedAttr(attnum)) + ereport(ERROR, + errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot use system column \"%s\" in publication except column list", + colname)); + + if (TupleDescAttr(tupdesc, attnum - 1)->attgenerated) + ereport(ERROR, + errcode(ERRCODE_INVALID_COLUMN_REFERENCE), + errmsg("cannot use generated column \"%s\" in publication except column list", + colname)); + + if (bms_is_member(attnum, set)) + ereport(ERROR, + errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("duplicate column \"%s\" in publication except column list", + colname)); + + set = bms_add_member(set, attnum); + } + + return set; +} + /* * Transform a column list (represented by an array Datum) to a bitmapset. * @@ -646,10 +758,12 @@ pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt) * Returns a bitmap representing the columns of the specified table. * * Generated columns are included if include_gencols_type is - * PUBLISH_GENCOLS_STORED. + * PUBLISH_GENCOLS_STORED. Columns that are in the excludecols are excluded from + * the column list. */ Bitmapset * -pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type) +pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type, + Bitmapset *excludecols) { Bitmapset *result = NULL; TupleDesc desc = RelationGetDescr(relation); @@ -672,6 +786,9 @@ pub_form_cols_map(Relation relation, PublishGencolsType include_gencols_type) continue; } + if (excludecols && bms_is_member(att->attnum, excludecols)) + continue; + result = bms_add_member(result, att->attnum); } @@ -1263,6 +1380,9 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) Oid schemaid = get_rel_namespace(relid); Datum values[NUM_PUBLICATION_TABLES_ELEM] = {0}; bool nulls[NUM_PUBLICATION_TABLES_ELEM] = {0}; + Datum excludeattnums_datum; + Bitmapset *excludeattnums = NULL; + bool isnull; /* * Form tuple with appropriate data. @@ -1296,6 +1416,13 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) values[3] = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple, Anum_pg_publication_rel_prqual, &(nulls[3])); + + /* get the excluded column list */ + excludeattnums_datum = SysCacheGetAttr(PUBLICATIONRELMAP, pubtuple, + Anum_pg_publication_rel_prexcludeattrs, + &isnull); + if (!isnull) + excludeattnums = pub_collist_to_bitmapset(NULL, excludeattnums_datum, NULL); } else { @@ -1335,6 +1462,10 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) continue; } + /* Skip columns that are part of excluded column list */ + if (excludeattnums && bms_is_member(att->attnum, excludeattnums)) + continue; + attnums[nattnums++] = att->attnum; } diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 5194b2fb6e2..e850c2345ea 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -358,7 +358,8 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, * This function evaluates two conditions: * * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered - * by the column list. If any column is missing, *invalid_column_list is set + * by the column list and not part of excluded column list. If any column is + * missing or is part of exclude column list, *invalid_column_list is set * to true. * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY * are published, either by being explicitly named in the column list or, if @@ -378,6 +379,7 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, Oid publish_as_relid = RelationGetRelid(relation); Bitmapset *idattrs; Bitmapset *columns = NULL; + Bitmapset *exclude_columns = NULL; TupleDesc desc = RelationGetDescr(relation); Publication *pub; int x; @@ -405,11 +407,15 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, /* Fetch the column list */ pub = GetPublication(pubid); check_and_fetch_column_list(pub, publish_as_relid, NULL, &columns); + check_and_fetch_exclude_column_list(pub, publish_as_relid, NULL, &exclude_columns); if (relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL) { - /* With REPLICA IDENTITY FULL, no column list is allowed. */ - *invalid_column_list = (columns != NULL); + /* + * With REPLICA IDENTITY FULL, no column list and no excluded column + * list is allowed. + */ + *invalid_column_list = (columns != NULL || exclude_columns != NULL); /* * As we don't allow a column list with REPLICA IDENTITY FULL, the @@ -471,6 +477,16 @@ pub_contains_invalid_column(Oid pubid, Relation relation, List *ancestors, break; } + /* + * If REPLICA IDENTITY should not contain columns which are + * excluded from the publication. + */ + if (exclude_columns && bms_is_member(att->attnum, exclude_columns)) + { + *invalid_column_list = true; + break; + } + /* Skip validating the column list since it is not defined */ continue; } @@ -798,7 +814,7 @@ CheckPubRelationColumnList(char *pubname, List *tables, { PublicationRelInfo *pri = (PublicationRelInfo *) lfirst(lc); - if (pri->columns == NIL) + if (pri->columns == NIL && pri->exclude_columns == NIL) continue; /* @@ -1043,6 +1059,7 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, char *relname; bool has_rowfilter; bool has_collist; + bool has_exclude_collist; /* * Beware: we don't have lock on the relations, so cope silently @@ -1056,7 +1073,9 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, continue; has_rowfilter = !heap_attisnull(rftuple, Anum_pg_publication_rel_prqual, NULL); has_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prattrs, NULL); - if (!has_rowfilter && !has_collist) + has_exclude_collist = !heap_attisnull(rftuple, Anum_pg_publication_rel_prexcludeattrs, NULL); + + if (!has_rowfilter && !has_collist && !has_exclude_collist) { ReleaseSysCache(rftuple); continue; @@ -1083,6 +1102,14 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, stmt->pubname), errdetail("The publication contains a WHERE clause for partitioned table \"%s\", which is not allowed when \"%s\" is false.", relname, "publish_via_partition_root"))); + if (has_exclude_collist) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot set parameter \"%s\" to false for publication \"%s\"", + "publish_via_partition_root", + stmt->pubname), + errdetail("The publication contains a except column list for partitioned table \"%s\", which is not allowed when \"%s\" is false.", + relname, "publish_via_partition_root"))); Assert(has_collist); ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -1443,6 +1470,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, HeapTuple rftuple; Node *oldrelwhereclause = NULL; Bitmapset *oldcolumns = NULL; + Bitmapset *oldexcludecolumns = NULL; /* look up the cache for the old relmap */ rftuple = SearchSysCache2(PUBLICATIONRELMAP, @@ -1458,6 +1486,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, bool isnull = true; Datum whereClauseDatum; Datum columnListDatum; + Datum excludeColumnListDatum; /* Load the WHERE clause for this table. */ whereClauseDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, @@ -1474,6 +1503,14 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, if (!isnull) oldcolumns = pub_collist_to_bitmapset(NULL, columnListDatum, NULL); + /* Transform the int2vector exclude column list to a bitmap. */ + excludeColumnListDatum = SysCacheGetAttr(PUBLICATIONRELMAP, rftuple, + Anum_pg_publication_rel_prexcludeattrs, + &isnull); + + if (!isnull) + oldexcludecolumns = pub_collist_to_bitmapset(NULL, excludeColumnListDatum, NULL); + ReleaseSysCache(rftuple); } @@ -1482,6 +1519,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, PublicationRelInfo *newpubrel; Oid newrelid; Bitmapset *newcolumns = NULL; + Bitmapset *newexcludecolumns = NULL; newpubrel = (PublicationRelInfo *) lfirst(newlc); newrelid = RelationGetRelid(newpubrel->relation); @@ -1495,6 +1533,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, newcolumns = pub_collist_validate(newpubrel->relation, newpubrel->columns); + newexcludecolumns = pub_collist_validate(newpubrel->relation, + newpubrel->exclude_columns); + /* * Check if any of the new set of relations matches with the * existing relations in the publication. Additionally, if the @@ -1505,7 +1546,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, if (newrelid == oldrelid) { if (equal(oldrelwhereclause, newpubrel->whereClause) && - bms_equal(oldcolumns, newcolumns)) + bms_equal(oldcolumns, newcolumns) && + bms_equal(oldexcludecolumns, newexcludecolumns)) { found = true; break; @@ -1522,6 +1564,7 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, oldrel = palloc(sizeof(PublicationRelInfo)); oldrel->whereClause = NULL; oldrel->columns = NIL; + oldrel->exclude_columns = NIL; oldrel->relation = table_open(oldrelid, ShareUpdateExclusiveLock); delrels = lappend(delrels, oldrel); @@ -1596,6 +1639,17 @@ AlterPublicationSchemas(AlterPublicationStmt *stmt, stmt->pubname), errdetail("Schemas cannot be added if any tables that specify a column list are already part of the publication.")); + /* + * Disallow adding schema if exclude column list is already part + * of the publication. See CheckPubRelationColumnList. + */ + if (!heap_attisnull(coltuple, Anum_pg_publication_rel_prexcludeattrs, NULL)) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add schema to publication \"%s\"", + stmt->pubname), + errdetail("Schemas cannot be added if any tables that specify an except column list are already part of the publication.")); + ReleaseSysCache(coltuple); } @@ -1922,6 +1976,7 @@ OpenTableList(List *tables) pub_rel->whereClause = t->whereClause; pub_rel->columns = t->columns; pub_rel->except = t->except; + pub_rel->exclude_columns = t->exclude_columns; rels = lappend(rels, pub_rel); relids = lappend_oid(relids, myrelid); @@ -1995,6 +2050,7 @@ OpenTableList(List *tables) /* child inherits column list from parent */ pub_rel->columns = t->columns; pub_rel->except = t->except; + pub_rel->exclude_columns = t->exclude_columns; rels = lappend(rels, pub_rel); relids = lappend_oid(relids, childrelid); @@ -2114,6 +2170,11 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) errcode(ERRCODE_SYNTAX_ERROR), errmsg("column list must not be specified in ALTER PUBLICATION ... DROP")); + if (pubrel->exclude_columns) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("except column list must not be specified in ALTER PUBLICATION ... DROP")); + prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, ObjectIdGetDatum(relid), ObjectIdGetDatum(pubid)); diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index d7fe95a840f..63ee4bb7079 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -446,6 +446,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); TriggerTransitions TriggerReferencing vacuum_relation_list opt_vacuum_relation_list drop_option_list pub_obj_list except_pub_obj_list + opt_exclude_column_list %type returning_clause %type returning_option @@ -4413,6 +4414,10 @@ opt_column_list: | /*EMPTY*/ { $$ = NIL; } ; +opt_exclude_column_list: + '(' columnList ')' { $$ = $2; } + ; + columnList: columnElem { $$ = list_make1($1); } | columnList ',' columnElem { $$ = lappend($1, $3); } @@ -10679,6 +10684,15 @@ PublicationObjSpec: $$->pubtable->whereClause = $4; $$->location = @1; } + | TABLE relation_expr EXCEPT opt_exclude_column_list OptWhereClause + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_TABLE; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = $2; + $$->pubtable->exclude_columns = $4; + $$->pubtable->whereClause = $5; + } | TABLES IN_P SCHEMA ColId { $$ = makeNode(PublicationObjSpec); @@ -10719,6 +10733,33 @@ PublicationObjSpec: } $$->location = @1; } + | ColId EXCEPT opt_exclude_column_list OptWhereClause + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + /* + * If either a row filter or exclude column list is + * specified, create a PublicationTable object. + */ + if ($3 || $4) + { + /* + * The OptWhereClause must be stored here but it is + * valid only for tables. For non-table objects, an + * error will be thrown later via + * preprocess_pubobj_list(). + */ + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = makeRangeVar(NULL, $1, @1); + $$->pubtable->exclude_columns = $3; + $$->pubtable->whereClause = $4; + } + else + { + $$->name = $1; + } + $$->location = @1; + } | ColId indirection opt_column_list OptWhereClause { $$ = makeNode(PublicationObjSpec); @@ -10729,6 +10770,16 @@ PublicationObjSpec: $$->pubtable->whereClause = $4; $$->location = @1; } + | ColId indirection EXCEPT opt_exclude_column_list OptWhereClause + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner); + $$->pubtable->exclude_columns = $4; + $$->pubtable->whereClause = $5; + $$->location = @1; + } /* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */ | extended_relation_expr opt_column_list OptWhereClause { @@ -10739,6 +10790,15 @@ PublicationObjSpec: $$->pubtable->columns = $2; $$->pubtable->whereClause = $3; } + | extended_relation_expr EXCEPT opt_exclude_column_list OptWhereClause + { + $$ = makeNode(PublicationObjSpec); + $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = $1; + $$->pubtable->exclude_columns = $3; + $$->pubtable->whereClause = $4; + } | CURRENT_SCHEMA { $$ = makeNode(PublicationObjSpec); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 5512b4cba7f..f36c361abd5 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -185,6 +185,9 @@ typedef struct RelationSyncEntry * row filter expressions, column list, etc. */ MemoryContext entry_cxt; + + /* Indicate if no column is included in the publication */ + bool no_cols_published; } RelationSyncEntry; /* @@ -1099,6 +1102,7 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, bool first = true; Relation relation = RelationIdGetRelation(entry->publish_as_relid); bool found_pub_collist = false; + bool found_pub_exclude_collist = false; Bitmapset *relcols = NULL; pgoutput_ensure_entry_cxt(data, entry); @@ -1120,12 +1124,32 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, { Publication *pub = lfirst(lc); Bitmapset *cols = NULL; + Bitmapset *excludecols = NULL; /* Retrieve the bitmap of columns for a column list publication. */ found_pub_collist |= check_and_fetch_column_list(pub, entry->publish_as_relid, entry->entry_cxt, &cols); + /* Retrieve the bitmap of exclude columns for the publication. */ + found_pub_exclude_collist |= check_and_fetch_exclude_column_list(pub, + entry->publish_as_relid, + entry->entry_cxt, &excludecols); + + /* + * cols and exclude cols can't appear together. Syntax for it is not + * supported. If column list is not present check for excluded column + * list and construct a corresponding column list. + */ + if (!cols && found_pub_exclude_collist) + { + MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt); + + cols = pub_form_cols_map(relation, + entry->include_gencols_type, excludecols); + MemoryContextSwitchTo(oldcxt); + } + /* * For non-column list publications — e.g. TABLE (without a column * list), ALL TABLES, or ALL TABLES IN SCHEMA, we consider all columns @@ -1144,7 +1168,7 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, MemoryContext oldcxt = MemoryContextSwitchTo(entry->entry_cxt); relcols = pub_form_cols_map(relation, - entry->include_gencols_type); + entry->include_gencols_type, NULL); MemoryContextSwitchTo(oldcxt); } @@ -1155,8 +1179,11 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, { entry->columns = cols; first = false; + + if (excludecols && !cols) + entry->no_cols_published = true; } - else if (!bms_equal(entry->columns, cols)) + else if ((entry->no_cols_published && cols) || !bms_equal(entry->columns, cols)) ereport(ERROR, errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot use different column lists for table \"%s.%s\" in different publications", @@ -1165,10 +1192,11 @@ pgoutput_column_list_init(PGOutputData *data, List *publications, } /* loop all subscribed publications */ /* - * If no column list publications exist, columns to be published will be - * computed later according to the 'publish_generated_columns' parameter. + * If no column list or excluded column list publications exist, columns + * to be published will be computed later according to the + * 'publish_generated_columns' parameter. */ - if (!found_pub_collist) + if (!found_pub_collist && !found_pub_exclude_collist) entry->columns = NULL; RelationClose(relation); @@ -1480,6 +1508,13 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, relentry = get_rel_sync_entry(data, relation); + /* + * If all columns of a table is present in the exclude column list. Skip + * publishing the changes. + */ + if (relentry->no_cols_published) + return; + /* First check the table filter */ switch (action) { @@ -2057,6 +2092,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->publish_as_relid = InvalidOid; entry->columns = NULL; entry->attrmap = NULL; + entry->no_cols_published = false; } /* Validate the entry */ @@ -2106,6 +2142,7 @@ get_rel_sync_entry(PGOutputData *data, Relation relation) entry->pubactions.pubupdate = false; entry->pubactions.pubdelete = false; entry->pubactions.pubtruncate = false; + entry->no_cols_published = false; /* * Tuple slots cleanups. (Will be rebuilt later if needed). diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 6cc55afd498..e918c8b43b7 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4706,6 +4706,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) int i_prrelqual; int i_prattrs; int i_prexcept; + int i_prexcludeattrs; int i, j, ntups; @@ -4723,7 +4724,15 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) /* FIXME: 180000 should be changed to 190000 later for PG19. */ if (fout->remoteVersion >= 180000) - appendPQExpBufferStr(query, " prexcept,\n"); + appendPQExpBufferStr(query, " prexcept, " + "(CASE\n" + " WHEN pr.prexcludeattrs IS NOT NULL THEN\n" + " (SELECT array_agg(attname)\n" + " FROM\n" + " pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prexcludeattrs::pg_catalog.int2[], 1)) s,\n" + " pg_catalog.pg_attribute\n" + " WHERE attrelid = pr.prrelid AND attnum = prexcludeattrs[s])\n" + " ELSE NULL END) prexcludeattrs, \n"); else appendPQExpBufferStr(query, " false AS prexcept,\n"); @@ -4755,6 +4764,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) i_prrelqual = PQfnumber(res, "prrelqual"); i_prattrs = PQfnumber(res, "prattrs"); i_prexcept = PQfnumber(res, "prexcept"); + i_prexcludeattrs = PQfnumber(res, "prexcludeattrs"); /* this allocation may be more than we need */ pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo)); @@ -4822,6 +4832,30 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) else pubrinfo[j].pubrattrs = NULL; + if (!PQgetisnull(res, i, i_prexcludeattrs)) + { + char **attnames; + int nattnames; + PQExpBuffer excludeattribs; + + if (!parsePGArray(PQgetvalue(res, i, i_prexcludeattrs), + &attnames, &nattnames)) + pg_fatal("could not parse %s array", "prattrs"); + excludeattribs = createPQExpBuffer(); + for (int k = 0; k < nattnames; k++) + { + if (k > 0) + appendPQExpBufferStr(excludeattribs, ", "); + + appendPQExpBufferStr(excludeattribs, fmtId(attnames[k])); + } + pubrinfo[j].pubrexcludeattrs = excludeattribs->data; + free(excludeattribs); /* but not excludeattribs->data */ + free(attnames); + } + else + pubrinfo[j].pubrexcludeattrs = NULL; + /* Decide whether we want to dump it */ selectDumpablePublicationObject(&(pubrinfo[j].dobj), fout); @@ -4907,6 +4941,9 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo) if (pubrinfo->pubrattrs) appendPQExpBuffer(query, " (%s)", pubrinfo->pubrattrs); + if (pubrinfo->pubrexcludeattrs) + appendPQExpBuffer(query, " EXCEPT (%s)", pubrinfo->pubrexcludeattrs); + if (pubrinfo->pubrelqual) { /* diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 096f29346d8..e01c2d1afbd 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -681,6 +681,7 @@ typedef struct _PublicationRelInfo TableInfo *pubtable; char *pubrelqual; char *pubrattrs; + char *pubrexcludeattrs; } PublicationRelInfo; /* diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 10b5f7f29cb..75b6c20157b 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -3019,12 +3019,14 @@ describeOneTableDetails(const char *schemaname, /* print any publications */ if (pset.sversion >= 100000) { - if (pset.sversion >= 150000) + /* FIXME: 180000 should be changed to 190000 later for PG19. */ + if (pset.sversion >= 180000) { printfPQExpBuffer(&buf, "SELECT pubname\n" " , NULL\n" " , NULL\n" + " , NULL\n" "FROM pg_catalog.pg_publication p\n" " JOIN pg_catalog.pg_publication_namespace pn ON p.oid = pn.pnpubid\n" " JOIN pg_catalog.pg_class pc ON pc.relnamespace = pn.pnnspid\n" @@ -3038,37 +3040,64 @@ describeOneTableDetails(const char *schemaname, " pg_catalog.pg_attribute\n" " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n" " ELSE NULL END) " + " , (CASE WHEN pr.prexcludeattrs IS NOT NULL THEN\n" + " (SELECT string_agg(attname, ', ')\n" + " FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prexcludeattrs::pg_catalog.int2[], 1)) s,\n" + " pg_catalog.pg_attribute\n" + " WHERE attrelid = pr.prrelid AND attnum = prexcludeattrs[s])\n" + " ELSE NULL END) " "FROM pg_catalog.pg_publication p\n" " JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" " JOIN pg_catalog.pg_class c ON c.oid = pr.prrelid\n" - "WHERE pr.prrelid = '%s'\n", - oid, oid, oid); - - /* FIXME: 180000 should be changed to 190000 later for PG19. */ - if (pset.sversion >= 180000) - appendPQExpBufferStr(&buf, " AND NOT pr.prexcept\n"); - - appendPQExpBuffer(&buf, + "WHERE pr.prrelid = '%s'\n" + "AND NOT pr.prexcept\n" + "UNION\n" + "SELECT pubname\n" + " , NULL\n" + " , NULL\n" + " , NULL\n" + "FROM pg_catalog.pg_publication p\n" + "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" + "AND NOT EXISTS (\n" + " SELECT 1\n" + " FROM pg_catalog.pg_publication_rel pr\n" + " JOIN pg_catalog.pg_class pc\n" + " ON pr.prrelid = pc.oid\n" + " WHERE pr.prrelid = '%s' AND pr.prpubid = p.oid)\n" + "ORDER BY 1;", + oid, oid, oid, oid, oid); + } + else if (pset.sversion >= 150000) + { + printfPQExpBuffer(&buf, + "SELECT pubname\n" + " , NULL\n" + " , NULL\n" + "FROM pg_catalog.pg_publication p\n" + " JOIN pg_catalog.pg_publication_namespace pn ON p.oid = pn.pnpubid\n" + " JOIN pg_catalog.pg_class pc ON pc.relnamespace = pn.pnnspid\n" + "WHERE pc.oid ='%s' and pg_catalog.pg_relation_is_publishable('%s')\n" + "UNION\n" + "SELECT pubname\n" + " , pg_get_expr(pr.prqual, c.oid)\n" + " , (CASE WHEN pr.prattrs IS NOT NULL THEN\n" + " (SELECT string_agg(attname, ', ')\n" + " FROM pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prattrs::pg_catalog.int2[], 1)) s,\n" + " pg_catalog.pg_attribute\n" + " WHERE attrelid = pr.prrelid AND attnum = prattrs[s])\n" + " ELSE NULL END) " + "FROM pg_catalog.pg_publication p\n" + " JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" + " JOIN pg_catalog.pg_class c ON c.oid = pr.prrelid\n" + "WHERE pr.prrelid = '%s'\n" "UNION\n" "SELECT pubname\n" - " , NULL\n" - " , NULL\n" + " , NULL\n" + " , NULL\n" "FROM pg_catalog.pg_publication p\n" - "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n", - oid); - - /* FIXME: 180000 should be changed to 190000 later for PG19. */ - if (pset.sversion >= 180000) - appendPQExpBuffer(&buf, - " AND NOT EXISTS (\n" - " SELECT 1\n" - " FROM pg_catalog.pg_publication_rel pr\n" - " JOIN pg_catalog.pg_class pc\n" - " ON pr.prrelid = pc.oid\n" - " WHERE pr.prrelid = '%s' AND pr.prpubid = p.oid)\n", - oid); - - appendPQExpBufferStr(&buf, "ORDER BY 1;"); + "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" + "ORDER BY 1;", + oid, oid, oid, oid); } else { @@ -3109,6 +3138,11 @@ describeOneTableDetails(const char *schemaname, appendPQExpBuffer(&buf, " (%s)", PQgetvalue(result, i, 2)); + /* exclude column list (if any) */ + if (!PQgetisnull(result, i, 3)) + appendPQExpBuffer(&buf, " EXCEPT (%s)", + PQgetvalue(result, i, 3)); + /* row filter (if any) */ if (!PQgetisnull(result, i, 1)) appendPQExpBuffer(&buf, " WHERE %s", @@ -6525,6 +6559,9 @@ addFooterToPublicationDesc(PQExpBuffer buf, const char *footermsg, if (!PQgetisnull(res, i, 3)) appendPQExpBuffer(buf, " (%s)", PQgetvalue(res, i, 3)); + if (!PQgetisnull(res, i, 4)) + appendPQExpBuffer(buf, " EXCEPT (%s)", PQgetvalue(res, i, 4)); + if (!PQgetisnull(res, i, 2)) appendPQExpBuffer(buf, " WHERE %s", PQgetvalue(res, i, 2)); } @@ -6706,6 +6743,21 @@ describePublications(const char *pattern) else appendPQExpBufferStr(&buf, ", NULL, NULL"); + + /* FIXME: 180000 should be changed to 190000 later for PG19. */ + if (pset.sversion >= 180000) + appendPQExpBufferStr(&buf, + ", (CASE WHEN pr.prexcludeattrs IS NOT NULL THEN\n" + " pg_catalog.array_to_string(" + " ARRAY(SELECT attname\n" + " FROM\n" + " pg_catalog.generate_series(0, pg_catalog.array_upper(pr.prexcludeattrs::pg_catalog.int2[], 1)) s,\n" + " pg_catalog.pg_attribute\n" + " WHERE attrelid = c.oid AND attnum = prexcludeattrs[s]), ', ')\n" + " ELSE NULL END)"); + else + appendPQExpBufferStr(&buf, ", NULL"); + appendPQExpBuffer(&buf, "\nFROM pg_catalog.pg_class c,\n" " pg_catalog.pg_namespace n,\n" diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 33b771990bd..5344559c88e 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -140,6 +140,7 @@ typedef struct PublicationRelInfo Node *whereClause; List *columns; bool except; + List *exclude_columns; } PublicationRelInfo; extern Publication *GetPublication(Oid pubid); @@ -181,15 +182,21 @@ extern bool is_publishable_relation(Relation rel); extern bool is_schema_publication(Oid pubid); extern bool check_and_fetch_column_list(Publication *pub, Oid relid, MemoryContext mcxt, Bitmapset **cols); +extern bool check_and_fetch_exclude_column_list(Publication *pub, Oid relid, + MemoryContext mcxt, + Bitmapset **cols); extern ObjectAddress publication_add_relation(Oid pubid, PublicationRelInfo *pri, bool if_not_exists); extern Bitmapset *pub_collist_validate(Relation targetrel, List *columns); +extern Bitmapset *pub_exclude_collist_validate(Relation targetrel, + List *exclude_columns); extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaid, bool if_not_exists); extern Bitmapset *pub_collist_to_bitmapset(Bitmapset *columns, Datum pubcols, MemoryContext mcxt); extern Bitmapset *pub_form_cols_map(Relation relation, - PublishGencolsType include_gencols_type); + PublishGencolsType include_gencols_type, + Bitmapset *excludecols); #endif /* PG_PUBLICATION_H */ diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h index e7d7f3ba85c..4c1b4ddbddc 100644 --- a/src/include/catalog/pg_publication_rel.h +++ b/src/include/catalog/pg_publication_rel.h @@ -36,6 +36,7 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) #ifdef CATALOG_VARLEN /* variable-length fields start here */ pg_node_tree prqual; /* qualifications */ int2vector prattrs; /* columns to replicate */ + int2vector prexcludeattrs; /* columns to exclude */ #endif } FormData_pg_publication_rel; diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index d901cb0ffa7..14861d180ab 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -4236,6 +4236,7 @@ typedef struct PublicationTable Node *whereClause; /* qualifications */ List *columns; /* List of columns in a publication table */ bool except; /* exclude the relation */ + List *exclude_columns; /* List of columns to be excluded */ } PublicationTable; /* diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 5d025328704..a274b3cff31 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -2125,6 +2125,71 @@ SET ROLE regress_publication_user; DROP PUBLICATION testpub_reset; DROP TABLE pub_sch1.tbl1; DROP TABLE pub_sch1.tbl2; +-- ====================================================== +-- Test EXCEPT columns for CREATE PUBLICATION +SET client_min_messages = 'ERROR'; +CREATE TABLE pub_test_except1 (a int NOT NULL, b int, c int NOT NULL, d int); +CREATE TABLE pub_sch1.pub_test_except2 (a int, b int, c int, d int); +CREATE TABLE pub_test_except3 (a int, gen1 int GENERATED ALWAYS AS (a * 2) STORED); +-- Verify that publication is created with EXCEPT +CREATE PUBLICATION testpub_except FOR TABLE pub_test_except1, pub_sch1.pub_test_except2 EXCEPT (b, c); +SELECT * FROM pg_publication_tables WHERE pubname = 'testpub_except'; + pubname | schemaname | tablename | attnames | rowfilter +----------------+------------+------------------+-----------+----------- + testpub_except | public | pub_test_except1 | {a,b,c,d} | + testpub_except | pub_sch1 | pub_test_except2 | {a,d} | +(2 rows) + +-- Check for invalid cases +CREATE PUBLICATION testpub_except2 FOR TABLES IN SCHEMA pub_sch1, TABLE pub_test_except1 EXCEPT (b, c); +ERROR: cannot use column list for relation "public.pub_test_except1" in publication "testpub_except2" +DETAIL: Column lists cannot be specified in publications containing FOR TABLES IN SCHEMA elements. +CREATE PUBLICATION testpub_except2 FOR TABLE pub_test_except1 EXCEPT; +ERROR: syntax error at or near ";" +LINE 1: ...BLICATION testpub_except2 FOR TABLE pub_test_except1 EXCEPT; + ^ +CREATE PUBLICATION testpub_except2 FOR TABLE pub_test_except3 EXCEPT (gen1); +ERROR: cannot use generated column "gen1" in publication except column list +-- Verify that publication can be altered with EXCEPT +ALTER PUBLICATION testpub_except SET TABLE pub_test_except1 EXCEPT (a, b), pub_sch1.pub_test_except2; +SELECT * FROM pg_publication_tables WHERE pubname = 'testpub_except'; + pubname | schemaname | tablename | attnames | rowfilter +----------------+------------+------------------+-----------+----------- + testpub_except | public | pub_test_except1 | {c,d} | + testpub_except | pub_sch1 | pub_test_except2 | {a,b,c,d} | +(2 rows) + +-- Verify ALTER PUBLICATION ... DROP +ALTER PUBLICATION testpub_except DROP TABLE pub_test_except1 EXCEPT (a, b); +ERROR: except column list must not be specified in ALTER PUBLICATION ... DROP +ALTER PUBLICATION testpub_except DROP TABLE pub_test_except1; +ALTER PUBLICATION testpub_except ADD TABLE pub_test_except1 EXCEPT (c, d); +SELECT * FROM pg_publication_tables WHERE pubname = 'testpub_except'; + pubname | schemaname | tablename | attnames | rowfilter +----------------+------------+------------------+-----------+----------- + testpub_except | public | pub_test_except1 | {a,b} | + testpub_except | pub_sch1 | pub_test_except2 | {a,b,c,d} | +(2 rows) + +-- Verify excluded columns cannot be part of REPLICA IDENTITY +ALTER TABLE pub_test_except1 REPLICA IDENTITY FULL; +UPDATE pub_test_except1 SET a = 3 WHERE a = 1; +ERROR: cannot update table "pub_test_except1" +DETAIL: Column list used by the publication does not cover the replica identity. +CREATE UNIQUE INDEX pub_test_except1_a_idx ON pub_test_except1 (a, c); +ALTER TABLE pub_test_except1 REPLICA IDENTITY USING INDEX pub_test_except1_a_idx; +UPDATE pub_test_except1 SET a = 3 WHERE a = 1; +ERROR: cannot update table "pub_test_except1" +DETAIL: Column list used by the publication does not cover the replica identity. +DROP INDEX pub_test_except1_a_idx; +CREATE UNIQUE INDEX pub_test_except1_a_idx ON pub_test_except1 (a); +ALTER TABLE pub_test_except1 REPLICA IDENTITY USING INDEX pub_test_except1_a_idx; +UPDATE pub_test_except1 SET a = 3 WHERE a = 1; +DROP INDEX pub_test_except1_a_idx; +DROP PUBLICATION testpub_except; +DROP TABLE pub_test_except1; +DROP TABLE pub_sch1.pub_test_except2; +DROP TABLE pub_test_except3; DROP SCHEMA pub_sch1; RESET client_min_messages; RESET SESSION AUTHORIZATION; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index af31a2214ca..6b23f215739 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -1318,6 +1318,51 @@ SET ROLE regress_publication_user; DROP PUBLICATION testpub_reset; DROP TABLE pub_sch1.tbl1; DROP TABLE pub_sch1.tbl2; + +-- ====================================================== +-- Test EXCEPT columns for CREATE PUBLICATION + +SET client_min_messages = 'ERROR'; +CREATE TABLE pub_test_except1 (a int NOT NULL, b int, c int NOT NULL, d int); +CREATE TABLE pub_sch1.pub_test_except2 (a int, b int, c int, d int); +CREATE TABLE pub_test_except3 (a int, gen1 int GENERATED ALWAYS AS (a * 2) STORED); + +-- Verify that publication is created with EXCEPT +CREATE PUBLICATION testpub_except FOR TABLE pub_test_except1, pub_sch1.pub_test_except2 EXCEPT (b, c); +SELECT * FROM pg_publication_tables WHERE pubname = 'testpub_except'; + +-- Check for invalid cases +CREATE PUBLICATION testpub_except2 FOR TABLES IN SCHEMA pub_sch1, TABLE pub_test_except1 EXCEPT (b, c); +CREATE PUBLICATION testpub_except2 FOR TABLE pub_test_except1 EXCEPT; +CREATE PUBLICATION testpub_except2 FOR TABLE pub_test_except3 EXCEPT (gen1); + +-- Verify that publication can be altered with EXCEPT +ALTER PUBLICATION testpub_except SET TABLE pub_test_except1 EXCEPT (a, b), pub_sch1.pub_test_except2; +SELECT * FROM pg_publication_tables WHERE pubname = 'testpub_except'; + +-- Verify ALTER PUBLICATION ... DROP +ALTER PUBLICATION testpub_except DROP TABLE pub_test_except1 EXCEPT (a, b); +ALTER PUBLICATION testpub_except DROP TABLE pub_test_except1; + +ALTER PUBLICATION testpub_except ADD TABLE pub_test_except1 EXCEPT (c, d); +SELECT * FROM pg_publication_tables WHERE pubname = 'testpub_except'; + +-- Verify excluded columns cannot be part of REPLICA IDENTITY +ALTER TABLE pub_test_except1 REPLICA IDENTITY FULL; +UPDATE pub_test_except1 SET a = 3 WHERE a = 1; +CREATE UNIQUE INDEX pub_test_except1_a_idx ON pub_test_except1 (a, c); +ALTER TABLE pub_test_except1 REPLICA IDENTITY USING INDEX pub_test_except1_a_idx; +UPDATE pub_test_except1 SET a = 3 WHERE a = 1; +DROP INDEX pub_test_except1_a_idx; +CREATE UNIQUE INDEX pub_test_except1_a_idx ON pub_test_except1 (a); +ALTER TABLE pub_test_except1 REPLICA IDENTITY USING INDEX pub_test_except1_a_idx; +UPDATE pub_test_except1 SET a = 3 WHERE a = 1; + +DROP INDEX pub_test_except1_a_idx; +DROP PUBLICATION testpub_except; +DROP TABLE pub_test_except1; +DROP TABLE pub_sch1.pub_test_except2; +DROP TABLE pub_test_except3; DROP SCHEMA pub_sch1; RESET client_min_messages; diff --git a/src/test/subscription/t/036_rep_changes_except_table.pl b/src/test/subscription/t/036_rep_changes_except_table.pl index 1d115283809..ec77f2e8d04 100644 --- a/src/test/subscription/t/036_rep_changes_except_table.pl +++ b/src/test/subscription/t/036_rep_changes_except_table.pl @@ -1,7 +1,7 @@ # Copyright (c) 2021-2022, PostgreSQL Global Development Group -# Logical replication tests for except table publications +# Logical replication tests for except table and except column publications use strict; use warnings; use PostgreSQL::Test::Cluster; @@ -77,6 +77,64 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), min(a), max(a) FROM public.tab1"); is($result, qq(0||), 'check rows on subscriber catchup'); +# Test for except column publications +# Initial setup +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab2 (a int, b int NOT NULL, c int)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE sch1.tab2 (a int, b int, c int)"); +$node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (1, 2, 3)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO sch1.tab2 VALUES (1, 2, 3)"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_col FOR TABLE tab2 EXCEPT (a), sch1.tab2 EXCEPT (b, c)" +); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE tab2 (a int, b int NOT NULL, c int)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE sch1.tab2 (a int, b int, c int)"); +$node_subscriber->safe_psql('postgres', + "CREATE SUBSCRIPTION tap_sub_col CONNECTION '$publisher_connstr' PUBLICATION tap_pub_col" +); +$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_col'); + +# Test initial sync +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab2"); +is($result, qq(|2|3), + 'check that initial sync for except column publication'); +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.tab2"); +is($result, qq(1||), 'check that initial sync for except column publication'); +$node_publisher->safe_psql('postgres', "INSERT INTO tab2 VALUES (4, 5, 6)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO sch1.tab2 VALUES (4, 5, 6)"); +$node_publisher->wait_for_catchup('tap_sub_col'); + +# Test incremental changes +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab2"); +is( $result, qq(|2|3 +|5|6), + 'check incremental insert for except column publication'); +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.tab2"); +is( $result, qq(1|| +4||), 'check incremental insert for except column publication'); + +# Test for update +$node_publisher->safe_psql('postgres', + "CREATE UNIQUE INDEX b_idx ON tab2 (b)"); +$node_publisher->safe_psql('postgres', + "ALTER TABLE tab2 REPLICA IDENTITY USING INDEX b_idx"); +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX b_idx ON tab2 (b)"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE tab2 REPLICA IDENTITY USING INDEX b_idx"); +$node_publisher->safe_psql('postgres', + "UPDATE tab2 SET a = 3, b = 4, c = 5 WHERE a = 1"); +$node_publisher->wait_for_catchup('tap_sub_col'); +$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM tab2"); +is( $result, qq(|5|6 +|4|5), + 'check update for except column publication'); + $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.34.1