From 10e4538734fc1d512efce5f86e24ce53a528862b Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Mon, 2 Jan 2023 15:37:25 +0530 Subject: [PATCH v77 2/5] Add GUC stream_serialize_threshold and test serializing messages to disk. --- doc/src/sgml/config.sgml | 32 +++++ .../replication/logical/applyparallelworker.c | 12 ++ src/backend/replication/logical/worker.c | 9 ++ src/backend/utils/misc/guc_tables.c | 14 ++ src/include/replication/worker_internal.h | 4 + src/test/subscription/t/015_stream.pl | 144 ++++++++++++++++++++- 6 files changed, 212 insertions(+), 3 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index a33a935..669babf 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -11650,6 +11650,38 @@ LOG: CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1) + + stream_serialize_threshold (integer) + + stream_serialize_threshold configuration parameter + + + + + Forces the leader apply worker to serialize messages to files after + sending the specified number of streaming chunks to the parallel apply + worker. Setting this to zero serializes all messages. A value of + -1 (the default) disables this feature. This is + intended to test serialization to files with + streaming = parallel. + + + + When the logical replication subscription streaming + parameter is set to parallel, the leader apply worker + sends messages to parallel workers with a timeout. By default, the + leader apply worker will serialize the remaining messages to files if + the timeout is exceeded. If this option is set to any value other than + -1, the leader apply worker serializes messages to files even if the timeout is not exceeded. + + + + This parameter can only be set in the postgresql.conf + file or on the server command line. 
+ + + + diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 1b3b741..3d7f8a8 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -254,6 +254,9 @@ static ParallelApplyWorkerInfo *stream_apply_worker = NULL; /* A list to maintain subtransactions, if any. */ static List *subxactlist = NIL; +/* GUC variable */ +int stream_serialize_threshold = -1; + static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo); static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared); static PartialFileSetState pa_get_fileset_state(void); @@ -1149,6 +1152,15 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) Assert(!IsTransactionState()); Assert(!winfo->serialize_changes); + /* Force serialization of messages once stream_serialize_threshold is reached. */ + if (stream_serialize_threshold != -1 && + (stream_serialize_threshold == 0 || + stream_serialize_threshold < parallel_stream_nchunks)) + { + parallel_stream_nchunks = 0; + return false; + } + /* * This timeout is a bit arbitrary but testing revealed that it is sufficient * to send the message unless the parallel apply worker is waiting on some diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 3198c97..3e5b759 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -328,6 +328,12 @@ static TransactionId stream_xid = InvalidTransactionId; static uint32 parallel_stream_nchanges = 0; /* + * The number of streaming chunks sent by the leader apply worker during one + * streamed transaction. This is only used when stream_serialize_threshold > 0. + */ +uint32 parallel_stream_nchunks = 0; + +/* * We enable skipping all data modification changes (INSERT, UPDATE, etc.) 
for * the subscription if the remote transaction's finish LSN matches the subskiplsn. * Once we start skipping changes, we don't stop it until we skip all changes of @@ -1521,6 +1527,9 @@ apply_handle_stream_start(StringInfo s) case TRANS_LEADER_SEND_TO_PARALLEL: Assert(winfo); + if (stream_serialize_threshold > 0) + parallel_stream_nchunks++; + /* * Once we start serializing the changes, the parallel apply * worker will wait for the leader to release the stream lock diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index 92545b4..302ceb7 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -61,6 +61,7 @@ #include "replication/logicallauncher.h" #include "replication/slot.h" #include "replication/syncrep.h" +#include "replication/worker_internal.h" #include "storage/bufmgr.h" #include "storage/large_object.h" #include "storage/pg_shmem.h" @@ -3015,6 +3016,19 @@ struct config_int ConfigureNamesInt[] = }, { + {"stream_serialize_threshold", PGC_SIGHUP, DEVELOPER_OPTIONS, + gettext_noop("Forces the leader apply worker to serialize messages " + "to files after sending specified amount of streaming " + "chunks in streaming parallel mode."), + gettext_noop("A value of -1 disables this feature."), + GUC_NOT_IN_SAMPLE + }, + &stream_serialize_threshold, + -1, -1, INT_MAX, + NULL, NULL, NULL + }, + + { {"log_rotation_age", PGC_SIGHUP, LOGGING_WHERE, gettext_noop("Sets the amount of time to wait before forcing " "log file rotation."), diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h index db891ee..a682791 100644 --- a/src/include/replication/worker_internal.h +++ b/src/include/replication/worker_internal.h @@ -225,6 +225,10 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker; extern PGDLLIMPORT bool in_remote_transaction; +extern PGDLLIMPORT int stream_serialize_threshold; + +extern PGDLLIMPORT uint32 parallel_stream_nchunks; + extern void 
logicalrep_worker_attach(int slot); extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running); diff --git a/src/test/subscription/t/015_stream.pl b/src/test/subscription/t/015_stream.pl index 91e8aa8..83d6956 100644 --- a/src/test/subscription/t/015_stream.pl +++ b/src/test/subscription/t/015_stream.pl @@ -133,13 +133,20 @@ sub test_streaming # Create publisher node my $node_publisher = PostgreSQL::Test::Cluster->new('publisher'); $node_publisher->init(allows_streaming => 'logical'); -$node_publisher->append_conf('postgresql.conf', - 'logical_decoding_work_mem = 64kB'); +$node_publisher->append_conf( + 'postgresql.conf', qq( +max_prepared_transactions = 10 +logical_decoding_work_mem = 64kB +)); $node_publisher->start; # Create subscriber node my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber'); $node_subscriber->init(allows_streaming => 'logical'); +$node_subscriber->append_conf( + 'postgresql.conf', qq( +max_prepared_transactions = 10 +)); $node_subscriber->start; # Create some preexisting content on publisher @@ -170,7 +177,7 @@ my $appname = 'tap_sub'; # Test using streaming mode 'on' ################################ $node_subscriber->safe_psql('postgres', - "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on)" + "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub WITH (streaming = on, two_phase = on)" ); # Wait for initial table sync to finish @@ -312,6 +319,137 @@ $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); is($result, qq(10000), 'data replicated to subscriber after dropping index'); +# Clean up test data from the environment. 
+$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2"); +$node_publisher->wait_for_catchup($appname); + +# Test serializing messages to disk + +# Set stream_serialize_threshold to zero, so the messages will be serialized to disk. +$node_subscriber->safe_psql('postgres', + 'ALTER SYSTEM SET stream_serialize_threshold = 0;'); +$node_subscriber->reload; + +# Run a query to make sure that the reload has taken effect. +$node_subscriber->safe_psql('postgres', q{SELECT 1}); + +# Serialize the COMMIT transaction. +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql('postgres', + "INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i)"); + +# Ensure that the messages are serialized. +$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? opening file ".*\.changes" for streamed changes/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is committed on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(5000), + 'data replicated to subscriber by serializing messages to disk'); + +# Clean up test data from the environment. +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2"); +$node_publisher->wait_for_catchup($appname); + +# Serialize the PREPARE transaction. +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i); + PREPARE TRANSACTION 'xact'; + }); + +# Ensure that the messages are serialized. +$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? 
opening file ".*\.changes" for streamed changes/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is in prepared state on subscriber +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_prepared_xacts;"); +is($result, qq(1), 'transaction is prepared on subscriber'); + +# Check that 2PC gets committed on subscriber +$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'xact';"); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is committed on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(5000), + 'data replicated to subscriber by serializing messages to disk'); + +# Clean up test data from the environment. +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2"); +$node_publisher->wait_for_catchup($appname); + +# Serialize the ABORT top-transaction. +# Check the subscriber log from now on. +$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i); + ROLLBACK; + }); + +# Ensure that the messages are serialized. +$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? opening file ".*\.changes" for streamed changes/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that transaction is aborted on subscriber +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(0), + 'data replicated to subscriber by serializing messages to disk'); + +# Clean up test data from the environment. +$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab_2"); +$node_publisher->wait_for_catchup($appname); + +# Serialize the ABORT sub-transaction. +# Check the subscriber log from now on. 
+$offset = -s $node_subscriber->logfile; + +$node_publisher->safe_psql( + 'postgres', q{ + BEGIN; + INSERT INTO test_tab_2 SELECT i FROM generate_series(1, 5000) s(i); + SAVEPOINT sp; + INSERT INTO test_tab_2 SELECT i FROM generate_series(5001, 10000) s(i); + ROLLBACK TO sp; + COMMIT; + }); + +# Ensure that the messages are serialized. +$node_subscriber->wait_for_log( + qr/DEBUG: ( [A-Z0-9]+:)? opening file ".*\.changes" for streamed changes/, + $offset); + +$node_publisher->wait_for_catchup($appname); + +# Check that only sub-transaction is aborted on subscriber. +$result = + $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab_2"); +is($result, qq(5000), + 'data replicated to subscriber by serializing messages to disk'); + $node_subscriber->stop; $node_publisher->stop; -- 2.7.2.windows.1