From 14bbd25cf27c3555126b571ee7cb41524f2a8729 Mon Sep 17 00:00:00 2001 From: Craig Ringer Date: Tue, 10 May 2016 10:34:10 +0800 Subject: [PATCH 1/2] Respect client-initiated CopyDone in walsender The walsender never reacted to CopyDone sent by the client unless it had already decided it was done sending data and dispatched its own CopyDone message. It actually noticed CopyDone from the client when WalSndWriteData() called ProcessRepliesIfAny() but it didn't react to it until it separately decided to end streaming from the walsender end. Modify the walsender so it checks for client-initiated CopyDone when in COPY BOTH mode. It now cleans up what it's doing, reples with its own CopyDone and returns to command mode. In logical decoding this will allow the client to end a logical decoding session between transactions without just unilaterally closing its connection. For physical walsender connections this allows the client to end streaming before the end of a timeline. This change does not allow a client to end COPY BOTH session in the middle of processing a logical decoding commit (in ReorderBufferCommit) or while decoding a large WAL record, so there can still be a significant delay before the walsender reacts to the client. --- src/backend/replication/walsender.c | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 1ea2a5c..ad94c13 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -759,6 +759,13 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req /* make sure we have enough WAL available */ flushptr = WalSndWaitForWal(targetPagePtr + reqLen); + /* + * If the client sent CopyDone while we were waiting, + * bail out so we can wind up the decoding session. + */ + if (streamingDoneSending) + return -1; + /* more than one block available */ if (targetPagePtr + XLOG_BLCKSZ <= flushptr) count = XLOG_BLCKSZ; @@ -1220,8 +1227,11 @@ WalSndWaitForWal(XLogRecPtr loc) * It's important to do this check after the recomputation of * RecentFlushPtr, so we can send all remaining data before shutting * down. + * + * We'll also exit here if the client sent CopyDone because it wants + * to return to command mode. */ - if (walsender_ready_to_stop) + if (walsender_ready_to_stop || streamingDoneReceiving) break; /* @@ -1850,10 +1860,15 @@ WalSndLoop(WalSndSendDataCallback send_data) * some more. If there is some, we don't bother to call send_data * again until we've flushed it ... but we'd better assume we are not * caught up. + * + * If we're trying to finish sending and exit we shouldn't enqueue more + * data to libpq. We need to finish writing out whatever we already + * have in libpq's send buffer to maintain protocol sync so we still + * need to loop until it's flushed. */ - if (!pq_is_send_pending()) + if (!pq_is_send_pending() && !streamingDoneSending) send_data(); - else + else if (!streamingDoneSending) WalSndCaughtUp = false; /* Try to flush pending output to the client */ @@ -2909,7 +2924,7 @@ WalSndKeepaliveIfNecessary(TimestampTz now) if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0) return; - if (waiting_for_ping_response) + if (waiting_for_ping_response || streamingDoneSending) return; /* -- 2.5.5