From bf252c75fb5a5467d85c7c6356fa59d2be8a8424 Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Thu, 3 Mar 2022 04:04:01 -0500 Subject: [PATCH v23 2/2] Skip empty streamed transactions for logical replication. This patch postpones the START STREAM message while streaming large in-progress transactions until the first change. While processing the STOP STREAM message, if there was no other change for that transaction, do not send the STOP STREAM message. Discussion: https://postgr.es/m/CAMkU=1yohp9-dv48FLoSPrMqYEyyS5ZWkaZGD41RJr10xiNo_Q@mail.gmail.com --- src/backend/replication/pgoutput/pgoutput.c | 189 ++++++++++++++++++++++++---- 1 file changed, 167 insertions(+), 22 deletions(-) diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index f071beb..84aa9c8 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -67,6 +67,8 @@ static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, TimestampTz prepare_time); static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn); +static void pgoutput_send_stream_start(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, @@ -169,12 +171,15 @@ typedef struct RelationSyncEntry /* * Maintain a per-transaction level variable to track whether the * transaction has sent BEGIN. BEGIN is only sent when the first - * change in a transaction is processed. This makes it possible - * to skip transactions that are empty. + * change in a transaction is processed. Similarly while streaming + * transactions, STREAM_START is only sent with the first change. + * This makes it possible to skip transactions that are empty. */ typedef struct PGOutputTxnData { bool sent_begin_txn; /* flag indicating whether BEGIN has been sent */ + bool sent_stream_start; /* flag indicating if stream start has been sent */ + bool sent_first_stream; /* flag indicating if any stream has been sent */ } PGOutputTxnData; /* Map used to remember which relation schemas we sent. */ @@ -661,9 +666,18 @@ maybe_send_schema(LogicalDecodingContext *ctx, /* set up txndata */ txndata = toptxn->output_plugin_private; - /* Send BEGIN if we haven't yet */ - if (txndata && !txndata->sent_begin_txn) + if (in_streaming) + { + /* If streaming, send STREAM START if we haven't yet */ + if (txndata && !txndata->sent_stream_start) + pgoutput_send_stream_start(ctx, toptxn); + } + else + { + /* If not streaming, send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) pgoutput_send_begin(ctx, toptxn); + } /* * Send the schema. If the changes will be published using an ancestor's @@ -1288,9 +1302,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, &action)) break; - /* Send BEGIN if we haven't yet */ - if (txndata && !txndata->sent_begin_txn) - pgoutput_send_begin(ctx, txn); + if (in_streaming) + { + /* If streaming, send STREAM START if we haven't yet */ + if (txndata && !txndata->sent_stream_start) + pgoutput_send_stream_start(ctx, txn); + } + else + { + /* If not streaming, send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + } /* * Schema should be sent using the original relation because it @@ -1342,9 +1365,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, relentry, &action)) break; - /* Send BEGIN if we haven't yet */ - if (txndata && !txndata->sent_begin_txn) - pgoutput_send_begin(ctx, txn); + if (in_streaming) + { + /* If streaming, send STREAM START if we haven't yet */ + if (txndata && !txndata->sent_stream_start) + pgoutput_send_stream_start(ctx, txn); + } + else + { + /* If not streaming, send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + } maybe_send_schema(ctx, change, relation, relentry); @@ -1404,9 +1436,18 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, relentry, &action)) break; - /* Send BEGIN if we haven't yet */ - if (txndata && !txndata->sent_begin_txn) - pgoutput_send_begin(ctx, txn); + if (in_streaming) + { + /* If streaming, send STREAM START if we haven't yet */ + if (txndata && !txndata->sent_stream_start) + pgoutput_send_stream_start(ctx, txn); + } + else + { + /* If not streaming, send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + } maybe_send_schema(ctx, change, relation, relentry); @@ -1484,9 +1525,18 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { txndata = (PGOutputTxnData *) txn->output_plugin_private; - /* Send BEGIN if we haven't yet */ - if (txndata && !txndata->sent_begin_txn) + if (in_streaming) + { + /* If streaming, send STREAM START if we haven't yet */ + if (txndata && !txndata->sent_stream_start) + pgoutput_send_stream_start(ctx, txn); + } + else + { + /* If not streaming, send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) pgoutput_send_begin(ctx, txn); + } OutputPluginPrepareWrite(ctx, true); logicalrep_write_truncate(ctx->out, @@ -1528,9 +1578,18 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, { PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; - /* Send BEGIN if we haven't yet */ - if (txndata && !txndata->sent_begin_txn) + if (in_streaming) + { + /* If streaming, send STREAM START if we haven't yet */ + if (txndata && !txndata->sent_stream_start) + pgoutput_send_stream_start(ctx, txn); + } + else + { + /* If not streaming, send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) pgoutput_send_begin(ctx, txn); + } } OutputPluginPrepareWrite(ctx, true); @@ -1615,28 +1674,60 @@ static void pgoutput_stream_start(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { - bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + PGOutputTxnData *txndata = txn->output_plugin_private; /* we can't nest streaming of transactions */ Assert(!in_streaming); /* + * Don't actually send stream start here, instead set a flag that indicates + * that stream start hasn't been sent and wait for the first actual change + * for this stream to be sent and then send stream start. This is done + * to avoid sending empty streams without any changes. + */ + if (txndata == NULL) + { + txndata = + MemoryContextAllocZero(ctx->context, sizeof(PGOutputTxnData)); + txn->output_plugin_private = txndata; + } + + txndata->sent_stream_start = false; + in_streaming = true; +} + +/* + * Actually send START STREAM + */ +static void +pgoutput_send_stream_start(struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn) +{ + bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; + + + /* * If we already sent the first stream for this transaction then don't * send the origin id in the subsequent streams. */ - if (rbtxn_is_streamed(txn)) + if (txndata->sent_first_stream) send_replication_origin = false; OutputPluginPrepareWrite(ctx, !send_replication_origin); - logicalrep_write_stream_start(ctx->out, txn->xid, !rbtxn_is_streamed(txn)); + logicalrep_write_stream_start(ctx->out, txn->xid, !txndata->sent_first_stream); send_repl_origin(ctx, txn->origin_id, InvalidXLogRecPtr, send_replication_origin); OutputPluginWrite(ctx, true); - /* we're streaming a chunk of transaction now */ - in_streaming = true; + /* + * Set the flags that indicate that changes were sent as part of + * the transaction and the stream. + */ + txndata->sent_begin_txn = txndata->sent_stream_start = true; + txndata->sent_first_stream = true; } /* @@ -1646,9 +1737,18 @@ static void pgoutput_stream_stop(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + PGOutputTxnData *data = txn->output_plugin_private; + /* we should be streaming a trasanction */ Assert(in_streaming); + if (!data->sent_stream_start) + { + in_streaming = false; + elog(DEBUG1, "Skipping replication of an empty transaction in stream stop"); + return; + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_stop(ctx->out); OutputPluginWrite(ctx, true); @@ -1667,6 +1767,8 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx, XLogRecPtr abort_lsn) { ReorderBufferTXN *toptxn; + PGOutputTxnData *txndata; + bool sent_first_stream; /* * The abort should happen outside streaming block, even for streamed @@ -1676,6 +1778,20 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx, /* determine the toplevel transaction */ toptxn = (txn->toptxn) ? txn->toptxn : txn; + txndata = toptxn->output_plugin_private; + sent_first_stream = txndata->sent_first_stream; + + if (txn->toptxn == NULL) + { + pfree(txndata); + txn->output_plugin_private = NULL; + } + + if (!sent_first_stream) + { + elog(DEBUG1, "Skipping replication of an empty transaction in stream abort"); + return; + } Assert(rbtxn_is_streamed(toptxn)); @@ -1695,6 +1811,9 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { + PGOutputTxnData *txndata = txn->output_plugin_private; + bool sent_first_stream = txndata->sent_first_stream; + /* * The commit should happen outside streaming block, even for streamed * transactions. The transaction has to be marked as streamed, though. @@ -1702,6 +1821,20 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, Assert(!in_streaming); Assert(rbtxn_is_streamed(txn)); + pfree(txndata); + txn->output_plugin_private = NULL; + + /* + * If no changes were part of this transaction then drop the commit + * but send the update progress. + */ + if (!sent_first_stream) + { + elog(DEBUG1, "Skipping replication of an empty transaction in stream commit"); + OutputPluginUpdateProgress(ctx, true); + return; + } + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); @@ -1721,8 +1854,20 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { + PGOutputTxnData *txndata = txn->output_plugin_private; + bool sent_begin_txn = txndata->sent_begin_txn; + Assert(rbtxn_is_streamed(txn)); + pfree(txndata); + txn->output_plugin_private = NULL; + + if (!sent_begin_txn) + { + elog(DEBUG1, "Skipping replication of an empty transaction in stream prepare"); + return; + } + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); -- 1.8.3.1