From 93f0e0e5804404860560d2944c04d80cfa689f03 Mon Sep 17 00:00:00 2001 From: wangw Date: Tue, 14 Jun 2022 11:23:52 +0800 Subject: [PATCH v11 4/5] Add some checks before using apply background worker to apply changes. If any of the following checks are violated, an error will be reported. 1. The unique columns between publisher and subscriber are difference. 2. There is any non-immutable function present in expression in subscriber's relation. Check from the following 3 items: a. The function in triggers; b. Column default value expressions and domain constraints; c. Constraint expressions. --- doc/src/sgml/ref/create_subscription.sgml | 4 + .../replication/logical/applybgwroker.c | 45 ++ src/backend/replication/logical/proto.c | 62 ++- src/backend/replication/logical/relation.c | 198 +++++++++ src/backend/replication/logical/tablesync.c | 1 + src/backend/replication/logical/worker.c | 15 +- src/backend/utils/cache/typcache.c | 17 + src/include/replication/logicalproto.h | 1 + src/include/replication/logicalrelation.h | 16 + src/include/replication/worker_internal.h | 1 + src/include/utils/typcache.h | 2 + .../subscription/t/022_twophase_cascade.pl | 7 + .../subscription/t/032_streaming_apply.pl | 391 ++++++++++++++++++ 13 files changed, 752 insertions(+), 8 deletions(-) create mode 100644 src/test/subscription/t/032_streaming_apply.pl diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index d4caae0222..8de1a23ce4 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -240,6 +240,10 @@ CREATE SUBSCRIPTION subscription_name diff --git a/src/backend/replication/logical/applybgwroker.c b/src/backend/replication/logical/applybgwroker.c index 1f52b155d7..7d3c61c2e9 100644 --- a/src/backend/replication/logical/applybgwroker.c +++ b/src/backend/replication/logical/applybgwroker.c @@ -763,3 +763,48 @@ apply_bgworker_subxact_info_add(TransactionId current_xid) MemoryContextSwitchTo(oldctx); } } + +/* + * Check if changes on this logical replication relation can be applied by + * apply background worker. + * + * Although we maintains the commit order by allowing only one process to + * commit at a time, our access order to the relation has changed. + * This could cause unexpected problems if the unique column on the replicated + * table is inconsistent with the publisher-side or contains non-immutable + * functions when applying transactions in the apply background worker. + */ +void +apply_bgworker_relation_check(LogicalRepRelMapEntry *rel) +{ + /* Check only we are in apply bgworker. */ + if (!am_apply_bgworker()) + return; + + /* + * If it is a partitioned table, we do not check it, we will check its + * partition later. + */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + return; + + /* + * If any unique index exist, check that they are same as remoterel. + */ + if (!rel->sameunique) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot replicate relation with different unique index"), + errhint("Please change the streaming option to 'on' instead of 'apply'."))); + + /* + * Check if there is any non-immutable function present in expression in + * this relation. + */ + if (rel->volatility == FUNCTION_NONIMMUTABLE) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot replicate relation. There is at least one non-immutable function"), + errhint("Please change the streaming option to 'on' instead of 'apply'."))); + +} diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index a888038eb4..6af3302270 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -23,7 +23,8 @@ /* * Protocol message flags. */ -#define LOGICALREP_IS_REPLICA_IDENTITY 1 +#define LOGICALREP_IS_REPLICA_IDENTITY 0x0001 +#define LOGICALREP_IS_UNIQUE 0x0002 #define MESSAGE_TRANSACTIONAL (1<<0) #define TRUNCATE_CASCADE (1<<0) @@ -933,11 +934,55 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) TupleDesc desc; int i; uint16 nliveatts = 0; - Bitmapset *idattrs = NULL; + Bitmapset *idattrs = NULL, + *attunique = NULL; bool replidentfull; desc = RelationGetDescr(rel); + if (rel->rd_rel->relhasindex) + { + List *indexoidlist = RelationGetIndexList(rel); + ListCell *indexoidscan; + + foreach(indexoidscan, indexoidlist) + { + Oid indexoid = lfirst_oid(indexoidscan); + Relation indexRel; + + /* Look up the description for index */ + indexRel = RelationIdGetRelation(indexoid); + + if (!RelationIsValid(indexRel)) + elog(ERROR, "could not open relation with OID %u", indexoid); + + if (indexRel->rd_index->indisunique) + { + int i; + + /* Add referenced attributes to idindexattrs */ + for (i = 0; i < indexRel->rd_index->indnatts; i++) + { + int attrnum = indexRel->rd_index->indkey.values[i]; + + /* + * We don't include non-key columns into idindexattrs + * bitmaps. See RelationGetIndexAttrBitmap. + */ + if (attrnum != 0) + { + if (i < indexRel->rd_index->indnkeyatts && + !bms_is_member(attrnum - FirstLowInvalidHeapAttributeNumber, attunique)) + attunique = bms_add_member(attunique, + attrnum - FirstLowInvalidHeapAttributeNumber); + } + } + } + RelationClose(indexRel); + } + list_free(indexoidlist); + } + /* send number of live attributes */ for (i = 0; i < desc->natts; i++) { @@ -976,6 +1021,10 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) idattrs)) flags |= LOGICALREP_IS_REPLICA_IDENTITY; + if (bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + attunique)) + flags |= LOGICALREP_IS_UNIQUE; + pq_sendbyte(out, flags); /* attribute name */ @@ -1001,7 +1050,8 @@ logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel) int natts; char **attnames; Oid *atttyps; - Bitmapset *attkeys = NULL; + Bitmapset *attkeys = NULL, + *attunique = NULL; natts = pq_getmsgint(in, 2); attnames = palloc(natts * sizeof(char *)); @@ -1012,11 +1062,14 @@ logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel) { uint8 flags; - /* Check for replica identity column */ + /* Check for replica identity and unique column */ flags = pq_getmsgbyte(in); if (flags & LOGICALREP_IS_REPLICA_IDENTITY) attkeys = bms_add_member(attkeys, i); + if (flags & LOGICALREP_IS_UNIQUE) + attunique = bms_add_member(attunique, i); + /* attribute name */ attnames[i] = pstrdup(pq_getmsgstring(in)); @@ -1030,6 +1083,7 @@ logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel) rel->attnames = attnames; rel->atttyps = atttyps; rel->attkeys = attkeys; + rel->attunique = attunique; rel->natts = natts; } diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index 18e657cdfe..8e17f95137 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -19,12 +19,19 @@ #include "access/table.h" #include "catalog/namespace.h" +#include "catalog/pg_proc.h" #include "catalog/pg_subscription_rel.h" +#include "commands/trigger.h" #include "executor/executor.h" #include "nodes/makefuncs.h" +#include "optimizer/optimizer.h" #include "replication/logicalrelation.h" #include "replication/worker_internal.h" +#include "rewrite/rewriteHandler.h" #include "utils/inval.h" +#include "utils/lsyscache.h" +#include "utils/syscache.h" +#include "utils/typcache.h" static MemoryContext LogicalRepRelMapContext = NULL; @@ -91,6 +98,23 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) } } +/* + * Reset the flag volatility of all existing entry in the relation map cache. + */ +static void +logicalrep_relmap_reset_volatility_cb(Datum arg, int cacheid, uint32 hashvalue) +{ + HASH_SEQ_STATUS hash_seq; + LogicalRepRelMapEntry *entry; + + if (LogicalRepRelMap == NULL) + return; + + hash_seq_init(&hash_seq, LogicalRepRelMap); + while ((entry = hash_seq_search(&hash_seq)) != NULL) + entry->volatility = FUNCTION_UNKNOWN; +} + /* * Initialize the relation map cache. */ @@ -116,6 +140,9 @@ logicalrep_relmap_init(void) /* Watch for invalidation events. */ CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb, (Datum) 0); + CacheRegisterSyscacheCallback(PROCOID, + logicalrep_relmap_reset_volatility_cb, + (Datum) 0); } /* @@ -142,6 +169,7 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) pfree(remoterel->atttyps); } bms_free(remoterel->attkeys); + bms_free(remoterel->attunique); if (entry->attrmap) pfree(entry->attrmap); @@ -190,6 +218,7 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel) } entry->remoterel.replident = remoterel->replident; entry->remoterel.attkeys = bms_copy(remoterel->attkeys); + entry->remoterel.attunique = bms_copy(remoterel->attunique); MemoryContextSwitchTo(oldctx); } @@ -315,6 +344,162 @@ logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry) } } +/* + * Helper function to mark flags for apply background worker when opening the + * local table. + */ +static void +logicalrep_rel_mark_apply_bgworker(LogicalRepRelMapEntry *entry) +{ + Bitmapset *ukey; + int i; + TupleDesc tupdesc; + int attnum; + List *fkeys = NIL; + + /* + * Check that the unique column in the relation on the subscriber-side is + * also the unique column on the publisher-side. + */ + entry->sameunique = true; + ukey = RelationGetIndexAttrBitmap(entry->localrel, + INDEX_ATTR_BITMAP_KEY); + + if (ukey) + { + i = -1; + while ((i = bms_next_member(ukey, i)) >= 0) + { + int attnum = i + FirstLowInvalidHeapAttributeNumber; + + attnum = AttrNumberGetAttrOffset(attnum); + + if (entry->attrmap->attnums[attnum] < 0 || + !bms_is_member(entry->attrmap->attnums[attnum], entry->remoterel.attunique)) + { + entry->sameunique = false; + break; + } + } + } + + /* + * Check whether there is any non-immutable function in the local table. + * + * a. The function in triggers; + * b. Column default value expressions and domain constraints; + * c. Constraint expressions; + * d. Foreign keys. + */ + if (entry->volatility != FUNCTION_UNKNOWN) + return; + + /* Initialize the flag. */ + entry->volatility = FUNCTION_IMMUTABLE; + + /* Check the trigger functions. */ + if (entry->localrel->trigdesc != NULL) + { + for (i = 0; i < entry->localrel->trigdesc->numtriggers; i++) + { + Trigger *trig = entry->localrel->trigdesc->triggers + i; + + if (trig->tgenabled != TRIGGER_FIRES_ALWAYS && + trig->tgenabled != TRIGGER_FIRES_ON_REPLICA) + continue; + + if (func_volatile(trig->tgfoid) != PROVOLATILE_IMMUTABLE) + { + entry->volatility = FUNCTION_NONIMMUTABLE; + return; + } + } + } + + /* Check the columns. */ + tupdesc = RelationGetDescr(entry->localrel); + for (attnum = 0; attnum < tupdesc->natts; attnum++) + { + Form_pg_attribute att = TupleDescAttr(tupdesc, attnum); + + /* We don't need info for dropped or generated attributes */ + if (att->attisdropped || att->attgenerated) + continue; + + /* + * We don't need to check columns that only exist on the + * subscriber + */ + if (entry->attrmap->attnums[attnum] < 0) + continue; + + if (att->atthasdef) + { + Node *defaultexpr; + + defaultexpr = build_column_default(entry->localrel, attnum + 1); + if (contain_mutable_functions(defaultexpr)) + { + entry->volatility = FUNCTION_NONIMMUTABLE; + return; + } + } + + /* + * If the column is of a DOMAIN type, determine whether + * that domain has any CHECK expressions that are not + * immutable. + */ + if (get_typtype(att->atttypid) == TYPTYPE_DOMAIN) + { + List *domain_constraints; + ListCell *lc; + + domain_constraints = GetDomainConstraints(att->atttypid); + + foreach(lc, domain_constraints) + { + DomainConstraintState *con = (DomainConstraintState *) lfirst(lc); + + if (con->check_expr && contain_mutable_functions((Node *) con->check_expr)) + { + entry->volatility = FUNCTION_NONIMMUTABLE; + return; + } + } + } + } + + + /* Check the constraints. */ + tupdesc = RelationGetDescr(entry->localrel); + + if (tupdesc->constr) + { + ConstrCheck *check = tupdesc->constr->check; + + /* + * Determine if there are any CHECK constraints which + * contains non-immutable function. + */ + for (i = 0; i < tupdesc->constr->num_check; i++) + { + Expr *check_expr = stringToNode(check[i].ccbin); + + if (contain_mutable_functions((Node *) check_expr)) + { + entry->volatility = FUNCTION_NONIMMUTABLE; + return; + } + } + } + + /* Check the foreign keys. */ + fkeys = RelationGetFKeyList(entry->localrel); + if (fkeys) + entry->volatility = FUNCTION_NONIMMUTABLE; +} + /* * Open the local relation associated with the remote one. * @@ -433,6 +618,12 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) /* Check that replica identity matches. */ logicalrep_rel_mark_updatable(entry); + /* + * Set flags to check later whether changes could be applied in the + * apply background worker. + */ + logicalrep_rel_mark_apply_bgworker(entry); + entry->localrelvalid = true; } @@ -629,6 +820,7 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, } entry->remoterel.replident = remoterel->replident; entry->remoterel.attkeys = bms_copy(remoterel->attkeys); + entry->remoterel.attunique = bms_copy(remoterel->attunique); } entry->localrel = partrel; @@ -672,6 +864,12 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, /* Check that replica identity matches. */ logicalrep_rel_mark_updatable(entry); + /* + * Set flags to check later whether changes could be applied in the apply + * background worker. + */ + logicalrep_rel_mark_apply_bgworker(entry); + entry->localrelvalid = true; /* state and statelsn are left set to 0. */ diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 8ffba7e2e5..3cdbf8b457 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -884,6 +884,7 @@ fetch_remote_table_info(char *nspname, char *relname, lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *)); lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid)); lrel->attkeys = NULL; + lrel->attunique = NULL; /* * Store the columns as a list of names. Ignore those that are not diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index dac85a4101..6770973e39 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1918,6 +1918,8 @@ apply_handle_insert(StringInfo s) /* Set relation for error callback */ apply_error_callback_arg.rel = rel; + apply_bgworker_relation_check(rel); + /* Initialize the executor state. */ edata = create_edata_for_relation(rel); estate = edata->estate; @@ -2054,6 +2056,8 @@ apply_handle_update(StringInfo s) /* Check if we can do the update. */ check_relation_updatable(rel); + apply_bgworker_relation_check(rel); + /* Initialize the executor state. */ edata = create_edata_for_relation(rel); estate = edata->estate; @@ -2222,6 +2226,8 @@ apply_handle_delete(StringInfo s) /* Check if we can do the delete. */ check_relation_updatable(rel); + apply_bgworker_relation_check(rel); + /* Initialize the executor state. */ edata = create_edata_for_relation(rel); estate = edata->estate; @@ -2407,13 +2413,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, } MemoryContextSwitchTo(oldctx); + part_entry = logicalrep_partition_open(relmapentry, partrel, + attrmap); + + apply_bgworker_relation_check(part_entry); + /* Check if we can do the update or delete on the leaf partition. */ if(operation == CMD_UPDATE || operation == CMD_DELETE) - { - part_entry = logicalrep_partition_open(relmapentry, partrel, - attrmap); check_relation_updatable(part_entry); - } switch (operation) { diff --git a/src/backend/utils/cache/typcache.c b/src/backend/utils/cache/typcache.c index 808f9ebd0d..b248899d82 100644 --- a/src/backend/utils/cache/typcache.c +++ b/src/backend/utils/cache/typcache.c @@ -2540,6 +2540,23 @@ compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2) return 0; } +/* + * GetDomainConstraints --- get DomainConstraintState list of specified domain type + */ +List * +GetDomainConstraints(Oid type_id) +{ + TypeCacheEntry *typentry; + List *constraints = NIL; + + typentry = lookup_type_cache(type_id, TYPECACHE_DOMAIN_CONSTR_INFO); + + if(typentry->domainData != NULL) + constraints = typentry->domainData->constraints; + + return constraints; +} + /* * Load (or re-load) the enumData member of the typcache entry. */ diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 7bba77c9e7..a503eb62c2 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -108,6 +108,7 @@ typedef struct LogicalRepRelation char replident; /* replica identity */ char relkind; /* remote relation kind */ Bitmapset *attkeys; /* Bitmap of key columns */ + Bitmapset *attunique; /* Bitmap of unique columns */ } LogicalRepRelation; /* Type mapping info */ diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 78cd7e77f5..4476cf7cec 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -15,6 +15,17 @@ #include "access/attmap.h" #include "replication/logicalproto.h" +/* + * States to determine volatility of the function in expressions in one + * relation. + */ +typedef enum RelFuncVolatility +{ + FUNCTION_UNKNOWN = 0, /* initializing */ + FUNCTION_IMMUTABLE, /* all functions are immutable function */ + FUNCTION_NONIMMUTABLE /* at least one non-immutable function */ +} RelFuncVolatility; + typedef struct LogicalRepRelMapEntry { LogicalRepRelation remoterel; /* key is remoterel.remoteid */ @@ -31,6 +42,11 @@ typedef struct LogicalRepRelMapEntry Relation localrel; /* relcache entry (NULL when closed) */ AttrMap *attrmap; /* map of local attributes to remote ones */ bool updatable; /* Can apply updates/deletes? */ + bool sameunique; /* Are all unique columns of the local + relation contained by the unique columns in + remote? */ + RelFuncVolatility volatility; /* all functions in localrel are + immutable function? */ /* Sync state. */ char state; diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 08e63de013..dd49d8e2ec 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -196,6 +196,7 @@ extern void apply_bgworker_free(ApplyBgworkerState *wstate); extern void apply_bgworker_check_status(void); extern void apply_bgworker_set_status(ApplyBgworkerStatus status); extern void apply_bgworker_subxact_info_add(TransactionId current_xid); +extern void apply_bgworker_relation_check(LogicalRepRelMapEntry *rel); static inline bool am_tablesync_worker(void) diff --git a/src/include/utils/typcache.h b/src/include/utils/typcache.h index 431ad7f1b3..ed7c2e7f48 100644 --- a/src/include/utils/typcache.h +++ b/src/include/utils/typcache.h @@ -199,6 +199,8 @@ extern uint64 assign_record_type_identifier(Oid type_id, int32 typmod); extern int compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2); +extern List *GetDomainConstraints(Oid type_id); + extern size_t SharedRecordTypmodRegistryEstimate(void); extern void SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *, diff --git a/src/test/subscription/t/022_twophase_cascade.pl b/src/test/subscription/t/022_twophase_cascade.pl index b52af74e35..81791c8d3a 100644 --- a/src/test/subscription/t/022_twophase_cascade.pl +++ b/src/test/subscription/t/022_twophase_cascade.pl @@ -39,6 +39,13 @@ sub test_streaming ALTER SUBSCRIPTION tap_sub_C SET (streaming = $streaming_mode)"); + if ($streaming_mode eq 'apply') + { + $node_C->safe_psql( + 'postgres', " + ALTER TABLE test_tab ALTER c DROP DEFAULT"); + } + # Wait for subscribers to finish initialization $node_A->poll_query_until( diff --git a/src/test/subscription/t/032_streaming_apply.pl b/src/test/subscription/t/032_streaming_apply.pl new file mode 100644 index 0000000000..84a6900b33 --- /dev/null +++ b/src/test/subscription/t/032_streaming_apply.pl @@ -0,0 +1,391 @@ +# Copyright (c) 2022, PostgreSQL Global Development Group + +# Test the restrictions of streaming mode "apply" in logical replication + +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $offset = 0; + +# Create publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->start; + +# Setup structure on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab_partitioned (a int primary key, b varchar)"); + +# Setup structure on subscriber +# We need to test normal table and partition table. +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab (a int primary key, b varchar)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab_partitioned (a int primary key, b varchar) PARTITION BY RANGE(a)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab_partition (LIKE test_tab_partitioned)"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partitioned ATTACH PARTITION test_tab_partition DEFAULT" +); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub FOR TABLE test_tab"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_partitioned FOR TABLE test_tab_partitioned"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub, tap_pub_partitioned + WITH (streaming = apply, copy_data = false)"); + +$node_publisher->wait_for_catchup($appname); + +# It is not allowed that the unique index on the publisher and the subscriber +# is different. Check the error reported by background worker in this case. +# First we check the unique index on normal table. +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX test_tab_b_idx ON test_tab (b)"); + +# Check that a background worker starts if "streaming" option is +# specified as "apply". We have to look for the DEBUG1 log messages +# about that, so temporarily bump up the log verbosity. +$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1"); +$node_subscriber->reload; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1, 5000) s(i)" +); + +$node_subscriber->wait_for_log(qr/\[Apply BGW #\d+\] started/, 0); +$node_subscriber->append_conf('postgresql.conf', + "log_min_messages = warning"); +$node_subscriber->reload; + +$node_subscriber->wait_for_log( + qr/ERROR: cannot replicate relation with different unique index/, + $offset); + +# Drop the unique index on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', "DROP INDEX test_tab_b_idx"); + +$node_publisher->wait_for_catchup($appname); + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(5000), 'data replicated to subscriber after dropping index'); + +# Then we check the unique index on partition table. +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX test_tab_b_partition_idx ON test_tab_partition (b)"); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab_partitioned SELECT i, md5(i::text) FROM generate_series(1, 5000) s(i)" +); + +$node_subscriber->wait_for_log( + qr/ERROR: cannot replicate relation with different unique index/, + $offset); + +# Drop the unique index on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', "DROP INDEX test_tab_b_partition_idx"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); +is($result, qq(5000), 'data replicated to subscriber after dropping index'); + +# Triggers which execute non-immutable function are not allowed on the +# subscriber side. Check the error reported by background worker in this case. +# First we check the trigger function on normal table. +$node_subscriber->safe_psql( + 'postgres', qq{ +CREATE FUNCTION trigger_func() RETURNS TRIGGER AS \$\$ + BEGIN + RETURN NULL; + END +\$\$ language plpgsql; +CREATE TRIGGER insert_trig +BEFORE INSERT ON test_tab +FOR EACH ROW EXECUTE PROCEDURE trigger_func(); +ALTER TABLE test_tab ENABLE REPLICA TRIGGER insert_trig; +}); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', "DELETE FROM test_tab"); + +$node_subscriber->wait_for_log( + qr/ERROR: cannot replicate relation. There is at least one non-immutable function/, + $offset); + +# Drop the trigger on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', + "DROP TRIGGER insert_trig ON test_tab"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(0), 'data replicated to subscriber after dropping trigger'); + +# Then we check the unique index on partition table. +$node_subscriber->safe_psql( + 'postgres', qq{ +CREATE TRIGGER insert_trig +BEFORE INSERT ON test_tab_partition +FOR EACH ROW EXECUTE PROCEDURE trigger_func(); +ALTER TABLE test_tab_partition ENABLE REPLICA TRIGGER insert_trig; +}); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', "DELETE FROM test_tab_partitioned"); + +$node_subscriber->wait_for_log( + qr/ERROR: cannot replicate relation. There is at least one non-immutable function/, + $offset); + +# Drop the trigger on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', + "DROP TRIGGER insert_trig ON test_tab_partition"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); +is($result, qq(0), 'data replicated to subscriber after dropping trigger'); + +# It is not allowed that column default value expression contains a +# non-immutable function on the subscriber side. Check the error reported by +# background worker in this case. +# First we check the column default value expression on normal table. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab ALTER COLUMN b SET DEFAULT random()"); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1, 5000) s(i)" +); + +$node_subscriber->wait_for_log( + qr/ERROR: cannot replicate relation. There is at least one non-immutable function/, + $offset); + +# Drop default value on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab ALTER COLUMN b DROP DEFAULT"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(5000), + 'data replicated to subscriber after dropping default value expression'); + +# Then we check the column default value expression on partition table. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition ALTER COLUMN b SET DEFAULT random()"); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab_partitioned SELECT i, md5(i::text) FROM generate_series(1, 5000) s(i)" +); + +$node_subscriber->wait_for_log( + qr/ERROR: cannot replicate relation. There is at least one non-immutable function/, + $offset); + +# Drop default value on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition ALTER COLUMN b DROP DEFAULT"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); +is($result, qq(5000), + 'data replicated to subscriber after dropping default value expression'); + +# It is not allowed that domain constraint expression contains a non-immutable +# function on the subscriber side. Check the error reported by background +# worker in this case. +# Because the column type of the partition table must be the same as its parent +# table, only test normal table here. +$node_subscriber->safe_psql( + 'postgres', qq{ +CREATE DOMAIN test_domain AS int CHECK(VALUE > random()); +ALTER TABLE test_tab ALTER COLUMN a TYPE test_domain; +}); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', "DELETE FROM test_tab"); + +$node_subscriber->wait_for_log( + qr/ERROR: cannot replicate relation. There is at least one non-immutable function/, + $offset); + +# Drop domain constraint expression on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab ALTER COLUMN a TYPE int"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(0), + 'data replicated to subscriber after dropping domain constraint expression' +); + +# It is not allowed that constraint expression contains a non-immutable function +# on the subscriber side. Check the error reported by background worker in this +# case. +# First we check the constraint expression on normal table. +$node_subscriber->safe_psql( + 'postgres', qq{ +ALTER TABLE test_tab ADD CONSTRAINT test_tab_con check (a > random()); +}); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1, 5000) s(i)" +); + +$node_subscriber->wait_for_log( + qr/ERROR: cannot replicate relation. There is at least one non-immutable function/, + $offset); + +# Drop constraint on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab DROP CONSTRAINT test_tab_con"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(5000), + 'data replicated to subscriber after dropping constraint expression'); + +# Then we check the constraint expression on partition table. +$node_subscriber->safe_psql( + 'postgres', qq{ +ALTER TABLE test_tab_partition ADD CONSTRAINT test_tab_con check (a > random()); +}); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', "DELETE FROM test_tab_partitioned"); + +$node_subscriber->wait_for_log( + qr/ERROR: cannot replicate relation. There is at least one non-immutable function/, + $offset); + +# Drop constraint on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_con"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); +is($result, qq(0), + 'data replicated to subscriber after dropping constraint expression'); + +# It is not allowed that foreign key on the subscriber side. Check the error +# reported by background worker in this case. +# First we check the foreign key on normal table. +$node_publisher->safe_psql('postgres', "DELETE FROM test_tab"); +$node_publisher->wait_for_catchup($appname); +$node_subscriber->safe_psql( + 'postgres', qq{ +CREATE TABLE test_tab_f (a int primary key); +ALTER TABLE test_tab ADD CONSTRAINT test_tabfk FOREIGN KEY(a) REFERENCES test_tab_f(a); +}); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(1, 5000) s(i)" +); + +$node_subscriber->wait_for_log( + qr/ERROR: cannot replicate relation. There is at least one non-immutable function/, + $offset); + +# Drop the foreign key constraint on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab DROP CONSTRAINT test_tabfk"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); +is($result, qq(5000), + 'data replicated to subscriber after dropping the foreign key'); + +# Then we check the foreign key on partition table. +$node_publisher->wait_for_catchup($appname); +$node_subscriber->safe_psql( + 'postgres', qq{ +CREATE TABLE test_tab_partition_f (a int primary key); +ALTER TABLE test_tab_partition ADD CONSTRAINT test_tab_patition_fk FOREIGN KEY(a) REFERENCES test_tab_partition_f(a); +}); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab_partitioned SELECT i, md5(i::text) FROM generate_series(1, 5000) s(i)" +); + +$node_subscriber->wait_for_log( + qr/ERROR: cannot replicate relation. There is at least one non-immutable function/, + $offset); + +# Drop the foreign key constraint on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_patition_fk"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); +is($result, qq(5000), + 'data replicated to subscriber after dropping the foreign key'); + +$node_subscriber->stop; +$node_publisher->stop; + +done_testing(); -- 2.23.0.windows.1