*** a/doc/src/sgml/ref/pg_receivexlog.sgml
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 66,71 **** PostgreSQL documentation
--- 66,78 ----
as possible. To avoid this behavior, use the -n
parameter.
+
+
+ Synchronous mode offers the ability to confirm WAL have been streamed
+ in the same way as synchronous replication. To use synchronous mode,
+ set up synchronous replication as described in
+ , and set parameter(that is, -m and --slot).
+
***************
*** 106,111 **** PostgreSQL documentation
--- 113,130 ----
+
+
+
+
+ Enables synchronous mode. Add to flush of the change timing of a WAL file.
+ If we've written some records, flush them to disk.
+ We only report the flush position,when a slot has explicitly been used.
+
+
+
+
+
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 370,376 **** LogStreamerMain(logstreamer_param *param)
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
! NULL))
/*
* Any errors will already have been reported in the function process,
--- 370,376 ----
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
! NULL, 0))
/*
* Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 35,40 ****
--- 35,41 ----
static char *basedir = NULL;
static int verbose = 0;
static int noloop = 0;
+ static int syncmode = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
static volatile bool time_to_abort = false;
***************
*** 62,67 **** usage(void)
--- 63,69 ----
printf(_("\nOptions:\n"));
printf(_(" -D, --directory=DIR receive transaction log files into this directory\n"));
printf(_(" -n, --no-loop do not loop on connection lost\n"));
+ printf(_(" -m, --sync-mode this mode is written some records, flush them to disk.\n"));
printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -?, --help show this help, then exit\n"));
***************
*** 330,336 **** StreamLog(void)
starttli);
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! stop_streaming, standby_message_timeout, ".partial");
PQfinish(conn);
}
--- 332,338 ----
starttli);
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! stop_streaming, standby_message_timeout, ".partial", syncmode);
PQfinish(conn);
}
***************
*** 360,365 **** main(int argc, char **argv)
--- 362,368 ----
{"port", required_argument, NULL, 'p'},
{"username", required_argument, NULL, 'U'},
{"no-loop", no_argument, NULL, 'n'},
+ {"sync-mode", no_argument, NULL, 'm'},
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
{"status-interval", required_argument, NULL, 's'},
***************
*** 389,395 **** main(int argc, char **argv)
}
}
! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
long_options, &option_index)) != -1)
{
switch (c)
--- 392,398 ----
}
}
! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWvm",
long_options, &option_index)) != -1)
{
switch (c)
***************
*** 436,441 **** main(int argc, char **argv)
--- 439,447 ----
case 'n':
noloop = 1;
break;
+ case 'm':
+ syncmode = 1;
+ break;
case 'v':
verbose++;
break;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 34,40 **** static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
! char *partial_suffix, XLogRecPtr *stoppos);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
--- 34,40 ----
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
! char *partial_suffix, XLogRecPtr *stoppos, int syncmode);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
***************
*** 417,429 **** CheckServerVersionForStreaming(PGconn *conn)
* allows you to tell the difference between partial and completed files,
* so that you can continue later where you left.
*
* Note: The log position *must* be at a log segment start!
*/
bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
! int standby_message_timeout, char *partial_suffix)
{
char query[128];
char slotcmd[128];
--- 417,433 ----
* allows you to tell the difference between partial and completed files,
* so that you can continue later where you left.
*
+ * If 'syncmode' is not zero, synchronous mode. Flush is executed after all
+ * received WAL is written.We only report the flush position,when a slot
+ * has explicitly been used.
+ *
* Note: The log position *must* be at a log segment start!
*/
bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
! int standby_message_timeout, char *partial_suffix, int syncmode)
{
char query[128];
char slotcmd[128];
***************
*** 568,574 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
! &stoppos);
if (res == NULL)
goto error;
--- 572,578 ----
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
! &stoppos, syncmode);
if (res == NULL)
goto error;
***************
*** 717,724 **** ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos, uint32 *timeline)
return true;
}
/*
! * The main loop of ReceiveXlogStream. Handles the COPY stream after
* initiating streaming with the START_STREAMING command.
*
* If the COPY ends (not necessarily successfully) due a message from the
--- 721,808 ----
return true;
}
+
+ static int
+ rcv_receive(bool timeout, char **copybuf, PGconn *conn, int standby_message_timeout, int64 last_status, int64 now)
+ {
+ int r;
+
+ r = PQgetCopyData(conn, copybuf, 1);
+ if (r == 0)
+ {
+ if (timeout)
+ {
+ /*
+ * No data available. Wait for some to appear, but not longer than
+ * the specified timeout, so that we can ping the server.
+ */
+ fd_set input_mask;
+ struct timeval timeout;
+ struct timeval *timeoutptr;
+
+ FD_ZERO(&input_mask);
+ FD_SET(PQsocket(conn), &input_mask);
+ if (standby_message_timeout)
+ {
+ int64 targettime;
+ long secs;
+ int usecs;
+
+ targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
+ feTimestampDifference(now,
+ targettime,
+ &secs,
+ &usecs);
+ if (secs <= 0)
+ timeout.tv_sec = 1; /* Always sleep at least 1 sec */
+ else
+ timeout.tv_sec = secs;
+ timeout.tv_usec = usecs;
+ timeoutptr = &timeout;
+ }
+ else
+ timeoutptr = NULL;
+
+ r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
+ if (r == 0 || (r < 0 && errno == EINTR))
+ {
+ /*
+ * Got a timeout or signal. Continue the loop and either
+ * deliver a status packet to the server or just go back into
+ * blocking.
+ */
+ return 0;
+ }
+ else if (r < 0)
+ {
+ fprintf(stderr, _("%s: select() failed: %s\n"),
+ progname, strerror(errno));
+ return -2;
+ }
+ }
+ /* Else there is actually data on the socket */
+ if (PQconsumeInput(conn) == 0)
+ {
+ fprintf(stderr,
+ _("%s: could not receive data from WAL stream: %s"),
+ progname, PQerrorMessage(conn));
+ return -2;
+ }
+ r = PQgetCopyData(conn, copybuf, 1);
+ }
+ if (r == -2)
+ {
+ fprintf(stderr, _("%s: could not read COPY data: %s"),
+ progname, PQerrorMessage(conn));
+ }
+
+ /* Return received messages to caller */
+ return r;
+
+ }
+
/*
! * The main loop of ReceiveXLogStream. Handles the COPY stream after
* initiating streaming with the START_STREAMING command.
*
* If the COPY ends (not necessarily successfully) due a message from the
***************
*** 729,735 **** static PGresult *
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! XLogRecPtr *stoppos)
{
char *copybuf = NULL;
int64 last_status = -1;
--- 813,819 ----
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! XLogRecPtr *stoppos, int syncmode)
{
char *copybuf = NULL;
int64 last_status = -1;
***************
*** 784,850 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
last_status = now;
}
! r = PQgetCopyData(conn, ©buf, 1);
! if (r == 0)
! {
! /*
! * No data available. Wait for some to appear, but not longer than
! * the specified timeout, so that we can ping the server.
! */
! fd_set input_mask;
! struct timeval timeout;
! struct timeval *timeoutptr;
! FD_ZERO(&input_mask);
! FD_SET(PQsocket(conn), &input_mask);
! if (standby_message_timeout && still_sending)
{
! int64 targettime;
! long secs;
! int usecs;
- targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
- feTimestampDifference(now,
- targettime,
- &secs,
- &usecs);
- if (secs <= 0)
- timeout.tv_sec = 1; /* Always sleep at least 1 sec */
- else
- timeout.tv_sec = secs;
- timeout.tv_usec = usecs;
- timeoutptr = &timeout;
- }
- else
- timeoutptr = NULL;
-
- r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
- if (r == 0 || (r < 0 && errno == EINTR))
- {
/*
! * Got a timeout or signal. Continue the loop and either
! * deliver a status packet to the server or just go back into
! * blocking.
*/
! continue;
}
! else if (r < 0)
{
! fprintf(stderr, _("%s: select() failed: %s\n"),
! progname, strerror(errno));
! goto error;
}
! /* Else there is actually data on the socket */
! if (PQconsumeInput(conn) == 0)
{
! fprintf(stderr,
! _("%s: could not receive data from WAL stream: %s"),
! progname, PQerrorMessage(conn));
goto error;
}
! continue;
}
! if (r == -1)
{
PGresult *res = PQgetResult(conn);
--- 868,1043 ----
last_status = now;
}
! r = rcv_receive(true , ©buf, conn, standby_message_timeout, last_status, now);
! while(r > 0)
! {
! /* Check the message type. */
! if (copybuf[0] == 'k')
{
! int pos;
! bool replyRequested;
/*
! * Parse the keepalive message, enclosed in the CopyData message.
! * We just check if the server requested a reply, and ignore the
! * rest.
*/
! pos = 1; /* skip msgtype 'k' */
! pos += 8; /* skip walEnd */
! pos += 8; /* skip sendTime */
!
! if (r < pos + 1)
! {
! fprintf(stderr, _("%s: streaming header too small: %d\n"),
! progname, r);
! goto error;
! }
! replyRequested = copybuf[pos];
!
! /* If the server requested an immediate reply, send one. */
! if (replyRequested && still_sending)
! {
! now = feGetCurrentTimestamp();
! if (!sendFeedback(conn, blockpos, now, false))
! goto error;
! last_status = now;
! }
}
! else if (copybuf[0] == 'w')
{
! /*
! * Once we've decided we don't want to receive any more, just
! * ignore any subsequent XLogData messages.
! */
! if (!still_sending)
! continue;
!
! /*
! * Read the header of the XLogData message, enclosed in the
! * CopyData message. We only need the WAL location field
! * (dataStart), the rest of the header is ignored.
! */
! hdr_len = 1; /* msgtype 'w' */
! hdr_len += 8; /* dataStart */
! hdr_len += 8; /* walEnd */
! hdr_len += 8; /* sendTime */
! if (r < hdr_len)
! {
! fprintf(stderr, _("%s: streaming header too small: %d\n"),
! progname, r);
! goto error;
! }
! blockpos = fe_recvint64(©buf[1]);
!
! /* Extract WAL location for this block */
! xlogoff = blockpos % XLOG_SEG_SIZE;
!
! /*
! * Verify that the initial location in the stream matches where we
! * think we are.
! */
! if (walfile == -1)
! {
! /* No file open yet */
! if (xlogoff != 0)
! {
! fprintf(stderr,
! _("%s: received transaction log record for offset %u with no file open\n"),
! progname, xlogoff);
! goto error;
! }
! }
! else
! {
! /* More data in existing segment */
! /* XXX: store seek value don't reseek all the time */
! if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
! {
! fprintf(stderr,
! _("%s: got WAL data offset %08x, expected %08x\n"),
! progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
! goto error;
! }
! }
!
! bytes_left = r - hdr_len;
! bytes_written = 0;
!
! while (bytes_left)
! {
! int bytes_to_write;
!
! /*
! * If crossing a WAL boundary, only write up until we reach
! * XLOG_SEG_SIZE.
! */
! if (xlogoff + bytes_left > XLOG_SEG_SIZE)
! bytes_to_write = XLOG_SEG_SIZE - xlogoff;
! else
! bytes_to_write = bytes_left;
!
! if (walfile == -1)
! {
! if (!open_walfile(blockpos, timeline,
! basedir, partial_suffix))
! {
! /* Error logged by open_walfile */
! goto error;
! }
! }
!
! if (write(walfile,
! copybuf + hdr_len + bytes_written,
! bytes_to_write) != bytes_to_write)
! {
! fprintf(stderr,
! _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! progname, bytes_to_write, current_walfile_name,
! strerror(errno));
! goto error;
! }
!
! /* Write was successful, advance our position */
! bytes_written += bytes_to_write;
! bytes_left -= bytes_to_write;
! blockpos += bytes_to_write;
! xlogoff += bytes_to_write;
!
! /* Did we reach the end of a WAL segment? */
! if (blockpos % XLOG_SEG_SIZE == 0)
! {
! if (!close_walfile(basedir, partial_suffix, blockpos))
! /* Error message written in close_walfile() */
! goto error;
!
! xlogoff = 0;
!
! if (still_sending && stream_stop(blockpos, timeline, false))
! {
! if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! {
! fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! progname, PQerrorMessage(conn));
! goto error;
! }
! still_sending = false;
! break; /* ignore the rest of this XLogData packet */
! }
! }
! }
! /* No more data left to write, receive next copy packet */
}
! else
{
! fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
! progname, copybuf[0]);
goto error;
}
! r = rcv_receive(false , ©buf, conn, standby_message_timeout, last_status, now);
}
!
! if(r == -1)
{
PGresult *res = PQgetResult(conn);
***************
*** 880,1054 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
*stoppos = blockpos;
return res;
}
- if (r == -2)
- {
- fprintf(stderr, _("%s: could not read COPY data: %s"),
- progname, PQerrorMessage(conn));
- goto error;
- }
-
- /* Check the message type. */
- if (copybuf[0] == 'k')
- {
- int pos;
- bool replyRequested;
-
- /*
- * Parse the keepalive message, enclosed in the CopyData message.
- * We just check if the server requested a reply, and ignore the
- * rest.
- */
- pos = 1; /* skip msgtype 'k' */
- pos += 8; /* skip walEnd */
- pos += 8; /* skip sendTime */
- if (r < pos + 1)
- {
- fprintf(stderr, _("%s: streaming header too small: %d\n"),
- progname, r);
- goto error;
- }
- replyRequested = copybuf[pos];
! /* If the server requested an immediate reply, send one. */
! if (replyRequested && still_sending)
! {
! now = feGetCurrentTimestamp();
! if (!sendFeedback(conn, blockpos, now, false))
! goto error;
! last_status = now;
! }
! }
! else if (copybuf[0] == 'w')
{
! /*
! * Once we've decided we don't want to receive any more, just
! * ignore any subsequent XLogData messages.
! */
! if (!still_sending)
! continue;
!
! /*
! * Read the header of the XLogData message, enclosed in the
! * CopyData message. We only need the WAL location field
! * (dataStart), the rest of the header is ignored.
! */
! hdr_len = 1; /* msgtype 'w' */
! hdr_len += 8; /* dataStart */
! hdr_len += 8; /* walEnd */
! hdr_len += 8; /* sendTime */
! if (r < hdr_len)
{
! fprintf(stderr, _("%s: streaming header too small: %d\n"),
! progname, r);
goto error;
}
! blockpos = fe_recvint64(©buf[1]);
!
! /* Extract WAL location for this block */
! xlogoff = blockpos % XLOG_SEG_SIZE;
!
! /*
! * Verify that the initial location in the stream matches where we
! * think we are.
! */
! if (walfile == -1)
! {
! /* No file open yet */
! if (xlogoff != 0)
! {
! fprintf(stderr,
! _("%s: received transaction log record for offset %u with no file open\n"),
! progname, xlogoff);
! goto error;
! }
! }
! else
! {
! /* More data in existing segment */
! /* XXX: store seek value don't reseek all the time */
! if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
! {
! fprintf(stderr,
! _("%s: got WAL data offset %08x, expected %08x\n"),
! progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
! goto error;
! }
! }
!
! bytes_left = r - hdr_len;
! bytes_written = 0;
!
! while (bytes_left)
! {
! int bytes_to_write;
!
! /*
! * If crossing a WAL boundary, only write up until we reach
! * XLOG_SEG_SIZE.
! */
! if (xlogoff + bytes_left > XLOG_SEG_SIZE)
! bytes_to_write = XLOG_SEG_SIZE - xlogoff;
! else
! bytes_to_write = bytes_left;
!
! if (walfile == -1)
! {
! if (!open_walfile(blockpos, timeline,
! basedir, partial_suffix))
! {
! /* Error logged by open_walfile */
! goto error;
! }
! }
!
! if (write(walfile,
! copybuf + hdr_len + bytes_written,
! bytes_to_write) != bytes_to_write)
! {
! fprintf(stderr,
! _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! progname, bytes_to_write, current_walfile_name,
! strerror(errno));
! goto error;
! }
!
! /* Write was successful, advance our position */
! bytes_written += bytes_to_write;
! bytes_left -= bytes_to_write;
! blockpos += bytes_to_write;
! xlogoff += bytes_to_write;
!
! /* Did we reach the end of a WAL segment? */
! if (blockpos % XLOG_SEG_SIZE == 0)
! {
! if (!close_walfile(basedir, partial_suffix, blockpos))
! /* Error message written in close_walfile() */
! goto error;
!
! xlogoff = 0;
!
! if (still_sending && stream_stop(blockpos, timeline, false))
! {
! if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! {
! fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! progname, PQerrorMessage(conn));
! goto error;
! }
! still_sending = false;
! break; /* ignore the rest of this XLogData packet */
! }
! }
! }
! /* No more data left to write, receive next copy packet */
! }
! else
! {
! fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
! progname, copybuf[0]);
! goto error;
}
}
error:
--- 1073,1093 ----
*stoppos = blockpos;
return res;
}
! if (walfile != -1 && lastFlushPosition < blockpos && syncmode)
{
! if (fsync(walfile) != 0)
{
! fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
! progname, current_walfile_name, strerror(errno));
goto error;
}
! lastFlushPosition = blockpos;
! if (!sendFeedback(conn, blockpos, now, false))
! goto error;
}
+
}
error:
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 16,19 **** extern bool ReceiveXlogStream(PGconn *conn,
char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout,
! char *partial_suffix);
--- 16,20 ----
char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout,
! char *partial_suffix,
! int syncmode);