From 6d784e7e678268af1eadb7b50f96308e5bd3d15c Mon Sep 17 00:00:00 2001 From: Dave Cramer Date: Fri, 30 Nov 2018 18:36:58 -0500 Subject: [PATCH 2/5] Client-initiated CopyDone during transaction decoding in walsender --- src/backend/replication/logical/logical.c | 14 ++++++--- src/backend/replication/logical/logicalfuncs.c | 2 +- src/backend/replication/logical/reorderbuffer.c | 11 +++++-- src/backend/replication/slotfuncs.c | 4 +-- src/backend/replication/walsender.c | 42 ++++++++++++++++--------- src/include/replication/logical.h | 6 ++-- src/include/replication/reorderbuffer.h | 13 ++++++++ 7 files changed, 65 insertions(+), 27 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 9f99e4f..9ce1f6e 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -126,7 +126,8 @@ StartupDecodingContext(List *output_plugin_options, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, - LogicalOutputPluginWriterUpdateProgress update_progress) + LogicalOutputPluginWriterUpdateProgress update_progress, + ContinueDecodingCB continue_decoding_cb) { ReplicationSlot *slot; MemoryContext context, @@ -193,6 +194,7 @@ StartupDecodingContext(List *output_plugin_options, ctx->reorder->apply_truncate = truncate_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; + ctx->reorder->continue_decoding_cb = continue_decoding_cb; ctx->out = makeStringInfo(); ctx->prepare_write = prepare_write; @@ -231,7 +233,8 @@ CreateInitDecodingContext(char *plugin, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, - LogicalOutputPluginWriterUpdateProgress update_progress) + LogicalOutputPluginWriterUpdateProgress update_progress, + ContinueDecodingCB continue_decoding_cb) { TransactionId xmin_horizon = InvalidTransactionId; ReplicationSlot *slot; @@ -319,7 +322,7 @@ CreateInitDecodingContext(char *plugin, ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, need_full_snapshot, false, read_page, prepare_write, do_write, - update_progress); + update_progress, continue_decoding_cb); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -366,7 +369,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, - LogicalOutputPluginWriterUpdateProgress update_progress) + LogicalOutputPluginWriterUpdateProgress update_progress, + ContinueDecodingCB continue_decoding_cb) { LogicalDecodingContext *ctx; ReplicationSlot *slot; @@ -417,7 +421,7 @@ CreateDecodingContext(XLogRecPtr start_lsn, ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, false, fast_forward, read_page, prepare_write, - do_write, update_progress); + do_write, update_progress, continue_decoding_cb); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c index 45aae71..3d8e61d 100644 --- a/src/backend/replication/logical/logicalfuncs.c +++ b/src/backend/replication/logical/logicalfuncs.c @@ -254,7 +254,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin false, logical_read_local_xlog_page, LogicalOutputPrepareWrite, - LogicalOutputWrite, NULL); + LogicalOutputWrite, NULL, NULL); MemoryContextSwitchTo(oldcontext); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 23466ba..66b6e90 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1497,7 +1497,9 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, rb->begin(rb, txn); iterstate = ReorderBufferIterTXNInit(rb, txn); - while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) + while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL && + (rb->continue_decoding_cb == NULL || + rb->continue_decoding_cb())) { Relation relation = NULL; Oid reloid; @@ -1774,8 +1776,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, ReorderBufferIterTXNFinish(rb, iterstate); iterstate = NULL; - /* call commit callback */ - rb->commit(rb, txn, commit_lsn); + if (rb->continue_decoding_cb == NULL || rb->continue_decoding_cb()) + { + /* call commit callback */ + rb->commit(rb, txn, commit_lsn); + } /* this is just a sanity check against bad output plugin behaviour */ if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 8782bad..82e8f37 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -137,7 +137,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) ctx = CreateInitDecodingContext(NameStr(*plugin), NIL, false, /* do not build snapshot */ logical_read_local_xlog_page, NULL, NULL, - NULL); + NULL, NULL); /* build initial snapshot, might take a while */ DecodingContextFindStartpoint(ctx); @@ -370,7 +370,7 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto) NIL, true, /* fast_forward */ logical_read_local_xlog_page, - NULL, NULL, NULL); + NULL, NULL, NULL, NULL); /* * Start reading at the slot's restart_lsn, which we know to point to diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 93f2648..f0f0390 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -258,6 +258,23 @@ static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); static void XLogRead(char *buf, XLogRecPtr startptr, Size count); +/* + * Return true until either the client or server side have requested that we wind + * up COPY BOTH mode by sending a CopyDone. + * + * If we receive a CopyDone from the client we should avoid sending any further + * CopyData messages and return to command mode as promptly as possible. + * + * While in the middle of sending data to a client we notice a client-initated + * CopyDone when WalSndWriteData() calls ProcessRepliesIfAny() and it + * sets streamingDoneSending. + */ +static +bool IsStreamingActive(void) +{ + return !streamingDoneReceiving && !streamingDoneSending; +} + /* Initialize walsender process before entering the main command loop */ void @@ -940,7 +957,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, logical_read_xlog_page, WalSndPrepareWrite, WalSndWriteData, - WalSndUpdateProgress); + WalSndUpdateProgress, IsStreamingActive); /* * Signal that we don't need the timeout mechanism. We're just @@ -1091,7 +1108,8 @@ StartLogicalReplication(StartReplicationCmd *cmd) CreateDecodingContext(cmd->startpoint, cmd->options, false, logical_read_xlog_page, WalSndPrepareWrite, WalSndWriteData, - WalSndUpdateProgress); + WalSndUpdateProgress, + IsStreamingActive); WalSndSetState(WALSNDSTATE_CATCHUP); @@ -1194,17 +1212,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, CHECK_FOR_INTERRUPTS(); - /* Try to flush pending output to the client */ - if (pq_flush_if_writable() != 0) - WalSndShutdown(); - - /* Try taking fast path unless we get too close to walsender timeout. */ - if (now < TimestampTzPlusMilliseconds(last_reply_timestamp, - wal_sender_timeout / 2) && - !pq_is_send_pending()) - { - return; - } /* If we have pending write here, go to slow path */ for (;;) @@ -1358,7 +1365,14 @@ WalSndWaitForWal(XLogRecPtr loc) break; /* - * We only send regular messages to the client for full decoded + * If we have received CopyDone from the client, sent CopyDone + * ourselves, it's time to exit streaming. + */ + if (!IsStreamingActive()) { + break; + } + + /* We only send regular messages to the client for full decoded * transactions, but a synchronous replication and walsender shutdown * possibly are waiting for a later location. So we send pings * containing the flush location every now and then. diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index c25ac1f..4822bfc 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -100,7 +100,8 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, - LogicalOutputPluginWriterUpdateProgress update_progress); + LogicalOutputPluginWriterUpdateProgress update_progress, + ContinueDecodingCB continue_decoding_cb); extern LogicalDecodingContext *CreateDecodingContext( XLogRecPtr start_lsn, List *output_plugin_options, @@ -108,7 +109,8 @@ extern LogicalDecodingContext *CreateDecodingContext( XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, - LogicalOutputPluginWriterUpdateProgress update_progress); + LogicalOutputPluginWriterUpdateProgress update_progress, + ContinueDecodingCB continue_decoding_cb); extern void DecodingContextFindStartpoint(LogicalDecodingContext *ctx); extern bool DecodingContextReady(LogicalDecodingContext *ctx); extern void FreeDecodingContext(LogicalDecodingContext *ctx); diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 7787edf..a3f7e8d 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -326,6 +326,14 @@ typedef void (*ReorderBufferMessageCB) ( const char *prefix, Size sz, const char *message); + +/* + * Callback function that allow interrupt logical replication during decoding. + * Function return true if decoding can be continue decode, but if function return false + * logical decoding will stop as soon as possible. + */ +typedef bool (*ContinueDecodingCB) (void); + struct ReorderBuffer { /* @@ -365,6 +373,11 @@ struct ReorderBuffer ReorderBufferMessageCB message; /* + * Callback to define status of decoding. Return false if decoding not necessary continue + */ + ContinueDecodingCB continue_decoding_cb; + + /* * Pointer that will be passed untouched to the callbacks. */ void *private_data; -- 2.6.4