From 423c9e47c0f06716db103b854549a7d7b777bf90 Mon Sep 17 00:00:00 2001 From: vignesh Date: Wed, 16 Jun 2021 18:05:27 +0530 Subject: [PATCH v6 3/4] Added skip-table option to skip tables for all tables/for schema publication. Added skip-table option to skip tables for all tables/for schema publication. --- src/backend/catalog/Makefile | 2 +- src/backend/catalog/aclchk.c | 2 + src/backend/catalog/dependency.c | 11 +- src/backend/catalog/objectaddress.c | 98 +++++++++++++- src/backend/catalog/pg_publication.c | 102 ++++++++++----- src/backend/commands/alter.c | 1 + src/backend/commands/event_trigger.c | 4 + src/backend/commands/publicationcmds.c | 129 +++++++++++++------ src/backend/commands/seclabel.c | 1 + src/backend/commands/tablecmds.c | 1 + src/backend/parser/gram.y | 53 +++++++- src/backend/replication/pgoutput/pgoutput.c | 37 +++--- src/backend/utils/cache/syscache.c | 23 ++++ src/bin/pg_dump/common.c | 5 +- src/bin/pg_dump/pg_dump.c | 41 ++++-- src/bin/pg_dump/pg_dump.h | 3 +- src/bin/pg_dump/pg_dump_sort.c | 7 + src/bin/psql/describe.c | 25 +++- src/bin/psql/tab-complete.c | 12 +- src/include/catalog/dependency.h | 1 + src/include/catalog/pg_publication.h | 27 ++++ src/include/catalog/pg_publication_skiprel.h | 48 +++++++ src/include/commands/publicationcmds.h | 10 +- src/include/nodes/parsenodes.h | 3 + src/include/parser/kwlist.h | 1 + src/include/utils/syscache.h | 2 + src/test/regress/expected/oidjoins.out | 2 + src/test/regress/expected/sanity_check.out | 1 + 28 files changed, 521 insertions(+), 131 deletions(-) create mode 100644 src/include/catalog/pg_publication_skiprel.h diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index 30026a967b..1b51cc9545 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -68,7 +68,7 @@ CATALOG_HEADERS := \ pg_default_acl.h pg_init_privs.h pg_seclabel.h pg_shseclabel.h \ pg_collation.h pg_partitioned_table.h pg_range.h pg_transform.h \ pg_sequence.h pg_publication.h pg_publication_rel.h pg_publication_schema.h \ - pg_subscription.h pg_subscription_rel.h + pg_publication_skiprel.h pg_subscription.h pg_subscription_rel.h GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h system_fk_info.h diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index 59600fc98d..f5f01c500f 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -3434,6 +3434,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_DOMCONSTRAINT: case OBJECT_PUBLICATION_REL: case OBJECT_PUBLICATION_SCHEMA: + case OBJECT_PUBLICATION_SKIP: case OBJECT_ROLE: case OBJECT_RULE: case OBJECT_TABCONSTRAINT: @@ -3574,6 +3575,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_DOMCONSTRAINT: case OBJECT_PUBLICATION_REL: case OBJECT_PUBLICATION_SCHEMA: + case OBJECT_PUBLICATION_SKIP: case OBJECT_ROLE: case OBJECT_TRANSFORM: case OBJECT_TSPARSER: diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index c0a9fb0c7e..de88f1b046 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -50,6 +50,7 @@ #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_publication_schema.h" +#include "catalog/pg_publication_skiprel.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_statistic_ext.h" #include "catalog/pg_subscription.h" @@ -181,6 +182,7 @@ static const Oid object_classes[] = { PublicationRelationId, /* OCLASS_PUBLICATION */ PublicationRelRelationId, /* OCLASS_PUBLICATION_REL */ PublicationSchemaRelationId, /* OCLASS_PUBLICATION_SCHEMA */ + PublicationSkipRelationId, /* OCLASS_PUBLICATION_SKIP */ SubscriptionRelationId, /* OCLASS_SUBSCRIPTION */ TransformRelationId /* OCLASS_TRANSFORM */ }; @@ -1469,13 +1471,17 @@ doDeletion(const ObjectAddress *object, int flags) break; case OCLASS_PUBLICATION_REL: - RemovePublicationRelById(object->objectId); + RemovePublicationRelById(object->objectId, false); break; case OCLASS_PUBLICATION_SCHEMA: RemovePublicationSchemaById(object->objectId); break; + case OCLASS_PUBLICATION_SKIP: + RemovePublicationRelById(object->objectId, true); + break; + case OCLASS_CAST: case OCLASS_COLLATION: case OCLASS_CONVERSION: @@ -2872,6 +2878,9 @@ getObjectClass(const ObjectAddress *object) case PublicationSchemaRelationId: return OCLASS_PUBLICATION_SCHEMA; + case PublicationSkipRelationId: + return OCLASS_PUBLICATION_SKIP; + case SubscriptionRelationId: return OCLASS_SUBSCRIPTION; diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c index cae0b3b7e6..6aebfbbe06 100644 --- a/src/backend/catalog/objectaddress.c +++ b/src/backend/catalog/objectaddress.c @@ -50,6 +50,7 @@ #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_publication_schema.h" +#include "catalog/pg_publication_skiprel.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_statistic_ext.h" #include "catalog/pg_subscription.h" @@ -835,6 +836,10 @@ static const struct object_type_map { "publication schema", OBJECT_PUBLICATION_SCHEMA }, + /* OCLASS_PUBLICATION_SKIPREL */ + { + "publication skiprelation", OBJECT_PUBLICATION_SKIP + }, /* OCLASS_SUBSCRIPTION */ { "subscription", OBJECT_SUBSCRIPTION @@ -880,7 +885,8 @@ static ObjectAddress get_object_address_usermapping(List *object, bool missing_ok); static ObjectAddress get_object_address_publication_rel(List *object, Relation *relp, - bool missing_ok); + bool missing_ok, + bool is_skip); static ObjectAddress get_object_address_publication_schema(List *object, bool missing_ok); @@ -1125,12 +1131,17 @@ get_object_address(ObjectType objtype, Node *object, case OBJECT_PUBLICATION_REL: address = get_object_address_publication_rel(castNode(List, object), &relation, - missing_ok); + missing_ok, false); break; case OBJECT_PUBLICATION_SCHEMA: address = get_object_address_publication_schema(castNode(List, object), missing_ok); break; + case OBJECT_PUBLICATION_SKIP: + address = get_object_address_publication_rel(castNode(List, object), + &relation, + missing_ok, true); + break; case OBJECT_DEFACL: address = get_object_address_defacl(castNode(List, object), missing_ok); @@ -1900,16 +1911,19 @@ get_object_address_usermapping(List *object, bool missing_ok) * publication name. */ static ObjectAddress -get_object_address_publication_rel(List *object, - Relation *relp, bool missing_ok) +get_object_address_publication_rel(List *object, Relation *relp, + bool missing_ok, bool is_skip) { ObjectAddress address; Relation relation; List *relname; char *pubname; Publication *pub; + int relid = GET_PUBLICATION_REL_ID(is_skip); + int syscacheid = GET_PUBLICATION_MAP(is_skip); + int col_oid = GET_PUBLICATION_COL_OBJECTID(is_skip); - ObjectAddressSet(address, PublicationRelRelationId, InvalidOid); + ObjectAddressSet(address, relid, InvalidOid); relname = linitial(object); relation = relation_openrv_extended(makeRangeVarFromNameList(relname), @@ -1930,15 +1944,17 @@ get_object_address_publication_rel(List *object, /* Find the publication relation mapping in syscache. */ address.objectId = - GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, + GetSysCacheOid2(syscacheid, col_oid, ObjectIdGetDatum(RelationGetRelid(relation)), ObjectIdGetDatum(pub->oid)); + if (!OidIsValid(address.objectId)) { if (!missing_ok) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("publication relation \"%s\" in publication \"%s\" does not exist", + errmsg("publication %s \"%s\" in publication \"%s\" does not exist", + GET_PUBLICATION_REL_STR(is_skip), RelationGetRelationName(relation), pubname))); relation_close(relation, AccessShareLock); return address; @@ -2266,6 +2282,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_USER_MAPPING: case OBJECT_PUBLICATION_REL: case OBJECT_PUBLICATION_SCHEMA: + case OBJECT_PUBLICATION_SKIP: case OBJECT_DEFACL: case OBJECT_TRANSFORM: if (list_length(args) != 1) @@ -2361,6 +2378,9 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_PUBLICATION_SCHEMA: objnode = (Node *) list_make2(linitial(name), linitial(args)); break; + case OBJECT_PUBLICATION_SKIP: + objnode = (Node *) list_make2(name, linitial(args)); + break; case OBJECT_USER_MAPPING: objnode = (Node *) list_make2(linitial(name), linitial(args)); break; @@ -4003,6 +4023,37 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok) break; } + case OCLASS_PUBLICATION_SKIP: + { + HeapTuple tup; + char *pubname; + Form_pg_publication_skip psform; + StringInfoData rel; + + tup = SearchSysCache1(PUBLICATIONSKIP, + ObjectIdGetDatum(object->objectId)); + if (!HeapTupleIsValid(tup)) + { + if (!missing_ok) + elog(ERROR, "cache lookup failed for publication table %u", + object->objectId); + break; + } + + psform = (Form_pg_publication_skip) GETSTRUCT(tup); + pubname = get_publication_name(psform->pspubid, false); + + initStringInfo(&rel); + getRelationDescription(&rel, psform->psrelid, false); + + /* translator: first %s is, e.g., "table %s" */ + appendStringInfo(&buffer, _("publication of %s in publication %s"), + rel.data, pubname); + pfree(rel.data); + ReleaseSysCache(tup); + break; + } + case OCLASS_SUBSCRIPTION: { char *subname = get_subscription_name(object->objectId, @@ -4581,6 +4632,10 @@ getObjectTypeDescription(const ObjectAddress *object, bool missing_ok) appendStringInfoString(&buffer, "publication schema"); break; + case OCLASS_PUBLICATION_SKIP: + appendStringInfoString(&buffer, "publication skiprelation"); + break; + case OCLASS_SUBSCRIPTION: appendStringInfoString(&buffer, "subscription"); break; @@ -5858,6 +5913,35 @@ getObjectIdentityParts(const ObjectAddress *object, break; } + case OCLASS_PUBLICATION_SKIP: + { + HeapTuple tup; + char *pubname; + Form_pg_publication_skip psform; + + tup = SearchSysCache1(PUBLICATIONSKIP, + ObjectIdGetDatum(object->objectId)); + if (!HeapTupleIsValid(tup)) + { + if (!missing_ok) + elog(ERROR, "cache lookup failed for publication table %u", + object->objectId); + break; + } + + psform = (Form_pg_publication_skip) GETSTRUCT(tup); + pubname = get_publication_name(psform->pspubid, false); + + getRelationIdentity(&buffer, psform->psrelid, objname, false); + appendStringInfo(&buffer, " in publication %s", pubname); + + if (objargs) + *objargs = list_make1(pubname); + + ReleaseSysCache(tup); + break; + } + case OCLASS_SUBSCRIPTION: { char *subname; diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 41243790d6..05df5b43b0 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -32,6 +32,7 @@ #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_publication_schema.h" +#include "catalog/pg_publication_skiprel.h" #include "catalog/pg_type.h" #include "commands/publicationcmds.h" #include "funcapi.h" @@ -145,26 +146,29 @@ pg_relation_is_publishable(PG_FUNCTION_ARGS) */ ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, - bool if_not_exists) + bool if_not_exists, bool is_skip) { Relation rel; HeapTuple tup; - Datum values[Natts_pg_publication_rel]; - bool nulls[Natts_pg_publication_rel]; + int ncolumns = GET_PUBLICATION_REL_COL_COUNT(is_skip); + Datum *values; + bool *nulls; Oid relid = RelationGetRelid(targetrel); Oid prrelid; + int colid; Publication *pub = GetPublication(pubid); ObjectAddress myself, referenced; - rel = table_open(PublicationRelRelationId, RowExclusiveLock); + rel = table_open(GET_PUBLICATION_REL_ID(is_skip), RowExclusiveLock); /* * Check for duplicates. Note that this does not really prevent * duplicates, it's here just to provide nicer error message in common * case. The real protection is the unique key on the catalog. */ - if (SearchSysCacheExists2(PUBLICATIONRELMAP, ObjectIdGetDatum(relid), + if (SearchSysCacheExists2(GET_PUBLICATION_MAP(is_skip), + ObjectIdGetDatum(relid), ObjectIdGetDatum(pubid))) { table_close(rel, RowExclusiveLock); @@ -174,23 +178,24 @@ publication_add_relation(Oid pubid, Relation targetrel, ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), - errmsg("relation \"%s\" is already member of publication \"%s\"", + errmsg("%s \"%s\" is already member of publication \"%s\"", + GET_PUBLICATION_REL_STR(is_skip), RelationGetRelationName(targetrel), pub->name))); } check_publication_add_relation(targetrel); - /* Form a tuple. */ - memset(values, 0, sizeof(values)); - memset(nulls, false, sizeof(nulls)); + values = (Datum *) palloc0(ncolumns * sizeof(Datum)); + nulls = (bool *) palloc0(ncolumns * sizeof(bool)); - prrelid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId, - Anum_pg_publication_rel_oid); - values[Anum_pg_publication_rel_oid - 1] = ObjectIdGetDatum(prrelid); - values[Anum_pg_publication_rel_prpubid - 1] = - ObjectIdGetDatum(pubid); - values[Anum_pg_publication_rel_prrelid - 1] = - ObjectIdGetDatum(relid); + colid = GET_PUBLICATION_COL_OBJECTID(is_skip); + prrelid = GetNewOidWithIndex(rel, PublicationRelObjectIndexId, colid); + + values[colid - 1] = ObjectIdGetDatum(prrelid); + colid = GET_PUBLICATION_COL_PUBID(is_skip); + values[colid - 1] = ObjectIdGetDatum(pubid); + colid = GET_PUBLICATION_COL_RELID(is_skip); + values[colid - 1] = ObjectIdGetDatum(relid); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -198,7 +203,7 @@ publication_add_relation(Oid pubid, Relation targetrel, CatalogTupleInsert(rel, tup); heap_freetuple(tup); - ObjectAddressSet(myself, PublicationRelRelationId, prrelid); + ObjectAddressSet(myself, GET_PUBLICATION_REL_ID(is_skip), prrelid); /* Add dependency on the publication */ ObjectAddressSet(referenced, PublicationRelationId, pubid); @@ -318,7 +323,7 @@ GetRelationPublications(Oid relid) * should use GetAllTablesPublicationRelations(). */ List * -GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) +GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt, bool is_skip) { List *result; Relation pubrelsrel; @@ -327,28 +332,38 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) HeapTuple tup; /* Find all publications associated with the relation. */ - pubrelsrel = table_open(PublicationRelRelationId, AccessShareLock); + pubrelsrel = table_open(GET_PUBLICATION_REL_ID(is_skip), AccessShareLock); ScanKeyInit(&scankey, - Anum_pg_publication_rel_prpubid, + GET_PUBLICATION_COL_PUBID(is_skip), BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(pubid)); - scan = systable_beginscan(pubrelsrel, PublicationRelPrrelidPrpubidIndexId, + scan = systable_beginscan(pubrelsrel, + GET_PUBLICATION_MULCOL_INDEXID(is_skip), true, NULL, 1, &scankey); result = NIL; while (HeapTupleIsValid(tup = systable_getnext(scan))) { - Form_pg_publication_rel pubrel; - - pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); + Oid relid; + if (is_skip) + { + Form_pg_publication_skip pubskip; + pubskip = (Form_pg_publication_skip) GETSTRUCT(tup); + relid = pubskip->psrelid; + } + else + { + Form_pg_publication_rel pubrel; + pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); + relid = pubrel->prrelid; + } - if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE && + if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE && pub_partopt != PUBLICATION_PART_ROOT) { - List *all_parts = find_all_inheritors(pubrel->prrelid, NoLock, - NULL); + List *all_parts = find_all_inheritors(relid, NoLock, NULL); if (pub_partopt == PUBLICATION_PART_ALL) result = list_concat(result, all_parts); @@ -368,7 +383,7 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) Assert(false); } else - result = lappend_oid(result, pubrel->prrelid); + result = lappend_oid(result, relid); } systable_endscan(scan); @@ -462,7 +477,8 @@ GetAllTablesPublications(void) * in the schema specified. */ List * -GetAllTablesPublicationRelations(bool pubviaroot, Oid schemaOid) +GetAllTablesPublicationRelations(bool pubviaroot, Oid schemaOid, + List *pubskiptablelist) { Relation classRel; ScanKeyData key[2]; @@ -492,7 +508,8 @@ GetAllTablesPublicationRelations(bool pubviaroot, Oid schemaOid) Oid relid = relForm->oid; if (is_publishable_class(relid, relForm) && - !(relForm->relispartition && pubviaroot)) + !(relForm->relispartition && pubviaroot) && + (!pubskiptablelist || !list_member_oid(pubskiptablelist, relid))) result = lappend_oid(result, relid); } @@ -513,7 +530,8 @@ GetAllTablesPublicationRelations(bool pubviaroot, Oid schemaOid) Oid relid = relForm->oid; if (is_publishable_class(relid, relForm) && - !relForm->relispartition) + !relForm->relispartition && + (!pubskiptablelist || !list_member_oid(pubskiptablelist, relid))) result = lappend_oid(result, relid); } @@ -532,6 +550,11 @@ GetAllSchemasPublicationRelations(Publication *publication) { List *result = NIL; List *pubschemalist = GetPublicationSchemas(publication->oid); + List *pubskiptablelist = GetPublicationRelations(publication->oid, + publication->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF, + true); ListCell *cell; foreach(cell, pubschemalist) @@ -540,7 +563,8 @@ GetAllSchemasPublicationRelations(Publication *publication) List *schemaRels = NIL; schemaRels = GetAllTablesPublicationRelations(publication->pubviaroot, - schemaOid); + schemaOid, + pubskiptablelist); result = list_concat(result, schemaRels); } @@ -675,13 +699,23 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * need those. */ if (publication->pubtype == PUBTYPE_ALLTABLES) + { + List *pubskiptablelist; + pubskiptablelist = GetPublicationRelations(publication->oid, + publication->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF, + true); tables = GetAllTablesPublicationRelations(publication->pubviaroot, - InvalidOid); + InvalidOid, + pubskiptablelist); + } else if (publication->pubtype == PUBTYPE_TABLE) tables = GetPublicationRelations(publication->oid, publication->pubviaroot ? PUBLICATION_PART_ROOT : - PUBLICATION_PART_LEAF); + PUBLICATION_PART_LEAF, + false); else if (publication->pubtype == PUBTYPE_SCHEMA) tables = GetAllSchemasPublicationRelations(publication); funcctx->user_fctx = (void *) tables; diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index e7c27459d8..0af1738f7a 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -662,6 +662,7 @@ AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid, case OCLASS_PUBLICATION: case OCLASS_PUBLICATION_REL: case OCLASS_PUBLICATION_SCHEMA: + case OCLASS_PUBLICATION_SKIP: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: /* ignore object types that don't have schema-qualified names */ diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index 34cf049632..b6e4b3c3e4 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -975,6 +975,7 @@ EventTriggerSupportsObjectType(ObjectType obtype) case OBJECT_PUBLICATION: case OBJECT_PUBLICATION_REL: case OBJECT_PUBLICATION_SCHEMA: + case OBJECT_PUBLICATION_SKIP: case OBJECT_ROUTINE: case OBJECT_RULE: case OBJECT_SCHEMA: @@ -1053,6 +1054,7 @@ EventTriggerSupportsObjectClass(ObjectClass objclass) case OCLASS_PUBLICATION: case OCLASS_PUBLICATION_REL: case OCLASS_PUBLICATION_SCHEMA: + case OCLASS_PUBLICATION_SKIP: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: return true; @@ -2134,6 +2136,7 @@ stringify_grant_objtype(ObjectType objtype) case OBJECT_PUBLICATION: case OBJECT_PUBLICATION_REL: case OBJECT_PUBLICATION_SCHEMA: + case OBJECT_PUBLICATION_SKIP: case OBJECT_ROLE: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: @@ -2217,6 +2220,7 @@ stringify_adefprivs_objtype(ObjectType objtype) case OBJECT_PUBLICATION: case OBJECT_PUBLICATION_REL: case OBJECT_PUBLICATION_SCHEMA: + case OBJECT_PUBLICATION_SKIP: case OBJECT_ROLE: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 5fe4b1ad6f..a4a1846f62 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -29,6 +29,7 @@ #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_publication_schema.h" +#include "catalog/pg_publication_skiprel.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" #include "commands/defrem.h" @@ -53,8 +54,9 @@ static List *OpenTableList(List *tables); static void CloseTableList(List *rels); static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, - AlterPublicationStmt *stmt); -static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); + AlterPublicationStmt *stmt, bool is_skip); +static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok, + bool is_skip); static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, AlterPublicationStmt *stmt); static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok); @@ -307,7 +309,18 @@ CreatePublication(CreatePublicationStmt *stmt) Assert(list_length(stmt->tables) > 0); rels = OpenTableList(stmt->tables); - PublicationAddTables(puboid, rels, true, NULL); + PublicationAddTables(puboid, rels, true, NULL, false); + CloseTableList(rels); + } + + if (stmt->skiptables) + { + List *rels; + + Assert(list_length(stmt->skiptables) > 0); + + rels = OpenTableList(stmt->skiptables); + PublicationAddTables(puboid, rels, true, NULL, true); CloseTableList(rels); } @@ -423,7 +436,8 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, * trees, not just those explicitly mentioned in the publication. */ List *relids = GetPublicationRelations(pubform->oid, - PUBLICATION_PART_ALL); + PUBLICATION_PART_ALL, + false); /* * We don't want to send too many individual messages, at some point @@ -456,44 +470,58 @@ AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, */ static void AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, - HeapTuple tup) + HeapTuple tup, bool is_skip) { List *rels = NIL; Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); Oid pubid = pubform->oid; + List *tables = is_skip ? stmt->skiptables : stmt->tables; - /* Check that user is allowed to manipulate the publication tables. */ - if (pubform->pubtype == PUBTYPE_ALLTABLES) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("publication \"%s\" is defined as FOR ALL TABLES", - NameStr(pubform->pubname)), - errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); + if (is_skip) + { + if (pubform->pubtype == PUBTYPE_TABLE) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR SCHEMA", + NameStr(pubform->pubname)), + errdetail("Skip_Tables cannot be added to or dropped from FOR TABLE publications."))); + } + else + { + /* Check that user is allowed to manipulate the publication tables. */ + if (pubform->pubtype == PUBTYPE_ALLTABLES) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL TABLES", + NameStr(pubform->pubname)), + errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); - if (pubform->pubtype == PUBTYPE_SCHEMA) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("publication \"%s\" is defined as FOR SCHEMA", - NameStr(pubform->pubname)), - errdetail("Tables cannot be added to or dropped from FOR SCHEMA publications."))); + if (pubform->pubtype == PUBTYPE_SCHEMA) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR SCHEMA", + NameStr(pubform->pubname)), + errdetail("Tables cannot be added to or dropped from FOR SCHEMA publications."))); + } - Assert(list_length(stmt->tables) > 0); + Assert(list_length(tables) > 0); - rels = OpenTableList(stmt->tables); + rels = OpenTableList(tables); if (stmt->tableAction == DEFELEM_ADD) { - PublicationAddTables(pubid, rels, false, stmt); - if (pubform->pubtype == PUBTYPE_EMPTY) + PublicationAddTables(pubid, rels, false, stmt, is_skip); + if (!is_skip && pubform->pubtype == PUBTYPE_EMPTY) UpdatePublicationTypeTupleValue(rel, tup, Anum_pg_publication_pubtype, PUBTYPE_TABLE); } else if (stmt->tableAction == DEFELEM_DROP) - PublicationDropTables(pubid, rels, false); + PublicationDropTables(pubid, rels, false, is_skip); else /* DEFELEM_SET */ { List *oldrelids = GetPublicationRelations(pubid, - PUBLICATION_PART_ROOT); + PUBLICATION_PART_ROOT, + is_skip); List *delrels = NIL; ListCell *oldlc; @@ -525,13 +553,13 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, } /* And drop them. */ - PublicationDropTables(pubid, delrels, true); + PublicationDropTables(pubid, delrels, true, is_skip); /* * Don't bother calculating the difference for adding, we'll catch and * skip existing ones when doing catalog update. */ - PublicationAddTables(pubid, rels, true, stmt); + PublicationAddTables(pubid, rels, true, stmt, is_skip); CloseTableList(delrels); } @@ -653,8 +681,10 @@ AlterPublication(AlterPublicationStmt *stmt) AlterPublicationOptions(stmt, rel, tup); else if (stmt->schemas) AlterPublicationSchemas(stmt, rel, tup, pubform); + else if (stmt->skiptables) + AlterPublicationTables(stmt, rel, tup, true); else - AlterPublicationTables(stmt, rel, tup); + AlterPublicationTables(stmt, rel, tup, false); /* Cleanup. */ heap_freetuple(tup); @@ -665,24 +695,36 @@ AlterPublication(AlterPublicationStmt *stmt) * Remove relation from publication by mapping OID. */ void -RemovePublicationRelById(Oid proid) +RemovePublicationRelById(Oid proid, bool is_skip) { Relation rel; HeapTuple tup; - Form_pg_publication_rel pubrel; + Oid relid; - rel = table_open(PublicationRelRelationId, RowExclusiveLock); + rel = table_open(GET_PUBLICATION_REL_ID(is_skip), RowExclusiveLock); - tup = SearchSysCache1(PUBLICATIONREL, ObjectIdGetDatum(proid)); + tup = SearchSysCache1(GET_PUBLICATION_SYSCACHE_REL(is_skip), + ObjectIdGetDatum(proid)); if (!HeapTupleIsValid(tup)) elog(ERROR, "cache lookup failed for publication table %u", proid); - pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); + if (is_skip) + { + Form_pg_publication_skip pubrel; + pubrel = (Form_pg_publication_skip) GETSTRUCT(tup); + relid = pubrel->psrelid; + } + else + { + Form_pg_publication_rel pubrel; + pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); + relid = pubrel->prrelid; + } /* Invalidate relcache so that publication info is rebuilt. */ - CacheInvalidateRelcacheByRelid(pubrel->prrelid); + CacheInvalidateRelcacheByRelid(relid); CatalogTupleDelete(rel, &tup->t_self); @@ -821,11 +863,14 @@ CloseTableList(List *rels) */ static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, - AlterPublicationStmt *stmt) + AlterPublicationStmt *stmt, bool is_skip) { ListCell *lc; - Assert(!stmt || !stmt->for_all_tables || !stmt->schemas); + if (is_skip) + Assert(!stmt || !stmt->tables); + else + Assert(!stmt || !stmt->for_all_tables || !stmt->schemas); foreach(lc, rels) { @@ -837,13 +882,13 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, aclcheck_error(ACLCHECK_NOT_OWNER, get_relkind_objtype(rel->rd_rel->relkind), RelationGetRelationName(rel)); - obj = publication_add_relation(pubid, rel, if_not_exists); + obj = publication_add_relation(pubid, rel, if_not_exists, is_skip); if (stmt) { EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, (Node *) stmt); - InvokeObjectPostCreateHook(PublicationRelRelationId, + InvokeObjectPostCreateHook(is_skip ? PublicationSkipRelationId : PublicationRelRelationId, obj.objectId, 0); } } @@ -886,7 +931,7 @@ PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, * Remove listed tables from the publication. */ static void -PublicationDropTables(Oid pubid, List *rels, bool missing_ok) +PublicationDropTables(Oid pubid, List *rels, bool missing_ok, bool is_skip) { ObjectAddress obj; ListCell *lc; @@ -897,7 +942,8 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) Relation rel = (Relation) lfirst(lc); Oid relid = RelationGetRelid(rel); - prid = GetSysCacheOid2(PUBLICATIONRELMAP, Anum_pg_publication_rel_oid, + prid = GetSysCacheOid2(GET_PUBLICATION_MAP(is_skip), + GET_PUBLICATION_COL_OBJECTID(is_skip), ObjectIdGetDatum(relid), ObjectIdGetDatum(pubid)); if (!OidIsValid(prid)) @@ -907,11 +953,12 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("relation \"%s\" is not part of the publication", + errmsg("%s \"%s\" is not part of the publication", + GET_PUBLICATION_REL_STR(is_skip), RelationGetRelationName(rel)))); } - ObjectAddressSet(obj, PublicationRelRelationId, prid); + ObjectAddressSet(obj, GET_PUBLICATION_REL_ID(is_skip), prid); performDeletion(&obj, DROP_CASCADE, 0); } } diff --git a/src/backend/commands/seclabel.c b/src/backend/commands/seclabel.c index b108b641c5..f32b4e52c9 100644 --- a/src/backend/commands/seclabel.c +++ b/src/backend/commands/seclabel.c @@ -81,6 +81,7 @@ SecLabelSupportsObjectType(ObjectType objtype) case OBJECT_POLICY: case OBJECT_PUBLICATION_REL: case OBJECT_PUBLICATION_SCHEMA: + case OBJECT_PUBLICATION_SKIP: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: case OBJECT_TABCONSTRAINT: diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index d57cc0bcba..61d18da4b2 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -12141,6 +12141,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, case OCLASS_PUBLICATION: case OCLASS_PUBLICATION_REL: case OCLASS_PUBLICATION_SCHEMA: + case OCLASS_PUBLICATION_SKIP: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 81e508b7a5..b852de6da8 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -396,6 +396,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type import_qualification %type vacuum_relation %type opt_select_limit select_limit limit_clause +%type opt_skip_table_list %type parse_toplevel stmtmulti routine_body_stmt_list OptTableElementList TableElementList OptInherit definition @@ -707,7 +708,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); SAVEPOINT SCHEMA SCHEMAS SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE SEQUENCES SERIALIZABLE SERVER SESSION SESSION_USER SET SETS SETOF SHARE SHOW - SIMILAR SIMPLE SKIP SMALLINT SNAPSHOT SOME SQL_P STABLE STANDALONE_P + SIMILAR SIMPLE SKIP SKIP_TABLE SMALLINT SNAPSHOT SOME SQL_P STABLE STANDALONE_P START STATEMENT STATISTICS STDIN STDOUT STORAGE STORED STRICT_P STRIP_P SUBSCRIPTION SUBSTRING SUPPORT SYMMETRIC SYSID SYSTEM_P @@ -9587,11 +9588,11 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec * * CREATE PUBLICATION name [WITH options] * - * CREATE PUBLICATION FOR ALL TABLES [WITH options] + * CREATE PUBLICATION FOR ALL TABLES SKIP_TABLE [WITH options] * * CREATE PUBLICATION FOR TABLE [WITH options] * - * CREATE PUBLICATION FOR SCHEMA [WITH options] + * CREATE PUBLICATION FOR SCHEMA SKIP_TABLE [WITH options] *****************************************************************************/ CreatePublicationStmt: @@ -9602,11 +9603,12 @@ CreatePublicationStmt: n->options = $4; $$ = (Node *)n; } - | CREATE PUBLICATION name FOR ALL TABLES opt_definition + | CREATE PUBLICATION name FOR ALL TABLES opt_skip_table_list opt_definition { CreatePublicationStmt *n = makeNode(CreatePublicationStmt); n->pubname = $3; - n->options = $7; + n->options = $8; + n->skiptables = (List *)$7; n->for_all_tables = true; $$ = (Node *)n; } @@ -9618,12 +9620,13 @@ CreatePublicationStmt: n->tables = (List *)$6; $$ = (Node *)n; } - | CREATE PUBLICATION name FOR SCHEMA schema_list opt_definition + | CREATE PUBLICATION name FOR SCHEMA schema_list opt_skip_table_list opt_definition { CreatePublicationStmt *n = makeNode(CreatePublicationStmt); n->pubname = $3; - n->options = $7; + n->options = $8; n->schemas = (List *)$6; + n->skiptables = (List *)$7; $$ = (Node *)n; } ; @@ -9648,6 +9651,10 @@ schema_list: SchemaSpec { $$ = lappend($1, $3); } ; +opt_skip_table_list: SKIP_TABLE relation_expr_list { $$ = $2; } + | /*EMPTY*/ { $$ = NIL; } + ; + /***************************************************************************** * * ALTER PUBLICATION name SET ( options ) @@ -9658,6 +9665,12 @@ schema_list: SchemaSpec * * ALTER PUBLICATION name SET TABLE table [, table2] * + * ALTER PUBLICATION name ADD SKIP_TABLE table [, table2] + * + * ALTER PUBLICATION name DROP SKIP_TABLE table [, table2] + * + * ALTER PUBLICATION name SET SKIP_TABLE table [, table2] + * * ALTER PUBLICATION name ADD SCHEMA schema [, schema2] * * ALTER PUBLICATION name DROP SCHEMA schema [, schema2] @@ -9697,6 +9710,30 @@ AlterPublicationStmt: n->tableAction = DEFELEM_DROP; $$ = (Node *)n; } + | ALTER PUBLICATION name ADD_P SKIP_TABLE relation_expr_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->skiptables = $6; + n->tableAction = DEFELEM_ADD; + $$ = (Node *)n; + } + | ALTER PUBLICATION name SET SKIP_TABLE relation_expr_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->skiptables = $6; + n->tableAction = DEFELEM_SET; + $$ = (Node *)n; + } + | ALTER PUBLICATION name DROP SKIP_TABLE relation_expr_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->skiptables = $6; + n->tableAction = DEFELEM_DROP; + $$ = (Node *)n; + } | ALTER PUBLICATION name ADD_P SCHEMA schema_list { AlterPublicationStmt *n = makeNode(AlterPublicationStmt); @@ -15767,6 +15804,7 @@ unreserved_keyword: | SHOW | SIMPLE | SKIP + | SKIP_TABLE | SNAPSHOT | SQL_P | STABLE @@ -16355,6 +16393,7 @@ bare_label_keyword: | SIMILAR | SIMPLE | SKIP + | SKIP_TABLE | SMALLINT | SNAPSHOT | SOME diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 1057acd62e..6eb84a00d2 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1064,23 +1064,30 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) Publication *pub = lfirst(lc); bool publish = false; - if (pub->pubtype == PUBTYPE_ALLTABLES) + if (pub->pubtype == PUBTYPE_ALLTABLES || + pub->pubtype == PUBTYPE_SCHEMA) { - publish = true; - if (pub->pubviaroot && am_partition) - publish_as_relid = llast_oid(get_partition_ancestors(relid)); - } - - if (pub->pubtype == PUBTYPE_SCHEMA) - { - Oid schemaId = get_rel_namespace(relid); - List *pubschemas = GetPublicationSchemas(pub->oid); - - if (list_member_oid(pubschemas, schemaId)) + List *skipRels = GetPublicationRelations(pub->oid, + pub->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF, + true); + + /* Check if publishing this relation should be skipped */ + if (!list_member_oid(skipRels, relid)) { - publish = true; - if (pub->pubviaroot && am_partition) - publish_as_relid = llast_oid(get_partition_ancestors(relid)); + Oid schemaId = (pub->pubtype == PUBTYPE_SCHEMA) ? + get_rel_namespace(relid) : InvalidOid; + List *pubschemas = (pub->pubtype == PUBTYPE_SCHEMA) ? + GetPublicationSchemas(pub->oid) : NIL; + + if (pub->pubtype == PUBTYPE_ALLTABLES || + list_member_oid(pubschemas, schemaId)) + { + publish = true; + if (pub->pubviaroot && am_partition) + publish_as_relid = llast_oid(get_partition_ancestors(relid)); + } } } diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index b2f8b8add8..10e6b1d783 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -52,6 +52,7 @@ #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_publication_schema.h" +#include "catalog/pg_publication_skiprel.h" #include "catalog/pg_range.h" #include "catalog/pg_replication_origin.h" #include "catalog/pg_rewrite.h" @@ -673,6 +674,28 @@ static const struct cachedesc cacheinfo[] = { }, 64 }, + {PublicationSkipRelationId, /* PUBLICATIONSKIP */ + PublicationSkipObjectIndexId, + 1, + { + Anum_pg_publication_skiprel_oid, + 0, + 0, + 0 + }, + 64 + }, + {PublicationSkipRelationId, /* PUBLICATIONSKIPMAP */ + PublicationSkipPsrelidPspubidIndexId, + 2, + { + Anum_pg_publication_skiprel_psrelid, + Anum_pg_publication_skiprel_pspubid, + 0, + 0 + }, + 64 + }, {RangeRelationId, /* RANGEMULTIRANGE */ RangeMultirangeTypidIndexId, 1, diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c index 773f038b24..ef70b86f0b 100644 --- a/src/bin/pg_dump/common.c +++ b/src/bin/pg_dump/common.c @@ -255,7 +255,10 @@ getSchemaData(Archive *fout, int *numTablesPtr) sizeof(PublicationInfo)); pg_log_info("reading publication membership"); - getPublicationTables(fout, tblinfo, numTables); + getPublicationTables(fout, tblinfo, numTables, false); + + pg_log_info("reading publication skiptables"); + getPublicationTables(fout, tblinfo, numTables, true); pg_log_info("reading publciation schemas"); getPublicationSchemas(fout, nspinfo, numNamespaces); diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index dd049da209..110f893283 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -269,7 +269,9 @@ static void dumpBlob(Archive *fout, const BlobInfo *binfo); static int dumpBlobs(Archive *fout, const void *arg); static void dumpPolicy(Archive *fout, const PolicyInfo *polinfo); static void dumpPublication(Archive *fout, const PublicationInfo *pubinfo); -static void dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo); +static void dumpPublicationTable(Archive *fout, + const PublicationRelInfo *pubrinfo, + bool isskip); static void dumpSubscription(Archive *fout, const SubscriptionInfo *subinfo); static void dumpDatabase(Archive *AH); static void dumpDatabaseConfig(Archive *AH, PQExpBuffer outbuf, @@ -4257,7 +4259,8 @@ getPublicationSchemas(Archive *fout, NamespaceInfo nspinfo[], int numSchemas) * get information about publication membership for dumpable tables. */ void -getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) +getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables, + bool isSkip) { PQExpBuffer query; PGresult *res; @@ -4274,20 +4277,26 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) if (dopt->no_publications || fout->remoteVersion < 100000) return; + if (isSkip && fout->remoteVersion < 140000) + return; + query = createPQExpBuffer(); /* Collect all publication membership info. */ - appendPQExpBufferStr(query, - "SELECT tableoid, oid, prpubid, prrelid " - "FROM pg_catalog.pg_publication_rel"); + if (isSkip) + appendPQExpBufferStr(query, "SELECT tableoid, oid, pspubid, psrelid " + "FROM pg_catalog.pg_publication_skiprel"); + else + appendPQExpBufferStr(query, "SELECT tableoid, oid, prpubid, prrelid " + "FROM pg_catalog.pg_publication_rel"); res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); ntups = PQntuples(res); i_tableoid = PQfnumber(res, "tableoid"); i_oid = PQfnumber(res, "oid"); - i_prpubid = PQfnumber(res, "prpubid"); - i_prrelid = PQfnumber(res, "prrelid"); + i_prpubid = PQfnumber(res, isSkip ? "pspubid" : "prpubid"); + i_prrelid = PQfnumber(res, isSkip ? "psrelid" : "prrelid"); /* this allocation may be more than we need */ pubrinfo = pg_malloc(ntups * sizeof(PublicationRelInfo)); @@ -4319,7 +4328,7 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) continue; /* OK, make a DumpableObject for this relationship */ - pubrinfo[j].dobj.objType = DO_PUBLICATION_REL; + pubrinfo[j].dobj.objType = (isSkip) ? DO_PUBLICATION_SKIPREL : DO_PUBLICATION_REL; pubrinfo[j].dobj.catId.tableoid = atooid(PQgetvalue(res, i, i_tableoid)); pubrinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid)); @@ -4382,12 +4391,14 @@ dumpPublicationSchema(Archive *fout, PublicationSchemaInfo *pubrinfo) * dump the definition of the given publication table mapping */ static void -dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo) +dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo, + bool isskip) { PublicationInfo *pubinfo = pubrinfo->publication; TableInfo *tbinfo = pubrinfo->pubtable; PQExpBuffer query; char *tag; + char *addtabopt = (isskip) ? "SKIP_TABLE" : "TABLE ONLY"; if (!(pubrinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)) return; @@ -4396,8 +4407,8 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo) query = createPQExpBuffer(); - appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD TABLE ONLY", - fmtId(pubinfo->dobj.name)); + appendPQExpBuffer(query, "ALTER PUBLICATION %s ADD %s", + fmtId(pubinfo->dobj.name), addtabopt); appendPQExpBuffer(query, " %s;\n", fmtQualifiedDumpable(tbinfo)); @@ -4412,7 +4423,7 @@ dumpPublicationTable(Archive *fout, const PublicationRelInfo *pubrinfo) ARCHIVE_OPTS(.tag = tag, .namespace = tbinfo->dobj.namespace->dobj.name, .owner = pubinfo->rolname, - .description = "PUBLICATION TABLE", + .description = (isskip) ? "PUBLICATION SKIP TABLE" : "PUBLICATION TABLE", .section = SECTION_POST_DATA, .createStmt = query->data)); @@ -10479,11 +10490,14 @@ dumpDumpableObject(Archive *fout, const DumpableObject *dobj) dumpPublication(fout, (const PublicationInfo *) dobj); break; case DO_PUBLICATION_REL: - dumpPublicationTable(fout, (const PublicationRelInfo *) dobj); + dumpPublicationTable(fout, (const PublicationRelInfo *) dobj, false); break; case DO_PUBLICATION_SCHEMA: dumpPublicationSchema(fout, (PublicationSchemaInfo *) dobj); break; + case DO_PUBLICATION_SKIPREL: + dumpPublicationTable(fout, (const PublicationRelInfo *) dobj, true); + break; case DO_SUBSCRIPTION: dumpSubscription(fout, (const SubscriptionInfo *) dobj); break; @@ -18681,6 +18695,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs, case DO_PUBLICATION: case DO_PUBLICATION_REL: case DO_PUBLICATION_SCHEMA: + case DO_PUBLICATION_SKIPREL: case DO_SUBSCRIPTION: /* Post-data objects: must come after the post-data boundary */ addObjectDependency(dobj, postDataBound->dumpId); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index a9db477f25..dd0873b93b 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -82,6 +82,7 @@ typedef enum DO_PUBLICATION, DO_PUBLICATION_REL, DO_PUBLICATION_SCHEMA, + DO_PUBLICATION_SKIPREL, DO_SUBSCRIPTION } DumpableObjectType; @@ -744,7 +745,7 @@ extern void getPolicies(Archive *fout, TableInfo tblinfo[], int numTables); extern PublicationInfo *getPublications(Archive *fout, int *numPublications); extern void getPublicationTables(Archive *fout, TableInfo tblinfo[], - int numTables); + int numTables, bool isSkip); extern void getPublicationSchemas(Archive *fout, NamespaceInfo nspinfo[], int numSchemas); extern void getSubscriptions(Archive *fout); diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c index 13a6fcd660..298d3dcbc2 100644 --- a/src/bin/pg_dump/pg_dump_sort.c +++ b/src/bin/pg_dump/pg_dump_sort.c @@ -83,6 +83,7 @@ enum dbObjectTypePriorities PRIO_PUBLICATION, PRIO_PUBLICATION_REL, PRIO_PUBLICATION_SCHEMA, + PRIO_PUBLICATION_SKIPREL, PRIO_SUBSCRIPTION, PRIO_DEFAULT_ACL, /* done in ACL pass */ PRIO_EVENT_TRIGGER, /* must be next to last! */ @@ -137,6 +138,7 @@ static const int dbObjectTypePriority[] = PRIO_PUBLICATION, /* DO_PUBLICATION */ PRIO_PUBLICATION_REL, /* DO_PUBLICATION_REL */ PRIO_PUBLICATION_SCHEMA, /* DO_PUBLICATION_SCHEMA */ + PRIO_PUBLICATION_SKIPREL, /* DO_PUBLICATION_SKIPREL */ PRIO_SUBSCRIPTION /* DO_SUBSCRIPTION */ }; @@ -1484,6 +1486,11 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize) "PUBLICATION SCHEMA (ID %d OID %u)", obj->dumpId, obj->catId.oid); return; + case DO_PUBLICATION_SKIPREL: + snprintf(buf, bufsize, + "PUBLICATION SKPREL (ID %d OID %u)", + obj->dumpId, obj->catId.oid); + return; case DO_SUBSCRIPTION: snprintf(buf, bufsize, "SUBSCRIPTION (ID %d OID %u)", diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 73417cbfc4..2184d2e27f 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -3154,7 +3154,8 @@ describeOneTableDetails(const char *schemaname, "SELECT p.pubname\n" "FROM pg_catalog.pg_publication p\n" " JOIN pg_catalog.pg_publication_schema ps ON p.oid = ps.pspubid AND p.pubtype = 's'\n" - " JOIN pg_catalog.pg_class pc ON pc.relnamespace = ps.psnspcid AND pc.oid = '%s'\n" + " JOIN pg_catalog.pg_class pc ON pc.relnamespace = ps.psnspcid AND pc.oid = '%s' AND\n" + " '%s' NOT IN (SELECT psrelid FROM pg_publication_skiprel WHERE pspubid = ps.pspubid)\n" "UNION ALL\n" "SELECT pubname\n" "FROM pg_catalog.pg_publication p\n" @@ -3163,10 +3164,11 @@ describeOneTableDetails(const char *schemaname, "UNION ALL\n" "SELECT pubname\n" "FROM pg_catalog.pg_publication p\n" - "WHERE p.pubtype = 'a' \n" + "WHERE p.pubtype = 'a' AND\n" + " '%s' NOT IN (SELECT psrelid FROM pg_publication_skiprel WHERE pspubid = p.oid)\n" " AND pg_catalog.pg_relation_is_publishable('%s')\n" "ORDER BY 1;", - oid, oid, oid); + oid, oid, oid, oid, oid); } else { @@ -6345,6 +6347,7 @@ describePublications(const char *pattern) bool has_pubtruncate; bool has_pubviaroot; bool has_pubtype; + bool has_pubskiptable; PQExpBufferData title; printTableContent cont; @@ -6361,6 +6364,7 @@ describePublications(const char *pattern) has_pubtruncate = (pset.sversion >= 110000); has_pubviaroot = (pset.sversion >= 130000); has_pubtype = (pset.sversion >= 140000); + has_pubskiptable = (pset.sversion >= 140000); initPQExpBuffer(&buf); @@ -6489,6 +6493,21 @@ describePublications(const char *pattern) goto error_return; } + if (has_pubskiptable) + { + printfPQExpBuffer(&buf, + "SELECT n.nspname, c.relname\n" + "FROM pg_catalog.pg_class c,\n" + " pg_catalog.pg_namespace n,\n" + " pg_catalog.pg_publication_skiprel ps\n" + "WHERE c.relnamespace = n.oid\n" + " AND c.oid = ps.psrelid\n" + " AND ps.pspubid = '%s'\n" + "ORDER BY 1,2", pubid); + if (!addFooterToPublicationDesc(&buf, "Skip Tables:", false, &cont)) + goto error_return; + } + printTable(&cont, pset.queryFout, false, pset.logfile); printTableCleanup(&cont); diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index ba549e106e..63a672dd1f 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1643,13 +1643,13 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("ADD", "DROP", "OWNER TO", "RENAME TO", "SET"); /* ALTER PUBLICATION ADD */ else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD")) - COMPLETE_WITH("SCHEMA", "TABLE"); + COMPLETE_WITH("SCHEMA", "SKIP_TABLE", "TABLE"); /* ALTER PUBLICATION DROP */ else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP")) - COMPLETE_WITH("SCHEMA", "TABLE"); + COMPLETE_WITH("SCHEMA", "SKIP_TABLE", "TABLE"); /* ALTER PUBLICATION SET */ else if (Matches("ALTER", "PUBLICATION", MatchAny, "SET")) - COMPLETE_WITH("(", "SCHEMA", "TABLE"); + COMPLETE_WITH("(", "SCHEMA", "SKIP_TABLE", "TABLE"); else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|DROP|SET", "SCHEMA")) COMPLETE_WITH_QUERY(Query_for_list_of_schemas " UNION SELECT 'CURRENT_SCHEMA'"); @@ -2642,6 +2642,12 @@ psql_completion(const char *text, int start, int end) COMPLETE_WITH("FOR TABLE", "FOR ALL TABLES", "FOR SCHEMA", "WITH ("); else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR")) COMPLETE_WITH("TABLE", "ALL TABLES", "SCHEMA"); + else if (HeadMatches("CREATE", "PUBLICATION") && TailMatches("FOR", "ALL", "TABLES")) + COMPLETE_WITH("SKIP_TABLE"); + else if (HeadMatches("CREATE", "PUBLICATION") && TailMatches("FOR", "SCHEMA")) + COMPLETE_WITH("SKIP_TABLE"); + else if (HeadMatches("CREATE", "PUBLICATION") && TailMatches("SKIP_TABLE")) + COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL); /* Complete "CREATE PUBLICATION FOR TABLE , ..." */ else if (HeadMatches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLE")) COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL); diff --git a/src/include/catalog/dependency.h b/src/include/catalog/dependency.h index 08ec4c79f1..c82fa2765d 100644 --- a/src/include/catalog/dependency.h +++ b/src/include/catalog/dependency.h @@ -132,6 +132,7 @@ typedef enum ObjectClass OCLASS_PUBLICATION, /* pg_publication */ OCLASS_PUBLICATION_REL, /* pg_publication_rel */ OCLASS_PUBLICATION_SCHEMA, /* pg_publication_schema */ + OCLASS_PUBLICATION_SKIP, /* pg_publication_skip */ OCLASS_SUBSCRIPTION, /* pg_subscription */ OCLASS_TRANSFORM /* pg_transform */ } ObjectClass; diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index f67f92f918..fff16bf76f 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -110,6 +110,33 @@ typedef enum PublicationPartOpt #define PUBTYPE_SCHEMA 's' /* schema publication */ #define PUBTYPE_EMPTY 'e' /* empty publication */ +#define GET_PUBLICATION_REL_COL_COUNT(is_skip) \ + (is_skip) ? Natts_pg_publication_skiprel : Natts_pg_publication_rel + +#define GET_PUBLICATION_REL_ID(is_skip) \ + (is_skip) ? PublicationSkipRelationId : PublicationRelRelationId + +#define GET_PUBLICATION_MAP(is_skip) \ + (is_skip) ? PUBLICATIONSKIPMAP : PUBLICATIONRELMAP + +#define GET_PUBLICATION_SYSCACHE_REL(is_skip) \ + (is_skip) ? PUBLICATIONSKIP : PUBLICATIONREL + +#define GET_PUBLICATION_COL_OBJECTID(is_skip) \ + (is_skip) ? Anum_pg_publication_skiprel_oid : Anum_pg_publication_rel_oid + +#define GET_PUBLICATION_COL_PUBID(is_skip) \ + (is_skip) ? Anum_pg_publication_skiprel_pspubid : Anum_pg_publication_rel_prpubid + +#define GET_PUBLICATION_COL_RELID(is_skip) \ + (is_skip) ? Anum_pg_publication_skiprel_psrelid : Anum_pg_publication_rel_prrelid + +#define GET_PUBLICATION_MULCOL_INDEXID(is_skip) \ + (is_skip) ? PublicationSkipPsrelidPspubidIndexId : PublicationRelPrrelidPrpubidIndexId + +#define GET_PUBLICATION_REL_STR(is_skip) \ + (is_skip) ? "skip relation": "relation" + /* * Return the publication type. */ diff --git a/src/include/catalog/pg_publication_skiprel.h b/src/include/catalog/pg_publication_skiprel.h new file mode 100644 index 0000000000..6bfc226c35 --- /dev/null +++ b/src/include/catalog/pg_publication_skiprel.h @@ -0,0 +1,48 @@ +/*------------------------------------------------------------------------- + * + * pg_publication_skiprel.h + * definition of the system catalog for mappings between relations and + * publications (pg_publication_skiprel) + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/catalog/pg_publication_skiprel.h + * + * NOTES + * The Catalog.pm module reads this file and derives schema + * information. + * + *------------------------------------------------------------------------- + */ +#ifndef PG_PUBLICATION_SKIPREL_H +#define PG_PUBLICATION_SKIPREL_H + +#include "catalog/genbki.h" +#include "catalog/pg_publication_skiprel_d.h" + +/* ---------------- + * pg_publication_skiprel definition. cpp turns this into + * typedef struct FormData_pg_publication_skiprel + * ---------------- + */ +CATALOG(pg_publication_skiprel,6122,PublicationSkipRelationId) +{ + Oid oid; /* oid */ + Oid pspubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ + Oid psrelid BKI_LOOKUP(pg_class); /* Oid of the relation */ +} FormData_pg_publication_skip; + +/* ---------------- + * Form_pg_publication_skip corresponds to a pointer to a tuple with + * the format of pg_publication_skip relation. + * ---------------- + */ +typedef FormData_pg_publication_skip *Form_pg_publication_skip; + +DECLARE_UNIQUE_INDEX_PKEY(pg_publication_skip_oid_index, 6123, on pg_publication_skiprel using btree(oid oid_ops)); +#define PublicationSkipObjectIndexId 6123 +DECLARE_UNIQUE_INDEX(pg_publication_skip_psrelid_prpubid_index, 6124, on pg_publication_skiprel using btree(psrelid oid_ops, pspubid oid_ops)); +#define PublicationSkipPsrelidPspubidIndexId 6124 + +#endif /* PG_PUBLICATION_SKIP_H */ diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index 3c2a77d0b0..f3b803cf81 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -21,7 +21,7 @@ extern ObjectAddress CreatePublication(CreatePublicationStmt *stmt); extern void AlterPublication(AlterPublicationStmt *stmt); -extern void RemovePublicationRelById(Oid proid); +extern void RemovePublicationRelById(Oid proid, bool is_skip); extern void RemovePublicationSchemaById(Oid psoid); extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId); @@ -31,15 +31,17 @@ extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); extern List *GetRelationPublications(Oid relid); -extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); +extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt, + bool is_skip); extern List *GetPublicationSchemas(Oid pubid); extern List *GetAllTablesPublications(void); -extern List *GetAllTablesPublicationRelations(bool pubviaroot, Oid schemaOid); +extern List *GetAllTablesPublicationRelations(bool pubviaroot, Oid schemaOid, + List *pubskiptablelist); extern List *GetAllSchemasPublicationRelations(Publication *publication); extern bool is_publishable_relation(Relation rel); extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, - bool if_not_exists); + bool if_not_exists, bool is_skip); extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaoid, bool if_not_exists); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 169bdce07c..d1a89bc807 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -1823,6 +1823,7 @@ typedef enum ObjectType OBJECT_PUBLICATION, OBJECT_PUBLICATION_REL, OBJECT_PUBLICATION_SCHEMA, + OBJECT_PUBLICATION_SKIP, OBJECT_ROLE, OBJECT_ROUTINE, OBJECT_RULE, @@ -3648,6 +3649,7 @@ typedef struct CreatePublicationStmt char *pubname; /* Name of the publication */ List *options; /* List of DefElem nodes */ List *tables; /* Optional list of tables to add */ + List *skiptables; /* Optional list of skip tables to add */ bool for_all_tables; /* Special publication for all tables in db */ List *schemas; /* Optional list of schemas */ } CreatePublicationStmt; @@ -3662,6 +3664,7 @@ typedef struct AlterPublicationStmt /* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */ List *tables; /* List of tables to add/drop */ + List *skiptables; /* List of skip tables to add/drop */ bool for_all_tables; /* Special publication for all tables in db */ DefElemAction tableAction; /* What action to perform with the tables */ List *schemas; /* Optional list of schemas */ diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index f836acf876..95b43396ea 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -385,6 +385,7 @@ PG_KEYWORD("show", SHOW, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("similar", SIMILAR, TYPE_FUNC_NAME_KEYWORD, BARE_LABEL) PG_KEYWORD("simple", SIMPLE, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("skip", SKIP, UNRESERVED_KEYWORD, BARE_LABEL) +PG_KEYWORD("skip_table", SKIP_TABLE, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("smallint", SMALLINT, COL_NAME_KEYWORD, BARE_LABEL) PG_KEYWORD("snapshot", SNAPSHOT, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("some", SOME, RESERVED_KEYWORD, BARE_LABEL) diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h index 1ba295206a..48a284edca 100644 --- a/src/include/utils/syscache.h +++ b/src/include/utils/syscache.h @@ -81,6 +81,8 @@ enum SysCacheIdentifier PUBLICATIONRELMAP, PUBLICATIONSCHEMA, PUBLICATIONSCHEMAMAP, + PUBLICATIONSKIP, + PUBLICATIONSKIPMAP, RANGEMULTIRANGE, RANGETYPE, RELNAMENSP, diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out index ddb421c394..d34b342248 100644 --- a/src/test/regress/expected/oidjoins.out +++ b/src/test/regress/expected/oidjoins.out @@ -262,6 +262,8 @@ NOTICE: checking pg_publication_rel {prpubid} => pg_publication {oid} NOTICE: checking pg_publication_rel {prrelid} => pg_class {oid} NOTICE: checking pg_publication_schema {pspubid} => pg_publication {oid} NOTICE: checking pg_publication_schema {psnspcid} => pg_class {oid} +NOTICE: checking pg_publication_skiprel {pspubid} => pg_publication {oid} +NOTICE: checking pg_publication_skiprel {psrelid} => pg_class {oid} NOTICE: checking pg_subscription {subdbid} => pg_database {oid} NOTICE: checking pg_subscription {subowner} => pg_authid {oid} NOTICE: checking pg_subscription_rel {srsubid} => pg_subscription {oid} diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out index fe5a038824..675d6503ab 100644 --- a/src/test/regress/expected/sanity_check.out +++ b/src/test/regress/expected/sanity_check.out @@ -141,6 +141,7 @@ pg_proc|t pg_publication|t pg_publication_rel|t pg_publication_schema|t +pg_publication_skiprel|t pg_range|t pg_replication_origin|t pg_rewrite|t -- 2.25.1