From 6f5437dd58df8837a6a2104d6295cdee7d3e12fd Mon Sep 17 00:00:00 2001 From: wangw Date: Wed, 15 Jun 2022 11:02:08 +0800 Subject: [PATCH v11 5/5] Retry to apply streaming xact only in apply worker. --- doc/src/sgml/catalogs.sgml | 11 ++ doc/src/sgml/ref/create_subscription.sgml | 4 + src/backend/catalog/pg_subscription.c | 1 + src/backend/catalog/system_views.sql | 4 +- src/backend/commands/subscriptioncmds.c | 1 + .../replication/logical/applybgwroker.c | 13 +- src/backend/replication/logical/worker.c | 89 +++++++++++ src/bin/pg_dump/pg_dump.c | 5 +- src/include/catalog/pg_subscription.h | 4 + .../subscription/t/032_streaming_apply.pl | 139 ++++++++++-------- 10 files changed, 203 insertions(+), 68 deletions(-) diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml index 5cb39b18bd..928ad02ace 100644 --- a/doc/src/sgml/catalogs.sgml +++ b/doc/src/sgml/catalogs.sgml @@ -7907,6 +7907,17 @@ SCRAM-SHA-256$<iteration count>:&l + + + subretry bool + + + If true, the subscription will not try to apply streaming transaction + in apply mode. See + for more information. + + + subconninfo text diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml index 8de1a23ce4..70fbf81668 100644 --- a/doc/src/sgml/ref/create_subscription.sgml +++ b/doc/src/sgml/ref/create_subscription.sgml @@ -244,6 +244,10 @@ CREATE SUBSCRIPTION subscription_nameon + mode. 
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index add51caadf..1824390d7d 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -71,6 +71,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->stream = subform->substream; sub->twophasestate = subform->subtwophasestate; sub->disableonerr = subform->subdisableonerr; + sub->retry = subform->subretry; /* Get conninfo */ datum = SysCacheGetAttr(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index fedaed533b..10f4dd6785 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1298,8 +1298,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public; -- All columns of pg_subscription except subconninfo are publicly readable. REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, - subbinary, substream, subtwophasestate, subdisableonerr, subslotname, - subsynccommit, subpublications) + subbinary, substream, subtwophasestate, subdisableonerr, + subretry, subslotname, subsynccommit, subpublications) ON pg_subscription TO public; CREATE VIEW pg_stat_subscription_stats AS diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 4080bba987..38697184d9 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -663,6 +663,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, LOGICALREP_TWOPHASE_STATE_PENDING : LOGICALREP_TWOPHASE_STATE_DISABLED); values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); + values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(false); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) diff --git a/src/backend/replication/logical/applybgwroker.c 
b/src/backend/replication/logical/applybgwroker.c index 7d3c61c2e9..02b1ef5e62 100644 --- a/src/backend/replication/logical/applybgwroker.c +++ b/src/backend/replication/logical/applybgwroker.c @@ -103,6 +103,13 @@ apply_bgworker_find_or_start(TransactionId xid, bool start) if (start && !XLogRecPtrIsInvalid(MySubscription->skiplsn)) return NULL; + if (start && MySubscription->retry) + { + elog(DEBUG1, "retry to apply an streaming transaction in apply " + "background worker"); + return NULL; + } + /* * For streaming transactions that are being applied in apply background * worker, we cannot decide whether to apply the change for a relation @@ -794,8 +801,7 @@ apply_bgworker_relation_check(LogicalRepRelMapEntry *rel) if (!rel->sameunique) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot replicate relation with different unique index"), - errhint("Please change the streaming option to 'on' instead of 'apply'."))); + errmsg("cannot replicate relation with different unique index"))); /* * Check if there is any non-immutable function present in expression in @@ -804,7 +810,6 @@ apply_bgworker_relation_check(LogicalRepRelMapEntry *rel) if (rel->volatility == FUNCTION_NONIMMUTABLE) ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot replicate relation. There is at least one non-immutable function"), - errhint("Please change the streaming option to 'on' instead of 'apply'."))); + errmsg("cannot replicate relation. 
There is at least one non-immutable function"))); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 6770973e39..064b97a84a 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -374,6 +374,8 @@ static void clear_subscription_skip_lsn(XLogRecPtr finish_lsn); static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn); static inline void reset_apply_error_context_info(void); +static void set_subscription_retry(bool retry); + /* * Should this worker apply changes for given relation. * @@ -890,6 +892,9 @@ apply_handle_commit(StringInfo s) apply_handle_commit_internal(&commit_data); + /* Set the flag that we will not retry later. */ + set_subscription_retry(false); + /* Check the status of apply background worker if any. */ apply_bgworker_check_status(); @@ -1001,6 +1006,9 @@ apply_handle_prepare(StringInfo s) in_remote_transaction = false; + /* Set the flag that we will not retry later. */ + set_subscription_retry(false); + /* Check the status of apply background worker if any. */ apply_bgworker_check_status(); @@ -1054,6 +1062,9 @@ apply_handle_commit_prepared(StringInfo s) store_flush_position(prepare_data.end_lsn); in_remote_transaction = false; + /* Set the flag that we will not retry later. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(prepare_data.end_lsn); @@ -1109,6 +1120,9 @@ apply_handle_rollback_prepared(StringInfo s) store_flush_position(rollback_data.rollback_end_lsn); in_remote_transaction = false; + /* Set the flag that we will not retry later. */ + set_subscription_retry(false); + /* Process any tables that are being synchronized in parallel. */ process_syncing_tables(rollback_data.rollback_end_lsn); @@ -1195,6 +1209,9 @@ apply_handle_stream_prepare(StringInfo s) /* unlink the files with serialized changes and subxact info. 
*/ stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid); } + + /* Set the flag that we will not retry later. */ + set_subscription_retry(false); } in_remote_transaction = false; @@ -1615,6 +1632,9 @@ apply_handle_stream_abort(StringInfo s) */ else serialize_stream_abort(xid, subxid); + + /* Set the flag that we will not retry later. */ + set_subscription_retry(false); } } @@ -2786,6 +2806,9 @@ apply_handle_stream_commit(StringInfo s) /* unlink the files with serialized changes and subxact info */ stream_cleanup_files(MyLogicalRepWorker->subid, xid); } + + /* Set the flag that we will not retry later. */ + set_subscription_retry(false); } /* Check the status of apply background worker if any. */ @@ -3854,6 +3877,9 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) } PG_CATCH(); { + /* Set the flag that we will retry later. */ + set_subscription_retry(true); + if (MySubscription->disableonerr) DisableSubscriptionAndExit(); else @@ -3892,6 +3918,9 @@ start_apply(XLogRecPtr origin_startpos) } PG_CATCH(); { + /* Set the flag that we will retry later. */ + set_subscription_retry(true); + if (MySubscription->disableonerr) DisableSubscriptionAndExit(); else @@ -4393,3 +4422,63 @@ reset_apply_error_context_info(void) apply_error_callback_arg.remote_attnum = -1; set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr); } + +/* + * Set subretry of pg_subscription catalog. + * + * If retry is true, subscriber is about to exit with an error. Otherwise, it + * means that the changes were applied successfully.
+ */ +static void +set_subscription_retry(bool retry) +{ + Relation rel; + HeapTuple tup; + bool started_tx = false; + bool nulls[Natts_pg_subscription]; + bool replaces[Natts_pg_subscription]; + Datum values[Natts_pg_subscription]; + + if (MySubscription->retry == retry || + am_apply_bgworker()) + return; + + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + /* Look up the subscription in the catalog */ + rel = table_open(SubscriptionRelationId, RowExclusiveLock); + tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, + ObjectIdGetDatum(MySubscription->oid)); + + if (!HeapTupleIsValid(tup)) + elog(ERROR, "subscription \"%s\" does not exist", MySubscription->name); + + LockSharedObject(SubscriptionRelationId, MySubscription->oid, 0, + AccessShareLock); + + /* Form a new tuple. */ + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + memset(replaces, false, sizeof(replaces)); + + /* set subretry to the requested value */ + values[Anum_pg_subscription_subretry - 1] = BoolGetDatum(retry); + replaces[Anum_pg_subscription_subretry - 1] = true; + + tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls, + replaces); + + /* Update the catalog. */ + CatalogTupleUpdate(rel, &tup->t_self, tup); + + /* Cleanup. */ + heap_freetuple(tup); + table_close(rel, NoLock); + + if (started_tx) + CommitTransactionCommand(); +} diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 6f8b30abb0..9b3ffe0888 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -4472,8 +4472,9 @@ getSubscriptions(Archive *fout) ntups = PQntuples(res); /* - * Get subscription fields. We don't include subskiplsn in the dump as - * after restoring the dump this value may no longer be relevant. + * Get subscription fields. We don't include subskiplsn and subretry in + * the dump as after restoring the dump these values may no longer be + * relevant.
*/ i_tableoid = PQfnumber(res, "tableoid"); i_oid = PQfnumber(res, "oid"); diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h index 9b394a45fe..dc7597bf80 100644 --- a/src/include/catalog/pg_subscription.h +++ b/src/include/catalog/pg_subscription.h @@ -76,6 +76,8 @@ CATALOG(pg_subscription,6100,SubscriptionRelationId) BKI_SHARED_RELATION BKI_ROW bool subdisableonerr; /* True if a worker error should cause the * subscription to be disabled */ + bool subretry; /* True if the previous apply change failed. */ + #ifdef CATALOG_VARLEN /* variable-length fields start here */ /* Connection string to the publisher */ text subconninfo BKI_FORCE_NOT_NULL; @@ -115,6 +117,8 @@ typedef struct Subscription bool disableonerr; /* Indicates if the subscription should be * automatically disabled if a worker error * occurs */ + bool retry; /* Indicates if the previous apply change + * failed. */ char *conninfo; /* Connection string to the publisher */ char *slotname; /* Name of the replication slot */ char *synccommit; /* Synchronous commit setting for worker */ diff --git a/src/test/subscription/t/032_streaming_apply.pl b/src/test/subscription/t/032_streaming_apply.pl index 84a6900b33..8f2e254b39 100644 --- a/src/test/subscription/t/032_streaming_apply.pl +++ b/src/test/subscription/t/032_streaming_apply.pl @@ -58,7 +58,9 @@ $node_subscriber->safe_psql( $node_publisher->wait_for_catchup($appname); # It is not allowed that the unique index on the publisher and the subscriber -# is different. Check the error reported by background worker in this case. +# is different. Check the error reported by background worker in this case. And +# after retrying in apply worker, we check if the data is replicated +# successfully. # First we check the unique index on normal table. 
$node_subscriber->safe_psql('postgres', "CREATE UNIQUE INDEX test_tab_b_idx ON test_tab (b)"); @@ -82,14 +84,15 @@ $node_subscriber->wait_for_log( qr/ERROR: cannot replicate relation with different unique index/, $offset); -# Drop the unique index on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', "DROP INDEX test_tab_b_idx"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); my $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); -is($result, qq(5000), 'data replicated to subscriber after dropping index'); +is($result, qq(5000), 'data replicated to subscribers after retrying because of unique index'); + +# Drop the unique index on the subscriber. +$node_subscriber->safe_psql('postgres', "DROP INDEX test_tab_b_idx"); # Then we check the unique index on partition table. $node_subscriber->safe_psql('postgres', @@ -106,17 +109,20 @@ $node_subscriber->wait_for_log( qr/ERROR: cannot replicate relation with different unique index/, $offset); -# Drop the unique index on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', "DROP INDEX test_tab_b_partition_idx"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); -is($result, qq(5000), 'data replicated to subscriber after dropping index'); +is($result, qq(5000), 'data replicated to subscribers after retrying because of unique index'); + +# Drop the unique index on the subscriber. +$node_subscriber->safe_psql('postgres', "DROP INDEX test_tab_b_partition_idx"); # Triggers which execute non-immutable function are not allowed on the # subscriber side. Check the error reported by background worker in this case. +# And after retrying in apply worker, we check if the data is replicated +# successfully. 
# First we check the trigger function on normal table. $node_subscriber->safe_psql( 'postgres', qq{ @@ -140,15 +146,16 @@ $node_subscriber->wait_for_log( qr/ERROR: cannot replicate relation. There is at least one non-immutable function/, $offset); -# Drop the trigger on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "DROP TRIGGER insert_trig ON test_tab"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); -is($result, qq(0), 'data replicated to subscriber after dropping trigger'); +is($result, qq(0), 'data replicated to subscribers after retrying because of trigger'); + +# Drop the trigger on the subscriber. +$node_subscriber->safe_psql('postgres', + "DROP TRIGGER insert_trig ON test_tab"); # Then we check the unique index on partition table. $node_subscriber->safe_psql( @@ -168,19 +175,21 @@ $node_subscriber->wait_for_log( qr/ERROR: cannot replicate relation. There is at least one non-immutable function/, $offset); -# Drop the trigger on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "DROP TRIGGER insert_trig ON test_tab_partition"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); -is($result, qq(0), 'data replicated to subscriber after dropping trigger'); +is($result, qq(0), 'data replicated to subscribers after retrying because of trigger'); + +# Drop the trigger on the subscriber. +$node_subscriber->safe_psql('postgres', + "DROP TRIGGER insert_trig ON test_tab_partition"); # It is not allowed that column default value expression contains a # non-immutable function on the subscriber side. Check the error reported by -# background worker in this case. +# background worker in this case. 
And after retrying in apply worker, we check +# if the data is replicated successfully. # First we check the column default value expression on normal table. $node_subscriber->safe_psql('postgres', "ALTER TABLE test_tab ALTER COLUMN b SET DEFAULT random()"); @@ -196,16 +205,17 @@ $node_subscriber->wait_for_log( qr/ERROR: cannot replicate relation. There is at least one non-immutable function/, $offset); -# Drop default value on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab ALTER COLUMN b DROP DEFAULT"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); is($result, qq(5000), - 'data replicated to subscriber after dropping default value expression'); + 'data replicated to subscribers after retrying because of column default value'); + +# Drop default value on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab ALTER COLUMN b DROP DEFAULT"); # Then we check the column default value expression on partition table. $node_subscriber->safe_psql('postgres', @@ -222,20 +232,22 @@ $node_subscriber->wait_for_log( qr/ERROR: cannot replicate relation. There is at least one non-immutable function/, $offset); -# Drop default value on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab_partition ALTER COLUMN b DROP DEFAULT"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); is($result, qq(5000), - 'data replicated to subscriber after dropping default value expression'); + 'data replicated to subscribers after retrying because of column default value'); + +# Drop default value on the subscriber. 
+$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition ALTER COLUMN b DROP DEFAULT"); # It is not allowed that domain constraint expression contains a non-immutable # function on the subscriber side. Check the error reported by background -# worker in this case. +# worker in this case. And after retrying in apply worker, we check if the data +# is replicated successfully. # Because the column type of the partition table must be the same as its parent # table, only test normal table here. $node_subscriber->safe_psql( @@ -253,21 +265,23 @@ $node_subscriber->wait_for_log( qr/ERROR: cannot replicate relation. There is at least one non-immutable function/, $offset); -# Drop domain constraint expression on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab ALTER COLUMN a TYPE int"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); is($result, qq(0), - 'data replicated to subscriber after dropping domain constraint expression' + 'data replicated to subscribers after retrying because of domain' ); -# It is not allowed that constraint expression contains a non-immutable function -# on the subscriber side. Check the error reported by background worker in this -# case. +# Drop domain constraint expression on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab ALTER COLUMN a TYPE int"); + +# It is not allowed that constraint expression contains a non-immutable +# function on the subscriber side. Check the error reported by background +# worker in this case. And after retrying in apply worker, we check if the data +# is replicated successfully. # First we check the constraint expression on normal table. 
$node_subscriber->safe_psql( 'postgres', qq{ @@ -285,16 +299,17 @@ $node_subscriber->wait_for_log( qr/ERROR: cannot replicate relation. There is at least one non-immutable function/, $offset); -# Drop constraint on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab DROP CONSTRAINT test_tab_con"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); is($result, qq(5000), - 'data replicated to subscriber after dropping constraint expression'); + 'data replicated to subscribers after retrying because of constraint'); + +# Drop constraint on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab DROP CONSTRAINT test_tab_con"); # Then we check the constraint expression on partition table. $node_subscriber->safe_psql( @@ -311,19 +326,21 @@ $node_subscriber->wait_for_log( qr/ERROR: cannot replicate relation. There is at least one non-immutable function/, $offset); -# Drop constraint on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_con"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); is($result, qq(0), - 'data replicated to subscriber after dropping constraint expression'); + 'data replicated to subscribers after retrying because of constraint'); + +# Drop constraint on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_con"); # It is not allowed that foreign key on the subscriber side. Check the error -# reported by background worker in this case. +# reported by background worker in this case. 
And after retrying in apply +# worker, we check if the data is replicated successfully. # First we check the foreign key on normal table. $node_publisher->safe_psql('postgres', "DELETE FROM test_tab"); $node_publisher->wait_for_catchup($appname); @@ -344,16 +361,17 @@ $node_subscriber->wait_for_log( qr/ERROR: cannot replicate relation. There is at least one non-immutable function/, $offset); -# Drop the foreign key constraint on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab DROP CONSTRAINT test_tabfk"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab"); is($result, qq(5000), - 'data replicated to subscriber after dropping the foreign key'); + 'data replicated to subscribers after retrying because of foreign key'); + +# Drop the foreign key constraint on the subscriber. +$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab DROP CONSTRAINT test_tabfk"); # Then we check the foreign key on partition table. $node_publisher->wait_for_catchup($appname); @@ -374,16 +392,17 @@ $node_subscriber->wait_for_log( qr/ERROR: cannot replicate relation. There is at least one non-immutable function/, $offset); -# Drop the foreign key constraint on the subscriber, now it works. -$node_subscriber->safe_psql('postgres', - "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_patition_fk"); - +# Wait for this streaming transaction to be applied in the apply worker. $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_partitioned"); is($result, qq(5000), - 'data replicated to subscriber after dropping the foreign key'); + 'data replicated to subscribers after retrying because of foreign key'); + +# Drop the foreign key constraint on the subscriber. 
+$node_subscriber->safe_psql('postgres', + "ALTER TABLE test_tab_partition DROP CONSTRAINT test_tab_patition_fk"); $node_subscriber->stop; $node_publisher->stop; -- 2.23.0.windows.1