From 3fb6ae7c4aeae53fae8bf9304878f144e39cb6cf Mon Sep 17 00:00:00 2001 From: Ajin Cherian Date: Mon, 28 Sep 2020 03:08:30 -0400 Subject: [PATCH v6] Support two phase commits in streaming mode in logical decoding Add APIs to the streaming APIS for PREPARE, COMMIT PREPARED and ROLLBACK PREPARED --- contrib/test_decoding/test_decoding.c | 84 ++++++++++++++++++ doc/src/sgml/logicaldecoding.sgml | 56 +++++++++++- src/backend/replication/logical/logical.c | 111 ++++++++++++++++++++++++ src/backend/replication/logical/reorderbuffer.c | 41 +++++++-- src/include/replication/output_plugin.h | 30 +++++++ src/include/replication/reorderbuffer.h | 21 +++++ 6 files changed, 331 insertions(+), 12 deletions(-) diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 1bb17a6..bb9f787 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -78,6 +78,15 @@ static void pg_decode_stream_stop(LogicalDecodingContext *ctx, static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn); +static void pg_decode_stream_prepare(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void pg_decode_stream_commit_prepared(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void pg_decode_stream_abort_prepared(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); @@ -130,6 +139,9 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_start_cb = pg_decode_stream_start; cb->stream_stop_cb = pg_decode_stream_stop; cb->stream_abort_cb = pg_decode_stream_abort; + cb->stream_prepare_cb = pg_decode_stream_prepare; + cb->stream_commit_prepared_cb = pg_decode_stream_commit_prepared; + cb->stream_abort_prepared_cb = pg_decode_stream_abort_prepared; cb->stream_commit_cb = pg_decode_stream_commit; cb->stream_change_cb = pg_decode_stream_change; cb->stream_message_cb = pg_decode_stream_message; @@ -812,6 +824,78 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx, } static void +pg_decode_stream_prepare(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + + if (data->include_xids) + appendStringInfo(ctx->out, "preparing streamed transaction TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "preparing streamed transaction"); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +static void +pg_decode_stream_commit_prepared(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + + if (data->include_xids) + appendStringInfo(ctx->out, "commit prepared streamed transaction TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "commit prepared streamed transaction"); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +static void +pg_decode_stream_abort_prepared(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->skip_empty_xacts && !data->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + + if (data->include_xids) + appendStringInfo(ctx->out, "abort prepared streamed transaction TXN %u", txn->xid); + else + appendStringInfo(ctx->out, "abort prepared streamed transaction"); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index bd4542e..a8be9bf 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -396,6 +396,9 @@ typedef struct OutputPluginCallbacks LogicalDecodeStreamStartCB stream_start_cb; LogicalDecodeStreamStopCB stream_stop_cb; LogicalDecodeStreamAbortCB stream_abort_cb; + LogicalDecodeStreamPrepareCB stream_prepare_cb; + LogicalDecodeStreamCommitPreparedCB stream_commit_prepared_cb; + LogicalDecodeStreamAbortPreparedCB stream_abort_prepared_cb; LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; @@ -418,7 +421,9 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb); in-progress transactions. The stream_start_cb, stream_stop_cb, stream_abort_cb, stream_commit_cb and stream_change_cb - are required, while stream_message_cb and + are required, while stream_message_cb, + stream_prepare_cb, stream_commit_prepared_cb, + stream_abort_prepared_cb, stream_truncate_cb are optional. @@ -839,6 +844,45 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, + + Stream Prepare Callback + + The stream_prepare_cb callback is called to prepare + a previously streamed transaction as part of a two phase commit. + +typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + + + + + Stream Commit Prepared Callback + + The stream_commit_prepared_cb callback is called to commit prepared + a previously streamed transaction as part of a two phase commit. + +typedef void (*LogicalDecodeStreamCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + + + + + Stream Abort Prepared Callback + + The stream_abort_prepared_cb callback is called to abort prepared + a previously streamed transaction as part of a two phase commit. + +typedef void (*LogicalDecodeStreamAbortPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr abort_lsn); + + + + Stream Commit Callback @@ -1017,9 +1061,13 @@ OutputPluginWrite(ctx, true); When streaming an in-progress transaction, the changes (and messages) are streamed in blocks demarcated by stream_start_cb and stream_stop_cb callbacks. Once all the decoded - changes are transmitted, the transaction is committed using the - stream_commit_cb callback (or possibly aborted using - the stream_abort_cb callback). + changes are transmitted, the transaction can be committed using the + the stream_commit_cb callback + (or possibly aborted using the stream_abort_cb callback). + If two phase commits are supported, the transaction can be prepared using the + stream_prepare_cb callback, commit prepared using the + stream_commit_prepared_cb callback or aborted using the + stream_abort_prepared_cb diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 4e95337..47968cb 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -81,6 +81,12 @@ static void stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr last_lsn); static void stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr abort_lsn); +static void stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void stream_commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void stream_abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, @@ -232,6 +238,9 @@ StartupDecodingContext(List *output_plugin_options, ctx->streaming = (ctx->callbacks.stream_start_cb != NULL) || (ctx->callbacks.stream_stop_cb != NULL) || (ctx->callbacks.stream_abort_cb != NULL) || + (ctx->callbacks.stream_prepare_cb != NULL) || + (ctx->callbacks.stream_commit_prepared_cb != NULL) || + (ctx->callbacks.stream_abort_prepared_cb != NULL) || (ctx->callbacks.stream_commit_cb != NULL) || (ctx->callbacks.stream_change_cb != NULL) || (ctx->callbacks.stream_message_cb != NULL) || @@ -261,6 +270,9 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->stream_start = stream_start_cb_wrapper; ctx->reorder->stream_stop = stream_stop_cb_wrapper; ctx->reorder->stream_abort = stream_abort_cb_wrapper; + ctx->reorder->stream_prepare = stream_prepare_cb_wrapper; + ctx->reorder->stream_commit_prepared = stream_commit_prepared_cb_wrapper; + ctx->reorder->stream_abort_prepared = stream_abort_prepared_cb_wrapper; ctx->reorder->stream_commit = stream_commit_cb_wrapper; ctx->reorder->stream_change = stream_change_cb_wrapper; ctx->reorder->stream_message = stream_message_cb_wrapper; @@ -1231,6 +1243,105 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, } static void +stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_prepare"; + state.report_location = txn->final_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; + + ctx->callbacks.stream_prepare_cb(ctx, txn, commit_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_commit_prepared"; + state.report_location = txn->final_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; + + ctx->callbacks.stream_commit_prepared_cb(ctx, txn, commit_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void +stream_abort_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + /* We're only supposed to call this when streaming is supported. */ + Assert(ctx->streaming); + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "stream_abort_prepared"; + state.report_location = txn->final_lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + ctx->write_location = txn->end_lsn; + + ctx->callbacks.stream_abort_prepared_cb(ctx, txn, commit_lsn); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + +static void stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 5ff920b..e124c35 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1834,9 +1834,18 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) ReorderBufferStreamTXN(rb, txn); - rb->stream_commit(rb, txn, txn->final_lsn); - - ReorderBufferCleanupTXN(rb, txn); + if (rbtxn_prepared(txn)) + { + rb->stream_prepare(rb, txn, txn->final_lsn); + ReorderBufferTruncateTXN(rb, txn, true); + /* Reset the CheckXidAlive */ + CheckXidAlive = InvalidTransactionId; + } + else + { + rb->stream_commit(rb, txn, txn->final_lsn); + ReorderBufferCleanupTXN(rb, txn); + } } /* @@ -2672,15 +2681,31 @@ ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, txn->gid = palloc(strlen(gid) + 1); /* trailing '\0' */ strcpy(txn->gid, gid); - if (is_commit) + if (rbtxn_is_streamed(txn)) { - txn->txn_flags |= RBTXN_COMMIT_PREPARED; - rb->commit_prepared(rb, txn, commit_lsn); + if (is_commit) + { + txn->txn_flags |= RBTXN_COMMIT_PREPARED; + rb->stream_commit_prepared(rb, txn, commit_lsn); + } + else + { + txn->txn_flags |= RBTXN_ROLLBACK_PREPARED; + rb->stream_abort_prepared(rb, txn, commit_lsn); + } } else { - txn->txn_flags |= RBTXN_ROLLBACK_PREPARED; - rb->abort_prepared(rb, txn, commit_lsn); + if (is_commit) + { + txn->txn_flags |= RBTXN_COMMIT_PREPARED; + rb->commit_prepared(rb, txn, commit_lsn); + } + else + { + txn->txn_flags |= RBTXN_ROLLBACK_PREPARED; + rb->abort_prepared(rb, txn, commit_lsn); + } } /* cleanup: make sure there's no cache pollution */ diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h index 96e269b..6000096 100644 --- a/src/include/replication/output_plugin.h +++ b/src/include/replication/output_plugin.h @@ -157,6 +157,33 @@ typedef void (*LogicalDecodeStreamAbortCB) (struct LogicalDecodingContext *ctx, XLogRecPtr abort_lsn); /* + * Called to prepare changes streamed to remote node from in-progress + * transaction. This is called as part of a two-phase commit and only when + * two-phased commits are supported + */ +typedef void (*LogicalDecodeStreamPrepareCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Called to commit prepared changes streamed to remote node from in-progress + * transaction. This is called as part of a two-phase commit and only when + * two-phased commits are supported + */ +typedef void (*LogicalDecodeStreamCommitPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* + * Called to abort/rollback prepared changes streamed to remote node from in-progress + * transaction. This is called as part of a two-phase commit and only when + * two-phased commits are supported + */ +typedef void (*LogicalDecodeStreamAbortPreparedCB) (struct LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* * Called to apply changes streamed to remote node from in-progress * transaction. */ @@ -214,6 +241,9 @@ typedef struct OutputPluginCallbacks LogicalDecodeStreamStartCB stream_start_cb; LogicalDecodeStreamStopCB stream_stop_cb; LogicalDecodeStreamAbortCB stream_abort_cb; + LogicalDecodeStreamPrepareCB stream_prepare_cb; + LogicalDecodeStreamCommitPreparedCB stream_commit_prepared_cb; + LogicalDecodeStreamAbortPreparedCB stream_abort_prepared_cb; LogicalDecodeStreamCommitCB stream_commit_cb; LogicalDecodeStreamChangeCB stream_change_cb; LogicalDecodeStreamMessageCB stream_message_cb; diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 4d4e35d..a4dc509 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -466,6 +466,24 @@ typedef void (*ReorderBufferStreamAbortCB) ( ReorderBufferTXN *txn, XLogRecPtr abort_lsn); +/* prepare streamed transaction callback signature */ +typedef void (*ReorderBufferStreamPrepareCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* prepare streamed transaction callback signature */ +typedef void (*ReorderBufferStreamCommitPreparedCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + +/* prepare streamed transaction callback signature */ +typedef void (*ReorderBufferStreamAbortPreparedCB) ( + ReorderBuffer *rb, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); + /* commit streamed transaction callback signature */ typedef void (*ReorderBufferStreamCommitCB) ( ReorderBuffer *rb, @@ -545,6 +563,9 @@ struct ReorderBuffer ReorderBufferStreamStartCB stream_start; ReorderBufferStreamStopCB stream_stop; ReorderBufferStreamAbortCB stream_abort; + ReorderBufferStreamPrepareCB stream_prepare; + ReorderBufferStreamCommitPreparedCB stream_commit_prepared; + ReorderBufferStreamAbortPreparedCB stream_abort_prepared; ReorderBufferStreamCommitCB stream_commit; ReorderBufferStreamChangeCB stream_change; ReorderBufferStreamMessageCB stream_message; -- 1.8.3.1