From 433ea40a02ab823f3aa70c18928b9862f0eb004b Mon Sep 17 00:00:00 2001 From: Euler Taveira Date: Fri, 8 Nov 2019 12:48:03 -0300 Subject: [PATCH] Skip empty transactions for logical replication The current logical replication behavior is to send every transaction to the subscriber even if the transaction is empty (because it does not contain changes from the selected publications). It is a waste of CPU cycles and network bandwidth to build/transmit those empty transactions. Postpone the BEGIN message until the first change. While processing a COMMIT message, if no change was previously written for that transaction, do not send the COMMIT message. This means that pgoutput will skip BEGIN / COMMIT messages for transactions that did not write any changes. Discussion: https://postgr.es/m/CAMkU=1yohp9-dv48FLoSPrMqYEyyS5ZWkaZGD41RJr10xiNo_Q@mail.gmail.com --- src/backend/replication/pgoutput/pgoutput.c | 34 +++++++++++++++++++++++++++++ src/include/replication/pgoutput.h | 3 +++ 2 files changed, 37 insertions(+) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 9c08757..eed1093 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -212,6 +212,22 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + PGOutputData *data = ctx->output_plugin_private; + + /* + * Don't send BEGIN message here. Instead, postpone it until the first + * change. In logical replication, a common scenario is to replicate a set + * of tables (instead of all tables) and transactions whose changes were to + * table(s) that are not published will produce empty transactions. These + * empty transactions will send BEGIN and COMMIT messages to subscribers, + * using bandwidth on something with little/no use for logical replication. 
+ */ + data->xact_wrote_changes = false; +} + +static void +pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ bool send_replication_origin = txn->origin_id != InvalidRepOriginId; OutputPluginPrepareWrite(ctx, !send_replication_origin); @@ -249,8 +265,14 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { + PGOutputData *data = ctx->output_plugin_private; + OutputPluginUpdateProgress(ctx); + /* skip COMMIT message if nothing was sent */ + if (!data->xact_wrote_changes) + return; + OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit(ctx->out, txn, commit_lsn); OutputPluginWrite(ctx, true); @@ -335,6 +357,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Assert(false); } + /* output BEGIN if we haven't yet */ + if (!data->xact_wrote_changes) + pgoutput_begin(ctx, txn); + + data->xact_wrote_changes = true; + /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); @@ -415,6 +443,12 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (nrelids > 0) { + /* output BEGIN if we haven't yet */ + if (!data->xact_wrote_changes) + pgoutput_begin(ctx, txn); + + data->xact_wrote_changes = true; + OutputPluginPrepareWrite(ctx, true); logicalrep_write_truncate(ctx->out, nrelids, diff --git a/src/include/replication/pgoutput.h b/src/include/replication/pgoutput.h index 8870721..cb57e76 100644 --- a/src/include/replication/pgoutput.h +++ b/src/include/replication/pgoutput.h @@ -20,6 +20,9 @@ typedef struct PGOutputData MemoryContext context; /* private memory context for transient * allocations */ + /* control whether messages can already be sent */ + bool xact_wrote_changes; + /* client info */ uint32 protocol_version; -- 2.7.4