From 2bebd82bc25af8d1d5b3420fea183a090d207163 Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Tue, 6 Jul 2021 14:22:01 +1000 Subject: [PATCH v93] Skip empty streaming in-progress transaction for logical replication. This improves the behaviour of skipping empty transactions to also include empty streamed in-progress transactions. --- src/backend/replication/pgoutput/pgoutput.c | 142 ++++++++++++++++++++++++---- 1 file changed, 124 insertions(+), 18 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 7ebdb4e..02ed5a6 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -65,6 +65,8 @@ static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, TimestampTz prepare_time); static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn); +static void pgoutput_send_stream_start(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, @@ -134,9 +136,21 @@ typedef struct RelationSyncEntry TupleConversionMap *map; } RelationSyncEntry; +/* + * Maintain the per-transaction level variables to track whether the + * transaction and/or streams have written any changes. + * BEGIN / BEGIN PREPARE is held back until the first + * change needs to be sent. In streaming mode the transaction can + * be decoded in streams, so along with maintaining whether the + * transaction has written any changes, we also need to track whether the + * current stream has written any changes. START STREAM is held back until + * the first change is streamed. This is done so that empty transactions and + * streams which do not have any changes can be dropped. 
+ */ typedef struct PGOutputTxnData { bool sent_begin_txn; /* flag indicating whether begin has been sent */ + bool sent_stream_start; /* flag indicating if stream start has been sent */ } PGOutputTxnData; /* Map used to remember which relation schemas we sent. */ @@ -746,9 +760,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TransactionId xid = InvalidTransactionId; Relation ancestor = NULL; - /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */ - if (!in_streaming) - Assert(txndata); + /* Should have set up txndata as part of BEGIN/BEGIN PREPARE/START STREAM */ + Assert(txndata); if (!is_publishable_relation(relation)) return; @@ -783,8 +796,11 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Assert(false); } + /* If streaming, send STREAM START if we haven't yet */ + if (in_streaming && !txndata->sent_stream_start) + pgoutput_send_stream_start(ctx, txn); /* output BEGIN if we haven't yet */ - if (!in_streaming && !txndata->sent_begin_txn) + else if (!txndata->sent_begin_txn) { if (rbtxn_prepared(txn)) pgoutput_begin_prepare(ctx, txn); @@ -902,9 +918,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Oid *relids; TransactionId xid = InvalidTransactionId; - /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */ - if (!in_streaming) - Assert(txndata); + /* Should have set up txndata as part of BEGIN/BEGIN PREPARE/START STREAM */ + Assert(txndata); /* Remember the xid for the change in streaming mode. See pgoutput_change. 
*/ if (in_streaming) @@ -942,8 +957,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (nrelids > 0) { + /* If streaming, send STREAM START if we haven't yet */ + if (in_streaming && !txndata->sent_stream_start) + pgoutput_send_stream_start(ctx, txn); /* output BEGIN if we haven't yet */ - if (!in_streaming && !txndata->sent_begin_txn) + else if (!txndata->sent_begin_txn) { if (rbtxn_prepared(txn)) pgoutput_begin_prepare(ctx, txn); @@ -984,16 +1002,24 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (in_streaming) xid = txn->xid; - /* output BEGIN if we haven't yet, avoid for streaming and non-transactional messages */ - if (!in_streaming && transactional) + /* Set up txndata for streaming and transactional messages */ + if (in_streaming || transactional) { txndata = (PGOutputTxnData *) txn->output_plugin_private; - if (!txndata->sent_begin_txn) + + /* If streaming, send STREAM START if we haven't yet */ + if (in_streaming && !txndata->sent_stream_start) + pgoutput_send_stream_start(ctx, txn); + /* output BEGIN if we haven't yet, avoid for streaming and non-transactional messages */ + else if (transactional) { - if (rbtxn_prepared(txn)) - pgoutput_begin_prepare(ctx, txn); - else - pgoutput_begin(ctx, txn); + if (!txndata->sent_begin_txn) + { + if (rbtxn_prepared(txn)) + pgoutput_begin_prepare(ctx, txn); + else + pgoutput_begin(ctx, txn); + } } } @@ -1076,12 +1102,37 @@ static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { - bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + PGOutputTxnData *txndata = txn->output_plugin_private; /* we can't nest streaming of transactions */ Assert(!in_streaming); /* + * Don't actually send stream start here, instead set a flag that indicates + * that stream start hasn't been sent and wait for the first actual change + * for this stream to be sent and then send stream start. 
This is done + * to avoid sending empty streams without any changes. + */ + if (txndata == NULL) + { + txndata = + MemoryContextAllocZero(ctx->context, sizeof(PGOutputTxnData)); + txndata->sent_begin_txn = false; + txn->output_plugin_private = txndata; + } + + txndata->sent_stream_start = false; + in_streaming = true; +} + +static void +pgoutput_send_stream_start(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn) +{ + bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; + + /* * If we already sent the first stream for this transaction then don't * send the origin id in the subsequent streams. */ @@ -1096,8 +1147,12 @@ pgoutput_stream_start(struct LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); - /* we're streaming a chunk of transaction now */ - in_streaming = true; + /* + * Set the flags that indicate that changes were sent as part of + * the transaction and the stream. + */ + txndata->sent_begin_txn = txndata->sent_stream_start = true; + } /* @@ -1107,9 +1162,18 @@ static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + PGOutputTxnData *data = txn->output_plugin_private; + /* we should be streaming a trasanction */ Assert(in_streaming); + if (!data->sent_stream_start) + { + in_streaming = false; + elog(DEBUG1, "Skipping replication of an empty transaction in stream stop"); + return; + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_stop(ctx->out); OutputPluginWrite(ctx, true); @@ -1128,6 +1192,8 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx, XLogRecPtr abort_lsn) { ReorderBufferTXN *toptxn; + PGOutputTxnData *txndata; + bool sent_begin_txn; /* * The abort should happen outside streaming block, even for streamed @@ -1137,6 +1203,21 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx, /* determine the toplevel transaction */ toptxn = (txn->toptxn) ? 
txn->toptxn : txn; + txndata = toptxn->output_plugin_private; + sent_begin_txn = txndata->sent_begin_txn; + + if (txn->toptxn == NULL) + { + pfree(txndata); + txn->output_plugin_private = NULL; + } + + if (!sent_begin_txn) + { + elog(DEBUG1, "Skipping replication of an empty transaction in stream abort"); + return; + } + Assert(rbtxn_is_streamed(toptxn)); @@ -1156,6 +1237,9 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { + PGOutputTxnData *txndata = txn->output_plugin_private; + bool sent_begin_txn = txndata->sent_begin_txn; + /* * The commit should happen outside streaming block, even for streamed * transactions. The transaction has to be marked as streamed, though. @@ -1163,6 +1247,16 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, Assert(!in_streaming); Assert(rbtxn_is_streamed(txn)); + pfree(txndata); + txn->output_plugin_private = NULL; + + /* If no changes were part of this transaction then drop the commit */ + if (!sent_begin_txn) + { + elog(DEBUG1, "Skipping replication of an empty transaction in stream commit"); + return; + } + OutputPluginUpdateProgress(ctx); OutputPluginPrepareWrite(ctx, true); @@ -1182,8 +1276,20 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { + PGOutputTxnData *txndata = txn->output_plugin_private; + bool sent_begin_txn = txndata->sent_begin_txn; + Assert(rbtxn_is_streamed(txn)); + pfree(txndata); + txn->output_plugin_private = NULL; + + if (!sent_begin_txn) + { + elog(DEBUG1, "Skipping replication of an empty transaction in stream prepare"); + return; + } + OutputPluginUpdateProgress(ctx); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); -- 1.8.3.1