From bea20e940a6d4375f162432b5de3be35fd347238 Mon Sep 17 00:00:00 2001 From: wangw Date: Wed, 15 Jun 2022 11:02:08 +0800 Subject: [PATCH v21 4/5] Retry to apply streaming xact only in apply worker When the subscription parameter is set streaming=parallel, the logic tries to apply the streaming transaction using an apply background worker. If this fails the background worker exits with an error. In this case, retry applying the streaming transaction using the normal streaming=on mode. This is done to avoid getting caught in a loop of the same retry errors. A new flag field "subretry" has been introduced to catalog "pg_subscription". If the subscriber exits with an error, this flag will be set true, and whenever the transaction is applied successfully, this flag is reset false. Now, when deciding how to apply a streaming transaction, the logic can know if this transaction has previously failed or not (by checking the "subretry" field). --- doc/src/sgml/catalogs.sgml | 9 ++ doc/src/sgml/ref/create_subscription.sgml | 5 + src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 2 +- src/backend/commands/subscriptioncmds.c | 1 + src/backend/replication/logical/applybgworker.c | 16 ++- src/backend/replication/logical/worker.c | 173 +++++++++++++++++------- src/bin/pg_dump/pg_dump.c | 5 +- src/include/catalog/pg_subscription.h | 4 + src/test/subscription/t/032_streaming_apply.pl | 168 +++++++++++++++-------- 10 files changed, 272 insertions(+), 112 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 962ea7d..f332ca6 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7910,6 +7910,15 @@ SCRAM-SHA-256$<iteration count>:&l + subretry bool + + + True if the previous apply change failed, necessitating a retry + + + + + subconninfo text diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index bfc2f8ddc..4bd54b2 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -245,6 +245,11 @@ CREATE SUBSCRIPTION subscription_nameparallel mode is disregarded when retrying; + instead the transaction will be applied using on + mode. diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index a506fc3..723b141 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->stream = subform->substream; sub->twophasestate = subform->subtwophasestate; sub->disableonerr = subform->subdisableonerr; + sub->retry = subform->subretry; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index f369b1f..8284cdd 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1299,7 +1299,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, - subslotname, subsynccommit, subpublications, suborigin) + subretry, subslotname, subsynccommit, subpublications, suborigin) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 78b7ee7..a023752 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -689,6 +689,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); + values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(false); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) diff --git a/src/backend/replication/logical/applybgworker.c b/src/backend/replication/logical/applybgworker.c index a425d94..db80e90 100644 --- a/src/backend/replication/logical/applybgworker.c +++ b/src/backend/replication/logical/applybgworker.c @@ -107,6 +107,18 @@ apply_bgworker_can_start(TransactionId xid) return false; /* + * Don't use apply background workers for retries, because it is possible + * that the last time we tried to apply a transaction using an apply + * background worker the checks failed (see function + * apply_bgworker_relation_check). + */ + if (MySubscription->retry) + { + elog(DEBUG1, "apply background workers are not used for retries"); + return false; + } + + /* * For streaming transactions that are being applied in apply background * worker, we cannot decide whether to apply the change for a relation * that is not in the READY state (see should_apply_changes_for_rel) as we @@ -811,7 +823,5 @@ apply_bgworker_relation_check(LogicalRepRelMapEntry *rel) rel->remoterel.nspname, rel->remoterel.relname), errdetail("The unique column on subscriber is not the unique " "column on publisher or there is at least one " - "non-immutable function."), - errhint("Please change to use subscription parameter " - "streaming=on."))); + "non-immutable function."))); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index c52669e..5d093b1 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -342,7 +342,7 @@ static void store_flush_position(XLogRecPtr remote_lsn); static void maybe_reread_subscription(void); -static void DisableSubscriptionAndExit(void); +static void DisableSubscriptionOnError(void); static void apply_handle_commit_internal(LogicalRepCommitData *commit_data); static void apply_handle_insert_internal(ApplyExecutionData *edata, @@ -379,6 +379,8 @@ static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn); static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn); static inline void reset_apply_error_context_info(void); +static void set_subscription_retry(bool retry); + /* * Should this worker apply changes for given relation. * @@ -929,6 +931,9 @@ apply_handle_commit(StringInfo s) apply_handle_commit_internal(&commit_data); + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Check the status of apply background worker if any. */ apply_bgworker_check_status(); @@ -1040,6 +1045,9 @@ apply_handle_prepare(StringInfo s) in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Check the status of apply background worker if any. */ apply_bgworker_check_status(); @@ -1093,6 +1101,9 @@ apply_handle_commit_prepared(StringInfo s) store_flush_position(prepare_data.end_lsn); in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); @@ -1148,6 +1159,9 @@ apply_handle_rollback_prepared(StringInfo s) store_flush_position(rollback_data.rollback_end_lsn); in_remote_transaction = false; + /* Reset the retry flag. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(rollback_data.rollback_end_lsn); @@ -1242,6 +1256,9 @@ apply_handle_stream_prepare(StringInfo s) /* Unlink the files with serialized changes and subxact info. */ stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid); } + + /* Reset the retry flag. */ + set_subscription_retry(false); } in_remote_transaction = false; @@ -1671,6 +1688,9 @@ apply_handle_stream_abort(StringInfo s) */ serialize_stream_abort(xid, subxid); } + + /* Reset the retry flag. */ + set_subscription_retry(false); } reset_apply_error_context_info(); @@ -1883,6 +1903,9 @@ apply_handle_stream_commit(StringInfo s) /* Unlink the files with serialized changes and subxact info. */ stream_cleanup_files(MyLogicalRepWorker->subid, xid); } + + /* Reset the retry flag. */ + set_subscription_retry(false); } /* Check the status of apply background worker if any. */ @@ -3927,20 +3950,28 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) } PG_CATCH(); { + /* + * Emit the error message, and recover from the error state to an idle + * state + */ + HOLD_INTERRUPTS(); + + EmitErrorReport(); + AbortOutOfAnyTransaction(); + FlushErrorState(); + + RESUME_INTERRUPTS(); + + /* Report the worker failed during table synchronization */ + pgstat_report_subscription_error(MySubscription->oid, false); + if (MySubscription->disableonerr) - DisableSubscriptionAndExit(); - else - { - /* - * Report the worker failed during table synchronization. Abort - * the current transaction so that the stats message is sent in an - * idle state. - */ - AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, false); + DisableSubscriptionOnError(); - PG_RE_THROW(); - } + /* Set the retry flag. */ + set_subscription_retry(true); + + proc_exit(0); } PG_END_TRY(); @@ -3965,20 +3996,27 @@ start_apply(XLogRecPtr origin_startpos) } PG_CATCH(); { + /* + * Emit the error message, and recover from the error state to an idle + * state + */ + HOLD_INTERRUPTS(); + + EmitErrorReport(); + AbortOutOfAnyTransaction(); + FlushErrorState(); + + RESUME_INTERRUPTS(); + + /* Report the worker failed while applying changes */ + pgstat_report_subscription_error(MySubscription->oid, + !am_tablesync_worker()); + if (MySubscription->disableonerr) - DisableSubscriptionAndExit(); - else - { - /* - * Report the worker failed while applying changes. Abort the - * current transaction so that the stats message is sent in an - * idle state. - */ - AbortOutOfAnyTransaction(); - pgstat_report_subscription_error(MySubscription->oid, !am_tablesync_worker()); + DisableSubscriptionOnError(); - PG_RE_THROW(); - } + /* Set the retry flag. */ + set_subscription_retry(true); } PG_END_TRY(); } @@ -4240,39 +4278,20 @@ ApplyWorkerMain(Datum main_arg) } /* - * After error recovery, disable the subscription in a new transaction - * and exit cleanly. + * Disable the subscription in a new transaction. */ static void -DisableSubscriptionAndExit(void) +DisableSubscriptionOnError(void) { - /* - * Emit the error message, and recover from the error state to an idle - * state - */ - HOLD_INTERRUPTS(); - - EmitErrorReport(); - AbortOutOfAnyTransaction(); - FlushErrorState(); - - RESUME_INTERRUPTS(); - - /* Report the worker failed during either table synchronization or apply */ - pgstat_report_subscription_error(MyLogicalRepWorker->subid, - !am_tablesync_worker()); - /* Disable the subscription */ StartTransactionCommand(); DisableSubscription(MySubscription->oid); CommitTransactionCommand(); - /* Notify the subscription has been disabled and exit */ + /* Notify the subscription has been disabled */ ereport(LOG, errmsg("logical replication subscription \"%s\" has been disabled due to an error", MySubscription->name)); - - proc_exit(0); } /* @@ -4507,3 +4526,63 @@ reset_apply_error_context_info(void) apply_error_callback_arg.remote_attnum = -1; set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr); } + +/* + * Set subretry of pg_subscription catalog. + * + * If retry is true, subscriber is about to exit with an error. Otherwise, it + * means that the transaction was applied successfully. + */ +static void +set_subscription_retry(bool retry) +{ + Relation rel; + HeapTuple tup; + bool started_tx = false; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + if (MySubscription->retry == retry || + am_apply_bgworker()) + return; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + /* Look up the subscription in the catalog */ + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, + ObjectIdGetDatum(MySubscription->oid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name); + + LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, + AccessShareLock); + + /* Form a new tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* Reset subretry */ + values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(retry); + replaces[Anum_pg_subscription_subretry - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + + /* Update the catalog. */ + CatalogTupleUpdate(rel, &tup->t_self, tup); + + /* Cleanup. */ + heap_freetuple(tup); + table_close(rel, NoLock); + + if (started_tx) + CommitTransactionCommand(); +} diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 2a8da26..eb3cbc9 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4507,8 +4507,9 @@ getSubscriptions(Archive *fout) ntups = PQntuples(res); /* - * Get subscription fields. We don't include subskiplsn in the dump as - * after restoring the dump this value may no longer be relevant. + * Get subscription fields. We don't include subskiplsn and subretry in + * the dump as after restoring the dump this value may no longer be + * relevant. */ i_tableoid = PQfnumber(res, "tableoid"); i_oid = PQfnumber(res, "oid"); diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index f4e1e94..0969b9f 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -88,6 +88,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subdisableonerr; /* True if a worker error should cause the * subscription to be disabled */ + bool subretry BKI_DEFAULT(f); /* True if the previous apply change failed. */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -131,6 +133,8 @@ typedef struct Subscription bool disableonerr; /* Indicates if the subscription should be * automatically disabled if a worker error * occurs */ + bool retry; /* Indicates if the previous apply change + * failed. */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/test/subscription/t/032_streaming_apply.pl b/src/test/subscription/t/032_streaming_apply.pl index 4dc57c8..f7f66a2 100644 --- a/src/test/subscription/t/032_streaming_apply.pl +++ b/src/test/subscription/t/032_streaming_apply.pl @@ -78,9 +78,13 @@ my $timer = IPC::Run::timeout($PostgreSQL::Test::Utils::timeout_default); my $h = $node_publisher->background_psql('postgres', \$in, \$out, $timer, on_error_stop => 0); +# ============================================================================ # It is not allowed that the unique column in the relation on the # subscriber-side is not the unique column on the publisher-side. Check the -# error reported by background worker in this case. +# error reported by background worker in this case. And after retrying in +# apply worker, we check if the data is replicated successfully. +# ============================================================================ + # First we check the unique index on normal table. $node_subscriber->safe_psql('postgres', "CREATE UNIQUE INDEX idx_tab1 on test_tab1(a)"); @@ -103,14 +107,19 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming=parallel/, $offset); +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? duplicate key value violates unique constraint "idx_tab1"/, + $offset); + # Drop the unique index on the subscriber, now it works. $node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab1"); +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); -is($result, qq(5001), 'data replicated to subscriber after dropping index'); +is($result, qq(5001), 'data replicated to subscriber after dropping unique index to retry apply'); # Clean up test data from the environment. $node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab1"); @@ -131,17 +140,23 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming=parallel/, $offset); -# Drop the unique index on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', "DROP INDEX test_tab_b_partition_idx"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); -is($result, qq(5000), 'data replicated to subscriber after dropping index'); +is($result, qq(5000), 'data replicated to subscriber after retrying because of unique index'); +# Drop the unique index on the subscriber. +$node_subscriber->safe_psql('postgres', "DROP INDEX test_tab_b_partition_idx"); + +# ============================================================================ # Triggers which execute non-immutable function are not allowed on the # subscriber side. Check the error reported by background worker in this case. +# And after retrying in apply worker, we check if the data is replicated +# successfully. +# ============================================================================ + # First we check the trigger function on normal table. $node_publisher->safe_psql('postgres', "CREATE UNIQUE INDEX idx_tab2 on test_tab2(a)"); @@ -184,22 +199,26 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming=parallel/, $offset); -# Drop the trigger on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "DROP TRIGGER tri_tab1 ON public.test_tab1"); +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? duplicate key value violates unique constraint "idx_tab2"/, + $offset); +# Drop the unique index on the subscriber, now it works. +$node_subscriber->safe_psql('postgres', "DROP INDEX idx_tab2"); + +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); -is($result, qq(2), 'data replicated to subscriber after dropping trigger'); +is($result, qq(2), 'data replicated to subscriber after retrying because of trigger'); + +# Drop the trigger on the subscriber. +$node_subscriber->safe_psql('postgres', + "DROP TRIGGER tri_tab1 ON public.test_tab1"); # Clean up test data from the environment. -$node_subscriber->safe_psql( - 'postgres', qq{ -DROP function trigger_func_tab1; -DROP INDEX idx_tab2; -}); +$node_subscriber->safe_psql('postgres', "DROP function trigger_func_tab1"); $node_publisher->safe_psql( 'postgres', qq{ DROP INDEX idx_tab2; @@ -231,19 +250,24 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming=parallel/, $offset); -# Drop the trigger on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "DROP TRIGGER insert_trig ON test_tab_partition"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); -is($result, qq(0), 'data replicated to subscriber after dropping trigger'); +is($result, qq(0), 'data replicated to subscriber after retrying because of trigger'); + +# Drop the trigger on the subscriber. +$node_subscriber->safe_psql('postgres', + "DROP TRIGGER insert_trig ON test_tab_partition"); +# ============================================================================ # It is not allowed that column default value expression contains a # non-immutable function on the subscriber side. Check the error reported by -# background worker in this case. +# background worker in this case. And after retrying in apply worker, we check +# if the data is replicated successfully. +# ============================================================================ + # First we check the column default value expression on normal table. $node_publisher->safe_psql('postgres', "INSERT INTO test_tab2 VALUES(1)"); @@ -279,16 +303,17 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming=parallel/, $offset); -# Drop default value on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab1 ALTER COLUMN b DROP DEFAULT"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); is($result, qq(5001), - 'data replicated to subscriber after dropping default value expression'); + 'data replicated to subscriber after retrying because of column default value'); + +# Drop default value on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab1 ALTER COLUMN b DROP DEFAULT"); # Clean up test data from the environment. $node_subscriber->safe_psql('postgres', "ALTER TABLE test_tab1 DROP COLUMN b"); @@ -311,20 +336,25 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming=parallel/, $offset); -# Drop default value on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab_partition ALTER COLUMN b DROP DEFAULT"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); is($result, qq(5000), - 'data replicated to subscriber after dropping default value expression'); + 'data replicated to subscriber after retrying because of column default value'); +# Drop default value on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition ALTER COLUMN b DROP DEFAULT"); + +# ============================================================================ # It is not allowed that domain constraint expression contains a non-immutable # function on the subscriber side. Check the error reported by background -# worker in this case. +# worker in this case. And after retrying in apply worker, we check if the +# data is replicated successfully. +# ============================================================================ + # Because the column type of the partition table must be the same as its parent # table, only test normal table here. $node_publisher->safe_psql('postgres', "INSERT INTO test_tab2 VALUES(1)"); @@ -363,26 +393,31 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming=parallel/, $offset); -# Drop domain constraint expression on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab1 ALTER COLUMN a TYPE int"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); is($result, qq(5001), - 'data replicated to subscriber after dropping domain constraint expression' + 'data replicated to subscriber after retrying because of domain' ); +# Drop domain constraint expression on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab1 ALTER COLUMN a TYPE int"); + # Clean up test data from the environment. $node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab1"); $node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab2"); $node_publisher->wait_for_catchup($appname); -# It is not allowed that constraint expression contains a non-immutable function -# on the subscriber side. Check the error reported by background worker in this -# case. +# ============================================================================ +# It is not allowed that constraint expression contains a non-immutable +# function on the subscriber side. Check the error reported by background +# worker in this case. And after retrying in apply worker, we check if the +# data is replicated successfully. +# ============================================================================ + # First we check the constraint expression on normal table. $node_publisher->safe_psql('postgres', "INSERT INTO test_tab2 VALUES(1)"); @@ -413,16 +448,17 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming=parallel/, $offset); -# Drop constraint on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab1 DROP CONSTRAINT const_tab1"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); is($result, qq(5001), - 'data replicated to subscriber after dropping constraint expression'); + 'data replicated to subscriber after retrying because of constraint'); + +# Drop constraint on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab1 DROP CONSTRAINT const_tab1"); # Clean up test data from the environment. $node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab1"); @@ -444,19 +480,24 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming=parallel/, $offset); -# Drop constraint on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_con"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); is($result, qq(0), - 'data replicated to subscriber after dropping constraint expression'); + 'data replicated to subscriber after retrying because of constraint'); + +# Drop constraint on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_con"); +# ============================================================================ # It is not allowed that foreign key on the subscriber side. Check the error -# reported by background worker in this case. +# reported by background worker in this case. And after retrying in apply +# worker, we check if the data is replicated successfully. +# ============================================================================ + # First we check the foreign key on normal table. $node_publisher->safe_psql( 'postgres', qq{ @@ -495,15 +536,23 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab1" using subscription parameter streaming=parallel/, $offset); -# Drop the foreign key constraint on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', "ALTER TABLE test_tab1 DROP CONSTRAINT test_tab1fk"); +$node_subscriber->wait_for_log( + qr/ERROR: ( [A-Z0-9]+:)? insert or update on table "test_tab1" violates foreign key constraint "test_tab1fk"/, + $offset); +# Insert dependent data on the publisher, now it works. +$node_subscriber->safe_psql('postgres', "INSERT INTO test_tab2 VALUES(1)"); + +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab1"); is($result, qq(1), - 'data replicated to subscriber after dropping the foreign key'); + 'data replicated to subscriber after retrying because of foreign key'); + +# Drop the foreign key constraint on the subscriber. +$node_subscriber->safe_psql('postgres', "ALTER TABLE test_tab1 DROP CONSTRAINT test_tab1fk"); # Clean up test data from the environment. $node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab1"); @@ -529,16 +578,17 @@ $node_subscriber->wait_for_log( qr/ERROR: ( [A-Z0-9]+:)? cannot replicate target relation "public.test_tab_partitioned" using subscription parameter streaming=parallel/, $offset); -# Drop the foreign key constraint on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_patition_fk"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); is($result, qq(5000), - 'data replicated to subscriber after dropping the foreign key'); + 'data replicated to subscribers after retrying because of foreign key'); + +# Drop the foreign key constraint on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_patition_fk"); $node_subscriber->stop; $node_publisher->stop; -- 2.7.2.windows.1