From a584fde8dac183e4c83fa6176ff8b81d8dcb6e80 Mon Sep 17 00:00:00 2001 From: "shiy.fnst" Date: Fri, 10 Jun 2022 16:29:52 +0800 Subject: [PATCH v2 2/2] Check partition table replica identity on subscriber In logical replication, we will check if the target table on subscriber is updatable. When the target table is a partitioned table, we should check the target partition, instead of the partitioned table. Author: Shi yu --- src/backend/replication/logical/relation.c | 113 +++++++++++---------- src/backend/replication/logical/worker.c | 27 +++-- src/test/subscription/t/013_partition.pl | 14 +++ 3 files changed, 97 insertions(+), 57 deletions(-) diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 85712fb0f4..b614124227 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -299,6 +299,64 @@ logicalrep_report_missing_attrs(LogicalRepRelation *remoterel, } } +/* + * Check if replica identity matches and mark the "updatable" flag. + * + * We allow for stricter replica identity (fewer columns) on subscriber as + * that will not stop us from finding unique tuple. IE, if publisher has + * identity (id,timestamp) and subscriber just (id) this will not be a + * problem, but in the opposite scenario it will. + * + * Don't throw any error here just mark the relation entry as not updatable, + * as replica identity is only for updates and deletes but inserts can be + * replicated even without it. + */ +static void +logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry) +{ + Bitmapset *idkey; + LogicalRepRelation *remoterel = &entry->remoterel; + int i; + + entry->updatable = true; + idkey = RelationGetIndexAttrBitmap(entry->localrel, + INDEX_ATTR_BITMAP_IDENTITY_KEY); + /* fallback to PK if no replica identity */ + if (idkey == NULL) + { + idkey = RelationGetIndexAttrBitmap(entry->localrel, + INDEX_ATTR_BITMAP_PRIMARY_KEY); + /* + * If no replica identity index and no PK, the published table + * must have replica identity FULL. + */ + if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL) + entry->updatable = false; + } + + i = -1; + while ((i = bms_next_member(idkey, i)) >= 0) + { + int attnum = i + FirstLowInvalidHeapAttributeNumber; + + if (!AttrNumberIsForUserDefinedAttr(attnum)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical replication target relation \"%s.%s\" uses " + "system columns in REPLICA IDENTITY index", + remoterel->nspname, remoterel->relname))); + + attnum = AttrNumberGetAttrOffset(attnum); + + if (entry->attrmap->attnums[attnum] < 0 || + !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys)) + { + entry->updatable = false; + break; + } + } +} + /* * Open the local relation associated with the remote one. * @@ -357,7 +415,6 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) if (!entry->localrelvalid) { Oid relid; - Bitmapset *idkey; TupleDesc desc; MemoryContext oldctx; int i; @@ -415,55 +472,8 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) /* be tidy */ bms_free(missingatts); - /* - * Check that replica identity matches. We allow for stricter replica - * identity (fewer columns) on subscriber as that will not stop us - * from finding unique tuple. IE, if publisher has identity - * (id,timestamp) and subscriber just (id) this will not be a problem, - * but in the opposite scenario it will. - * - * Don't throw any error here just mark the relation entry as not - * updatable, as replica identity is only for updates and deletes but - * inserts can be replicated even without it. - */ - entry->updatable = true; - idkey = RelationGetIndexAttrBitmap(entry->localrel, - INDEX_ATTR_BITMAP_IDENTITY_KEY); - /* fallback to PK if no replica identity */ - if (idkey == NULL) - { - idkey = RelationGetIndexAttrBitmap(entry->localrel, - INDEX_ATTR_BITMAP_PRIMARY_KEY); - - /* - * If no replica identity index and no PK, the published table - * must have replica identity FULL. - */ - if (idkey == NULL && remoterel->replident != REPLICA_IDENTITY_FULL) - entry->updatable = false; - } - - i = -1; - while ((i = bms_next_member(idkey, i)) >= 0) - { - int attnum = i + FirstLowInvalidHeapAttributeNumber; - - if (!AttrNumberIsForUserDefinedAttr(attnum)) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("logical replication target relation \"%s.%s\" uses " - "system columns in REPLICA IDENTITY index", - remoterel->nspname, remoterel->relname))); - - attnum = AttrNumberGetAttrOffset(attnum); - - if (entry->attrmap->attnums[attnum] < 0 || - !bms_is_member(entry->attrmap->attnums[attnum], remoterel->attkeys)) - { - entry->updatable = false; - break; - } - } + /* Check that replica identity matches. */ + logicalrep_rel_mark_updatable(entry); entry->localrelvalid = true; } @@ -647,7 +657,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, attrmap->maplen * sizeof(AttrNumber)); } - entry->updatable = root->updatable; + /* Check that replica identity matches. */ + logicalrep_rel_mark_updatable(entry); entry->localrelvalid = true; diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index fc210a9e7b..4eee9c7bb6 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1735,6 +1735,13 @@ apply_handle_insert_internal(ApplyExecutionData *edata, static void check_relation_updatable(LogicalRepRelMapEntry *rel) { + /* + * If it is a partitioned table, we don't check it, we will check its + * partition later. + */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + return; + /* Updatable, no error. */ if (rel->updatable) return; @@ -2118,6 +2125,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, TupleTableSlot *remoteslot_part; TupleConversionMap *map; MemoryContext oldctx; + LogicalRepRelMapEntry *part_entry; + AttrMap *attrmap = NULL; /* ModifyTableState is needed for ExecFindPartition(). */ edata->mtstate = mtstate = makeNode(ModifyTableState); @@ -2149,8 +2158,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, remoteslot_part = table_slot_create(partrel, &estate->es_tupleTable); map = partrelinfo->ri_RootToPartitionMap; if (map != NULL) - remoteslot_part = execute_attr_map_slot(map->attrMap, remoteslot, + { + attrmap = map->attrMap; + remoteslot_part = execute_attr_map_slot(attrmap, remoteslot, remoteslot_part); + } else { remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot); @@ -2158,6 +2170,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, } MemoryContextSwitchTo(oldctx); + /* Check if we can do the update or delete. */ + if(operation == CMD_UPDATE || operation == CMD_DELETE) + { + part_entry = logicalrep_partition_open(relmapentry, partrel, + attrmap); + check_relation_updatable(part_entry); + } + switch (operation) { case CMD_INSERT: @@ -2179,15 +2199,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, * suitable partition. */ { - AttrMap *attrmap = map ? map->attrMap : NULL; - LogicalRepRelMapEntry *part_entry; TupleTableSlot *localslot; ResultRelInfo *partrelinfo_new; bool found; - part_entry = logicalrep_partition_open(relmapentry, partrel, - attrmap); - /* Get the matching local tuple from the partition. */ found = FindReplTupleInLocalRel(estate, partrel, &part_entry->remoterel, diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl index b2183e0232..fde0b64a1b 100644 --- a/src/test/subscription/t/013_partition.pl +++ b/src/test/subscription/t/013_partition.pl @@ -845,4 +845,18 @@ $result = $node_subscriber2->safe_psql('postgres', "SELECT a, b, c FROM tab5 ORDER BY 1"); is($result, qq(2|1|1), 'updates of tab5 replicated correctly after altering table on publisher'); +# Alter REPLICA IDENTITY on subscriber. +# No REPLICA IDENTITY in the partitioned table on subscriber, but what we check +# is the partition, so it works fine. +$node_subscriber2->safe_psql('postgres', + "ALTER TABLE tab5 REPLICA IDENTITY NOTHING"); + +$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 3 WHERE a = 2"); + +$node_publisher->wait_for_catchup('sub2'); + +$result = $node_subscriber2->safe_psql('postgres', + "SELECT a, b, c FROM tab5_1 ORDER BY 1"); +is($result, qq(3|1|1), 'updates of tab5 replicated correctly'); + done_testing(); -- 2.18.4