From 3763a3b454f319f561c8c8bac4eedd81488d8160 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Wed, 14 Apr 2021 22:54:52 -0400 Subject: [PATCH v2] Skip empty transactions for logical replication. The current logical replication behavior is to send every transaction to subscriber even though the transaction is empty (because it does not contain changes from the selected publications). It is a waste of CPU cycles and network bandwidth to build/transmit these empty transactions. Postpone the BEGIN message until the first change. While processing a COMMIT message, if there is no other change for that transaction, do not send COMMIT message. It means that pgoutput will skip BEGIN / COMMIT messages for transactions that are empty. Discussion: https://postgr.es/m/CAMkU=1yohp9-dv48FLoSPrMqYEyyS5ZWkaZGD41RJr10xiNo_Q@mail.gmail.com --- src/backend/replication/pgoutput/pgoutput.c | 45 +++++++++++++++++++++++++++++ src/include/replication/pgoutput.h | 3 ++ src/test/subscription/t/020_messages.pl | 5 ++-- 3 files changed, 50 insertions(+), 3 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index f68348d..64c76d1 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -345,10 +345,28 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + PGOutputData *data = ctx->output_plugin_private; + + /* + * Don't send BEGIN message here. Instead, postpone it until the first + * change. In logical replication, common scenarios is to replicate a set + * of tables (instead of all tables) and transactions whose changes were to + * table(s) that are not published will produce empty transactions. These + * empty transactions will send BEGIN and COMMIT messages to subscribers, + * using bandwidth on something with little/no use for logical replication. + */ + data->xact_wrote_changes = false; + elog(LOG,"Holding of begin"); +} + +static void +pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ bool send_replication_origin = txn->origin_id != InvalidRepOriginId; OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin(ctx->out, txn); + elog(LOG,"Sending begin"); if (send_replication_origin) { @@ -384,8 +402,14 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { + PGOutputData *data = ctx->output_plugin_private; + OutputPluginUpdateProgress(ctx); + /* skip COMMIT message if nothing was sent */ + if (!data->xact_wrote_changes) + return; + OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit(ctx->out, txn, commit_lsn); OutputPluginWrite(ctx, true); @@ -551,6 +575,13 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Assert(false); } + /* output BEGIN if we haven't yet */ + if (!data->xact_wrote_changes && !in_streaming) + { + pgoutput_begin(ctx, txn); + data->xact_wrote_changes = true; + } + /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); @@ -693,6 +724,13 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (nrelids > 0) { + /* output BEGIN if we haven't yet */ + if (!data->xact_wrote_changes && !in_streaming) + { + pgoutput_begin(ctx, txn); + data->xact_wrote_changes = true; + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_truncate(ctx->out, xid, @@ -725,6 +763,13 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (in_streaming) xid = txn->xid; + /* output BEGIN if we haven't yet, avoid for streaming and non-transactional messages */ + if (!data->xact_wrote_changes && !in_streaming && transactional) + { + pgoutput_begin(ctx, txn); + data->xact_wrote_changes = true; + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_message(ctx->out, xid, diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 51e7c03..e820790 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -20,6 +20,9 @@ typedef struct PGOutputData MemoryContext context; /* private memory context for transient * allocations */ + /* control wether messages can already be sent */ + bool xact_wrote_changes; + /* client-supplied info: */ uint32 protocol_version; List *publication_names; diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl index c8be26b..2ea790f 100644 --- a/src/test/subscription/t/020_messages.pl +++ b/src/test/subscription/t/020_messages.pl @@ -78,9 +78,8 @@ $result = $node_publisher->safe_psql( 'publication_names', 'tap_pub') )); -# 66 67 == B C == BEGIN COMMIT -is($result, qq(66 -67), +# no message and no BEGIN and COMMIT because of empty transaction optimization +is($result, qq(), 'option messages defaults to false so message (M) is not available on slot'); $node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE"); -- 1.8.3.1