*** a/doc/src/sgml/ref/pg_receivexlog.sgml --- b/doc/src/sgml/ref/pg_receivexlog.sgml *************** *** 66,71 **** PostgreSQL documentation --- 66,78 ---- as possible. To avoid this behavior, use the -n parameter. + + + Synchronous mode offers the ability to confirm WAL have been streamed + in the same way as synchronous replication. To use synchronous mode, + set up synchronous replication as described in + , and set parameter(that is, -m and --slot). + *************** *** 106,111 **** PostgreSQL documentation --- 113,130 ---- + + + + + Enables synchronous mode. Add to flush of the change timing of a WAL file. + If we've written some records, flush them to disk. + We only report the flush position,when a slot has explicitly been used. + + + + + *** a/src/bin/pg_basebackup/pg_basebackup.c --- b/src/bin/pg_basebackup/pg_basebackup.c *************** *** 370,376 **** LogStreamerMain(logstreamer_param *param) if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, reached_end_position, standby_message_timeout, ! NULL)) /* * Any errors will already have been reported in the function process, --- 370,376 ---- if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, param->sysidentifier, param->xlogdir, reached_end_position, standby_message_timeout, ! NULL, 0)) /* * Any errors will already have been reported in the function process, *** a/src/bin/pg_basebackup/pg_receivexlog.c --- b/src/bin/pg_basebackup/pg_receivexlog.c *************** *** 35,40 **** --- 35,41 ---- static char *basedir = NULL; static int verbose = 0; static int noloop = 0; + static int syncmode = 0; static int standby_message_timeout = 10 * 1000; /* 10 sec = default */ static volatile bool time_to_abort = false; *************** *** 62,67 **** usage(void) --- 63,69 ---- printf(_("\nOptions:\n")); printf(_(" -D, --directory=DIR receive transaction log files into this directory\n")); printf(_(" -n, --no-loop do not loop on connection lost\n")); + printf(_(" -m, --sync-mode this mode is written some records, flush them to disk.\n")); printf(_(" -v, --verbose output verbose messages\n")); printf(_(" -V, --version output version information, then exit\n")); printf(_(" -?, --help show this help, then exit\n")); *************** *** 330,336 **** StreamLog(void) starttli); ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, ! stop_streaming, standby_message_timeout, ".partial"); PQfinish(conn); } --- 332,338 ---- starttli); ReceiveXlogStream(conn, startpos, starttli, NULL, basedir, ! stop_streaming, standby_message_timeout, ".partial", syncmode); PQfinish(conn); } *************** *** 360,365 **** main(int argc, char **argv) --- 362,368 ---- {"port", required_argument, NULL, 'p'}, {"username", required_argument, NULL, 'U'}, {"no-loop", no_argument, NULL, 'n'}, + {"sync-mode", no_argument, NULL, 'm'}, {"no-password", no_argument, NULL, 'w'}, {"password", no_argument, NULL, 'W'}, {"status-interval", required_argument, NULL, 's'}, *************** *** 389,395 **** main(int argc, char **argv) } } ! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv", long_options, &option_index)) != -1) { switch (c) --- 392,398 ---- } } ! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWvm", long_options, &option_index)) != -1) { switch (c) *************** *** 436,441 **** main(int argc, char **argv) --- 439,447 ---- case 'n': noloop = 1; break; + case 'm': + syncmode = 1; + break; case 'v': verbose++; break; *** a/src/bin/pg_basebackup/receivelog.c --- b/src/bin/pg_basebackup/receivelog.c *************** *** 34,40 **** static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr; static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, ! char *partial_suffix, XLogRecPtr *stoppos); static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); --- 34,40 ---- static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, ! char *partial_suffix, XLogRecPtr *stoppos, int syncmode); static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline); *************** *** 417,429 **** CheckServerVersionForStreaming(PGconn *conn) * allows you to tell the difference between partial and completed files, * so that you can continue later where you left. * * Note: The log position *must* be at a log segment start! */ bool ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, ! int standby_message_timeout, char *partial_suffix) { char query[128]; char slotcmd[128]; --- 417,433 ---- * allows you to tell the difference between partial and completed files, * so that you can continue later where you left. * + * If 'syncmode' is not zero, synchronous mode. Flush is executed after all + * received WAL is written.We only report the flush position,when a slot + * has explicitly been used. + * * Note: The log position *must* be at a log segment start! */ bool ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *sysidentifier, char *basedir, stream_stop_callback stream_stop, ! int standby_message_timeout, char *partial_suffix, int syncmode) { char query[128]; char slotcmd[128]; *************** *** 568,574 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, /* Stream the WAL */ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, standby_message_timeout, partial_suffix, ! &stoppos); if (res == NULL) goto error; --- 572,578 ---- /* Stream the WAL */ res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop, standby_message_timeout, partial_suffix, ! &stoppos, syncmode); if (res == NULL) goto error; *************** *** 717,724 **** ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline) return true; } /* ! * The main loop of ReceiveXlogStream. Handles the COPY stream after * initiating streaming with the START_STREAMING command. * * If the COPY ends (not necessarily successfully) due a message from the --- 721,808 ---- return true; } + + static int + rcv_receive(bool timeout, char **copybuf, PGconn *conn, int standby_message_timeout, int64 last_status, int64 now) + { + int r; + + r = PQgetCopyData(conn, copybuf, 1); + if (r == 0) + { + if (timeout) + { + /* + * No data available. Wait for some to appear, but not longer than + * the specified timeout, so that we can ping the server. + */ + fd_set input_mask; + struct timeval timeout; + struct timeval *timeoutptr; + + FD_ZERO(&input_mask); + FD_SET(PQsocket(conn), &input_mask); + if (standby_message_timeout) + { + int64 targettime; + long secs; + int usecs; + + targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000); + feTimestampDifference(now, + targettime, + &secs, + &usecs); + if (secs <= 0) + timeout.tv_sec = 1; /* Always sleep at least 1 sec */ + else + timeout.tv_sec = secs; + timeout.tv_usec = usecs; + timeoutptr = &timeout; + } + else + timeoutptr = NULL; + + r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr); + if (r == 0 || (r < 0 && errno == EINTR)) + { + /* + * Got a timeout or signal. Continue the loop and either + * deliver a status packet to the server or just go back into + * blocking. + */ + return 0; + } + else if (r < 0) + { + fprintf(stderr, _("%s: select() failed: %s\n"), + progname, strerror(errno)); + return -2; + } + } + /* Else there is actually data on the socket */ + if (PQconsumeInput(conn) == 0) + { + fprintf(stderr, + _("%s: could not receive data from WAL stream: %s"), + progname, PQerrorMessage(conn)); + return -2; + } + r = PQgetCopyData(conn, copybuf, 1); + } + if (r == -2) + { + fprintf(stderr, _("%s: could not read COPY data: %s"), + progname, PQerrorMessage(conn)); + } + + /* Return received messages to caller */ + return r; + + } + /* ! * The main loop of ReceiveXLogStream. Handles the COPY stream after * initiating streaming with the START_STREAMING command. * * If the COPY ends (not necessarily successfully) due a message from the *************** *** 729,735 **** static PGresult * HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! XLogRecPtr *stoppos) { char *copybuf = NULL; int64 last_status = -1; --- 813,819 ---- HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, char *partial_suffix, ! XLogRecPtr *stoppos, int syncmode) { char *copybuf = NULL; int64 last_status = -1; *************** *** 784,850 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, last_status = now; } ! r = PQgetCopyData(conn, ©buf, 1); ! if (r == 0) ! { ! /* ! * No data available. Wait for some to appear, but not longer than ! * the specified timeout, so that we can ping the server. ! */ ! fd_set input_mask; ! struct timeval timeout; ! struct timeval *timeoutptr; ! FD_ZERO(&input_mask); ! FD_SET(PQsocket(conn), &input_mask); ! if (standby_message_timeout && still_sending) { ! int64 targettime; ! long secs; ! int usecs; - targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000); - feTimestampDifference(now, - targettime, - &secs, - &usecs); - if (secs <= 0) - timeout.tv_sec = 1; /* Always sleep at least 1 sec */ - else - timeout.tv_sec = secs; - timeout.tv_usec = usecs; - timeoutptr = &timeout; - } - else - timeoutptr = NULL; - - r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr); - if (r == 0 || (r < 0 && errno == EINTR)) - { /* ! * Got a timeout or signal. Continue the loop and either ! * deliver a status packet to the server or just go back into ! * blocking. */ ! continue; } ! else if (r < 0) { ! fprintf(stderr, _("%s: select() failed: %s\n"), ! progname, strerror(errno)); ! goto error; } ! /* Else there is actually data on the socket */ ! if (PQconsumeInput(conn) == 0) { ! fprintf(stderr, ! _("%s: could not receive data from WAL stream: %s"), ! progname, PQerrorMessage(conn)); goto error; } ! continue; } ! if (r == -1) { PGresult *res = PQgetResult(conn); --- 868,1043 ---- last_status = now; } ! r = rcv_receive(true , ©buf, conn, standby_message_timeout, last_status, now); ! while(r > 0) ! { ! /* Check the message type. */ ! if (copybuf[0] == 'k') { ! int pos; ! bool replyRequested; /* ! * Parse the keepalive message, enclosed in the CopyData message. ! * We just check if the server requested a reply, and ignore the ! * rest. */ ! pos = 1; /* skip msgtype 'k' */ ! pos += 8; /* skip walEnd */ ! pos += 8; /* skip sendTime */ ! ! if (r < pos + 1) ! { ! fprintf(stderr, _("%s: streaming header too small: %d\n"), ! progname, r); ! goto error; ! } ! replyRequested = copybuf[pos]; ! ! /* If the server requested an immediate reply, send one. */ ! if (replyRequested && still_sending) ! { ! now = feGetCurrentTimestamp(); ! if (!sendFeedback(conn, blockpos, now, false)) ! goto error; ! last_status = now; ! } } ! else if (copybuf[0] == 'w') { ! /* ! * Once we've decided we don't want to receive any more, just ! * ignore any subsequent XLogData messages. ! */ ! if (!still_sending) ! continue; ! ! /* ! * Read the header of the XLogData message, enclosed in the ! * CopyData message. We only need the WAL location field ! * (dataStart), the rest of the header is ignored. ! */ ! hdr_len = 1; /* msgtype 'w' */ ! hdr_len += 8; /* dataStart */ ! hdr_len += 8; /* walEnd */ ! hdr_len += 8; /* sendTime */ ! if (r < hdr_len) ! { ! fprintf(stderr, _("%s: streaming header too small: %d\n"), ! progname, r); ! goto error; ! } ! blockpos = fe_recvint64(©buf[1]); ! ! /* Extract WAL location for this block */ ! xlogoff = blockpos % XLOG_SEG_SIZE; ! ! /* ! * Verify that the initial location in the stream matches where we ! * think we are. ! */ ! if (walfile == -1) ! { ! /* No file open yet */ ! if (xlogoff != 0) ! { ! fprintf(stderr, ! _("%s: received transaction log record for offset %u with no file open\n"), ! progname, xlogoff); ! goto error; ! } ! } ! else ! { ! /* More data in existing segment */ ! /* XXX: store seek value don't reseek all the time */ ! if (lseek(walfile, 0, SEEK_CUR) != xlogoff) ! { ! fprintf(stderr, ! _("%s: got WAL data offset %08x, expected %08x\n"), ! progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); ! goto error; ! } ! } ! ! bytes_left = r - hdr_len; ! bytes_written = 0; ! ! while (bytes_left) ! { ! int bytes_to_write; ! ! /* ! * If crossing a WAL boundary, only write up until we reach ! * XLOG_SEG_SIZE. ! */ ! if (xlogoff + bytes_left > XLOG_SEG_SIZE) ! bytes_to_write = XLOG_SEG_SIZE - xlogoff; ! else ! bytes_to_write = bytes_left; ! ! if (walfile == -1) ! { ! if (!open_walfile(blockpos, timeline, ! basedir, partial_suffix)) ! { ! /* Error logged by open_walfile */ ! goto error; ! } ! } ! ! if (write(walfile, ! copybuf + hdr_len + bytes_written, ! bytes_to_write) != bytes_to_write) ! { ! fprintf(stderr, ! _("%s: could not write %u bytes to WAL file \"%s\": %s\n"), ! progname, bytes_to_write, current_walfile_name, ! strerror(errno)); ! goto error; ! } ! ! /* Write was successful, advance our position */ ! bytes_written += bytes_to_write; ! bytes_left -= bytes_to_write; ! blockpos += bytes_to_write; ! xlogoff += bytes_to_write; ! ! /* Did we reach the end of a WAL segment? */ ! if (blockpos % XLOG_SEG_SIZE == 0) ! { ! if (!close_walfile(basedir, partial_suffix, blockpos)) ! /* Error message written in close_walfile() */ ! goto error; ! ! xlogoff = 0; ! ! if (still_sending && stream_stop(blockpos, timeline, false)) ! { ! if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) ! { ! fprintf(stderr, _("%s: could not send copy-end packet: %s"), ! progname, PQerrorMessage(conn)); ! goto error; ! } ! still_sending = false; ! break; /* ignore the rest of this XLogData packet */ ! } ! } ! } ! /* No more data left to write, receive next copy packet */ } ! else { ! fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), ! progname, copybuf[0]); goto error; } ! r = rcv_receive(false , ©buf, conn, standby_message_timeout, last_status, now); } ! ! if(r == -1) { PGresult *res = PQgetResult(conn); *************** *** 880,1054 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, *stoppos = blockpos; return res; } - if (r == -2) - { - fprintf(stderr, _("%s: could not read COPY data: %s"), - progname, PQerrorMessage(conn)); - goto error; - } - - /* Check the message type. */ - if (copybuf[0] == 'k') - { - int pos; - bool replyRequested; - - /* - * Parse the keepalive message, enclosed in the CopyData message. - * We just check if the server requested a reply, and ignore the - * rest. - */ - pos = 1; /* skip msgtype 'k' */ - pos += 8; /* skip walEnd */ - pos += 8; /* skip sendTime */ - if (r < pos + 1) - { - fprintf(stderr, _("%s: streaming header too small: %d\n"), - progname, r); - goto error; - } - replyRequested = copybuf[pos]; ! /* If the server requested an immediate reply, send one. */ ! if (replyRequested && still_sending) ! { ! now = feGetCurrentTimestamp(); ! if (!sendFeedback(conn, blockpos, now, false)) ! goto error; ! last_status = now; ! } ! } ! else if (copybuf[0] == 'w') { ! /* ! * Once we've decided we don't want to receive any more, just ! * ignore any subsequent XLogData messages. ! */ ! if (!still_sending) ! continue; ! ! /* ! * Read the header of the XLogData message, enclosed in the ! * CopyData message. We only need the WAL location field ! * (dataStart), the rest of the header is ignored. ! */ ! hdr_len = 1; /* msgtype 'w' */ ! hdr_len += 8; /* dataStart */ ! hdr_len += 8; /* walEnd */ ! hdr_len += 8; /* sendTime */ ! if (r < hdr_len) { ! fprintf(stderr, _("%s: streaming header too small: %d\n"), ! progname, r); goto error; } ! blockpos = fe_recvint64(©buf[1]); ! ! /* Extract WAL location for this block */ ! xlogoff = blockpos % XLOG_SEG_SIZE; ! ! /* ! * Verify that the initial location in the stream matches where we ! * think we are. ! */ ! if (walfile == -1) ! { ! /* No file open yet */ ! if (xlogoff != 0) ! { ! fprintf(stderr, ! _("%s: received transaction log record for offset %u with no file open\n"), ! progname, xlogoff); ! goto error; ! } ! } ! else ! { ! /* More data in existing segment */ ! /* XXX: store seek value don't reseek all the time */ ! if (lseek(walfile, 0, SEEK_CUR) != xlogoff) ! { ! fprintf(stderr, ! _("%s: got WAL data offset %08x, expected %08x\n"), ! progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); ! goto error; ! } ! } ! ! bytes_left = r - hdr_len; ! bytes_written = 0; ! ! while (bytes_left) ! { ! int bytes_to_write; ! ! /* ! * If crossing a WAL boundary, only write up until we reach ! * XLOG_SEG_SIZE. ! */ ! if (xlogoff + bytes_left > XLOG_SEG_SIZE) ! bytes_to_write = XLOG_SEG_SIZE - xlogoff; ! else ! bytes_to_write = bytes_left; ! ! if (walfile == -1) ! { ! if (!open_walfile(blockpos, timeline, ! basedir, partial_suffix)) ! { ! /* Error logged by open_walfile */ ! goto error; ! } ! } ! ! if (write(walfile, ! copybuf + hdr_len + bytes_written, ! bytes_to_write) != bytes_to_write) ! { ! fprintf(stderr, ! _("%s: could not write %u bytes to WAL file \"%s\": %s\n"), ! progname, bytes_to_write, current_walfile_name, ! strerror(errno)); ! goto error; ! } ! ! /* Write was successful, advance our position */ ! bytes_written += bytes_to_write; ! bytes_left -= bytes_to_write; ! blockpos += bytes_to_write; ! xlogoff += bytes_to_write; ! ! /* Did we reach the end of a WAL segment? */ ! if (blockpos % XLOG_SEG_SIZE == 0) ! { ! if (!close_walfile(basedir, partial_suffix, blockpos)) ! /* Error message written in close_walfile() */ ! goto error; ! ! xlogoff = 0; ! ! if (still_sending && stream_stop(blockpos, timeline, false)) ! { ! if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn)) ! { ! fprintf(stderr, _("%s: could not send copy-end packet: %s"), ! progname, PQerrorMessage(conn)); ! goto error; ! } ! still_sending = false; ! break; /* ignore the rest of this XLogData packet */ ! } ! } ! } ! /* No more data left to write, receive next copy packet */ ! } ! else ! { ! fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), ! progname, copybuf[0]); ! goto error; } } error: --- 1073,1093 ---- *stoppos = blockpos; return res; } ! if (walfile != -1 && lastFlushPosition < blockpos && syncmode) { ! if (fsync(walfile) != 0) { ! fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"), ! progname, current_walfile_name, strerror(errno)); goto error; } ! lastFlushPosition = blockpos; ! if (!sendFeedback(conn, blockpos, now, false)) ! goto error; } + } error: *** a/src/bin/pg_basebackup/receivelog.h --- b/src/bin/pg_basebackup/receivelog.h *************** *** 16,19 **** extern bool ReceiveXlogStream(PGconn *conn, char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, ! char *partial_suffix); --- 16,20 ---- char *basedir, stream_stop_callback stream_stop, int standby_message_timeout, ! char *partial_suffix, ! int syncmode);