From 31742279f4f2c846cdd952487c136a5e354d9990 Mon Sep 17 00:00:00 2001 From: Vigneshwaran c Date: Mon, 26 Jul 2021 09:25:22 +0530 Subject: [PATCH v20 1/2] Added schema level support for publication. This patch adds schema-level support for publication. A new schema option allows one or more schemas to be specified, whose tables are selected by the publisher for sending the data to the subscriber. pg_publication maintains information about the publication. Previously, the "puballtables" bool column was used to indicate if the publication was the "FOR ALL TABLES" kind (if true) or the "FOR TABLE" kind (if false). With the introduction of the "FOR SCHEMA" publication kind, it is not easy to determine the publication kind. Therefore, a new column "pubkind" has been added to the pg_publication relation to indicate the publication kind. There was the possibility of avoiding addition of this new column, but that would require checking puballtables of pg_publication and checking pg_publication_rel for table kind publication and then checking pg_publication_schema for schema kind publication. Instead, I preferred to add the "pubkind" column, which makes things easier, and also will help support new options in the future. A new system table "pg_publication_schema" has been added, to maintain the schemas that the user wants to publish through the publication. The schema/publication/publication_schema dependency was created to handle the corresponding renaming/removal of schemas to the publication/publication_schema when the schema is renamed/dropped. The Decoder identifies if the relation is part of the publication and replicates it to the subscriber. Changes were made to pg_dump to handle pubkind updation in the pg_publication table when the database is upgraded. Prototypes present in pg_publication.h have been moved to publicationcmds.h so that minimal data structures are exported to pg_dump and psql clients, as the rest of the information need not be exported. CATALOG_VERSION_NO needs to be updated while committing, as this feature involves a catalog change. --- src/backend/catalog/Makefile | 4 +- src/backend/catalog/aclchk.c | 2 + src/backend/catalog/dependency.c | 9 + src/backend/catalog/objectaddress.c | 142 ++++++++ src/backend/catalog/pg_publication.c | 333 ++++++++++++++++-- src/backend/commands/alter.c | 1 + src/backend/commands/event_trigger.c | 4 + src/backend/commands/publicationcmds.c | 370 ++++++++++++++++++-- src/backend/commands/seclabel.c | 1 + src/backend/commands/tablecmds.c | 5 +- src/backend/nodes/copyfuncs.c | 2 +- src/backend/nodes/equalfuncs.c | 2 +- src/backend/parser/gram.y | 126 +++++-- src/backend/replication/pgoutput/pgoutput.c | 26 +- src/backend/utils/cache/relcache.c | 5 + src/backend/utils/cache/syscache.c | 23 ++ src/bin/pg_dump/common.c | 3 + src/bin/pg_dump/pg_backup_archiver.c | 3 +- src/bin/pg_dump/pg_dump.c | 163 ++++++++- src/bin/pg_dump/pg_dump.h | 16 + src/bin/pg_dump/pg_dump_sort.c | 7 + src/bin/psql/describe.c | 276 ++++++++++++--- src/bin/psql/tab-complete.c | 22 +- src/include/catalog/dependency.h | 1 + src/include/catalog/pg_publication.h | 64 ++-- src/include/catalog/pg_publication_schema.h | 47 +++ src/include/commands/publicationcmds.h | 53 +++ src/include/nodes/nodes.h | 1 + src/include/nodes/parsenodes.h | 24 +- src/include/utils/rel.h | 1 + src/include/utils/syscache.h | 2 + src/test/regress/expected/oidjoins.out | 2 + src/test/regress/expected/publication.out | 112 +++--- src/test/regress/expected/sanity_check.out | 1 + src/test/regress/sql/publication.sql | 2 +- src/tools/pgindent/typedefs.list | 5 + 36 files changed, 1611 insertions(+), 249 deletions(-) create mode 100644 src/include/catalog/pg_publication_schema.h diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile index d297e77361..b2ee87b105 100644 --- a/src/backend/catalog/Makefile +++ b/src/backend/catalog/Makefile @@ -68,8 +68,8 @@ CATALOG_HEADERS := \ pg_foreign_table.h pg_policy.h pg_replication_origin.h \ pg_default_acl.h pg_init_privs.h pg_seclabel.h pg_shseclabel.h \ pg_collation.h pg_partitioned_table.h pg_range.h pg_transform.h \ - pg_sequence.h pg_publication.h pg_publication_rel.h pg_subscription.h \ - pg_subscription_rel.h + pg_sequence.h pg_publication.h pg_publication_rel.h pg_publication_schema.h \ + pg_subscription.h pg_subscription_rel.h GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h system_fk_info.h diff --git a/src/backend/catalog/aclchk.c b/src/backend/catalog/aclchk.c index 89792b154e..09d7f1a5ea 100644 --- a/src/backend/catalog/aclchk.c +++ b/src/backend/catalog/aclchk.c @@ -3428,6 +3428,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_DEFACL: case OBJECT_DOMCONSTRAINT: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROLE: case OBJECT_RULE: case OBJECT_TABCONSTRAINT: @@ -3567,6 +3568,7 @@ aclcheck_error(AclResult aclerr, ObjectType objtype, case OBJECT_DEFACL: case OBJECT_DOMCONSTRAINT: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROLE: case OBJECT_TRANSFORM: case OBJECT_TSPARSER: diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c index 76b65e39c4..d974750473 100644 --- a/src/backend/catalog/dependency.c +++ b/src/backend/catalog/dependency.c @@ -50,6 +50,7 @@ #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_statistic_ext.h" #include "catalog/pg_subscription.h" @@ -180,6 +181,7 @@ static const Oid object_classes[] = { PolicyRelationId, /* OCLASS_POLICY */ PublicationRelationId, /* OCLASS_PUBLICATION */ PublicationRelRelationId, /* OCLASS_PUBLICATION_REL */ + PublicationSchemaRelationId, /* OCLASS_PUBLICATION_SCHEMA */ SubscriptionRelationId, /* OCLASS_SUBSCRIPTION */ TransformRelationId /* OCLASS_TRANSFORM */ }; @@ -1460,6 +1462,10 @@ doDeletion(const ObjectAddress *object, int flags) RemovePublicationRelById(object->objectId); break; + case OCLASS_PUBLICATION_SCHEMA: + RemovePublicationSchemaById(object->objectId); + break; + case OCLASS_CAST: case OCLASS_COLLATION: case OCLASS_CONVERSION: @@ -2853,6 +2859,9 @@ getObjectClass(const ObjectAddress *object) case PublicationRelRelationId: return OCLASS_PUBLICATION_REL; + case PublicationSchemaRelationId: + return OCLASS_PUBLICATION_SCHEMA; + case SubscriptionRelationId: return OCLASS_SUBSCRIPTION; diff --git a/src/backend/catalog/objectaddress.c b/src/backend/catalog/objectaddress.c index 9882e549c4..4cf144eef0 100644 --- a/src/backend/catalog/objectaddress.c +++ b/src/backend/catalog/objectaddress.c @@ -49,6 +49,7 @@ #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_rewrite.h" #include "catalog/pg_statistic_ext.h" #include "catalog/pg_subscription.h" @@ -67,6 +68,7 @@ #include "commands/extension.h" #include "commands/policy.h" #include "commands/proclang.h" +#include "commands/publicationcmds.h" #include "commands/tablespace.h" #include "commands/trigger.h" #include "foreign/foreign.h" @@ -829,6 +831,10 @@ static const struct object_type_map { "publication relation", OBJECT_PUBLICATION_REL }, + /* OCLASS_PUBLICATION_SCHEMA */ + { + "publication schema", OBJECT_PUBLICATION_SCHEMA + }, /* OCLASS_SUBSCRIPTION */ { "subscription", OBJECT_SUBSCRIPTION @@ -875,6 +881,8 @@ static ObjectAddress get_object_address_usermapping(List *object, static ObjectAddress get_object_address_publication_rel(List *object, Relation *relp, bool missing_ok); +static ObjectAddress get_object_address_publication_schema(List *object, + bool missing_ok); static ObjectAddress get_object_address_defacl(List *object, bool missing_ok); static const ObjectPropertyType *get_object_property_data(Oid class_id); @@ -1118,6 +1126,10 @@ get_object_address(ObjectType objtype, Node *object, &relation, missing_ok); break; + case OBJECT_PUBLICATION_SCHEMA: + address = get_object_address_publication_schema(castNode(List, object), + missing_ok); + break; case OBJECT_DEFACL: address = get_object_address_defacl(castNode(List, object), missing_ok); @@ -1935,6 +1947,47 @@ get_object_address_publication_rel(List *object, return address; } +/* + * Find the ObjectAddress for a publication schema. The first element of + * the object parameter is the schema name, the second is the + * publication name. + */ +static ObjectAddress +get_object_address_publication_schema(List *object, bool missing_ok) +{ + ObjectAddress address; + char *pubname; + Publication *pub; + char *schemaname; + Oid schemaoid; + + ObjectAddressSet(address, PublicationSchemaRelationId, InvalidOid); + + /* Fetch schema name and publication name from input list */ + schemaname = strVal(linitial(object)); + pubname = strVal(lsecond(object)); + + schemaoid = get_namespace_oid(schemaname, false); + + /* Now look up the pg_publication tuple */ + pub = GetPublicationByName(pubname, missing_ok); + if (!pub) + return address; + + /* Find the publication schema mapping in syscache */ + address.objectId = + GetSysCacheOid2(PUBLICATIONSCHEMAMAP, Anum_pg_publication_schema_oid, + ObjectIdGetDatum(schemaoid), + ObjectIdGetDatum(pub->oid)); + if (!OidIsValid(address.objectId) && !missing_ok) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("publication schema \"%s\" in publication \"%s\" does not exist", + schemaname, pubname))); + + return address; +} + /* * Find the ObjectAddress for a default ACL. */ @@ -2207,6 +2260,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_CAST: case OBJECT_USER_MAPPING: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_DEFACL: case OBJECT_TRANSFORM: if (list_length(args) != 1) @@ -2299,6 +2353,7 @@ pg_get_object_address(PG_FUNCTION_ARGS) case OBJECT_PUBLICATION_REL: objnode = (Node *) list_make2(name, linitial(args)); break; + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_USER_MAPPING: objnode = (Node *) list_make2(linitial(name), linitial(args)); break; @@ -2847,6 +2902,49 @@ get_catalog_object_by_oid(Relation catalog, AttrNumber oidcol, Oid objectId) return tuple; } +/* + * getPublicationSchemaInfo + * + * Get publication name and schema name from the object address into pubname and + * nspname. Both pubname and nspname are palloc'd string which will be freed by + * the caller. + */ +static bool +getPublicationSchemaInfo(const ObjectAddress *object, bool missing_ok, + char **pubname, char **nspname) +{ + HeapTuple tup; + Form_pg_publication_schema psform; + + tup = SearchSysCache1(PUBLICATIONSCHEMA, + ObjectIdGetDatum(object->objectId)); + if (!HeapTupleIsValid(tup)) + { + if (!missing_ok) + elog(ERROR, "cache lookup failed for publication schema %u", + object->objectId); + return false; + } + + psform = (Form_pg_publication_schema) GETSTRUCT(tup); + *pubname = get_publication_name(psform->pspubid, false); + *nspname = get_namespace_name(psform->psnspcid); + if (!(*nspname)) + { + Oid psnspcid = psform->psnspcid; + + pfree(pubname); + ReleaseSysCache(tup); + if (!missing_ok) + elog(ERROR, "cache lookup failed for schema %u", + psnspcid); + return false; + } + + ReleaseSysCache(tup); + return true; +} + /* * getObjectDescription: build an object description for messages * @@ -3902,6 +4000,22 @@ getObjectDescription(const ObjectAddress *object, bool missing_ok) break; } + case OCLASS_PUBLICATION_SCHEMA: + { + char *pubname; + char *nspname; + + if (!getPublicationSchemaInfo(object, missing_ok, + &pubname, &nspname)) + break; + + appendStringInfo(&buffer, _("publication of schema %s in publication %s"), + nspname, pubname); + pfree(pubname); + pfree(nspname); + break; + } + case OCLASS_SUBSCRIPTION: { char *subname = get_subscription_name(object->objectId, @@ -4476,6 +4590,10 @@ getObjectTypeDescription(const ObjectAddress *object, bool missing_ok) appendStringInfoString(&buffer, "publication relation"); break; + case OCLASS_PUBLICATION_SCHEMA: + appendStringInfoString(&buffer, "publication schema"); + break; + case OCLASS_SUBSCRIPTION: appendStringInfoString(&buffer, "subscription"); break; @@ -5711,6 +5829,30 @@ getObjectIdentityParts(const ObjectAddress *object, break; } + case OCLASS_PUBLICATION_SCHEMA: + { + char *pubname; + char *nspname; + + if (!getPublicationSchemaInfo(object, missing_ok, &pubname, + &nspname)) + break; + appendStringInfo(&buffer, "%s in publication %s", + nspname, pubname); + + if (objargs) + *objargs = list_make1(pubname); + else + pfree(pubname); + + if (objname) + *objname = list_make1(nspname); + else + pfree(nspname); + + break; + } + case OCLASS_SUBSCRIPTION: { char *subname; diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 2a2fe03c13..760e3c762c 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -28,16 +28,18 @@ #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" #include "catalog/pg_inherits.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_type.h" +#include "commands/publicationcmds.h" #include "funcapi.h" #include "miscadmin.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/catcache.h" #include "utils/fmgroids.h" -#include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/syscache.h" @@ -214,6 +216,96 @@ publication_add_relation(Oid pubid, Relation targetrel, return myself; } +/* + * Insert new publication / schema mapping. + */ +ObjectAddress +publication_add_schema(Oid pubid, Oid schemaoid, bool if_not_exists) +{ + Relation rel; + HeapTuple tup; + Datum values[Natts_pg_publication_schema]; + bool nulls[Natts_pg_publication_schema]; + Oid psschid; + Publication *pub = GetPublication(pubid); + List *schemaRels = NIL; + ObjectAddress myself, + referenced; + + rel = table_open(PublicationSchemaRelationId, RowExclusiveLock); + + /* + * Check for duplicates. Note that this does not really prevent + * duplicates, it's here just to provide nicer error message in common + * case. The real protection is the unique key on the catalog. + */ + if (SearchSysCacheExists2(PUBLICATIONSCHEMAMAP, ObjectIdGetDatum(schemaoid), + ObjectIdGetDatum(pubid))) + { + table_close(rel, RowExclusiveLock); + + if (if_not_exists) + return InvalidObjectAddress; + + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_OBJECT), + errmsg("schema \"%s\" is already member of publication \"%s\"", + get_namespace_name(schemaoid), pub->name))); + } + + /* Can't be system namespace */ + if (IsCatalogNamespace(schemaoid) || IsToastNamespace(schemaoid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("\"%s\" is a system schema", + get_namespace_name(schemaoid)), + errdetail("System schemas cannot be added to publications."))); + + /* Can't be temporary namespace */ + if (isAnyTempNamespace(schemaoid)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("\"%s\" is a temporary schema", + get_namespace_name(schemaoid)), + errdetail("Temporary schemas cannot be added to publications."))); + + /* Form a tuple */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + + psschid = GetNewOidWithIndex(rel, PublicationSchemaObjectIndexId, + Anum_pg_publication_schema_oid); + values[Anum_pg_publication_schema_oid - 1] = ObjectIdGetDatum(psschid); + values[Anum_pg_publication_schema_pspubid - 1] = + ObjectIdGetDatum(pubid); + values[Anum_pg_publication_schema_psnspcid - 1] = + ObjectIdGetDatum(schemaoid); + + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); + + /* Insert tuple into catalog */ + CatalogTupleInsert(rel, tup); + heap_freetuple(tup); + + ObjectAddressSet(myself, PublicationSchemaRelationId, psschid); + + /* Add dependency on the publication */ + ObjectAddressSet(referenced, PublicationRelationId, pubid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + + /* Add dependency on the schema */ + ObjectAddressSet(referenced, NamespaceRelationId, schemaoid); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + + /* Close the table */ + table_close(rel, RowExclusiveLock); + + schemaRels = GetSchemaPublicationRelations(schemaoid, PUBLICATION_PART_ALL); + InvalidatePublicationRels(schemaRels); + + return myself; +} + /* Gets list of publication oids for a relation */ List * GetRelationPublications(Oid relid) @@ -238,10 +330,47 @@ GetRelationPublications(Oid relid) return result; } +/* + * Gets the relations based on the publication partition option for a specified + * relation. + */ +static List * +GetPubPartitionOptionRelations(List *result, PublicationPartOpt pub_partopt, + Oid relid) +{ + if (get_rel_relkind(relid) == RELKIND_PARTITIONED_TABLE && + pub_partopt != PUBLICATION_PART_ROOT) + { + List *all_parts = find_all_inheritors(relid, NoLock, + NULL); + + if (pub_partopt == PUBLICATION_PART_ALL) + result = list_concat(result, all_parts); + else if (pub_partopt == PUBLICATION_PART_LEAF) + { + ListCell *lc; + + foreach(lc, all_parts) + { + Oid partOid = lfirst_oid(lc); + + if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE) + result = lappend_oid(result, partOid); + } + } + else + Assert(false); + } + else + result = lappend_oid(result, relid); + + return result; +} + /* * Gets list of relation oids for a publication. * - * This should only be used for normal publications, the FOR ALL TABLES + * This should only be used FOR TABLE publications, the FOR ALL TABLES * should use GetAllTablesPublicationRelations(). */ List * @@ -270,36 +399,79 @@ GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt) Form_pg_publication_rel pubrel; pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); + result = GetPubPartitionOptionRelations(result, pub_partopt, + pubrel->prrelid); + } - if (get_rel_relkind(pubrel->prrelid) == RELKIND_PARTITIONED_TABLE && - pub_partopt != PUBLICATION_PART_ROOT) - { - List *all_parts = find_all_inheritors(pubrel->prrelid, NoLock, - NULL); + systable_endscan(scan); + table_close(pubrelsrel, AccessShareLock); - if (pub_partopt == PUBLICATION_PART_ALL) - result = list_concat(result, all_parts); - else if (pub_partopt == PUBLICATION_PART_LEAF) - { - ListCell *lc; + return result; +} - foreach(lc, all_parts) - { - Oid partOid = lfirst_oid(lc); +/* + * Gets the list of schema oids for a publication. + * + * This should only be used FOR SCHEMA publications. + */ +List * +GetPublicationSchemas(Oid pubid) +{ + List *result = NIL; + Relation pubschsrel; + ScanKeyData scankey; + SysScanDesc scan; + HeapTuple tup; - if (get_rel_relkind(partOid) != RELKIND_PARTITIONED_TABLE) - result = lappend_oid(result, partOid); - } - } - else - Assert(false); - } - else - result = lappend_oid(result, pubrel->prrelid); + /* Find all publications associated with the schema */ + pubschsrel = table_open(PublicationSchemaRelationId, AccessShareLock); + + ScanKeyInit(&scankey, + Anum_pg_publication_schema_pspubid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(pubid)); + + scan = systable_beginscan(pubschsrel, PublicationSchemaPsnspcidPspubidIndexId, + true, NULL, 1, &scankey); + while (HeapTupleIsValid(tup = systable_getnext(scan))) + { + Form_pg_publication_schema pubsch; + + pubsch = (Form_pg_publication_schema) GETSTRUCT(tup); + + result = lappend_oid(result, pubsch->psnspcid); } systable_endscan(scan); - table_close(pubrelsrel, AccessShareLock); + table_close(pubschsrel, AccessShareLock); + + return result; +} + + +/* + * Gets the list of FOR SCHEMA publication oids associated with a specified + * schema oid + */ +List * +GetSchemaPublications(Oid schemaid) +{ + List *result = NIL; + CatCList *pubschlist; + int i; + + /* Find all publications associated with the schema */ + pubschlist = SearchSysCacheList1(PUBLICATIONSCHEMAMAP, + ObjectIdGetDatum(schemaid)); + for (i = 0; i < pubschlist->n_members; i++) + { + HeapTuple tup = &pubschlist->members[i]->tuple; + Oid pubid = ((Form_pg_publication_schema) GETSTRUCT(tup))->pspubid; + + result = lappend_oid(result, pubid); + } + + ReleaseSysCacheList(pubschlist); return result; } @@ -320,9 +492,9 @@ GetAllTablesPublications(void) rel = table_open(PublicationRelationId, AccessShareLock); ScanKeyInit(&scankey, - Anum_pg_publication_puballtables, - BTEqualStrategyNumber, F_BOOLEQ, - BoolGetDatum(true)); + Anum_pg_publication_pubkind, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(PUBKIND_ALLTABLES)); scan = systable_beginscan(rel, InvalidOid, false, NULL, 1, &scankey); @@ -342,7 +514,7 @@ GetAllTablesPublications(void) } /* - * Gets list of all relation published by FOR ALL TABLES publication(s). + * Gets the list of relations published. * * If the publication publishes partition changes via their respective root * partitioned tables, we must exclude partitions in favor of including the @@ -404,6 +576,96 @@ GetAllTablesPublicationRelations(bool pubviaroot) return result; } +/* + * Gets list of relation oids for a specified schema. + */ +List * +GetSchemaPublicationRelations(Oid schemaOid, PublicationPartOpt pub_partopt) +{ + Relation classRel; + ScanKeyData key[3]; + TableScanDesc scan; + HeapTuple tuple; + List *result = NIL; + int keycount = 0; + + Assert(schemaOid != InvalidOid); + + classRel = table_open(RelationRelationId, AccessShareLock); + + ScanKeyInit(&key[keycount++], + Anum_pg_class_relkind, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(RELKIND_RELATION)); + + ScanKeyInit(&key[keycount++], + Anum_pg_class_relnamespace, + BTEqualStrategyNumber, F_OIDEQ, + schemaOid); + + scan = table_beginscan_catalog(classRel, keycount, key); + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); + Oid relid = relForm->oid; + + if (is_publishable_class(relid, relForm)) + result = lappend_oid(result, relid); + } + + table_endscan(scan); + + keycount = 0; + ScanKeyInit(&key[keycount++], + Anum_pg_class_relkind, + BTEqualStrategyNumber, F_CHAREQ, + CharGetDatum(RELKIND_PARTITIONED_TABLE)); + + ScanKeyInit(&key[keycount++], + Anum_pg_class_relnamespace, + BTEqualStrategyNumber, F_OIDEQ, + schemaOid); + + scan = table_beginscan_catalog(classRel, keycount, key); + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_class relForm = (Form_pg_class) GETSTRUCT(tuple); + + /* Skip the relations which are not publishable */ + if (!is_publishable_class(relForm->oid, relForm)) + continue; + + result = GetPubPartitionOptionRelations(result, pub_partopt, + relForm->oid); + } + + table_endscan(scan); + table_close(classRel, AccessShareLock); + return result; +} + +/* + * Gets the list of all relations published by FOR SCHEMA publication(s). + */ +List * +GetAllSchemasPublicationRelations(Oid puboid, PublicationPartOpt pub_partopt) +{ + List *result = NIL; + List *pubschemalist = GetPublicationSchemas(puboid); + ListCell *cell; + + foreach(cell, pubschemalist) + { + Oid schemaOid = lfirst_oid(cell); + List *schemaRels = NIL; + + schemaRels = GetSchemaPublicationRelations(schemaOid, pub_partopt); + result = list_concat(result, schemaRels); + } + + return result; +} + /* * Get publication using oid * @@ -425,12 +687,12 @@ GetPublication(Oid pubid) pub = (Publication *) palloc(sizeof(Publication)); pub->oid = pubid; pub->name = pstrdup(NameStr(pubform->pubname)); - pub->alltables = pubform->puballtables; pub->pubactions.pubinsert = pubform->pubinsert; pub->pubactions.pubupdate = pubform->pubupdate; pub->pubactions.pubdelete = pubform->pubdelete; pub->pubactions.pubtruncate = pubform->pubtruncate; pub->pubviaroot = pubform->pubviaroot; + pub->pubkind = pubform->pubkind; ReleaseSysCache(tup); @@ -530,13 +792,20 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) * replicated using leaf partition identity and schema, so we only * need those. */ - if (publication->alltables) + if (publication->pubkind == PUBKIND_ALLTABLES) tables = GetAllTablesPublicationRelations(publication->pubviaroot); - else + else if (publication->pubkind == PUBKIND_TABLE) tables = GetPublicationRelations(publication->oid, publication->pubviaroot ? PUBLICATION_PART_ROOT : PUBLICATION_PART_LEAF); + else if (publication->pubkind == PUBKIND_SCHEMA) + tables = GetAllSchemasPublicationRelations(publication->oid, + publication->pubviaroot ? + PUBLICATION_PART_ROOT : + PUBLICATION_PART_LEAF); + else + tables = NIL; funcctx->user_fctx = (void *) tables; MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c index 29249498a9..e7c27459d8 100644 --- a/src/backend/commands/alter.c +++ b/src/backend/commands/alter.c @@ -661,6 +661,7 @@ AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid, case OCLASS_POLICY: case OCLASS_PUBLICATION: case OCLASS_PUBLICATION_REL: + case OCLASS_PUBLICATION_SCHEMA: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: /* ignore object types that don't have schema-qualified names */ diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c index 71612d577e..35f47d3253 100644 --- a/src/backend/commands/event_trigger.c +++ b/src/backend/commands/event_trigger.c @@ -974,6 +974,7 @@ EventTriggerSupportsObjectType(ObjectType obtype) case OBJECT_PROCEDURE: case OBJECT_PUBLICATION: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROUTINE: case OBJECT_RULE: case OBJECT_SCHEMA: @@ -1051,6 +1052,7 @@ EventTriggerSupportsObjectClass(ObjectClass objclass) case OCLASS_POLICY: case OCLASS_PUBLICATION: case OCLASS_PUBLICATION_REL: + case OCLASS_PUBLICATION_SCHEMA: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: return true; @@ -2127,6 +2129,7 @@ stringify_grant_objtype(ObjectType objtype) case OBJECT_POLICY: case OBJECT_PUBLICATION: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROLE: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: @@ -2209,6 +2212,7 @@ stringify_adefprivs_objtype(ObjectType objtype) case OBJECT_POLICY: case OBJECT_PUBLICATION: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_ROLE: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 8487eeb7e6..1de6c186b3 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -25,8 +25,10 @@ #include "catalog/objectaddress.h" #include "catalog/partition.h" #include "catalog/pg_inherits.h" +#include "catalog/pg_namespace.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_type.h" #include "commands/dbcommands.h" #include "commands/defrem.h" @@ -34,25 +36,26 @@ #include "commands/publicationcmds.h" #include "funcapi.h" #include "miscadmin.h" +#include "storage/lmgr.h" #include "utils/acl.h" #include "utils/array.h" #include "utils/builtins.h" #include "utils/catcache.h" #include "utils/fmgroids.h" -#include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/rel.h" #include "utils/syscache.h" #include "utils/varlena.h" -/* Same as MAXNUMMESSAGES in sinvaladt.c */ -#define MAX_RELCACHE_INVAL_MSGS 4096 - static List *OpenTableList(List *tables); static void CloseTableList(List *rels); static void PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, AlterPublicationStmt *stmt); static void PublicationDropTables(Oid pubid, List *rels, bool missing_ok); +static void LockSchemaList(List *schemalist); +static void PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, + AlterPublicationStmt *stmt); +static void PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok); static void parse_publication_options(ParseState *pstate, @@ -138,6 +141,47 @@ parse_publication_options(ParseState *pstate, } } +/* + * Convert the SchemaSpec list into an Oid list. + */ +static List * +ConvertSchemaSpecListToOidList(List *schemas) +{ + List *schemaoidlist = NIL; + ListCell *cell; + + foreach(cell, schemas) + { + SchemaSpec *schema = (SchemaSpec *) lfirst(cell); + Oid schemaoid; + List *search_path; + char *nspname; + + if (schema->schematype == SCHEMASPEC_CURRENT_SCHEMA) + { + search_path = fetch_search_path(false); + if (search_path == NIL) /* nothing valid in search_path? */ + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("no schema has been selected")); + + schemaoid = linitial_oid(search_path); + nspname = get_namespace_name(schemaoid); + if (nspname == NULL) /* recently-deleted namespace? */ + ereport(ERROR, + errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("no schema has been selected")); + } + else + schemaoid = get_namespace_oid(schema->schemaname, false); + + /* Filter out duplicates if user specifies "sch1, sch1" */ + schemaoidlist = list_append_unique_oid(schemaoidlist, schemaoid); + } + + return schemaoidlist; +} + /* * Create new publication. */ @@ -168,6 +212,12 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be superuser to create FOR ALL TABLES publication"))); + /* FOR SCHEMA requires superuser */ + if (stmt->schemas && !superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to create FOR SCHEMA publication"))); + rel = table_open(PublicationRelationId, RowExclusiveLock); /* Check if name is used */ @@ -198,8 +248,6 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) puboid = GetNewOidWithIndex(rel, PublicationObjectIndexId, Anum_pg_publication_oid); values[Anum_pg_publication_oid - 1] = ObjectIdGetDatum(puboid); - values[Anum_pg_publication_puballtables - 1] = - BoolGetDatum(stmt->for_all_tables); values[Anum_pg_publication_pubinsert - 1] = BoolGetDatum(pubactions.pubinsert); values[Anum_pg_publication_pubupdate - 1] = @@ -211,6 +259,15 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) values[Anum_pg_publication_pubviaroot - 1] = BoolGetDatum(publish_via_partition_root); + if (stmt->schemas) + values[Anum_pg_publication_pubkind - 1] = PUBKIND_SCHEMA; + else if (stmt->tables) + values[Anum_pg_publication_pubkind - 1] = PUBKIND_TABLE; + else if (stmt->for_all_tables) + values[Anum_pg_publication_pubkind - 1] = PUBKIND_ALLTABLES; + else + values[Anum_pg_publication_pubkind - 1] = PUBKIND_EMPTY; + tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); /* Insert tuple into catalog. */ @@ -224,6 +281,24 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) /* Make the changes visible. */ CommandCounterIncrement(); + if (stmt->schemas) + { + List *schemaoidlist = NIL; + + Assert(list_length(stmt->schemas) > 0); + + schemaoidlist = ConvertSchemaSpecListToOidList(stmt->schemas); + + /* + * Schema lock is held until the publication is created to prevent + * concurrent schema deletion. No need to unlock the schemas, the locks + * will be released automatically at the end of create publication + * command. + */ + LockSchemaList(schemaoidlist); + PublicationAddSchemas(puboid, schemaoidlist, true, NULL); + } + if (stmt->tables) { List *rels; @@ -250,6 +325,34 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt) return myself; } +/* + * Update publication kind in pg_publication relation. + */ +static void +UpdatePublicationKindTupleValue(Relation rel, HeapTuple tup, char pubkind) +{ + bool nulls[Natts_pg_publication]; + bool replaces[Natts_pg_publication]; + Datum values[Natts_pg_publication]; + + + /* Everything ok, form a new tuple */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + values[Anum_pg_publication_pubkind - 1] = pubkind; + replaces[Anum_pg_publication_pubkind - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + + /* Update the catalog */ + CatalogTupleUpdate(rel, &tup->t_self, tup); + + CommandCounterIncrement(); +} + /* * Change options of a publication. */ @@ -310,37 +413,27 @@ AlterPublicationOptions(ParseState *pstate, AlterPublicationStmt *stmt, pubform = (Form_pg_publication) GETSTRUCT(tup); /* Invalidate the relcache. */ - if (pubform->puballtables) + if (pubform->pubkind == PUBKIND_ALLTABLES) { CacheInvalidateRelcacheAll(); } else { + List *relids = NIL; + /* * For any partitioned tables contained in the publication, we must * invalidate all partitions contained in the respective partition * trees, not just those explicitly mentioned in the publication. */ - List *relids = GetPublicationRelations(pubform->oid, - PUBLICATION_PART_ALL); - - /* - * We don't want to send too many individual messages, at some point - * it's cheaper to just reset whole relcache. - */ - if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS) - { - ListCell *lc; - - foreach(lc, relids) - { - Oid relid = lfirst_oid(lc); - - CacheInvalidateRelcacheByRelid(relid); - } - } - else - CacheInvalidateRelcacheAll(); + if (pubform->pubkind == PUBKIND_TABLE) + relids = GetPublicationRelations(pubform->oid, + PUBLICATION_PART_ALL); + else if (pubform->pubkind == PUBKIND_SCHEMA) + relids = GetAllSchemasPublicationRelations(pubform->oid, + PUBLICATION_PART_ALL); + + InvalidatePublicationRels(relids); } ObjectAddressSet(obj, PublicationRelationId, pubform->oid); @@ -362,20 +455,31 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, Oid pubid = pubform->oid; /* Check that user is allowed to manipulate the publication tables. */ - if (pubform->puballtables) + if (pubform->pubkind == PUBKIND_ALLTABLES) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("publication \"%s\" is defined as FOR ALL TABLES", NameStr(pubform->pubname)), - errdetail("Tables cannot be added to or dropped from FOR ALL TABLES publications."))); + errdetail("Tables cannot be added, dropped or set on FOR ALL TABLES publications."))); + + if (pubform->pubkind == PUBKIND_SCHEMA) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR SCHEMA", + NameStr(pubform->pubname)), + errdetail("Tables cannot be added, dropped or set on FOR SCHEMA publications."))); Assert(list_length(stmt->tables) > 0); rels = OpenTableList(stmt->tables); - if (stmt->tableAction == DEFELEM_ADD) + if (stmt->action == DEFELEM_ADD) + { + if (pubform->pubkind == PUBKIND_EMPTY) + UpdatePublicationKindTupleValue(rel, tup, PUBKIND_TABLE); PublicationAddTables(pubid, rels, false, stmt); - else if (stmt->tableAction == DEFELEM_DROP) + } + else if (stmt->action == DEFELEM_DROP) PublicationDropTables(pubid, rels, false); else /* DEFELEM_SET */ { @@ -384,6 +488,9 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, List *delrels = NIL; ListCell *oldlc; + if (pubform->pubkind == PUBKIND_EMPTY) + UpdatePublicationKindTupleValue(rel, tup, PUBKIND_TABLE); + /* Calculate which relations to drop. */ foreach(oldlc, oldrelids) { @@ -426,11 +533,84 @@ AlterPublicationTables(AlterPublicationStmt *stmt, Relation rel, CloseTableList(rels); } +/* + * Alter the publication schemas. + * + * Add/Remove/Set the schemas to/from publication. + */ +static void +AlterPublicationSchemas(AlterPublicationStmt *stmt, Relation rel, HeapTuple tup) +{ + List *schemaoidlist = NIL; + Form_pg_publication pubform = (Form_pg_publication) GETSTRUCT(tup); + + /* Check that user is allowed to manipulate the publication tables */ + if (pubform->pubkind == PUBKIND_ALLTABLES) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR ALL TABLES", + NameStr(pubform->pubname)), + errdetail("Schemas cannot be added, dropped or set on FOR ALL TABLES publications."))); + + if (pubform->pubkind == PUBKIND_TABLE) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("publication \"%s\" is defined as FOR TABLE", + NameStr(pubform->pubname)), + errdetail("Schemas cannot be added, dropped or set on FOR TABLE publications."))); + + if ((stmt->action == DEFELEM_ADD || stmt->action == DEFELEM_SET) && + !superuser()) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to add or set schemas"))); + + /* Convert the text list into oid list */ + schemaoidlist = ConvertSchemaSpecListToOidList(stmt->schemas); + + /* + * Schema lock is held until the publication is altered to prevent + * concurrent schema deletion. No need to unlock the schemas, the locks will + * be released automatically at the end of alter publication command. + */ + LockSchemaList(schemaoidlist); + if (stmt->action == DEFELEM_ADD) + { + if (pubform->pubkind == PUBKIND_EMPTY) + UpdatePublicationKindTupleValue(rel, tup, PUBKIND_SCHEMA); + PublicationAddSchemas(pubform->oid, schemaoidlist, false, stmt); + } + else if (stmt->action == DEFELEM_DROP) + PublicationDropSchemas(pubform->oid, schemaoidlist, false); + else /* DEFELEM_SET */ + { + List *oldschemaids = GetPublicationSchemas(pubform->oid); + List *delschemas = NIL; + + if (pubform->pubkind == PUBKIND_EMPTY) + UpdatePublicationKindTupleValue(rel, tup, PUBKIND_SCHEMA); + + /* Identify which schemas should be dropped */ + delschemas = list_difference_oid(oldschemaids, schemaoidlist); + + /* And drop them */ + PublicationDropSchemas(pubform->oid, delschemas, true); + + /* + * Don't bother calculating the difference for adding, we'll catch and + * skip existing ones when doing catalog update. + */ + PublicationAddSchemas(pubform->oid, schemaoidlist, true, stmt); + } + + return; +} + /* * Alter the existing publication. * - * This is dispatcher function for AlterPublicationOptions and - * AlterPublicationTables. + * This is dispatcher function for AlterPublicationOptions, + * AlterPublicationSchemas and AlterPublicationTables. */ void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) @@ -459,6 +639,8 @@ AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt) if (stmt->options) AlterPublicationOptions(pstate, stmt, rel, tup); + else if (stmt->schemas) + AlterPublicationSchemas(stmt, rel, tup); else AlterPublicationTables(stmt, rel, tup); @@ -497,6 +679,57 @@ RemovePublicationRelById(Oid proid) table_close(rel, RowExclusiveLock); } +/* + * Remove schema from publication by mapping OID. + */ +void +RemovePublicationSchemaById(Oid psoid) +{ + Relation rel; + HeapTuple tup; + List *schemaRels = NIL; + Form_pg_publication_schema pubsch; + + rel = table_open(PublicationSchemaRelationId, RowExclusiveLock); + + tup = SearchSysCache1(PUBLICATIONSCHEMA, ObjectIdGetDatum(psoid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "cache lookup failed for publication schema %u", psoid); + + pubsch = (Form_pg_publication_schema) GETSTRUCT(tup); + schemaRels = GetSchemaPublicationRelations(pubsch->psnspcid, + PUBLICATION_PART_ALL); + + /* Invalidate relcache so that publication info is rebuilt. */ + InvalidatePublicationRels(schemaRels); + + CatalogTupleDelete(rel, &tup->t_self); + + ReleaseSysCache(tup); + + table_close(rel, RowExclusiveLock); +} + +/* + * The schemas specified in the schema list are locked in AccessShareLock mode + * in order to add them to a publication. + */ +static void +LockSchemaList(List *schemalist) +{ + ListCell *lc; + foreach(lc, schemalist) + { + Oid schemaoid = lfirst_oid(lc); + + /* Allow query cancel in case this takes a long time */ + CHECK_FOR_INTERRUPTS(); + + LockDatabaseObject(NamespaceRelationId, schemaoid, 0, AccessShareLock); + } +} + /* * Open relations specified by a RangeVar list. * The returned tables are locked in ShareUpdateExclusiveLock mode in order to @@ -607,7 +840,7 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, { ListCell *lc; - Assert(!stmt || !stmt->for_all_tables); + Assert(!stmt || (!stmt->for_all_tables && !stmt->schemas)); foreach(lc, rels) { @@ -631,6 +864,39 @@ PublicationAddTables(Oid pubid, List *rels, bool if_not_exists, } } +/* + * Add listed schemas to the publication. + */ +static void +PublicationAddSchemas(Oid pubid, List *schemas, bool if_not_exists, + AlterPublicationStmt *stmt) +{ + ListCell *lc; + + Assert(!stmt || (!stmt->for_all_tables && !stmt->tables)); + + foreach(lc, schemas) + { + Oid schemaoid = lfirst_oid(lc); + ObjectAddress obj; + + /* Must be owner of the schema or superuser */ + if (!pg_namespace_ownercheck(schemaoid, GetUserId())) + aclcheck_error(ACLCHECK_NOT_OWNER, OBJECT_SCHEMA, + get_namespace_name(schemaoid)); + + obj = publication_add_schema(pubid, schemaoid, if_not_exists); + if (stmt) + { + EventTriggerCollectSimpleCommand(obj, InvalidObjectAddress, + (Node *) stmt); + + InvokeObjectPostCreateHook(PublicationSchemaRelationId, + obj.objectId, 0); + } + } +} + /* * Remove listed tables from the publication. */ @@ -665,6 +931,40 @@ PublicationDropTables(Oid pubid, List *rels, bool missing_ok) } } +/* + * Remove listed schemas from the publication. + */ +static void +PublicationDropSchemas(Oid pubid, List *schemas, bool missing_ok) +{ + ObjectAddress obj; + ListCell *lc; + Oid psid; + + foreach(lc, schemas) + { + Oid schemaoid = lfirst_oid(lc); + + psid = GetSysCacheOid2(PUBLICATIONSCHEMAMAP, + Anum_pg_publication_schema_oid, + ObjectIdGetDatum(schemaoid), + ObjectIdGetDatum(pubid)); + if (!OidIsValid(psid)) + { + if (missing_ok) + continue; + + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("schema \"%s\" is not part of the publication", + get_namespace_name(schemaoid)))); + } + + ObjectAddressSet(obj, PublicationSchemaRelationId, psid); + performDeletion(&obj, DROP_CASCADE, 0); + } +} + /* * Internal workhorse for changing a publication owner */ @@ -696,7 +996,7 @@ AlterPublicationOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId) aclcheck_error(aclresult, OBJECT_DATABASE, get_database_name(MyDatabaseId)); - if (form->puballtables && !superuser_arg(newOwnerId)) + if (form->pubkind == PUBKIND_ALLTABLES && !superuser_arg(newOwnerId)) ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("permission denied to change owner of publication \"%s\"", diff --git a/src/backend/commands/seclabel.c b/src/backend/commands/seclabel.c index ddc019cb39..accaf2ed2e 100644 --- a/src/backend/commands/seclabel.c +++ b/src/backend/commands/seclabel.c @@ -80,6 +80,7 @@ SecLabelSupportsObjectType(ObjectType objtype) case OBJECT_OPFAMILY: case OBJECT_POLICY: case OBJECT_PUBLICATION_REL: + case OBJECT_PUBLICATION_SCHEMA: case OBJECT_RULE: case OBJECT_STATISTIC_EXT: case OBJECT_TABCONSTRAINT: diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index b18de38e73..29a623f34d 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -52,6 +52,7 @@ #include "commands/defrem.h" #include "commands/event_trigger.h" #include "commands/policy.h" +#include "commands/publicationcmds.h" #include "commands/sequence.h" #include "commands/tablecmds.h" #include "commands/tablespace.h" @@ -12267,6 +12268,7 @@ ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel, case OCLASS_EVENT_TRIGGER: case OCLASS_PUBLICATION: case OCLASS_PUBLICATION_REL: + case OCLASS_PUBLICATION_SCHEMA: case OCLASS_SUBSCRIPTION: case OCLASS_TRANSFORM: @@ -15846,7 +15848,8 @@ ATPrepChangePersistence(Relation rel, bool toLogged) * UNLOGGED as UNLOGGED tables can't be published. */ if (!toLogged && - list_length(GetRelationPublications(RelationGetRelid(rel))) > 0) + (list_length(GetRelationPublications(RelationGetRelid(rel))) > 0 || + list_length(GetSchemaPublications(rel->rd_rel->relnamespace)) > 0)) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("cannot change table \"%s\" to unlogged because it is part of a publication", diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 38251c2b8e..b0d19cabc3 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4823,7 +4823,7 @@ _copyAlterPublicationStmt(const AlterPublicationStmt *from) COPY_NODE_FIELD(options); COPY_NODE_FIELD(tables); COPY_SCALAR_FIELD(for_all_tables); - COPY_SCALAR_FIELD(tableAction); + COPY_SCALAR_FIELD(action); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index 8a1762000c..f4f7e896dd 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2309,7 +2309,7 @@ _equalAlterPublicationStmt(const AlterPublicationStmt *a, COMPARE_NODE_FIELD(options); COMPARE_NODE_FIELD(tables); COMPARE_SCALAR_FIELD(for_all_tables); - COMPARE_SCALAR_FIELD(tableAction); + COMPARE_SCALAR_FIELD(action); return true; } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 39a2849eba..6fac2a4c30 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -169,6 +169,7 @@ static Node *makeNullAConst(int location); static Node *makeAConst(Value *v, int location); static Node *makeBoolAConst(bool state, int location); static RoleSpec *makeRoleSpec(RoleSpecType type, int location); +static SchemaSpec *makeSchemaSpec(SchemaSpecType type, int location); static void check_qualified_name(List *names, core_yyscan_t yyscanner); static List *check_func_name(List *names, core_yyscan_t yyscanner); static List *check_indirection(List *indirection, core_yyscan_t yyscanner); @@ -257,6 +258,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); PartitionSpec *partspec; PartitionBoundSpec *partboundspec; RoleSpec *rolespec; + SchemaSpec *schemaspec; struct SelectLimit *selectlimit; SetQuantifier setquantifier; struct GroupClause *groupclause; @@ -426,14 +428,13 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); transform_element_list transform_type_list TriggerTransitions TriggerReferencing vacuum_relation_list opt_vacuum_relation_list - drop_option_list + drop_option_list schema_list %type opt_routine_body %type group_clause %type group_by_list %type group_by_item empty_grouping_set rollup_clause cube_clause %type grouping_sets_clause -%type opt_publication_for_tables publication_for_tables %type opt_fdw_options fdw_options %type fdw_option @@ -554,6 +555,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); %type createdb_opt_name plassign_target %type var_value zone_value %type auth_ident RoleSpec opt_granted_by +%type SchemaSpec %type unreserved_keyword type_func_name_keyword %type col_name_keyword reserved_keyword @@ -9591,45 +9593,68 @@ AlterOwnerStmt: ALTER AGGREGATE aggregate_with_argtypes OWNER TO RoleSpec /***************************************************************************** * - * CREATE PUBLICATION name [ FOR TABLE ] [ WITH options ] + * CREATE PUBLICATION name [WITH options] * + * CREATE PUBLICATION FOR ALL TABLES [WITH options] + * + * CREATE PUBLICATION FOR TABLE [WITH options] + * + * CREATE PUBLICATION FOR SCHEMA [WITH options] *****************************************************************************/ CreatePublicationStmt: - CREATE PUBLICATION name opt_publication_for_tables opt_definition + CREATE PUBLICATION name opt_definition { CreatePublicationStmt *n = makeNode(CreatePublicationStmt); n->pubname = $3; - n->options = $5; - if ($4 != NULL) - { - /* FOR TABLE */ - if (IsA($4, List)) - n->tables = (List *)$4; - /* FOR ALL TABLES */ - else - n->for_all_tables = true; - } + n->options = $4; $$ = (Node *)n; } - ; - -opt_publication_for_tables: - publication_for_tables { $$ = $1; } - | /* EMPTY */ { $$ = NULL; } - ; - -publication_for_tables: - FOR TABLE relation_expr_list + | CREATE PUBLICATION name FOR ALL TABLES opt_definition + { + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + n->pubname = $3; + n->options = $7; + n->for_all_tables = true; + $$ = (Node *)n; + } + | CREATE PUBLICATION name FOR TABLE relation_expr_list opt_definition { - $$ = (Node *) $3; + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + n->pubname = $3; + n->options = $7; + n->tables = (List *)$6; + $$ = (Node *)n; } - | FOR ALL TABLES + | CREATE PUBLICATION name FOR SCHEMA schema_list opt_definition { - $$ = (Node *) makeInteger(true); + CreatePublicationStmt *n = makeNode(CreatePublicationStmt); + n->pubname = $3; + n->options = $7; + n->schemas = (List *)$6; + $$ = (Node *)n; } ; +/* Schema specifications */ +SchemaSpec: ColId + { + SchemaSpec *n; + n = makeSchemaSpec(SCHEMASPEC_CSTRING, @1); + n->schemaname = pstrdup($1); + $$ = n; + } + | CURRENT_SCHEMA + { + $$ = makeSchemaSpec(SCHEMASPEC_CURRENT_SCHEMA, @1); + } + ; + +schema_list: SchemaSpec + { $$ = list_make1($1); } + | schema_list ',' SchemaSpec + { $$ = lappend($1, $3); } + ; /***************************************************************************** * @@ -9641,6 +9666,11 @@ publication_for_tables: * * ALTER PUBLICATION name SET TABLE table [, table2] * + * ALTER PUBLICATION name ADD SCHEMA schema [, schema2] + * + * ALTER PUBLICATION name DROP SCHEMA schema [, schema2] + * + * ALTER PUBLICATION name SET SCHEMA schema [, schema2] *****************************************************************************/ AlterPublicationStmt: @@ -9656,7 +9686,7 @@ AlterPublicationStmt: AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; n->tables = $6; - n->tableAction = DEFELEM_ADD; + n->action = DEFELEM_ADD; $$ = (Node *)n; } | ALTER PUBLICATION name SET TABLE relation_expr_list @@ -9664,7 +9694,7 @@ AlterPublicationStmt: AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; n->tables = $6; - n->tableAction = DEFELEM_SET; + n->action = DEFELEM_SET; $$ = (Node *)n; } | ALTER PUBLICATION name DROP TABLE relation_expr_list @@ -9672,7 +9702,31 @@ AlterPublicationStmt: AlterPublicationStmt *n = makeNode(AlterPublicationStmt); n->pubname = $3; n->tables = $6; - n->tableAction = DEFELEM_DROP; + n->action = DEFELEM_DROP; + $$ = (Node *)n; + } + | ALTER PUBLICATION name ADD_P SCHEMA schema_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->schemas = $6; + n->action = DEFELEM_ADD; + $$ = (Node *)n; + } + | ALTER PUBLICATION name SET SCHEMA schema_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->schemas = $6; + n->action = DEFELEM_SET; + $$ = (Node *)n; + } + | ALTER PUBLICATION name DROP SCHEMA schema_list + { + AlterPublicationStmt *n = makeNode(AlterPublicationStmt); + n->pubname = $3; + n->schemas = $6; + n->action = DEFELEM_DROP; $$ = (Node *)n; } ; @@ -16621,6 +16675,20 @@ makeRoleSpec(RoleSpecType type, int location) return spec; } +/* + * makeSchemaSpec - Create a SchemaSpec with the given type and location + */ +static SchemaSpec * +makeSchemaSpec(SchemaSpecType type, int location) +{ + SchemaSpec *spec = makeNode(SchemaSpec); + + spec->schematype = type; + spec->location = location; + + return spec; +} + /* check_qualified_name --- check the result of qualified_name production * * It's easiest to let the grammar production for qualified_name allow diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 14d737fd93..d3f529de9a 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -15,7 +15,9 @@ #include "access/tupconvert.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_schema.h" #include "commands/defrem.h" +#include "commands/publicationcmds.h" #include "fmgr.h" #include "replication/logical.h" #include "replication/logicalproto.h" @@ -1068,6 +1070,9 @@ init_rel_sync_cache(MemoryContext cachectx) CacheRegisterSyscacheCallback(PUBLICATIONRELMAP, rel_sync_cache_publication_cb, (Datum) 0); + CacheRegisterSyscacheCallback(PUBLICATIONSCHEMAMAP, + rel_sync_cache_publication_cb, + (Datum) 0); } /* @@ -1146,7 +1151,15 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) /* Validate the entry */ if (!entry->replicate_valid) { + Oid schemaId = get_rel_namespace(relid); List *pubids = GetRelationPublications(relid); + + /* + * We don't acquire a lock on the namespace system table as we build + * the cache entry using a historic snapshot and all the later changes + * are absorbed while decoding WAL. + */ + List *schemaPubids = GetSchemaPublications(schemaId); ListCell *lc; Oid publish_as_relid = relid; @@ -1172,12 +1185,21 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) Publication *pub = lfirst(lc); bool publish = false; - if (pub->alltables) + if (pub->pubkind == PUBKIND_ALLTABLES) { publish = true; if (pub->pubviaroot && am_partition) publish_as_relid = llast_oid(get_partition_ancestors(relid)); } + else if (pub->pubkind == PUBKIND_SCHEMA) + { + if (list_member_oid(schemaPubids, pub->oid)) + { + publish = true; + if (pub->pubviaroot && am_partition) + publish_as_relid = llast_oid(get_partition_ancestors(relid)); + } + } if (!publish) { @@ -1203,6 +1225,8 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) Oid ancestor = lfirst_oid(lc2); if (list_member_oid(GetRelationPublications(ancestor), + pub->oid) || + list_member_oid(GetSchemaPublications(get_rel_namespace(ancestor)), pub->oid)) { ancestor_published = true; diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c index 13d9994af3..2ec805eefe 100644 --- a/src/backend/utils/cache/relcache.c +++ b/src/backend/utils/cache/relcache.c @@ -66,6 +66,7 @@ #include "catalog/schemapg.h" #include "catalog/storage.h" #include "commands/policy.h" +#include "commands/publicationcmds.h" #include "commands/trigger.h" #include "miscadmin.h" #include "nodes/makefuncs.h" @@ -5447,6 +5448,7 @@ GetRelationPublicationActions(Relation relation) List *puboids; ListCell *lc; MemoryContext oldcxt; + Oid schemaid; PublicationActions *pubactions = palloc0(sizeof(PublicationActions)); /* @@ -5478,6 +5480,9 @@ GetRelationPublicationActions(Relation relation) } puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); + schemaid = RelationGetNamespace(relation); + puboids = list_concat_unique_oid(puboids, GetSchemaPublications(schemaid)); + foreach(lc, puboids) { Oid pubid = lfirst_oid(lc); diff --git a/src/backend/utils/cache/syscache.c b/src/backend/utils/cache/syscache.c index d6cb78dea8..924b7bcad5 100644 --- a/src/backend/utils/cache/syscache.c +++ b/src/backend/utils/cache/syscache.c @@ -51,6 +51,7 @@ #include "catalog/pg_proc.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" +#include "catalog/pg_publication_schema.h" #include "catalog/pg_range.h" #include "catalog/pg_replication_origin.h" #include "catalog/pg_rewrite.h" @@ -650,6 +651,28 @@ static const struct cachedesc cacheinfo[] = { }, 64 }, + {PublicationSchemaRelationId, /* PUBLICATIONSCHEMA */ + PublicationSchemaObjectIndexId, + 1, + { + Anum_pg_publication_schema_oid, + 0, + 0, + 0 + }, + 64 + }, + {PublicationSchemaRelationId, /* PUBLICATIONSCHEMAMAP */ + PublicationSchemaPsnspcidPspubidIndexId, + 2, + { + Anum_pg_publication_schema_psnspcid, + Anum_pg_publication_schema_pspubid, + 0, + 0 + }, + 64 + }, {RangeRelationId, /* RANGEMULTIRANGE */ RangeMultirangeTypidIndexId, 1, diff --git a/src/bin/pg_dump/common.c b/src/bin/pg_dump/common.c index 1f24e79665..3010485f47 100644 --- a/src/bin/pg_dump/common.c +++ b/src/bin/pg_dump/common.c @@ -257,6 +257,9 @@ getSchemaData(Archive *fout, int *numTablesPtr) pg_log_info("reading publication membership"); getPublicationTables(fout, tblinfo, numTables); + pg_log_info("reading publication schemas"); + getPublicationNamespaces(fout, nspinfo, numNamespaces); + pg_log_info("reading subscriptions"); getSubscriptions(fout); diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index ee06dc6822..8d97b13154 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -2788,7 +2788,8 @@ _tocEntryRequired(TocEntry *te, teSection curSection, ArchiveHandle *AH) */ if (ropt->no_publications && (strcmp(te->desc, "PUBLICATION") == 0 || - strcmp(te->desc, "PUBLICATION TABLE") == 0)) + strcmp(te->desc, "PUBLICATION TABLE") == 0 || + strcmp(te->desc, "PUBLICATION SCHEMA") == 0)) return 0; /* If it's a security label, maybe ignore it */ diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 90ac445bcd..d0806b5a6e 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -51,6 +51,7 @@ #include "catalog/pg_largeobject_d.h" #include "catalog/pg_largeobject_metadata_d.h" #include "catalog/pg_proc_d.h" +#include "catalog/pg_publication.h" #include "catalog/pg_subscription.h" #include "catalog/pg_trigger_d.h" #include "catalog/pg_type_d.h" @@ -1630,9 +1631,13 @@ selectDumpableNamespace(NamespaceInfo *nsinfo, Archive *fout) if (nsinfo->nspowner == BOOTSTRAP_SUPERUSERID) nsinfo->dobj.dump &= ~DUMP_COMPONENT_DEFINITION; nsinfo->dobj.dump_contains = DUMP_COMPONENT_ALL; + nsinfo->dobj.dump |= DUMP_COMPONENT_PUBSCHEMA; } else + { nsinfo->dobj.dump_contains = nsinfo->dobj.dump = DUMP_COMPONENT_ALL; + nsinfo->dobj.dump |= DUMP_COMPONENT_PUBSCHEMA; + } /* * In any case, a namespace can be excluded by an exclusion switch @@ -3950,6 +3955,7 @@ getPublications(Archive *fout, int *numPublications) int i_pubdelete; int i_pubtruncate; int i_pubviaroot; + int i_pubkind; int i, ntups; @@ -3964,25 +3970,37 @@ getPublications(Archive *fout, int *numPublications) resetPQExpBuffer(query); /* Get the publications. */ - if (fout->remoteVersion >= 130000) + if (fout->remoteVersion >= 150000) appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "(%s p.pubowner) AS rolname, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, p.pubviaroot " + "false AS puballtables, p.pubinsert, p.pubupdate, " + "p.pubdelete, p.pubtruncate, p.pubviaroot, p.pubkind " "FROM pg_publication p", username_subquery); + else if (fout->remoteVersion >= 130000) + appendPQExpBuffer(query, + "SELECT p.tableoid, p.oid, p.pubname, " + "(%s p.pubowner) AS rolname, " + "p.puballtables, p.pubinsert, p.pubupdate, " + "p.pubdelete, p.pubtruncate, p.pubviaroot, " + "NULL AS pubkind FROM pg_publication p", + username_subquery); else if (fout->remoteVersion >= 110000) appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "(%s p.pubowner) AS rolname, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, p.pubtruncate, false AS pubviaroot " - "FROM pg_publication p", + "p.puballtables, p.pubinsert, p.pubupdate, " + "p.pubdelete, p.pubtruncate, false AS pubviaroot, " + "NULL AS pubkind FROM pg_publication p", username_subquery); else appendPQExpBuffer(query, "SELECT p.tableoid, p.oid, p.pubname, " "(%s p.pubowner) AS rolname, " - "p.puballtables, p.pubinsert, p.pubupdate, p.pubdelete, false AS pubtruncate, false AS pubviaroot " + "p.puballtables, p.pubinsert, p.pubupdate, " + "p.pubdelete, false AS pubtruncate, " + "false AS pubviaroot, NULL AS pubkind " "FROM pg_publication p", username_subquery); @@ -4000,6 +4018,7 @@ getPublications(Archive *fout, int *numPublications) i_pubdelete = PQfnumber(res, "pubdelete"); i_pubtruncate = PQfnumber(res, "pubtruncate"); i_pubviaroot = PQfnumber(res, "pubviaroot"); + i_pubkind = PQfnumber(res, "pubkind"); pubinfo = pg_malloc(ntups * sizeof(PublicationInfo)); @@ -4024,6 +4043,7 @@ getPublications(Archive *fout, int *numPublications) (strcmp(PQgetvalue(res, i, i_pubtruncate), "t") == 0); pubinfo[i].pubviaroot = (strcmp(PQgetvalue(res, i, i_pubviaroot), "t") == 0); + pubinfo[i].pubkind = get_publication_kind(PQgetvalue(res, i, i_pubkind)); if (strlen(pubinfo[i].rolname) == 0) pg_log_warning("owner of publication \"%s\" appears to be invalid", @@ -4066,7 +4086,8 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo) appendPQExpBuffer(query, "CREATE PUBLICATION %s", qpubname); - if (pubinfo->puballtables) + if ((fout->remoteVersion >= 150000 && pubinfo->pubkind == PUBKIND_ALLTABLES) || + (fout->remoteVersion < 150000 && pubinfo->puballtables)) appendPQExpBufferStr(query, " FOR ALL TABLES"); appendPQExpBufferStr(query, " WITH (publish = '"); @@ -4133,6 +4154,94 @@ dumpPublication(Archive *fout, const PublicationInfo *pubinfo) free(qpubname); } +/* + * getPublicationNamespaces + * get information about publication membership for dumpable schemas. + */ +void +getPublicationNamespaces(Archive *fout, NamespaceInfo nspinfo[], + int numSchemas) +{ + PQExpBuffer query; + PGresult *res; + PublicationSchemaInfo *pubsinfo; + DumpOptions *dopt = fout->dopt; + int i_tableoid; + int i_oid; + int i_pspubid; + int i_psnspcid; + int i, + j, + ntups; + + if (dopt->no_publications || fout->remoteVersion < 150000) + return; + + query = createPQExpBuffer(); + + /* Collect all publication membership info. */ + appendPQExpBufferStr(query, + "SELECT tableoid, oid, pspubid, psnspcid " + "FROM pg_catalog.pg_publication_schema"); + res = ExecuteSqlQuery(fout, query->data, PGRES_TUPLES_OK); + + ntups = PQntuples(res); + + i_tableoid = PQfnumber(res, "tableoid"); + i_oid = PQfnumber(res, "oid"); + i_pspubid = PQfnumber(res, "pspubid"); + i_psnspcid = PQfnumber(res, "psnspcid"); + + /* this allocation may be more than we need */ + pubsinfo = pg_malloc(ntups * sizeof(PublicationSchemaInfo)); + j = 0; + + for (i = 0; i < ntups; i++) + { + Oid pspubid = atooid(PQgetvalue(res, i, i_pspubid)); + Oid psnspcid = atooid(PQgetvalue(res, i, i_psnspcid)); + PublicationInfo *pubinfo; + NamespaceInfo *nspinfo; + + /* + * Ignore any entries for which we aren't interested in either the + * publication or the rel. + */ + pubinfo = findPublicationByOid(pspubid); + if (pubinfo == NULL) + continue; + nspinfo = findNamespaceByOid(psnspcid); + if (nspinfo == NULL) + continue; + + /* + * Ignore publication membership of schema whose definitions are not + * to be dumped. + */ + if (!(nspinfo->dobj.dump & DUMP_COMPONENT_PUBSCHEMA)) + continue; + + /* OK, make a DumpableObject for this relationship */ + pubsinfo[j].dobj.objType = DO_PUBLICATION_SCHEMA; + pubsinfo[j].dobj.catId.tableoid = + atooid(PQgetvalue(res, i, i_tableoid)); + pubsinfo[j].dobj.catId.oid = atooid(PQgetvalue(res, i, i_oid)); + AssignDumpId(&pubsinfo[j].dobj); + pubsinfo[j].dobj.namespace = nspinfo->dobj.namespace; + pubsinfo[j].dobj.name = nspinfo->dobj.name; + pubsinfo[j].publication = pubinfo; + pubsinfo[j].pubschema = nspinfo; + + /* Decide whether we want to dump it */ + selectDumpablePublicationTable(&(pubsinfo[j].dobj), fout); + + j++; + } + + PQclear(res); + destroyPQExpBuffer(query); +} + /* * getPublicationTables * get information about publication membership for dumpable tables. @@ -4220,6 +4329,44 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) destroyPQExpBuffer(query); } +/* + * dumpPublicationSchema + * dump the definition of the given publication schema mapping + */ +static void +dumpPublicationSchema(Archive *fout, const PublicationSchemaInfo *pubsinfo) +{ + NamespaceInfo *schemainfo = pubsinfo->pubschema; + PublicationInfo *pubinfo = pubsinfo->publication; + PQExpBuffer query; + char *tag; + + if (!(pubsinfo->dobj.dump & DUMP_COMPONENT_DEFINITION)) + return; + + tag = psprintf("%s %s", pubinfo->dobj.name, schemainfo->dobj.name); + + query = createPQExpBuffer(); + + appendPQExpBuffer(query, "ALTER PUBLICATION %s ", fmtId(pubinfo->dobj.name)); + appendPQExpBuffer(query, "ADD SCHEMA %s;\n", fmtId(schemainfo->dobj.name)); + + /* + * There is no point in creating drop query as the drop is done by schema + * drop. + */ + ArchiveEntry(fout, pubsinfo->dobj.catId, pubsinfo->dobj.dumpId, + ARCHIVE_OPTS(.tag = tag, + .namespace = schemainfo->dobj.name, + .owner = pubinfo->rolname, + .description = "PUBLICATION SCHEMA", + .section = SECTION_POST_DATA, + .createStmt = query->data)); + + free(tag); + destroyPQExpBuffer(query); +} + /* * dumpPublicationTable * dump the definition of the given publication table mapping @@ -10445,6 +10592,9 @@ dumpDumpableObject(Archive *fout, const DumpableObject *dobj) case DO_PUBLICATION_REL: dumpPublicationTable(fout, (const PublicationRelInfo *) dobj); break; + case DO_PUBLICATION_SCHEMA: + dumpPublicationSchema(fout, (const PublicationSchemaInfo *) dobj); + break; case DO_SUBSCRIPTION: dumpSubscription(fout, (const SubscriptionInfo *) dobj); break; @@ -18693,6 +18843,7 @@ addBoundaryDependencies(DumpableObject **dobjs, int numObjs, case DO_POLICY: case DO_PUBLICATION: case DO_PUBLICATION_REL: + case DO_PUBLICATION_SCHEMA: case DO_SUBSCRIPTION: /* Post-data objects: must come after the post-data boundary */ addObjectDependency(dobj, postDataBound->dumpId); diff --git a/src/bin/pg_dump/pg_dump.h b/src/bin/pg_dump/pg_dump.h index f5e170e0db..daad2802cd 100644 --- a/src/bin/pg_dump/pg_dump.h +++ b/src/bin/pg_dump/pg_dump.h @@ -81,6 +81,7 @@ typedef enum DO_POLICY, DO_PUBLICATION, DO_PUBLICATION_REL, + DO_PUBLICATION_SCHEMA, DO_SUBSCRIPTION } DumpableObjectType; @@ -94,6 +95,7 @@ typedef uint32 DumpComponents; /* a bitmask of dump object components */ #define DUMP_COMPONENT_ACL (1 << 4) #define DUMP_COMPONENT_POLICY (1 << 5) #define DUMP_COMPONENT_USERMAP (1 << 6) +#define DUMP_COMPONENT_PUBSCHEMA (1 << 7) #define DUMP_COMPONENT_ALL (0xFFFF) /* @@ -616,6 +618,7 @@ typedef struct _PublicationInfo bool pubdelete; bool pubtruncate; bool pubviaroot; + char pubkind; } PublicationInfo; /* @@ -629,6 +632,17 @@ typedef struct _PublicationRelInfo TableInfo *pubtable; } PublicationRelInfo; +/* + * The PublicationSchemaInfo struct is used to represent publication schema + * mapping. + */ +typedef struct _PublicationSchemaInfo +{ + DumpableObject dobj; + NamespaceInfo *pubschema; + PublicationInfo *publication; +} PublicationSchemaInfo; + /* * The SubscriptionInfo struct is used to represent subscription. */ @@ -735,6 +749,8 @@ extern PublicationInfo *getPublications(Archive *fout, int *numPublications); extern void getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables); +extern void getPublicationNamespaces(Archive *fout, NamespaceInfo nspinfo[], + int numSchemas); extern void getSubscriptions(Archive *fout); #endif /* PG_DUMP_H */ diff --git a/src/bin/pg_dump/pg_dump_sort.c b/src/bin/pg_dump/pg_dump_sort.c index 46461fb6a1..13a6fcd660 100644 --- a/src/bin/pg_dump/pg_dump_sort.c +++ b/src/bin/pg_dump/pg_dump_sort.c @@ -82,6 +82,7 @@ enum dbObjectTypePriorities PRIO_POLICY, PRIO_PUBLICATION, PRIO_PUBLICATION_REL, + PRIO_PUBLICATION_SCHEMA, PRIO_SUBSCRIPTION, PRIO_DEFAULT_ACL, /* done in ACL pass */ PRIO_EVENT_TRIGGER, /* must be next to last! */ @@ -135,6 +136,7 @@ static const int dbObjectTypePriority[] = PRIO_POLICY, /* DO_POLICY */ PRIO_PUBLICATION, /* DO_PUBLICATION */ PRIO_PUBLICATION_REL, /* DO_PUBLICATION_REL */ + PRIO_PUBLICATION_SCHEMA, /* DO_PUBLICATION_SCHEMA */ PRIO_SUBSCRIPTION /* DO_SUBSCRIPTION */ }; @@ -1477,6 +1479,11 @@ describeDumpableObject(DumpableObject *obj, char *buf, int bufsize) "PUBLICATION TABLE (ID %d OID %u)", obj->dumpId, obj->catId.oid); return; + case DO_PUBLICATION_SCHEMA: + snprintf(buf, bufsize, + "PUBLICATION SCHEMA (ID %d OID %u)", + obj->dumpId, obj->catId.oid); + return; case DO_SUBSCRIPTION: snprintf(buf, bufsize, "SUBSCRIPTION (ID %d OID %u)", diff --git a/src/bin/psql/describe.c b/src/bin/psql/describe.c index 8333558bda..f8b16887f4 100644 --- a/src/bin/psql/describe.c +++ b/src/bin/psql/describe.c @@ -19,6 +19,7 @@ #include "catalog/pg_cast_d.h" #include "catalog/pg_class_d.h" #include "catalog/pg_default_acl_d.h" +#include "catalog/pg_publication.h" #include "common.h" #include "common/logging.h" #include "describe.h" @@ -3147,17 +3148,40 @@ describeOneTableDetails(const char *schemaname, /* print any publications */ if (pset.sversion >= 100000) { - printfPQExpBuffer(&buf, - "SELECT pubname\n" - "FROM pg_catalog.pg_publication p\n" - "JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" - "WHERE pr.prrelid = '%s'\n" - "UNION ALL\n" - "SELECT pubname\n" - "FROM pg_catalog.pg_publication p\n" - "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" - "ORDER BY 1;", - oid, oid); + if (pset.sversion >= 150000) + { + printfPQExpBuffer(&buf, + "SELECT p.pubname\n" + "FROM pg_catalog.pg_publication p\n" + " JOIN pg_catalog.pg_publication_schema ps ON p.oid = ps.pspubid AND p.pubkind = 's'\n" + " JOIN pg_catalog.pg_class pc ON pc.relnamespace = ps.psnspcid AND pc.oid = '%s'\n" + "UNION ALL\n" + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + " JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" + "WHERE p.pubkind = 't' AND pr.prrelid = '%s'\n" + "UNION ALL\n" + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + "WHERE p.pubkind = 'a' \n" + " AND pg_catalog.pg_relation_is_publishable('%s')\n" + "ORDER BY 1;", + oid, oid, oid); + } + else + { + printfPQExpBuffer(&buf, + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + "JOIN pg_catalog.pg_publication_rel pr ON p.oid = pr.prpubid\n" + "WHERE pr.prrelid = '%s'\n" + "UNION ALL\n" + "SELECT pubname\n" + "FROM pg_catalog.pg_publication p\n" + "WHERE p.puballtables AND pg_catalog.pg_relation_is_publishable('%s')\n" + "ORDER BY 1;", + oid, oid); + } result = PSQLexec(buf.data); if (!result) @@ -5021,6 +5045,8 @@ listSchemas(const char *pattern, bool verbose, bool showSystem) PQExpBufferData buf; PGresult *res; printQueryOpt myopt = pset.popt; + int pub_schema_tuples = 0; + char **footers = NULL; initPQExpBuffer(&buf); printfPQExpBuffer(&buf, @@ -5061,9 +5087,66 @@ listSchemas(const char *pattern, bool verbose, bool showSystem) myopt.title = _("List of schemas"); myopt.translate_header = true; + if (pattern && pset.sversion >= 150000) + { + PGresult *result; + int i; + + printfPQExpBuffer(&buf, + "SELECT p.pubname FROM pg_catalog.pg_publication p,\n" + "pg_catalog.pg_namespace n,\n" + "pg_catalog.pg_publication_schema ps\n" + "WHERE n.oid = ps.psnspcid AND\n" + "p.oid = ps.pspubid AND n.nspname = '%s'\n" + "ORDER BY 1", + pattern); + result = PSQLexec(buf.data); + if (!result) + return true; + else + pub_schema_tuples = PQntuples(result); + + if (pub_schema_tuples > 0) + { + /* + * Allocate memory for footers. Size of footers will be 1 (for + * storing "Publications:" string) + Schema count + 1 (for + * storing NULL). + */ + footers = (char **) palloc((1 + pub_schema_tuples + 1) * sizeof(char *)); + footers[0] = pstrdup(_("Publications:")); + + /* Might be an empty set - that's ok */ + for (i = 0; i < pub_schema_tuples; i++) + { + printfPQExpBuffer(&buf, " \"%s\"", + PQgetvalue(result, i, 0)); + + footers[i + 1] = pstrdup(buf.data); + } + + footers[i + 1] = NULL; + myopt.footers = footers; + } + + PQclear(result); + } + printQuery(res, &myopt, pset.queryFout, false, pset.logfile); PQclear(res); + + /* Free the memory allocated for the footer */ + if (footers) + { + char **footer = NULL; + + for (footer = footers; *footer; footer++) + pfree(*footer); + + pfree(footers); + } + return true; } @@ -6163,14 +6246,17 @@ listPublications(const char *pattern) printfPQExpBuffer(&buf, "SELECT pubname AS \"%s\",\n" - " pg_catalog.pg_get_userbyid(pubowner) AS \"%s\",\n" - " puballtables AS \"%s\",\n" - " pubinsert AS \"%s\",\n" + " pg_catalog.pg_get_userbyid(pubowner) AS \"%s\"", + gettext_noop("Name"), + gettext_noop("Owner")); + if (pset.sversion < 150000) + appendPQExpBuffer(&buf, + ",\n puballtables AS \"%s\"", + gettext_noop("All tables")); + appendPQExpBuffer(&buf, + ",\n pubinsert AS \"%s\",\n" " pubupdate AS \"%s\",\n" " pubdelete AS \"%s\"", - gettext_noop("Name"), - gettext_noop("Owner"), - gettext_noop("All tables"), gettext_noop("Inserts"), gettext_noop("Updates"), gettext_noop("Deletes")); @@ -6182,6 +6268,19 @@ listPublications(const char *pattern) appendPQExpBuffer(&buf, ",\n pubviaroot AS \"%s\"", gettext_noop("Via root")); + if (pset.sversion >= 150000) + appendPQExpBuffer(&buf, + ",\n CASE pubkind " + " WHEN 'e' THEN '%s'" + " WHEN 'a' THEN '%s'" + " WHEN 't' THEN '%s'" + " WHEN 's' THEN '%s'" + " END as \"%s\"", + gettext_noop("empty"), + gettext_noop("all tables"), + gettext_noop("table"), + gettext_noop("schema"), + gettext_noop("Pub Kind")); appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_publication\n"); @@ -6210,6 +6309,42 @@ listPublications(const char *pattern) return true; } +/* + * Add footer to publication description. + */ +static bool +addFooterToPublicationDesc(PQExpBuffer buf, char *footermsg, + bool singlecol, printTableContent *cont) +{ + PGresult *res; + int count = 0; + int i = 0; + + res = PSQLexec(buf->data); + if (!res) + return false; + else + count = PQntuples(res); + + if (count > 0) + printTableAddFooter(cont, _(footermsg)); + + for (i = 0; i < count; i++) + { + if (!singlecol) + printfPQExpBuffer(buf, " \"%s.%s\"", PQgetvalue(res, i, 0), + PQgetvalue(res, i, 1)); + else + printfPQExpBuffer(buf, " \"%s\"", PQgetvalue(res, i, 0)); + + printTableAddFooter(cont, buf->data); + } + + PQclear(res); + termPQExpBuffer(buf); + return true; +} + /* * \dRp+ * Describes publications including the contents. @@ -6224,6 +6359,10 @@ describePublications(const char *pattern) PGresult *res; bool has_pubtruncate; bool has_pubviaroot; + bool has_pubkind; + bool has_puballtables; + PQExpBufferData title; + printTableContent cont; if (pset.sversion < 100000) { @@ -6237,19 +6376,35 @@ describePublications(const char *pattern) has_pubtruncate = (pset.sversion >= 110000); has_pubviaroot = (pset.sversion >= 130000); + has_pubkind = (pset.sversion >= 150000); + has_puballtables = (pset.sversion < 150000); initPQExpBuffer(&buf); printfPQExpBuffer(&buf, "SELECT oid, pubname,\n" - " pg_catalog.pg_get_userbyid(pubowner) AS owner,\n" - " puballtables, pubinsert, pubupdate, pubdelete"); + " pg_catalog.pg_get_userbyid(pubowner) AS owner"); + + if (!has_puballtables) + appendPQExpBufferStr(&buf, + ", false as puballtables"); + else + appendPQExpBufferStr(&buf, + ", puballtables"); + + appendPQExpBufferStr(&buf, + ", pubinsert, pubupdate, pubdelete"); + if (has_pubtruncate) appendPQExpBufferStr(&buf, ", pubtruncate"); if (has_pubviaroot) appendPQExpBufferStr(&buf, ", pubviaroot"); + if (has_pubkind) + appendPQExpBufferStr(&buf, + ", pubkind"); + appendPQExpBufferStr(&buf, "\nFROM pg_catalog.pg_publication\n"); @@ -6285,29 +6440,33 @@ describePublications(const char *pattern) for (i = 0; i < PQntuples(res); i++) { const char align = 'l'; - int ncols = 5; + int ncols = 4; int nrows = 1; - int tables = 0; - PGresult *tabres; char *pubid = PQgetvalue(res, i, 0); char *pubname = PQgetvalue(res, i, 1); - bool puballtables = strcmp(PQgetvalue(res, i, 3), "t") == 0; - int j; - PQExpBufferData title; + bool puballtables; + char pubkind = PUBKIND_EMPTY; printTableOpt myopt = pset.popt.topt; - printTableContent cont; if (has_pubtruncate) ncols++; if (has_pubviaroot) ncols++; + if (has_pubkind) + ncols++; + if (has_puballtables) + { + puballtables = strcmp(PQgetvalue(res, i, 3), "t") == 0; + ncols++; + } initPQExpBuffer(&title); printfPQExpBuffer(&title, _("Publication %s"), pubname); printTableInit(&cont, &myopt, title.data, ncols, nrows); printTableAddHeader(&cont, gettext_noop("Owner"), true, align); - printTableAddHeader(&cont, gettext_noop("All tables"), true, align); + if (has_puballtables) + printTableAddHeader(&cont, gettext_noop("All tables"), true, align); printTableAddHeader(&cont, gettext_noop("Inserts"), true, align); printTableAddHeader(&cont, gettext_noop("Updates"), true, align); printTableAddHeader(&cont, gettext_noop("Deletes"), true, align); @@ -6315,9 +6474,12 @@ describePublications(const char *pattern) printTableAddHeader(&cont, gettext_noop("Truncates"), true, align); if (has_pubviaroot) printTableAddHeader(&cont, gettext_noop("Via root"), true, align); + if (has_pubkind) + printTableAddHeader(&cont, gettext_noop("Pub kind"), true, align); printTableAddCell(&cont, PQgetvalue(res, i, 2), false, false); - printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false); + if (has_puballtables) + printTableAddCell(&cont, PQgetvalue(res, i, 3), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 4), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 5), false, false); printTableAddCell(&cont, PQgetvalue(res, i, 6), false, false); @@ -6325,8 +6487,18 @@ describePublications(const char *pattern) printTableAddCell(&cont, PQgetvalue(res, i, 7), false, false); if (has_pubviaroot) printTableAddCell(&cont, PQgetvalue(res, i, 8), false, false); + if (has_pubkind) + { + char *kind = PQgetvalue(res, i, 9); - if (!puballtables) + pubkind = get_publication_kind(kind); + printTableAddCell(&cont, get_publication_kind_string(kind), false, + false); + } + + /* Prior to version 15 check was based on all tables */ + if ((has_pubkind && pubkind == PUBKIND_TABLE) || + (has_puballtables && !puballtables)) { printfPQExpBuffer(&buf, "SELECT n.nspname, c.relname\n" @@ -6337,31 +6509,20 @@ describePublications(const char *pattern) " AND c.oid = pr.prrelid\n" " AND pr.prpubid = '%s'\n" "ORDER BY 1,2", pubid); - - tabres = PSQLexec(buf.data); - if (!tabres) - { - printTableCleanup(&cont); - PQclear(res); - termPQExpBuffer(&buf); - termPQExpBuffer(&title); - return false; - } - else - tables = PQntuples(tabres); - - if (tables > 0) - printTableAddFooter(&cont, _("Tables:")); - - for (j = 0; j < tables; j++) - { - printfPQExpBuffer(&buf, " \"%s.%s\"", - PQgetvalue(tabres, j, 0), - PQgetvalue(tabres, j, 1)); - - printTableAddFooter(&cont, buf.data); - } - PQclear(tabres); + if (!addFooterToPublicationDesc(&buf, "Tables:", false, &cont)) + goto error_return; + } + else if (has_pubkind && pubkind == PUBKIND_SCHEMA) + { + printfPQExpBuffer(&buf, + "SELECT n.nspname\n" + "FROM pg_catalog.pg_namespace n,\n" + " pg_catalog.pg_publication_schema ps\n" + "WHERE n.oid = ps.psnspcid\n" + " AND ps.pspubid = '%s'\n" + "ORDER BY 1", pubid); + if (!addFooterToPublicationDesc(&buf, "Schemas:", true, &cont)) + goto error_return; } printTable(&cont, pset.queryFout, false, pset.logfile); @@ -6374,6 +6535,13 @@ describePublications(const char *pattern) PQclear(res); return true; + +error_return: + printTableCleanup(&cont); + PQclear(res); + termPQExpBuffer(&buf); + termPQExpBuffer(&title); + return false; } /* diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 0750f70273..ae9cd8fe9d 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -1640,10 +1640,19 @@ psql_completion(const char *text, int start, int end) /* ALTER PUBLICATION */ else if (Matches("ALTER", "PUBLICATION", MatchAny)) - COMPLETE_WITH("ADD TABLE", "DROP TABLE", "OWNER TO", "RENAME TO", "SET"); + COMPLETE_WITH("ADD", "DROP", "OWNER TO", "RENAME TO", "SET"); + /* ALTER PUBLICATION ADD */ + else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD")) + COMPLETE_WITH("SCHEMA", "TABLE"); + /* ALTER PUBLICATION DROP */ + else if (Matches("ALTER", "PUBLICATION", MatchAny, "DROP")) + COMPLETE_WITH("SCHEMA", "TABLE"); /* ALTER PUBLICATION SET */ else if (Matches("ALTER", "PUBLICATION", MatchAny, "SET")) - COMPLETE_WITH("(", "TABLE"); + COMPLETE_WITH("(", "SCHEMA", "TABLE"); + else if (Matches("ALTER", "PUBLICATION", MatchAny, "ADD|DROP|SET", "SCHEMA")) + COMPLETE_WITH_QUERY(Query_for_list_of_schemas + " UNION SELECT 'CURRENT_SCHEMA'"); /* ALTER PUBLICATION SET ( */ else if (HeadMatches("ALTER", "PUBLICATION", MatchAny) && TailMatches("SET", "(")) COMPLETE_WITH("publish", "publish_via_partition_root"); @@ -2641,15 +2650,20 @@ psql_completion(const char *text, int start, int end) /* CREATE PUBLICATION */ else if (Matches("CREATE", "PUBLICATION", MatchAny)) - COMPLETE_WITH("FOR TABLE", "FOR ALL TABLES", "WITH ("); + COMPLETE_WITH("FOR TABLE", "FOR ALL TABLES", "FOR SCHEMA", "WITH ("); else if (Matches("CREATE", "PUBLICATION", MatchAny, "FOR")) - COMPLETE_WITH("TABLE", "ALL TABLES"); + COMPLETE_WITH("TABLE", "ALL TABLES", "SCHEMA"); /* Complete "CREATE PUBLICATION FOR TABLE , ..." */ else if (HeadMatches("CREATE", "PUBLICATION", MatchAny, "FOR", "TABLE")) COMPLETE_WITH_SCHEMA_QUERY(Query_for_list_of_tables, NULL); /* Complete "CREATE PUBLICATION [...] WITH" */ else if (HeadMatches("CREATE", "PUBLICATION") && TailMatches("WITH", "(")) COMPLETE_WITH("publish", "publish_via_partition_root"); + /* Complete "CREATE PUBLICATION FOR SCHEMA , ..." */ + else if (HeadMatches("CREATE", "PUBLICATION", MatchAny, "FOR", "SCHEMA")) + COMPLETE_WITH_QUERY(Query_for_list_of_schemas + " UNION SELECT 'CURRENT_SCHEMA' " + "UNION SELECT 'WITH ('"); /* CREATE RULE */ /* Complete "CREATE [ OR REPLACE ] RULE " with "AS ON" */ diff --git a/src/include/catalog/dependency.h b/src/include/catalog/dependency.h index 2885f35ccd..e5e88d3a31 100644 --- a/src/include/catalog/dependency.h +++ b/src/include/catalog/dependency.h @@ -123,6 +123,7 @@ typedef enum ObjectClass OCLASS_POLICY, /* pg_policy */ OCLASS_PUBLICATION, /* pg_publication */ OCLASS_PUBLICATION_REL, /* pg_publication_rel */ + OCLASS_PUBLICATION_SCHEMA, /* pg_publication_schema */ OCLASS_SUBSCRIPTION, /* pg_subscription */ OCLASS_TRANSFORM /* pg_transform */ } ObjectClass; diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index f332bad4d4..74ad444cb0 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -18,7 +18,6 @@ #define PG_PUBLICATION_H #include "catalog/genbki.h" -#include "catalog/objectaddress.h" #include "catalog/pg_publication_d.h" /* ---------------- @@ -34,12 +33,6 @@ CATALOG(pg_publication,6104,PublicationRelationId) Oid pubowner BKI_LOOKUP(pg_authid); /* publication owner */ - /* - * indicates that this is special publication which should encompass all - * tables in the database (except for the unlogged and temp ones) - */ - bool puballtables; - /* true if inserts are published */ bool pubinsert; @@ -54,6 +47,9 @@ CATALOG(pg_publication,6104,PublicationRelationId) /* true if partition changes are published using root schema */ bool pubviaroot; + + /* see PUBKIND_xxx constants below */ + char pubkind; } FormData_pg_publication; /* ---------------- @@ -78,15 +74,11 @@ typedef struct Publication { Oid oid; char *name; - bool alltables; bool pubviaroot; PublicationActions pubactions; + char pubkind; } Publication; -extern Publication *GetPublication(Oid pubid); -extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); -extern List *GetRelationPublications(Oid relid); - /*--------- * Expected values for pub_partopt parameter of GetRelationPublications(), * which allows callers to specify which partitions of partitioned tables @@ -103,16 +95,42 @@ typedef enum PublicationPartOpt PUBLICATION_PART_ALL, } PublicationPartOpt; -extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); -extern List *GetAllTablesPublications(void); -extern List *GetAllTablesPublicationRelations(bool pubviaroot); - -extern bool is_publishable_relation(Relation rel); -extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, - bool if_not_exists); - -extern Oid get_publication_oid(const char *pubname, bool missing_ok); -extern char *get_publication_name(Oid pubid, bool missing_ok); - +/* Publication kinds */ +#define PUBKIND_ALLTABLES 'a' /* all tables publication */ +#define PUBKIND_TABLE 't' /* table publication */ +#define PUBKIND_SCHEMA 's' /* schema publication */ +#define PUBKIND_EMPTY 'e' /* empty publication */ + +/* + * Return the publication kind. +*/ +static inline char +get_publication_kind(char *strpubkind) +{ + if (strcmp(strpubkind, "a") == 0) + return PUBKIND_ALLTABLES; + else if (strcmp(strpubkind, "t") == 0) + return PUBKIND_TABLE; + else if (strcmp(strpubkind, "s") == 0) + return PUBKIND_SCHEMA; + + return PUBKIND_EMPTY; +} + +/* + * Return the publication kind string. +*/ +static inline char* +get_publication_kind_string(char *strpubkind) +{ + if (strcmp(strpubkind, "a") == 0) + return "all tables"; + else if (strcmp(strpubkind, "t") == 0) + return "table"; + else if (strcmp(strpubkind, "s") == 0) + return "schema"; + + return "empty"; +} #endif /* PG_PUBLICATION_H */ diff --git a/src/include/catalog/pg_publication_schema.h b/src/include/catalog/pg_publication_schema.h new file mode 100644 index 0000000000..fc50655af1 --- /dev/null +++ b/src/include/catalog/pg_publication_schema.h @@ -0,0 +1,47 @@ +/*------------------------------------------------------------------------- + * + * pg_publication_schema.h + * definition of the system catalog for mappings between schemas and + * publications (pg_publication_schema) + * + * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/catalog/pg_publication_schema.h + * + * NOTES + * The Catalog.pm module reads this file and derives schema + * information. + * + *------------------------------------------------------------------------- + */ +#ifndef PG_PUBLICATION_SCHEMA_H +#define PG_PUBLICATION_SCHEMA_H + +#include "catalog/genbki.h" +#include "catalog/pg_publication_schema_d.h" + + +/* ---------------- + * pg_publication_schema definition. cpp turns this into + * typedef struct FormData_pg_publication_schema + * ---------------- + */ +CATALOG(pg_publication_schema,8901,PublicationSchemaRelationId) +{ + Oid oid; /* oid */ + Oid pspubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ + Oid psnspcid BKI_LOOKUP(pg_class); /* Oid of the schema */ +} FormData_pg_publication_schema; + +/* ---------------- + * Form_pg_publication_schema corresponds to a pointer to a tuple with + * the format of pg_publication_schema relation. + * ---------------- + */ +typedef FormData_pg_publication_schema *Form_pg_publication_schema; + +DECLARE_UNIQUE_INDEX_PKEY(pg_publication_schema_oid_index, 8902, PublicationSchemaObjectIndexId, on pg_publication_schema using btree(oid oid_ops)); +DECLARE_UNIQUE_INDEX(pg_publication_schema_psnspcid_pspubid_index, 8903, PublicationSchemaPsnspcidPspubidIndexId, on pg_publication_schema using btree(psnspcid oid_ops, pspubid oid_ops)); + +#endif /* PG_PUBLICATION_SCHEMA_H */ diff --git a/src/include/commands/publicationcmds.h b/src/include/commands/publicationcmds.h index a3fa2ac6cd..2326a946d8 100644 --- a/src/include/commands/publicationcmds.h +++ b/src/include/commands/publicationcmds.h @@ -16,13 +16,66 @@ #define PUBLICATIONCMDS_H #include "catalog/objectaddress.h" +#include "catalog/pg_publication.h" #include "parser/parse_node.h" +#include "utils/inval.h" + +/* Same as MAXNUMMESSAGES in sinvaladt.c */ +#define MAX_RELCACHE_INVAL_MSGS 4096 extern ObjectAddress CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt); extern void AlterPublication(ParseState *pstate, AlterPublicationStmt *stmt); extern void RemovePublicationRelById(Oid proid); +extern void RemovePublicationSchemaById(Oid psoid); extern ObjectAddress AlterPublicationOwner(const char *name, Oid newOwnerId); extern void AlterPublicationOwner_oid(Oid pubid, Oid newOwnerId); +extern Publication *GetPublication(Oid pubid); +extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); +extern List *GetRelationPublications(Oid relid); + +extern List *GetPublicationRelations(Oid pubid, PublicationPartOpt pub_partopt); +extern List *GetPublicationSchemas(Oid pubid); +extern List *GetSchemaPublications(Oid schemaid); +extern List *GetAllTablesPublications(void); +extern List *GetAllTablesPublicationRelations(bool pubviaroot); +extern List *GetAllSchemasPublicationRelations(Oid puboid, + PublicationPartOpt pub_partopt); +extern List *GetSchemaPublicationRelations(Oid schemaOid, + PublicationPartOpt pub_partopt); +extern bool is_publishable_relation(Relation rel); +extern ObjectAddress publication_add_relation(Oid pubid, Relation targetrel, + bool if_not_exists); +extern ObjectAddress publication_add_schema(Oid pubid, Oid schemaoid, + bool if_not_exists); + +extern Oid get_publication_oid(const char *pubname, bool missing_ok); +extern char *get_publication_name(Oid pubid, bool missing_ok); + +/* + * Invalidate the relations. + */ +static inline void +InvalidatePublicationRels(List *relids) +{ + /* + * We don't want to send too many individual messages, at some point + * it's cheaper to just reset whole relcache. + */ + if (list_length(relids) < MAX_RELCACHE_INVAL_MSGS) + { + ListCell *lc; + + foreach(lc, relids) + { + Oid relid = lfirst_oid(lc); + + CacheInvalidateRelcacheByRelid(relid); + } + } + else + CacheInvalidateRelcacheAll(); +} + #endif /* PUBLICATIONCMDS_H */ diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h index 6a4d82f0a8..89c48e3252 100644 --- a/src/include/nodes/nodes.h +++ b/src/include/nodes/nodes.h @@ -483,6 +483,7 @@ typedef enum NodeTag T_CTECycleClause, T_CommonTableExpr, T_RoleSpec, + T_SchemaSpec, T_TriggerTransition, T_PartitionElem, T_PartitionSpec, diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index e28248af32..d96aafd626 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -341,6 +341,23 @@ typedef struct RoleSpec int location; /* token location, or -1 if unknown */ } RoleSpec; +/* + * SchemaSpec - a schema name or CURRENT_SCHEMA + */ +typedef enum SchemaSpecType +{ + SCHEMASPEC_CSTRING, /* schema name is stored as a C string */ + SCHEMASPEC_CURRENT_SCHEMA /* schema spec is CURRENT_SCHEMA */ +} SchemaSpecType; + +typedef struct SchemaSpec +{ + NodeTag type; + SchemaSpecType schematype; /* type of this schemaspec */ + char *schemaname; /* filled only for SCHEMASPEC_CSTRING */ + int location; /* token location, or -1 if unknown */ +} SchemaSpec; + /* * FuncCall - a function or aggregate invocation * @@ -1805,6 +1822,7 @@ typedef enum ObjectType OBJECT_PROCEDURE, OBJECT_PUBLICATION, OBJECT_PUBLICATION_REL, + OBJECT_PUBLICATION_SCHEMA, OBJECT_ROLE, OBJECT_ROUTINE, OBJECT_RULE, @@ -3631,6 +3649,7 @@ typedef struct CreatePublicationStmt char *pubname; /* Name of the publication */ List *options; /* List of DefElem nodes */ List *tables; /* Optional list of tables to add */ + List *schemas; /* Optional list of schemas to add */ bool for_all_tables; /* Special publication for all tables in db */ } CreatePublicationStmt; @@ -3642,10 +3661,11 @@ typedef struct AlterPublicationStmt /* parameters used for ALTER PUBLICATION ... WITH */ List *options; /* List of DefElem nodes */ - /* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE */ + /* parameters used for ALTER PUBLICATION ... ADD/DROP TABLE/SCHEMA */ List *tables; /* List of tables to add/drop */ + List *schemas; /* List of schemas to add/drop/set */ bool for_all_tables; /* Special publication for all tables in db */ - DefElemAction tableAction; /* What action to perform with the tables */ + DefElemAction action; /* What action to perform with the tables/schemas */ } AlterPublicationStmt; typedef struct CreateSubscriptionStmt diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index b4faa1c123..4415d9cd76 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -19,6 +19,7 @@ #include "catalog/pg_class.h" #include "catalog/pg_index.h" #include "catalog/pg_publication.h" +#include "catalog/objectaddress.h" #include "nodes/bitmapset.h" #include "partitioning/partdefs.h" #include "rewrite/prs2lock.h" diff --git a/src/include/utils/syscache.h b/src/include/utils/syscache.h index d74a348600..1ba295206a 100644 --- a/src/include/utils/syscache.h +++ b/src/include/utils/syscache.h @@ -79,6 +79,8 @@ enum SysCacheIdentifier PUBLICATIONOID, PUBLICATIONREL, PUBLICATIONRELMAP, + PUBLICATIONSCHEMA, + PUBLICATIONSCHEMAMAP, RANGEMULTIRANGE, RANGETYPE, RELNAMENSP, diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out index 1461e947cd..ddb421c394 100644 --- a/src/test/regress/expected/oidjoins.out +++ b/src/test/regress/expected/oidjoins.out @@ -260,6 +260,8 @@ NOTICE: checking pg_sequence {seqtypid} => pg_type {oid} NOTICE: checking pg_publication {pubowner} => pg_authid {oid} NOTICE: checking pg_publication_rel {prpubid} => pg_publication {oid} NOTICE: checking pg_publication_rel {prrelid} => pg_class {oid} +NOTICE: checking pg_publication_schema {pspubid} => pg_publication {oid} +NOTICE: checking pg_publication_schema {psnspcid} => pg_class {oid} NOTICE: checking pg_subscription {subdbid} => pg_database {oid} NOTICE: checking pg_subscription {subowner} => pg_authid {oid} NOTICE: checking pg_subscription_rel {srsubid} => pg_subscription {oid} diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index 4a5ef0bc24..67f843f465 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -30,20 +30,20 @@ ERROR: conflicting or redundant options LINE 1: ...ub_xxx WITH (publish_via_partition_root = 'true', publish_vi... ^ \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------+--------------------------+------------+---------+---------+---------+-----------+---------- - testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f - testpub_default | regress_publication_user | f | f | t | f | f | f + List of publications + Name | Owner | Inserts | Updates | Deletes | Truncates | Via root | Pub Kind +--------------------+--------------------------+---------+---------+---------+-----------+----------+---------- + testpib_ins_trunct | regress_publication_user | t | f | f | f | f | empty + testpub_default | regress_publication_user | f | t | f | f | f | empty (2 rows) ALTER PUBLICATION testpub_default SET (publish = 'insert, update, delete'); \dRp - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------+--------------------------+------------+---------+---------+---------+-----------+---------- - testpib_ins_trunct | regress_publication_user | f | t | f | f | f | f - testpub_default | regress_publication_user | f | t | t | t | f | f + List of publications + Name | Owner | Inserts | Updates | Deletes | Truncates | Via root | Pub Kind +--------------------+--------------------------+---------+---------+---------+-----------+----------+---------- + testpib_ins_trunct | regress_publication_user | t | f | f | f | f | empty + testpub_default | regress_publication_user | t | t | t | f | f | empty (2 rows) --- adding tables @@ -60,19 +60,19 @@ CREATE TABLE testpub_tbl2 (id serial primary key, data text); -- fail - can't add to for all tables publication ALTER PUBLICATION testpub_foralltables ADD TABLE testpub_tbl2; ERROR: publication "testpub_foralltables" is defined as FOR ALL TABLES -DETAIL: Tables cannot be added to or dropped from FOR ALL TABLES publications. +DETAIL: Tables cannot be added, dropped or set on FOR ALL TABLES publications. -- fail - can't drop from all tables publication ALTER PUBLICATION testpub_foralltables DROP TABLE testpub_tbl2; ERROR: publication "testpub_foralltables" is defined as FOR ALL TABLES -DETAIL: Tables cannot be added to or dropped from FOR ALL TABLES publications. +DETAIL: Tables cannot be added, dropped or set on FOR ALL TABLES publications. -- fail - can't add to for all tables publication ALTER PUBLICATION testpub_foralltables SET TABLE pub_test.testpub_nopk; ERROR: publication "testpub_foralltables" is defined as FOR ALL TABLES -DETAIL: Tables cannot be added to or dropped from FOR ALL TABLES publications. -SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_foralltables'; - pubname | puballtables -----------------------+-------------- - testpub_foralltables | t +DETAIL: Tables cannot be added, dropped or set on FOR ALL TABLES publications. +SELECT pubname, pubkind FROM pg_publication WHERE pubname = 'testpub_foralltables'; + pubname | pubkind +----------------------+--------- + testpub_foralltables | a (1 row) \d+ testpub_tbl2 @@ -88,9 +88,9 @@ Publications: \dRp+ testpub_foralltables Publication testpub_foralltables - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | t | t | t | f | f | f + Owner | Inserts | Updates | Deletes | Truncates | Via root | Pub kind +--------------------------+---------+---------+---------+-----------+----------+------------ + regress_publication_user | t | t | f | f | f | all tables (1 row) DROP TABLE testpub_tbl2; @@ -102,19 +102,19 @@ CREATE PUBLICATION testpub3 FOR TABLE testpub_tbl3; CREATE PUBLICATION testpub4 FOR TABLE ONLY testpub_tbl3; RESET client_min_messages; \dRp+ testpub3 - Publication testpub3 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub3 + Owner | Inserts | Updates | Deletes | Truncates | Via root | Pub kind +--------------------------+---------+---------+---------+-----------+----------+---------- + regress_publication_user | t | t | t | t | f | table Tables: "public.testpub_tbl3" "public.testpub_tbl3a" \dRp+ testpub4 - Publication testpub4 - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub4 + Owner | Inserts | Updates | Deletes | Truncates | Via root | Pub kind +--------------------------+---------+---------+---------+-----------+----------+---------- + regress_publication_user | t | t | t | t | f | table Tables: "public.testpub_tbl3" @@ -133,10 +133,10 @@ ALTER TABLE testpub_parted ATTACH PARTITION testpub_parted1 FOR VALUES IN (1); -- only parent is listed as being in publication, not the partition ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; \dRp+ testpub_forparted - Publication testpub_forparted - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_forparted + Owner | Inserts | Updates | Deletes | Truncates | Via root | Pub kind +--------------------------+---------+---------+---------+-----------+----------+---------- + regress_publication_user | t | t | t | t | f | table Tables: "public.testpub_parted" @@ -149,10 +149,10 @@ ALTER TABLE testpub_parted DETACH PARTITION testpub_parted1; UPDATE testpub_parted1 SET a = 1; ALTER PUBLICATION testpub_forparted SET (publish_via_partition_root = true); \dRp+ testpub_forparted - Publication testpub_forparted - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | t + Publication testpub_forparted + Owner | Inserts | Updates | Deletes | Truncates | Via root | Pub kind +--------------------------+---------+---------+---------+-----------+----------+---------- + regress_publication_user | t | t | t | t | t | table Tables: "public.testpub_parted" @@ -172,10 +172,10 @@ ERROR: relation "testpub_tbl1" is already member of publication "testpub_fortbl CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1; ERROR: publication "testpub_fortbl" already exists \dRp+ testpub_fortbl - Publication testpub_fortbl - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | t | f + Publication testpub_fortbl + Owner | Inserts | Updates | Deletes | Truncates | Via root | Pub kind +--------------------------+---------+---------+---------+-----------+----------+---------- + regress_publication_user | t | t | t | t | f | table Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -213,10 +213,10 @@ Publications: "testpub_fortbl" \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | f | f + Publication testpub_default + Owner | Inserts | Updates | Deletes | Truncates | Via root | Pub kind +--------------------------+---------+---------+---------+-----------+----------+---------- + regress_publication_user | t | t | t | f | f | table Tables: "pub_test.testpub_nopk" "public.testpub_tbl1" @@ -260,10 +260,10 @@ DROP TABLE testpub_parted; DROP VIEW testpub_view; DROP TABLE testpub_tbl1; \dRp+ testpub_default - Publication testpub_default - Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ---------------------------+------------+---------+---------+---------+-----------+---------- - regress_publication_user | f | t | t | t | f | f + Publication testpub_default + Owner | Inserts | Updates | Deletes | Truncates | Via root | Pub kind +--------------------------+---------+---------+---------+-----------+----------+---------- + regress_publication_user | t | t | t | f | f | table (1 row) -- fail - must be owner of publication @@ -273,20 +273,20 @@ ERROR: must be owner of publication testpub_default RESET ROLE; ALTER PUBLICATION testpub_default RENAME TO testpub_foo; \dRp testpub_foo - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root --------------+--------------------------+------------+---------+---------+---------+-----------+---------- - testpub_foo | regress_publication_user | f | t | t | t | f | f + List of publications + Name | Owner | Inserts | Updates | Deletes | Truncates | Via root | Pub Kind +-------------+--------------------------+---------+---------+---------+-----------+----------+---------- + testpub_foo | regress_publication_user | t | t | t | f | f | table (1 row) -- rename back to keep the rest simple ALTER PUBLICATION testpub_foo RENAME TO testpub_default; ALTER PUBLICATION testpub_default OWNER TO regress_publication_user2; \dRp testpub_default - List of publications - Name | Owner | All tables | Inserts | Updates | Deletes | Truncates | Via root ------------------+---------------------------+------------+---------+---------+---------+-----------+---------- - testpub_default | regress_publication_user2 | f | t | t | t | f | f + List of publications + Name | Owner | Inserts | Updates | Deletes | Truncates | Via root | Pub Kind +-----------------+---------------------------+---------+---------+---------+-----------+----------+---------- + testpub_default | regress_publication_user2 | t | t | t | f | f | table (1 row) DROP PUBLICATION testpub_default; diff --git a/src/test/regress/expected/sanity_check.out b/src/test/regress/expected/sanity_check.out index 982b6aff53..3b4f62025f 100644 --- a/src/test/regress/expected/sanity_check.out +++ b/src/test/regress/expected/sanity_check.out @@ -141,6 +141,7 @@ pg_policy|t pg_proc|t pg_publication|t pg_publication_rel|t +pg_publication_schema|t pg_range|t pg_replication_origin|t pg_rewrite|t diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index d844075368..a5391f20b8 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -51,7 +51,7 @@ ALTER PUBLICATION testpub_foralltables DROP TABLE testpub_tbl2; -- fail - can't add to for all tables publication ALTER PUBLICATION testpub_foralltables SET TABLE pub_test.testpub_nopk; -SELECT pubname, puballtables FROM pg_publication WHERE pubname = 'testpub_foralltables'; +SELECT pubname, pubkind FROM pg_publication WHERE pubname = 'testpub_foralltables'; \d+ testpub_tbl2 \dRp+ testpub_foralltables diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 37cf4b2f76..fb5daa49eb 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -777,6 +777,7 @@ FormData_pg_policy FormData_pg_proc FormData_pg_publication FormData_pg_publication_rel +FormData_pg_publication_schema FormData_pg_range FormData_pg_replication_origin FormData_pg_rewrite @@ -833,6 +834,7 @@ Form_pg_policy Form_pg_proc Form_pg_publication Form_pg_publication_rel +Form_pg_publication_schema Form_pg_range Form_pg_replication_origin Form_pg_rewrite @@ -2045,6 +2047,7 @@ PublicationActions PublicationInfo PublicationPartOpt PublicationRelInfo +PublicationSchemaInfo PullFilter PullFilterOps PushFilter @@ -2329,6 +2332,8 @@ ScanState ScanTypeControl ScannerCallbackState SchemaQuery +SchemaSpec +SchemaSpecType SecBuffer SecBufferDesc SecLabelItem -- 2.30.2