From e015211927787d3ef104fc8e6217a11ce50a6115 Mon Sep 17 00:00:00 2001 From: amit Date: Thu, 7 Nov 2019 18:19:33 +0900 Subject: [PATCH v6 1/4] Support adding partitioned tables to publication --- doc/src/sgml/logical-replication.sgml | 10 +- doc/src/sgml/ref/create_publication.sgml | 27 +++-- src/backend/catalog/pg_publication.c | 42 +++++--- src/backend/commands/copy.c | 2 +- src/backend/commands/publicationcmds.c | 12 ++- src/backend/commands/subscriptioncmds.c | 63 +++++++---- src/backend/executor/execMain.c | 7 +- src/backend/executor/execPartition.c | 5 +- src/backend/executor/execReplication.c | 43 ++++---- src/backend/executor/nodeModifyTable.c | 6 +- src/backend/replication/logical/tablesync.c | 30 ++++-- src/backend/replication/pgoutput/pgoutput.c | 41 +++++-- src/bin/pg_dump/pg_dump.c | 5 +- src/include/catalog/pg_publication.h | 1 + src/include/executor/executor.h | 8 +- src/test/regress/expected/publication.out | 21 +++- src/test/regress/sql/publication.sql | 12 ++- src/test/subscription/t/013_partition.pl | 159 ++++++++++++++++++++++++++++ 18 files changed, 399 insertions(+), 95 deletions(-) create mode 100644 src/test/subscription/t/013_partition.pl diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml index f657d1d06e..cbf33d73c6 100644 --- a/doc/src/sgml/logical-replication.sgml +++ b/doc/src/sgml/logical-replication.sgml @@ -402,13 +402,9 @@ - Replication is only possible from base tables to base tables. That is, - the tables on the publication and on the subscription side must be normal - tables, not views, materialized views, partition root tables, or foreign - tables. In the case of partitions, you can therefore replicate a - partition hierarchy one-to-one, but you cannot currently replicate to a - differently partitioned setup. Attempts to replicate tables other than - base tables will result in an error. + Replication is only supported by regular and partitioned tables. + Attempts to replicate other types of relations such as views, materialized + views, or foreign tables, will result in an error. diff --git a/doc/src/sgml/ref/create_publication.sgml b/doc/src/sgml/ref/create_publication.sgml index 99f87ca393..848779a00f 100644 --- a/doc/src/sgml/ref/create_publication.sgml +++ b/doc/src/sgml/ref/create_publication.sgml @@ -68,15 +68,25 @@ CREATE PUBLICATION name that table is added to the publication. If ONLY is not specified, the table and all its descendant tables (if any) are added. Optionally, * can be specified after the table name to - explicitly indicate that descendant tables are included. + explicitly indicate that descendant tables are included. However, adding + a partitioned table to a publication never explicitly adds its partitions, + because partitions are implicitly published due to the partitioned table + being added to the publication. - Only persistent base tables can be part of a publication. Temporary - tables, unlogged tables, foreign tables, materialized views, regular - views, and partitioned tables cannot be part of a publication. To - replicate a partitioned table, add the individual partitions to the - publication. + Only persistent base tables and partitioned tables can be part of a + publication. Temporary tables, unlogged tables, foreign tables, + materialized views, regular views cannot be part of a publication. + + + + When a partitioned table is added to a publication, all of its existing + and future partitions are also implicitly considered to be part of the + publication. So, any INSERT, UPDATE, + and DELETE, and TRUNCATE operations + that are directly applied to a partition are also published via its + ancestors' publications. @@ -133,6 +143,11 @@ CREATE PUBLICATION name + Partitioned tables are not considered when FOR ALL TABLES + is specified. + + + The creation of a publication does not start replication. It only defines a grouping and filtering logic for future subscribers. diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index d442c8e0bb..9e14a8216e 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -26,6 +26,7 @@ #include "catalog/namespace.h" #include "catalog/objectaccess.h" #include "catalog/objectaddress.h" +#include "catalog/partition.h" #include "catalog/pg_publication.h" #include "catalog/pg_publication_rel.h" #include "catalog/pg_type.h" @@ -47,17 +48,9 @@ static void check_publication_add_relation(Relation targetrel) { - /* Give more specific error for partitioned tables */ - if (RelationGetForm(targetrel)->relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("\"%s\" is a partitioned table", - RelationGetRelationName(targetrel)), - errdetail("Adding partitioned tables to publications is not supported."), - errhint("You can add the table partitions individually."))); - - /* Must be table */ - if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION) + /* Must be a regular or partitioned table */ + if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && + RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("\"%s\" is not a table", @@ -103,7 +96,8 @@ check_publication_add_relation(Relation targetrel) static bool is_publishable_class(Oid relid, Form_pg_class reltuple) { - return reltuple->relkind == RELKIND_RELATION && + return (reltuple->relkind == RELKIND_RELATION || + reltuple->relkind == RELKIND_PARTITIONED_TABLE) && !IsCatalogRelationOid(relid) && reltuple->relpersistence == RELPERSISTENCE_PERMANENT && relid >= FirstNormalObjectId; @@ -230,7 +224,7 @@ GetRelationPublications(Oid relid) CatCList *pubrellist; int i; - /* Find all publications associated with the relation. */ + /* Finds all publications associated with the relation. */ pubrellist = SearchSysCacheList1(PUBLICATIONRELMAP, ObjectIdGetDatum(relid)); for (i = 0; i < pubrellist->n_members; i++) @@ -247,6 +241,28 @@ GetRelationPublications(Oid relid) } /* + * Finds all publications that publish changes to the input relation's + * ancestors. + */ +List * +GetRelationAncestorPublications(Oid relid) +{ + List *ancestors = get_partition_ancestors(relid); + List *ancestor_pubids = NIL; + ListCell *lc; + + foreach(lc, ancestors) + { + Oid ancestor = lfirst_oid(lc); + List *rel_publishers = GetRelationPublications(ancestor); + + ancestor_pubids = list_concat_copy(ancestor_pubids, rel_publishers); + } + + return ancestor_pubids; +} + +/* * Gets list of relation oids for a publication. * * This should only be used for normal publications, the FOR ALL TABLES diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 42a147b67d..bb8c926659 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -2835,7 +2835,7 @@ CopyFrom(CopyState cstate) target_resultRelInfo = resultRelInfo; /* Verify the named relation is a valid target for INSERT */ - CheckValidResultRel(resultRelInfo, CMD_INSERT); + CheckValidResultRel(resultRelInfo, NULL, CMD_INSERT); ExecOpenIndices(resultRelInfo, false); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index fbf11c86aa..ee56acf3f3 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -498,7 +498,8 @@ RemovePublicationRelById(Oid proid) /* * Open relations specified by a RangeVar list. - * The returned tables are locked in ShareUpdateExclusiveLock mode. + * The returned tables are locked in ShareUpdateExclusiveLock mode in order to + * add them to a publication. */ static List * OpenTableList(List *tables) @@ -539,8 +540,13 @@ OpenTableList(List *tables) rels = lappend(rels, rel); relids = lappend_oid(relids, myrelid); - /* Add children of this rel, if requested */ - if (recurse) + /* + * Add children of this rel, if requested, so that they too are added + * to the publication. A partitioned table can't have any inheritance + * children other than its partitions, which need not be explicitly + * added to the publication. + */ + if (recurse && rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE) { List *children; ListCell *child; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5408edcfc2..f65cad4ac0 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -44,7 +44,8 @@ #include "utils/memutils.h" #include "utils/syscache.h" -static List *fetch_table_list(WalReceiverConn *wrconn, List *publications); +static List *fetch_publication_tables(WalReceiverConn *wrconn, List *publications); +static Oid ValidateSubscriptionRel(RangeVar *rv); /* * Common option parsing function for CREATE and ALTER SUBSCRIPTION commands. @@ -453,18 +454,13 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) * Get the table list from publisher and build local table status * info. */ - tables = fetch_table_list(wrconn, publications); + tables = fetch_publication_tables(wrconn, publications); foreach(lc, tables) { RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; - relid = RangeVarGetRelid(rv, AccessShareLock, false); - - /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); - + relid = ValidateSubscriptionRel(rv); AddSubscriptionRelState(subid, relid, table_state, InvalidXLogRecPtr); } @@ -530,7 +526,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) (errmsg("could not connect to the publisher: %s", err))); /* Get the table list from publisher. */ - pubrel_names = fetch_table_list(wrconn, sub->publications); + pubrel_names = fetch_publication_tables(wrconn, sub->publications); /* We are done with the remote side, close connection. */ walrcv_disconnect(wrconn); @@ -568,11 +564,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) RangeVar *rv = (RangeVar *) lfirst(lc); Oid relid; - relid = RangeVarGetRelid(rv, AccessShareLock, false); - - /* Check for supported relkind. */ - CheckSubscriptionRelkind(get_rel_relkind(relid), - rv->schemaname, rv->relname); + /* Check that there's an appropriate relation present locally. */ + relid = ValidateSubscriptionRel(rv); pubrel_local_oids[off++] = relid; @@ -1121,10 +1114,12 @@ AlterSubscriptionOwner_oid(Oid subid, Oid newOwnerId) /* * Get the list of tables which belong to specified publications on the - * publisher connection. + * publisher connection to create a subscription state (pg_subscription_rel + * entry) for each. For partitioned tables, subscription state is maintained + * per partition, so partitions are fetched too. */ static List * -fetch_table_list(WalReceiverConn *wrconn, List *publications) +fetch_publication_tables(WalReceiverConn *wrconn, List *publications) { WalRcvExecResult *res; StringInfoData cmd; @@ -1137,9 +1132,19 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) Assert(list_length(publications) > 0); initStringInfo(&cmd); - appendStringInfoString(&cmd, "SELECT DISTINCT t.schemaname, t.tablename\n" + appendStringInfoString(&cmd, "SELECT DISTINCT s.schemaname, s.tablename FROM (\n" + " SELECT DISTINCT t.pubname, t.schemaname, t.tablename \n" " FROM pg_catalog.pg_publication_tables t\n" - " WHERE t.pubname IN ("); + " UNION\n" + " SELECT DISTINCT t.pubname, s.schemaname, s.tablename\n" + " FROM pg_catalog.pg_publication_tables t,\n" + " LATERAL (SELECT c.relnamespace::regnamespace::name, c.relname\n" + " FROM pg_class c\n" + " JOIN pg_partition_tree(t.schemaname || '.' || t.tablename) p\n" + " ON p.relid = c.oid\n" + " WHERE p.level > 0) AS s(schemaname, tablename)) s\n" + " WHERE s.pubname IN ("); + first = true; foreach(lc, publications) { @@ -1187,3 +1192,25 @@ fetch_table_list(WalReceiverConn *wrconn, List *publications) return tablelist; } + +/* + * Looks up a local relation matching the given publication table and + * checks that it's appropriate to use as replication target, erroring + * out if not. + * + * Oid of the successfully validated local relation is returned. + */ +static Oid +ValidateSubscriptionRel(RangeVar *rv) +{ + Oid relid; + + relid = RangeVarGetRelid(rv, AccessShareLock, false); + Assert(OidIsValid(relid)); + + /* Check for supported relkind. */ + CheckSubscriptionRelkind(get_rel_relkind(relid), + rv->schemaname, rv->relname); + + return relid; +} diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index c46eb8d646..416970393c 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -1073,7 +1073,9 @@ InitPlan(QueryDesc *queryDesc, int eflags) * CheckValidRowMarkRel. */ void -CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation) +CheckValidResultRel(ResultRelInfo *resultRelInfo, + ResultRelInfo *rootResultRelInfo, + CmdType operation) { Relation resultRel = resultRelInfo->ri_RelationDesc; TriggerDesc *trigDesc = resultRel->trigdesc; @@ -1083,7 +1085,8 @@ CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation) { case RELKIND_RELATION: case RELKIND_PARTITIONED_TABLE: - CheckCmdReplicaIdentity(resultRel, operation); + CheckCmdReplicaIdentity(resultRelInfo, rootResultRelInfo, + operation); break; case RELKIND_SEQUENCE: ereport(ERROR, diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c index d23f292cb0..06f6923966 100644 --- a/src/backend/executor/execPartition.c +++ b/src/backend/executor/execPartition.c @@ -385,7 +385,8 @@ ExecFindPartition(ModifyTableState *mtstate, rri = elem->rri; /* Verify this ResultRelInfo allows INSERTs */ - CheckValidResultRel(rri, CMD_INSERT); + CheckValidResultRel(rri, rootResultRelInfo, + CMD_INSERT); /* Set up the PartitionRoutingInfo for it */ ExecInitRoutingInfo(mtstate, estate, proute, dispatch, @@ -530,7 +531,7 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate, * partition-key becomes a DELETE+INSERT operation, so this check is still * required when the operation is CMD_UPDATE. */ - CheckValidResultRel(leaf_part_rri, CMD_INSERT); + CheckValidResultRel(leaf_part_rri, rootResultRelInfo, CMD_INSERT); /* * Open partition indices. The user may have asked to check for conflicts diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c index 95e027c970..22f613beed 100644 --- a/src/backend/executor/execReplication.c +++ b/src/backend/executor/execReplication.c @@ -396,10 +396,10 @@ ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot) ResultRelInfo *resultRelInfo = estate->es_result_relation_info; Relation rel = resultRelInfo->ri_RelationDesc; - /* For now we support only tables. */ + /* For now we support only regular tables. */ Assert(rel->rd_rel->relkind == RELKIND_RELATION); - CheckCmdReplicaIdentity(rel, CMD_INSERT); + CheckCmdReplicaIdentity(resultRelInfo, NULL, CMD_INSERT); /* BEFORE ROW INSERT Triggers */ if (resultRelInfo->ri_TrigDesc && @@ -463,7 +463,7 @@ ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, /* For now we support only tables. */ Assert(rel->rd_rel->relkind == RELKIND_RELATION); - CheckCmdReplicaIdentity(rel, CMD_UPDATE); + CheckCmdReplicaIdentity(resultRelInfo, NULL, CMD_UPDATE); /* BEFORE ROW UPDATE Triggers */ if (resultRelInfo->ri_TrigDesc && @@ -521,7 +521,7 @@ ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, Relation rel = resultRelInfo->ri_RelationDesc; ItemPointer tid = &searchslot->tts_tid; - CheckCmdReplicaIdentity(rel, CMD_DELETE); + CheckCmdReplicaIdentity(resultRelInfo, NULL, CMD_DELETE); /* BEFORE ROW DELETE Triggers */ if (resultRelInfo->ri_TrigDesc && @@ -544,12 +544,17 @@ ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, } /* - * Check if command can be executed with current replica identity. + * Check if command can be executed on 'target_rel' with its (or the + * ancestor's) current replica identity. */ void -CheckCmdReplicaIdentity(Relation rel, CmdType cmd) +CheckCmdReplicaIdentity(ResultRelInfo *target_rel, + ResultRelInfo *root_target_rel, + CmdType cmd) { PublicationActions *pubactions; + Relation rel = target_rel->ri_RelationDesc; + Relation rootrel = root_target_rel ? root_target_rel->ri_RelationDesc : NULL; /* We only need to do checks for UPDATE and DELETE. */ if (cmd != CMD_UPDATE && cmd != CMD_DELETE) @@ -563,9 +568,18 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd) /* * This is either UPDATE OR DELETE and there is no replica identity. * - * Check if the table publishes UPDATES or DELETES. + * Check if the table or its root ancestor publishes UPDATES or DELETES. */ pubactions = GetRelationPublicationActions(rel); + if (rootrel) + { + PublicationActions *root_pubactions; + + root_pubactions = GetRelationPublicationActions(rootrel); + pubactions->pubupdate |= root_pubactions->pubupdate; + pubactions->pubdelete |= root_pubactions->pubdelete; + } + if (cmd == CMD_UPDATE && pubactions->pubupdate) ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -591,17 +605,10 @@ CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname) { /* - * We currently only support writing to regular tables. However, give a - * more specific error for partitioned and foreign tables. + * We currently only support writing to regular and partitioned tables. + * However, give a more specific error for foreign tables. */ - if (relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot use relation \"%s.%s\" as logical replication target", - nspname, relname), - errdetail("\"%s.%s\" is a partitioned table.", - nspname, relname))); - else if (relkind == RELKIND_FOREIGN_TABLE) + if (relkind == RELKIND_FOREIGN_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", @@ -609,7 +616,7 @@ CheckSubscriptionRelkind(char relkind, const char *nspname, errdetail("\"%s.%s\" is a foreign table.", nspname, relname))); - if (relkind != RELKIND_RELATION) + if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE) ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot use relation \"%s.%s\" as logical replication target", diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c index e3eb9d7b90..fb97d24f3a 100644 --- a/src/backend/executor/nodeModifyTable.c +++ b/src/backend/executor/nodeModifyTable.c @@ -2268,6 +2268,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) int nplans = list_length(node->plans); ResultRelInfo *saved_resultRelInfo; ResultRelInfo *resultRelInfo; + ResultRelInfo *rootResultRelInfo = NULL; Plan *subplan; ListCell *l; int i; @@ -2295,8 +2296,11 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) /* If modifying a partitioned table, initialize the root table info */ if (node->rootResultRelIndex >= 0) + { mtstate->rootResultRelInfo = estate->es_root_result_relations + node->rootResultRelIndex; + rootResultRelInfo = mtstate->rootResultRelInfo; + } mtstate->mt_arowmarks = (List **) palloc0(sizeof(List *) * nplans); mtstate->mt_nplans = nplans; @@ -2330,7 +2334,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags) /* * Verify result relation is a valid target for the current operation */ - CheckValidResultRel(resultRelInfo, operation); + CheckValidResultRel(resultRelInfo, rootResultRelInfo, operation); /* * If there are indices on the result relation, open them and save diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index e01d18c3a1..554bdb10d3 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -630,16 +630,17 @@ copy_read_data(void *outbuf, int minread, int maxread) /* * Get information about remote relation in similar fashion the RELATION - * message provides during replication. + * message provides during replication. XXX - while we fetch relkind too + * here, the RELATION message doesn't provide it */ static void fetch_remote_table_info(char *nspname, char *relname, - LogicalRepRelation *lrel) + LogicalRepRelation *lrel, char *relkind) { WalRcvExecResult *res; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[2] = {OIDOID, CHAROID}; + Oid tableRow[3] = {OIDOID, CHAROID, CHAROID}; Oid attrRow[4] = {TEXTOID, OIDOID, INT4OID, BOOLOID}; bool isnull; int natt; @@ -649,16 +650,16 @@ fetch_remote_table_info(char *nspname, char *relname, /* First fetch Oid and replica identity. */ initStringInfo(&cmd); - appendStringInfo(&cmd, "SELECT c.oid, c.relreplident" + appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind" " FROM pg_catalog.pg_class c" " INNER JOIN pg_catalog.pg_namespace n" " ON (c.relnamespace = n.oid)" " WHERE n.nspname = %s" " AND c.relname = %s" - " AND c.relkind = 'r'", + " AND pg_relation_is_publishable(c.oid)", quote_literal_cstr(nspname), quote_literal_cstr(relname)); - res = walrcv_exec(wrconn, cmd.data, 2, tableRow); + res = walrcv_exec(wrconn, cmd.data, 3, tableRow); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, @@ -675,6 +676,8 @@ fetch_remote_table_info(char *nspname, char *relname, Assert(!isnull); lrel->replident = DatumGetChar(slot_getattr(slot, 2, &isnull)); Assert(!isnull); + *relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); + Assert(!isnull); ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); @@ -750,10 +753,12 @@ copy_table(Relation rel) CopyState cstate; List *attnamelist; ParseState *pstate; + char remote_relkind; /* Get the publisher relation info. */ fetch_remote_table_info(get_namespace_name(RelationGetNamespace(rel)), - RelationGetRelationName(rel), &lrel); + RelationGetRelationName(rel), &lrel, + &remote_relkind); /* Put the relation into relmap. */ logicalrep_relmap_update(&lrel); @@ -762,6 +767,17 @@ copy_table(Relation rel) relmapentry = logicalrep_rel_open(lrel.remoteid, NoLock); Assert(rel == relmapentry->localrel); + /* + * If either table is partitioned, skip copying. Individual partitions + * will be copied instead. + */ + if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE || + remote_relkind == RELKIND_PARTITIONED_TABLE) + { + logicalrep_rel_close(relmapentry, NoLock); + return; + } + /* Start copy on the publisher. */ initStringInfo(&cmd); appendStringInfo(&cmd, "COPY %s TO STDOUT", diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 3483c1b877..8dc78f1779 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -50,7 +50,12 @@ static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); -/* Entry in the map used to remember which relation schemas we sent. */ +/* + * Entry in the map used to remember which relation schemas we sent. + * + * For partitions, 'pubactions' considers not only the table's own + * publications, but also those of all of its ancestors. + */ typedef struct RelationSyncEntry { Oid relid; /* relation oid */ @@ -63,7 +68,7 @@ typedef struct RelationSyncEntry static HTAB *RelationSyncCache = NULL; static void init_rel_sync_cache(MemoryContext decoding_context); -static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Oid relid); +static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data, Relation rel); static void rel_sync_cache_relation_cb(Datum arg, Oid relid); static void rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue); @@ -311,7 +316,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!is_publishable_relation(relation)) return; - relentry = get_rel_sync_entry(data, RelationGetRelid(relation)); + relentry = get_rel_sync_entry(data, relation); /* First check the table filter */ switch (change->action) @@ -401,7 +406,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (!is_publishable_relation(relation)) continue; - relentry = get_rel_sync_entry(data, relid); + relentry = get_rel_sync_entry(data, relation); if (!relentry->pubactions.pubtruncate) continue; @@ -526,8 +531,9 @@ init_rel_sync_cache(MemoryContext cachectx) * Find or create entry in the relation schema cache. */ static RelationSyncEntry * -get_rel_sync_entry(PGOutputData *data, Oid relid) +get_rel_sync_entry(PGOutputData *data, Relation rel) { + Oid relid = RelationGetRelid(rel); RelationSyncEntry *entry; bool found; MemoryContext oldctx; @@ -546,7 +552,9 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) if (!found || !entry->replicate_valid) { List *pubids = GetRelationPublications(relid); - ListCell *lc; + ListCell *lc, + *lc1; + List *ancestor_pubids = NIL; /* Reload publications if needed before use. */ if (!publications_valid) @@ -568,6 +576,11 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; + /* For partitions, also consider publications of ancestors. */ + if (rel->rd_rel->relispartition) + ancestor_pubids = + GetRelationAncestorPublications(RelationGetRelid(rel)); + foreach(lc, data->publications) { Publication *pub = lfirst(lc); @@ -583,9 +596,25 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && entry->pubactions.pubdelete && entry->pubactions.pubtruncate) break; + + foreach(lc1, ancestor_pubids) + { + if (lfirst_oid(lc1) == pub->oid) + { + entry->pubactions.pubinsert |= pub->pubactions.pubinsert; + entry->pubactions.pubupdate |= pub->pubactions.pubupdate; + entry->pubactions.pubdelete |= pub->pubactions.pubdelete; + entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; + } + } + + if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && + entry->pubactions.pubdelete && entry->pubactions.pubtruncate) + break; } list_free(pubids); + list_free(ancestor_pubids); entry->replicate_valid = true; } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 08658c8e86..b5e91771e4 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -3969,8 +3969,9 @@ getPublicationTables(Archive *fout, TableInfo tblinfo[], int numTables) { TableInfo *tbinfo = &tblinfo[i]; - /* Only plain tables can be aded to publications. */ - if (tbinfo->relkind != RELKIND_RELATION) + /* Only plain and partitioned tables can be added to publications. */ + if (tbinfo->relkind != RELKIND_RELATION && + tbinfo->relkind != RELKIND_PARTITIONED_TABLE) continue; /* diff --git a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index ea22aa6563..5ee7091472 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -80,6 +80,7 @@ typedef struct Publication extern Publication *GetPublication(Oid pubid); extern Publication *GetPublicationByName(const char *pubname, bool missing_ok); extern List *GetRelationPublications(Oid relid); +extern List *GetRelationAncestorPublications(Oid relid); extern List *GetPublicationRelations(Oid pubid); extern List *GetAllTablesPublications(void); extern List *GetAllTablesPublicationRelations(void); diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h index 6298c7c8ca..698a57d0cd 100644 --- a/src/include/executor/executor.h +++ b/src/include/executor/executor.h @@ -179,7 +179,9 @@ extern void ExecutorEnd(QueryDesc *queryDesc); extern void standard_ExecutorEnd(QueryDesc *queryDesc); extern void ExecutorRewind(QueryDesc *queryDesc); extern bool ExecCheckRTPerms(List *rangeTable, bool ereport_on_violation); -extern void CheckValidResultRel(ResultRelInfo *resultRelInfo, CmdType operation); +extern void CheckValidResultRel(ResultRelInfo *resultRelInfo, + ResultRelInfo *rootResultRelInfo, + CmdType operation); extern void InitResultRelInfo(ResultRelInfo *resultRelInfo, Relation resultRelationDesc, Index resultRelationIndex, @@ -592,7 +594,9 @@ extern void ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot, TupleTableSlot *slot); extern void ExecSimpleRelationDelete(EState *estate, EPQState *epqstate, TupleTableSlot *searchslot); -extern void CheckCmdReplicaIdentity(Relation rel, CmdType cmd); +extern void CheckCmdReplicaIdentity(ResultRelInfo *target_rel, + ResultRelInfo *root_target_rel, + CmdType cmd); extern void CheckSubscriptionRelkind(char relkind, const char *nspname, const char *relname); diff --git a/src/test/regress/expected/publication.out b/src/test/regress/expected/publication.out index feb51e4add..e3fabe70f9 100644 --- a/src/test/regress/expected/publication.out +++ b/src/test/regress/expected/publication.out @@ -116,6 +116,22 @@ Tables: DROP TABLE testpub_tbl3, testpub_tbl3a; DROP PUBLICATION testpub3, testpub4; +-- Tests for partitioned tables +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forparted; +RESET client_min_messages; +-- should add only the parent to publication, not the partition +CREATE TABLE testpub_parted1 PARTITION OF testpub_parted FOR VALUES IN (1); +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; +\dRp+ testpub_forparted + Publication testpub_forparted + Owner | All tables | Inserts | Updates | Deletes | Truncates +--------------------------+------------+---------+---------+---------+----------- + regress_publication_user | f | t | t | t | t +Tables: + "public.testpub_parted" + +DROP PUBLICATION testpub_forparted; -- fail - view CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; ERROR: "testpub_view" is not a table @@ -142,11 +158,6 @@ Tables: ALTER PUBLICATION testpub_default ADD TABLE testpub_view; ERROR: "testpub_view" is not a table DETAIL: Only tables can be added to publications. --- fail - partitioned table -ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted; -ERROR: "testpub_parted" is a partitioned table -DETAIL: Adding partitioned tables to publications is not supported. -HINT: You can add the table partitions individually. ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1; ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1; ALTER PUBLICATION testpub_default ADD TABLE pub_test.testpub_nopk; diff --git a/src/test/regress/sql/publication.sql b/src/test/regress/sql/publication.sql index 5773a755cf..b79a3f8f8f 100644 --- a/src/test/regress/sql/publication.sql +++ b/src/test/regress/sql/publication.sql @@ -69,6 +69,16 @@ RESET client_min_messages; DROP TABLE testpub_tbl3, testpub_tbl3a; DROP PUBLICATION testpub3, testpub4; +-- Tests for partitioned tables +SET client_min_messages = 'ERROR'; +CREATE PUBLICATION testpub_forparted; +RESET client_min_messages; +-- should add only the parent to publication, not the partition +CREATE TABLE testpub_parted1 PARTITION OF testpub_parted FOR VALUES IN (1); +ALTER PUBLICATION testpub_forparted ADD TABLE testpub_parted; +\dRp+ testpub_forparted +DROP PUBLICATION testpub_forparted; + -- fail - view CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_view; SET client_min_messages = 'ERROR'; @@ -83,8 +93,6 @@ CREATE PUBLICATION testpub_fortbl FOR TABLE testpub_tbl1; -- fail - view ALTER PUBLICATION testpub_default ADD TABLE testpub_view; --- fail - partitioned table -ALTER PUBLICATION testpub_fortbl ADD TABLE testpub_parted; ALTER PUBLICATION testpub_default ADD TABLE testpub_tbl1; ALTER PUBLICATION testpub_default SET TABLE testpub_tbl1; diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl new file mode 100644 index 0000000000..2b8a5025dc --- /dev/null +++ b/src/test/subscription/t/013_partition.pl @@ -0,0 +1,159 @@ +# Test PARTITION +use strict; +use warnings; +use PostgresNode; +use TestLib; +use Test::More tests => 10; + +# setup + +my $node_publisher = get_new_node('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->start; + +my $node_subscriber1 = get_new_node('subscriber1'); +$node_subscriber1->init(allows_streaming => 'logical'); +$node_subscriber1->start; + +my $node_subscriber2 = get_new_node('subscriber2'); +$node_subscriber2->init(allows_streaming => 'logical'); +$node_subscriber2->start; + +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY) PARTITION BY LIST (a)"); + +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1 (a int PRIMARY KEY, b text) PARTITION BY LIST (a)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1_1 PARTITION OF tab1 FOR VALUES IN (1, 2, 3)"); + +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_1 (b text DEFAULT 'sub1_tab1', a int NOT NULL)"); +$node_subscriber1->safe_psql('postgres', + "ALTER TABLE tab1 ATTACH PARTITION tab1_1 FOR VALUES IN (1, 2, 3, 4)"); + +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab1_2 PARTITION OF tab1 FOR VALUES IN (5, 6)"); + +$node_subscriber1->safe_psql('postgres', + "CREATE TABLE tab1_2 PARTITION OF tab1 (b DEFAULT 'sub1_tab1') FOR VALUES IN (5, 6)"); + +$node_subscriber2->safe_psql('postgres', + "CREATE TABLE tab1_2 (a int PRIMARY KEY, b text DEFAULT 'sub2_tab1_2')"); + +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub1 FOR TABLE tab1, tab1_1"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION pub2 FOR TABLE tab1_2"); + +$node_subscriber1->safe_psql('postgres', + "CREATE SUBSCRIPTION sub1 CONNECTION '$publisher_connstr' PUBLICATION pub1"); + +$node_subscriber2->safe_psql('postgres', + "CREATE SUBSCRIPTION sub2 CONNECTION '$publisher_connstr' PUBLICATION pub2"); + +# Wait for initial sync of all subscriptions +my $synced_query = + "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');"; +$node_subscriber1->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; +$node_subscriber2->poll_query_until('postgres', $synced_query) + or die "Timed out while waiting for subscriber to synchronize data"; + +# insert data (some into the root parent and some directly into partitions) + +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1_1 VALUES (3)"); +$node_publisher->safe_psql('postgres', + "INSERT INTO tab1_2 VALUES (5)"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +my $result = $node_subscriber1->safe_psql('postgres', + "SELECT b, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub1_tab1|3|1|5), 'insert into tab1_1, tab1_2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT b, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1"); +is($result, qq(sub2_tab1_2|1|5|5), 'inserts into tab1_2 replicated'); + +# update a row (no partition change) + +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET a = 2 WHERE a = 1"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT b, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub1_tab1|3|2|5), 'update of tab1_1 replicated'); + +# update a row (partition changes) + +$node_publisher->safe_psql('postgres', + "UPDATE tab1 SET a = 6 WHERE a = 2"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT b, count(*), min(a), max(a) FROM tab1 GROUP BY 1"); +is($result, qq(sub1_tab1|3|3|6), 'delete from tab1_1 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT b, count(*), min(a), max(a) FROM tab1_2 GROUP BY 1"); +is($result, qq(sub2_tab1_2|2|5|6), 'insert into tab1_2 replicated'); + +# delete rows (some from the root parent, some directly from the partition) + +$node_publisher->safe_psql('postgres', + "DELETE FROM tab1 WHERE a IN (3, 5)"); +$node_publisher->safe_psql('postgres', + "DELETE FROM tab1_2"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), 'delete from tab1_1, tab_2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1_2"); +is($result, qq(0||), 'delete from tab1_2 replicated'); + +# truncate (root parent and partition directly) + +$node_subscriber1->safe_psql('postgres', + "INSERT INTO tab1 VALUES (1), (2), (5)"); +$node_subscriber2->safe_psql('postgres', + "INSERT INTO tab1_2 VALUES (5)"); + +$node_publisher->safe_psql('postgres', + "TRUNCATE tab1_2"); + +$node_publisher->wait_for_catchup('sub1'); +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(2|1|2), 'truncate of tab_2 replicated'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1_2"); +is($result, qq(0||), 'truncate of tab1_2 replicated'); + +$node_publisher->safe_psql('postgres', + "TRUNCATE tab1"); + +$node_publisher->wait_for_catchup('sub1'); + +$result = $node_subscriber1->safe_psql('postgres', + "SELECT count(*), min(a), max(a) FROM tab1"); +is($result, qq(0||), 'truncate of tab1_1 replicated'); -- 2.11.0