From 867251ede310384ec3c9c72487411e634bd006fe Mon Sep 17 00:00:00 2001 From: Dave Cramer Date: Mon, 3 Dec 2018 06:28:31 -0500 Subject: [PATCH 3/4] Add ability for pg_recvlogical to stop replication from client side --- src/bin/pg_basebackup/pg_recvlogical.c | 490 +++++++++++++++++++-------------- 1 file changed, 290 insertions(+), 200 deletions(-) diff --git a/src/bin/pg_basebackup/pg_recvlogical.c b/src/bin/pg_basebackup/pg_recvlogical.c index a242e0b..2d69aa7 100644 --- a/src/bin/pg_basebackup/pg_recvlogical.c +++ b/src/bin/pg_basebackup/pg_recvlogical.c @@ -56,8 +56,12 @@ static const char *plugin = "test_decoding"; /* Global State */ static int outfd = -1; static volatile sig_atomic_t time_to_abort = false; +static volatile sig_atomic_t force_time_to_abort = false; static volatile sig_atomic_t output_reopen = false; +static bool copyDoneSent; +static bool copyDoneReceived; static bool output_isfile; +static int64 last_status_time; static TimestampTz output_last_fsync = -1; static bool output_needs_fsync = false; static XLogRecPtr output_written_lsn = InvalidXLogRecPtr; @@ -205,6 +209,222 @@ OutputFsync(TimestampTz now) return true; } +static bool +ProcessKeepalive(PGconn *conn, char *msgBuf, int msgLength) +{ + int pos; + bool replyRequested; + XLogRecPtr walEnd; + + /* + * Parse the keepalive message, enclosed in the CopyData message. + * We just check if the server requested a reply, and ignore the + * rest. + */ + pos = 1; /* skip msgtype 'k' */ + + /* read walEnd */ + walEnd = fe_recvint64(&msgBuf[pos]); + output_written_lsn = Max(walEnd, output_written_lsn); + + pos += 8; /* skip sendTime */ + + if (msgLength < pos + 1) + { + fprintf(stderr, _("%s: streaming header too small: %d\n"), + progname, msgLength); + return -1; + } + replyRequested = msgBuf[pos]; + + /* If the server requested an immediate reply, send one. */ + if (replyRequested) + { + int64 now = feGetCurrentTimestamp(); + + /* fsync data, so we send a recent flush pointer */ + if (!OutputFsync(now)) + { + return false; + } + + if (!sendFeedback(conn, now, true, false)) + { + return false; + } + last_status_time = now; + } + + return true; +} + +static bool +ProcessXLogData(PGconn *conn, char *msgBuf, int msgLength) +{ + int bytes_left; + int bytes_written; + + /* + * Read the header of the XLogData message, enclosed in the CopyData + * message. We only need the WAL location field (dataStart), the rest + * of the header is ignored. + */ + int hdr_len = 1; /* msgtype 'w' */ + hdr_len += 8; /* dataStart */ + hdr_len += 8; /* walEnd */ + hdr_len += 8; /* sendTime */ + if (msgLength < hdr_len + 1) + { + fprintf(stderr, _("%s: streaming header too small: %d\n"), + progname, msgLength); + return false; + } + + if (time_to_abort && copyDoneSent) + { + /* + * We've sent feedback and sent CopyDone, so we are now discarding + * xlog data input to find the server's reply CopyDone. That way when + * another client connects to the slot later they start replay exactly + * where we left off - or at least at the last commit we flushed to + * disk. This is not an error condition. + */ + return true; + } + + /* Extract WAL location for this block */ + { + XLogRecPtr temp = fe_recvint64(&msgBuf[1]); + + output_written_lsn = Max(temp, output_written_lsn); + } + + bytes_left = msgLength - hdr_len; + bytes_written = 0; + + /* signal that a fsync is needed */ + output_needs_fsync = true; + + while (bytes_left) + { + int ret; + + ret = write(outfd, + msgBuf + hdr_len + bytes_written, + bytes_left); + + if (ret < 0) + { + fprintf(stderr, + _("%s: could not write %u bytes to log file \"%s\": %s\n"), + progname, bytes_left, outfile, + strerror(errno)); + return false; + } + + /* Write was successful, advance our position */ + bytes_written += ret; + bytes_left -= ret; + } + + if (write(outfd, "\n", 1) != 1) + { + fprintf(stderr, + _("%s: could not write %u bytes to log file \"%s\": %s\n"), + progname, 1, outfile, + strerror(errno)); + return false; + } + + return true; +} + +static bool +ProcessReceiveMsg(PGconn *conn, unsigned char type, char *msgBuf, int msgLength) +{ + bool success = false; + switch (type) + { + case 'k': + success = ProcessKeepalive(conn, msgBuf, msgLength); + break; + case 'w': + success = ProcessXLogData(conn, msgBuf, msgLength); + break; + default: + fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), + progname, type); + } + + return success; +} + +/* + * Sync wait activity on socket. Waiting can be interrupt by fsync or keepalive timeout. + * Returns the number of ready descriptors, or -1 for errors. + */ +static int +WaitSocketActivity(PGconn *conn, int64 now) +{ + /* + * In async mode, and no data available. We block on reading but + * not more than the specified timeout, so that we can send a + * response back to the client. + */ + fd_set input_mask; + int64 message_target = 0; + int64 fsync_target = 0; + struct timeval timeout; + struct timeval *timeoutptr = NULL; + + if (PQsocket(conn) < 0) + { + fprintf(stderr, + _("%s: invalid socket: %s"), + progname, PQerrorMessage(conn)); + return -1; + } + + FD_ZERO(&input_mask); + FD_SET(PQsocket(conn), &input_mask); + + /* Compute when we need to wakeup to send a keepalive message. */ + if (standby_message_timeout) + message_target = last_status_time + (standby_message_timeout - 1) * + ((int64) 1000); + + /* Compute when we need to wakeup to fsync the output file. */ + if (fsync_interval > 0 && output_needs_fsync) + fsync_target = output_last_fsync + (fsync_interval - 1) * + ((int64) 1000); + + /* Now compute when to wakeup. */ + if (message_target > 0 || fsync_target > 0) + { + int64 targettime; + long secs; + int usecs; + + targettime = message_target; + + if (fsync_target > 0 && fsync_target < targettime) + targettime = fsync_target; + + feTimestampDifference(now, + targettime, + &secs, + &usecs); + if (secs <= 0) + timeout.tv_sec = 1; /* Always sleep at least 1 sec */ + else + timeout.tv_sec = secs; + timeout.tv_usec = usecs; + timeoutptr = &timeout; + } + + return select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr); +} + /* * Start the log streaming */ @@ -213,13 +433,14 @@ StreamLogicalLog(void) { PGresult *res; char *copybuf = NULL; - TimestampTz last_status = -1; int i; PQExpBuffer query; output_written_lsn = InvalidXLogRecPtr; output_fsync_lsn = InvalidXLogRecPtr; - + last_status_time = -1; + copyDoneReceived = false; + copyDoneSent = false; query = createPQExpBuffer(); /* @@ -281,13 +502,10 @@ StreamLogicalLog(void) _("%s: streaming initiated\n"), progname); - while (!time_to_abort) + while (!force_time_to_abort) { int r; - int bytes_left; - int bytes_written; TimestampTz now; - int hdr_len; XLogRecPtr cur_record_lsn = InvalidXLogRecPtr; if (copybuf != NULL) @@ -309,15 +527,50 @@ StreamLogicalLog(void) goto error; } - if (standby_message_timeout > 0 && - feTimestampDifferenceExceeds(last_status, now, + if (standby_message_timeout > 0 && !time_to_abort && + feTimestampDifferenceExceeds(last_status_time, now, standby_message_timeout)) { /* Time to send feedback! */ if (!sendFeedback(conn, now, true, false)) goto error; - last_status = now; + last_status_time = now; + } + + if (time_to_abort && !copyDoneSent) + { + if (verbose) + { + fprintf(stderr, + _("%s: stopping write up to %X/%X, flush to %X/%X (slot %s)\n"), + progname, + (uint32) (output_written_lsn >> 32), (uint32) output_written_lsn, + (uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn, + replication_slot); + } + + /* + * Force fsync and send feedback just before we send CopyDone to + * make sure the server knows exactly what we replayed up to. We'll + * discard data received after we request the end of COPY BOTH mode + * so we know we've written everything we're going to. + */ + if (!OutputFsync(now)) + goto error; + + if (!sendFeedback(conn, now, true, false)) + goto error; + + last_status_time = now; + + if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) + { + fprintf(stderr, _("%s: could not send copy-end packet: %s"), + progname, PQerrorMessage(conn)); + goto error; + } + copyDoneSent = true; } /* got SIGHUP, close output file */ @@ -360,64 +613,9 @@ StreamLogicalLog(void) r = PQgetCopyData(conn, ©buf, 1); if (r == 0) { - /* - * In async mode, and no data available. We block on reading but - * not more than the specified timeout, so that we can send a - * response back to the client. - */ - fd_set input_mask; - TimestampTz message_target = 0; - TimestampTz fsync_target = 0; - struct timeval timeout; - struct timeval *timeoutptr = NULL; - - if (PQsocket(conn) < 0) - { - fprintf(stderr, - _("%s: invalid socket: %s"), - progname, PQerrorMessage(conn)); - goto error; - } - - FD_ZERO(&input_mask); - FD_SET(PQsocket(conn), &input_mask); - - /* Compute when we need to wakeup to send a keepalive message. */ - if (standby_message_timeout) - message_target = last_status + (standby_message_timeout - 1) * - ((int64) 1000); - - /* Compute when we need to wakeup to fsync the output file. */ - if (fsync_interval > 0 && output_needs_fsync) - fsync_target = output_last_fsync + (fsync_interval - 1) * - ((int64) 1000); - - /* Now compute when to wakeup. */ - if (message_target > 0 || fsync_target > 0) - { - TimestampTz targettime; - long secs; - int usecs; - - targettime = message_target; - - if (fsync_target > 0 && fsync_target < targettime) - targettime = fsync_target; - - feTimestampDifference(now, - targettime, - &secs, - &usecs); - if (secs <= 0) - timeout.tv_sec = 1; /* Always sleep at least 1 sec */ - else - timeout.tv_sec = secs; - timeout.tv_usec = usecs; - timeoutptr = &timeout; - } - - r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr); - if (r == 0 || (r < 0 && errno == EINTR)) + int readyMsg = WaitSocketActivity(conn, now); + + if (readyMsg == 0 || (readyMsg < 0 && errno == EINTR)) { /* * Got a timeout or signal. Continue the loop and either @@ -426,7 +624,7 @@ StreamLogicalLog(void) */ continue; } - else if (r < 0) + else if (readyMsg < 0) { fprintf(stderr, _("%s: select() failed: %s\n"), progname, strerror(errno)); @@ -441,12 +639,26 @@ StreamLogicalLog(void) progname, PQerrorMessage(conn)); goto error; } + continue; } - /* End of copy stream */ + /* + * End of copy stream (server sent CopyDone) + * + * This is where we exit on normal time_to_abort because our own + * CopyDone caused the server to shut down streaming on its end. + */ if (r == -1) + { + copyDoneReceived = true; + if (verbose && time_to_abort && copyDoneSent) + { + fprintf(stderr, + _("%s: streaming ended by user request"), progname); + } break; + } /* Failure while reading the copy stream */ if (r == -2) @@ -456,138 +668,8 @@ StreamLogicalLog(void) goto error; } - /* Check the message type. */ - if (copybuf[0] == 'k') - { - int pos; - bool replyRequested; - XLogRecPtr walEnd; - bool endposReached = false; - - /* - * Parse the keepalive message, enclosed in the CopyData message. - * We just check if the server requested a reply, and ignore the - * rest. - */ - pos = 1; /* skip msgtype 'k' */ - walEnd = fe_recvint64(©buf[pos]); - output_written_lsn = Max(walEnd, output_written_lsn); - - pos += 8; /* read walEnd */ - - pos += 8; /* skip sendTime */ - - if (r < pos + 1) - { - fprintf(stderr, _("%s: streaming header too small: %d\n"), - progname, r); - goto error; - } - replyRequested = copybuf[pos]; - - if (endpos != InvalidXLogRecPtr && walEnd >= endpos) - { - /* - * If there's nothing to read on the socket until a keepalive - * we know that the server has nothing to send us; and if - * walEnd has passed endpos, we know nothing else can have - * committed before endpos. So we can bail out now. - */ - endposReached = true; - } - - /* Send a reply, if necessary */ - if (replyRequested || endposReached) - { - if (!flushAndSendFeedback(conn, &now)) - goto error; - last_status = now; - } - - if (endposReached) - { - prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr); - time_to_abort = true; - break; - } - - continue; - } - else if (copybuf[0] != 'w') - { - fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), - progname, copybuf[0]); - goto error; - } - - /* - * Read the header of the XLogData message, enclosed in the CopyData - * message. We only need the WAL location field (dataStart), the rest - * of the header is ignored. - */ - hdr_len = 1; /* msgtype 'w' */ - hdr_len += 8; /* dataStart */ - hdr_len += 8; /* walEnd */ - hdr_len += 8; /* sendTime */ - if (r < hdr_len + 1) + if(!ProcessReceiveMsg(conn, copybuf[0], copybuf, r)) { - fprintf(stderr, _("%s: streaming header too small: %d\n"), - progname, r); - goto error; - } - - /* Extract WAL location for this block */ - cur_record_lsn = fe_recvint64(©buf[1]); - - if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos) - { - /* - * We've read past our endpoint, so prepare to go away being - * cautious about what happens to our output data. - */ - if (!flushAndSendFeedback(conn, &now)) - goto error; - prepareToTerminate(conn, endpos, false, cur_record_lsn); - time_to_abort = true; - break; - } - - output_written_lsn = Max(cur_record_lsn, output_written_lsn); - - bytes_left = r - hdr_len; - bytes_written = 0; - - /* signal that a fsync is needed */ - output_needs_fsync = true; - - while (bytes_left) - { - int ret; - - ret = write(outfd, - copybuf + hdr_len + bytes_written, - bytes_left); - - if (ret < 0) - { - fprintf(stderr, - _("%s: could not write %u bytes to log file \"%s\": %s\n"), - progname, bytes_left, outfile, - strerror(errno)); - goto error; - } - - /* Write was successful, advance our position */ - bytes_written += ret; - bytes_left -= ret; - } - - if (write(outfd, "\n", 1) != 1) - { - fprintf(stderr, - _("%s: could not write %u bytes to log file \"%s\": %s\n"), - progname, 1, outfile, - strerror(errno)); goto error; } @@ -656,6 +738,14 @@ error: static void sigint_handler(int signum) { + /* + * Backward compatible, allow force interrupt logical replication + * after second SIGINT without wait CopyDone from server + */ + if (time_to_abort) + { + force_time_to_abort = true; + } time_to_abort = true; } -- 2.6.4