From 163c9e5d7a6be6655c079a8151378d7bbb2e4a3b Mon Sep 17 00:00:00 2001 From: Peter Smith Date: Wed, 2 Jun 2021 08:49:50 +1000 Subject: [PATCH v82] Skip empty transactions for logical replication. The current logical replication behavior is to send every transaction to subscriber even though the transaction is empty (because it does not contain changes from the selected publications). It is a waste of CPU cycles and network bandwidth to build/transmit these empty transactions. This patch addresses the above problem by postponing the BEGIN / BEGIN PREPARE message until the first change. While processing a COMMIT message or a PREPARE message, if there is no other change for that transaction, do not send COMMIT message or PREPARE message. It means that pgoutput will skip BEGIN / COMMIT or BEGIN PREPARE / PREPARE messages for transactions that are empty. Discussion: https://postgr.es/m/CAMkU=1yohp9-dv48FLoSPrMqYEyyS5ZWkaZGD41RJr10xiNo_Q@mail.gmail.com --- contrib/test_decoding/test_decoding.c | 7 +- doc/src/sgml/logicaldecoding.sgml | 12 +- doc/src/sgml/protocol.sgml | 15 +++ src/backend/replication/logical/logical.c | 9 +- src/backend/replication/logical/proto.c | 16 ++- src/backend/replication/logical/reorderbuffer.c | 2 +- src/backend/replication/logical/worker.c | 36 ++++-- src/backend/replication/pgoutput/pgoutput.c | 141 +++++++++++++++++++++++- src/include/replication/logicalproto.h | 8 +- src/include/replication/output_plugin.h | 4 +- src/include/replication/reorderbuffer.h | 4 +- src/test/subscription/t/020_messages.pl | 5 +- src/test/subscription/t/021_twophase.pl | 41 ++++++- src/tools/pgindent/typedefs.list | 1 + 14 files changed, 266 insertions(+), 35 deletions(-) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index e5cd84e..408dbfc 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -86,7 +86,9 @@ static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, XLogRecPtr prepare_lsn); static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, @@ -390,7 +392,8 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* COMMIT PREPARED callback */ static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn) + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) { TestDecodingData *data = ctx->output_plugin_private; diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index d2c6e15..940f80c 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -865,11 +865,19 @@ typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, The required commit_prepared_cb callback is called whenever a transaction COMMIT PREPARED has been decoded. The gid field, which is part of the - txn parameter, can be used in this callback. + txn parameter, can be used in this callback. The + parameters prepare_end_lsn and + prepare_time can be used to check if the plugin + has received this PREPARE TRANSACTION in which case + it can commit the transaction, otherwise, it can skip the commit. The + gid alone is not sufficient because the downstream + node can have a prepared transaction with the same identifier. typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml index 5a38433..0add083 100644 --- a/doc/src/sgml/protocol.sgml +++ b/doc/src/sgml/protocol.sgml @@ -7538,6 +7538,13 @@ are available since protocol version 3. Int64 + The end LSN of the prepare. + + + + +Int64 + The LSN of the commit prepared. @@ -7552,6 +7559,14 @@ are available since protocol version 3. Int64 + Prepare timestamp of the transaction. The value is in number + of microseconds since PostgreSQL epoch (2000-01-01). + + + + +Int64 + Commit timestamp of the transaction. The value is in number of microseconds since PostgreSQL epoch (2000-01-01). diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index c387997..ed60719 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -63,7 +63,8 @@ static void begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn static void prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); static void rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -940,7 +941,8 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, static void commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn) + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) { LogicalDecodingContext *ctx = cache->private_data; LogicalErrorCallbackState state; @@ -975,7 +977,8 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, errmsg("logical replication at prepare time requires commit_prepared_cb callback"))); /* do the actual work: call callback */ - ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn); + ctx->callbacks.commit_prepared_cb(ctx, txn, commit_lsn, prepare_end_lsn, + prepare_time); /* Pop the error context stack */ error_context_stack = errcallback.previous; diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 8e03006..4653d6d 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -206,7 +206,9 @@ logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) */ void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn) + XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) { uint8 flags = 0; @@ -222,8 +224,10 @@ logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, pq_sendbyte(out, flags); /* send fields */ + pq_sendint64(out, prepare_end_lsn); pq_sendint64(out, commit_lsn); pq_sendint64(out, txn->end_lsn); + pq_sendint64(out, prepare_time); pq_sendint64(out, txn->xact_time.commit_time); pq_sendint32(out, txn->xid); @@ -244,12 +248,16 @@ logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData * elog(ERROR, "unrecognized flags %u in commit prepared message", flags); /* read fields */ + prepare_data->prepare_end_lsn = pq_getmsgint64(in); + if (prepare_data->prepare_end_lsn == InvalidXLogRecPtr) + elog(ERROR,"prepare_end_lsn is not set in commit prepared message"); prepare_data->commit_lsn = pq_getmsgint64(in); if (prepare_data->commit_lsn == InvalidXLogRecPtr) elog(ERROR, "commit_lsn is not set in commit prepared message"); - prepare_data->end_lsn = pq_getmsgint64(in); - if (prepare_data->end_lsn == InvalidXLogRecPtr) - elog(ERROR, "end_lsn is not set in commit prepared message"); + prepare_data->commit_end_lsn = pq_getmsgint64(in); + if (prepare_data->commit_end_lsn == InvalidXLogRecPtr) + elog(ERROR, "commit_end_lsn is not set in commit prepared message"); + prepare_data->prepare_time = pq_getmsgint64(in); prepare_data->commit_time = pq_getmsgint64(in); prepare_data->xid = pq_getmsgint(in, 4); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index da0e5e8..282da49 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -2770,7 +2770,7 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, txn->origin_lsn = origin_lsn; if (is_commit) - rb->commit_prepared(rb, txn, commit_lsn); + rb->commit_prepared(rb, txn, commit_lsn, prepare_end_lsn, prepare_time); else rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 40b00c9..f7db5ef 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -971,26 +971,38 @@ apply_handle_commit_prepared(StringInfo s) /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, gid, sizeof(gid)); - - /* there is no transaction when COMMIT PREPARED is called */ - ensure_transaction(); - /* - * Update origin state so we can restart streaming from correct position - * in case of crash. + * It is possible that we haven't received the prepare because + * the transaction did not have any changes relevant to this + * subscription and was essentially an empty prepare. In which case, + * the walsender is optimized to drop the empty transaction and the + * accompanying prepare. Silently ignore if we don't find the prepared + * transaction. */ - replorigin_session_origin_lsn = prepare_data.end_lsn; - replorigin_session_origin_timestamp = prepare_data.commit_time; + if (LookupGXact(gid, prepare_data.prepare_end_lsn, + prepare_data.prepare_time)) + { - FinishPreparedTransaction(gid, true); - CommitTransactionCommand(); + /* there is no transaction when COMMIT PREPARED is called */ + ensure_transaction(); + + /* + * Update origin state so we can restart streaming from correct position + * in case of crash. + */ + replorigin_session_origin_lsn = prepare_data.commit_end_lsn; + replorigin_session_origin_timestamp = prepare_data.commit_time; + + FinishPreparedTransaction(gid, true); + CommitTransactionCommand(); + } pgstat_report_stat(false); - store_flush_position(prepare_data.end_lsn); + store_flush_position(prepare_data.commit_end_lsn); in_remote_transaction = false; /* Process any tables that are being synchronized in parallel. */ - process_syncing_tables(prepare_data.end_lsn); + process_syncing_tables(prepare_data.commit_end_lsn); pgstat_report_activity(STATE_IDLE, NULL); } diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 7c3a33d..84e9cfe 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -56,7 +56,9 @@ static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn, XLogRecPtr commit_lsn); + ReorderBufferTXN *txn, XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); static void pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_end_lsn, @@ -135,6 +137,11 @@ typedef struct RelationSyncEntry TupleConversionMap *map; } RelationSyncEntry; +typedef struct PGOutputTxnData +{ + bool sent_begin_txn; /* flag indicating whether begin has been sent */ +} PGOutputTxnData; + /* Map used to remember which relation schemas we sent. */ static HTAB *RelationSyncCache = NULL; @@ -404,10 +411,32 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, static void pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + PGOutputTxnData *data = MemoryContextAllocZero(ctx->context, + sizeof(PGOutputTxnData)); + + /* + * Don't send BEGIN message here. Instead, postpone it until the first + * change. In logical replication, a common scenario is to replicate a set + * of tables (instead of all tables) and transactions whose changes were on + * table(s) that are not published will produce empty transactions. These + * empty transactions will send BEGIN and COMMIT messages to subscribers, + * using bandwidth on something with little/no use for logical replication. + */ + data->sent_begin_txn = false; + txn->output_plugin_private = data; +} + + +static void +pgoutput_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private; + Assert(data); OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin(ctx->out, txn); + data->sent_begin_txn = true; send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, send_replication_origin); @@ -422,8 +451,18 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private; + bool skip; + + Assert(data); + skip = !data->sent_begin_txn; + pfree(data); OutputPluginUpdateProgress(ctx); + /* skip COMMIT message if nothing was sent */ + if (skip) + return; + OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit(ctx->out, txn, commit_lsn); OutputPluginWrite(ctx, true); @@ -435,10 +474,28 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { + /* + * Don't send BEGIN PREPARE message here. Instead, postpone it until the first + * change. In logical replication, a common scenario is to replicate a set + * of tables (instead of all tables) and transactions whose changes were on + * table(s) that are not published will produce empty transactions. These + * empty transactions will send BEGIN PREPARE and COMMIT PREPARED messages + * to subscribers, using bandwidth on something with little/no use + * for logical replication. + */ + pgoutput_begin_txn(ctx, txn); +} + +static void +pgoutput_begin_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private; + Assert(data); OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin_prepare(ctx->out, txn); + data->sent_begin_txn = true; send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, send_replication_origin); @@ -453,8 +510,15 @@ static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private; + + Assert(data); OutputPluginUpdateProgress(ctx); + /* skip PREPARE message if nothing was sent */ + if (!data->sent_begin_txn) + return; + OutputPluginPrepareWrite(ctx, true); logicalrep_write_prepare(ctx->out, txn, prepare_lsn); OutputPluginWrite(ctx, true); @@ -465,12 +529,28 @@ pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, */ static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn) + XLogRecPtr commit_lsn, XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) { + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private; + OutputPluginUpdateProgress(ctx); + /* + * skip sending COMMIT PREPARED message if prepared transaction + * has not been sent. + */ + if (data) + { + bool skip = !data->sent_begin_txn; + pfree(data); + if (skip) + return; + } + OutputPluginPrepareWrite(ctx, true); - logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn); + logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn, prepare_end_lsn, + prepare_time); OutputPluginWrite(ctx, true); } @@ -483,8 +563,21 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time) { + PGOutputTxnData *data = (PGOutputTxnData *) txn->output_plugin_private; + OutputPluginUpdateProgress(ctx); + /* + * skip sending ROLLBACK PREPARED message if prepared transaction + * has not been sent. + */ + if (data) + { + bool skip = !data->sent_begin_txn; + pfree(data); + if (skip) + return; + } OutputPluginPrepareWrite(ctx, true); logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn, prepare_time); @@ -613,11 +706,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; MemoryContext old; RelationSyncEntry *relentry; TransactionId xid = InvalidTransactionId; Relation ancestor = NULL; + /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */ + if (!in_streaming) + Assert(txndata); + if (!is_publishable_relation(relation)) return; @@ -651,6 +749,15 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Assert(false); } + /* output BEGIN if we haven't yet */ + if (!in_streaming && !txndata->sent_begin_txn) + { + if (rbtxn_prepared(txn)) + pgoutput_begin_prepare(ctx, txn); + else + pgoutput_begin(ctx, txn); + } + /* Avoid leaking memory by using and resetting our own context */ old = MemoryContextSwitchTo(data->context); @@ -750,6 +857,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; MemoryContext old; RelationSyncEntry *relentry; int i; @@ -757,6 +865,10 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Oid *relids; TransactionId xid = InvalidTransactionId; + /* If not streaming, should have setup txndata as part of BEGIN/BEGIN PREPARE */ + if (!in_streaming) + Assert(txndata); + /* Remember the xid for the change in streaming mode. See pgoutput_change. */ if (in_streaming) xid = change->txn->xid; @@ -793,6 +905,15 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (nrelids > 0) { + /* output BEGIN if we haven't yet */ + if (!in_streaming && !txndata->sent_begin_txn) + { + if (rbtxn_prepared(txn)) + pgoutput_begin_prepare(ctx, txn); + else + pgoutput_begin(ctx, txn); + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_truncate(ctx->out, xid, @@ -813,6 +934,7 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, const char *message) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + PGOutputTxnData *txndata; TransactionId xid = InvalidTransactionId; if (!data->messages) @@ -825,6 +947,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (in_streaming) xid = txn->xid; + /* output BEGIN if we haven't yet, avoid for streaming and non-transactional messages */ + if (!in_streaming && transactional) + { + txndata = (PGOutputTxnData *) txn->output_plugin_private; + if (!txndata->sent_begin_txn) + { + if (rbtxn_prepared(txn)) + pgoutput_begin_prepare(ctx, txn); + else + pgoutput_begin(ctx, txn); + } + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_message(ctx->out, xid, diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 9b3e934..a6d9977 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -150,8 +150,10 @@ typedef struct LogicalRepPreparedTxnData */ typedef struct LogicalRepCommitPreparedTxnData { + XLogRecPtr prepare_end_lsn; XLogRecPtr commit_lsn; - XLogRecPtr end_lsn; + XLogRecPtr commit_end_lsn; + TimestampTz prepare_time; TimestampTz commit_time; TransactionId xid; char gid[GIDSIZE]; @@ -190,7 +192,9 @@ extern void logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, extern void logicalrep_read_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data); extern void logicalrep_write_commit_prepared(StringInfo out, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); extern void logicalrep_read_commit_prepared(StringInfo in, LogicalRepCommitPreparedTxnData *prepare_data); extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN *txn, diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 810495e..0d28306 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -128,7 +128,9 @@ typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx, */ typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); /* * Called for ROLLBACK PREPARED. diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 109000d..7cf4499 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -441,7 +441,9 @@ typedef void (*ReorderBufferPrepareCB) (ReorderBuffer *rb, /* commit prepared callback signature */ typedef void (*ReorderBufferCommitPreparedCB) (ReorderBuffer *rb, ReorderBufferTXN *txn, - XLogRecPtr commit_lsn); + XLogRecPtr commit_lsn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); /* rollback prepared callback signature */ typedef void (*ReorderBufferRollbackPreparedCB) (ReorderBuffer *rb, diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl index 52bd92d..2b43ae0 100644 --- a/src/test/subscription/t/020_messages.pl +++ b/src/test/subscription/t/020_messages.pl @@ -86,9 +86,8 @@ $result = $node_publisher->safe_psql( 'publication_names', 'tap_pub') )); -# 66 67 == B C == BEGIN COMMIT -is( $result, qq(66 -67), +# no message and no BEGIN and COMMIT because of empty transaction optimization +is($result, qq(), 'option messages defaults to false so message (M) is not available on slot' ); diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl index 90430f4..3428c6d 100644 --- a/src/test/subscription/t/021_twophase.pl +++ b/src/test/subscription/t/021_twophase.pl @@ -3,7 +3,7 @@ use strict; use warnings; use PostgresNode; use TestLib; -use Test::More tests => 19; +use Test::More tests => 20; ############################### # Setup @@ -277,6 +277,45 @@ $node_publisher->wait_for_catchup($appname); $result = $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM pg_prepared_xacts;"); is($result, qq(0), 'transaction is aborted on subscriber'); +############################## +# Test empty prepares +############################## + +# create a table that is not part of the publication +$node_publisher->safe_psql('postgres', + "CREATE TABLE tab_nopub (a int PRIMARY KEY)"); + +# disable the subscription so that we can peek at the slot +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE"); + +# wait for the replication slot to become inactive in the publisher +$node_publisher->poll_query_until('postgres', + "SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'tap_sub' AND active='f'", 1); + +# create a transaction with no changes relevant to the slot. +$node_publisher->safe_psql('postgres', " + BEGIN; + INSERT INTO tab_nopub SELECT generate_series(1,10); + PREPARE TRANSACTION 'empty_transaction'; + COMMIT PREPARED 'empty_transaction';"); + +# peek at the contents of the slot +$result = $node_publisher->safe_psql( + 'postgres', qq( + SELECT get_byte(data, 0) + FROM pg_logical_slot_get_binary_changes('tap_sub', NULL, NULL, + 'proto_version', '1', + 'publication_names', 'tap_pub') +)); + +# the empty tranaction should be skipped +is($result, qq(), + 'empty transaction dropped on slot' +); + +# enable the subscription to test cleanup +$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE"); + ############################### # check all the cleanup ############################### diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 2cfc1ae..f0941ad 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -1596,6 +1596,7 @@ PGMessageField PGModuleMagicFunction PGNoticeHooks PGOutputData +PGOutputTxnData PGPROC PGP_CFB PGP_Context -- 1.8.3.1