From 98033c18f65d9b07c5b947b009de19baa5def701 Mon Sep 17 00:00:00 2001 From: "houzj.fnst" Date: Wed, 10 Aug 2022 20:37:22 +0800 Subject: [PATCH v22 3/5] Add some checks before using apply background worker to apply changes. streaming=parallel mode has two requirements: 1) The unique column in the relation on the subscriber-side should also be the unique column on the publisher-side; 2) There cannot be any non-immutable functions used by the subscriber-side replicated table. Look for functions in the following places: * a. Trigger functions * b. Column default value expressions and domain constraints * c. Constraint expressions * d. Foreign keys Without these restrictions, the following scenario may occur: The apply background worker locks a row when processing a streaming transaction, after that the main apply worker tries to lock the same row when processing another non-streamed transaction. At this time, the main apply worker waits for the streaming transaction to complete and the lock to be released, so it won't send subsequent data of the streaming transaction to the apply background worker; the apply background worker waits to receive the rest of the streaming transaction and can't finish this transaction. Now a deadlock has occurred, so both workers will wait indefinitely. 
--- doc/src/sgml/ref/create_subscription.sgml | 5 + .../replication/logical/applybgworker.c | 46 ++ src/backend/replication/logical/proto.c | 86 ++- src/backend/replication/logical/relation.c | 214 +++++++ src/backend/replication/logical/tablesync.c | 1 + src/backend/replication/logical/worker.c | 23 +- src/backend/utils/cache/typcache.c | 17 + src/include/replication/logicalproto.h | 1 + src/include/replication/logicalrelation.h | 13 + src/include/replication/worker_internal.h | 1 + src/include/utils/typcache.h | 2 + src/test/subscription/t/015_stream.pl | 33 +- src/test/subscription/t/016_stream_subxact.pl | 33 +- .../subscription/t/022_twophase_cascade.pl | 8 + .../subscription/t/023_twophase_stream.pl | 33 +- .../subscription/t/032_streaming_apply.pl | 598 ++++++++++++++++++ src/tools/pgindent/typedefs.list | 1 + 17 files changed, 1037 insertions(+), 78 deletions(-) create mode 100644 src/test/subscription/t/032_streaming_apply.pl diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index ece27c5a34..56bcbf43e1 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -240,6 +240,11 @@ CREATE SUBSCRIPTION subscription_nameparallel + mode: 1) the unique column in the table on the subscriber-side must + also be the unique column on the publisher-side; 2) there cannot be + any non-immutable functions used by the subscriber-side replicated + table. diff --git a/src/backend/replication/logical/applybgworker.c b/src/backend/replication/logical/applybgworker.c index d5b61ccd52..371408d485 100644 --- a/src/backend/replication/logical/applybgworker.c +++ b/src/backend/replication/logical/applybgworker.c @@ -762,3 +762,49 @@ apply_bgworker_savepoint_name(Oid suboid, TransactionId xid, { snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid); } + +/* + * Check if changes on this relation can be applied using an apply background + * worker. 
+ * + * Although the commit order is maintained by only allowing one process to + * commit at a time, the access order to the relation has changed. This could + * cause unexpected problems if the unique column on the replicated table is + * inconsistent with the publisher-side or contains non-immutable functions + * when applying transactions using an apply background worker. + */ +void +apply_bgworker_relation_check(LogicalRepRelMapEntry *rel) +{ + /* + * Skip check if not using apply background workers. + * + * If any worker is handling the streaming transaction, this check needs to + * be performed not only using the apply background worker, but also in the + * main apply worker. This is because without these restrictions, main + * apply worker may block apply background worker, which will cause + * infinite waits. + */ + if (!am_apply_bgworker() && + (list_length(ApplyBgworkersFreeList) == list_length(ApplyBgworkersList))) + return; + + /* + * Partition table checks are done later in function + * apply_handle_tuple_routing. + */ + if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + return; + + if (rel->parallel_apply != PARALLEL_APPLY_SAFE) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot replicate target relation \"%s.%s\" using " + "subscription parameter streaming=parallel", + rel->remoterel.nspname, rel->remoterel.relname), + errdetail("The unique column on subscriber is not the unique " + "column on publisher or there is at least one " + "non-immutable function."), + errhint("Please change to use subscription parameter " + "streaming=on."))); +} diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 47bd811fb7..73b2807939 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -23,7 +23,8 @@ /* * Protocol message flags. 
*/ -#define LOGICALREP_IS_REPLICA_IDENTITY 1 +#define ATTR_IS_REPLICA_IDENTITY (1 << 0) +#define ATTR_IS_UNIQUE (1 << 1) #define MESSAGE_TRANSACTIONAL (1<<0) #define TRUNCATE_CASCADE (1<<0) @@ -40,6 +41,66 @@ static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); static void logicalrep_write_namespace(StringInfo out, Oid nspid); static const char *logicalrep_read_namespace(StringInfo in); +/* + * RelationGetUniqueKeyBitmap -- get a bitmap of unique attribute numbers + * + * This is similar to RelationGetIdentityKeyBitmap(), but returns a bitmap of + * index attribute numbers for all unique indexes. + */ +static Bitmapset * +RelationGetUniqueKeyBitmap(Relation rel) +{ + List *indexoidlist = NIL; + ListCell *indexoidscan; + Bitmapset *attunique = NULL; + + if (!rel->rd_rel->relhasindex) + return NULL; + + indexoidlist = RelationGetIndexList(rel); + + foreach(indexoidscan, indexoidlist) + { + Oid indexoid = lfirst_oid(indexoidscan); + Relation indexRel; + int i; + + /* Look up the description for index */ + indexRel = RelationIdGetRelation(indexoid); + + if (!RelationIsValid(indexRel)) + elog(ERROR, "could not open relation with OID %u", indexoid); + + if (!indexRel->rd_index->indisunique) + { + RelationClose(indexRel); + continue; + } + + /* Add referenced attributes to attunique */ + for (i = 0; i < indexRel->rd_index->indnatts; i++) + { + int attrnum = indexRel->rd_index->indkey.values[i]; + + /* + * We don't include non-key columns into attunique + * bitmaps. See RelationGetIndexAttrBitmap. + */ + if (attrnum != 0) + { + if (i < indexRel->rd_index->indnkeyatts && + !bms_is_member(attrnum - FirstLowInvalidHeapAttributeNumber, attunique)) + attunique = bms_add_member(attunique, + attrnum - FirstLowInvalidHeapAttributeNumber); + } + } + RelationClose(indexRel); + } + list_free(indexoidlist); + + return attunique; +} + /* * Check if a column is covered by a column list. 
* @@ -933,7 +994,8 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) TupleDesc desc; int i; uint16 nliveatts = 0; - Bitmapset *idattrs = NULL; + Bitmapset *idattrs = NULL, + *attunique = NULL; bool replidentfull; desc = RelationGetDescr(rel); @@ -958,6 +1020,9 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) if (!replidentfull) idattrs = RelationGetIdentityKeyBitmap(rel); + /* fetch bitmap of UNIQUE attributes */ + attunique = RelationGetUniqueKeyBitmap(rel); + /* send the attributes */ for (i = 0; i < desc->natts; i++) { @@ -974,7 +1039,11 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) if (replidentfull || bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, idattrs)) - flags |= LOGICALREP_IS_REPLICA_IDENTITY; + flags |= ATTR_IS_REPLICA_IDENTITY; + + if (bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + attunique)) + flags |= ATTR_IS_UNIQUE; pq_sendbyte(out, flags); @@ -989,6 +1058,7 @@ logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *columns) } bms_free(idattrs); + bms_free(attunique); } /* @@ -1001,7 +1071,8 @@ logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel) int natts; char **attnames; Oid *atttyps; - Bitmapset *attkeys = NULL; + Bitmapset *attkeys = NULL, + *attunique = NULL; natts = pq_getmsgint(in, 2); attnames = palloc(natts * sizeof(char *)); @@ -1014,9 +1085,13 @@ logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel) /* Check for replica identity column */ flags = pq_getmsgbyte(in); - if (flags & LOGICALREP_IS_REPLICA_IDENTITY) + if (flags & ATTR_IS_REPLICA_IDENTITY) attkeys = bms_add_member(attkeys, i); + /* Check for unique column */ + if (flags & ATTR_IS_UNIQUE) + attunique = bms_add_member(attunique, i); + /* attribute name */ attnames[i] = pstrdup(pq_getmsgstring(in)); @@ -1030,6 +1105,7 @@ logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel) rel->attnames = attnames; rel->atttyps = atttyps; 
rel->attkeys = attkeys; + rel->attunique = attunique; rel->natts = natts; } diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c index e989047681..21592077bb 100644 --- a/src/backend/replication/logical/relation.c +++ b/src/backend/replication/logical/relation.c @@ -19,12 +19,19 @@ #include "access/table.h" #include "catalog/namespace.h" +#include "catalog/pg_proc.h" #include "catalog/pg_subscription_rel.h" +#include "commands/trigger.h" #include "executor/executor.h" #include "nodes/makefuncs.h" +#include "optimizer/optimizer.h" #include "replication/logicalrelation.h" #include "replication/worker_internal.h" +#include "rewrite/rewriteHandler.h" #include "utils/inval.h" +#include "utils/lsyscache.h" +#include "utils/syscache.h" +#include "utils/typcache.h" static MemoryContext LogicalRepRelMapContext = NULL; @@ -91,6 +98,26 @@ logicalrep_relmap_invalidate_cb(Datum arg, Oid reloid) } } +/* + * Relcache invalidation callback to reset parallel_apply flag. + */ +static void +logicalrep_relmap_reset_parallel_cb(Datum arg, int cacheid, uint32 hashvalue) +{ + HASH_SEQ_STATUS hash_seq; + LogicalRepRelMapEntry *entry; + + if (LogicalRepRelMap == NULL) + return; + + hash_seq_init(&hash_seq, LogicalRepRelMap); + while ((entry = hash_seq_search(&hash_seq)) != NULL) + { + entry->parallel_apply = PARALLEL_APPLY_UNKNOWN; + entry->localrelvalid = false; + } +} + /* * Initialize the relation map cache. */ @@ -116,6 +143,9 @@ logicalrep_relmap_init(void) /* Watch for invalidation events. 
*/ CacheRegisterRelcacheCallback(logicalrep_relmap_invalidate_cb, (Datum) 0); + CacheRegisterSyscacheCallback(PROCOID, + logicalrep_relmap_reset_parallel_cb, + (Datum) 0); } /* @@ -142,6 +172,7 @@ logicalrep_relmap_free_entry(LogicalRepRelMapEntry *entry) pfree(remoterel->atttyps); } bms_free(remoterel->attkeys); + bms_free(remoterel->attunique); if (entry->attrmap) free_attrmap(entry->attrmap); @@ -190,6 +221,7 @@ logicalrep_relmap_update(LogicalRepRelation *remoterel) } entry->remoterel.replident = remoterel->replident; entry->remoterel.attkeys = bms_copy(remoterel->attkeys); + entry->remoterel.attunique = bms_copy(remoterel->attunique); MemoryContextSwitchTo(oldctx); } @@ -310,6 +342,181 @@ logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry) } } +/* + * Check if changes on one relation can be applied using an apply background + * worker and assign the 'parallel_apply' flag. + * + * There are two requirements for applying changes using an apply background + * worker: 1) the unique column in the table on the subscriber-side should also + * be the unique column on the publisher-side; 2) there cannot be any + * non-immutable functions used by the subscriber-side replicated table. + * + * Without these restrictions, the following scenario may occur: The apply + * background worker locks a row when processing a streaming transaction, after + * that the main apply worker tries to lock the same row when processing + * another non-streamed transaction. At this time, the main apply worker waits + * for the streaming transaction to complete and the lock to be released, it + * won't send subsequent data of the streaming transaction to the apply + * background worker; the apply background worker waits to receive the rest of + * streaming transaction and can't finish this transaction. Now a deadlock has + * occurred, so both workers will wait indefinitely. 
+ * + * We just mark the relation entry as 'PARALLEL_APPLY_UNSAFE' here if changes + * on one relation can not be applied using an apply background worker and + * leave it to apply_bgworker_relation_check() to throw the actual error if + * needed. + */ +static void +logicalrep_rel_mark_parallel_apply(LogicalRepRelMapEntry *entry) +{ + Bitmapset *ukey; + int i; + TupleDesc tupdesc; + int attnum; + List *fkeys = NIL; + + /* Fast path if 'parallel_apply' flag is already known. */ + if (entry->parallel_apply != PARALLEL_APPLY_UNKNOWN) + return; + + /* Initialize the flag. */ + entry->parallel_apply = PARALLEL_APPLY_SAFE; + + /* + * First, check if the unique column in the relation on the subscriber-side + * is also the unique column on the publisher-side. + */ + ukey = RelationGetIndexAttrBitmap(entry->localrel, + INDEX_ATTR_BITMAP_KEY); + + if (ukey) + { + i = -1; + while ((i = bms_next_member(ukey, i)) >= 0) + { + attnum = AttrNumberGetAttrOffset(i + FirstLowInvalidHeapAttributeNumber); + + if (entry->attrmap->attnums[attnum] < 0 || + !bms_is_member(entry->attrmap->attnums[attnum], entry->remoterel.attunique)) + { + entry->parallel_apply = PARALLEL_APPLY_UNSAFE; + return; + } + } + + bms_free(ukey); + } + + /* + * Then, check if there is any non-immutable function used by the + * subscriber-side relation. Look for functions in the following places: + * a. trigger functions; + * b. Column default value expressions and domain constraints; + * c. Constraint expressions; + * d. Foreign keys. + */ + /* Check the trigger functions. */ + if (entry->localrel->trigdesc != NULL) + { + for (i = 0; i < entry->localrel->trigdesc->numtriggers; i++) + { + Trigger *trig = entry->localrel->trigdesc->triggers + i; + + if (trig->tgenabled != TRIGGER_FIRES_ALWAYS && + trig->tgenabled != TRIGGER_FIRES_ON_REPLICA) + continue; + + if (func_volatile(trig->tgfoid) != PROVOLATILE_IMMUTABLE) + { + entry->parallel_apply = PARALLEL_APPLY_UNSAFE; + return; + } + } + } + + /* Check the columns. 
*/ + tupdesc = RelationGetDescr(entry->localrel); + for (attnum = 0; attnum < tupdesc->natts; attnum++) + { + Form_pg_attribute att = TupleDescAttr(tupdesc, attnum); + + /* We don't need info for dropped or generated attributes */ + if (att->attisdropped || att->attgenerated) + continue; + + if (att->atthasdef) + { + Node *defaultexpr; + + defaultexpr = build_column_default(entry->localrel, attnum + 1); + if (contain_mutable_functions(defaultexpr)) + { + entry->parallel_apply = PARALLEL_APPLY_UNSAFE; + return; + } + } + + /* + * If the column is of a DOMAIN type, determine whether + * that domain has any CHECK expressions that are not + * immutable. + */ + if (get_typtype(att->atttypid) == TYPTYPE_DOMAIN) + { + List *domain_constraints; + ListCell *lc; + + domain_constraints = GetDomainConstraints(att->atttypid); + + foreach(lc, domain_constraints) + { + DomainConstraintState *con = (DomainConstraintState *) lfirst(lc); + + if (con->check_expr && contain_mutable_functions((Node *) con->check_expr)) + { + entry->parallel_apply = PARALLEL_APPLY_UNSAFE; + return; + } + } + } + } + + /* Check the constraints. */ + if (tupdesc->constr) + { + ConstrCheck *check = tupdesc->constr->check; + + /* + * Determine if there are any CHECK constraints which + * contains non-immutable function. + */ + for (i = 0; i < tupdesc->constr->num_check; i++) + { + Expr *check_expr = stringToNode(check[i].ccbin); + + if (contain_mutable_functions((Node *) check_expr)) + { + entry->parallel_apply = PARALLEL_APPLY_UNSAFE; + return; + } + } + } + + /* + * Check the foreign keys. + * + * XXX It is better to only check the foreign key when it is detected that + * the publisher does not have the foreign key and the subscriber does. But + * this requires the publisher to send more information. In addition, since + * foreign key does not take effect in the subscriber's apply worker by + * default, it seems that foreign key does not hit this ERROR frequently. 
+ * So, only a simple check based on the subscriber schema is kept here. + */ + fkeys = RelationGetFKeyList(entry->localrel); + if (fkeys) + entry->parallel_apply = PARALLEL_APPLY_UNSAFE; +} + /* * Open the local relation associated with the remote one. * @@ -438,6 +645,9 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode) */ logicalrep_rel_mark_updatable(entry); + /* Set if changes could be applied using an apply background worker. */ + logicalrep_rel_mark_parallel_apply(entry); + entry->localrelvalid = true; } @@ -653,6 +863,7 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, } entry->remoterel.replident = remoterel->replident; entry->remoterel.attkeys = bms_copy(remoterel->attkeys); + entry->remoterel.attunique = bms_copy(remoterel->attunique); } entry->localrel = partrel; @@ -696,6 +907,9 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root, /* Set if the table's replica identity is enough to apply update/delete. */ logicalrep_rel_mark_updatable(entry); + /* Set if changes could be applied using an apply background worker. */ + logicalrep_rel_mark_parallel_apply(entry); + entry->localrelvalid = true; /* state and statelsn are left set to 0. */ diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 299bdc6324..0e1d0c0c30 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -884,6 +884,7 @@ fetch_remote_table_info(char *nspname, char *relname, lrel->attnames = palloc0(MaxTupleAttributeNumber * sizeof(char *)); lrel->atttyps = palloc0(MaxTupleAttributeNumber * sizeof(Oid)); lrel->attkeys = NULL; + lrel->attunique = NULL; /* * Store the columns as a list of names. 
Ignore those that are not diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 10ca2c55ad..c52669e96c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1415,6 +1415,14 @@ apply_handle_stream_stop(StringInfo s) { char action = LOGICAL_REP_MSG_STREAM_STOP; + /* + * Unlike stream_commit, we don't need to wait here for stream_stop to + * finish. Allowing the other transaction to be applied before + * stream_stop is finished can lead to failures if the unique + * index/constraint is different between publisher and subscriber. But + * for such cases, we don't allow streamed transactions to be applied + * in parallel. See apply_bgworker_relation_check. + */ apply_bgworker_send_data(stream_apply_worker, 1, &action); elog(DEBUG1, "stopped streaming of xid %u, %u changes streamed", stream_xid, nchanges); @@ -2069,6 +2077,8 @@ apply_handle_insert(StringInfo s) /* Set relation for error callback */ apply_error_callback_arg.rel = rel; + apply_bgworker_relation_check(rel); + /* Initialize the executor state. */ edata = create_edata_for_relation(rel); estate = edata->estate; @@ -2212,6 +2222,8 @@ apply_handle_update(StringInfo s) /* Check if we can do the update. */ check_relation_updatable(rel); + apply_bgworker_relation_check(rel); + /* Initialize the executor state. */ edata = create_edata_for_relation(rel); estate = edata->estate; @@ -2380,6 +2392,8 @@ apply_handle_delete(StringInfo s) /* Check if we can do the delete. */ check_relation_updatable(rel); + apply_bgworker_relation_check(rel); + /* Initialize the executor state. */ edata = create_edata_for_relation(rel); estate = edata->estate; @@ -2565,13 +2579,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata, } MemoryContextSwitchTo(oldctx); + part_entry = logicalrep_partition_open(relmapentry, partrel, + attrmap); + /* Check if we can do the update or delete on the leaf partition. 
*/ if (operation == CMD_UPDATE || operation == CMD_DELETE) - { - part_entry = logicalrep_partition_open(relmapentry, partrel, - attrmap); check_relation_updatable(part_entry); - } + + apply_bgworker_relation_check(part_entry); switch (operation) { diff --git a/src/backend/utils/cache/typcache.c b/src/backend/utils/cache/typcache.c index 808f9ebd0d..b248899d82 100644 --- a/src/backend/utils/cache/typcache.c +++ b/src/backend/utils/cache/typcache.c @@ -2540,6 +2540,23 @@ compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2) return 0; } +/* + * GetDomainConstraints --- get DomainConstraintState list of specified domain type + */ +List * +GetDomainConstraints(Oid type_id) +{ + TypeCacheEntry *typentry; + List *constraints = NIL; + + typentry = lookup_type_cache(type_id, TYPECACHE_DOMAIN_CONSTR_INFO); + + if(typentry->domainData != NULL) + constraints = typentry->domainData->constraints; + + return constraints; +} + /* * Load (or re-load) the enumData member of the typcache entry. */ diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index eb0fd24fd8..4395c11f75 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -113,6 +113,7 @@ typedef struct LogicalRepRelation char replident; /* replica identity */ char relkind; /* remote relation kind */ Bitmapset *attkeys; /* Bitmap of key columns */ + Bitmapset *attunique; /* Bitmap of unique columns */ } LogicalRepRelation; /* Type mapping info */ diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h index 78cd7e77f5..cd6c19b5c9 100644 --- a/src/include/replication/logicalrelation.h +++ b/src/include/replication/logicalrelation.h @@ -15,6 +15,17 @@ #include "access/attmap.h" #include "replication/logicalproto.h" +/* + * States to determine if changes on one relation can be applied using an + * apply background worker. 
+ */ +typedef enum ParallelApplySafety +{ + PARALLEL_APPLY_UNKNOWN = 0, + PARALLEL_APPLY_SAFE, + PARALLEL_APPLY_UNSAFE +} ParallelApplySafety; + typedef struct LogicalRepRelMapEntry { LogicalRepRelation remoterel; /* key is remoterel.remoteid */ @@ -31,6 +42,8 @@ typedef struct LogicalRepRelMapEntry Relation localrel; /* relcache entry (NULL when closed) */ AttrMap *attrmap; /* map of local attributes to remote ones */ bool updatable; /* Can apply updates/deletes? */ + ParallelApplySafety parallel_apply; /* Can apply changes in an apply + background worker? */ /* Sync state. */ char state; diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index 8f03ad4a65..ba869015d9 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -203,6 +203,7 @@ extern void apply_bgworker_set_status(ApplyBgworkerStatus status); extern void apply_bgworker_subxact_info_add(TransactionId current_xid); extern void apply_bgworker_savepoint_name(Oid suboid, Oid relid, char *spname, int szsp); +extern void apply_bgworker_relation_check(LogicalRepRelMapEntry *rel); static inline bool am_tablesync_worker(void) diff --git a/src/include/utils/typcache.h b/src/include/utils/typcache.h index 431ad7f1b3..ed7c2e7f48 100644 --- a/src/include/utils/typcache.h +++ b/src/include/utils/typcache.h @@ -199,6 +199,8 @@ extern uint64 assign_record_type_identifier(Oid type_id, int32 typmod); extern int compare_values_of_enum(TypeCacheEntry *tcache, Oid arg1, Oid arg2); +extern List *GetDomainConstraints(Oid type_id); + extern size_t SharedRecordTypmodRegistryEstimate(void); extern void SharedRecordTypmodRegistryInit(SharedRecordTypmodRegistry *, diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl index 9f4946d7d6..7a6cee606e 100644 --- a/src/test/subscription/t/015_stream.pl +++ b/src/test/subscription/t/015_stream.pl @@ -13,7 +13,7 @@ use Test::More; # streaming=parallel 
cases. sub test_streaming { - my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + my ($node_publisher, $node_subscriber, $appname) = @_; # Interleave a pair of transactions, each exceeding the 64kB limit. my $in = ''; @@ -24,16 +24,6 @@ sub test_streaming my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer, on_error_stop => 0); - # Check that a background worker starts if "streaming" parameter is - # specified as "parallel". We have to look for the DEBUG1 log messages - # about that, so temporarily bump up the log verbosity. - if ($is_parallel) - { - $node_subscriber->append_conf('postgresql.conf', - "log_min_messages = debug1"); - $node_subscriber->reload; - } - $in .= q{ BEGIN; INSERT INTO test_tab SELECT i, md5(i::text) FROM generate_series(3, 5000) s(i); @@ -42,14 +32,6 @@ sub test_streaming }; $h->pump_nb; - if ($is_parallel) - { - $node_subscriber->wait_for_log(qr/\[Apply BGW #\d+\] started/, 0); - $node_subscriber->append_conf('postgresql.conf', - "log_min_messages = warning"); - $node_subscriber->reload; - } - $node_publisher->safe_psql( 'postgres', q{ BEGIN; @@ -163,7 +145,7 @@ my $result = "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(2|2|2), 'check initial data was copied to subscriber'); -test_streaming($node_publisher, $node_subscriber, $appname, 0); +test_streaming($node_publisher, $node_subscriber, $appname); ###################################### # Test using streaming mode 'parallel' @@ -172,8 +154,13 @@ my $oldpid = $node_publisher->safe_psql('postgres', "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" ); -$node_subscriber->safe_psql('postgres', - "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel, binary = off)"); +# "streaming = parallel" does not support non-immutable functions, so change +# the function in the default expression of column "c". 
+$node_subscriber->safe_psql( + 'postgres', qq{ +ALTER TABLE test_tab ALTER COLUMN c SET DEFAULT to_timestamp(0); +ALTER SUBSCRIPTION tap_sub SET(streaming = parallel, binary = off); +}); $node_publisher->poll_query_until('postgres', "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" @@ -181,7 +168,7 @@ $node_publisher->poll_query_until('postgres', or die "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; -test_streaming($node_publisher, $node_subscriber, $appname, 1); +test_streaming($node_publisher, $node_subscriber, $appname); $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/016_stream_subxact.pl b/src/test/subscription/t/016_stream_subxact.pl index 104a169197..85fc57f00b 100644 --- a/src/test/subscription/t/016_stream_subxact.pl +++ b/src/test/subscription/t/016_stream_subxact.pl @@ -13,17 +13,7 @@ use Test::More; # streaming=parallel cases. sub test_streaming { - my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; - - # Check that a background worker starts if "streaming" parameter is - # specified as "parallel". We have to look for the DEBUG1 log messages - # about that, so temporarily bump up the log verbosity. - if ($is_parallel) - { - $node_subscriber->append_conf('postgresql.conf', - "log_min_messages = debug1"); - $node_subscriber->reload; - } + my ($node_publisher, $node_subscriber, $appname) = @_; # Insert, update and delete enough rows to exceed 64kB limit. 
$node_publisher->safe_psql( @@ -51,14 +41,6 @@ sub test_streaming COMMIT; }); - if ($is_parallel) - { - $node_subscriber->wait_for_log(qr/\[Apply BGW #\d+\] started/, 0); - $node_subscriber->append_conf('postgresql.conf', - "log_min_messages = warning"); - $node_subscriber->reload; - } - $node_publisher->wait_for_catchup($appname); my $result = @@ -119,7 +101,7 @@ my $result = "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(2|2|2), 'check initial data was copied to subscriber'); -test_streaming($node_publisher, $node_subscriber, $appname, 0); +test_streaming($node_publisher, $node_subscriber, $appname); ###################################### # Test using streaming mode 'parallel' @@ -128,8 +110,13 @@ my $oldpid = $node_publisher->safe_psql('postgres', "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" ); -$node_subscriber->safe_psql('postgres', - "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)"); +# "streaming = parallel" does not support non-immutable functions, so change +# the function in the default expression of column "c". 
+$node_subscriber->safe_psql( + 'postgres', qq{ +ALTER TABLE test_tab ALTER COLUMN c SET DEFAULT to_timestamp(0); +ALTER SUBSCRIPTION tap_sub SET(streaming = parallel); +}); $node_publisher->poll_query_until('postgres', "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" @@ -137,7 +124,7 @@ $node_publisher->poll_query_until('postgres', or die "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; -test_streaming($node_publisher, $node_subscriber, $appname, 1); +test_streaming($node_publisher, $node_subscriber, $appname); $node_subscriber->stop; $node_publisher->stop; diff --git a/src/test/subscription/t/022_twophase_cascade.pl b/src/test/subscription/t/022_twophase_cascade.pl index 0a4152d3be..3df1621e1c 100644 --- a/src/test/subscription/t/022_twophase_cascade.pl +++ b/src/test/subscription/t/022_twophase_cascade.pl @@ -470,6 +470,14 @@ test_streaming($node_A, $node_B, $node_C, $appname_B, $appname_C, 'on'); ###################################### # Test using streaming mode 'parallel' ###################################### + +# "streaming = parallel" does not support non-immutable functions, so change +# the function in the default expression of column "c". +$node_B->safe_psql( + 'postgres', "ALTER TABLE test_tab ALTER COLUMN c SET DEFAULT to_timestamp(0);"); +$node_C->safe_psql( + 'postgres', "ALTER TABLE test_tab ALTER COLUMN c SET DEFAULT to_timestamp(0);"); + test_streaming($node_A, $node_B, $node_C, $appname_B, $appname_C, 'parallel'); ############################### diff --git a/src/test/subscription/t/023_twophase_stream.pl b/src/test/subscription/t/023_twophase_stream.pl index ea1f4761c3..c49cb10611 100644 --- a/src/test/subscription/t/023_twophase_stream.pl +++ b/src/test/subscription/t/023_twophase_stream.pl @@ -13,7 +13,7 @@ use Test::More; # streaming=parallel cases. 
sub test_streaming { - my ($node_publisher, $node_subscriber, $appname, $is_parallel) = @_; + my ($node_publisher, $node_subscriber, $appname) = @_; ############################### # Test 2PC PREPARE / COMMIT PREPARED. @@ -23,16 +23,6 @@ sub test_streaming # Expect all data is replicated on subscriber side after the commit. ############################### - # Check that a background worker starts if "streaming" parameter is - # specified as "parallel". We have to look for the DEBUG1 log messages - # about that, so temporarily bump up the log verbosity. - if ($is_parallel) - { - $node_subscriber->append_conf('postgresql.conf', - "log_min_messages = debug1"); - $node_subscriber->reload; - } - # check that 2PC gets replicated to subscriber # Insert, update and delete enough rows to exceed the 64kB limit. $node_publisher->safe_psql( @@ -43,14 +33,6 @@ sub test_streaming DELETE FROM test_tab WHERE mod(a,3) = 0; PREPARE TRANSACTION 'test_prepared_tab';}); - if ($is_parallel) - { - $node_subscriber->wait_for_log(qr/\[Apply BGW #\d+\] started/, 0); - $node_subscriber->append_conf('postgresql.conf', - "log_min_messages = warning"); - $node_subscriber->reload; - } - $node_publisher->wait_for_catchup($appname); # check that transaction is in prepared state on subscriber @@ -333,7 +315,7 @@ my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*), count(c), count(d = 999) FROM test_tab"); is($result, qq(2|2|2), 'check initial data was copied to subscriber'); -test_streaming($node_publisher, $node_subscriber, $appname, 0); +test_streaming($node_publisher, $node_subscriber, $appname); ###################################### # Test using streaming mode 'parallel' @@ -342,8 +324,13 @@ my $oldpid = $node_publisher->safe_psql('postgres', "SELECT pid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" ); -$node_subscriber->safe_psql('postgres', - "ALTER SUBSCRIPTION tap_sub SET(streaming = parallel)"); +# "streaming = parallel" does not 
support non-immutable functions, so change +# the function in the default expression of column "c". +$node_subscriber->safe_psql( + 'postgres', qq{ +ALTER TABLE test_tab ALTER COLUMN c SET DEFAULT to_timestamp(0); +ALTER SUBSCRIPTION tap_sub SET(streaming = parallel); +}); $node_publisher->poll_query_until('postgres', "SELECT pid != $oldpid FROM pg_stat_replication WHERE application_name = '$appname' AND state = 'streaming';" @@ -351,7 +338,7 @@ $node_publisher->poll_query_until('postgres', or die "Timed out while waiting for apply to restart after changing SUBSCRIPTION"; -test_streaming($node_publisher, $node_subscriber, $appname, 1); +test_streaming($node_publisher, $node_subscriber, $appname); ############################### # check all the cleanup diff --git a/src/test/subscription/t/032_streaming_apply.pl b/src/test/subscription/t/032_streaming_apply.pl new file mode 100644 index 0000000000..abb8219303 --- /dev/null +++ b/src/test/subscription/t/032_streaming_apply.pl @@ -0,0 +1,598 @@ +# Copyright (c) 2022, PostgreSQL Global Development Group + +# Test the restrictions of streaming mode "parallel" in logical replication. +# Without these restrictions, the subscriber's apply worker may fall into an +# infinite wait without the user knowing. +# +# For normal tables, we use deadlock-producing test cases to ensure that future +# modifications do not invalidate constraint checks. +# +# For partitioned tables, we just use test cases to confirm that the constraint +# checks are as expected. 
+ +use strict; +use warnings; +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; +use Test::More; + +my $offset = 0; + +# Create publisher node +my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); +$node_publisher->init(allows_streaming => 'logical'); +$node_publisher->append_conf('postgresql.conf', + 'logical_decoding_work_mem = 64kB'); +$node_publisher->start; + +# Create subscriber node +my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); +$node_subscriber->init; +$node_subscriber->start; + +# Setup structure on publisher +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab1 (a int)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab2 (a int)"); +$node_publisher->safe_psql('postgres', + "CREATE TABLE test_tab_partitioned (a int primary key, b varchar)"); + +# Setup structure on subscriber +# We need to test normal table and partition table. +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab1 (a int)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab2 (a int)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab_partitioned (a int primary key, b varchar) PARTITION BY RANGE(a)"); +$node_subscriber->safe_psql('postgres', + "CREATE TABLE test_tab_partition (LIKE test_tab_partitioned)"); +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partitioned ATTACH PARTITION test_tab_partition DEFAULT" +); + +# Setup logical replication +my $publisher_connstr = $node_publisher->connstr . 
' dbname=postgres'; +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_normal FOR TABLE test_tab1, test_tab2"); +$node_publisher->safe_psql('postgres', + "CREATE PUBLICATION tap_pub_partitioned FOR TABLE test_tab_partitioned"); + +my $appname = 'tap_sub'; +$node_subscriber->safe_psql( + 'postgres', " + CREATE SUBSCRIPTION tap_sub + CONNECTION '$publisher_connstr application_name=$appname' + PUBLICATION tap_pub_normal, tap_pub_partitioned + WITH (streaming = parallel, copy_data = false)"); + +$node_publisher->wait_for_catchup($appname); + +# Interleave a pair of transactions, each exceeding the 64kB limit. +my $in = ''; +my $out = ''; + +my $timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default); + +my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer, + on_error_stop => 0); + +# ============================================================================ +# It is not allowed that the unique column in the relation on the +# subscriber-side is not the unique column on the publisher-side. Check the +# error reported by background worker in this case. +# ============================================================================ + +# First we check the unique index on normal table. +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX idx_tab1 on test_tab1(a)"); + +$in .= q{ +BEGIN; +INSERT INTO test_tab1 SELECT i FROM generate_series(1, 5000) s(i); +}; +$h->pump_nb; + +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab1 values(1)"); + +$in .= q{ +COMMIT; +\q +}; +$h->finish; + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming=parallel/, + $offset); + +# Drop the unique index on the subscriber, now it works. 
+$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab1"); + +$node_publisher->wait_for_catchup($appname); + +my $result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); +is($result, qq(5001), 'data replicated to subscriber after dropping index'); + +# Clean up test data from the environment. +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab1"); +$node_publisher->wait_for_catchup($appname); + +# Then we check the unique index on partition table. +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX test_tab_b_partition_idx ON test_tab_partition (b)"); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab_partitioned SELECT i, md5(i::text) FROM generate_series(1, 5000) s(i)" +); + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming=parallel/, + $offset); + +# Drop the unique index on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', "DROP INDEX test_tab_b_partition_idx"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); +is($result, qq(5000), 'data replicated to subscriber after dropping index'); + +# ============================================================================ +# Triggers which execute non-immutable function are not allowed on the +# subscriber side. Check the error reported by background worker in this case. +# ============================================================================ + +# First we check the trigger function on normal table. 
+$node_publisher->safe_psql('postgres', + "CREATE UNIQUE INDEX idx_tab2 on test_tab2(a)"); +$node_subscriber->safe_psql('postgres', + "CREATE UNIQUE INDEX idx_tab2 on test_tab2(a)"); + +$node_subscriber->safe_psql( + 'postgres', qq{ +CREATE FUNCTION trigger_func_tab1_unsafe() RETURNS TRIGGER AS \$\$ + BEGIN + INSERT INTO public.test_tab2 VALUES (NEW.*); + RETURN NEW; + END +\$\$ language plpgsql; +CREATE TRIGGER tri_tab1_unsafe +BEFORE INSERT ON public.test_tab1 +FOR EACH ROW EXECUTE PROCEDURE trigger_func_tab1_unsafe(); +ALTER TABLE test_tab1 ENABLE REPLICA TRIGGER tri_tab1_unsafe; + +CREATE FUNCTION trigger_func_tab1_safe() RETURNS TRIGGER AS \$\$ + BEGIN + RAISE NOTICE 'test for safe trigger function'; + RETURN NEW; + END +\$\$ language plpgsql; +ALTER FUNCTION trigger_func_tab1_safe IMMUTABLE; +CREATE TRIGGER tri_tab1_safe +BEFORE INSERT ON public.test_tab1 +FOR EACH ROW EXECUTE PROCEDURE trigger_func_tab1_safe(); +}); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$in .= q{ +BEGIN; +INSERT INTO test_tab1 VALUES(5001); +INSERT INTO test_tab2 SELECT i FROM generate_series(1, 5000) s(i); +}; +$h->pump_nb; + +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab1 VALUES(5001)"); + +$in .= q{ +COMMIT; +\q +}; +$h->finish; + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming=parallel/, + $offset); + +# Using trigger with immutable function, now it works. +$node_subscriber->safe_psql( + 'postgres', qq{ +ALTER TABLE test_tab1 ENABLE REPLICA TRIGGER tri_tab1_safe; +DROP TRIGGER tri_tab1_unsafe ON public.test_tab1; +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); +is($result, qq(2), 'data replicated to subscriber after using immutable expression'); + +# Clean up test data from the environment. 
+$node_subscriber->safe_psql( + 'postgres', qq{ +DROP INDEX idx_tab2; +DROP TRIGGER tri_tab1_safe ON public.test_tab1; +DROP function trigger_func_tab1_unsafe; +DROP function trigger_func_tab1_safe; +}); +$node_publisher->safe_psql( + 'postgres', qq{ +DROP INDEX idx_tab2; +TRUNCATE TABLE test_tab1; +TRUNCATE TABLE test_tab2; +}); +$node_publisher->wait_for_catchup($appname); + +# Then we check the trigger function on partition table. +$node_subscriber->safe_psql( + 'postgres', qq{ +CREATE FUNCTION trigger_func() RETURNS TRIGGER AS \$\$ + BEGIN + RETURN NULL; + END +\$\$ language plpgsql; +CREATE TRIGGER insert_trig +BEFORE INSERT ON test_tab_partition +FOR EACH ROW EXECUTE PROCEDURE trigger_func(); +ALTER TABLE test_tab_partition ENABLE REPLICA TRIGGER insert_trig; +}); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', "DELETE FROM test_tab_partitioned"); + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming=parallel/, + $offset); + +# Drop the trigger on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', + "DROP TRIGGER insert_trig ON test_tab_partition"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); +is($result, qq(0), 'data replicated to subscriber after dropping trigger'); + +# ============================================================================ +# It is not allowed that column default value expression contains a +# non-immutable function on the subscriber side. Check the error reported by +# background worker in this case. +# ============================================================================ + +# First we check the column default value expression on normal table. 
+$node_publisher->safe_psql('postgres', "INSERT INTO test_tab2 VALUES(1)"); + +$node_subscriber->safe_psql( + 'postgres', qq{ +CREATE FUNCTION func_count_tab2() RETURNS INT AS \$\$ + BEGIN + RETURN (SELECT count(*) FROM public.test_tab2); + END +\$\$ language plpgsql; +ALTER TABLE test_tab1 ADD COLUMN b int DEFAULT func_count_tab2(); +}); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$in .= q{ +BEGIN; +TRUNCATE test_tab2; +INSERT INTO test_tab1 SELECT i FROM generate_series(1, 5000) s(i); +}; +$h->pump_nb; + +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab1(a) VALUES(1)"); + +$in .= q{ +COMMIT; +\q +}; +$h->finish; + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming=parallel/, + $offset); + +# Alter default values to immutable expression, now it works. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab1 ALTER COLUMN b SET DEFAULT 1"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); +is($result, qq(5001), + 'data replicated to subscriber after using immutable expression'); + +# Clean up test data from the environment. +$node_subscriber->safe_psql('postgres', "ALTER TABLE test_tab1 DROP COLUMN b"); +$node_publisher->safe_psql( + 'postgres', qq{ +TRUNCATE TABLE test_tab1; +TRUNCATE TABLE test_tab2; +}); +$node_publisher->wait_for_catchup($appname); + +# Then we check the column default value expression on partition table. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition ALTER COLUMN b SET DEFAULT random()"); + +# Check the subscriber log from now on. 
+$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab_partitioned SELECT i, md5(i::text) FROM generate_series(1, 5000) s(i)" +); + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming=parallel/, + $offset); + +# Drop default value on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition ALTER COLUMN b DROP DEFAULT"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); +is($result, qq(5000), + 'data replicated to subscriber after dropping default value expression'); + +# ============================================================================ +# It is not allowed that domain constraint expression contains a non-immutable +# function on the subscriber side. Check the error reported by background +# worker in this case. +# ============================================================================ + +# Because the column type of the partition table must be the same as its parent +# table, only test normal table here. +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab2 VALUES(1)"); + +$node_publisher->safe_psql( + 'postgres', qq{ +CREATE DOMAIN tmp_domain AS int CHECK (VALUE > -1); +ALTER TABLE test_tab1 ALTER COLUMN a TYPE tmp_domain; +}); + +$node_subscriber->safe_psql( + 'postgres', qq{ +CREATE DOMAIN tmp_domain AS INT CONSTRAINT domain_check CHECK (VALUE >= func_count_tab2()); +ALTER TABLE test_tab1 ALTER COLUMN a TYPE tmp_domain; +}); + +# Check the subscriber log from now on. 
+$offset = -s $node_subscriber->logfile; + +$in .= q{ +BEGIN; +TRUNCATE test_tab2; +INSERT INTO test_tab1 SELECT i FROM generate_series(1, 5000) s(i); +}; +$h->pump_nb; + +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab1(a) VALUES(1)"); + +$in .= q{ +COMMIT; +\q +}; +$h->finish; + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming=parallel/, + $offset); + +# Drop domain constraint expression, now it works. +$node_subscriber->safe_psql('postgres', + "ALTER DOMAIN tmp_domain DROP CONSTRAINT domain_check"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); +is($result, qq(5001), + 'data replicated to subscriber after using immutable expression' +); + +# Clean up test data from the environment. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab1 ALTER COLUMN a TYPE int"); +$node_publisher->safe_psql( + 'postgres', qq{ +TRUNCATE TABLE test_tab1; +TRUNCATE TABLE test_tab2; +}); +$node_publisher->wait_for_catchup($appname); + +# ============================================================================ +# It is not allowed that constraint expression contains a non-immutable function +# on the subscriber side. Check the error reported by background worker in this +# case. +# ============================================================================ + +# First we check the constraint expression on normal table. +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab2 VALUES(1)"); + +$node_subscriber->safe_psql( + 'postgres', qq{ +ALTER TABLE test_tab1 ADD CONSTRAINT const_tab1_unsafe CHECK(a >= func_count_tab2()); +}); + +# Check the subscriber log from now on. 
+$offset = -s $node_subscriber->logfile; + +$in .= q{ +BEGIN; +TRUNCATE test_tab2; +INSERT INTO test_tab1 SELECT i FROM generate_series(1, 5000) s(i); +}; +$h->pump_nb; + +$node_publisher->safe_psql('postgres', "INSERT INTO test_tab1(a) VALUES(1)"); + +$in .= q{ +COMMIT; +\q +}; +$h->finish; + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming=parallel/, + $offset); + +# Alter constraint expression to immutable expression, now it works. +$node_subscriber->safe_psql( + 'postgres', qq{ +ALTER TABLE test_tab1 DROP CONSTRAINT const_tab1_unsafe; +ALTER TABLE test_tab1 ADD CONSTRAINT const_tab1_safe CHECK(a >= 0); +}); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); +is($result, qq(5001), + 'data replicated to subscriber after using immutable expression'); + +# Clean up test data from the environment. +$node_subscriber->safe_psql('postgres', "ALTER TABLE test_tab1 DROP CONSTRAINT const_tab1_safe"); +$node_publisher->safe_psql( + 'postgres', qq{ +TRUNCATE TABLE test_tab1; +TRUNCATE TABLE test_tab2; +}); +$node_publisher->wait_for_catchup($appname); + +# Then we check the constraint expression on partition table. +$node_subscriber->safe_psql( + 'postgres', qq{ +ALTER TABLE test_tab_partition ADD CONSTRAINT test_tab_con check (a > random()); +}); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', "DELETE FROM test_tab_partitioned"); + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming=parallel/, + $offset); + +# Drop constraint on the subscriber, now it works. 
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_con");
+
+$node_publisher->wait_for_catchup($appname);
+
+$result =
+	$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned");
+is($result, qq(0),
+	'data replicated to subscriber after dropping constraint expression');
+
+# ============================================================================
+# Foreign keys are not allowed on the subscriber side. Check the error
+# reported by background worker in this case.
+# ============================================================================
+
+# First we check the foreign key on normal table.
+$node_publisher->safe_psql(
+	'postgres', qq{
+CREATE TABLE tab_nopublic(a int);
+ALTER TABLE test_tab2 ADD PRIMARY KEY (a);
+ALTER TABLE test_tab2 REPLICA IDENTITY FULL;
+INSERT INTO test_tab2 VALUES(1);
+});
+$node_subscriber->safe_psql('postgres', "ALTER TABLE test_tab2 ADD PRIMARY KEY (a);");
+
+$node_subscriber->safe_psql(
+	'postgres', qq{
+ALTER TABLE test_tab1 ADD CONSTRAINT test_tab1fk FOREIGN KEY(a) REFERENCES test_tab2(a);
+SELECT 'ALTER TABLE test_tab1 ENABLE REPLICA TRIGGER "' || tgname || '"' FROM pg_trigger WHERE tgrelid = 'test_tab1'::regclass::oid \\gexec
+});
+
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+$in .= q{
+BEGIN;
+INSERT INTO test_tab1(a) VALUES(1);
+INSERT INTO tab_nopublic SELECT i FROM generate_series(1, 5000) s(i);
+};
+$h->pump_nb;
+
+$node_publisher->safe_psql('postgres', "DELETE FROM test_tab2");
+
+$in .= q{
+COMMIT;
+\q
+};
+$h->finish;
+
+$node_subscriber->wait_for_log(
+	qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming=parallel/,
+	$offset);
+
+# Drop the foreign key constraint on the subscriber, now it works.
+$node_subscriber->safe_psql('postgres', "ALTER TABLE test_tab1 DROP CONSTRAINT test_tab1fk"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); +is($result, qq(1), + 'data replicated to subscriber after dropping the foreign key'); + +# Clean up test data from the environment. +$node_publisher->safe_psql( + 'postgres', qq{ +TRUNCATE TABLE test_tab1; +TRUNCATE TABLE test_tab2; +}); +$node_publisher->wait_for_catchup($appname); + +# Then we check the foreign key on partition table. +$node_publisher->wait_for_catchup($appname); +$node_subscriber->safe_psql( + 'postgres', qq{ +CREATE TABLE test_tab_partition_f (a int primary key); +ALTER TABLE test_tab_partition ADD CONSTRAINT test_tab_patition_fk FOREIGN KEY(a) REFERENCES test_tab_partition_f(a); +}); + +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab_partitioned SELECT i, md5(i::text) FROM generate_series(1, 5000) s(i)" +); + +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming=parallel/, + $offset); + +# Drop the foreign key constraint on the subscriber, now it works. 
+$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_patition_fk"); + +$node_publisher->wait_for_catchup($appname); + +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); +is($result, qq(5000), + 'data replicated to subscriber after dropping the foreign key'); + +$node_subscriber->stop; +$node_publisher->stop; + +done_testing(); diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 9ef7bc15f4..12f4103f23 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1891,6 +1891,7 @@ PageXLogRecPtr PagetableEntry Pairs ParallelAppendState +ParallelApplySafety ParallelBitmapHeapState ParallelBlockTableScanDesc ParallelBlockTableScanWorker -- 2.23.0.windows.1