From 4f76b34b76e086020d25bb551b2c82dc5db0721e Mon Sep 17 00:00:00 2001
From: Hou Zhijie
Date: Sun, 11 Dec 2022 19:11:53 +0800
Subject: [PATCH v60 5/7] Add GUC stream_serialize_threshold and test
 serializing messages to disk

---
 doc/src/sgml/config.sgml                      |  32 ++++
 .../replication/logical/applyparallelworker.c |  54 ++++---
 src/backend/replication/logical/worker.c      |   9 ++
 src/backend/utils/misc/guc_tables.c           |  14 ++
 src/include/replication/worker_internal.h     |   4 +
 .../t/032_stream_parallel_conflict.pl         | 141 +++++++++++++++++-
 6 files changed, 230 insertions(+), 24 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 6454573a0b..80e42f8f99 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -11616,6 +11616,38 @@ LOG:  CleanUpLock: deleting: lock(0xb7acd844) id(24688,24696,0,0,0,1)
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-stream-serialize-threshold" xreflabel="stream_serialize_threshold">
+      <term><varname>stream_serialize_threshold</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>stream_serialize_threshold</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Forces the leader apply worker to serialize messages to files after
+        sending the specified number of streaming chunks to the parallel
+        apply worker. Setting this to zero serializes all messages. A value
+        of <literal>-1</literal> (the default) disables this feature. This
+        is intended to test serialization to files with
+        <literal>streaming = parallel</literal>.
+       </para>
+
+       <para>
+        When the logical replication subscription <literal>streaming</literal>
+        parameter is set to <literal>parallel</literal>, the leader apply
+        worker sends messages to the parallel apply workers with a timeout.
+        By default, the leader apply worker serializes the remaining messages
+        to files only if that timeout is exceeded. If this option is set to
+        any value other than <literal>-1</literal>, messages are serialized
+        to files even before the timeout is reached.
+       </para>
+
+       <para>
+        This parameter can only be set in the <filename>postgresql.conf</filename>
+        file or on the server command line.
+       </para>
+      </listitem>
+     </varlistentry>
+
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 2e2cd0d535..32d003e126 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -252,6 +252,9 @@ static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
 /* A list to maintain subtransactions, if any. */
 static List *subxactlist = NIL;
 
+/* GUC variable */
+int		stream_serialize_threshold = -1;
+
 static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
 static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
 static PartialFileSetState pa_get_fileset_state(void);
@@ -1226,32 +1229,43 @@ pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data,
 
 	for (;;)
 	{
-		result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
+		bool		force_serialize = (stream_serialize_threshold != -1 &&
+									   (stream_serialize_threshold == 0 ||
+										stream_serialize_threshold < parallel_stream_nchunks));
 
-		if (result == SHM_MQ_SUCCESS)
-			break;
-		else if (result == SHM_MQ_DETACHED)
-			ereport(ERROR,
-					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-					 errmsg("could not send data to shared-memory queue")));
+		if (!force_serialize)
+		{
+			result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
 
-		Assert(result == SHM_MQ_WOULD_BLOCK);
+			if (result == SHM_MQ_SUCCESS)
+				break;
+			else if (result == SHM_MQ_DETACHED)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("could not send data to shared-memory queue")));
 
-		/* Wait before retrying. */
-		rc = WaitLatch(MyLatch,
-					   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
-					   SHM_SEND_RETRY_INTERVAL_MS,
-					   WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
+			Assert(result == SHM_MQ_WOULD_BLOCK);
 
-		if (rc & WL_LATCH_SET)
-		{
-			ResetLatch(MyLatch);
-			CHECK_FOR_INTERRUPTS();
+			/* Wait before retrying. */
+			rc = WaitLatch(MyLatch,
+						   WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
+						   SHM_SEND_RETRY_INTERVAL_MS,
+						   WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
+
+			if (rc & WL_LATCH_SET)
+			{
+				ResetLatch(MyLatch);
+				CHECK_FOR_INTERRUPTS();
+			}
+
+			if (startTime == 0)
+				startTime = GetCurrentTimestamp();
 		}
+		else if (stream_serialize_threshold != 0)
+			parallel_stream_nchunks = 0;
 
-		if (startTime == 0)
-			startTime = GetCurrentTimestamp();
-		else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
+		if (force_serialize ||
+			TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
 										SHM_SEND_TIMEOUT_MS))
 		{
 			StringInfoData msg;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index b9c9f7fd50..6fe2cb5f33 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -326,6 +326,12 @@ static TransactionId stream_xid = InvalidTransactionId;
  */
 static uint32 parallel_stream_nchanges = 0;
 
+/*
+ * The number of streaming chunks sent by the leader apply worker during one
+ * streamed transaction. This is only used when stream_serialize_threshold > 0.
+ */
+uint32		parallel_stream_nchunks = 0;
+
 /*
  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -1556,6 +1562,9 @@ apply_handle_stream_start(StringInfo s)
 		case TRANS_LEADER_SEND_TO_PARALLEL:
 			Assert(winfo);
 
+			if (stream_serialize_threshold > 0)
+				parallel_stream_nchunks++;
+
 			pa_send_data(winfo, s->len, s->data, !first_segment);
 
 			if (!winfo->serialize_changes)
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 21b125fb26..df0e30b35e 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -61,6 +61,7 @@
 #include "replication/logicallauncher.h"
 #include "replication/slot.h"
 #include "replication/syncrep.h"
+#include "replication/worker_internal.h"
 #include "storage/bufmgr.h"
 #include "storage/large_object.h"
 #include "storage/pg_shmem.h"
@@ -3003,6 +3004,19 @@ struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"stream_serialize_threshold", PGC_SIGHUP, DEVELOPER_OPTIONS,
+			gettext_noop("Forces the leader apply worker to serialize messages "
+						 "to files after sending the specified number of "
+						 "streaming chunks in parallel streaming mode."),
+			gettext_noop("A value of -1 disables this feature."),
+			GUC_NOT_IN_SAMPLE
+		},
+		&stream_serialize_threshold,
+		-1, -1, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"log_rotation_age", PGC_SIGHUP, LOGGING_WHERE,
 			gettext_noop("Sets the amount of time to wait before forcing "
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 232fb92670..84c59c35d4 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -216,6 +216,10 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
 
 extern PGDLLIMPORT bool in_remote_transaction;
 
+extern PGDLLIMPORT int stream_serialize_threshold;
+
+extern PGDLLIMPORT uint32 parallel_stream_nchunks;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid, bool only_running);
diff --git a/src/test/subscription/t/032_stream_parallel_conflict.pl b/src/test/subscription/t/032_stream_parallel_conflict.pl
index 10efd2d376..59e7e7534f 100644
--- a/src/test/subscription/t/032_stream_parallel_conflict.pl
+++ b/src/test/subscription/t/032_stream_parallel_conflict.pl
@@ -13,8 +13,11 @@ my $offset = 0;
 # Create publisher node
 my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
 $node_publisher->init(allows_streaming => 'logical');
-$node_publisher->append_conf('postgresql.conf',
-	'logical_decoding_work_mem = 64kB');
+$node_publisher->append_conf(
+	'postgresql.conf', qq(
+logical_decoding_work_mem = 64kB
+max_prepared_transactions = 10
+));
 $node_publisher->start;
 
 # Create subscriber node
@@ -24,7 +27,11 @@ $node_subscriber->init;
 
 # Check if any streaming chunks are applied using the parallel apply worker. We
 # have to look for the DEBUG1 log messages about that, so bump up the log
 # verbosity.
-$node_subscriber->append_conf('postgresql.conf', "log_min_messages = debug1");
+$node_subscriber->append_conf(
+	'postgresql.conf', qq(
+log_min_messages = debug1
+max_prepared_transactions = 10
+));
 $node_subscriber->start;
 
@@ -47,7 +54,7 @@ $node_subscriber->safe_psql(
 	CREATE SUBSCRIPTION tap_sub
 	CONNECTION '$publisher_connstr application_name=$appname'
 	PUBLICATION tap_pub
-	WITH (streaming = parallel, copy_data = false)");
+	WITH (streaming = parallel, two_phase = on, copy_data = false)");
 
 $node_publisher->wait_for_catchup($appname);
 
@@ -146,6 +153,132 @@ $result = $node_subscriber->safe_psql('postgres',
 	"SELECT count(*) FROM test_tab");
 is($result, qq(10000), 'data replicated to subscriber after dropping index');
 
+# Clean up test data from the environment.
+$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab");
+$node_publisher->wait_for_catchup($appname);
+
+# ============================================================================
+# Test serializing messages to disk
+# ============================================================================
+
+# Set stream_serialize_threshold to zero so that all messages are serialized
+# to disk.
+$node_subscriber->safe_psql('postgres',
+	'ALTER SYSTEM SET stream_serialize_threshold = 0;');
+$node_subscriber->reload;
+
+# Serialize the COMMIT transaction.
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_tab SELECT i FROM generate_series(1, 5000) s(i)");
+
+# Ensure that the messages are serialized.
+$node_subscriber->wait_for_log(
+	qr/DEBUG: ( [A-Z0-9]+:)? opening file ".*\.changes" for streamed changes/,
+	$offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that the transaction is committed on the subscriber.
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab");
+is($result, qq(5000),
+	'data replicated to subscriber by serializing messages to disk');
+
+# Clean up test data from the environment.
+$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab");
+$node_publisher->wait_for_catchup($appname);
+
+# Serialize the PREPARE transaction.
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+	'postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i FROM generate_series(1, 5000) s(i);
+	PREPARE TRANSACTION 'xact';
+	});
+
+# Ensure that the messages are serialized.
+$node_subscriber->wait_for_log(
+	qr/DEBUG: ( [A-Z0-9]+:)? opening file ".*\.changes" for streamed changes/,
+	$offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that the transaction is in prepared state on the subscriber.
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM pg_prepared_xacts;");
+is($result, qq(1), 'transaction is prepared on subscriber');
+
+# Check that the 2PC transaction gets committed on the subscriber.
+$node_publisher->safe_psql('postgres', "COMMIT PREPARED 'xact';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that the transaction is committed on the subscriber.
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab");
+is($result, qq(5000),
+	'data replicated to subscriber by serializing messages to disk');
+
+# Clean up test data from the environment.
+$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab");
+$node_publisher->wait_for_catchup($appname);
+
+# Serialize the aborted top-level transaction.
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+	'postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i FROM generate_series(1, 5000) s(i);
+	ROLLBACK;
+	});
+
+# Ensure that the messages are serialized.
+$node_subscriber->wait_for_log(
+	qr/DEBUG: ( [A-Z0-9]+:)? opening file ".*\.changes" for streamed changes/,
+	$offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that the transaction is aborted on the subscriber.
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab");
+is($result, qq(0),
+	'transaction aborted on subscriber after serializing messages to disk');
+
+# Clean up test data from the environment.
+$node_publisher->safe_psql('postgres', "TRUNCATE TABLE test_tab");
+$node_publisher->wait_for_catchup($appname);
+
+# Serialize the aborted sub-transaction.
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+$node_publisher->safe_psql(
+	'postgres', q{
+	BEGIN;
+	INSERT INTO test_tab SELECT i FROM generate_series(1, 5000) s(i);
+	SAVEPOINT sp;
+	INSERT INTO test_tab SELECT i FROM generate_series(5001, 10000) s(i);
+	ROLLBACK TO sp;
+	COMMIT;
+	});
+
+# Ensure that the messages are serialized.
+$node_subscriber->wait_for_log(
+	qr/DEBUG: ( [A-Z0-9]+:)? opening file ".*\.changes" for streamed changes/,
+	$offset);
+
+$node_publisher->wait_for_catchup($appname);
+
+# Check that only the sub-transaction is rolled back on the subscriber.
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM test_tab");
+is($result, qq(5000),
+	'only the aborted sub-transaction is discarded on subscriber');
+
 $node_subscriber->stop;
 $node_publisher->stop;
-- 
2.23.0.windows.1
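
For manual testing, a minimal way to exercise the new GUC by hand is sketched
below. It assumes a publisher/subscriber pair with a subscription created
with streaming = parallel, as in the TAP test above; the table name test_tab
and the threshold value 2 are only illustrative.

    -- On the subscriber: force serialization once more than two streaming
    -- chunks of a transaction have been sent to the parallel apply worker.
    ALTER SYSTEM SET stream_serialize_threshold = 2;
    SELECT pg_reload_conf();

    -- On the publisher: stream a transaction larger than
    -- logical_decoding_work_mem (64kB in the test setup above).
    INSERT INTO test_tab SELECT i FROM generate_series(1, 5000) s(i);

With log_min_messages = debug1, the subscriber log should then report opening
a ".changes" file for the streamed changes, which is the message the tests
above match with wait_for_log().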