*** a/doc/src/sgml/ref/pg_receivexlog.sgml
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 106,111 **** PostgreSQL documentation
--- 106,127 ----
+
+
+
+
+ Specifies how often, in seconds, pg_receivexlog should issue sync
+ commands to ensure that the received WAL file is safely
+ flushed to disk without being asked by the server to do so. Specifying
+ an interval of 0 flushes each batch of consecutively received data.
+ Not specifying an interval disables issuing fsyncs altogether,
+ while still reporting progress to the server. In this case, data may be
+ lost in the event of a crash.
+
+
+
+
+
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 370,376 **** LogStreamerMain(logstreamer_param *param)
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
! NULL))
/*
* Any errors will already have been reported in the function process,
--- 370,376 ----
if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
param->sysidentifier, param->xlogdir,
reached_end_position, standby_message_timeout,
! NULL, -1))
/*
* Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 36,41 **** static char *basedir = NULL;
--- 36,42 ----
static int verbose = 0;
static int noloop = 0;
static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
+ static int fsync_interval = -1; /* Invalid = default */
static volatile bool time_to_abort = false;
***************
*** 62,67 **** usage(void)
--- 63,70 ----
printf(_("\nOptions:\n"));
printf(_(" -D, --directory=DIR receive transaction log files into this directory\n"));
printf(_(" -n, --no-loop do not loop on connection lost\n"));
+ printf(_(" -F --fsync-interval=SECS\n"
+ " frequency of syncs to the output file (default: file close only)\n"));
printf(_(" -v, --verbose output verbose messages\n"));
printf(_(" -V, --version output version information, then exit\n"));
printf(_(" -?, --help show this help, then exit\n"));
***************
*** 330,336 **** StreamLog(void)
starttli);
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! stop_streaming, standby_message_timeout, ".partial");
PQfinish(conn);
}
--- 333,339 ----
starttli);
ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
! stop_streaming, standby_message_timeout, ".partial", fsync_interval);
PQfinish(conn);
}
***************
*** 360,365 **** main(int argc, char **argv)
--- 363,369 ----
{"port", required_argument, NULL, 'p'},
{"username", required_argument, NULL, 'U'},
{"no-loop", no_argument, NULL, 'n'},
+ {"fsync-interval", required_argument, NULL, 'F'},
{"no-password", no_argument, NULL, 'w'},
{"password", no_argument, NULL, 'W'},
{"status-interval", required_argument, NULL, 's'},
***************
*** 389,395 **** main(int argc, char **argv)
}
}
! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
long_options, &option_index)) != -1)
{
switch (c)
--- 393,399 ----
}
}
! while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nF:wWv",
long_options, &option_index)) != -1)
{
switch (c)
***************
*** 436,441 **** main(int argc, char **argv)
--- 440,454 ----
case 'n':
noloop = 1;
break;
+ case 'F':
+ fsync_interval = atoi(optarg) * 1000;
+ if (fsync_interval < 0)
+ {
+ fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
+ progname, optarg);
+ exit(1);
+ }
+ break;
case 'v':
verbose++;
break;
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 30,46 **** static int walfile = -1;
static char current_walfile_name[MAXPGPATH] = "";
static bool reportFlushPosition = false;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
! char *partial_suffix, XLogRecPtr *stoppos);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
-
/*
* Open a new WAL file in the specified directory.
*
--- 30,46 ----
static char current_walfile_name[MAXPGPATH] = "";
static bool reportFlushPosition = false;
static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
+ static int64 output_last_fsync = -1;
static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
uint32 timeline, char *basedir,
stream_stop_callback stream_stop, int standby_message_timeout,
! char *partial_suffix, XLogRecPtr *stoppos,int fsync_interval);
static int CopyStreamPoll(PGconn *conn, long timeout_ms);
static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
uint32 *timeline);
/*
* Open a new WAL file in the specified directory.
*
***************
*** 419,431 **** CheckServerVersionForStreaming(PGconn *conn)
* allows you to tell the difference between partial and completed files,
* so that you can continue later where you left.
*
* Note: The log position *must* be at a log segment start!
*/
bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
! int standby_message_timeout, char *partial_suffix)
{
char query[128];
char slotcmd[128];
--- 419,434 ----
* allows you to tell the difference between partial and completed files,
* so that you can continue later where you left.
*
+ * fsync_interval controls how often we flush the received WAL file to
+ * disk, in milliseconds (0 = after each batch, negative = no periodic flush).
+ *
* Note: The log position *must* be at a log segment start!
*/
bool
ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *sysidentifier, char *basedir,
stream_stop_callback stream_stop,
! int standby_message_timeout, char *partial_suffix, int fsync_interval)
{
char query[128];
char slotcmd[128];
***************
*** 570,576 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
! &stoppos);
if (res == NULL)
goto error;
--- 573,579 ----
/* Stream the WAL */
res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
standby_message_timeout, partial_suffix,
! &stoppos, fsync_interval);
if (res == NULL)
goto error;
***************
*** 731,737 **** static PGresult *
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! XLogRecPtr *stoppos)
{
char *copybuf = NULL;
int64 last_status = -1;
--- 734,740 ----
HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
char *basedir, stream_stop_callback stream_stop,
int standby_message_timeout, char *partial_suffix,
! XLogRecPtr *stoppos, int fsync_interval)
{
char *copybuf = NULL;
int64 last_status = -1;
***************
*** 747,752 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
--- 750,757 ----
int64 now;
int hdr_len;
long sleeptime;
+ int64 message_target = 0;
+ int64 fsync_target = 0;
/*
* Check if we should continue streaming, or abort at this point.
***************
*** 780,796 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
goto error;
last_status = now;
}
!
! /*
! * Compute how long send/receive loops should sleep
! */
! if (standby_message_timeout && still_sending)
{
int64 targettime;
long secs;
int usecs;
! targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
feTimestampDifference(now,
targettime,
&secs,
--- 785,813 ----
goto error;
last_status = now;
}
!
! /* Compute when we need to wakeup to send a keepalive message. */
! if (standby_message_timeout)
! message_target = last_status + (standby_message_timeout - 1) *
! ((int64) 1000);
!
! /* Compute when we need to wakeup to fsync the output file. */
! if (fsync_interval > 0 && lastFlushPosition < blockpos)
! fsync_target = output_last_fsync + (fsync_interval - 1) *
! ((int64) 1000);
!
! /* Now compute how long the send/receive loops should sleep before waking up. */
! if (still_sending && (message_target > 0 || fsync_target > 0))
{
int64 targettime;
long secs;
int usecs;
! targettime = message_target;
!
! if (fsync_target > 0 && fsync_target < targettime)
! targettime = fsync_target;
!
feTimestampDifference(now,
targettime,
&secs,
***************
*** 808,1016 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
sleeptime = -1;
r = CopyStreamReceive(conn, sleeptime, ©buf);
! if (r == 0)
! continue;
! if (r == -1)
! goto error;
! if (r == -2)
{
! PGresult *res = PQgetResult(conn);
!
! /*
! * The server closed its end of the copy stream. If we haven't
! * closed ours already, we need to do so now, unless the server
! * threw an error, in which case we don't.
! */
! if (still_sending)
{
! if (!close_walfile(basedir, partial_suffix, blockpos))
{
! /* Error message written in close_walfile() */
! PQclear(res);
goto error;
}
! if (PQresultStatus(res) == PGRES_COPY_IN)
{
! if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! {
! fprintf(stderr,
! _("%s: could not send copy-end packet: %s"),
! progname, PQerrorMessage(conn));
! PQclear(res);
goto error;
! }
! res = PQgetResult(conn);
}
- still_sending = false;
- }
- if (copybuf != NULL)
- PQfreemem(copybuf);
- copybuf = NULL;
- *stoppos = blockpos;
- return res;
- }
-
- /* Check the message type. */
- if (copybuf[0] == 'k')
- {
- int pos;
- bool replyRequested;
-
- /*
- * Parse the keepalive message, enclosed in the CopyData message.
- * We just check if the server requested a reply, and ignore the
- * rest.
- */
- pos = 1; /* skip msgtype 'k' */
- pos += 8; /* skip walEnd */
- pos += 8; /* skip sendTime */
-
- if (r < pos + 1)
- {
- fprintf(stderr, _("%s: streaming header too small: %d\n"),
- progname, r);
- goto error;
}
! replyRequested = copybuf[pos];
!
! /* If the server requested an immediate reply, send one. */
! if (replyRequested && still_sending)
{
! now = feGetCurrentTimestamp();
! if (!sendFeedback(conn, blockpos, now, false))
! goto error;
! last_status = now;
! }
! }
! else if (copybuf[0] == 'w')
! {
! /*
! * Once we've decided we don't want to receive any more, just
! * ignore any subsequent XLogData messages.
! */
! if (!still_sending)
! continue;
!
! /*
! * Read the header of the XLogData message, enclosed in the
! * CopyData message. We only need the WAL location field
! * (dataStart), the rest of the header is ignored.
! */
! hdr_len = 1; /* msgtype 'w' */
! hdr_len += 8; /* dataStart */
! hdr_len += 8; /* walEnd */
! hdr_len += 8; /* sendTime */
! if (r < hdr_len)
! {
! fprintf(stderr, _("%s: streaming header too small: %d\n"),
! progname, r);
! goto error;
! }
! blockpos = fe_recvint64(©buf[1]);
!
! /* Extract WAL location for this block */
! xlogoff = blockpos % XLOG_SEG_SIZE;
! /*
! * Verify that the initial location in the stream matches where we
! * think we are.
! */
! if (walfile == -1)
! {
! /* No file open yet */
! if (xlogoff != 0)
! {
! fprintf(stderr,
! _("%s: received transaction log record for offset %u with no file open\n"),
! progname, xlogoff);
! goto error;
! }
! }
! else
! {
! /* More data in existing segment */
! /* XXX: store seek value don't reseek all the time */
! if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
{
! fprintf(stderr,
! _("%s: got WAL data offset %08x, expected %08x\n"),
! progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
goto error;
}
! }
!
! bytes_left = r - hdr_len;
! bytes_written = 0;
! while (bytes_left)
! {
! int bytes_to_write;
/*
! * If crossing a WAL boundary, only write up until we reach
! * XLOG_SEG_SIZE.
*/
- if (xlogoff + bytes_left > XLOG_SEG_SIZE)
- bytes_to_write = XLOG_SEG_SIZE - xlogoff;
- else
- bytes_to_write = bytes_left;
-
if (walfile == -1)
{
! if (!open_walfile(blockpos, timeline,
! basedir, partial_suffix))
{
! /* Error logged by open_walfile */
goto error;
}
}
!
! if (write(walfile,
! copybuf + hdr_len + bytes_written,
! bytes_to_write) != bytes_to_write)
{
! fprintf(stderr,
! _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! progname, bytes_to_write, current_walfile_name,
! strerror(errno));
! goto error;
}
! /* Write was successful, advance our position */
! bytes_written += bytes_to_write;
! bytes_left -= bytes_to_write;
! blockpos += bytes_to_write;
! xlogoff += bytes_to_write;
! /* Did we reach the end of a WAL segment? */
! if (blockpos % XLOG_SEG_SIZE == 0)
{
! if (!close_walfile(basedir, partial_suffix, blockpos))
! /* Error message written in close_walfile() */
goto error;
! xlogoff = 0;
! if (still_sending && stream_stop(blockpos, timeline, false))
{
! if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
! {
! fprintf(stderr, _("%s: could not send copy-end packet: %s"),
! progname, PQerrorMessage(conn));
goto error;
}
- still_sending = false;
- break; /* ignore the rest of this XLogData packet */
}
}
}
! /* No more data left to write, receive next copy packet */
}
! else
{
! fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
! progname, copybuf[0]);
goto error;
}
}
--- 825,1061 ----
sleeptime = -1;
r = CopyStreamReceive(conn, sleeptime, ©buf);
! while(r > 0)
{
! /* Check the message type. */
! if (copybuf[0] == 'k')
{
! int pos;
! bool replyRequested;
!
! /*
! * Parse the keepalive message, enclosed in the CopyData message.
! * We just check if the server requested a reply, and ignore the
! * rest.
! */
! pos = 1; /* skip msgtype 'k' */
! pos += 8; /* skip walEnd */
! pos += 8; /* skip sendTime */
!
! if (r < pos + 1)
{
! fprintf(stderr, _("%s: streaming header too small: %d\n"),
! progname, r);
goto error;
}
! replyRequested = copybuf[pos];
!
! /* If the server requested an immediate reply, send one. */
! if (replyRequested && still_sending)
{
! now = feGetCurrentTimestamp();
! if (!sendFeedback(conn, blockpos, now, false))
goto error;
! last_status = now;
}
}
! else if (copybuf[0] == 'w')
{
! /*
! * Once we've decided we don't want to receive any more, just
! * ignore any subsequent XLogData messages.
! */
! if (!still_sending)
! break;
! /*
! * Read the header of the XLogData message, enclosed in the
! * CopyData message. We only need the WAL location field
! * (dataStart), the rest of the header is ignored.
! */
! hdr_len = 1; /* msgtype 'w' */
! hdr_len += 8; /* dataStart */
! hdr_len += 8; /* walEnd */
! hdr_len += 8; /* sendTime */
! if (r < hdr_len)
{
! fprintf(stderr, _("%s: streaming header too small: %d\n"),
! progname, r);
goto error;
}
! blockpos = fe_recvint64(©buf[1]);
! /* Extract WAL location for this block */
! xlogoff = blockpos % XLOG_SEG_SIZE;
/*
! * Verify that the initial location in the stream matches where we
! * think we are.
*/
if (walfile == -1)
{
! /* No file open yet */
! if (xlogoff != 0)
{
! fprintf(stderr,
! _("%s: received transaction log record for offset %u with no file open\n"),
! progname, xlogoff);
goto error;
}
}
! else
{
! /* More data in existing segment */
! /* XXX: store seek value don't reseek all the time */
! if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
! {
! fprintf(stderr,
! _("%s: got WAL data offset %08x, expected %08x\n"),
! progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
! goto error;
! }
}
! bytes_left = r - hdr_len;
! bytes_written = 0;
! while (bytes_left)
{
! int bytes_to_write;
!
! /*
! * If crossing a WAL boundary, only write up until we reach
! * XLOG_SEG_SIZE.
! */
! if (xlogoff + bytes_left > XLOG_SEG_SIZE)
! bytes_to_write = XLOG_SEG_SIZE - xlogoff;
! else
! bytes_to_write = bytes_left;
!
! if (walfile == -1)
! {
! if (!open_walfile(blockpos, timeline,
! basedir, partial_suffix))
! {
! /* Error logged by open_walfile */
! goto error;
! }
! }
!
! if (write(walfile,
! copybuf + hdr_len + bytes_written,
! bytes_to_write) != bytes_to_write)
! {
! fprintf(stderr,
! _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
! progname, bytes_to_write, current_walfile_name,
! strerror(errno));
goto error;
+ }
! /* Write was successful, advance our position */
! bytes_written += bytes_to_write;
! bytes_left -= bytes_to_write;
! blockpos += bytes_to_write;
! xlogoff += bytes_to_write;
! /* Did we reach the end of a WAL segment? */
! if (blockpos % XLOG_SEG_SIZE == 0)
{
! if (!close_walfile(basedir, partial_suffix, blockpos))
! /* Error message written in close_walfile() */
goto error;
+
+ xlogoff = 0;
+
+ if (still_sending && stream_stop(blockpos, timeline, false))
+ {
+ if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+ {
+ fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+ progname, PQerrorMessage(conn));
+ goto error;
+ }
+ still_sending = false;
+ break; /* ignore the rest of this XLogData packet */
}
}
}
+ /* No more data left to write, receive next copy packet */
+ }
+ else
+ {
+ fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+ progname, copybuf[0]);
+ goto error;
}
! r = CopyStreamReceive(conn, -1, ©buf);
}
! if (r == 0)
{
! /* --fsync-interval argument has been specified */
! if (fsync_interval >= 0)
! {
! /* interval has been specified */
! if (fsync_interval > 0)
! {
! now = feGetCurrentTimestamp();
! if (!feTimestampDifferenceExceeds(output_last_fsync, now, fsync_interval))
! continue;
! output_last_fsync = now;
! }
! /* check the need for flush */
! if (walfile != -1 && lastFlushPosition < blockpos)
! {
! if (fsync(walfile) != 0)
! {
! fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
! progname, current_walfile_name, strerror(errno));
! goto error;
! }
! lastFlushPosition = blockpos;
! }
! }
! continue;
! }
! if (r == -1)
goto error;
+ if (r == -2)
+ {
+ PGresult *res = PQgetResult(conn);
+
+ /*
+ * The server closed its end of the copy stream. If we haven't
+ * closed ours already, we need to do so now, unless the server
+ * threw an error, in which case we don't.
+ */
+ if (still_sending)
+ {
+ if (!close_walfile(basedir, partial_suffix, blockpos))
+ {
+ /* Error message written in close_walfile() */
+ PQclear(res);
+ goto error;
+ }
+ if (PQresultStatus(res) == PGRES_COPY_IN)
+ {
+ if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+ {
+ fprintf(stderr,
+ _("%s: could not send copy-end packet: %s"),
+ progname, PQerrorMessage(conn));
+ PQclear(res);
+ goto error;
+ }
+ res = PQgetResult(conn);
+ }
+ still_sending = false;
+ }
+ if (copybuf != NULL)
+ PQfreemem(copybuf);
+ copybuf = NULL;
+ *stoppos = blockpos;
+ return res;
}
}
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 16,19 **** extern bool ReceiveXlogStream(PGconn *conn,
char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout,
! char *partial_suffix);
--- 16,20 ----
char *basedir,
stream_stop_callback stream_stop,
int standby_message_timeout,
! char *partial_suffix,
! int fsync_interval);