From d8a3de33a634cc2388ac91132bbf2704bba73c11 Mon Sep 17 00:00:00 2001 From: Vladimir Gordiychuk Date: Wed, 7 Sep 2016 00:39:18 +0300 Subject: [PATCH 2/2] Respect client-initiated CopyDone during transaction decoding in walsender The prior patch caused the walsender to react to a client-initiated CopyDone while it's in the WalSndLoop. That's not enough to let clients end logical decoding mid-transaction because we loop in ReorderBufferCommit during decoding of a transaction without ever returning to WalSndLoop. Allow breaking out of ReorderBufferCommit by detecting that client input has requested an end to COPY BOTH mode, so clients can abort decoding mid-transaction. --- src/backend/replication/logical/logical.c | 15 ++++++---- src/backend/replication/logical/logicalfuncs.c | 3 +- src/backend/replication/logical/reorderbuffer.c | 16 ++++++++-- src/backend/replication/slotfuncs.c | 2 +- src/backend/replication/walsender.c | 39 ++++++++++++++++++------- src/include/replication/logical.h | 6 ++-- src/include/replication/reorderbuffer.h | 14 +++++++++ 7 files changed, 72 insertions(+), 23 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 1512be5..4065899 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -116,7 +116,8 @@ StartupDecodingContext(List *output_plugin_options, TransactionId xmin_horizon, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, - LogicalOutputPluginWriterWrite do_write) + LogicalOutputPluginWriterWrite do_write, + ContinueDecodingCB continue_decoding_cb) { ReplicationSlot *slot; MemoryContext context, @@ -180,6 +181,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->apply_change = change_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + ctx->reorder->continue_decoding_cb = continue_decoding_cb; ctx->out = makeStringInfo(); 
ctx->prepare_write = prepare_write; @@ -212,7 +214,8 @@ CreateInitDecodingContext(char *plugin, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, - LogicalOutputPluginWriterWrite do_write) + LogicalOutputPluginWriterWrite do_write, + ContinueDecodingCB continue_decoding_cb) { TransactionId xmin_horizon = InvalidTransactionId; ReplicationSlot *slot; @@ -288,7 +291,7 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotSave(); ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, - read_page, prepare_write, do_write); + read_page, prepare_write, do_write, continue_decoding_cb); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -328,7 +331,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, - LogicalOutputPluginWriterWrite do_write) + LogicalOutputPluginWriterWrite do_write, + ContinueDecodingCB continue_decoding_cb) { LogicalDecodingContext *ctx; ReplicationSlot *slot; @@ -378,7 +382,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, - read_page, prepare_write, do_write); + read_page, prepare_write, do_write, + continue_decoding_cb); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 9c7be2d..767d6ce 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -249,7 +249,8 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin options, logical_read_local_xlog_page, LogicalOutputPrepareWrite, - LogicalOutputWrite); + LogicalOutputWrite, + NULL); MemoryContextSwitchTo(oldcontext); diff --git 
a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 9b430b9..dc3c88f 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1341,6 +1341,11 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) * cache invalidations. Thus, once a toplevel commit is read, we iterate over * the top and subtransactions (using a k-way merge) and replay the changes in * lsn order. + * + * ReorderBufferCommit processes all changes in a transaction and feeds them to + * output plugin callbacks. It does not return until the full transaction has + * been consumed or continue_decoding_cb() returns false to indicate an early + * abort request. */ void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, @@ -1417,7 +1422,9 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, rb->begin(rb, txn); iterstate = ReorderBufferIterTXNInit(rb, txn); - while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) + while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL && + (rb->continue_decoding_cb == NULL || + rb->continue_decoding_cb())) { Relation relation = NULL; Oid reloid; @@ -1643,8 +1650,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferIterTXNFinish(rb, iterstate); iterstate = NULL; - /* call commit callback */ - rb->commit(rb, txn, commit_lsn); + if (rb->continue_decoding_cb == NULL || rb->continue_decoding_cb()) + { + /* call commit callback unless aborting early */ + rb->commit(rb, txn, commit_lsn); + } /* this is just a sanity check against bad output plugin behaviour */ if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index f908761..976bc0c 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -127,7 +127,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) */ ctx = 
CreateInitDecodingContext( NameStr(*plugin), NIL, - logical_read_local_xlog_page, NULL, NULL); + logical_read_local_xlog_page, NULL, NULL, NULL); /* build initial snapshot, might take a while */ DecodingContextFindStartpoint(ctx); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index ad94c13..91c5b41 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -218,6 +218,23 @@ static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); static void XLogRead(char *buf, XLogRecPtr startptr, Size count); +/* + * Return true until either the client or server side has requested that we wind + * up COPY BOTH mode by sending a CopyDone. + * + * If we receive a CopyDone from the client we should avoid sending any further + * CopyData messages and return to command mode as promptly as possible. + * + * While in the middle of sending data to a client we notice a client-initiated + * CopyDone when WalSndWriteData() calls ProcessRepliesIfAny() and it + * sets streamingDoneSending. + */ +static +bool IsStreamingActive(void) +{ + return !streamingDoneReceiving && !streamingDoneSending; +} + /* Initialize walsender process before entering the main command loop */ void @@ -823,7 +840,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ctx = CreateInitDecodingContext(cmd->plugin, NIL, logical_read_xlog_page, - WalSndPrepareWrite, WalSndWriteData); + WalSndPrepareWrite, WalSndWriteData, + IsStreamingActive); /* * Signal that we don't need the timeout mechanism. We're just @@ -1002,7 +1020,7 @@ StartLogicalReplication(StartReplicationCmd *cmd) logical_decoding_ctx = CreateDecodingContext( cmd->startpoint, cmd->options, logical_read_xlog_page, - WalSndPrepareWrite, WalSndWriteData); + WalSndPrepareWrite, WalSndWriteData, IsStreamingActive); /* Start reading WAL from the oldest required WAL. 
*/ logical_startptr = MyReplicationSlot->data.restart_lsn; @@ -1093,14 +1111,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)], tmpbuf.data, sizeof(int64)); - /* fast path */ - /* Try to flush pending output to the client */ - if (pq_flush_if_writable() != 0) - WalSndShutdown(); - - if (!pq_is_send_pending()) - return; - for (;;) { int wakeEvents; @@ -1235,7 +1245,14 @@ WalSndWaitForWal(XLogRecPtr loc) break; /* - * We only send regular messages to the client for full decoded + * If we have received CopyDone from the client or sent CopyDone + * ourselves, it's time to exit streaming. + */ + if (!IsStreamingActive()) { + break; + } + + /* We only send regular messages to the client for full decoded * transactions, but a synchronous replication and walsender shutdown * possibly are waiting for a later location. So we send pings * containing the flush location every now and then. diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 947000e..9fc4098 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -81,13 +81,15 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, - LogicalOutputPluginWriterWrite do_write); + LogicalOutputPluginWriterWrite do_write, + ContinueDecodingCB continue_decoding_cb); extern LogicalDecodingContext *CreateDecodingContext( XLogRecPtr start_lsn, List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, - LogicalOutputPluginWriterWrite do_write); + LogicalOutputPluginWriterWrite do_write, + ContinueDecodingCB continue_decoding_cb); extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx); extern bool DecodingContextReady(LogicalDecodingContext *ctx); extern void 
FreeDecodingContext(LogicalDecodingContext *ctx); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 9e209ae..f5d9f5d 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -292,6 +292,12 @@ typedef void (*ReorderBufferMessageCB) ( const char *prefix, Size sz, const char *message); +/* + * Callback function that allows streaming of a transaction to be interrupted + * part-way through, between output plugin calls. + */ +typedef bool (*ContinueDecodingCB) (void); + struct ReorderBuffer { /* @@ -321,6 +327,14 @@ struct ReorderBuffer ReorderBufferMessageCB message; /* + * Callback to control whether reorder buffer processing should end before + * normal completion. Normally returns true; if it returns false, reorder buffer + * processing (e.g. in ReorderBufferCommit) will bail out as soon as + * possible. + */ + ContinueDecodingCB continue_decoding_cb; + + /* * Pointer that will be passed untouched to the callbacks. */ void *private_data; -- 2.5.5