From 1ce21afbab9a0f74ecd670c34b3d3cfbc21fc593 Mon Sep 17 00:00:00 2001 From: Vigneshwaran c Date: Mon, 26 Jul 2021 09:25:22 +0530 Subject: [PATCH v16 1/2] Added schema level support for publication. This patch adds schema-level support for publication. A new schema option allows one or more schemas to be specified, whose tables are selected by the publisher for sending the data to the subscriber. pg_publication maintains information about the publication. Previously, the "puballtables" bool column was used to indicate if the publication was the "FOR ALL TABLES" type (if true) or the "FOR TABLE" type (if false). With the introduction of the "FOR SCHEMA" publication type, it is not easy to determine the publication type. Therefore, a new column "pubtype" has been added to the pg_publication relation to indicate the publication type. There was the possibility of avoiding addition of this new column, but that would require checking puballtables of pg_publication and checking pg_publication_rel for table type publication and then checking pg_publication_schema for schema type publication. Instead, I preferred to add the "pubtype" column, which makes things easier, and also will help support new options in the future. A new system table "pg_publication_schema" has been added, to maintain the schemas that the user wants to publish through the publication. The schema/publication/publication_schema dependency was created to handle the corresponding renaming/removal of schemas to the publication/publication_schema when the schema is renamed/dropped. The Decoder identifies if the relation is part of the publication and replicates it to the subscriber. Changes were made to pg_dump to handle pubtype updation in the pg_publication table when the database is upgraded. Prototypes present in pg_publication.h have been moved to publicationcmds.h so that minimal data structures are exported to pg_dump and psql clients, as the rest of the information need not be exported. CATALOG_VERSION_NO needs to be updated while committing, as this feature involves a catalog change. --- src/backend/catalog/Makefile | 4 +- src/backend/catalog/aclchk.c | 2 + src/backend/catalog/dependency.c | 9 + src/backend/catalog/objectaddress.c | 148 ++++++++++ src/backend/catalog/pg_publication.c | 228 ++++++++++++++- src/backend/commands/alter.c | 1 + src/backend/commands/event_trigger.c | 4 + src/backend/commands/publicationcmds.c | 303 +++++++++++++++++++- src/backend/commands/seclabel.c | 1 + src/backend/commands/tablecmds.c | 2 + src/backend/parser/gram.y | 120 ++++++-- src/backend/replication/pgoutput/pgoutput.c | 22 +- src/backend/utils/cache/relcache.c | 5 + src/backend/utils/cache/syscache.c | 23 ++ src/bin/pg_dump/common.c | 3 + src/bin/pg_dump/pg_backup_archiver.c | 3 +- src/bin/pg_dump/pg_dump.c | 170 ++++++++++- src/bin/pg_dump/pg_dump.h | 17 ++ src/bin/pg_dump/pg_dump_sort.c | 7 + src/bin/psql/describe.c | 222 +++++++++++--- src/bin/psql/tab-complete.c | 22 +- src/include/catalog/dependency.h | 1 + src/include/catalog/pg_publication.h | 41 +-- src/include/catalog/pg_publication_schema.h | 47 +++ src/include/commands/publicationcmds.h | 22 ++ src/include/nodes/nodes.h | 1 + src/include/nodes/parsenodes.h | 20 ++ src/include/utils/rel.h | 1 + src/include/utils/syscache.h | 2 + src/test/regress/expected/oidjoins.out | 2 + src/test/regress/expected/publication.out | 100 +++---- src/test/regress/expected/sanity_check.out | 1 + src/tools/pgindent/typedefs.list | 5 + 33 files changed, 1391 insertions(+), 168 deletions(-) create mode 100644 src/include/catalog/pg_publication_schema.h diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index d297e77361..b2ee87b105 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -68,8 +68,8 @@ CATALOG_HEADERS := \ pg_foreign_table.h pg_policy.h pg_replication_origin.h \ pg_default_acl.h pg_init_privs.h pg_seclabel.h pg_shseclabel.h \ pg_collation.h pg_partitioned_table.h pg_range.h pg_transform.h \ - pg_sequence.h pg_publication.h pg_publication_rel.h pg_subscription.h \ - pg_subscription_rel.h + pg_sequence.h pg_publication.h pg_publication_rel.h pg_publication_schema.h \ + pg_subscription.h pg_subscription_rel.h GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h system_fk_info.h diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index 89792b154e..09d7f1a5ea 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -3428,6 +3428,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_DEFACL: case OBJECT_DOMCONSTRAINT: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROLE: case OBJECT_RULE: case OBJECT_TABCONSTRAINT: @@ -3567,6 +3568,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_DEFACL: case OBJECT_DOMCONSTRAINT: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROLE: case OBJECT_TRANSFORM: case OBJECT_TSPARSER: diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 76b65e39c4..d974750473 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -50,6 +50,7 @@ #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_statistic_ext.h" #include "catalog/pg_subscription.h" @@ -180,6 +181,7 @@ static const Oid object_classes[] = { PolicyRelationId, /* OCLASS_POLICY */ PublicationRelationId, /* OCLASS_PUBLICATION */ PublicationRelRelationId, /* OCLASS_PUBLICATION_REL */ + PublicationSchemaRelationId, /* OCLASS_PUBLICATION_SCHEMA */ SubscriptionRelationId, /* OCLASS_SUBSCRIPTION */ TransformRelationId /* OCLASS_TRANSFORM */ }; @@ -1460,6 +1462,10 @@ doDeletion(const ObjectAddress *object, int flags) RemovePublicationRelById(object->objectId); break; + case OCLASS_PUBLICATION_SCHEMA: + RemovePublicationSchemaById(object->objectId); + break; + case OCLASS_CAST: case OCLASS_COLLATION: case OCLASS_CONVERSION: @@ -2853,6 +2859,9 @@ getObjectClass(const ObjectAddress *object) case PublicationRelRelationId: return OCLASS_PUBLICATION_REL; + case PublicationSchemaRelationId: + return OCLASS_PUBLICATION_SCHEMA; + case SubscriptionRelationId: return OCLASS_SUBSCRIPTION; diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c index 9882e549c4..35d7d4fcff 100644 --- a/src/backend/catalog/objectaddress.c +++ b/src/backend/catalog/objectaddress.c @@ -49,6 +49,7 @@ #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_statistic_ext.h" #include "catalog/pg_subscription.h" @@ -67,6 +68,7 @@ #include "commands/extension.h" #include "commands/policy.h" #include "commands/proclang.h" +#include "commands/publicationcmds.h" #include "commands/tablespace.h" #include "commands/trigger.h" #include "foreign/foreign.h" @@ -829,6 +831,10 @@ static const struct object_type_map { "publication relation", OBJECT_PUBLICATION_REL }, + /* OCLASS_PUBLICATION_SCHEMA */ + { + "publication schema", OBJECT_PUBLICATION_SCHEMA + }, /* OCLASS_SUBSCRIPTION */ { "subscription", OBJECT_SUBSCRIPTION @@ -875,6 +881,9 @@ static ObjectAddress get_object_address_usermapping(List *object, static ObjectAddress get_object_address_publication_rel(List *object, Relation *relp, bool missing_ok); +static ObjectAddress get_object_address_publication_schema(List *object, + bool missing_ok); + static ObjectAddress get_object_address_defacl(List *object, bool missing_ok); static const ObjectPropertyType *get_object_property_data(Oid class_id); @@ -1118,6 +1127,10 @@ get_object_address(ObjectType objtype, Node *object, &relation, missing_ok); break; + case OBJECT_PUBLICATION_SCHEMA: + address = get_object_address_publication_schema(castNode(List, object), + missing_ok); + break; case OBJECT_DEFACL: address = get_object_address_defacl(castNode(List, object), missing_ok); @@ -1935,6 +1948,47 @@ get_object_address_publication_rel(List *object, return address; } +/* + * Find the ObjectAddress for a publication schema. The first element of + * the object parameter is the schema name, the second is the + * publication name. + */ +static ObjectAddress +get_object_address_publication_schema(List *object, bool missing_ok) +{ + ObjectAddress address; + char *pubname; + Publication *pub; + char *schemaname; + Oid schemaoid; + + ObjectAddressSet(address, PublicationSchemaRelationId, InvalidOid); + + /* Fetch publication name and schema oid from input list */ + schemaname = strVal(linitial(object)); + pubname = strVal(lsecond(object)); + + schemaoid = get_namespace_oid(schemaname, false); + + /* Now look up the pg_publication tuple */ + pub = GetPublicationByName(pubname, missing_ok); + if (!pub) + return address; + + /* Find the publication schema mapping in syscache */ + address.objectId = + GetSysCacheOid2(PUBLICATIONSCHEMAMAP, Anum_pg_publication_schema_oid, + ObjectIdGetDatum(schemaoid), + ObjectIdGetDatum(pub->oid)); + if (!OidIsValid(address.objectId) && !missing_ok) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("publication schema \"%s\" in publication \"%s\" does not exist", + schemaname, pubname))); + + return address; +} + /* * Find the ObjectAddress for a default ACL. */ @@ -2207,6 +2261,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_CAST: case OBJECT_USER_MAPPING: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_DEFACL: case OBJECT_TRANSFORM: if (list_length(args) != 1) @@ -2299,6 +2354,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_PUBLICATION_REL: objnode = (Node *) list_make2(name, linitial(args)); break; + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_USER_MAPPING: objnode = (Node *) list_make2(linitial(name), linitial(args)); break; @@ -3902,6 +3958,46 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok) break; } + case OCLASS_PUBLICATION_SCHEMA: + { + HeapTuple tup; + char *pubname; + Form_pg_publication_schema psform; + char *nspname; + + tup = SearchSysCache1(PUBLICATIONSCHEMA, + ObjectIdGetDatum(object->objectId)); + if (!HeapTupleIsValid(tup)) + { + if (!missing_ok) + elog(ERROR, "cache lookup failed for publication schema %u", + object->objectId); + break; + } + + psform = (Form_pg_publication_schema) GETSTRUCT(tup); + pubname = get_publication_name(psform->pspubid, false); + nspname = get_namespace_name(psform->psnspcid); + if (!nspname) + { + Oid psnspcid = psform->psnspcid; + + pfree(pubname); + ReleaseSysCache(tup); + if (!missing_ok) + elog(ERROR, "cache lookup failed for schema %u", + psnspcid); + break; + } + + appendStringInfo(&buffer, _("publication of schema %s in publication %s"), + nspname, pubname); + pfree(pubname); + pfree(nspname); + ReleaseSysCache(tup); + break; + } + case OCLASS_SUBSCRIPTION: { char *subname = get_subscription_name(object->objectId, @@ -4476,6 +4572,10 @@ getObjectTypeDescription(const ObjectAddress *object, bool missing_ok) appendStringInfoString(&buffer, "publication relation"); break; + case OCLASS_PUBLICATION_SCHEMA: + appendStringInfoString(&buffer, "publication schema"); + break; + case OCLASS_SUBSCRIPTION: appendStringInfoString(&buffer, "subscription"); break; @@ -5711,6 +5811,54 @@ getObjectIdentityParts(const ObjectAddress *object, break; } + case OCLASS_PUBLICATION_SCHEMA: + { + HeapTuple tup; + char *pubname; + char *nspname; + Form_pg_publication_schema psform; + + tup = SearchSysCache1(PUBLICATIONSCHEMA, + ObjectIdGetDatum(object->objectId)); + if (!HeapTupleIsValid(tup)) + { + if (!missing_ok) + elog(ERROR, "cache lookup failed for publication schema %u", + object->objectId); + break; + } + + psform = (Form_pg_publication_schema) GETSTRUCT(tup); + pubname = get_publication_name(psform->pspubid, false); + nspname = get_namespace_name(psform->psnspcid); + if (!nspname) + { + Oid psnspcid = psform->psnspcid; + + pfree(pubname); + ReleaseSysCache(tup); + if (!missing_ok) + elog(ERROR, "cache lookup failed for schema %u", + psnspcid); + break; + } + + appendStringInfo(&buffer, "%s in publication %s", nspname, pubname); + + if (objargs) + *objargs = list_make1(pubname); + else + pfree(pubname); + + if (objname) + *objname = list_make1(nspname); + else + pfree(nspname); + + ReleaseSysCache(tup); + break; + } + case OCLASS_SUBSCRIPTION: { char *subname; diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 2a2fe03c13..26a907bd17 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -28,9 +28,12 @@ #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" #include "catalog/pg_inherits.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_type.h" +#include "commands/publicationcmds.h" #include "funcapi.h" #include "miscadmin.h" #include "utils/array.h" @@ -214,6 +217,92 @@ publication_add_relation(Oid pubid, Relation targetrel, return myself; } +/* + * Insert new publication / schema mapping. + */ +ObjectAddress +publication_add_schema(Oid pubid, Oid schemaoid, bool if_not_exists) +{ + Relation rel; + HeapTuple tup; + Datum values[Natts_pg_publication_schema]; + bool nulls[Natts_pg_publication_schema]; + Oid psschid; + Publication *pub = GetPublication(pubid); + ObjectAddress myself, + referenced; + + rel = table_open(PublicationSchemaRelationId, RowExclusiveLock); + + /* + * Check for duplicates. Note that this does not really prevent + * duplicates, it's here just to provide nicer error message in common + * case. The real protection is the unique key on the catalog. + */ + if (SearchSysCacheExists2(PUBLICATIONSCHEMAMAP, ObjectIdGetDatum(schemaoid), + ObjectIdGetDatum(pubid))) + { + table_close(rel, RowExclusiveLock); + + if (if_not_exists) + return InvalidObjectAddress; + + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("schema \"%s\" is already member of publication \"%s\"", + get_namespace_name(schemaoid), pub->name))); + } + + /* Can't be system namespace */ + if (IsCatalogNamespace(schemaoid) || IsToastNamespace(schemaoid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("\"%s\" is a system schema", + get_namespace_name(schemaoid)), + errdetail("System schemas cannot be added to publications."))); + + /* Can't be temporary namespace */ + if (isAnyTempNamespace(schemaoid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("\"%s\" is a temporary schema", + get_namespace_name(schemaoid)), + errdetail("Temporary schemas cannot be added to publications."))); + + /* Form a tuple */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + + psschid = GetNewOidWithIndex(rel, PublicationSchemaObjectIndexId, + Anum_pg_publication_schema_oid); + values[Anum_pg_publication_schema_oid - 1] = ObjectIdGetDatum(psschid); + values[Anum_pg_publication_schema_pspubid - 1] = + ObjectIdGetDatum(pubid); + values[Anum_pg_publication_schema_psnspcid - 1] = + ObjectIdGetDatum(schemaoid); + + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); + + /* Insert tuple into catalog */ + CatalogTupleInsert(rel, tup); + heap_freetuple(tup); + + ObjectAddressSet(myself, PublicationSchemaRelationId, psschid); + + /* Add dependency on the publication */ + ObjectAddressSet(referenced, PublicationRelationId, pubid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + + /* Add dependency on the schema */ + ObjectAddressSet(referenced, NamespaceRelationId, schemaoid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + + /* Close the table */ + table_close(rel, RowExclusiveLock); + + return myself; +} + /* Gets list of publication oids for a relation */ List * GetRelationPublications(Oid relid) @@ -304,6 +393,83 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) return result; } +/* + * Gets the list of schema oids for a publication. + * + * This should only be used for normal publications. + */ +List * +GetPublicationSchemas(Oid pubid) +{ + List *result = NIL; + Relation pubschsrel; + ScanKeyData scankey; + SysScanDesc scan; + HeapTuple tup; + + /* Find all publications associated with the schema */ + pubschsrel = table_open(PublicationSchemaRelationId, AccessShareLock); + + ScanKeyInit(&scankey, + Anum_pg_publication_schema_pspubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(pubid)); + + scan = systable_beginscan(pubschsrel, PublicationSchemaPsnspcidPspubidIndexId, + true, NULL, 1, &scankey); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_publication_schema pubsch; + + pubsch = (Form_pg_publication_schema) GETSTRUCT(tup); + + result = lappend_oid(result, pubsch->psnspcid); + } + + systable_endscan(scan); + table_close(pubschsrel, AccessShareLock); + + return result; +} + + +/* + * Gets list of publication oids for publications marked as FOR SCHEMA. + */ +List * +GetSchemaPublications(Oid schemaid) +{ + List *result = NIL; + Relation pubschsrel; + ScanKeyData scankey; + SysScanDesc scan; + HeapTuple tup; + + /* Find all publications associated with the schema */ + pubschsrel = table_open(PublicationSchemaRelationId, AccessShareLock); + + ScanKeyInit(&scankey, + Anum_pg_publication_schema_psnspcid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(schemaid)); + + scan = systable_beginscan(pubschsrel, + PublicationSchemaPsnspcidPspubidIndexId, true, + NULL, 1, &scankey); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_publication_schema pubsch; + + pubsch = (Form_pg_publication_schema) GETSTRUCT(tup); + result = lappend_oid(result, pubsch->pspubid); + } + + systable_endscan(scan); + table_close(pubschsrel, AccessShareLock); + + return result; +} + /* * Gets list of publication oids for publications marked as FOR ALL TABLES. */ @@ -342,29 +508,37 @@ GetAllTablesPublications(void) } /* - * Gets list of all relation published by FOR ALL TABLES publication(s). + * Gets the list of relations published. * * If the publication publishes partition changes via their respective root * partitioned tables, we must exclude partitions in favor of including the - * root partitioned tables. + * root partitioned tables. If schemaOid is specified, get the relations present + * in the schema specified. */ List * -GetAllTablesPublicationRelations(bool pubviaroot) +GetAllTablesPublicationRelations(bool pubviaroot, Oid schemaOid) { Relation classRel; - ScanKeyData key[1]; + ScanKeyData key[2]; TableScanDesc scan; HeapTuple tuple; List *result = NIL; + int keycount = 0; classRel = table_open(RelationRelationId, AccessShareLock); - ScanKeyInit(&key[0], + ScanKeyInit(&key[keycount++], Anum_pg_class_relkind, BTEqualStrategyNumber, F_CHAREQ, CharGetDatum(RELKIND_RELATION)); - scan = table_beginscan_catalog(classRel, 1, key); + if (schemaOid != InvalidOid) + ScanKeyInit(&key[keycount++], + Anum_pg_class_relnamespace, + BTEqualStrategyNumber, F_OIDEQ, + schemaOid); + + scan = table_beginscan_catalog(classRel, keycount, key); while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { @@ -380,12 +554,14 @@ GetAllTablesPublicationRelations(bool pubviaroot) if (pubviaroot) { - ScanKeyInit(&key[0], + ScanKeyData skey[1]; + + ScanKeyInit(&skey[0], Anum_pg_class_relkind, BTEqualStrategyNumber, F_CHAREQ, CharGetDatum(RELKIND_PARTITIONED_TABLE)); - scan = table_beginscan_catalog(classRel, 1, key); + scan = table_beginscan_catalog(classRel, 1, skey); while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) { @@ -404,6 +580,29 @@ GetAllTablesPublicationRelations(bool pubviaroot) return result; } +/* + * Gets the list of all relations published by FOR SCHEMA publication(s). + */ +List * +GetAllSchemasPublicationRelations(bool pubviaroot, Oid puboid) +{ + List *result = NIL; + List *pubschemalist = GetPublicationSchemas(puboid); + ListCell *cell; + + foreach(cell, pubschemalist) + { + Oid schemaOid = lfirst_oid(cell); + List *schemaRels = NIL; + + schemaRels = GetAllTablesPublicationRelations(pubviaroot, + schemaOid); + result = list_concat(result, schemaRels); + } + + return result; +} + /* * Get publication using oid * @@ -431,6 +630,7 @@ GetPublication(Oid pubid) pub->pubactions.pubdelete = pubform->pubdelete; pub->pubactions.pubtruncate = pubform->pubtruncate; pub->pubviaroot = pubform->pubviaroot; + pub->pubtype = pubform->pubtype; ReleaseSysCache(tup); @@ -530,13 +730,19 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * replicated using leaf partition identity and schema, so we only * need those. */ - if (publication->alltables) - tables = GetAllTablesPublicationRelations(publication->pubviaroot); - else + if (publication->pubtype == PUBTYPE_ALLTABLES) + tables = GetAllTablesPublicationRelations(publication->pubviaroot, + InvalidOid); + else if (publication->pubtype == PUBTYPE_TABLE) tables = GetPublicationRelations(publication->oid, publication->pubviaroot ? PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); + else if (publication->pubtype == PUBTYPE_SCHEMA) + tables = GetAllSchemasPublicationRelations(publication->pubviaroot, + publication->oid); + else + tables = NIL; funcctx->user_fctx = (void *) tables; MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index 29249498a9..e7c27459d8 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -661,6 +661,7 @@ AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid, case OCLASS_POLICY: case OCLASS_PUBLICATION: case OCLASS_PUBLICATION_REL: + case OCLASS_PUBLICATION_SCHEMA: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: /* ignore object types that don't have schema-qualified names */ diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index 9c31c9e763..34cf049632 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -974,6 +974,7 @@ EventTriggerSupportsObjectType(ObjectType obtype) case OBJECT_PROCEDURE: case OBJECT_PUBLICATION: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROUTINE: case OBJECT_RULE: case OBJECT_SCHEMA: @@ -1051,6 +1052,7 @@ EventTriggerSupportsObjectClass(ObjectClass objclass) case OCLASS_POLICY: case OCLASS_PUBLICATION: case OCLASS_PUBLICATION_REL: + case OCLASS_PUBLICATION_SCHEMA: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: return true; @@ -2131,6 +2133,7 @@ stringify_grant_objtype(ObjectType objtype) case OBJECT_POLICY: case OBJECT_PUBLICATION: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROLE: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: @@ -2213,6 +2216,7 @@ stringify_adefprivs_objtype(ObjectType objtype) case OBJECT_POLICY: case OBJECT_PUBLICATION: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROLE: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 8487eeb7e6..9cf2c4e725 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -25,8 +25,10 @@ #include "catalog/objectaddress.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" #include "commands/defrem.h" @@ -53,6 +55,9 @@ static void CloseTableList(List *rels); static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt); static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); +static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, + AlterPublicationStmt *stmt); +static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok); static void parse_publication_options(ParseState *pstate, @@ -138,6 +143,50 @@ parse_publication_options(ParseState *pstate, } } +/* + * Convert the SchemaSpec list into an Oid list. + */ +static List * +ConvertSchemaSpecListToOidList(List *schemas) +{ + List *schemaoidlist = NIL; + ListCell *cell; + + foreach(cell, schemas) + { + SchemaSpec *schema = (SchemaSpec *) lfirst(cell); + Oid schemaoid; + List *search_path; + char *nspname; + + if (schema->schematype == SCHEMASPEC_CURRENT_SCHEMA) + { + search_path = fetch_search_path(false); + if (search_path == NIL) /* nothing valid in search_path? */ + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("no schema has been selected")); + + schemaoid = linitial_oid(search_path); + nspname = get_namespace_name(schemaoid); + if (nspname == NULL) /* recently-deleted namespace? */ + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("no schema has been selected")); + } + else + schemaoid = get_namespace_oid(schema->schemaname, false); + + /* Filter out duplicates if user specifies "sch1, sch1" */ + if (list_member_oid(schemaoidlist, schemaoid)) + continue; + + schemaoidlist = lappend_oid(schemaoidlist, schemaoid); + } + + return schemaoidlist; +} + /* * Create new publication. */ @@ -211,6 +260,15 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root); + if (stmt->schemas) + values[Anum_pg_publication_pubtype - 1] = PUBTYPE_SCHEMA; + else if (stmt->tables) + values[Anum_pg_publication_pubtype - 1] = PUBTYPE_TABLE; + else if (stmt->for_all_tables) + values[Anum_pg_publication_pubtype - 1] = PUBTYPE_ALLTABLES; + else + values[Anum_pg_publication_pubtype - 1] = PUBTYPE_EMPTY; + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ @@ -224,6 +282,20 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) /* Make the changes visible. */ CommandCounterIncrement(); + if (stmt->schemas) + { + List *schemaoidlist = NIL; + Relation nspcrel; + + Assert(list_length(stmt->schemas) > 0); + + schemaoidlist = ConvertSchemaSpecListToOidList(stmt->schemas); + + nspcrel = table_open(NamespaceRelationId, ShareUpdateExclusiveLock); + PublicationAddSchemas(puboid, schemaoidlist, true, NULL); + table_close(nspcrel, ShareUpdateExclusiveLock); + } + if (stmt->tables) { List *rels; @@ -250,6 +322,35 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) return myself; } +/* + * Update publication type in pg_publication relation. + */ +static void +UpdatePublicationTypeTupleValue(Relation rel, HeapTuple tup, int col, + char pubtype) +{ + bool nulls[Natts_pg_publication]; + bool replaces[Natts_pg_publication]; + Datum values[Natts_pg_publication]; + + + /* Everything ok, form a new tuple */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + values[col - 1] = pubtype; + replaces[col - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + + /* Update the catalog */ + CatalogTupleUpdate(rel, &tup->t_self, tup); + + CommandCounterIncrement(); +} + /* * Change options of a publication. */ @@ -310,19 +411,25 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, pubform = (Form_pg_publication) GETSTRUCT(tup); /* Invalidate the relcache. */ - if (pubform->puballtables) + if (pubform->pubtype == PUBTYPE_ALLTABLES) { CacheInvalidateRelcacheAll(); } else { + List *relids = NIL; + /* * For any partitioned tables contained in the publication, we must * invalidate all partitions contained in the respective partition * trees, not just those explicitly mentioned in the publication. */ - List *relids = GetPublicationRelations(pubform->oid, - PUBLICATION_PART_ALL); + if (pubform->pubtype == PUBTYPE_TABLE) + relids = GetPublicationRelations(pubform->oid, + PUBLICATION_PART_ALL); + else if (pubform->pubtype == PUBTYPE_SCHEMA) + relids = GetAllSchemasPublicationRelations(pubform->pubviaroot, + pubform->oid); /* * We don't want to send too many individual messages, at some point @@ -362,19 +469,31 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, Oid pubid = pubform->oid; /* Check that user is allowed to manipulate the publication tables. */ - if (pubform->puballtables) + if (pubform->pubtype == PUBTYPE_ALLTABLES) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("publication \"%s\" is defined as FOR ALL TABLES", NameStr(pubform->pubname)), errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); + if (pubform->pubtype == PUBTYPE_SCHEMA) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR SCHEMA", + NameStr(pubform->pubname)), + errdetail("Tables cannot be added to or dropped from FOR SCHEMA publications."))); + Assert(list_length(stmt->tables) > 0); rels = OpenTableList(stmt->tables); if (stmt->tableAction == DEFELEM_ADD) + { PublicationAddTables(pubid, rels, false, stmt); + if (pubform->pubtype == PUBTYPE_EMPTY) + UpdatePublicationTypeTupleValue(rel, tup, Anum_pg_publication_pubtype, + PUBTYPE_TABLE); + } else if (stmt->tableAction == DEFELEM_DROP) PublicationDropTables(pubid, rels, false); else /* DEFELEM_SET */ @@ -421,16 +540,91 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, PublicationAddTables(pubid, rels, true, stmt); CloseTableList(delrels); + if (pubform->pubtype == PUBTYPE_EMPTY) + UpdatePublicationTypeTupleValue(rel, tup, + Anum_pg_publication_pubtype, + PUBTYPE_TABLE); } CloseTableList(rels); } +/* + * Alter the publication schemas. + * + * Add/Remove/Set the schemas to/from publication. + */ +static void +AlterPublicationSchemas(AlterPublicationStmt *stmt, Relation rel, + HeapTuple tup, Form_pg_publication pubform) +{ + List *schemaoidlist = NIL; + + /* Check that user is allowed to manipulate the publication tables */ + if (pubform->pubtype == PUBTYPE_ALLTABLES) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL TABLES", + NameStr(pubform->pubname)), + errdetail("Schemas cannot be added to or dropped from FOR ALL TABLES publications."))); + + if (pubform->pubtype == PUBTYPE_TABLE) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR TABLE", + NameStr(pubform->pubname)), + errdetail("Schemas cannot be added to or dropped from FOR TABLE publications."))); + + /* Convert the text list into oid list */ + schemaoidlist = ConvertSchemaSpecListToOidList(stmt->schemas); + + if (stmt->tableAction == DEFELEM_ADD) + { + PublicationAddSchemas(pubform->oid, schemaoidlist, false, stmt); + if (pubform->pubtype == PUBTYPE_EMPTY) + UpdatePublicationTypeTupleValue(rel, tup, + Anum_pg_publication_pubtype, + PUBTYPE_SCHEMA); + } + else if (stmt->tableAction == DEFELEM_DROP) + PublicationDropSchemas(pubform->oid, schemaoidlist, false); + else + { + List *oldschemaids = GetPublicationSchemas(pubform->oid); + List *delschemas = NIL; + ListCell *oldlc; + + /* Identify which schemas should be dropped */ + foreach(oldlc, oldschemaids) + { + Oid oldschemaid = lfirst_oid(oldlc); + + if (!list_member_oid(schemaoidlist, oldschemaid)) + delschemas = lappend_oid(delschemas, oldschemaid); + } + + /* And drop them */ + PublicationDropSchemas(pubform->oid, delschemas, true); + + /* + * Don't bother calculating the difference for adding, we'll catch and + * skip existing ones when doing catalog update. + */ + PublicationAddSchemas(pubform->oid, schemaoidlist, true, stmt); + if (pubform->pubtype == PUBTYPE_EMPTY) + UpdatePublicationTypeTupleValue(rel, tup, + Anum_pg_publication_pubtype, + PUBTYPE_SCHEMA); + } + + return; +} + /* * Alter the existing publication. * - * This is dispatcher function for AlterPublicationOptions and - * AlterPublicationTables. + * This is dispatcher function for AlterPublicationOptions, + * AlterPublicationSchemas and AlterPublicationTables. */ void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) @@ -459,6 +653,8 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) if (stmt->options) AlterPublicationOptions(pstate, stmt, rel, tup); + else if (stmt->schemas) + AlterPublicationSchemas(stmt, rel, tup, pubform); else AlterPublicationTables(stmt, rel, tup); @@ -497,6 +693,30 @@ RemovePublicationRelById(Oid proid) table_close(rel, RowExclusiveLock); } +/* + * Remove schema from publication by mapping OID. + */ +void +RemovePublicationSchemaById(Oid psoid) +{ + Relation rel; + HeapTuple tup; + + rel = table_open(PublicationSchemaRelationId, RowExclusiveLock); + + tup = SearchSysCache1(PUBLICATIONSCHEMA, ObjectIdGetDatum(psoid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for publication schema %u", + psoid); + + CatalogTupleDelete(rel, &tup->t_self); + + ReleaseSysCache(tup); + + table_close(rel, RowExclusiveLock); +} + /* * Open relations specified by a RangeVar list. * The returned tables are locked in ShareUpdateExclusiveLock mode in order to @@ -607,7 +827,7 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, { ListCell *lc; - Assert(!stmt || !stmt->for_all_tables); + Assert(!stmt || (!stmt->for_all_tables && !stmt->schemas)); foreach(lc, rels) { @@ -631,6 +851,39 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, } } +/* + * Add listed schemas to the publication. + */ +static void +PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, + AlterPublicationStmt *stmt) +{ + ListCell *lc; + + Assert(!stmt || (!stmt->for_all_tables && !stmt->tables)); + + foreach(lc, schemas) + { + Oid schemaoid = lfirst_oid(lc); + ObjectAddress obj; + + /* Must be owner of the schema or superuser */ + if (!pg_namespace_ownercheck(schemaoid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA, + get_namespace_name(schemaoid)); + + obj = publication_add_schema(pubid, schemaoid, if_not_exists); + if (stmt) + { + EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, + (Node *) stmt); + + InvokeObjectPostCreateHook(PublicationSchemaRelationId, + obj.objectId, 0); + } + } +} + /* * Remove listed tables from the publication. */ @@ -665,6 +918,40 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) } } +/* + * Remove listed schemas from the publication. + */ +static void +PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok) +{ + ObjectAddress obj; + ListCell *lc; + Oid psid; + + foreach(lc, schemas) + { + Oid schemaoid = lfirst_oid(lc); + + psid = GetSysCacheOid2(PUBLICATIONSCHEMAMAP, + Anum_pg_publication_schema_oid, + ObjectIdGetDatum(schemaoid), + ObjectIdGetDatum(pubid)); + if (!OidIsValid(psid)) + { + if (missing_ok) + continue; + + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("schema \"%s\" is not part of the publication", + get_namespace_name(schemaoid)))); + } + + ObjectAddressSet(obj, PublicationSchemaRelationId, psid); + performDeletion(&obj, DROP_CASCADE, 0); + } +} + /* * Internal workhorse for changing a publication owner */ @@ -696,7 +983,7 @@ AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) aclcheck_error(aclresult, OBJECT_DATABASE, get_database_name(MyDatabaseId)); - if (form->puballtables && !superuser_arg(newOwnerId)) + if (form->pubtype == PUBTYPE_ALLTABLES && !superuser_arg(newOwnerId)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("permission denied to change owner of publication \"%s\"", diff --git a/src/backend/commands/seclabel.c b/src/backend/commands/seclabel.c index ddc019cb39..accaf2ed2e 100644 --- a/src/backend/commands/seclabel.c +++ b/src/backend/commands/seclabel.c @@ -80,6 +80,7 @@ SecLabelSupportsObjectType(ObjectType objtype) case OBJECT_OPFAMILY: case OBJECT_POLICY: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: case OBJECT_TABCONSTRAINT: diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index a16e749506..4373da14b1 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -52,6 +52,7 @@ #include "commands/defrem.h" #include "commands/event_trigger.h" #include "commands/policy.h" +#include "commands/publicationcmds.h" #include "commands/sequence.h" #include "commands/tablecmds.h" #include "commands/tablespace.h" @@ -12227,6 +12228,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, case OCLASS_EVENT_TRIGGER: case OCLASS_PUBLICATION: case OCLASS_PUBLICATION_REL: + case OCLASS_PUBLICATION_SCHEMA: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 10da5c5c51..6099cb14f3 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -169,6 +169,7 @@ static Node *makeNullAConst(int location); static Node *makeAConst(Value *v, int location); static Node *makeBoolAConst(bool state, int location); static RoleSpec *makeRoleSpec(RoleSpecType type, int location); +static SchemaSpec *makeSchemaSpec(SchemaSpecType type, int location); static void check_qualified_name(List *names, core_yyscan_t yyscanner); static List *check_func_name(List *names, core_yyscan_t yyscanner); static List *check_indirection(List *indirection, core_yyscan_t yyscanner); @@ -257,6 +258,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); PartitionSpec *partspec; PartitionBoundSpec *partboundspec; RoleSpec *rolespec; + SchemaSpec *schemaspec; struct SelectLimit *selectlimit; SetQuantifier setquantifier; struct GroupClause *groupclause; @@ -426,14 +428,13 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); transform_element_list transform_type_list TriggerTransitions TriggerReferencing vacuum_relation_list opt_vacuum_relation_list - drop_option_list + drop_option_list schema_list %type opt_routine_body %type group_clause %type group_by_list %type group_by_item empty_grouping_set rollup_clause cube_clause %type grouping_sets_clause -%type opt_publication_for_tables publication_for_tables %type opt_fdw_options fdw_options %type fdw_option @@ -554,6 +555,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type createdb_opt_name plassign_target %type var_value zone_value %type auth_ident RoleSpec opt_granted_by +%type SchemaSpec %type unreserved_keyword type_func_name_keyword %type col_name_keyword reserved_keyword @@ -9583,45 +9585,68 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec /***************************************************************************** * - * CREATE PUBLICATION name [ FOR TABLE ] [ WITH options ] + * CREATE PUBLICATION name [WITH options] * + * CREATE PUBLICATION FOR ALL TABLES [WITH options] + * + * CREATE PUBLICATION FOR TABLE [WITH options] + * + * CREATE PUBLICATION FOR SCHEMA [WITH options] *****************************************************************************/ CreatePublicationStmt: - CREATE PUBLICATION name opt_publication_for_tables opt_definition + CREATE PUBLICATION name opt_definition { CreatePublicationStmt *n = makeNode(CreatePublicationStmt); n->pubname = $3; - n->options = $5; - if ($4 != NULL) - { - /* FOR TABLE */ - if (IsA($4, List)) - n->tables = (List *)$4; - /* FOR ALL TABLES */ - else - n->for_all_tables = true; - } + n->options = $4; $$ = (Node *)n; } - ; - -opt_publication_for_tables: - publication_for_tables { $$ = $1; } - | /* EMPTY */ { $$ = NULL; } - ; - -publication_for_tables: - FOR TABLE relation_expr_list + | CREATE PUBLICATION name FOR ALL TABLES opt_definition { - $$ = (Node *) $3; + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + n->pubname = $3; + n->options = $7; + n->for_all_tables = true; + $$ = (Node *)n; } - | FOR ALL TABLES + | CREATE PUBLICATION name FOR TABLE relation_expr_list opt_definition { - $$ = (Node *) makeInteger(true); + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + n->pubname = $3; + n->options = $7; + n->tables = (List *)$6; + $$ = (Node *)n; + } + | CREATE PUBLICATION name FOR SCHEMA schema_list opt_definition + { + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + n->pubname = $3; + n->options = $7; + n->schemas = (List *)$6; + $$ = (Node *)n; } ; +/* Schema specifications */ +SchemaSpec: ColId + { + SchemaSpec *n; + n = makeSchemaSpec(SCHEMASPEC_CSTRING, @1); + n->schemaname = pstrdup($1); + $$ = n; + } + | CURRENT_SCHEMA + { + $$ = makeSchemaSpec(SCHEMASPEC_CURRENT_SCHEMA, @1); + } + ; + +schema_list: SchemaSpec + { $$ = list_make1($1); } + | schema_list ',' SchemaSpec + { $$ = lappend($1, $3); } + ; /***************************************************************************** * @@ -9633,6 +9658,11 @@ publication_for_tables: * * ALTER PUBLICATION name SET TABLE table [, table2] * + * ALTER PUBLICATION name ADD SCHEMA schema [, schema2] + * + * ALTER PUBLICATION name DROP SCHEMA schema [, schema2] + * + * ALTER PUBLICATION name SET SCHEMA schema [, schema2] *****************************************************************************/ AlterPublicationStmt: @@ -9667,6 +9697,30 @@ AlterPublicationStmt: n->tableAction = DEFELEM_DROP; $$ = (Node *)n; } + | ALTER PUBLICATION name ADD_P SCHEMA schema_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->schemas = $6; + n->tableAction = DEFELEM_ADD; + $$ = (Node *)n; + } + | ALTER PUBLICATION name SET SCHEMA schema_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->schemas = $6; + n->tableAction = DEFELEM_SET; + $$ = (Node *)n; + } + | ALTER PUBLICATION name DROP SCHEMA schema_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->schemas = $6; + n->tableAction = DEFELEM_DROP; + $$ = (Node *)n; + } ; /***************************************************************************** @@ -16613,6 +16667,20 @@ makeRoleSpec(RoleSpecType type, int location) return spec; } +/* + * makeSchemaSpec - Create a SchemaSpec with the given type and location + */ +static SchemaSpec * +makeSchemaSpec(SchemaSpecType type, int location) +{ + SchemaSpec *spec = makeNode(SchemaSpec); + + spec->schematype = type; + spec->location = location; + + return spec; +} + /* check_qualified_name --- check the result of qualified_name production * * It's easiest to let the grammar production for qualified_name allow diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index e4314af13a..283f9d2224 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -15,7 +15,9 @@ #include "access/tupconvert.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_schema.h" #include "commands/defrem.h" +#include "commands/publicationcmds.h" #include "fmgr.h" #include "replication/logical.h" #include "replication/logicalproto.h" @@ -1059,6 +1061,9 @@ init_rel_sync_cache(MemoryContext cachectx) CacheRegisterSyscacheCallback(PUBLICATIONRELMAP, rel_sync_cache_publication_cb, (Datum) 0); + CacheRegisterSyscacheCallback(PUBLICATIONSCHEMAMAP, + rel_sync_cache_publication_cb, + (Datum) 0); } /* @@ -1163,12 +1168,27 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) Publication *pub = lfirst(lc); bool publish = false; - if (pub->alltables) + if (pub->pubtype == PUBTYPE_ALLTABLES) { publish = true; if (pub->pubviaroot && am_partition) publish_as_relid = llast_oid(get_partition_ancestors(relid)); } + else if (pub->pubtype == PUBTYPE_SCHEMA) + { + Oid schemaId = get_rel_namespace(relid); + Oid psid = GetSysCacheOid2(PUBLICATIONSCHEMAMAP, + Anum_pg_publication_schema_oid, + ObjectIdGetDatum(schemaId), + ObjectIdGetDatum(pub->oid)); + + if (OidIsValid(psid)) + { + publish = true; + if (pub->pubviaroot && am_partition) + publish_as_relid = llast_oid(get_partition_ancestors(relid)); + } + } if (!publish) { diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 13d9994af3..2ec805eefe 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -66,6 +66,7 @@ #include "catalog/schemapg.h" #include "catalog/storage.h" #include "commands/policy.h" +#include "commands/publicationcmds.h" #include "commands/trigger.h" #include "miscadmin.h" #include "nodes/makefuncs.h" @@ -5447,6 +5448,7 @@ GetRelationPublicationActions(Relation relation) List *puboids; ListCell *lc; MemoryContext oldcxt; + Oid schemaid; PublicationActions *pubactions = palloc0(sizeof(PublicationActions)); /* @@ -5478,6 +5480,9 @@ GetRelationPublicationActions(Relation relation) } puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); + schemaid = RelationGetNamespace(relation); + puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid)); + foreach(lc, puboids) { Oid pubid = lfirst_oid(lc); diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index d6cb78dea8..924b7bcad5 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -51,6 +51,7 @@ #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_range.h" #include "catalog/pg_replication_origin.h" #include "catalog/pg_rewrite.h" @@ -650,6 +651,28 @@ static const struct cachedesc cacheinfo[] = { }, 64 }, + {PublicationSchemaRelationId, /* PUBLICATIONSCHEMA */ + PublicationSchemaObjectIndexId, + 1, + { + Anum_pg_publication_schema_oid, + 0, + 0, + 0 + }, + 64 + }, + {PublicationSchemaRelationId, /* PUBLICATIONSCHEMAMAP */ + PublicationSchemaPsnspcidPspubidIndexId, + 2, + { + Anum_pg_publication_schema_psnspcid, + Anum_pg_publication_schema_pspubid, + 0, + 0 + }, + 64 + }, {RangeRelationId, /* RANGEMULTIRANGE */ RangeMultirangeTypidIndexId, 1, diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c index 1f24e79665..3e1f3cda09 100644 --- a/src/bin/pg_dump/common.c +++ b/src/bin/pg_dump/common.c @@ -257,6 +257,9 @@ getSchemaData(Archive *fout, int *numTablesPtr) pg_log_info("reading publication membership"); getPublicationTables(fout, tblinfo, numTables); + pg_log_info("reading publication schemas"); + getPublicationSchemas(fout, nspinfo, numNamespaces); + pg_log_info("reading subscriptions"); getSubscriptions(fout); diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index ee06dc6822..8d97b13154 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -2788,7 +2788,8 @@ _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH) */ if (ropt->no_publications && (strcmp(te->desc, "PUBLICATION") == 0 || - strcmp(te->desc, "PUBLICATION TABLE") == 0)) + strcmp(te->desc, "PUBLICATION TABLE") == 0 || + strcmp(te->desc, "PUBLICATION SCHEMA") == 0)) return 0; /* If it's a security label, maybe ignore it */ diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 90ac445bcd..afa03452b5 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -51,6 +51,7 @@ #include "catalog/pg_largeobject_d.h" #include "catalog/pg_largeobject_metadata_d.h" #include "catalog/pg_proc_d.h" +#include "catalog/pg_publication.h" #include "catalog/pg_subscription.h" #include "catalog/pg_trigger_d.h" #include "catalog/pg_type_d.h" @@ -1630,9 +1631,13 @@ selectDumpableNamespace(NamespaceInfo *nsinfo, Archive *fout) if (nsinfo->nspowner == BOOTSTRAP_SUPERUSERID) nsinfo->dobj.dump &= ~DUMP_COMPONENT_DEFINITION; nsinfo->dobj.dump_contains = DUMP_COMPONENT_ALL; + nsinfo->dobj.dump |= DUMP_COMPONENT_PUBSCHEMA; } else + { nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ALL; + nsinfo->dobj.dump |= DUMP_COMPONENT_PUBSCHEMA; + } /* * In any case, a namespace can be excluded by an exclusion switch @@ -3950,6 +3955,7 @@ getPublications(Archive *fout, int *numPublications) int i_pubdelete; int i_pubtruncate; int i_pubviaroot; + int i_pubtype; int i, ntups; @@ -3964,25 +3970,37 @@ getPublications(Archive *fout, int *numPublications) resetPQExpBuffer(query); /* Get the publications. */ - if (fout->remoteVersion >= 130000) + if (fout->remoteVersion >= 150000) appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "(%s p.pubowner) AS rolname, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot " + "p.puballtables, p.pubinsert, p.pubupdate, " + "p.pubdelete, p.pubtruncate, p.pubviaroot, p.pubtype " "FROM pg_publication p", username_subquery); + else if (fout->remoteVersion >= 130000) + appendPQExpBuffer(query, + "SELECT p.tableoid, p.oid, p.pubname, " + "(%s p.pubowner) AS rolname, " + "p.puballtables, p.pubinsert, p.pubupdate, " + "p.pubdelete, p.pubtruncate, p.pubviaroot, " + "NULL AS pubtype FROM pg_publication p", + username_subquery); else if (fout->remoteVersion >= 110000) appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "(%s p.pubowner) AS rolname, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot " - "FROM pg_publication p", + "p.puballtables, p.pubinsert, p.pubupdate, " + "p.pubdelete, p.pubtruncate, false AS pubviaroot, " + "NULL AS pubtype FROM pg_publication p", username_subquery); else appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "(%s p.pubowner) AS rolname, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot " + "p.puballtables, p.pubinsert, p.pubupdate, " + "p.pubdelete, false AS pubtruncate, " + "false AS pubviaroot, NULL AS pubtype " "FROM pg_publication p", username_subquery); @@ -4000,6 +4018,7 @@ getPublications(Archive *fout, int *numPublications) i_pubdelete = PQfnumber(res, "pubdelete"); i_pubtruncate = PQfnumber(res, "pubtruncate"); i_pubviaroot = PQfnumber(res, "pubviaroot"); + i_pubtype = PQfnumber(res, "pubtype"); pubinfo = pg_malloc(ntups * sizeof(PublicationInfo)); @@ -4024,6 +4043,7 @@ getPublications(Archive *fout, int *numPublications) (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0); pubinfo[i].pubviaroot = (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0); + pubinfo[i].pubtype = get_publication_type(PQgetvalue(res, i, i_pubtype)); if (strlen(pubinfo[i].rolname) == 0) pg_log_warning("owner of publication \"%s\" appears to be invalid", @@ -4066,7 +4086,7 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo) appendPQExpBuffer(query, "CREATE PUBLICATION %s", qpubname); - if (pubinfo->puballtables) + if (pubinfo->puballtables || pubinfo->pubtype == PUBTYPE_ALLTABLES) appendPQExpBufferStr(query, " FOR ALL TABLES"); appendPQExpBufferStr(query, " WITH (publish = '"); @@ -4133,6 +4153,102 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo) free(qpubname); } +/* + * getPublicationSchemas + * get information about publication membership for dumpable schemas + */ +void +getPublicationSchemas(Archive *fout, NamespaceInfo nspinfo[], int numSchemas) +{ + PQExpBuffer query; + PGresult *res; + PublicationSchemaInfo *pubsinfo; + DumpOptions *dopt = fout->dopt; + int i_schemaoid; + int i_oid; + int i_pubname; + int i_pubid; + int i, + j, + ntups; + + if (dopt->no_publications || fout->remoteVersion < 150000) + return; + + query = createPQExpBuffer(); + + for (i = 0; i < numSchemas; i++) + { + NamespaceInfo *nsinfo = &nspinfo[i]; + PublicationInfo *pubinfo; + + /* + * Ignore publication membership of schemas whose definitions are not + * to be dumped. + */ + if (!(nsinfo->dobj.dump & DUMP_COMPONENT_PUBSCHEMA)) + continue; + + pg_log_info("reading publication membership for schema \"%s\"", + nsinfo->dobj.name); + + resetPQExpBuffer(query); + + /* Get the publication membership for the schema */ + appendPQExpBuffer(query, + "SELECT ps.psnspcid, ps.oid, p.pubname, p.oid AS pubid " + "FROM pg_publication_schema ps, pg_publication p " + "WHERE ps.psnspcid = '%u' " + "AND p.oid = ps.pspubid", + nsinfo->dobj.catId.oid); + res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); + + ntups = PQntuples(res); + + if (ntups == 0) + { + /* + * Schema is not a member of any publications. Clean up and + * process the next schema. + */ + PQclear(res); + continue; + } + + i_schemaoid = PQfnumber(res, "psnspcid"); + i_oid = PQfnumber(res, "oid"); + i_pubname = PQfnumber(res, "pubname"); + i_pubid = PQfnumber(res, "pubid"); + + pubsinfo = pg_malloc(ntups * sizeof(PublicationSchemaInfo)); + + for (j = 0; j < ntups; j++) + { + Oid pspubid = atooid(PQgetvalue(res, j, i_pubid)); + + pubinfo = findPublicationByOid(pspubid); + if (pubinfo == NULL) + continue; + + pubsinfo[j].dobj.objType = DO_PUBLICATION_SCHEMA; + pubsinfo[j].dobj.catId.tableoid = + atooid(PQgetvalue(res, j, i_schemaoid)); + pubsinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, j, i_oid)); + AssignDumpId(&pubsinfo[j].dobj); + pubsinfo[j].dobj.namespace = nsinfo->dobj.namespace; + pubsinfo[j].dobj.name = nsinfo->dobj.name; + pubsinfo[j].pubname = pg_strdup(PQgetvalue(res, j, i_pubname)); + pubsinfo[j].pubschema = nsinfo; + pubsinfo[j].publication = pubinfo; + + /* Decide whether we want to dump it */ + selectDumpablePublicationTable(&(pubsinfo[j].dobj), fout); + } + PQclear(res); + } + destroyPQExpBuffer(query); +} + /* * getPublicationTables * get information about publication membership for dumpable tables. @@ -4220,6 +4336,44 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) destroyPQExpBuffer(query); } +/* + * dumpPublicationSchema + * dump the definition of the given publication schema mapping + */ +static void +dumpPublicationSchema(Archive *fout, const PublicationSchemaInfo *pubsinfo) +{ + NamespaceInfo *schemainfo = pubsinfo->pubschema; + PublicationInfo *pubinfo = pubsinfo->publication; + PQExpBuffer query; + char *tag; + + if (!(pubsinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)) + return; + + tag = psprintf("%s %s", pubsinfo->pubname, schemainfo->dobj.name); + + query = createPQExpBuffer(); + + appendPQExpBuffer(query, "ALTER PUBLICATION %s ", fmtId(pubsinfo->pubname)); + appendPQExpBuffer(query, "ADD SCHEMA %s;\n", fmtId(schemainfo->dobj.name)); + + /* + * There is no point in creating drop query as the drop is done by schema + * drop. + */ + ArchiveEntry(fout, pubsinfo->dobj.catId, pubsinfo->dobj.dumpId, + ARCHIVE_OPTS(.tag = tag, + .namespace = schemainfo->dobj.name, + .owner = pubinfo->rolname, + .description = "PUBLICATION SCHEMA", + .section = SECTION_POST_DATA, + .createStmt = query->data)); + + free(tag); + destroyPQExpBuffer(query); +} + /* * dumpPublicationTable * dump the definition of the given publication table mapping @@ -10445,6 +10599,9 @@ dumpDumpableObject(Archive *fout, const DumpableObject *dobj) case DO_PUBLICATION_REL: dumpPublicationTable(fout, (const PublicationRelInfo *) dobj); break; + case DO_PUBLICATION_SCHEMA: + dumpPublicationSchema(fout, (const PublicationSchemaInfo *) dobj); + break; case DO_SUBSCRIPTION: dumpSubscription(fout, (const SubscriptionInfo *) dobj); break; @@ -18693,6 +18850,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs, case DO_POLICY: case DO_PUBLICATION: case DO_PUBLICATION_REL: + case DO_PUBLICATION_SCHEMA: case DO_SUBSCRIPTION: /* Post-data objects: must come after the post-data boundary */ addObjectDependency(dobj, postDataBound->dumpId); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index f5e170e0db..37554cee63 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -81,6 +81,7 @@ typedef enum DO_POLICY, DO_PUBLICATION, DO_PUBLICATION_REL, + DO_PUBLICATION_SCHEMA, DO_SUBSCRIPTION } DumpableObjectType; @@ -94,6 +95,7 @@ typedef uint32 DumpComponents; /* a bitmask of dump object components */ #define DUMP_COMPONENT_ACL (1 << 4) #define DUMP_COMPONENT_POLICY (1 << 5) #define DUMP_COMPONENT_USERMAP (1 << 6) +#define DUMP_COMPONENT_PUBSCHEMA (1 << 7) #define DUMP_COMPONENT_ALL (0xFFFF) /* @@ -616,6 +618,7 @@ typedef struct _PublicationInfo bool pubdelete; bool pubtruncate; bool pubviaroot; + char pubtype; } PublicationInfo; /* @@ -629,6 +632,18 @@ typedef struct _PublicationRelInfo TableInfo *pubtable; } PublicationRelInfo; +/* + * The PublicationSchemaInfo struct is used to represent publication schema + * mapping. + */ +typedef struct PublicationSchemaInfo +{ + DumpableObject dobj; + NamespaceInfo *pubschema; + char *pubname; + PublicationInfo *publication; +} PublicationSchemaInfo; + /* * The SubscriptionInfo struct is used to represent subscription. */ @@ -735,6 +750,8 @@ extern PublicationInfo *getPublications(Archive *fout, int *numPublications); extern void getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables); +extern void getPublicationSchemas(Archive *fout, NamespaceInfo nspinfo[], + int numSchemas); extern void getSubscriptions(Archive *fout); #endif /* PG_DUMP_H */ diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c index 46461fb6a1..13a6fcd660 100644 --- a/src/bin/pg_dump/pg_dump_sort.c +++ b/src/bin/pg_dump/pg_dump_sort.c @@ -82,6 +82,7 @@ enum dbObjectTypePriorities PRIO_POLICY, PRIO_PUBLICATION, PRIO_PUBLICATION_REL, + PRIO_PUBLICATION_SCHEMA, PRIO_SUBSCRIPTION, PRIO_DEFAULT_ACL, /* done in ACL pass */ PRIO_EVENT_TRIGGER, /* must be next to last! */ @@ -135,6 +136,7 @@ static const int dbObjectTypePriority[] = PRIO_POLICY, /* DO_POLICY */ PRIO_PUBLICATION, /* DO_PUBLICATION */ PRIO_PUBLICATION_REL, /* DO_PUBLICATION_REL */ + PRIO_PUBLICATION_SCHEMA, /* DO_PUBLICATION_SCHEMA */ PRIO_SUBSCRIPTION /* DO_SUBSCRIPTION */ }; @@ -1477,6 +1479,11 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize) "PUBLICATION TABLE (ID %d OID %u)", obj->dumpId, obj->catId.oid); return; + case DO_PUBLICATION_SCHEMA: + snprintf(buf, bufsize, + "PUBLICATION SCHEMA (ID %d OID %u)", + obj->dumpId, obj->catId.oid); + return; case DO_SUBSCRIPTION: snprintf(buf, bufsize, "SUBSCRIPTION (ID %d OID %u)", diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 8333558bda..5e2b7d43cb 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -19,6 +19,7 @@ #include "catalog/pg_cast_d.h" #include "catalog/pg_class_d.h" #include "catalog/pg_default_acl_d.h" +#include "catalog/pg_publication.h" #include "common.h" #include "common/logging.h" #include "describe.h" @@ -3147,17 +3148,40 @@ describeOneTableDetails(const char *schemaname, /* print any publications */ if (pset.sversion >= 100000) { - printfPQExpBuffer(&buf, - "SELECT pubname\n" - "FROM pg_catalog.pg_publication p\n" - "JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" - "WHERE pr.prrelid = '%s'\n" - "UNION ALL\n" - "SELECT pubname\n" - "FROM pg_catalog.pg_publication p\n" - "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" - "ORDER BY 1;", - oid, oid); + if (pset.sversion >= 150000) + { + printfPQExpBuffer(&buf, + "SELECT p.pubname\n" + "FROM pg_catalog.pg_publication p\n" + " JOIN pg_catalog.pg_publication_schema ps ON p.oid = ps.pspubid AND p.pubtype = 's'\n" + " JOIN pg_catalog.pg_class pc ON pc.relnamespace = ps.psnspcid AND pc.oid = '%s'\n" + "UNION ALL\n" + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + " JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" + "WHERE p.pubtype = 't' AND pr.prrelid = '%s'\n" + "UNION ALL\n" + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + "WHERE p.pubtype = 'a' \n" + " AND pg_catalog.pg_relation_is_publishable('%s')\n" + "ORDER BY 1;", + oid, oid, oid); + } + else + { + printfPQExpBuffer(&buf, + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + "JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" + "WHERE pr.prrelid = '%s'\n" + "UNION ALL\n" + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" + "ORDER BY 1;", + oid, oid); + } result = PSQLexec(buf.data); if (!result) @@ -5021,6 +5045,8 @@ listSchemas(const char *pattern, bool verbose, bool showSystem) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; + int pub_schema_tuples = 0; + char **footers = NULL; initPQExpBuffer(&buf); printfPQExpBuffer(&buf, @@ -5061,9 +5087,66 @@ listSchemas(const char *pattern, bool verbose, bool showSystem) myopt.title = _("List of schemas"); myopt.translate_header = true; + if (pattern && pset.sversion >= 150000) + { + PGresult *result; + int i; + + printfPQExpBuffer(&buf, + "SELECT p.pubname FROM pg_catalog.pg_publication p,\n" + "pg_catalog.pg_namespace n,\n" + "pg_catalog.pg_publication_schema ps\n" + "WHERE n.oid = ps.psnspcid AND\n" + "p.oid = ps.pspubid AND n.nspname = '%s'\n" + "ORDER BY 1", + pattern); + result = PSQLexec(buf.data); + if (!result) + return true; + else + pub_schema_tuples = PQntuples(result); + + if (pub_schema_tuples > 0) + { + /* + * Allocate memory for footers. Size of footers will be 1 (for + * storing "Publications:" string) + Schema count + 1 (for + * storing NULL). + */ + footers = (char **) palloc((1 + pub_schema_tuples + 1) * sizeof(char *)); + footers[0] = pstrdup(_("Publications:")); + + /* Might be an empty set - that's ok */ + for (i = 0; i < pub_schema_tuples; i++) + { + printfPQExpBuffer(&buf, " \"%s\"", + PQgetvalue(result, i, 0)); + + footers[i + 1] = pstrdup(buf.data); + } + + footers[i + 1] = NULL; + myopt.footers = footers; + } + + PQclear(result); + } + printQuery(res, &myopt, pset.queryFout, false, pset.logfile); PQclear(res); + + /* Free the memory allocated for the footer */ + if (footers) + { + char **footer = NULL; + + for (footer = footers; *footer; footer++) + pfree(*footer); + + pfree(footers); + } + return true; } @@ -6147,7 +6230,7 @@ listPublications(const char *pattern) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; - static const bool translate_columns[] = {false, false, false, false, false, false, false, false}; + static const bool translate_columns[] = {false, false, false, false, false, false, false, false, false}; if (pset.sversion < 100000) { @@ -6182,6 +6265,10 @@ listPublications(const char *pattern) appendPQExpBuffer(&buf, ",\n pubviaroot AS \"%s\"", gettext_noop("Via root")); + if (pset.sversion >= 150000) + appendPQExpBuffer(&buf, + ",\n pubtype AS \"%s\"", + gettext_noop("PubType")); appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_publication\n"); @@ -6210,6 +6297,42 @@ listPublications(const char *pattern) return true; } +/* + * Add footer to publication description. + */ +static bool +addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg, + bool singlecol, printTableContent *cont) +{ + PGresult *res; + int count = 0; + int i = 0; + + res = PSQLexec(buf->data); + if (!res) + return false; + else + count = PQntuples(res); + + if (count > 0) + printTableAddFooter(cont, _(footermsg)); + + for (i = 0; i < count; i++) + { + if (!singlecol) + printfPQExpBuffer(buf, " \"%s.%s\"", PQgetvalue(res, i, 0), + PQgetvalue(res, i, 1)); + else + printfPQExpBuffer(buf, " \"%s\"", PQgetvalue(res, i, 0)); + + printTableAddFooter(cont, buf->data); + } + + PQclear(res); + termPQExpBuffer(buf); + return true; +} + /* * \dRp+ * Describes publications including the contents. @@ -6224,6 +6347,9 @@ describePublications(const char *pattern) PGresult *res; bool has_pubtruncate; bool has_pubviaroot; + bool has_pubtype; + PQExpBufferData title; + printTableContent cont; if (pset.sversion < 100000) { @@ -6237,6 +6363,7 @@ describePublications(const char *pattern) has_pubtruncate = (pset.sversion >= 110000); has_pubviaroot = (pset.sversion >= 130000); + has_pubtype = (pset.sversion >= 150000); initPQExpBuffer(&buf); @@ -6250,6 +6377,10 @@ describePublications(const char *pattern) if (has_pubviaroot) appendPQExpBufferStr(&buf, ", pubviaroot"); + if (has_pubtype) + appendPQExpBufferStr(&buf, + ", pubtype"); + appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_publication\n"); @@ -6287,20 +6418,18 @@ describePublications(const char *pattern) const char align = 'l'; int ncols = 5; int nrows = 1; - int tables = 0; - PGresult *tabres; char *pubid = PQgetvalue(res, i, 0); char *pubname = PQgetvalue(res, i, 1); bool puballtables = strcmp(PQgetvalue(res, i, 3), "t") == 0; - int j; - PQExpBufferData title; + char pubtype = PUBTYPE_EMPTY; printTableOpt myopt = pset.popt.topt; - printTableContent cont; if (has_pubtruncate) ncols++; if (has_pubviaroot) ncols++; + if (has_pubtype) + ncols++; initPQExpBuffer(&title); printfPQExpBuffer(&title, _("Publication %s"), pubname); @@ -6315,6 +6444,8 @@ describePublications(const char *pattern) printTableAddHeader(&cont, gettext_noop("Truncates"), true, align); if (has_pubviaroot) printTableAddHeader(&cont, gettext_noop("Via root"), true, align); + if (has_pubtype) + printTableAddHeader(&cont, gettext_noop("Pubtype"), true, align); printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false); @@ -6325,8 +6456,17 @@ describePublications(const char *pattern) printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false); if (has_pubviaroot) printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false); + if (has_pubtype) + { + char *type = PQgetvalue(res, i, 9); + + pubtype = get_publication_type(type); + printTableAddCell(&cont, type, false, false); + } - if (!puballtables) + /* Prior to version 15 check was based on all tables */ + if ((has_pubtype && pubtype == PUBTYPE_TABLE) || + (!has_pubtype && !puballtables)) { printfPQExpBuffer(&buf, "SELECT n.nspname, c.relname\n" @@ -6337,31 +6477,20 @@ describePublications(const char *pattern) " AND c.oid = pr.prrelid\n" " AND pr.prpubid = '%s'\n" "ORDER BY 1,2", pubid); - - tabres = PSQLexec(buf.data); - if (!tabres) - { - printTableCleanup(&cont); - PQclear(res); - termPQExpBuffer(&buf); - termPQExpBuffer(&title); - return false; - } - else - tables = PQntuples(tabres); - - if (tables > 0) - printTableAddFooter(&cont, _("Tables:")); - - for (j = 0; j < tables; j++) - { - printfPQExpBuffer(&buf, " \"%s.%s\"", - PQgetvalue(tabres, j, 0), - PQgetvalue(tabres, j, 1)); - - printTableAddFooter(&cont, buf.data); - } - PQclear(tabres); + if (!addFooterToPublicationDesc(&buf, "Tables:", false, &cont)) + goto error_return; + } + else if (has_pubtype && pubtype == PUBTYPE_SCHEMA) + { + printfPQExpBuffer(&buf, + "SELECT n.nspname\n" + "FROM pg_catalog.pg_namespace n,\n" + " pg_catalog.pg_publication_schema ps\n" + "WHERE n.oid = ps.psnspcid\n" + " AND ps.pspubid = '%s'\n" + "ORDER BY 1", pubid); + if (!addFooterToPublicationDesc(&buf, "Schemas:", true, &cont)) + goto error_return; } printTable(&cont, pset.queryFout, false, pset.logfile); @@ -6374,6 +6503,13 @@ describePublications(const char *pattern) PQclear(res); return true; + +error_return: + printTableCleanup(&cont); + PQclear(res); + termPQExpBuffer(&buf); + termPQExpBuffer(&title); + return false; } /* diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index d6bf725971..c6227f95e2 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1640,10 +1640,19 @@ psql_completion(const char *text, int start, int end) /* ALTER PUBLICATION */ else if (Matches("ALTER", "PUBLICATION", MatchAny)) - COMPLETE_WITH("ADD TABLE", "DROP TABLE", "OWNER TO", "RENAME TO", "SET"); + COMPLETE_WITH("ADD", "DROP", "OWNER TO", "RENAME TO", "SET"); + /* ALTER PUBLICATION ADD */ + else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD")) + COMPLETE_WITH("SCHEMA", "TABLE"); + /* ALTER PUBLICATION DROP */ + else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP")) + COMPLETE_WITH("SCHEMA", "TABLE"); /* ALTER PUBLICATION SET */ else if (Matches("ALTER", "PUBLICATION", MatchAny, "SET")) - COMPLETE_WITH("(", "TABLE"); + COMPLETE_WITH("(", "SCHEMA", "TABLE"); + else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|DROP|SET", "SCHEMA")) + COMPLETE_WITH_QUERY(Query_for_list_of_schemas + " UNION SELECT 'CURRENT_SCHEMA'"); /* ALTER PUBLICATION SET ( */ else if (HeadMatches("ALTER", "PUBLICATION", MatchAny) && TailMatches("SET", "(")) COMPLETE_WITH("publish", "publish_via_partition_root"); @@ -2634,15 +2643,20 @@ psql_completion(const char *text, int start, int end) /* CREATE PUBLICATION */ else if (Matches("CREATE", "PUBLICATION", MatchAny)) - COMPLETE_WITH("FOR TABLE", "FOR ALL TABLES", "WITH ("); + COMPLETE_WITH("FOR TABLE", "FOR ALL TABLES", "FOR SCHEMA", "WITH ("); else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR")) - COMPLETE_WITH("TABLE", "ALL TABLES"); + COMPLETE_WITH("TABLE", "ALL TABLES", "SCHEMA"); /* Complete "CREATE PUBLICATION FOR TABLE , ..." */ else if (HeadMatches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLE")) COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL); /* Complete "CREATE PUBLICATION [...] WITH" */ else if (HeadMatches("CREATE", "PUBLICATION") && TailMatches("WITH", "(")) COMPLETE_WITH("publish", "publish_via_partition_root"); + /* Complete "CREATE PUBLICATION FOR SCHEMA , ..." */ + else if (HeadMatches("CREATE", "PUBLICATION", MatchAny, "FOR", "SCHEMA")) + COMPLETE_WITH_QUERY(Query_for_list_of_schemas + " UNION SELECT 'CURRENT_SCHEMA' " + "UNION SELECT 'WITH ('"); /* CREATE RULE */ /* Complete "CREATE [ OR REPLACE ] RULE " with "AS ON" */ diff --git a/src/include/catalog/dependency.h b/src/include/catalog/dependency.h index 2885f35ccd..e5e88d3a31 100644 --- a/src/include/catalog/dependency.h +++ b/src/include/catalog/dependency.h @@ -123,6 +123,7 @@ typedef enum ObjectClass OCLASS_POLICY, /* pg_policy */ OCLASS_PUBLICATION, /* pg_publication */ OCLASS_PUBLICATION_REL, /* pg_publication_rel */ + OCLASS_PUBLICATION_SCHEMA, /* pg_publication_schema */ OCLASS_SUBSCRIPTION, /* pg_subscription */ OCLASS_TRANSFORM /* pg_transform */ } ObjectClass; diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index f332bad4d4..b55d5205b1 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -18,7 +18,6 @@ #define PG_PUBLICATION_H #include "catalog/genbki.h" -#include "catalog/objectaddress.h" #include "catalog/pg_publication_d.h" /* ---------------- @@ -54,6 +53,9 @@ CATALOG(pg_publication,6104,PublicationRelationId) /* true if partition changes are published using root schema */ bool pubviaroot; + + /* see PUBTYPE_xxx constants below */ + char pubtype; } FormData_pg_publication; /* ---------------- @@ -81,12 +83,9 @@ typedef struct Publication bool alltables; bool pubviaroot; PublicationActions pubactions; + char pubtype; } Publication; -extern Publication *GetPublication(Oid pubid); -extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); -extern List *GetRelationPublications(Oid relid); - /*--------- * Expected values for pub_partopt parameter of GetRelationPublications(), * which allows callers to specify which partitions of partitioned tables @@ -103,16 +102,26 @@ typedef enum PublicationPartOpt PUBLICATION_PART_ALL, } PublicationPartOpt; -extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); -extern List *GetAllTablesPublications(void); -extern List *GetAllTablesPublicationRelations(bool pubviaroot); - -extern bool is_publishable_relation(Relation rel); -extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, - bool if_not_exists); - -extern Oid get_publication_oid(const char *pubname, bool missing_ok); -extern char *get_publication_name(Oid pubid, bool missing_ok); - +/* Publication types */ +#define PUBTYPE_ALLTABLES 'a' /* all tables publication */ +#define PUBTYPE_TABLE 't' /* table publication */ +#define PUBTYPE_SCHEMA 's' /* schema publication */ +#define PUBTYPE_EMPTY 'e' /* empty publication */ + +/* + * Return the publication type. +*/ +static inline char +get_publication_type(char *strpubtype) +{ + if (strcmp(strpubtype, "a") == 0) + return PUBTYPE_ALLTABLES; + else if (strcmp(strpubtype, "t") == 0) + return PUBTYPE_TABLE; + else if (strcmp(strpubtype, "s") == 0) + return PUBTYPE_SCHEMA; + + return PUBTYPE_EMPTY; +} #endif /* PG_PUBLICATION_H */ diff --git a/src/include/catalog/pg_publication_schema.h b/src/include/catalog/pg_publication_schema.h new file mode 100644 index 0000000000..fc50655af1 --- /dev/null +++ b/src/include/catalog/pg_publication_schema.h @@ -0,0 +1,47 @@ +/*------------------------------------------------------------------------- + * + * pg_publication_schema.h + * definition of the system catalog for mappings between schemas and + * publications (pg_publication_schema) + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/catalog/pg_publication_schema.h + * + * NOTES + * The Catalog.pm module reads this file and derives schema + * information. + * + *------------------------------------------------------------------------- + */ +#ifndef PG_PUBLICATION_SCHEMA_H +#define PG_PUBLICATION_SCHEMA_H + +#include "catalog/genbki.h" +#include "catalog/pg_publication_schema_d.h" + + +/* ---------------- + * pg_publication_schema definition. cpp turns this into + * typedef struct FormData_pg_publication_schema + * ---------------- + */ +CATALOG(pg_publication_schema,8901,PublicationSchemaRelationId) +{ + Oid oid; /* oid */ + Oid pspubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ + Oid psnspcid BKI_LOOKUP(pg_class); /* Oid of the schema */ +} FormData_pg_publication_schema; + +/* ---------------- + * Form_pg_publication_schema corresponds to a pointer to a tuple with + * the format of pg_publication_schema relation. + * ---------------- + */ +typedef FormData_pg_publication_schema *Form_pg_publication_schema; + +DECLARE_UNIQUE_INDEX_PKEY(pg_publication_schema_oid_index, 8902, PublicationSchemaObjectIndexId, on pg_publication_schema using btree(oid oid_ops)); +DECLARE_UNIQUE_INDEX(pg_publication_schema_psnspcid_pspubid_index, 8903, PublicationSchemaPsnspcidPspubidIndexId, on pg_publication_schema using btree(psnspcid oid_ops, pspubid oid_ops)); + +#endif /* PG_PUBLICATION_SCHEMA_H */ diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index a3fa2ac6cd..76c10f2b3c 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -16,13 +16,35 @@ #define PUBLICATIONCMDS_H #include "catalog/objectaddress.h" +#include "catalog/pg_publication.h" #include "parser/parse_node.h" extern ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt); extern void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt); extern void RemovePublicationRelById(Oid proid); +extern void RemovePublicationSchemaById(Oid psoid); extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId); extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId); +extern Publication *GetPublication(Oid pubid); +extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); +extern List *GetRelationPublications(Oid relid); + +extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); +extern List *GetPublicationSchemas(Oid pubid); +extern List *GetSchemaPublications(Oid schemaid); +extern List *GetAllTablesPublications(void); +extern List *GetAllTablesPublicationRelations(bool pubviaroot, Oid schemaOid); +extern List *GetAllSchemasPublicationRelations(bool pubviaroot, Oid puboid); + +extern bool is_publishable_relation(Relation rel); +extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, + bool if_not_exists); +extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaoid, + bool if_not_exists); + +extern Oid get_publication_oid(const char *pubname, bool missing_ok); +extern char *get_publication_name(Oid pubid, bool missing_ok); + #endif /* PUBLICATIONCMDS_H */ diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index f7b009ec43..4653f02624 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -484,6 +484,7 @@ typedef enum NodeTag T_CTECycleClause, T_CommonTableExpr, T_RoleSpec, + T_SchemaSpec, T_TriggerTransition, T_PartitionElem, T_PartitionSpec, diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 947660a4b0..dd7e60105d 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -341,6 +341,23 @@ typedef struct RoleSpec int location; /* token location, or -1 if unknown */ } RoleSpec; +/* + * SchemaSpec - a schema name or CURRENT_SCHEMA + */ +typedef enum SchemaSpecType +{ + SCHEMASPEC_CSTRING, /* schema name is stored as a C string */ + SCHEMASPEC_CURRENT_SCHEMA /* schema spec is CURRENT_SCHEMA */ +} SchemaSpecType; + +typedef struct SchemaSpec +{ + NodeTag type; + SchemaSpecType schematype; /* type of this schemaspec */ + char *schemaname; /* filled only for SCHEMASPEC_CSTRING */ + int location; /* token location, or -1 if unknown */ +} SchemaSpec; + /* * FuncCall - a function or aggregate invocation * @@ -1805,6 +1822,7 @@ typedef enum ObjectType OBJECT_PROCEDURE, OBJECT_PUBLICATION, OBJECT_PUBLICATION_REL, + OBJECT_PUBLICATION_SCHEMA, OBJECT_ROLE, OBJECT_ROUTINE, OBJECT_RULE, @@ -3631,6 +3649,7 @@ typedef struct CreatePublicationStmt List *options; /* List of DefElem nodes */ List *tables; /* Optional list of tables to add */ bool for_all_tables; /* Special publication for all tables in db */ + List *schemas; /* Optional list of schemas */ } CreatePublicationStmt; typedef struct AlterPublicationStmt @@ -3645,6 +3664,7 @@ typedef struct AlterPublicationStmt List *tables; /* List of tables to add/drop */ bool for_all_tables; /* Special publication for all tables in db */ DefElemAction tableAction; /* What action to perform with the tables */ + List *schemas; /* Optional list of schemas */ } AlterPublicationStmt; typedef struct CreateSubscriptionStmt diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index b4faa1c123..4415d9cd76 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -19,6 +19,7 @@ #include "catalog/pg_class.h" #include "catalog/pg_index.h" #include "catalog/pg_publication.h" +#include "catalog/objectaddress.h" #include "nodes/bitmapset.h" #include "partitioning/partdefs.h" #include "rewrite/prs2lock.h" diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h index d74a348600..1ba295206a 100644 --- a/src/include/utils/syscache.h +++ b/src/include/utils/syscache.h @@ -79,6 +79,8 @@ enum SysCacheIdentifier PUBLICATIONOID, PUBLICATIONREL, PUBLICATIONRELMAP, + PUBLICATIONSCHEMA, + PUBLICATIONSCHEMAMAP, RANGEMULTIRANGE, RANGETYPE, RELNAMENSP, diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out index 1461e947cd..ddb421c394 100644 --- a/src/test/regress/expected/oidjoins.out +++ b/src/test/regress/expected/oidjoins.out @@ -260,6 +260,8 @@ NOTICE: checking pg_sequence {seqtypid} => pg_type {oid} NOTICE: checking pg_publication {pubowner} => pg_authid {oid} NOTICE: checking pg_publication_rel {prpubid} => pg_publication {oid} NOTICE: checking pg_publication_rel {prrelid} => pg_class {oid} +NOTICE: checking pg_publication_schema {pspubid} => pg_publication {oid} +NOTICE: checking pg_publication_schema {psnspcid} => pg_class {oid} NOTICE: checking pg_subscription {subdbid} => pg_database {oid} NOTICE: checking pg_subscription {subowner} => pg_authid {oid} NOTICE: checking pg_subscription_rel {srsubid} => pg_subscription {oid} diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 4a5ef0bc24..736df15463 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -30,20 +30,20 @@ ERROR: conflicting or redundant options LINE 1: ...ub_xxx WITH (publish_via_partition_root = 'true', publish_vi... ^ \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------+--------------------------+------------+---------+---------+---------+-----------+---------- - testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f - testpub_default | regress_publication_user | f | f | t | f | f | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | PubType +--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+--------- + testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f | e + testpub_default | regress_publication_user | f | f | t | f | f | f | e (2 rows) ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete'); \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------+--------------------------+------------+---------+---------+---------+-----------+---------- - testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f - testpub_default | regress_publication_user | f | t | t | t | f | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | PubType +--------------------+--------------------------+------------+---------+---------+---------+-----------+----------+--------- + testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f | e + testpub_default | regress_publication_user | f | t | t | t | f | f | e (2 rows) --- adding tables @@ -87,10 +87,10 @@ Publications: "testpub_foralltables" \dRp+ testpub_foralltables - Publication testpub_foralltables - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | t | t | t | f | f | f + Publication testpub_foralltables + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtype +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | t | t | t | f | f | f | a (1 row) DROP TABLE testpub_tbl2; @@ -102,19 +102,19 @@ CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3; CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3; RESET client_min_messages; \dRp+ testpub3 - Publication testpub3 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub3 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtype +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | t Tables: "public.testpub_tbl3" "public.testpub_tbl3a" \dRp+ testpub4 - Publication testpub4 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub4 + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtype +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | t Tables: "public.testpub_tbl3" @@ -133,10 +133,10 @@ ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1); -- only parent is listed as being in publication, not the partition ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; \dRp+ testpub_forparted - Publication testpub_forparted - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_forparted + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtype +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | t Tables: "public.testpub_parted" @@ -149,10 +149,10 @@ ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1; UPDATE testpub_parted1 SET a = 1; ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true); \dRp+ testpub_forparted - Publication testpub_forparted - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | t + Publication testpub_forparted + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtype +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | t | t Tables: "public.testpub_parted" @@ -172,10 +172,10 @@ ERROR: relation "testpub_tbl1" is already member of publication "testpub_fortbl CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1; ERROR: publication "testpub_fortbl" already exists \dRp+ testpub_fortbl - Publication testpub_fortbl - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_fortbl + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtype +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | t | f | t Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -213,10 +213,10 @@ Publications: "testpub_fortbl" \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | f | f + Publication testpub_default + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtype +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | f | f | t Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -260,10 +260,10 @@ DROP TABLE testpub_parted; DROP VIEW testpub_view; DROP TABLE testpub_tbl1; \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | f | f + Publication testpub_default + Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | Pubtype +--------------------------+------------+---------+---------+---------+-----------+----------+--------- + regress_publication_user | f | t | t | t | f | f | t (1 row) -- fail - must be owner of publication @@ -273,20 +273,20 @@ ERROR: must be owner of publication testpub_default RESET ROLE; ALTER PUBLICATION testpub_default RENAME TO testpub_foo; \dRp testpub_foo - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root --------------+--------------------------+------------+---------+---------+---------+-----------+---------- - testpub_foo | regress_publication_user | f | t | t | t | f | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | PubType +-------------+--------------------------+------------+---------+---------+---------+-----------+----------+--------- + testpub_foo | regress_publication_user | f | t | t | t | f | f | t (1 row) -- rename back to keep the rest simple ALTER PUBLICATION testpub_foo RENAME TO testpub_default; ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2; \dRp testpub_default - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ------------------+---------------------------+------------+---------+---------+---------+-----------+---------- - testpub_default | regress_publication_user2 | f | t | t | t | f | f + List of publications + Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root | PubType +-----------------+---------------------------+------------+---------+---------+---------+-----------+----------+--------- + testpub_default | regress_publication_user2 | f | t | t | t | f | f | t (1 row) DROP PUBLICATION testpub_default; diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out index 982b6aff53..3b4f62025f 100644 --- a/src/test/regress/expected/sanity_check.out +++ b/src/test/regress/expected/sanity_check.out @@ -141,6 +141,7 @@ pg_policy|t pg_proc|t pg_publication|t pg_publication_rel|t +pg_publication_schema|t pg_range|t pg_replication_origin|t pg_rewrite|t diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 37cf4b2f76..fb5daa49eb 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -777,6 +777,7 @@ FormData_pg_policy FormData_pg_proc FormData_pg_publication FormData_pg_publication_rel +FormData_pg_publication_schema FormData_pg_range FormData_pg_replication_origin FormData_pg_rewrite @@ -833,6 +834,7 @@ Form_pg_policy Form_pg_proc Form_pg_publication Form_pg_publication_rel +Form_pg_publication_schema Form_pg_range Form_pg_replication_origin Form_pg_rewrite @@ -2045,6 +2047,7 @@ PublicationActions PublicationInfo PublicationPartOpt PublicationRelInfo +PublicationSchemaInfo PullFilter PullFilterOps PushFilter @@ -2329,6 +2332,8 @@ ScanState ScanTypeControl ScannerCallbackState SchemaQuery +SchemaSpec +SchemaSpecType SecBuffer SecBufferDesc SecLabelItem -- 2.30.2