From 5cbd8836a12fe5423cd5751fbc0fe8ec4fb2dfab Mon Sep 17 00:00:00 2001 From: Vigneshwaran C Date: Thu, 2 Sep 2021 11:13:46 +0530 Subject: [PATCH v24 2/5] Added schema level support for publication. This patch adds schema-level support for publication. A new option "FOR ALL TABLES IN SCHEMA" allows one or more schemas to be specified, whose tables are selected by the publisher for sending the data to the subscriber. A new system table "pg_publication_schema" has been added, to maintain the schemas that the user wants to publish through the publication. The schema/publication/publication_schema dependency was created to handle the corresponding renaming/removal of schemas to the publication/publication_schema when the schema is renamed/dropped. The Decoder identifies if the relation is part of the publication and replicates it to the subscriber. CATALOG_VERSION_NO needs to be updated while committing, as this feature involves a catalog change. --- src/backend/catalog/Makefile | 4 +- src/backend/catalog/aclchk.c | 2 + src/backend/catalog/dependency.c | 9 + src/backend/catalog/objectaddress.c | 149 ++++++++ src/backend/catalog/pg_publication.c | 280 ++++++++++++++- src/backend/commands/alter.c | 1 + src/backend/commands/event_trigger.c | 4 + src/backend/commands/publicationcmds.c | 378 +++++++++++++++++++- src/backend/commands/seclabel.c | 1 + src/backend/commands/tablecmds.c | 4 +- src/backend/nodes/copyfuncs.c | 5 +- src/backend/nodes/equalfuncs.c | 5 +- src/backend/parser/gram.y | 193 ++++++++-- src/backend/replication/pgoutput/pgoutput.c | 17 +- src/backend/utils/cache/relcache.c | 4 + src/backend/utils/cache/syscache.c | 23 ++ src/bin/pg_dump/common.c | 3 + src/bin/pg_dump/pg_backup_archiver.c | 3 +- src/bin/pg_dump/pg_dump.c | 144 +++++++- src/bin/pg_dump/pg_dump.h | 15 + src/bin/pg_dump/pg_dump_sort.c | 7 + src/bin/psql/describe.c | 192 +++++++--- src/bin/psql/tab-complete.c | 32 +- src/include/catalog/dependency.h | 1 + src/include/catalog/pg_publication.h | 8 + src/include/catalog/pg_publication_schema.h | 47 +++ src/include/commands/publicationcmds.h | 1 + src/include/nodes/nodes.h | 2 + src/include/nodes/parsenodes.h | 48 ++- src/include/utils/syscache.h | 2 + src/test/regress/expected/oidjoins.out | 2 + src/test/regress/expected/publication.out | 6 +- src/test/regress/expected/sanity_check.out | 1 + src/tools/pgindent/typedefs.list | 7 + 34 files changed, 1492 insertions(+), 108 deletions(-) create mode 100644 src/include/catalog/pg_publication_schema.h diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index d297e77361..b2ee87b105 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -68,8 +68,8 @@ CATALOG_HEADERS := \ pg_foreign_table.h pg_policy.h pg_replication_origin.h \ pg_default_acl.h pg_init_privs.h pg_seclabel.h pg_shseclabel.h \ pg_collation.h pg_partitioned_table.h pg_range.h pg_transform.h \ - pg_sequence.h pg_publication.h pg_publication_rel.h pg_subscription.h \ - pg_subscription_rel.h + pg_sequence.h pg_publication.h pg_publication_rel.h pg_publication_schema.h \ + pg_subscription.h pg_subscription_rel.h GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h system_fk_info.h diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index 89792b154e..09d7f1a5ea 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -3428,6 +3428,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_DEFACL: case OBJECT_DOMCONSTRAINT: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROLE: case OBJECT_RULE: case OBJECT_TABCONSTRAINT: @@ -3567,6 +3568,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_DEFACL: case OBJECT_DOMCONSTRAINT: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROLE: case OBJECT_TRANSFORM: case OBJECT_TSPARSER: diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 76b65e39c4..d974750473 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -50,6 +50,7 @@ #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_statistic_ext.h" #include "catalog/pg_subscription.h" @@ -180,6 +181,7 @@ static const Oid object_classes[] = { PolicyRelationId, /* OCLASS_POLICY */ PublicationRelationId, /* OCLASS_PUBLICATION */ PublicationRelRelationId, /* OCLASS_PUBLICATION_REL */ + PublicationSchemaRelationId, /* OCLASS_PUBLICATION_SCHEMA */ SubscriptionRelationId, /* OCLASS_SUBSCRIPTION */ TransformRelationId /* OCLASS_TRANSFORM */ }; @@ -1460,6 +1462,10 @@ doDeletion(const ObjectAddress *object, int flags) RemovePublicationRelById(object->objectId); break; + case OCLASS_PUBLICATION_SCHEMA: + RemovePublicationSchemaById(object->objectId); + break; + case OCLASS_CAST: case OCLASS_COLLATION: case OCLASS_CONVERSION: @@ -2853,6 +2859,9 @@ getObjectClass(const ObjectAddress *object) case PublicationRelRelationId: return OCLASS_PUBLICATION_REL; + case PublicationSchemaRelationId: + return OCLASS_PUBLICATION_SCHEMA; + case SubscriptionRelationId: return OCLASS_SUBSCRIPTION; diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c index deaabaeae9..f6df10ec0b 100644 --- a/src/backend/catalog/objectaddress.c +++ b/src/backend/catalog/objectaddress.c @@ -49,6 +49,7 @@ #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_statistic_ext.h" #include "catalog/pg_subscription.h" @@ -829,6 +830,10 @@ static const struct object_type_map { "publication relation", OBJECT_PUBLICATION_REL }, + /* OCLASS_PUBLICATION_SCHEMA */ + { + "publication schema", OBJECT_PUBLICATION_SCHEMA + }, /* OCLASS_SUBSCRIPTION */ { "subscription", OBJECT_SUBSCRIPTION @@ -875,6 +880,8 @@ static ObjectAddress get_object_address_usermapping(List *object, static ObjectAddress get_object_address_publication_rel(List *object, Relation *relp, bool missing_ok); +static ObjectAddress get_object_address_publication_schema(List *object, + bool missing_ok); static ObjectAddress get_object_address_defacl(List *object, bool missing_ok); static const ObjectPropertyType *get_object_property_data(Oid class_id); @@ -1118,6 +1125,10 @@ get_object_address(ObjectType objtype, Node *object, &relation, missing_ok); break; + case OBJECT_PUBLICATION_SCHEMA: + address = get_object_address_publication_schema(castNode(List, object), + missing_ok); + break; case OBJECT_DEFACL: address = get_object_address_defacl(castNode(List, object), missing_ok); @@ -1935,6 +1946,49 @@ get_object_address_publication_rel(List *object, return address; } +/* + * Find the ObjectAddress for a publication tables in schema. The first + * element of the object parameter is the schema name, the second is the + * publication name. + */ +static ObjectAddress +get_object_address_publication_schema(List *object, bool missing_ok) +{ + ObjectAddress address; + char *pubname; + Publication *pub; + char *schemaname; + Oid schemaoid; + + ObjectAddressSet(address, PublicationSchemaRelationId, InvalidOid); + + /* Fetch schema name and publication name from input list */ + schemaname = strVal(linitial(object)); + pubname = strVal(lsecond(object)); + + schemaoid = get_namespace_oid(schemaname, missing_ok); + if (!OidIsValid(schemaoid)) + return address; + + /* Now look up the pg_publication tuple */ + pub = GetPublicationByName(pubname, missing_ok); + if (!pub) + return address; + + /* Find the publication schema mapping in syscache */ + address.objectId = + GetSysCacheOid2(PUBLICATIONSCHEMAMAP, Anum_pg_publication_schema_oid, + ObjectIdGetDatum(schemaoid), + ObjectIdGetDatum(pub->oid)); + if (!OidIsValid(address.objectId) && !missing_ok) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("publication tables of schema \"%s\" in publication \"%s\" does not exist", + schemaname, pubname))); + + return address; +} + /* * Find the ObjectAddress for a default ACL. */ @@ -2207,6 +2261,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_CAST: case OBJECT_USER_MAPPING: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_DEFACL: case OBJECT_TRANSFORM: if (list_length(args) != 1) @@ -2299,6 +2354,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_PUBLICATION_REL: objnode = (Node *) list_make2(name, linitial(args)); break; + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_USER_MAPPING: objnode = (Node *) list_make2(linitial(name), linitial(args)); break; @@ -2848,6 +2904,55 @@ get_catalog_object_by_oid(Relation catalog, AttrNumber oidcol, Oid objectId) return tuple; } +/* + * getPublicationSchemaInfo + * + * Get publication name and schema name from the object address into pubname and + * nspname. Both pubname and nspname are palloc'd string which will be freed by + * the caller. + */ +static bool +getPublicationSchemaInfo(const ObjectAddress *object, bool missing_ok, + char **pubname, char **nspname) +{ + HeapTuple tup; + Form_pg_publication_schema psform; + + tup = SearchSysCache1(PUBLICATIONSCHEMA, + ObjectIdGetDatum(object->objectId)); + if (!HeapTupleIsValid(tup)) + { + if (!missing_ok) + elog(ERROR, "cache lookup failed for publication schema %u", + object->objectId); + return false; + } + + psform = (Form_pg_publication_schema) GETSTRUCT(tup); + *pubname = get_publication_name(psform->pspubid, missing_ok); + if (!(*pubname)) + { + ReleaseSysCache(tup); + return false; + } + + *nspname = get_namespace_name(psform->psnspcid); + if (!(*nspname)) + { + Oid psnspcid = psform->psnspcid; + + pfree(pubname); + ReleaseSysCache(tup); + if (!missing_ok) + elog(ERROR, "cache lookup failed for schema %u", + psnspcid); + return false; + } + + ReleaseSysCache(tup); + return true; +} + /* * getObjectDescription: build an object description for messages * @@ -3903,6 +4008,22 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok) break; } + case OCLASS_PUBLICATION_SCHEMA: + { + char *pubname; + char *nspname; + + if (!getPublicationSchemaInfo(object, missing_ok, + &pubname, &nspname)) + break; + + appendStringInfo(&buffer, _("publication of schema %s in publication %s"), + nspname, pubname); + pfree(pubname); + pfree(nspname); + break; + } + case OCLASS_SUBSCRIPTION: { char *subname = get_subscription_name(object->objectId, @@ -4477,6 +4598,10 @@ getObjectTypeDescription(const ObjectAddress *object, bool missing_ok) appendStringInfoString(&buffer, "publication relation"); break; + case OCLASS_PUBLICATION_SCHEMA: + appendStringInfoString(&buffer, "publication schema"); + break; + case OCLASS_SUBSCRIPTION: appendStringInfoString(&buffer, "subscription"); break; @@ -5712,6 +5837,30 @@ getObjectIdentityParts(const ObjectAddress *object, break; } + case OCLASS_PUBLICATION_SCHEMA: + { + char *pubname; + char *nspname; + + if (!getPublicationSchemaInfo(object, missing_ok, &pubname, + &nspname)) + break; + appendStringInfo(&buffer, "%s in publication %s", + nspname, pubname); + + if (objargs) + *objargs = list_make1(pubname); + else + pfree(pubname); + + if (objname) + *objname = list_make1(nspname); + else + pfree(nspname); + + break; + } + case OCLASS_SUBSCRIPTION: { char *subname; diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 49a089fbbb..81b0db3f86 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -28,16 +28,18 @@ #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" #include "catalog/pg_inherits.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_type.h" +#include "commands/publicationcmds.h" #include "funcapi.h" #include "miscadmin.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/catcache.h" #include "utils/fmgroids.h" -#include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/syscache.h" @@ -75,6 +77,30 @@ check_publication_add_relation(Relation targetrel) errdetail("Temporary and unlogged relations cannot be replicated."))); } +/* + * Check if schema can be in given publication and throws appropriate + * error if not. + */ +static void +check_publication_add_schema(Oid schemaoid) +{ + /* Can't be system namespace */ + if (IsCatalogNamespace(schemaoid) || IsToastNamespace(schemaoid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("\"%s\" is a system schema", + get_namespace_name(schemaoid)), + errdetail("This operation is not supported for system schemas."))); + + /* Can't be temporary namespace */ + if (isAnyTempNamespace(schemaoid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("\"%s\" is a temporary schema", + get_namespace_name(schemaoid)), + errdetail("Temporary schemas cannot be replicated."))); +} + /* * Returns if relation represented by oid and Form_pg_class entry * is publishable. @@ -214,6 +240,84 @@ publication_add_relation(Oid pubid, Relation targetrel, return myself; } +/* + * Insert new publication / schema mapping. + */ +ObjectAddress +publication_add_schema(Oid pubid, Oid schemaoid, bool if_not_exists) +{ + Relation rel; + HeapTuple tup; + Datum values[Natts_pg_publication_schema]; + bool nulls[Natts_pg_publication_schema]; + Oid psschid; + Publication *pub = GetPublication(pubid); + List *schemaRels = NIL; + ObjectAddress myself, + referenced; + + check_publication_add_schema(schemaoid); + + rel = table_open(PublicationSchemaRelationId, RowExclusiveLock); + + /* + * Check for duplicates. Note that this does not really prevent + * duplicates, it's here just to provide nicer error message in common + * case. The real protection is the unique key on the catalog. + */ + if (SearchSysCacheExists2(PUBLICATIONSCHEMAMAP, ObjectIdGetDatum(schemaoid), + ObjectIdGetDatum(pubid))) + { + table_close(rel, RowExclusiveLock); + + if (if_not_exists) + return InvalidObjectAddress; + + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("schema \"%s\" is already member of publication \"%s\"", + get_namespace_name(schemaoid), pub->name))); + } + + /* Form a tuple */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + + psschid = GetNewOidWithIndex(rel, PublicationSchemaObjectIndexId, + Anum_pg_publication_schema_oid); + values[Anum_pg_publication_schema_oid - 1] = ObjectIdGetDatum(psschid); + values[Anum_pg_publication_schema_pspubid - 1] = + ObjectIdGetDatum(pubid); + values[Anum_pg_publication_schema_psnspcid - 1] = + ObjectIdGetDatum(schemaoid); + + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); + + /* Insert tuple into catalog */ + CatalogTupleInsert(rel, tup); + heap_freetuple(tup); + + ObjectAddressSet(myself, PublicationSchemaRelationId, psschid); + + /* Add dependency on the publication */ + ObjectAddressSet(referenced, PublicationRelationId, pubid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + + /* Add dependency on the schema */ + ObjectAddressSet(referenced, NamespaceRelationId, schemaoid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + + /* Close the table */ + table_close(rel, RowExclusiveLock); + + schemaRels = GetSchemaPublicationRelations(schemaoid, PUBLICATION_PART_ALL); + + /* Invalidate relcache so that publication info is rebuilt. */ + InvalidatePublicationRels(schemaRels); + + return myself; +} + /* Gets list of publication oids for a relation */ List * GetRelationPublications(Oid relid) @@ -317,6 +421,73 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) return result; } +/* + * Gets the list of schema oids for a publication. + * + * This should only be used FOR ALL TABLES IN SCHEMA publications. + */ +List * +GetPublicationSchemas(Oid pubid) +{ + List *result = NIL; + Relation pubschsrel; + ScanKeyData scankey; + SysScanDesc scan; + HeapTuple tup; + + /* Find all publications associated with the schema */ + pubschsrel = table_open(PublicationSchemaRelationId, AccessShareLock); + + ScanKeyInit(&scankey, + Anum_pg_publication_schema_pspubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(pubid)); + + scan = systable_beginscan(pubschsrel, PublicationSchemaPsnspcidPspubidIndexId, + true, NULL, 1, &scankey); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_publication_schema pubsch; + + pubsch = (Form_pg_publication_schema) GETSTRUCT(tup); + + result = lappend_oid(result, pubsch->psnspcid); + } + + systable_endscan(scan); + table_close(pubschsrel, AccessShareLock); + + return result; +} + + +/* + * Gets the list of FOR ALL TABLES IN SCHEMA publication oids associated with a + * specified schema oid + */ +List * +GetSchemaPublications(Oid schemaid) +{ + List *result = NIL; + CatCList *pubschlist; + int i; + + /* Find all publications associated with the schema */ + pubschlist = SearchSysCacheList1(PUBLICATIONSCHEMAMAP, + ObjectIdGetDatum(schemaid)); + for (i = 0; i < pubschlist->n_members; i++) + { + HeapTuple tup = &pubschlist->members[i]->tuple; + Oid pubid = ((Form_pg_publication_schema) GETSTRUCT(tup))->pspubid; + + result = lappend_oid(result, pubid); + } + + ReleaseSysCacheList(pubschlist); + + return result; +} + /* * Gets list of publication oids for publications marked as FOR ALL TABLES. */ @@ -355,7 +526,7 @@ GetAllTablesPublications(void) } /* - * Gets list of all relation published by FOR ALL TABLES publication(s). + * Gets the list of relations published. * * If the publication publishes partition changes via their respective root * partitioned tables, we must exclude partitions in favor of including the @@ -417,6 +588,97 @@ GetAllTablesPublicationRelations(bool pubviaroot) return result; } +/* + * Gets list of relation oids for a specified schema. + */ +List * +GetSchemaPublicationRelations(Oid schemaOid, PublicationPartOpt pub_partopt) +{ + Relation classRel; + ScanKeyData key[3]; + TableScanDesc scan; + HeapTuple tuple; + List *result = NIL; + int keycount = 0; + + Assert(schemaOid != InvalidOid); + + classRel = table_open(RelationRelationId, AccessShareLock); + + ScanKeyInit(&key[keycount++], + Anum_pg_class_relkind, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(RELKIND_RELATION)); + + ScanKeyInit(&key[keycount++], + Anum_pg_class_relnamespace, + BTEqualStrategyNumber, F_OIDEQ, + schemaOid); + + scan = table_beginscan_catalog(classRel, keycount, key); + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); + Oid relid = relForm->oid; + + if (is_publishable_class(relid, relForm)) + result = lappend_oid(result, relid); + } + + table_endscan(scan); + + keycount = 0; + ScanKeyInit(&key[keycount++], + Anum_pg_class_relkind, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(RELKIND_PARTITIONED_TABLE)); + + ScanKeyInit(&key[keycount++], + Anum_pg_class_relnamespace, + BTEqualStrategyNumber, F_OIDEQ, + schemaOid); + + scan = table_beginscan_catalog(classRel, keycount, key); + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); + + /* Skip the relations which are not publishable */ + if (!is_publishable_class(relForm->oid, relForm)) + continue; + + result = GetPubPartitionOptionRelations(result, pub_partopt, + relForm->oid); + } + + table_endscan(scan); + table_close(classRel, AccessShareLock); + return result; +} + +/* + * Gets the list of all relations published by FOR ALL TABLES IN SCHEMA + * publication(s). + */ +List * +GetAllSchemasPublicationRelations(Oid puboid, PublicationPartOpt pub_partopt) +{ + List *result = NIL; + List *pubschemalist = GetPublicationSchemas(puboid); + ListCell *cell; + + foreach(cell, pubschemalist) + { + Oid schemaOid = lfirst_oid(cell); + List *schemaRels = NIL; + + schemaRels = GetSchemaPublicationRelations(schemaOid, pub_partopt); + result = list_concat(result, schemaRels); + } + + return result; +} + /* * Get publication using oid * @@ -546,10 +808,22 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) if (publication->alltables) tables = GetAllTablesPublicationRelations(publication->pubviaroot); else - tables = GetPublicationRelations(publication->oid, + { + List *relids, + *schemarelids; + + relids = GetPublicationRelations(publication->oid, publication->pubviaroot ? PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); + schemarelids = GetAllSchemasPublicationRelations(publication->oid, + publication->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); + + tables = list_concat_unique_oid(relids, schemarelids); + } + funcctx->user_fctx = (void *) tables; MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index 29249498a9..e7c27459d8 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -661,6 +661,7 @@ AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid, case OCLASS_POLICY: case OCLASS_PUBLICATION: case OCLASS_PUBLICATION_REL: + case OCLASS_PUBLICATION_SCHEMA: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: /* ignore object types that don't have schema-qualified names */ diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index 71612d577e..35f47d3253 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -974,6 +974,7 @@ EventTriggerSupportsObjectType(ObjectType obtype) case OBJECT_PROCEDURE: case OBJECT_PUBLICATION: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROUTINE: case OBJECT_RULE: case OBJECT_SCHEMA: @@ -1051,6 +1052,7 @@ EventTriggerSupportsObjectClass(ObjectClass objclass) case OCLASS_POLICY: case OCLASS_PUBLICATION: case OCLASS_PUBLICATION_REL: + case OCLASS_PUBLICATION_SCHEMA: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: return true; @@ -2127,6 +2129,7 @@ stringify_grant_objtype(ObjectType objtype) case OBJECT_POLICY: case OBJECT_PUBLICATION: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROLE: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: @@ -2209,6 +2212,7 @@ stringify_adefprivs_objtype(ObjectType objtype) case OBJECT_POLICY: case OBJECT_PUBLICATION: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROLE: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index e1d17f6fa6..dd3fb27f28 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -25,8 +25,10 @@ #include "catalog/objectaddress.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" #include "commands/defrem.h" @@ -34,12 +36,12 @@ #include "commands/publicationcmds.h" #include "funcapi.h" #include "miscadmin.h" +#include "storage/lmgr.h" #include "utils/acl.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/catcache.h" #include "utils/fmgroids.h" -#include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/syscache.h" @@ -50,6 +52,10 @@ static void CloseTableList(List *rels); static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt); static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); +static void LockSchemaList(List *schemalist); +static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, + AlterPublicationStmt *stmt); +static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok); static void parse_publication_options(ParseState *pstate, @@ -135,6 +141,145 @@ parse_publication_options(ParseState *pstate, } } +/* + * Convert the SchemaSpec list into an Oid list. + */ +static List * +ConvertSchemaSpecListToOidList(List *schemas) +{ + List *schemaoidlist = NIL; + ListCell *cell; + + foreach(cell, schemas) + { + SchemaSpec *schema = (SchemaSpec *) lfirst(cell); + Oid schemaoid; + List *search_path; + char *nspname; + + if (schema->schematype == SCHEMASPEC_CURRENT_SCHEMA) + { + search_path = fetch_search_path(false); + if (search_path == NIL) /* nothing valid in search_path? */ + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("no schema has been selected")); + + schemaoid = linitial_oid(search_path); + nspname = get_namespace_name(schemaoid); + if (nspname == NULL) /* recently-deleted namespace? */ + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("no schema has been selected")); + + list_free(search_path); + } + else + schemaoid = get_namespace_oid(schema->schemaname, false); + + /* Filter out duplicates if user specifies "sch1, sch1" */ + schemaoidlist = list_append_unique_oid(schemaoidlist, schemaoid); + } + + return schemaoidlist; +} + +/* + * Convert the PublicationObjSpecType list into schema oid list and rangevar + * list. + */ +static void +ConvertPubObjSpecListToOidList(List *pubobjspec_list, ParseState *pstate, + List **rels, List **schemas) +{ + ListCell *cell; + PublicationObjSpec *pubobj; + PublicationObjSpecType prevobjtype = PUBLICATIONOBJ_UNKNOWN; + + if (!pubobjspec_list) + return; + + pubobj = (PublicationObjSpec *) linitial(pubobjspec_list); + if (pubobj->pubobjtype == PUBLICATIONOBJ_UNKNOWN) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("FOR TABLE/FOR ALL TABLES IN SCHEMA should be specified before the table/schema name(s)"), + parser_errposition(pstate, pubobj->location)); + + foreach(cell, pubobjspec_list) + { + pubobj = (PublicationObjSpec *) lfirst(cell); + + if (pubobj->pubobjtype == PUBLICATIONOBJ_UNKNOWN) + pubobj->pubobjtype = prevobjtype; + else + prevobjtype = pubobj->pubobjtype; + + if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE) + { + RangeVar *rel; + char *relname = strVal(linitial(pubobj->name)); + + if (list_length(pubobj->name) == 1 && + (strcmp(relname, "CURRENT_SCHEMA") == 0)) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid relation name at or near"), + parser_errposition(pstate, pubobj->location)); + + rel = makeRangeVarFromNameList(pubobj->name); + rel->inh = pubobj->inh; + rel->location = pubobj->location; + *rels = lappend(*rels, rel); + } + else if (pubobj->pubobjtype == PUBLICATIONOBJ_SCHEMA) + { + Oid schemaoid; + char *schemaname; + + if (list_length(pubobj->name) > 1) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("improper qualified name (too many dotted names): %s", + NameListToString(pubobj->name)), + parser_errposition(pstate, pubobj->location))); + + if (pubobj->spl_rel_type_syn) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid schema name at or near"), + parser_errposition(pstate, pubobj->location)); + + schemaname = strVal(linitial(pubobj->name)); + if (strcmp(schemaname, "CURRENT_SCHEMA") == 0) + { + List *search_path; + char *nspname; + + search_path = fetch_search_path(false); + if (search_path == NIL) /* nothing valid in search_path? */ + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("no schema has been selected")); + + schemaoid = linitial_oid(search_path); + nspname = get_namespace_name(schemaoid); + if (nspname == NULL) /* recently-deleted namespace? */ + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("no schema has been selected")); + + list_free(search_path); + } + else + schemaoid = get_namespace_oid(schemaname, false); + + /* Filter out duplicates if user specifies "sch1, sch1" */ + *schemas = list_append_unique_oid(*schemas, schemaoid); + } + } +} + /* * Create new publication. */ @@ -152,6 +297,8 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) bool publish_via_partition_root_given; bool publish_via_partition_root; AclResult aclresult; + List *relations = NIL; + List *schemaoidlist = NIL; /* must have CREATE privilege on database */ aclresult = pg_database_aclcheck(MyDatabaseId, GetUserId(), ACL_CREATE); @@ -221,19 +368,40 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) /* Make the changes visible. */ CommandCounterIncrement(); - if (stmt->tables) + ConvertPubObjSpecListToOidList(stmt->pubobjects, pstate, &relations, + &schemaoidlist); + if (relations != NIL) { List *rels; - Assert(list_length(stmt->tables) > 0); + Assert(list_length(relations) > 0); - rels = OpenTableList(stmt->tables); + rels = OpenTableList(relations); PublicationAddTables(puboid, rels, true, NULL); CloseTableList(rels); } - table_close(rel, RowExclusiveLock); + if (schemaoidlist != NIL) + { + /* FOR ALL TABLES IN SCHEMA requires superuser */ + if (!superuser()) + ereport(ERROR, + errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to create FOR ALL TABLES IN SCHEMA publication")); + Assert(list_length(schemaoidlist) > 0); + + /* + * Schema lock is held until the publication is created to prevent + * concurrent schema deletion. No need to unlock the schemas, the + * locks will be released automatically at the end of create + * publication command. + */ + LockSchemaList(schemaoidlist); + PublicationAddSchemas(puboid, schemaoidlist, true, NULL); + } + + table_close(rel, RowExclusiveLock); InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0); if (wal_level != WAL_LEVEL_LOGICAL) @@ -313,13 +481,19 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, } else { + List *relids = NIL; + List *schemarelids = NIL; + /* * For any partitioned tables contained in the publication, we must * invalidate all partitions contained in the respective partition * trees, not just those explicitly mentioned in the publication. */ - List *relids = GetPublicationRelations(pubform->oid, - PUBLICATION_PART_ALL); + relids = GetPublicationRelations(pubform->oid, + PUBLICATION_PART_ALL); + schemarelids = GetAllSchemasPublicationRelations(pubform->oid, + PUBLICATION_PART_ALL); + relids = list_concat_unique_oid(relids, schemarelids); InvalidatePublicationRels(relids); } @@ -369,15 +543,15 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("publication \"%s\" is defined as FOR ALL TABLES", NameStr(pubform->pubname)), - errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); + errdetail("Tables cannot be added to, dropped from, or set on FOR ALL TABLES publications."))); Assert(list_length(stmt->tables) > 0); rels = OpenTableList(stmt->tables); - if (stmt->tableAction == DEFELEM_ADD) + if (stmt->action == DEFELEM_ADD) PublicationAddTables(pubid, rels, false, stmt); - else if (stmt->tableAction == DEFELEM_DROP) + else if (stmt->action == DEFELEM_DROP) PublicationDropTables(pubid, rels, false); else /* DEFELEM_SET */ { @@ -428,11 +602,70 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, CloseTableList(rels); } +/* + * Alter the publication schemas. + * + * Add/Remove/Set all tables from schemas to/from publication. + */ +static void +AlterPublicationSchemas(AlterPublicationStmt *stmt, Relation rel, HeapTuple tup) +{ + List *schemaoidlist = NIL; + Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); + + /* Check that user is allowed to manipulate the publication tables */ + if (pubform->puballtables) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL TABLES", + NameStr(pubform->pubname)), + errdetail("Tables from schema cannot be added to, dropped from, or set on FOR ALL TABLES publications."))); + + if ((stmt->action == DEFELEM_ADD || stmt->action == DEFELEM_SET) && + !superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to add or set schemas"))); + + /* Convert the text list into oid list */ + schemaoidlist = ConvertSchemaSpecListToOidList(stmt->schemas); + + /* + * Schema lock is held until the publication is altered to prevent + * concurrent schema deletion. No need to unlock the schemas, the locks + * will be released automatically at the end of alter publication command. + */ + LockSchemaList(schemaoidlist); + if (stmt->action == DEFELEM_ADD) + PublicationAddSchemas(pubform->oid, schemaoidlist, false, stmt); + else if (stmt->action == DEFELEM_DROP) + PublicationDropSchemas(pubform->oid, schemaoidlist, false); + else /* DEFELEM_SET */ + { + List *oldschemaids = GetPublicationSchemas(pubform->oid); + List *delschemas = NIL; + + /* Identify which schemas should be dropped */ + delschemas = list_difference_oid(oldschemaids, schemaoidlist); + + /* And drop them */ + PublicationDropSchemas(pubform->oid, delschemas, true); + + /* + * Don't bother calculating the difference for adding, we'll catch and + * skip existing ones when doing catalog update. + */ + PublicationAddSchemas(pubform->oid, schemaoidlist, true, stmt); + } + + return; +} + /* * Alter the existing publication. * - * This is dispatcher function for AlterPublicationOptions and - * AlterPublicationTables. + * This is dispatcher function for AlterPublicationOptions, + * AlterPublicationSchemas and AlterPublicationTables. */ void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) @@ -461,6 +694,8 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) if (stmt->options) AlterPublicationOptions(pstate, stmt, rel, tup); + else if (stmt->schemas) + AlterPublicationSchemas(stmt, rel, tup); else AlterPublicationTables(stmt, rel, tup); @@ -499,6 +734,58 @@ RemovePublicationRelById(Oid proid) table_close(rel, RowExclusiveLock); } +/* + * Remove schema from publication by mapping OID. + */ +void +RemovePublicationSchemaById(Oid psoid) +{ + Relation rel; + HeapTuple tup; + List *schemaRels = NIL; + Form_pg_publication_schema pubsch; + + rel = table_open(PublicationSchemaRelationId, RowExclusiveLock); + + tup = SearchSysCache1(PUBLICATIONSCHEMA, ObjectIdGetDatum(psoid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for publication schema %u", psoid); + + pubsch = (Form_pg_publication_schema) GETSTRUCT(tup); + schemaRels = GetSchemaPublicationRelations(pubsch->psnspcid, + PUBLICATION_PART_ALL); + + /* Invalidate relcache so that publication info is rebuilt. */ + InvalidatePublicationRels(schemaRels); + + CatalogTupleDelete(rel, &tup->t_self); + + ReleaseSysCache(tup); + + table_close(rel, RowExclusiveLock); +} + +/* + * The schemas specified in the schema list are locked in AccessShareLock mode + * in order to add them to a publication. + */ +static void +LockSchemaList(List *schemalist) +{ + ListCell *lc; + + foreach(lc, schemalist) + { + Oid schemaoid = lfirst_oid(lc); + + /* Allow query cancel in case this takes a long time */ + CHECK_FOR_INTERRUPTS(); + + LockDatabaseObject(NamespaceRelationId, schemaoid, 0, AccessShareLock); + } +} + /* * Open relations specified by a RangeVar list. * The returned tables are locked in ShareUpdateExclusiveLock mode in order to @@ -633,6 +920,39 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, } } +/* + * Add listed schemas to the publication. + */ +static void +PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, + AlterPublicationStmt *stmt) +{ + ListCell *lc; + + Assert(!stmt || !stmt->for_all_tables); + + foreach(lc, schemas) + { + Oid schemaoid = lfirst_oid(lc); + ObjectAddress obj; + + /* Must be owner of the schema or superuser */ + if (!pg_namespace_ownercheck(schemaoid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA, + get_namespace_name(schemaoid)); + + obj = publication_add_schema(pubid, schemaoid, if_not_exists); + if (stmt) + { + EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, + (Node *) stmt); + + InvokeObjectPostCreateHook(PublicationSchemaRelationId, + obj.objectId, 0); + } + } +} + /* * Remove listed tables from the publication. */ @@ -667,6 +987,40 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) } } +/* + * Remove listed schemas from the publication. + */ +static void +PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok) +{ + ObjectAddress obj; + ListCell *lc; + Oid psid; + + foreach(lc, schemas) + { + Oid schemaoid = lfirst_oid(lc); + + psid = GetSysCacheOid2(PUBLICATIONSCHEMAMAP, + Anum_pg_publication_schema_oid, + ObjectIdGetDatum(schemaoid), + ObjectIdGetDatum(pubid)); + if (!OidIsValid(psid)) + { + if (missing_ok) + continue; + + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("tables from schema \"%s\" are not part of the publication", + get_namespace_name(schemaoid)))); + } + + ObjectAddressSet(obj, PublicationSchemaRelationId, psid); + performDeletion(&obj, DROP_CASCADE, 0); + } +} + /* * Internal workhorse for changing a publication owner */ diff --git a/src/backend/commands/seclabel.c b/src/backend/commands/seclabel.c index ddc019cb39..accaf2ed2e 100644 --- a/src/backend/commands/seclabel.c +++ b/src/backend/commands/seclabel.c @@ -80,6 +80,7 @@ SecLabelSupportsObjectType(ObjectType objtype) case OBJECT_OPFAMILY: case OBJECT_POLICY: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: case OBJECT_TABCONSTRAINT: diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index dbee6ae199..3e57a152f4 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -12253,6 +12253,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, case OCLASS_EVENT_TRIGGER: case OCLASS_PUBLICATION: case OCLASS_PUBLICATION_REL: + case OCLASS_PUBLICATION_SCHEMA: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: @@ -15832,7 +15833,8 @@ ATPrepChangePersistence(Relation rel, bool toLogged) * UNLOGGED as UNLOGGED tables can't be published. */ if (!toLogged && - list_length(GetRelationPublications(RelationGetRelid(rel))) > 0) + (list_length(GetRelationPublications(RelationGetRelid(rel))) > 0 || + list_length(GetSchemaPublications(rel->rd_rel->relnamespace)) > 0)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot change table \"%s\" to unlogged because it is part of a publication", diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 38251c2b8e..bcb937c7be 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4808,7 +4808,7 @@ _copyCreatePublicationStmt(const CreatePublicationStmt *from) COPY_STRING_FIELD(pubname); COPY_NODE_FIELD(options); - COPY_NODE_FIELD(tables); + COPY_NODE_FIELD(pubobjects); COPY_SCALAR_FIELD(for_all_tables); return newnode; @@ -4822,8 +4822,9 @@ _copyAlterPublicationStmt(const AlterPublicationStmt *from) COPY_STRING_FIELD(pubname); COPY_NODE_FIELD(options); COPY_NODE_FIELD(tables); + COPY_NODE_FIELD(schemas); COPY_SCALAR_FIELD(for_all_tables); - COPY_SCALAR_FIELD(tableAction); + COPY_SCALAR_FIELD(action); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 8a1762000c..05ca195af8 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2295,7 +2295,7 @@ _equalCreatePublicationStmt(const CreatePublicationStmt *a, { COMPARE_STRING_FIELD(pubname); COMPARE_NODE_FIELD(options); - COMPARE_NODE_FIELD(tables); + COMPARE_NODE_FIELD(pubobjects); COMPARE_SCALAR_FIELD(for_all_tables); return true; @@ -2308,8 +2308,9 @@ _equalAlterPublicationStmt(const AlterPublicationStmt *a, COMPARE_STRING_FIELD(pubname); COMPARE_NODE_FIELD(options); COMPARE_NODE_FIELD(tables); + COMPARE_NODE_FIELD(schemas); COMPARE_SCALAR_FIELD(for_all_tables); - COMPARE_SCALAR_FIELD(tableAction); + COMPARE_SCALAR_FIELD(action); return true; } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 39a2849eba..e408acfce3 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -169,6 +169,7 @@ static Node *makeNullAConst(int location); static Node *makeAConst(Value *v, int location); static Node *makeBoolAConst(bool state, int location); static RoleSpec *makeRoleSpec(RoleSpecType type, int location); +static SchemaSpec *makeSchemaSpec(SchemaSpecType type, int location); static void check_qualified_name(List *names, core_yyscan_t yyscanner); static List *check_func_name(List *names, core_yyscan_t yyscanner); static List *check_indirection(List *indirection, core_yyscan_t yyscanner); @@ -257,6 +258,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); PartitionSpec *partspec; PartitionBoundSpec *partboundspec; RoleSpec *rolespec; + SchemaSpec *schemaspec; + PublicationObjSpec *publicationobjectspec; struct SelectLimit *selectlimit; SetQuantifier setquantifier; struct GroupClause *groupclause; @@ -426,14 +429,13 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); transform_element_list transform_type_list TriggerTransitions TriggerReferencing vacuum_relation_list opt_vacuum_relation_list - drop_option_list + drop_option_list schema_list pub_obj_list %type opt_routine_body %type group_clause %type group_by_list %type group_by_item empty_grouping_set rollup_clause cube_clause %type grouping_sets_clause -%type opt_publication_for_tables publication_for_tables %type opt_fdw_options fdw_options %type fdw_option @@ -554,7 +556,9 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type createdb_opt_name plassign_target %type var_value zone_value %type auth_ident RoleSpec opt_granted_by - +%type SchemaSpec +%type PublicationObjSpec +%type pubobj_expr %type unreserved_keyword type_func_name_keyword %type col_name_keyword reserved_keyword %type bare_label_keyword @@ -9591,45 +9595,135 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec /***************************************************************************** * - * CREATE PUBLICATION name [ FOR TABLE ] [ WITH options ] + * CREATE PUBLICATION name [WITH options] + * + * CREATE PUBLICATION FOR ALL TABLES [WITH options] + * + * CREATE PUBLICATION FOR TABLE [WITH options] * + * CREATE PUBLICATION FOR ALL TABLES IN SCHEMA [WITH options] *****************************************************************************/ CreatePublicationStmt: - CREATE PUBLICATION name opt_publication_for_tables opt_definition + CREATE PUBLICATION name opt_definition { CreatePublicationStmt *n = makeNode(CreatePublicationStmt); n->pubname = $3; - n->options = $5; - if ($4 != NULL) - { - /* FOR TABLE */ - if (IsA($4, List)) - n->tables = (List *)$4; - /* FOR ALL TABLES */ - else - n->for_all_tables = true; - } + n->options = $4; + $$ = (Node *)n; + } + | CREATE PUBLICATION name FOR ALL TABLES opt_definition + { + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + n->pubname = $3; + n->options = $7; + n->for_all_tables = true; + $$ = (Node *)n; + } + | CREATE PUBLICATION name FOR pub_obj_list opt_definition + { + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + n->pubname = $3; + n->options = $6; + n->pubobjects = (List *)$5; $$ = (Node *)n; } ; -opt_publication_for_tables: - publication_for_tables { $$ = $1; } - | /* EMPTY */ { $$ = NULL; } - ; - -publication_for_tables: - FOR TABLE relation_expr_list +pubobj_expr: + any_name + { + /* inheritance query, implicitly */ + PublicationObjSpec *n = makeNode(PublicationObjSpec); + n->name = $1; + n->inh = true; + n->spl_rel_type_syn = false; + $$ = n; + } + | any_name '*' + { + /* inheritance query, explicitly */ + PublicationObjSpec *n = makeNode(PublicationObjSpec); + n->name = $1; + n->inh = true; + n->spl_rel_type_syn = true; + $$ = n; + } + | ONLY any_name + { + /* no inheritance */ + PublicationObjSpec *n = makeNode(PublicationObjSpec); + n->name = $2; + n->inh = false; + n->spl_rel_type_syn = true; + $$ = n; + } + | ONLY '(' any_name ')' { - $$ = (Node *) $3; + /* no inheritance, SQL99-style syntax */ + PublicationObjSpec *n = makeNode(PublicationObjSpec); + n->name = $3; + n->inh = false; + n->spl_rel_type_syn = true; + $$ = n; } - | FOR ALL TABLES + | CURRENT_SCHEMA { - $$ = (Node *) makeInteger(true); + PublicationObjSpec *n = makeNode(PublicationObjSpec); + n->name = list_make1(makeString("CURRENT_SCHEMA")); + n->inh = false; + n->spl_rel_type_syn = false; + $$ = n; } ; +/* FOR TABLE and FOR ALL TABLES IN SCHEMA specifications */ +PublicationObjSpec: TABLE pubobj_expr + { + $$ = $2; + $$->pubobjtype = PUBLICATIONOBJ_TABLE; + $$->location = @1; + } + + | ALL TABLES IN_P SCHEMA pubobj_expr + { + $$ = $5; + $$->pubobjtype = PUBLICATIONOBJ_SCHEMA; + $$->location = @1; + } + | pubobj_expr + { + $$ = $1; + $$->pubobjtype = PUBLICATIONOBJ_UNKNOWN; + $$->location = @1; + } + ; + +pub_obj_list: PublicationObjSpec + { $$ = list_make1($1); } + | pub_obj_list ',' PublicationObjSpec + { $$ = lappend($1, $3); } + ; + +/* Schema specifications */ +SchemaSpec: ColId + { + SchemaSpec *n; + n = makeSchemaSpec(SCHEMASPEC_CSTRING, @1); + n->schemaname = pstrdup($1); + $$ = n; + } + | CURRENT_SCHEMA + { + $$ = makeSchemaSpec(SCHEMASPEC_CURRENT_SCHEMA, @1); + } + ; + +schema_list: SchemaSpec + { $$ = list_make1($1); } + | schema_list ',' SchemaSpec + { $$ = lappend($1, $3); } + ; /***************************************************************************** * @@ -9641,6 +9735,11 @@ publication_for_tables: * * ALTER PUBLICATION name SET TABLE table [, table2] * + * ALTER PUBLICATION name ADD ALL TABLES IN SCHEMA schema [, schema2] + * + * ALTER PUBLICATION name DROP ALL TABLES IN SCHEMA schema [, schema2] + * + * ALTER PUBLICATION name SET ALL TABLES IN SCHEMA schema [, schema2] *****************************************************************************/ AlterPublicationStmt: @@ -9656,7 +9755,7 @@ AlterPublicationStmt: AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; n->tables = $6; - n->tableAction = DEFELEM_ADD; + n->action = DEFELEM_ADD; $$ = (Node *)n; } | ALTER PUBLICATION name SET TABLE relation_expr_list @@ -9664,7 +9763,7 @@ AlterPublicationStmt: AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; n->tables = $6; - n->tableAction = DEFELEM_SET; + n->action = DEFELEM_SET; $$ = (Node *)n; } | ALTER PUBLICATION name DROP TABLE relation_expr_list @@ -9672,7 +9771,31 @@ AlterPublicationStmt: AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; n->tables = $6; - n->tableAction = DEFELEM_DROP; + n->action = DEFELEM_DROP; + $$ = (Node *)n; + } + | ALTER PUBLICATION name ADD_P ALL TABLES IN_P SCHEMA schema_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->schemas = $9; + n->action = DEFELEM_ADD; + $$ = (Node *)n; + } + | ALTER PUBLICATION name SET ALL TABLES IN_P SCHEMA schema_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->schemas = $9; + n->action = DEFELEM_SET; + $$ = (Node *)n; + } + | ALTER PUBLICATION name DROP ALL TABLES IN_P SCHEMA schema_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->schemas = $9; + n->action = DEFELEM_DROP; $$ = (Node *)n; } ; @@ -16621,6 +16744,20 @@ makeRoleSpec(RoleSpecType type, int location) return spec; } +/* + * makeSchemaSpec - Create a SchemaSpec with the given type and location + */ +static SchemaSpec * +makeSchemaSpec(SchemaSpecType type, int location) +{ + SchemaSpec *spec = makeNode(SchemaSpec); + + spec->schematype = type; + spec->location = location; + + return spec; +} + /* check_qualified_name --- check the result of qualified_name production * * It's easiest to let the grammar production for qualified_name allow diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 14d737fd93..1d80bb0fef 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1068,6 +1068,9 @@ init_rel_sync_cache(MemoryContext cachectx) CacheRegisterSyscacheCallback(PUBLICATIONRELMAP, rel_sync_cache_publication_cb, (Datum) 0); + CacheRegisterSyscacheCallback(PUBLICATIONSCHEMAMAP, + rel_sync_cache_publication_cb, + (Datum) 0); } /* @@ -1146,7 +1149,15 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) /* Validate the entry */ if (!entry->replicate_valid) { + Oid schemaId = get_rel_namespace(relid); List *pubids = GetRelationPublications(relid); + + /* + * We don't acquire a lock on the namespace system table as we build + * the cache entry using a historic snapshot and all the later changes + * are absorbed while decoding WAL. + */ + List *schemaPubids = GetSchemaPublications(schemaId); ListCell *lc; Oid publish_as_relid = relid; @@ -1203,6 +1214,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) Oid ancestor = lfirst_oid(lc2); if (list_member_oid(GetRelationPublications(ancestor), + pub->oid) || + list_member_oid(GetSchemaPublications(get_rel_namespace(ancestor)), pub->oid)) { ancestor_published = true; @@ -1212,7 +1225,9 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) } } - if (list_member_oid(pubids, pub->oid) || ancestor_published) + if (list_member_oid(pubids, pub->oid) || + list_member_oid(schemaPubids, pub->oid) || + ancestor_published) publish = true; } diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 13d9994af3..c83d6421a2 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -5447,6 +5447,7 @@ GetRelationPublicationActions(Relation relation) List *puboids; ListCell *lc; MemoryContext oldcxt; + Oid schemaid; PublicationActions *pubactions = palloc0(sizeof(PublicationActions)); /* @@ -5478,6 +5479,9 @@ GetRelationPublicationActions(Relation relation) } puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); + schemaid = RelationGetNamespace(relation); + puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid)); + foreach(lc, puboids) { Oid pubid = lfirst_oid(lc); diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index d6cb78dea8..924b7bcad5 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -51,6 +51,7 @@ #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_range.h" #include "catalog/pg_replication_origin.h" #include "catalog/pg_rewrite.h" @@ -650,6 +651,28 @@ static const struct cachedesc cacheinfo[] = { }, 64 }, + {PublicationSchemaRelationId, /* PUBLICATIONSCHEMA */ + PublicationSchemaObjectIndexId, + 1, + { + Anum_pg_publication_schema_oid, + 0, + 0, + 0 + }, + 64 + }, + {PublicationSchemaRelationId, /* PUBLICATIONSCHEMAMAP */ + PublicationSchemaPsnspcidPspubidIndexId, + 2, + { + Anum_pg_publication_schema_psnspcid, + Anum_pg_publication_schema_pspubid, + 0, + 0 + }, + 64 + }, {RangeRelationId, /* RANGEMULTIRANGE */ RangeMultirangeTypidIndexId, 1, diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c index 1f24e79665..baf44424c8 100644 --- a/src/bin/pg_dump/common.c +++ b/src/bin/pg_dump/common.c @@ -257,6 +257,9 @@ getSchemaData(Archive *fout, int *numTablesPtr) pg_log_info("reading publication membership"); getPublicationTables(fout, tblinfo, numTables); + pg_log_info("reading publication tables in schemas"); + getPublicationNamespaces(fout, nspinfo, numNamespaces); + pg_log_info("reading subscriptions"); getSubscriptions(fout); diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index ee06dc6822..6d690ee49c 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -2788,7 +2788,8 @@ _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH) */ if (ropt->no_publications && (strcmp(te->desc, "PUBLICATION") == 0 || - strcmp(te->desc, "PUBLICATION TABLE") == 0)) + strcmp(te->desc, "PUBLICATION TABLE") == 0 || + strcmp(te->desc, "PUBLICATION TABLES IN SCHEMA") == 0)) return 0; /* If it's a security label, maybe ignore it */ diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 67be849829..0e3e3a8392 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -1630,9 +1630,13 @@ selectDumpableNamespace(NamespaceInfo *nsinfo, Archive *fout) if (nsinfo->nspowner == BOOTSTRAP_SUPERUSERID) nsinfo->dobj.dump &= ~DUMP_COMPONENT_DEFINITION; nsinfo->dobj.dump_contains = DUMP_COMPONENT_ALL; + nsinfo->dobj.dump |= DUMP_COMPONENT_PUBSCHEMA; } else + { nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ALL; + nsinfo->dobj.dump |= DUMP_COMPONENT_PUBSCHEMA; + } /* * In any case, a namespace can be excluded by an exclusion switch @@ -3960,21 +3964,25 @@ getPublications(Archive *fout, int *numPublications) appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "(%s p.pubowner) AS rolname, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot " + "p.puballtables, p.pubinsert, p.pubupdate, " + "p.pubdelete, p.pubtruncate, p.pubviaroot " "FROM pg_publication p", username_subquery); else if (fout->remoteVersion >= 110000) appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "(%s p.pubowner) AS rolname, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot " + "p.puballtables, p.pubinsert, p.pubupdate, " + "p.pubdelete, p.pubtruncate, false AS pubviaroot " "FROM pg_publication p", username_subquery); else appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "(%s p.pubowner) AS rolname, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot " + "p.puballtables, p.pubinsert, p.pubupdate, " + "p.pubdelete, false AS pubtruncate, " + "false AS pubviaroot " "FROM pg_publication p", username_subquery); @@ -4125,6 +4133,94 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo) free(qpubname); } +/* + * getPublicationNamespaces + * get information about publication membership for dumpable schemas. + */ +void +getPublicationNamespaces(Archive *fout, NamespaceInfo nspinfo[], + int numSchemas) +{ + PQExpBuffer query; + PGresult *res; + PublicationSchemaInfo *pubsinfo; + DumpOptions *dopt = fout->dopt; + int i_tableoid; + int i_oid; + int i_pspubid; + int i_psnspcid; + int i, + j, + ntups; + + if (dopt->no_publications || fout->remoteVersion < 150000) + return; + + query = createPQExpBuffer(); + + /* Collect all publication membership info. */ + appendPQExpBufferStr(query, + "SELECT tableoid, oid, pspubid, psnspcid " + "FROM pg_catalog.pg_publication_schema"); + res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); + + ntups = PQntuples(res); + + i_tableoid = PQfnumber(res, "tableoid"); + i_oid = PQfnumber(res, "oid"); + i_pspubid = PQfnumber(res, "pspubid"); + i_psnspcid = PQfnumber(res, "psnspcid"); + + /* this allocation may be more than we need */ + pubsinfo = pg_malloc(ntups * sizeof(PublicationSchemaInfo)); + j = 0; + + for (i = 0; i < ntups; i++) + { + Oid pspubid = atooid(PQgetvalue(res, i, i_pspubid)); + Oid psnspcid = atooid(PQgetvalue(res, i, i_psnspcid)); + PublicationInfo *pubinfo; + NamespaceInfo *nspinfo; + + /* + * Ignore any entries for which we aren't interested in either the + * publication or the rel. + */ + pubinfo = findPublicationByOid(pspubid); + if (pubinfo == NULL) + continue; + nspinfo = findNamespaceByOid(psnspcid); + if (nspinfo == NULL) + continue; + + /* + * Ignore publication membership of schema whose definitions are not + * to be dumped. + */ + if (!(nspinfo->dobj.dump & DUMP_COMPONENT_PUBSCHEMA)) + continue; + + /* OK, make a DumpableObject for this relationship */ + pubsinfo[j].dobj.objType = DO_PUBLICATION_SCHEMA; + pubsinfo[j].dobj.catId.tableoid = + atooid(PQgetvalue(res, i, i_tableoid)); + pubsinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid)); + AssignDumpId(&pubsinfo[j].dobj); + pubsinfo[j].dobj.namespace = nspinfo->dobj.namespace; + pubsinfo[j].dobj.name = nspinfo->dobj.name; + pubsinfo[j].publication = pubinfo; + pubsinfo[j].pubschema = nspinfo; + + /* Decide whether we want to dump it */ + selectDumpablePublicationTable(&(pubsinfo[j].dobj), fout); + + j++; + } + + PQclear(res); + destroyPQExpBuffer(query); +} + /* * getPublicationTables * get information about publication membership for dumpable tables. @@ -4212,6 +4308,44 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) destroyPQExpBuffer(query); } +/* + * dumpPublicationSchema + * dump the definition of the given publication tables in schema mapping + */ +static void +dumpPublicationSchema(Archive *fout, const PublicationSchemaInfo *pubsinfo) +{ + NamespaceInfo *schemainfo = pubsinfo->pubschema; + PublicationInfo *pubinfo = pubsinfo->publication; + PQExpBuffer query; + char *tag; + + if (!(pubsinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)) + return; + + tag = psprintf("%s %s", pubinfo->dobj.name, schemainfo->dobj.name); + + query = createPQExpBuffer(); + + appendPQExpBuffer(query, "ALTER PUBLICATION %s ", fmtId(pubinfo->dobj.name)); + appendPQExpBuffer(query, "ADD ALL TABLES IN SCHEMA %s;\n", fmtId(schemainfo->dobj.name)); + + /* + * There is no point in creating drop query as the drop is done by schema + * drop. + */ + ArchiveEntry(fout, pubsinfo->dobj.catId, pubsinfo->dobj.dumpId, + ARCHIVE_OPTS(.tag = tag, + .namespace = schemainfo->dobj.name, + .owner = pubinfo->rolname, + .description = "PUBLICATION TABLES IN SCHEMA", + .section = SECTION_POST_DATA, + .createStmt = query->data)); + + free(tag); + destroyPQExpBuffer(query); +} + /* * dumpPublicationTable * dump the definition of the given publication table mapping @@ -10479,6 +10613,9 @@ dumpDumpableObject(Archive *fout, const DumpableObject *dobj) case DO_PUBLICATION_REL: dumpPublicationTable(fout, (const PublicationRelInfo *) dobj); break; + case DO_PUBLICATION_SCHEMA: + dumpPublicationSchema(fout, (const PublicationSchemaInfo *) dobj); + break; case DO_SUBSCRIPTION: dumpSubscription(fout, (const SubscriptionInfo *) dobj); break; @@ -18734,6 +18871,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs, case DO_POLICY: case DO_PUBLICATION: case DO_PUBLICATION_REL: + case DO_PUBLICATION_SCHEMA: case DO_SUBSCRIPTION: /* Post-data objects: must come after the post-data boundary */ addObjectDependency(dobj, postDataBound->dumpId); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index 29af845ece..e19e09726e 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -81,6 +81,7 @@ typedef enum DO_POLICY, DO_PUBLICATION, DO_PUBLICATION_REL, + DO_PUBLICATION_SCHEMA, DO_SUBSCRIPTION } DumpableObjectType; @@ -94,6 +95,7 @@ typedef uint32 DumpComponents; /* a bitmask of dump object components */ #define DUMP_COMPONENT_ACL (1 << 4) #define DUMP_COMPONENT_POLICY (1 << 5) #define DUMP_COMPONENT_USERMAP (1 << 6) +#define DUMP_COMPONENT_PUBSCHEMA (1 << 7) #define DUMP_COMPONENT_ALL (0xFFFF) /* @@ -631,6 +633,17 @@ typedef struct _PublicationRelInfo TableInfo *pubtable; } PublicationRelInfo; +/* + * The PublicationSchemaInfo struct is used to represent publication tables + * in schema mapping. + */ +typedef struct _PublicationSchemaInfo +{ + DumpableObject dobj; + NamespaceInfo *pubschema; + PublicationInfo *publication; +} PublicationSchemaInfo; + /* * The SubscriptionInfo struct is used to represent subscription. */ @@ -737,6 +750,8 @@ extern PublicationInfo *getPublications(Archive *fout, int *numPublications); extern void getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables); +extern void getPublicationNamespaces(Archive *fout, NamespaceInfo nspinfo[], + int numSchemas); extern void getSubscriptions(Archive *fout); #endif /* PG_DUMP_H */ diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c index 46461fb6a1..200cc3edb5 100644 --- a/src/bin/pg_dump/pg_dump_sort.c +++ b/src/bin/pg_dump/pg_dump_sort.c @@ -82,6 +82,7 @@ enum dbObjectTypePriorities PRIO_POLICY, PRIO_PUBLICATION, PRIO_PUBLICATION_REL, + PRIO_PUBLICATION_SCHEMA, PRIO_SUBSCRIPTION, PRIO_DEFAULT_ACL, /* done in ACL pass */ PRIO_EVENT_TRIGGER, /* must be next to last! */ @@ -135,6 +136,7 @@ static const int dbObjectTypePriority[] = PRIO_POLICY, /* DO_POLICY */ PRIO_PUBLICATION, /* DO_PUBLICATION */ PRIO_PUBLICATION_REL, /* DO_PUBLICATION_REL */ + PRIO_PUBLICATION_SCHEMA, /* DO_PUBLICATION_SCHEMA */ PRIO_SUBSCRIPTION /* DO_SUBSCRIPTION */ }; @@ -1477,6 +1479,11 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize) "PUBLICATION TABLE (ID %d OID %u)", obj->dumpId, obj->catId.oid); return; + case DO_PUBLICATION_SCHEMA: + snprintf(buf, bufsize, + "PUBLICATION TABLES IN SCHEMA (ID %d OID %u)", + obj->dumpId, obj->catId.oid); + return; case DO_SUBSCRIPTION: snprintf(buf, bufsize, "SUBSCRIPTION (ID %d OID %u)", diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 90ff649be7..252624efd8 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -3147,17 +3147,39 @@ describeOneTableDetails(const char *schemaname, /* print any publications */ if (pset.sversion >= 100000) { - printfPQExpBuffer(&buf, - "SELECT pubname\n" - "FROM pg_catalog.pg_publication p\n" - "JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" - "WHERE pr.prrelid = '%s'\n" - "UNION ALL\n" - "SELECT pubname\n" - "FROM pg_catalog.pg_publication p\n" - "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" - "ORDER BY 1;", - oid, oid); + if (pset.sversion >= 150000) + { + printfPQExpBuffer(&buf, + "SELECT p.pubname\n" + "FROM pg_catalog.pg_publication p\n" + " JOIN pg_catalog.pg_publication_schema ps ON p.oid = ps.pspubid\n" + " JOIN pg_catalog.pg_class pc ON pc.relnamespace = ps.psnspcid AND pc.oid = '%s'\n" + "UNION\n" + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + " JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" + "WHERE pr.prrelid = '%s'\n" + "UNION\n" + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + "WHERE puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" + "ORDER BY 1;", + oid, oid, oid); + } + else + { + printfPQExpBuffer(&buf, + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + "JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" + "WHERE pr.prrelid = '%s'\n" + "UNION ALL\n" + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" + "ORDER BY 1;", + oid, oid); + } result = PSQLexec(buf.data); if (!result) @@ -5021,6 +5043,8 @@ listSchemas(const char *pattern, bool verbose, bool showSystem) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; + int pub_schema_tuples = 0; + char **footers = NULL; initPQExpBuffer(&buf); printfPQExpBuffer(&buf, @@ -5061,9 +5085,66 @@ listSchemas(const char *pattern, bool verbose, bool showSystem) myopt.title = _("List of schemas"); myopt.translate_header = true; + if (pattern && pset.sversion >= 150000) + { + PGresult *result; + int i; + + printfPQExpBuffer(&buf, + "SELECT p.pubname FROM pg_catalog.pg_publication p,\n" + "pg_catalog.pg_namespace n,\n" + "pg_catalog.pg_publication_schema ps\n" + "WHERE n.oid = ps.psnspcid AND\n" + "p.oid = ps.pspubid AND n.nspname = '%s'\n" + "ORDER BY 1", + pattern); + result = PSQLexec(buf.data); + if (!result) + return true; + else + pub_schema_tuples = PQntuples(result); + + if (pub_schema_tuples > 0) + { + /* + * Allocate memory for footers. Size of footers will be 1 (for + * storing "Publications:" string) + Schema count + 1 (for + * storing NULL). + */ + footers = (char **) palloc((1 + pub_schema_tuples + 1) * sizeof(char *)); + footers[0] = pstrdup(_("Publications:")); + + /* Might be an empty set - that's ok */ + for (i = 0; i < pub_schema_tuples; i++) + { + printfPQExpBuffer(&buf, " \"%s\"", + PQgetvalue(result, i, 0)); + + footers[i + 1] = pstrdup(buf.data); + } + + footers[i + 1] = NULL; + myopt.footers = footers; + } + + PQclear(result); + } + printQuery(res, &myopt, pset.queryFout, false, pset.logfile); PQclear(res); + + /* Free the memory allocated for the footer */ + if (footers) + { + char **footer = NULL; + + for (footer = footers; *footer; footer++) + pfree(*footer); + + pfree(footers); + } + return true; } @@ -6210,6 +6291,42 @@ listPublications(const char *pattern) return true; } +/* + * Add footer to publication description. + */ +static bool +addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg, + bool singlecol, printTableContent *cont) +{ + PGresult *res; + int count = 0; + int i = 0; + + res = PSQLexec(buf->data); + if (!res) + return false; + else + count = PQntuples(res); + + if (count > 0) + printTableAddFooter(cont, _(footermsg)); + + for (i = 0; i < count; i++) + { + if (!singlecol) + printfPQExpBuffer(buf, " \"%s.%s\"", PQgetvalue(res, i, 0), + PQgetvalue(res, i, 1)); + else + printfPQExpBuffer(buf, " \"%s\"", PQgetvalue(res, i, 0)); + + printTableAddFooter(cont, buf->data); + } + + PQclear(res); + termPQExpBuffer(buf); + return true; +} + /* * \dRp+ * Describes publications including the contents. @@ -6225,6 +6342,9 @@ describePublications(const char *pattern) bool has_pubtruncate; bool has_pubviaroot; + PQExpBufferData title; + printTableContent cont; + if (pset.sversion < 100000) { char sverbuf[32]; @@ -6287,15 +6407,10 @@ describePublications(const char *pattern) const char align = 'l'; int ncols = 5; int nrows = 1; - int tables = 0; - PGresult *tabres; char *pubid = PQgetvalue(res, i, 0); char *pubname = PQgetvalue(res, i, 1); bool puballtables = strcmp(PQgetvalue(res, i, 3), "t") == 0; - int j; - PQExpBufferData title; printTableOpt myopt = pset.popt.topt; - printTableContent cont; if (has_pubtruncate) ncols++; @@ -6328,6 +6443,7 @@ describePublications(const char *pattern) if (!puballtables) { + /* Get the tables for the specified publication */ printfPQExpBuffer(&buf, "SELECT n.nspname, c.relname\n" "FROM pg_catalog.pg_class c,\n" @@ -6337,31 +6453,20 @@ describePublications(const char *pattern) " AND c.oid = pr.prrelid\n" " AND pr.prpubid = '%s'\n" "ORDER BY 1,2", pubid); + if (!addFooterToPublicationDesc(&buf, "Tables:", false, &cont)) + goto error_return; - tabres = PSQLexec(buf.data); - if (!tabres) - { - printTableCleanup(&cont); - PQclear(res); - termPQExpBuffer(&buf); - termPQExpBuffer(&title); - return false; - } - else - tables = PQntuples(tabres); - - if (tables > 0) - printTableAddFooter(&cont, _("Tables:")); - - for (j = 0; j < tables; j++) - { - printfPQExpBuffer(&buf, " \"%s.%s\"", - PQgetvalue(tabres, j, 0), - PQgetvalue(tabres, j, 1)); - - printTableAddFooter(&cont, buf.data); - } - PQclear(tabres); + /* Get the schemas for the specified publication */ + printfPQExpBuffer(&buf, + "SELECT n.nspname\n" + "FROM pg_catalog.pg_namespace n,\n" + " pg_catalog.pg_publication_schema ps\n" + "WHERE n.oid = ps.psnspcid\n" + " AND ps.pspubid = '%s'\n" + "ORDER BY 1", pubid); + if (!addFooterToPublicationDesc(&buf, "Tables from schemas:", + true, &cont)) + goto error_return; } printTable(&cont, pset.queryFout, false, pset.logfile); @@ -6374,6 +6479,13 @@ describePublications(const char *pattern) PQclear(res); return true; + +error_return: + printTableCleanup(&cont); + PQclear(res); + termPQExpBuffer(&buf); + termPQExpBuffer(&title); + return false; } /* diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 75b867685a..133a5a73a0 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1644,10 +1644,19 @@ psql_completion(const char *text, int start, int end) /* ALTER PUBLICATION */ else if (Matches("ALTER", "PUBLICATION", MatchAny)) - COMPLETE_WITH("ADD TABLE", "DROP TABLE", "OWNER TO", "RENAME TO", "SET"); + COMPLETE_WITH("ADD", "DROP", "OWNER TO", "RENAME TO", "SET"); + /* ALTER PUBLICATION ADD */ + else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD")) + COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE"); + /* ALTER PUBLICATION DROP */ + else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP")) + COMPLETE_WITH("ALL TABLES IN SCHEMA", "TABLE"); /* ALTER PUBLICATION SET */ else if (Matches("ALTER", "PUBLICATION", MatchAny, "SET")) - COMPLETE_WITH("(", "TABLE"); + COMPLETE_WITH("(", "ALL TABLES IN SCHEMA", "TABLE"); + else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|DROP|SET", "SCHEMA")) + COMPLETE_WITH_QUERY(Query_for_list_of_schemas + " UNION SELECT 'CURRENT_SCHEMA'"); /* ALTER PUBLICATION SET ( */ else if (HeadMatches("ALTER", "PUBLICATION", MatchAny) && TailMatches("SET", "(")) COMPLETE_WITH("publish", "publish_via_partition_root"); @@ -2688,17 +2697,26 @@ psql_completion(const char *text, int start, int end) /* CREATE PUBLICATION */ else if (Matches("CREATE", "PUBLICATION", MatchAny)) - COMPLETE_WITH("FOR TABLE", "FOR ALL TABLES", "WITH ("); + COMPLETE_WITH("FOR TABLE", "FOR ALL TABLES", "FOR ALL TABLES IN SCHEMA", "WITH ("); else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR")) - COMPLETE_WITH("TABLE", "ALL TABLES"); + COMPLETE_WITH("TABLE", "ALL TABLES", "ALL TABLES IN SCHEMA"); else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL")) - COMPLETE_WITH("TABLES"); - else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES") - || Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLE", MatchAny)) + COMPLETE_WITH("TABLES", "TABLE IN SCHEMA"); + else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES")) + COMPLETE_WITH("IN SCHEMA", "WITH ("); + else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLE", MatchAny)) COMPLETE_WITH("WITH ("); /* Complete "CREATE PUBLICATION FOR TABLE" with ", ..." */ else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLE")) COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL); + /* + * Complete "CREATE PUBLICATION FOR ALL TABLES IN + * SCHEMA , ..." + */ + else if (HeadMatches("CREATE", "PUBLICATION", MatchAny, "FOR", "ALL", "TABLES", "IN", "SCHEMA")) + COMPLETE_WITH_QUERY(Query_for_list_of_schemas + " UNION SELECT 'CURRENT_SCHEMA' " + "UNION SELECT 'WITH ('"); /* Complete "CREATE PUBLICATION [...] WITH" */ else if (HeadMatches("CREATE", "PUBLICATION") && TailMatches("WITH", "(")) COMPLETE_WITH("publish", "publish_via_partition_root"); diff --git a/src/include/catalog/dependency.h b/src/include/catalog/dependency.h index 2885f35ccd..e5e88d3a31 100644 --- a/src/include/catalog/dependency.h +++ b/src/include/catalog/dependency.h @@ -123,6 +123,7 @@ typedef enum ObjectClass OCLASS_POLICY, /* pg_policy */ OCLASS_PUBLICATION, /* pg_publication */ OCLASS_PUBLICATION_REL, /* pg_publication_rel */ + OCLASS_PUBLICATION_SCHEMA, /* pg_publication_schema */ OCLASS_SUBSCRIPTION, /* pg_subscription */ OCLASS_TRANSFORM /* pg_transform */ } ObjectClass; diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index f332bad4d4..508b663639 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -106,10 +106,18 @@ typedef enum PublicationPartOpt extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(bool pubviaroot); +extern List *GetPublicationSchemas(Oid pubid); +extern List *GetSchemaPublications(Oid schemaid); +extern List *GetAllSchemasPublicationRelations(Oid puboid, + PublicationPartOpt pub_partopt); +extern List *GetSchemaPublicationRelations(Oid schemaOid, + PublicationPartOpt pub_partopt); extern bool is_publishable_relation(Relation rel); extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, bool if_not_exists); +extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaoid, + bool if_not_exists); extern Oid get_publication_oid(const char *pubname, bool missing_ok); extern char *get_publication_name(Oid pubid, bool missing_ok); diff --git a/src/include/catalog/pg_publication_schema.h b/src/include/catalog/pg_publication_schema.h new file mode 100644 index 0000000000..fc50655af1 --- /dev/null +++ b/src/include/catalog/pg_publication_schema.h @@ -0,0 +1,47 @@ +/*------------------------------------------------------------------------- + * + * pg_publication_schema.h + * definition of the system catalog for mappings between schemas and + * publications (pg_publication_schema) + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/catalog/pg_publication_schema.h + * + * NOTES + * The Catalog.pm module reads this file and derives schema + * information. + * + *------------------------------------------------------------------------- + */ +#ifndef PG_PUBLICATION_SCHEMA_H +#define PG_PUBLICATION_SCHEMA_H + +#include "catalog/genbki.h" +#include "catalog/pg_publication_schema_d.h" + + +/* ---------------- + * pg_publication_schema definition. cpp turns this into + * typedef struct FormData_pg_publication_schema + * ---------------- + */ +CATALOG(pg_publication_schema,8901,PublicationSchemaRelationId) +{ + Oid oid; /* oid */ + Oid pspubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ + Oid psnspcid BKI_LOOKUP(pg_class); /* Oid of the schema */ +} FormData_pg_publication_schema; + +/* ---------------- + * Form_pg_publication_schema corresponds to a pointer to a tuple with + * the format of pg_publication_schema relation. + * ---------------- + */ +typedef FormData_pg_publication_schema *Form_pg_publication_schema; + +DECLARE_UNIQUE_INDEX_PKEY(pg_publication_schema_oid_index, 8902, PublicationSchemaObjectIndexId, on pg_publication_schema using btree(oid oid_ops)); +DECLARE_UNIQUE_INDEX(pg_publication_schema_psnspcid_pspubid_index, 8903, PublicationSchemaPsnspcidPspubidIndexId, on pg_publication_schema using btree(psnspcid oid_ops, pspubid oid_ops)); + +#endif /* PG_PUBLICATION_SCHEMA_H */ diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index fa47d6d761..276f77fa8c 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -25,6 +25,7 @@ extern ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt); extern void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt); extern void RemovePublicationRelById(Oid proid); +extern void RemovePublicationSchemaById(Oid psoid); extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId); extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId); diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 6a4d82f0a8..af82a2fd7f 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -482,7 +482,9 @@ typedef enum NodeTag T_CTESearchClause, T_CTECycleClause, T_CommonTableExpr, + T_PublicationObjSpec, T_RoleSpec, + T_SchemaSpec, T_TriggerTransition, T_PartitionElem, T_PartitionSpec, diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 7af13dee43..70146d1fa5 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -341,6 +341,45 @@ typedef struct RoleSpec int location; /* token location, or -1 if unknown */ } RoleSpec; +/* + * SchemaSpec - a schema name or CURRENT_SCHEMA + */ +typedef enum SchemaSpecType +{ + SCHEMASPEC_CSTRING, /* schema name is stored as a C string */ + SCHEMASPEC_CURRENT_SCHEMA /* schema spec is CURRENT_SCHEMA */ +} SchemaSpecType; + +typedef struct SchemaSpec +{ + NodeTag type; + SchemaSpecType schematype; /* type of this schemaspec */ + char *schemaname; /* filled only for SCHEMASPEC_CSTRING */ + int location; /* token location, or -1 if unknown */ +} SchemaSpec; + +/* + * Publication object type + */ +typedef enum PublicationObjSpecType +{ + PUBLICATIONOBJ_TABLE, /* Table type */ + PUBLICATIONOBJ_SCHEMA, /* Schema type */ + PUBLICATIONOBJ_UNKNOWN /* Unknown type */ +} PublicationObjSpecType; + +typedef struct PublicationObjSpec +{ + NodeTag type; + PublicationObjSpecType pubobjtype; /* type of this publication object */ + List *name; /* publication object name */ + bool inh; /* expand rel by inheritance? recursively act + * on children? */ + bool spl_rel_type_syn; /* true if it is special relation type + * syntax */ + int location; /* token location, or -1 if unknown */ +} PublicationObjSpec; + /* * FuncCall - a function or aggregate invocation * @@ -1805,6 +1844,7 @@ typedef enum ObjectType OBJECT_PROCEDURE, OBJECT_PUBLICATION, OBJECT_PUBLICATION_REL, + OBJECT_PUBLICATION_SCHEMA, OBJECT_ROLE, OBJECT_ROUTINE, OBJECT_RULE, @@ -3630,7 +3670,7 @@ typedef struct CreatePublicationStmt NodeTag type; char *pubname; /* Name of the publication */ List *options; /* List of DefElem nodes */ - List *tables; /* Optional list of tables to add */ + List *pubobjects; /* Optional list of publication objects */ bool for_all_tables; /* Special publication for all tables in db */ } CreatePublicationStmt; @@ -3642,10 +3682,12 @@ typedef struct AlterPublicationStmt /* parameters used for ALTER PUBLICATION ... WITH */ List *options; /* List of DefElem nodes */ - /* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */ + /* ALTER PUBLICATION ... ADD/DROP TABLE/ALL TABLES IN SCHEMA parameters */ List *tables; /* List of tables to add/drop */ + List *schemas; /* List of schemas to add/drop/set */ bool for_all_tables; /* Special publication for all tables in db */ - DefElemAction tableAction; /* What action to perform with the tables */ + DefElemAction action; /* What action to perform with the + * tables/schemas */ } AlterPublicationStmt; typedef struct CreateSubscriptionStmt diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h index d74a348600..1ba295206a 100644 --- a/src/include/utils/syscache.h +++ b/src/include/utils/syscache.h @@ -79,6 +79,8 @@ enum SysCacheIdentifier PUBLICATIONOID, PUBLICATIONREL, PUBLICATIONRELMAP, + PUBLICATIONSCHEMA, + PUBLICATIONSCHEMAMAP, RANGEMULTIRANGE, RANGETYPE, RELNAMENSP, diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out index 1461e947cd..ddb421c394 100644 --- a/src/test/regress/expected/oidjoins.out +++ b/src/test/regress/expected/oidjoins.out @@ -260,6 +260,8 @@ NOTICE: checking pg_sequence {seqtypid} => pg_type {oid} NOTICE: checking pg_publication {pubowner} => pg_authid {oid} NOTICE: checking pg_publication_rel {prpubid} => pg_publication {oid} NOTICE: checking pg_publication_rel {prrelid} => pg_class {oid} +NOTICE: checking pg_publication_schema {pspubid} => pg_publication {oid} +NOTICE: checking pg_publication_schema {psnspcid} => pg_class {oid} NOTICE: checking pg_subscription {subdbid} => pg_database {oid} NOTICE: checking pg_subscription {subowner} => pg_authid {oid} NOTICE: checking pg_subscription_rel {srsubid} => pg_subscription {oid} diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 4a5ef0bc24..23131de23f 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -60,15 +60,15 @@ CREATE TABLE testpub_tbl2 (id serial primary key, data text); -- fail - can't add to for all tables publication ALTER PUBLICATION testpub_foralltables ADD TABLE testpub_tbl2; ERROR: publication "testpub_foralltables" is defined as FOR ALL TABLES -DETAIL: Tables cannot be added to or dropped from FOR ALL TABLES publications. +DETAIL: Tables cannot be added to, dropped from, or set on FOR ALL TABLES publications. -- fail - can't drop from all tables publication ALTER PUBLICATION testpub_foralltables DROP TABLE testpub_tbl2; ERROR: publication "testpub_foralltables" is defined as FOR ALL TABLES -DETAIL: Tables cannot be added to or dropped from FOR ALL TABLES publications. +DETAIL: Tables cannot be added to, dropped from, or set on FOR ALL TABLES publications. -- fail - can't add to for all tables publication ALTER PUBLICATION testpub_foralltables SET TABLE pub_test.testpub_nopk; ERROR: publication "testpub_foralltables" is defined as FOR ALL TABLES -DETAIL: Tables cannot be added to or dropped from FOR ALL TABLES publications. +DETAIL: Tables cannot be added to, dropped from, or set on FOR ALL TABLES publications. SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_foralltables'; pubname | puballtables ----------------------+-------------- diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out index 982b6aff53..3b4f62025f 100644 --- a/src/test/regress/expected/sanity_check.out +++ b/src/test/regress/expected/sanity_check.out @@ -141,6 +141,7 @@ pg_policy|t pg_proc|t pg_publication|t pg_publication_rel|t +pg_publication_schema|t pg_range|t pg_replication_origin|t pg_rewrite|t diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index f31a1e4e1e..fc0deec206 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -779,6 +779,7 @@ FormData_pg_policy FormData_pg_proc FormData_pg_publication FormData_pg_publication_rel +FormData_pg_publication_schema FormData_pg_range FormData_pg_replication_origin FormData_pg_rewrite @@ -835,6 +836,7 @@ Form_pg_policy Form_pg_proc Form_pg_publication Form_pg_publication_rel +Form_pg_publication_schema Form_pg_range Form_pg_replication_origin Form_pg_rewrite @@ -2045,8 +2047,11 @@ PsqlSettings Publication PublicationActions PublicationInfo +PublicationObjSpec +PublicationObjSpecType PublicationPartOpt PublicationRelInfo +PublicationSchemaInfo PullFilter PullFilterOps PushFilter @@ -2331,6 +2336,8 @@ ScanState ScanTypeControl ScannerCallbackState SchemaQuery +SchemaSpec +SchemaSpecType SecBuffer SecBufferDesc SecLabelItem -- 2.30.2