*** a/doc/src/sgml/ref/pg_basebackup.sgml --- b/doc/src/sgml/ref/pg_basebackup.sgml *************** *** 143,150 **** PostgreSQL documentation ! ! Includes the required transaction log files (WAL files) in the --- 143,150 ---- ! ! Includes the required transaction log files (WAL files) in the *************** *** 154,169 **** PostgreSQL documentation to consult the log archive, thus making this a completely standalone backup. ! ! ! The transaction log files are collected at the end of the backup. ! Therefore, it is necessary for the ! parameter to be set high ! enough that the log is not removed before the end of the backup. ! If the log has been rotated when it's time to transfer it, the ! backup will fail and be unusable. ! ! --- 154,196 ---- to consult the log archive, thus making this a completely standalone backup. ! ! The following methods for collecting the transaction logs are ! supported: ! ! ! ! f ! fetch ! ! ! The transaction log files are collected at the end of the backup. ! Therefore, it is necessary for the ! parameter to be set high ! enough that the log is not removed before the end of the backup. ! If the log has been rotated when it's time to transfer it, the ! backup will fail and be unusable. ! ! ! ! ! ! s ! stream ! ! ! Stream the transaction log while the backup is created. This will ! open a second connection to the server and start streaming the ! transaction log in parallel while running the backup. Therefore, ! it will use up two slots configured by the ! parameter. As long as the ! client can keep up with transaction log received, using this mode ! requires no extra transaction logs to be saved on the master. ! ! ! ! ! *************** *** 261,266 **** PostgreSQL documentation --- 288,307 ---- + + + + + Specifies the number of seconds between status packets sent back to the + server. This is required when streaming the transaction log (using + --xlog=stream) if replication timeout is configured + on the server, and allows for easier monitoring. The default value is + 10 seconds. + + + + + *** /dev/null --- b/doc/src/sgml/ref/pg_receivexlog.sgml *************** *** 0 **** --- 1,270 ---- + + + + + pg_receivexlog + 1 + Application + + + + pg_receivexlog + streams transaction logs from a PostgreSQL cluster + + + + pg_receivexlog + + + + + pg_receivexlog + option + + + + + + Description + + + pg_receivexlog is used to stream transaction log + from a running PostgreSQL cluster. The transaction + log is streamed using the streaming replication protocol, and is written + to a local directory of files. This directory can be used as the archive + location for doing a restore using point-in-time recovery (see + ). + + + + pg_receivexlog streams the transaction + log in real time as it's being generated on the server, and does not wait + for segments to complete like does. + For this reason, it is not necessary to set + when using + pg_receivexlog. + + + + The transaction log is streamed over a regular + PostgreSQL connection, and uses the + replication protocol. The connection must be + made with a user having REPLICATION permissions (see + ), and the user must be granted explicit + permissions in pg_hba.conf. The server must also + be configured with set high enough + to leave at least one session available for the stream. + + + + + Options + + + The following command-line options control the location and format of the + output. + + + + + + + + Directory to write the output to. + + + This parameter is required. + + + + + + + The following command-line options control the running of the program. + + + + + + + + Enables verbose mode. + + + + + + + + + The following command-line options control the database connection parameters. + + + + + + + + Specifies the number of seconds between status packets sent back to the + server. This is required if replication timeout is configured on the + server, and allows for easier monitoring. The default value is + 10 seconds. + + + + + + + + + + Specifies the host name of the machine on which the server is + running. If the value begins with a slash, it is used as the + directory for the Unix domain socket. The default is taken + from the PGHOST environment variable, if set, + else a Unix domain socket connection is attempted. + + + + + + + + + + Specifies the TCP port or local Unix domain socket file + extension on which the server is listening for connections. + Defaults to the PGPORT environment variable, if + set, or a compiled-in default. + + + + + + + + + + User name to connect as. + + + + + + + + + + Never issue a password prompt. If the server requires + password authentication and a password is not available by + other means such as a .pgpass file, the + connection attempt will fail. This option can be useful in + batch jobs and scripts where no user is present to enter a + password. + + + + + + + + + + Force pg_basebackup to prompt for a + password before connecting to a database. + + + + This option is never essential, since + pg_bsaebackup will automatically prompt + for a password if the server demands password authentication. + However, pg_basebackup will waste a + connection attempt finding out that the server wants a password. + In some cases it is worth typing + + + + + + + Other, less commonly used, parameters are also available: + + + + + + + + Print the pg_receivexlog version and exit. + + + + + + + + + + Show help about pg_receivexlog command line + arguments, and exit. + + + + + + + + + + + Environment + + + This utility, like most other PostgreSQL utilities, + uses the environment variables supported by libpq + (see ). + + + + + + Notes + + + When using pg_receivexlog instead of + , the server will continue to + recycle transaction log files even if the backups are not properly + archived, since there is no command that fails. This can be worked + around by having an that fails + when the file has not been properly archived yet. + + + + + + Examples + + + To stream the transaction log from the server at + mydbserver and store it in the local directory + /usr/local/pgsql/archive: + + $ pg_receivexlog -h mydbserver -D /usr/local/pgsql/archive + + + + + + See Also + + + + + + + *** a/doc/src/sgml/reference.sgml --- b/doc/src/sgml/reference.sgml *************** *** 212,217 **** --- 212,218 ---- &pgConfig; &pgDump; &pgDumpall; + &pgReceivexlog; &pgRestore; &psqlRef; &reindexdb; *** a/src/bin/pg_basebackup/.gitignore --- b/src/bin/pg_basebackup/.gitignore *************** *** 1 **** --- 1,2 ---- /pg_basebackup + /pg_receivexlog *** a/src/bin/pg_basebackup/Makefile --- b/src/bin/pg_basebackup/Makefile *************** *** 18,38 **** include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) ! OBJS= pg_basebackup.o $(WIN32RES) ! all: pg_basebackup ! pg_basebackup: $(OBJS) | submake-libpq submake-libpgport ! $(CC) $(CFLAGS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) install: all installdirs $(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)' installdirs: $(MKDIR_P) '$(DESTDIR)$(bindir)' uninstall: rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)' clean distclean maintainer-clean: ! rm -f pg_basebackup$(X) $(OBJS) --- 18,43 ---- override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) ! OBJS=receivelog.o streamutil.o $(WIN32RES) ! all: pg_basebackup pg_receivexlog ! pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport ! $(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) ! ! pg_receivexlog: pg_receivexlog.o $(OBJS) | submake-libpq submake-libpgport ! $(CC) $(CFLAGS) pg_receivexlog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X) install: all installdirs $(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)' + $(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog$(X)' installdirs: $(MKDIR_P) '$(DESTDIR)$(bindir)' uninstall: rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)' + rm -f '$(DESTDIR)$(bindir)/pg_receivexlog$(X)' clean distclean maintainer-clean: ! rm -f pg_basebackup$(X) pg_receivexlog$(X) $(OBJS) pg_basebackup.o pg_receivexlog.o *** a/src/bin/pg_basebackup/pg_basebackup.c --- b/src/bin/pg_basebackup/pg_basebackup.c *************** *** 11,22 **** *------------------------------------------------------------------------- */ ! #include "postgres_fe.h" #include "libpq-fe.h" #include #include #include #ifdef HAVE_LIBZ #include --- 11,30 ---- *------------------------------------------------------------------------- */ ! /* ! * We have to use postgres.h not postgres_fe.h here, because there's so much ! * backend-only stuff in the XLOG include files we need. But we need a ! * frontend-ish environment otherwise. Hence this ugly hack. ! */ ! #define FRONTEND 1 ! #include "postgres.h" #include "libpq-fe.h" #include #include #include + #include + #include #ifdef HAVE_LIBZ #include *************** *** 24,32 **** #include "getopt_long.h" /* Global options */ - static const char *progname; char *basedir = NULL; char format = 'p'; /* p(lain)/t(ar) */ char *label = "pg_basebackup base backup"; --- 32,42 ---- #include "getopt_long.h" + #include "receivelog.h" + #include "streamutil.h" + /* Global options */ char *basedir = NULL; char format = 'p'; /* p(lain)/t(ar) */ char *label = "pg_basebackup base backup"; *************** *** 34,71 **** bool showprogress = false; int verbose = 0; int compresslevel = 0; bool includewal = false; bool fastcheckpoint = false; ! char *dbhost = NULL; ! char *dbuser = NULL; ! char *dbport = NULL; ! int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */ /* Progress counters */ static uint64 totalsize; static uint64 totaldone; static int tablespacecount; ! /* Connection kept global so we can disconnect easily */ ! static PGconn *conn = NULL; ! #define disconnect_and_exit(code) \ ! { \ ! if (conn != NULL) PQfinish(conn); \ ! exit(code); \ ! } /* Function headers */ - static char *xstrdup(const char *s); - static void *xmalloc0(int size); static void usage(void); static void verify_dir_is_empty_or_create(char *dirname); static void progress_report(int tablespacenum, const char *filename); - static PGconn *GetConnection(void); static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum); static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum); static void BaseBackup(void); #ifdef HAVE_LIBZ static const char * get_gz_error(gzFile *gzf) --- 44,81 ---- int verbose = 0; int compresslevel = 0; bool includewal = false; + bool streamwal = false; bool fastcheckpoint = false; ! int standby_message_timeout = 10; /* 10 sec = default */ /* Progress counters */ static uint64 totalsize; static uint64 totaldone; static int tablespacecount; ! /* Pipe to communicate with background wal receiver process */ ! #ifndef WIN32 ! static int bgpipe[2] = {-1, -1}; ! #endif ! /* Handle to child process */ ! static pid_t bgchild = -1; ! ! /* End position for xlog streaming, empty string if unknown yet */ ! static XLogRecPtr xlogendptr; ! static int has_xlogendptr = 0; /* Function headers */ static void usage(void); static void verify_dir_is_empty_or_create(char *dirname); static void progress_report(int tablespacenum, const char *filename); static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum); static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum); static void BaseBackup(void); + static bool segment_callback(XLogRecPtr segendpos, uint32 timeline); + #ifdef HAVE_LIBZ static const char * get_gz_error(gzFile *gzf) *************** *** 81,119 **** get_gz_error(gzFile *gzf) } #endif - /* - * strdup() and malloc() replacements that prints an error and exits - * if something goes wrong. Can never return NULL. - */ - static char * - xstrdup(const char *s) - { - char *result; - - result = strdup(s); - if (!result) - { - fprintf(stderr, _("%s: out of memory\n"), progname); - exit(1); - } - return result; - } - - static void * - xmalloc0(int size) - { - void *result; - - result = malloc(size); - if (!result) - { - fprintf(stderr, _("%s: out of memory\n"), progname); - exit(1); - } - MemSet(result, 0, size); - return result; - } - static void usage(void) --- 91,96 ---- *************** *** 125,131 **** usage(void) printf(_("\nOptions controlling the output:\n")); printf(_(" -D, --pgdata=DIRECTORY receive base backup into directory\n")); printf(_(" -F, --format=p|t output format (plain, tar)\n")); ! printf(_(" -x, --xlog include required WAL files in backup\n")); printf(_(" -z, --gzip compress tar output\n")); printf(_(" -Z, --compress=0-9 compress tar output with given compression level\n")); printf(_("\nGeneral options:\n")); --- 102,108 ---- printf(_("\nOptions controlling the output:\n")); printf(_(" -D, --pgdata=DIRECTORY receive base backup into directory\n")); printf(_(" -F, --format=p|t output format (plain, tar)\n")); ! printf(_(" -x, --xlog=fetch|stream include required WAL files in backup\n")); printf(_(" -z, --gzip compress tar output\n")); printf(_(" -Z, --compress=0-9 compress tar output with given compression level\n")); printf(_("\nGeneral options:\n")); *************** *** 137,142 **** usage(void) --- 114,120 ---- printf(_(" --help show this help, then exit\n")); printf(_(" --version output version information, then exit\n")); printf(_("\nConnection options:\n")); + printf(_(" -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n")); printf(_(" -h, --host=HOSTNAME database server host or socket directory\n")); printf(_(" -p, --port=PORT database server port number\n")); printf(_(" -U, --username=NAME connect as specified database user\n")); *************** *** 147,152 **** usage(void) --- 125,321 ---- /* + * Called in the background process whenever a complete segment of WAL + * has been received. + * On Unix, we check to see if there is any data on our pipe + * (which would mean we have a stop position), and if it is, check if + * it is time to stop. + * On Windows, we are in a single process, so we can just check if it's + * time to stop. + */ + static bool + segment_callback(XLogRecPtr segendpos, uint32 timeline) + { + if (!has_xlogendptr) + { + #ifndef WIN32 + fd_set fds; + struct timeval tv; + int r; + + /* + * Don't have the end pointer yet - check our pipe to see if it has + * been sent yet. + */ + FD_ZERO(&fds); + FD_SET(bgpipe[0], &fds); + + MemSet(&tv, 0, sizeof(tv)); + + r = select(bgpipe[0] + 1, &fds, NULL, NULL, &tv); + if (r == 1) + { + char xlogend[64]; + + MemSet(xlogend, 0, sizeof(xlogend)); + r = piperead(bgpipe[0], xlogend, sizeof(xlogend)); + if (r < 0) + { + fprintf(stderr, _("%s: could not read from ready pipe: %s\n"), + progname, strerror(errno)); + exit(1); + } + + if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2) + { + fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"), + progname, xlogend); + exit(1); + } + has_xlogendptr = 1; + + /* + * Fall through to check if we've reached the point further + * already. + */ + } + else + { + /* + * No data received on the pipe means we don't know the end + * position yet - so just say it's not time to stop yet. + */ + return false; + } + #else + + /* + * On win32, has_xlogendptr is set by the main thread, so if it's not + * set here, we just go back and wait until it shows up. + */ + return false; + #endif + } + + /* + * At this point we have an end pointer, so compare it to the current + * position to figure out if it's time to stop. + */ + if (segendpos.xlogid > xlogendptr.xlogid || + (segendpos.xlogid == xlogendptr.xlogid && + segendpos.xrecoff >= xlogendptr.xrecoff)) + return true; + + /* + * Have end pointer, but haven't reached it yet - so tell the caller to + * keep streaming. + */ + return false; + } + + typedef struct + { + PGconn *bgconn; + XLogRecPtr startptr; + char xlogdir[MAXPGPATH]; + int timeline; + } logstreamer_param; + + static int + LogStreamerMain(logstreamer_param * param) + { + if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline, + param->xlogdir, segment_callback, NULL, + standby_message_timeout)) + + /* + * Any errors will already have been reported in the function process, + * but we need to tell the parent that we didn't shutdown in a nice + * way. + */ + return 1; + + PQfinish(param->bgconn); + return 0; + } + + /* + * Initiate background process for receiving xlog during the backup. + * The background stream will use its own database connection so we can + * stream the logfile in parallel with the backups. + */ + static void + StartLogStreamer(char *startpos, uint32 timeline) + { + logstreamer_param *param; + + param = xmalloc0(sizeof(logstreamer_param)); + param->timeline = timeline; + + /* Convert the starting position */ + if (sscanf(startpos, "%X/%X", ¶m->startptr.xlogid, ¶m->startptr.xrecoff) != 2) + { + fprintf(stderr, _("%s: invalid format of xlog location: %s\n"), + progname, startpos); + disconnect_and_exit(1); + } + /* Round off to even segment position */ + param->startptr.xrecoff -= param->startptr.xrecoff % XLOG_SEG_SIZE; + + #ifndef WIN32 + /* Create our background pipe */ + if (pgpipe(bgpipe) < 0) + { + fprintf(stderr, _("%s: could not create pipe for background process: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } + #endif + + /* Get a second connection */ + param->bgconn = GetConnection(); + + /* + * Always in plain format, so we can write to basedir/pg_xlog. But the + * directory entry in the tar file may arrive later, so make sure it's + * created before we start. + */ + snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir); + verify_dir_is_empty_or_create(param->xlogdir); + + /* + * Start a child process and tell it to start streaming. On Unix, this is + * a fork(). On Windows, we create a thread. + */ + #ifndef WIN32 + bgchild = fork(); + if (bgchild == 0) + { + /* in child process */ + exit(LogStreamerMain(param)); + } + else if (bgchild < 0) + { + fprintf(stderr, _("%s: could not create background process: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } + + /* + * Else we are in the parent process and all is well. + */ + #else /* WIN32 */ + bgchild = _beginthreadex(NULL, 0, (void *) LogStreamerMain, param, 0, NULL); + if (bgchild == 0) + { + fprintf(stderr, _("%s: could not create background thread: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } + #endif + } + + /* * Verify that the given directory exists and is empty. If it does not * exist, it is created. If it exists but is not empty, an error will * be give and the process ended. *************** *** 492,502 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) strcpy(current_path, PQgetvalue(res, rownum, 1)); /* - * Make sure we're unpacking into an empty directory - */ - verify_dir_is_empty_or_create(current_path); - - /* * Get the COPY data */ res = PQgetResult(conn); --- 661,666 ---- *************** *** 586,598 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) /* * Directory */ ! filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ if (mkdir(filename, S_IRWXU) != 0) { ! fprintf(stderr, _("%s: could not create directory \"%s\": %s\n"), ! progname, filename, strerror(errno)); ! disconnect_and_exit(1); } #ifndef WIN32 if (chmod(filename, (mode_t) filemode)) --- 750,770 ---- /* * Directory */ ! filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ if (mkdir(filename, S_IRWXU) != 0) { ! /* ! * When streaming WAL, pg_xlog will have been created ! * by the wal receiver process, so just ignore failure ! * on that. ! */ ! if (!streamwal || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0) ! { ! fprintf(stderr, _("%s: could not create directory \"%s\": %s\n"), ! progname, filename, strerror(errno)); ! disconnect_and_exit(1); ! } } #ifndef WIN32 if (chmod(filename, (mode_t) filemode)) *************** *** 605,616 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) /* * Symbolic link */ ! filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ if (symlink(©buf[157], filename) != 0) { fprintf(stderr, _("%s: could not create symbolic link from \"%s\" to \"%s\": %s\n"), ! progname, filename, ©buf[157], strerror(errno)); disconnect_and_exit(1); } } --- 777,788 ---- /* * Symbolic link */ ! filename[strlen(filename) - 1] = '\0'; /* Remove trailing slash */ if (symlink(©buf[157], filename) != 0) { fprintf(stderr, _("%s: could not create symbolic link from \"%s\" to \"%s\": %s\n"), ! progname, filename, ©buf[157], strerror(errno)); disconnect_and_exit(1); } } *************** *** 703,796 **** ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum) } - static PGconn * - GetConnection(void) - { - PGconn *tmpconn; - int argcount = 4; /* dbname, replication, fallback_app_name, - * password */ - int i; - const char **keywords; - const char **values; - char *password = NULL; - - if (dbhost) - argcount++; - if (dbuser) - argcount++; - if (dbport) - argcount++; - - keywords = xmalloc0((argcount + 1) * sizeof(*keywords)); - values = xmalloc0((argcount + 1) * sizeof(*values)); - - keywords[0] = "dbname"; - values[0] = "replication"; - keywords[1] = "replication"; - values[1] = "true"; - keywords[2] = "fallback_application_name"; - values[2] = progname; - i = 3; - if (dbhost) - { - keywords[i] = "host"; - values[i] = dbhost; - i++; - } - if (dbuser) - { - keywords[i] = "user"; - values[i] = dbuser; - i++; - } - if (dbport) - { - keywords[i] = "port"; - values[i] = dbport; - i++; - } - - while (true) - { - if (dbgetpassword == 1) - { - /* Prompt for a password */ - password = simple_prompt(_("Password: "), 100, false); - keywords[argcount - 1] = "password"; - values[argcount - 1] = password; - } - - tmpconn = PQconnectdbParams(keywords, values, true); - if (password) - free(password); - - if (PQstatus(tmpconn) == CONNECTION_BAD && - PQconnectionNeedsPassword(tmpconn) && - dbgetpassword != -1) - { - dbgetpassword = 1; /* ask for password next time */ - PQfinish(tmpconn); - continue; - } - - if (PQstatus(tmpconn) != CONNECTION_OK) - { - fprintf(stderr, _("%s: could not connect to server: %s"), - progname, PQerrorMessage(tmpconn)); - exit(1); - } - - /* Connection ok! */ - free(values); - free(keywords); - return tmpconn; - } - } - static void BaseBackup(void) { PGresult *res; char current_path[MAXPGPATH]; char escaped_label[MAXPGPATH]; int i; --- 875,885 ---- } static void BaseBackup(void) { PGresult *res; + uint32 timeline; char current_path[MAXPGPATH]; char escaped_label[MAXPGPATH]; int i; *************** *** 803,815 **** BaseBackup(void) conn = GetConnection(); /* * Start the actual backup */ PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i); snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s %s", escaped_label, showprogress ? "PROGRESS" : "", ! includewal ? "WAL" : "", fastcheckpoint ? "FAST" : "", includewal ? "NOWAIT" : ""); --- 892,923 ---- conn = GetConnection(); /* + * Run IDENFITY_SYSTEM so we can get the timeline + */ + res = PQexec(conn, "IDENTIFY_SYSTEM"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, _("%s: could not identify system: %s\n"), + progname, PQerrorMessage(conn)); + disconnect_and_exit(1); + } + if (PQntuples(res) != 1) + { + fprintf(stderr, _("%s: could not identify system, got %i rows\n"), + progname, PQntuples(res)); + disconnect_and_exit(1); + } + timeline = atoi(PQgetvalue(res, 0, 1)); + PQclear(res); + + /* * Start the actual backup */ PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i); snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s %s", escaped_label, showprogress ? "PROGRESS" : "", ! includewal && !streamwal ? "WAL" : "", fastcheckpoint ? "FAST" : "", includewal ? "NOWAIT" : ""); *************** *** 888,893 **** BaseBackup(void) --- 996,1013 ---- } /* + * If we're streaming WAL, start the streaming session before we start + * receiving the actual data chunks. + */ + if (streamwal) + { + if (verbose) + fprintf(stderr, _("%s: starting background WAL receiver\n"), + progname); + StartLogStreamer(xlogstart, timeline); + } + + /* * Start receiving chunks */ for (i = 0; i < PQntuples(res); i++) *************** *** 934,939 **** BaseBackup(void) --- 1054,1145 ---- disconnect_and_exit(1); } + if (bgchild > 0) + { + int status; + + #ifndef WIN32 + int r; + #endif + + if (verbose) + fprintf(stderr, _("%s: waiting for background process to finish streaming...\n"), progname); + + #ifndef WIN32 + if (pipewrite(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend)) + { + fprintf(stderr, _("%s: could not send command to background pipe: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } + + /* Just wait for the background process to exit */ + r = waitpid(bgchild, &status, 0); + if (r == -1) + { + fprintf(stderr, _("%s: could not wait for child process: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } + if (r != bgchild) + { + fprintf(stderr, _("%s: child %i died, expected %i\n"), + progname, r, bgchild); + disconnect_and_exit(1); + } + if (!WIFEXITED(status)) + { + fprintf(stderr, _("%s: child process did not exit normally\n"), + progname); + disconnect_and_exit(1); + } + if (WEXITSTATUS(status) != 0) + { + fprintf(stderr, _("%s: child process exited with error %i\n"), + progname, WEXITSTATUS(status)); + disconnect_and_exit(1); + } + /* Exited normally, we're happy! */ + #else /* WIN32 */ + + /* + * On Windows, since we are in the same process, we can just store the + * value directly in the variable, and then set the flag that says + * it's there. + */ + if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2) + { + fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"), + progname, xlogend); + exit(1); + } + InterlockedIncrement(&has_xlogendptr); + + /* First wait for the thread to exit */ + if (WaitForSingleObjectEx((HANDLE) bgchild, INFINITE, FALSE) != WAIT_OBJECT_0) + { + _dosmaperr(GetLastError()); + fprintf(stderr, _("%s: could not wait for child thread: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } + if (GetExitCodeThread((HANDLE) bgchild, &status) == 0) + { + _dosmaperr(GetLastError()); + fprintf(stderr, _("%s: could not get child thread exit status: %s\n"), + progname, strerror(errno)); + disconnect_and_exit(1); + } + if (status != 0) + { + fprintf(stderr, _("%s: child thread exited with error %u\n"), + progname, status); + disconnect_and_exit(1); + } + /* Exited normally, we're happy */ + #endif + } + /* * End of copy data. Final result is already checked inside the loop. */ *************** *** 953,959 **** main(int argc, char **argv) {"pgdata", required_argument, NULL, 'D'}, {"format", required_argument, NULL, 'F'}, {"checkpoint", required_argument, NULL, 'c'}, ! {"xlog", no_argument, NULL, 'x'}, {"gzip", no_argument, NULL, 'z'}, {"compress", required_argument, NULL, 'Z'}, {"label", required_argument, NULL, 'l'}, --- 1159,1165 ---- {"pgdata", required_argument, NULL, 'D'}, {"format", required_argument, NULL, 'F'}, {"checkpoint", required_argument, NULL, 'c'}, ! {"xlog", required_argument, NULL, 'x'}, {"gzip", no_argument, NULL, 'z'}, {"compress", required_argument, NULL, 'Z'}, {"label", required_argument, NULL, 'l'}, *************** *** 962,967 **** main(int argc, char **argv) --- 1168,1174 ---- {"username", required_argument, NULL, 'U'}, {"no-password", no_argument, NULL, 'w'}, {"password", no_argument, NULL, 'W'}, + {"statusint", required_argument, NULL, 's'}, {"verbose", no_argument, NULL, 'v'}, {"progress", no_argument, NULL, 'P'}, {NULL, 0, NULL, 0} *************** *** 988,994 **** main(int argc, char **argv) } } ! while ((c = getopt_long(argc, argv, "D:F:xl:zZ:c:h:p:U:wWvP", long_options, &option_index)) != -1) { switch (c) --- 1195,1201 ---- } } ! while ((c = getopt_long(argc, argv, "D:F:x:l:zZ:c:h:p:U:s:wWvP", long_options, &option_index)) != -1) { switch (c) *************** *** 1010,1015 **** main(int argc, char **argv) --- 1217,1234 ---- break; case 'x': includewal = true; + if (strcmp(optarg, "f") == 0 || + strcmp(optarg, "fetch") == 0) + streamwal = false; + else if (strcmp(optarg, "s") == 0 || + strcmp(optarg, "stream") == 0) + streamwal = true; + else + { + fprintf(stderr, _("%s: invalid xlog option \"%s\", must be empty, \"fetch\" or \"stream\"\n"), + progname, optarg); + exit(1); + } break; case 'l': label = xstrdup(optarg); *************** *** 1057,1062 **** main(int argc, char **argv) --- 1276,1290 ---- case 'W': dbgetpassword = 1; break; + case 's': + standby_message_timeout = atoi(optarg); + if (standby_message_timeout < 0) + { + fprintf(stderr, _("%s: invalid status interval \"%s\"\n"), + progname, optarg); + exit(1); + } + break; case 'v': verbose++; break; *************** *** 1111,1116 **** main(int argc, char **argv) --- 1339,1354 ---- exit(1); } + if (format != 'p' && streamwal) + { + fprintf(stderr, + _("%s: wal streaming can only be used in plain mode\n"), + progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + #ifndef HAVE_LIBZ if (compresslevel != 0) { *** /dev/null --- b/src/bin/pg_basebackup/pg_receivexlog.c *************** *** 0 **** --- 1,440 ---- + /*------------------------------------------------------------------------- + * + * pg_receivexlog.c - receive streaming transaction log data and write it + * to a local file. + * + * Author: Magnus Hagander + * + * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/pg_receivexlog.c + *------------------------------------------------------------------------- + */ + + /* + * We have to use postgres.h not postgres_fe.h here, because there's so much + * backend-only stuff in the XLOG include files we need. But we need a + * frontend-ish environment otherwise. Hence this ugly hack. + */ + #define FRONTEND 1 + #include "postgres.h" + #include "libpq-fe.h" + #include "libpq/pqsignal.h" + #include "access/xlog_internal.h" + + #include "receivelog.h" + #include "streamutil.h" + + #include + #include + #include + #include + + #include "getopt_long.h" + + /* Global options */ + char *basedir = NULL; + int verbose = 0; + int standby_message_timeout = 10; /* 10 sec = default */ + volatile bool time_to_abort = false; + + + static void usage(void); + static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline); + static void StreamLog(); + static bool segment_callback(XLogRecPtr segendpos, uint32 timeline); + + static void + usage(void) + { + printf(_("%s receives PostgreSQL streaming transaction logs\n\n"), + progname); + printf(_("Usage:\n")); + printf(_(" %s [OPTION]...\n"), progname); + printf(_("\nOptions controlling the output:\n")); + printf(_(" -D, --dir=directory receive xlog files into this directory\n")); + printf(_("\nGeneral options:\n")); + printf(_(" -v, --verbose output verbose messages\n")); + printf(_(" -?, --help show this help, then exit\n")); + printf(_(" -V, --version output version information, then exit\n")); + printf(_("\nConnection options:\n")); + printf(_(" -s, --statusint=INTERVAL time between status packets sent to server (in seconds)\n")); + printf(_(" -h, --host=HOSTNAME database server host or socket directory\n")); + printf(_(" -p, --port=PORT database server port number\n")); + printf(_(" -U, --username=NAME connect as specified database user\n")); + printf(_(" -w, --no-password never prompt for password\n")); + printf(_(" -W, --password force password prompt (should happen automatically)\n")); + printf(_("\nReport bugs to .\n")); + } + + static bool + segment_callback(XLogRecPtr segendpos, uint32 timeline) + { + char fn[MAXPGPATH]; + struct stat statbuf; + + if (verbose) + fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"), + progname, segendpos.xlogid, segendpos.xrecoff, timeline); + + /* + * Check if there is a partial file for the name we just finished, and if + * there is, remove it under the assumption that we have now got all the + * data we need. + */ + PrevLogSeg(segendpos.xlogid, segendpos.xrecoff); + snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial", + basedir, timeline, + segendpos.xlogid, + segendpos.xrecoff / XLOG_SEG_SIZE); + if (stat(fn, &statbuf) == 0) + { + /* File existed, get rid of it */ + if (verbose) + fprintf(stderr, _("%s: removing file \"%s\"\n"), + progname, fn); + unlink(fn); + } + + /* + * Never abort from this - we handle all aborting in continue_streaming() + */ + return false; + } + + static bool + continue_streaming() + { + if (time_to_abort) + { + fprintf(stderr, _("%s: received interrupt signal, exiting.\n"), + progname); + return true; + } + return false; + } + + /* + * Determine starting location for streaming, based on: + * 1. If there are existing xlog segments, start at the end of the last one + * 2. If the last one is a partial segment, rename it and start over, since + * we don't sync after every write. + * 3. If no existing xlog exists, start from the beginning of the current + * WAL segment. + */ + static XLogRecPtr + FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline) + { + DIR *dir; + struct dirent *dirent; + int i; + bool b; + XLogRecPtr high = {0, 0}; + + dir = opendir(basedir); + if (dir == NULL) + { + fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"), + progname, basedir, strerror(errno)); + disconnect_and_exit(1); + } + + while ((dirent = readdir(dir)) != NULL) + { + char fullpath[MAXPGPATH]; + struct stat statbuf; + uint32 tli, + log, + seg; + + if (!strcmp(dirent->d_name, ".") || !strcmp(dirent->d_name, "..")) + continue; + + /* xlog files are always 24 characters */ + if (strlen(dirent->d_name) != 24) + continue; + + /* Filenames are always made out of 0-9 and A-F */ + b = false; + for (i = 0; i < 24; i++) + { + if (!(dirent->d_name[i] >= '0' && dirent->d_name[i] <= '9') && + !(dirent->d_name[i] >= 'A' && dirent->d_name[i] <= 'F')) + { + b = true; + break; + } + } + if (b) + continue; + + /* + * Looks like an xlog file. Parse it's position. + */ + if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3) + { + fprintf(stderr, _("%s: could not parse xlog filename \"%s\"\n"), + progname, dirent->d_name); + disconnect_and_exit(1); + } + log *= XLOG_SEG_SIZE; + + /* Ignore any files that are for another timeline */ + if (tli != currenttimeline) + continue; + + /* Check if this is a completed segment or not */ + snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name); + if (stat(fullpath, &statbuf) != 0) + { + fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"), + progname, fullpath, strerror(errno)); + disconnect_and_exit(1); + } + + if (statbuf.st_size == 16 * 1024 * 1024) + { + /* Completed segment */ + if (log > high.xlogid || + (log == high.xlogid && seg > high.xrecoff)) + { + high.xlogid = log; + high.xrecoff = seg; + continue; + } + } + else + { + /* + * This is a partial file. Rename it out of the way. + */ + char newfn[MAXPGPATH]; + + fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"), + progname, dirent->d_name, dirent->d_name); + + snprintf(newfn, sizeof(newfn), "%s/%s.partial", + basedir, dirent->d_name); + + if (stat(newfn, &statbuf) == 0) + { + fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"), + progname, newfn); + disconnect_and_exit(1); + } + if (rename(fullpath, newfn) != 0) + { + fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"), + progname, fullpath, newfn, strerror(errno)); + disconnect_and_exit(1); + } + + /* Don't continue looking for more, we assume this is the last */ + break; + } + } + + closedir(dir); + + if (high.xlogid > 0 && high.xrecoff > 0) + return high; + + return currentpos; + } + + /* + * Start the log streaming + */ + static void + StreamLog(void) + { + PGresult *res; + uint32 timeline; + XLogRecPtr startpos; + + /* + * Connect in replication mode to the server + */ + conn = GetConnection(); + + /* + * Run IDENFITY_SYSTEM so we can get the timeline and current xlog + * position. + */ + res = PQexec(conn, "IDENTIFY_SYSTEM"); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + fprintf(stderr, _("%s: could not identify system: %s\n"), + progname, PQerrorMessage(conn)); + disconnect_and_exit(1); + } + if (PQntuples(res) != 1) + { + fprintf(stderr, _("%s: could not identify system, got %i rows\n"), + progname, PQntuples(res)); + disconnect_and_exit(1); + } + timeline = atoi(PQgetvalue(res, 0, 1)); + if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &startpos.xlogid, &startpos.xrecoff) != 2) + { + fprintf(stderr, _("%s: could not parse log start position from value \"%s\"\n"), + progname, PQgetvalue(res, 0, 2)); + disconnect_and_exit(1); + } + PQclear(res); + + /* + * Figure out where to start streaming. + */ + startpos = FindStreamingStart(startpos, timeline); + + /* + * Always start streaming at the beginning of a segment + */ + startpos.xrecoff -= startpos.xrecoff % XLOG_SEG_SIZE; + + /* + * Start the replication + */ + if (verbose) + fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"), + progname, startpos.xlogid, startpos.xrecoff, timeline); + + ReceiveXlogStream(conn, startpos, timeline, basedir, + segment_callback, continue_streaming, + standby_message_timeout); + } + + /* + * When sigint is called, just tell the system to exit at the next possible + * moment. + */ + static void + sigint_handler(int signum) + { + time_to_abort = true; + } + + int + main(int argc, char **argv) + { + static struct option long_options[] = { + {"help", no_argument, NULL, '?'}, + {"version", no_argument, NULL, 'V'}, + {"dir", required_argument, NULL, 'D'}, + {"host", required_argument, NULL, 'h'}, + {"port", required_argument, NULL, 'p'}, + {"username", required_argument, NULL, 'U'}, + {"no-password", no_argument, NULL, 'w'}, + {"password", no_argument, NULL, 'W'}, + {"statusint", required_argument, NULL, 's'}, + {"verbose", no_argument, NULL, 'v'}, + {NULL, 0, NULL, 0} + }; + int c; + + int option_index; + + progname = get_progname(argv[0]); + set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog")); + + if (argc > 1) + { + if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0) + { + usage(); + exit(0); + } + else if (strcmp(argv[1], "-V") == 0 + || strcmp(argv[1], "--version") == 0) + { + puts("pg_receivexlog (PostgreSQL) " PG_VERSION); + exit(0); + } + } + + while ((c = getopt_long(argc, argv, "D:h:p:U:s:wWv", + long_options, &option_index)) != -1) + { + switch (c) + { + case 'D': + basedir = xstrdup(optarg); + break; + case 'h': + dbhost = xstrdup(optarg); + break; + case 'p': + if (atoi(optarg) <= 0) + { + fprintf(stderr, _("%s: invalid port number \"%s\"\n"), + progname, optarg); + exit(1); + } + dbport = xstrdup(optarg); + break; + case 'U': + dbuser = xstrdup(optarg); + break; + case 'w': + dbgetpassword = -1; + break; + case 'W': + dbgetpassword = 1; + break; + case 's': + standby_message_timeout = atoi(optarg); + if (standby_message_timeout < 0) + { + fprintf(stderr, _("%s: invalid status interval \"%s\"\n"), + progname, optarg); + exit(1); + } + break; + case 'v': + verbose++; + break; + default: + + /* + * getopt_long already emitted a complaint + */ + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + } + + /* + * Any non-option arguments? + */ + if (optind < argc) + { + fprintf(stderr, + _("%s: too many command-line arguments (first is \"%s\")\n"), + progname, argv[optind]); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + /* + * Required arguments + */ + if (basedir == NULL) + { + fprintf(stderr, _("%s: no target directory specified\n"), progname); + fprintf(stderr, _("Try \"%s --help\" for more information.\n"), + progname); + exit(1); + } + + #ifndef WIN32 + pqsignal(SIGINT, sigint_handler); + #endif + + StreamLog(); + + exit(0); + } *** /dev/null --- b/src/bin/pg_basebackup/receivelog.c *************** *** 0 **** --- 1,353 ---- + /*------------------------------------------------------------------------- + * + * receivelog.c - receive transaction log files using the streaming + * replication protocol. + * + * Author: Magnus Hagander + * + * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/receivelog.c + *------------------------------------------------------------------------- + */ + + /* + * We have to use postgres.h not postgres_fe.h here, because there's so much + * backend-only stuff in the XLOG include files we need. But we need a + * frontend-ish environment otherwise. Hence this ugly hack. + */ + #define FRONTEND 1 + #include "postgres.h" + #include "libpq-fe.h" + #include "access/xlog_internal.h" + #include "replication/walprotocol.h" + #include "utils/datetime.h" + + #include "receivelog.h" + #include "streamutil.h" + + #include + #include + #include + + + /* Size of the streaming replication protocol header */ + #define STREAMING_HEADER_SIZE (1+8+8+8) + + const XLogRecPtr InvalidXLogRecPtr = {0, 0}; + + /* + * Open a new WAL file in the specified directory. Store the name + * (not including the full directory) in namebuf. Assumes there is + * enough room in this buffer... + */ + static int + open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf) + { + int f; + char fn[MAXPGPATH]; + + XLogFileName(namebuf, timeline, startpoint.xlogid, + startpoint.xrecoff / XLOG_SEG_SIZE); + + snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf); + f = open(fn, O_WRONLY | O_CREAT | O_EXCL | PG_BINARY, 0666); + if (f == -1) + fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"), + progname, namebuf, strerror(errno)); + return f; + } + + /* + * Local version of GetCurrentTimestamp(), since we are not linked with + * backend code. + */ + static TimestampTz + localGetCurrentTimestamp(void) + { + TimestampTz result; + struct timeval tp; + + gettimeofday(&tp, NULL); + + result = (TimestampTz) tp.tv_sec - + ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY); + + #ifdef HAVE_INT64_TIMESTAMP + result = (result * USECS_PER_SEC) + tp.tv_usec; + #else + result = result + (tp.tv_usec / 1000000.0); + #endif + + return result; + } + + /* + * Receive a log stream starting at the specified position. + * + * Note: The log position *must* be at a log segment start! + */ + bool + ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, segment_finish_callback segment_finish, stream_continue_callback stream_continue, int standby_message_timeout) + { + char query[128]; + char current_walfile_name[MAXPGPATH]; + PGresult *res; + char *copybuf = NULL; + int walfile = -1; + int64 last_status = -1; + XLogRecPtr blockpos = InvalidXLogRecPtr; + + /* Initiate the replication stream at specified location */ + snprintf(query, sizeof(query), "START_REPLICATION %X/%X", startpos.xlogid, startpos.xrecoff); + res = PQexec(conn, query); + if (PQresultStatus(res) != PGRES_COPY_BOTH) + { + fprintf(stderr, _("%s: could not start replication: %s\n"), + progname, PQresultErrorMessage(res)); + return false; + } + PQclear(res); + + /* + * Receive the actual xlog data + */ + while (1) + { + int r; + int xlogoff; + int bytes_left; + int bytes_written; + int64 now; + + if (copybuf != NULL) + { + PQfreemem(copybuf); + copybuf = NULL; + } + + /* + * Check if we should continue streaming, or abort at this point. + */ + if (stream_continue && stream_continue()) + { + if (walfile != -1) + { + fsync(walfile); + close(walfile); + } + return true; + } + + /* + * Potentially send a status message to the master + */ + now = localGetCurrentTimestamp(); + if (standby_message_timeout > 0 && + last_status < now - standby_message_timeout * 1000000) + { + /* Time to send feedback! */ + char replybuf[sizeof(StandbyReplyMessage) + 1]; + StandbyReplyMessage *replymsg = (StandbyReplyMessage *) (replybuf + 1); + + replymsg->write = blockpos; + replymsg->flush = InvalidXLogRecPtr; + replymsg->apply = InvalidXLogRecPtr; + replymsg->sendTime = now; + replybuf[0] = 'r'; + + if (PQputCopyData(conn, replybuf, sizeof(replybuf)) <= 0 || + PQflush(conn)) + { + fprintf(stderr, _("%s: could not send feedback packet: %s"), + progname, PQerrorMessage(conn)); + return false; + } + + last_status = now; + } + + r = PQgetCopyData(conn, ©buf, 1); + if (r == 0) + { + /* + * In async mode, and no data available. We block on reading but + * not more than the specified timeout, so that we can send a + * response back to the client. + */ + fd_set input_mask; + struct timeval timeout; + struct timeval *timeoutptr; + + FD_ZERO(&input_mask); + FD_SET(PQsocket(conn), &input_mask); + if (standby_message_timeout) + { + timeout.tv_sec = last_status + standby_message_timeout - now - 1; + if (timeout.tv_sec <= 0) + timeout.tv_sec = 1; /* Always sleep at least 1 sec */ + timeout.tv_usec = 0; + timeoutptr = &timeout; + } + else + timeoutptr = NULL; + + r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr); + if (r == 0 || (r < 0 && errno == EINTR)) + { + /* + * Got a timeout or signal. Continue the loop and either + * deliver a status packet to the server or just go back into + * blocking. + */ + continue; + } + else if (r < 0) + { + fprintf(stderr, _("%s: select() failed: %m\n"), progname); + return false; + } + /* Else there is actually data on the socket */ + if (PQconsumeInput(conn) == 0) + { + fprintf(stderr, _("%s: could not receive data from WAL stream: %s\n"), + progname, PQerrorMessage(conn)); + return false; + } + continue; + } + if (r == -1) + /* End of copy stream */ + break; + if (r == -2) + { + fprintf(stderr, _("%s: could not read copy data: %s\n"), + progname, PQerrorMessage(conn)); + return false; + } + if (r < STREAMING_HEADER_SIZE + 1) + { + fprintf(stderr, _("%s: streaming header too small: %i\n"), + progname, r); + return false; + } + if (copybuf[0] != 'w') + { + fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"), + progname, copybuf[0]); + return false; + } + + /* Extract WAL location for this block */ + memcpy(&blockpos, copybuf + 1, 8); + xlogoff = blockpos.xrecoff % XLOG_SEG_SIZE; + + /* + * Verify that the initial location in the stream matches where we + * think we are. + */ + if (walfile == -1) + { + /* No file open yet */ + if (xlogoff != 0) + { + fprintf(stderr, _("%s: received xlog record for offset %u with no file open\n"), + progname, xlogoff); + return false; + } + } + else + { + /* More data in existing segment */ + /* XXX: store seek value don't reseek all the time */ + if (lseek(walfile, 0, SEEK_CUR) != xlogoff) + { + fprintf(stderr, _("%s: got WAL data offset %08x, expected %08x\n"), + progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR)); + return false; + } + } + + bytes_left = r - STREAMING_HEADER_SIZE; + bytes_written = 0; + + while (bytes_left) + { + int bytes_to_write; + + /* + * If crossing a WAL boundary, only write up until we reach + * XLOG_SEG_SIZE. + */ + if (xlogoff + bytes_left > XLOG_SEG_SIZE) + bytes_to_write = XLOG_SEG_SIZE - xlogoff; + else + bytes_to_write = bytes_left; + + if (walfile == -1) + { + walfile = open_walfile(blockpos, timeline, + basedir, current_walfile_name); + if (walfile == -1) + /* Error logged by open_walfile */ + return false; + } + + if (write(walfile, + copybuf + STREAMING_HEADER_SIZE + bytes_written, + bytes_to_write) != bytes_to_write) + { + fprintf(stderr, _("%s: could not write %u bytes to WAL file %s: %s\n"), + progname, + bytes_to_write, + current_walfile_name, + strerror(errno)); + return false; + } + + /* Write was successful, advance our position */ + bytes_written += bytes_to_write; + bytes_left -= bytes_to_write; + XLByteAdvance(blockpos, bytes_to_write); + xlogoff += bytes_to_write; + + /* Did we reach the end of a WAL segment? */ + if (blockpos.xrecoff % XLOG_SEG_SIZE == 0) + { + fsync(walfile); + close(walfile); + walfile = -1; + xlogoff = 0; + + if (segment_finish != NULL) + { + /* + * Callback when the segment finished, and return if it + * told us to. + */ + if (segment_finish(blockpos, timeline)) + return true; + } + } + } + /* No more data left to write, start receiving next copy packet */ + } + + /* + * The only way to get out of the loop is if the server shut down the + * replication stream. If it's a controlled shutdown, the server will send + * a shutdown message, and we'll return the latest xlog location that has + * been streamed. + */ + + res = PQgetResult(conn); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + fprintf(stderr, _("%s: unexpected termination of replication stream: %s\n"), + progname, PQresultErrorMessage(res)); + return false; + } + PQclear(res); + return true; + } *** /dev/null --- b/src/bin/pg_basebackup/receivelog.h *************** *** 0 **** --- 1,21 ---- + #include "access/xlogdefs.h" + + /* + * Called whenever a segment is finished, return true to stop + * the streaming at this point. + */ + typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline); + + /* + * Called before trying to read more data. Return true to stop + * the streaming at this point. + */ + typedef bool (*stream_continue_callback)(void); + + bool ReceiveXlogStream(PGconn *conn, + XLogRecPtr startpos, + uint32 timeline, + char *basedir, + segment_finish_callback segment_finish, + stream_continue_callback stream_continue, + int standby_message_timeout); *** /dev/null --- b/src/bin/pg_basebackup/streamutil.c *************** *** 0 **** --- 1,165 ---- + /*------------------------------------------------------------------------- + * + * streamutil.c - utility functions for pg_basebackup and pg_receivelog + * + * Author: Magnus Hagander + * + * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/bin/pg_basebackup/streamutil.c + *------------------------------------------------------------------------- + */ + + /* + * We have to use postgres.h not postgres_fe.h here, because there's so much + * backend-only stuff in the XLOG include files we need. But we need a + * frontend-ish environment otherwise. Hence this ugly hack. + */ + #define FRONTEND 1 + #include "postgres.h" + #include "streamutil.h" + + #include + #include + + const char *progname; + char *dbhost = NULL; + char *dbuser = NULL; + char *dbport = NULL; + int dbgetpassword = 0; /* 0=auto, -1=never, 1=always */ + static char *dbpassword = NULL; + PGconn *conn = NULL; + + /* + * strdup() and malloc() replacements that prints an error and exits + * if something goes wrong. Can never return NULL. + */ + char * + xstrdup(const char *s) + { + char *result; + + result = strdup(s); + if (!result) + { + fprintf(stderr, _("%s: out of memory\n"), progname); + exit(1); + } + return result; + } + + void * + xmalloc0(int size) + { + void *result; + + result = malloc(size); + if (!result) + { + fprintf(stderr, _("%s: out of memory\n"), progname); + exit(1); + } + MemSet(result, 0, size); + return result; + } + + + PGconn * + GetConnection(void) + { + PGconn *tmpconn; + int argcount = 4; /* dbname, replication, fallback_app_name, + * password */ + int i; + const char **keywords; + const char **values; + char *password = NULL; + + if (dbhost) + argcount++; + if (dbuser) + argcount++; + if (dbport) + argcount++; + + keywords = xmalloc0((argcount + 1) * sizeof(*keywords)); + values = xmalloc0((argcount + 1) * sizeof(*values)); + + keywords[0] = "dbname"; + values[0] = "replication"; + keywords[1] = "replication"; + values[1] = "true"; + keywords[2] = "fallback_application_name"; + values[2] = progname; + i = 3; + if (dbhost) + { + keywords[i] = "host"; + values[i] = dbhost; + i++; + } + if (dbuser) + { + keywords[i] = "user"; + values[i] = dbuser; + i++; + } + if (dbport) + { + keywords[i] = "port"; + values[i] = dbport; + i++; + } + + while (true) + { + if (password) + free(password); + + if (dbpassword) + { + /* + * We've saved a password when a previous connection succeeded, + * meaning this is the call for a second session to the same + * database, so just forcibly reuse that password. + */ + keywords[argcount - 1] = "password"; + values[argcount - 1] = dbpassword; + dbgetpassword = -1; /* Don't try again if this fails */ + } + else if (dbgetpassword == 1) + { + password = simple_prompt(_("Password: "), 100, false); + keywords[argcount - 1] = "password"; + values[argcount - 1] = password; + } + + tmpconn = PQconnectdbParams(keywords, values, true); + + if (PQstatus(tmpconn) == CONNECTION_BAD && + PQconnectionNeedsPassword(tmpconn) && + dbgetpassword != -1) + { + dbgetpassword = 1; /* ask for password next time */ + PQfinish(tmpconn); + continue; + } + + if (PQstatus(tmpconn) != CONNECTION_OK) + { + fprintf(stderr, _("%s: could not connect to server: %s\n"), + progname, PQerrorMessage(tmpconn)); + exit(1); + } + + /* Connection ok! */ + free(values); + free(keywords); + + /* Store the password for next run */ + if (password) + dbpassword = password; + return tmpconn; + } + } *** /dev/null --- b/src/bin/pg_basebackup/streamutil.h *************** *** 0 **** --- 1,22 ---- + #include "libpq-fe.h" + + extern const char *progname; + extern char *dbhost; + extern char *dbuser; + extern char *dbport; + extern int dbgetpassword; + + /* Connection kept global so we can disconnect easily */ + extern PGconn *conn; + + #define disconnect_and_exit(code) \ + { \ + if (conn != NULL) PQfinish(conn); \ + exit(code); \ + } + + + char *xstrdup(const char *s); + void *xmalloc0(int size); + + PGconn *GetConnection(void); *** a/src/tools/msvc/Mkvcbuild.pm --- b/src/tools/msvc/Mkvcbuild.pm *************** *** 304,309 **** sub mkvcbuild --- 304,316 ---- $initdb->AddLibrary('ws2_32.lib'); my $pgbasebackup = AddSimpleFrontend('pg_basebackup', 1); + $pgbasebackup->AddFile('src\bin\pg_basebackup\pg_basebackup.c'); + $pgbasebackup->AddLibrary('ws2_32.lib'); + + my $pgreceivexlog = AddSimpleFrontend('pg_basebackup', 1); + $pgreceivexlog->{name} = 'pg_receivexlog'; + $pgreceivexlog->AddFile('src\bin\pg_basebackup\pg_receivexlog.c'); + $pgreceivexlog->AddLibrary('ws2_32.lib'); my $pgconfig = AddSimpleFrontend('pg_config');